Source code for devices.xylo.syns63300.imu_monitor

"""
Samna-backed bridge to Xylo dev kit for Xylo IMU
"""

# - Samna imports
import samna
from samna.xyloImu.configuration import XyloConfiguration

from . import xylo_imu_devkit_utils as hdkutils
from .xylo_imu_devkit_utils import XyloIMUHDK, set_xylo_core_clock_freq
from .imuif_sim import IMUIFSim

import time
import numpy as np
from rockpool.nn.modules.module import Module
from rockpool.parameters import SimulationParameter

# - Typing
from typing import Optional, Union, Callable, List, Tuple
from warnings import warn

try:
    from tqdm.auto import tqdm, trange
except ModuleNotFoundError:

    def tqdm(wrapped, *args, **kwargs):
        return wrapped

    def trange(obj, *args, **kwargs):
        return range(obj)


# - Configure exports
__all__ = ["XyloIMUMonitor"]

Default_Main_Clock_Rate = 50.0  # 50 MHz


[docs]class XyloIMUMonitor(Module): """ A spiking neuron :py:class:`.Module` backed by the Xylo-IMU hardware, via `samna`. :py:class:`.XyloIMUMonitor` operates continuously in real-time, receiving and processing data from an IMU sensor with the deployed SNN. Results are continuously output from the HDK and buffered. On evolution, :py:class:`.XyloIMUMonitor` returns a chunk of buffered processed time of a specified duration. Use :py:func:`~.devices.xylo.syns63300.config_from_specification` to build and validate a configuration for Xylo. .. Warning:: :py:class:`.XyloIMUMonitor` blocks FPGA access to the IMU sensor on the Xylo HDK, if ``prerecorded_imu_input = False``, because it connects the IMU sensor directly to Xylo. This means that other modules such as :py:class:`.IMUData` that attempt to connect to the IMU sensor may fail. :py:class:`.XyloIMUMonitor` will reset the HDK on deletion, releasing the IMU sensor for use. >>> mod = XyloIMUMonitor(hdk, ...) >>> del mod """
[docs] def __init__( self, device: XyloIMUHDK, config: Optional[XyloConfiguration] = None, output_mode: str = "Spike", prerecorded_imu_input: bool = False, main_clk_rate: float = Default_Main_Clock_Rate, hibernation_mode: bool = False, interface_params: dict = dict(), power_frequency: float = 5.0, *args, **kwargs, ): """ Instantiate a Module with Xylo dev-kit backend. Args: device (XyloIMUHDK): An opened `samna` device to a Xylo dev kit config (XyloConfiguraration): A Xylo configuration from `samna` output_mode (str): The readout mode for the Xylo device. This must be one of ``["Spike", "Vmem"]``. Default: "Spike", return events from the output layer. prerecorded_imu_input (bool): If ``True``, use prerocorded imu data from PC as input. If ``False``, use the live IMU sensor on the HDK. Default: ``False``, use the IMU sensor. main_clk_rate (float): The main clock rate of Xylo, in MHz hibernation_mode (bool): If True, hibernation mode will be switched on, which only outputs events if it receives inputs above a threshold. interface_params(dict): The dictionary of Xylo interface parameters used for the `hdkutils.config_if_module` function, the keys of which must be "num_avg_bitshif", "select_iaf_output", "sampling_period", "filter_a1_list", "filter_a2_list", "scale_values", "Bb_list", "B_wf_list", "B_af_list", "iaf_threshold_values". power_frequency (float): The frequency of power measurement. Default: 5.0 """ # - Check input arguments if device is None: raise ValueError("`device` must be a valid, opened Xylo HDK device.") # - Check output mode specification if output_mode not in ["Spike", "Vmem"]: raise ValueError( f'{output_mode} is not supported. Must be one of `["Spike", "Vmem"]`.' ) self._output_mode = output_mode # - Get a default configuration if config is None: config = samna.xyloImu.configuration.XyloConfiguration() # - Get the network shape Nin, Nhidden = np.shape(config.input.weights) _, Nout = np.shape(config.readout.weights) # - Register buffers to read and write events, monitor state self._read_buffer = hdkutils.new_xylo_read_buffer(device) self._write_buffer = hdkutils.new_xylo_write_buffer(device) # - Build a filter graph to filter `Readout` events from Xylo IMU self._readout_graph = samna.graph.EventFilterGraph() _, etf0, self._readout_buffer = self._readout_graph.sequential( [ device.get_model_source_node(), "XyloImuOutputEventTypeFilter", samna.graph.JitSink(), ] ) etf0.set_desired_type("xyloImu::event::Readout") self._readout_graph.start() # - Initialise the superclass super().__init__(shape=(3, Nout), spiking_input=False, spiking_output=True) # - Store the device self._device: XyloIMUHDK = device """ `.XyloHDK`: The Xylo HDK used by this module """ # - Store the configuration (and apply it) self.config: Union[ XyloConfiguration, SimulationParameter ] = SimulationParameter(shape=(), init_func=lambda _: config) """ `XyloConfiguration`: The HDK configuration applied to the Xylo module """ # - Enable hibernation mode if hibernation_mode: self.config.enable_hibernation_mode = True # - Store the timestep self.dt: Union[float, SimulationParameter] = ( 1 / 200 ) # Fixed computation step rate of 200Hz for Xylo IMU """ float: Simulation time-step of the module, in seconds """ # - Store the io module self._io = self._device.get_io_module() # - Store the choice of external imu input self._external_imu_input = prerecorded_imu_input # - Select source of IMU accel data input if prerecorded_imu_input: self._device.enable_manual_input_acceleration(True) else: self._device.enable_manual_input_acceleration(False) # - Set main clock rate in MHz self._main_clk_rate: float = set_xylo_core_clock_freq( self._device, main_clk_rate ) """ float: Xylo main clock frequency in MHz """ # - Configure to auto mode self._enable_realtime_mode(interface_params) # - Disable RAM access to save power hdkutils.enable_ram_access(self._device, False) # - Set power measurement module self._power_buf, self.power_monitor = hdkutils.set_power_measure( self._device, power_frequency )
@property def config(self): # - Return the configuration stored on Xylo HDK return self._device.get_model().get_configuration() @config.setter def config(self, new_config): # - Test for a valid configuration is_valid, msg = samna.xyloImu.validate_configuration(new_config) if not is_valid: raise ValueError(f"Invalid configuration for the Xylo HDK: {msg}") # - Write the configuration to the device hdkutils.apply_configuration(self._device, new_config) # - Store the configuration locally self._config = new_config
[docs] def _enable_realtime_mode(self, interface_params: dict): """ Configure the Xylo HDK to use real-time mode. Args: interface_params (dict): specify the interface parameters """ # - Config the streaming mode config = hdkutils.config_realtime_mode( self._config, self.dt, int(self._main_clk_rate * 1e6), ) # - Config the IMU interface and apply current configuration config.input_interface = IMUIFSim(**interface_params).export_config() self.config = config
def __del__(self): """ Delete the XyloIMUMonitor object and reset the HDK. """ # - Reset the HDK to clean up self._device.reset_board_soft()
[docs] def evolve( self, input_data: np.ndarray, record: bool = False, record_power: bool = False, read_timeout: Optional[float] = None, ) -> Tuple[np.ndarray, dict, dict]: """ Evolve a network on the Xylo HDK in Real-time mode. Args: input_data (np.ndarray): An array ``[T, 3]``, specifying the number of time-steps to record. If using external imu data input, the `input_data` is the external imu data. The first dimension is timesteps, and the last dimension is 3 channels of accelerations along x, y, z axes. record (bool): ``False``, do not return a recording dictionary. Recording internal state is not supported by :py:class:`.XyloIMUMonitor` record_power (bool): If ``True``, record the power consumption during each evolve. read_timeout (float): A duration in seconds for a read timeout. Default: 2x the real-time duration of the evolution Returns: Tuple[np.ndarray, dict, dict] output_events, {}, rec_dict output_events is an array that stores the output events of T time-steps """ # - Check `record` flag if record: raise ValueError( "Recording internal state is not supported by XyloIMUMonitor." ) # - Get data shape input_data, _ = self._auto_batch(input_data) Nb, Nt, Nc = input_data.shape if Nb > 1: raise ValueError( f"Batched data are not supported by XyloIMUMonitor. Got batched input data with shape {[Nb, Nt, Nc]}." ) if self._external_imu_input and Nc != 3: raise ValueError( f"When providing IMU input data, 3 channels of input are required. Received input with shape [{Nt, Nc}]." ) # - Discard the batch dimension input_data = input_data[0] # - Get the current time step, determine duration start_timestep = ( hdkutils.get_current_timestep(self._read_buffer, self._write_buffer) + 1 ) end_timestep = start_timestep + Nt - 1 # - Determine a read timeout read_timeout = 3 * Nt * self.dt if read_timeout is None else read_timeout # - Send external IMU input to Xylo, if requested if self._external_imu_input: # - Ensure configuration of Xylo hdkutils.apply_configuration(self._device, self._config) # - Encode IMU data and send to FPGA imu_input = hdkutils.encode_imu_data(input_data) self._readout_buffer.get_events() self._write_buffer.write(imu_input) # - Clear the power recording buffer, if recording power if record_power: self._power_buf.clear_events() # - Process in real-time mode for a desired number of time steps self._write_buffer.write( [samna.xyloImu.event.TriggerProcessing(target_timestep=end_timestep + 1)] ) # - Blocking read of events until simulation is finished read_events, is_timeout = hdkutils.blocking_read( self._readout_buffer, target_timestep=end_timestep, timeout=read_timeout ) if is_timeout: raise TimeoutError( f"Reading events timeout after {read_timeout} seconds. Read {len(read_events)} events, expected {Nt}. Last event timestep: {read_events[-1].timestep if len(read_events) > 0 else 'None'}, waiting for timestep {end_timestep}." ) rec_dict = {} if record_power: # - Get all recent power events from the power measurement ps = self._power_buf.get_events() # - Separate out power meaurement events by channel channels = samna.xyloImuBoards.MeasurementChannels io_power = np.array([e.value for e in ps if e.channel == int(channels.Io)]) core_power = np.array( [e.value for e in ps if e.channel == int(channels.Core)] ) rec_dict.update( { "io_power": io_power, "core_power": core_power, } ) # - Decode data read from Xylo vmem_out_ts, spike_out_ts = hdkutils.decode_realtime_mode_data( read_events, self.size_out, start_timestep, end_timestep ) out = vmem_out_ts if self._output_mode == "vmem" else spike_out_ts return out, {}, rec_dict