# Source code for devices.xylo.syns61201.afe_samna

"""
samna-backed module for interfacing with the Xylo-A2 AFE HW module
"""

import time
import warnings

import samna

from samna.afe2.configuration import AfeConfiguration as AFE2Configuration
from samna.afe2 import validate_configuration

from rockpool.nn.modules.module import Module
from rockpool.parameters import SimulationParameter
from rockpool import TSEvent
from rockpool.typehints import P_float

from . import xa2_devkit_utils as hdu
from .xa2_devkit_utils import XyloA2HDK

# - `tqdm` is an optional dependency: use the real progress bar when it is installed
try:
    from tqdm.auto import tqdm
except ModuleNotFoundError:

    # - Fallback stub with the same call shape, used when `tqdm` is not installed
    def tqdm(wrapped, *args, **kwargs):
        """
        Stand-in for ``tqdm`` that performs no progress reporting

        Args:
            wrapped: The object that would have been wrapped by ``tqdm``
            *args: Ignored
            **kwargs: Ignored

        Returns:
            The ``wrapped`` argument, unchanged
        """
        return wrapped


from typing import Union, Dict, Any, Tuple, Optional


__all__ = ["AFESamna", "load_afe_config", "save_afe_config"]


class AFESamna(Module):
    """
    Interface to the Audio Front-End module on a Xylo-A2 HDK

    This module uses ``samna`` to interface to the AFE hardware on a Xylo-A2 HDK. It permits recording from the AFE hardware.

    To record from the module, use the :py:meth:`~.AFESamna.evolve` method. You need to pass this method an empty matrix, with the desired number of time-steps. The time-step ``dt`` is specified at module instantiation.

    A simulation of the module is available in :py:class:`.AFESim`.

    Warnings:
        Auto-calibration is only performed when ``auto_calibrate = True`` is passed at instantiation. Earlier documentation of this module states that calibration takes approximately 50 seconds, leading to slow instantiation.

        If a manual ``config`` is supplied, it is applied to the hardware as-is; the arguments ``change_count``, ``auto_calibrate``, ``amplify_level``, ``divisive_norm`` and ``hibernation_mode`` are then not used.

    See Also:
        For information about the Audio Front-End design, and examples of using :py:class:`.AFESim` for a simulation of the AFE, see :ref:`/devices/analog-frontend-example.ipynb`.

    Examples:
        Instantiate an AFE module, connected to a Xylo-A2 HDK

        >>> from rockpool.devices.xylo import AFESamna
        >>> import rockpool.devices.xylo.syns61201.xa2_devkit_utils as xdu
        >>> afe_hdks = xdu.find_xylo_a2_boards()
        >>> afe = AFESamna(afe_hdks[0], dt = 10e-3)

        Use the module to record some audio events

        >>> import numpy as np
        >>> audio_events = afe(np.zeros([0, 100, 0]))
    """

    def __init__(
        self,
        device: XyloA2HDK,
        config: Optional[AFE2Configuration] = None,
        dt: float = 1e-3,
        auto_calibrate: bool = False,
        amplify_level: str = "low",
        change_count: Optional[int] = None,
        hibernation_mode: bool = False,
        divisive_norm: bool = False,
        divisive_norm_params: Optional[dict] = None,
        calibration_params: Optional[dict] = None,
        read_register: bool = False,
        *args,
        **kwargs,
    ):
        """
        Instantiate an AFE module, via a samna backend

        Args:
            device (XyloA2HDK): A connected AFE2 HDK device.
            config (AFE2Configuration): A samna AFE2 configuration object. If provided, it is applied without modification. If ``None``, a fresh configuration is created and adjusted according to the remaining arguments.
            dt (float): The desired spike time resolution in seconds.
            auto_calibrate (bool): If True, will apply auto-calibration.
            amplify_level (str): The level of volume gain. Default "low" is the one without gain.
            change_count (int): If is not None, AFE event counter will change from outputting 1 spike out of 4 into outputting 1 out of change_count.
            hibernation_mode (bool): If True, hibernation mode will be switched on, which only outputs events if it receives inputs above a threshold.
            divisive_norm (bool): If True, divisive normalization will be switched on.
            divisive_norm_params (Dict): Specify the divisive normalization parameters, should be structured as {"s": , "p": , "iaf_bias": }. ``None`` is treated as an empty dict.
            calibration_params (Dict): Specify the calibration parameters. ``None`` is treated as an empty dict.
            read_register (bool): If True, will print all register values of AFE after initialization.
            *args: Accepted for call compatibility; not used.
            **kwargs: Accepted for call compatibility; not used.

        Raises:
            ValueError: If ``device`` is ``None``.
            ValueError: If ``divisive_norm_params`` or ``calibration_params`` is not a dict.
            ValueError: If the connected AFE does not report version (1, 0).
            ValueError: If ``change_count`` is negative.
        """
        # - Check input arguments
        if device is None:
            raise ValueError("`device` must be a valid, opened Xylo AFE V2 HDK device.")

        # - Replace missing parameter dicts with fresh, per-call empty dicts
        if divisive_norm_params is None:
            divisive_norm_params = {}
        if calibration_params is None:
            calibration_params = {}

        # - Check params dict
        if not isinstance(divisive_norm_params, dict) or not isinstance(
            calibration_params, dict
        ):
            raise ValueError(
                "`divisive_norm_params` and `calibration_params` must be dict."
            )

        # - Get a default configuration, unless the user provided one
        manual_config = config is not None
        if manual_config:
            print("Setting a manual configuration...")
        else:
            config = AFE2Configuration()

        # - Determine how many output channels we have
        Nout = len(config.analog_top.channels)

        # - Initialise the superclass
        super().__init__(shape=(0, Nout), spiking_input=True, spiking_output=True)

        # - Store the HDK device node
        self._device = device

        # - Store the dt parameter
        self.dt: P_float = SimulationParameter(dt)

        # - Create write and read buffers
        # NOTE(review): the graph objects below are local and are not retained
        # or explicitly started here -- confirm against the samna documentation
        # that the connections stay alive after the graph is released
        self._xylo_core_read_buf = hdu.Xylo2ReadBuffer()
        graph = samna.graph.EventFilterGraph()
        graph.sequential(
            [self._device.get_xylo_model_source_node(), self._xylo_core_read_buf]
        )

        self._afe_read_buf = hdu.AFE2ReadBuffer()
        graph = samna.graph.EventFilterGraph()
        graph.sequential([self._device.get_afe_model_source_node(), self._afe_read_buf])

        self._afe_write_buf = hdu.AFE2WriteBuffer()
        graph = samna.graph.EventFilterGraph()
        graph.sequential([self._afe_write_buf, self._device.get_afe_model_sink_node()])

        # - Check that we have a correct device version
        self._chip_version, self._chip_revision = hdu.read_afe2_module_version(
            self._afe_read_buf, self._afe_write_buf
        )
        if self._chip_version != 1 or self._chip_revision != 0:
            raise ValueError(
                f"AFE version is {(self._chip_version, self._chip_revision)}; expected (1, 0)."
            )

        if not manual_config:
            # - Change counter threshold
            if change_count is not None:
                if change_count < 0:
                    raise ValueError(
                        f"{change_count} is negative. Must be non-negative values."
                    )
                config = hdu.config_afe_channel_thresholds(config, change_count)

            # - Apply auto-calibration if auto_calibrate is set to True
            if auto_calibrate:
                calibrated = self._auto_calibration(
                    self._device, config, calibration_params, apply_config=False
                )

                # - Use the returned configuration, so we do not rely on the
                #   calibration helper modifying `config` in place. An override
                #   of `_auto_calibration` that returns nothing keeps working.
                if calibrated is not None:
                    config = calibrated

            # - Amplify input volume
            config = hdu.config_lna_amplification(config, level=amplify_level)

            # - Set up divisive normalization
            if divisive_norm:
                config = hdu.DivisiveNormalization(
                    config=config,
                    **divisive_norm_params,
                )

            # - Set up hibernation mode
            if hibernation_mode:
                config = hdu.config_AFE_hibernation(config)
                config.aer_2_saer.hibernation.mode = 2
                config.aer_2_saer.hibernation.reset = 1

        # - Apply configuration
        self._device.get_afe_model().apply_configuration(config)
        self._config = config

        # - Read all registers
        if read_register:
            hdu.read_all_afe2_register(self._afe_read_buf, self._afe_write_buf)

    def _auto_calibration(
        self,
        device: XyloA2HDK,
        config: AFE2Configuration,
        calibration_params: dict,
        apply_config: bool = True,
    ) -> AFE2Configuration:
        """
        Perform AFE auto-calibration.

        Args:
            device (XyloA2HDK): A connected AFE2 HDK device
            config (AFE2Configuration): A configuration for AFE
            calibration_params (Dict): Specify the calibration parameters
            apply_config (bool): If True, will apply configuration to AFE

        Returns:
            AFE2Configuration: The configuration returned by the calibration routine
        """
        print("Configuring AFE...")
        config = hdu.apply_afe2_default_config(
            afe2hdk=device,
            config=config,
            **calibration_params,
        )
        print("Configured AFE")

        if apply_config:
            device.get_afe_model().apply_configuration(config)

        # - Hand the calibrated configuration back to the caller
        return config

    def evolve(self, input_data, record: bool = False) -> Tuple[Any, Any, Any]:
        """
        Use the AFE HW module to record live audio and return as encoded events

        Args:
            input_data (np.ndarray): An array ``[0, T, 0]``, specifying the number of time-steps to record.
            record (bool): Not used by this module; the returned record dictionary is always empty.

        Returns:
            (np.ndarray, dict, dict) output_events, the result of ``self.state()``, and an empty record dictionary
        """
        # - Handle auto batching
        input_data, _ = self._auto_batch(input_data)

        # - For how long should we record?
        duration = input_data.shape[1] * self.dt

        # - Record events
        timestamps, channels = hdu.read_afe2_events_blocking(
            self._device, self._afe_write_buf, self._afe_read_buf, duration
        )

        # - Convert to an event raster
        events_ts = TSEvent(
            timestamps,
            channels,
            t_start=0.0,
            t_stop=duration,
            num_channels=self.size_out,
        ).raster(self.dt, add_events=True)

        # - Return output, state, record dict
        return events_ts, self.state(), {}

    @property
    def _version(self) -> Tuple[int, int]:
        """
        Return the version and revision numbers of the connected Xylo-AFE2 chip

        Returns:
            (int, int): version, revision
        """
        return (self._chip_version, self._chip_revision)

    def save_config(self, filename):
        """
        Save an AFE configuration to disk in JSON format

        Args:
            filename (str): The filename to write to
        """
        save_afe_config(self._config, filename)
def load_afe_config(filename: str) -> AFE2Configuration:
    """
    Read an AFE configuration from disk in JSON format

    Args:
        filename (str): The filename to read from

    Returns:
        `AFE2Configuration`: The configuration loaded from disk
    """
    # - Create a new config object
    conf = AFE2Configuration()

    # - Read the configuration from file. The encoding is explicit so the
    #   result does not depend on the platform default
    with open(filename, encoding="utf-8") as f:
        conf.from_json(f.read())

    # - Return the configuration
    return conf


def save_afe_config(config: AFE2Configuration, filename: str) -> None:
    """
    Save an AFE configuration to disk in JSON format

    Args:
        config (AFE2Configuration): The configuration to write
        filename (str): The filename to write to
    """
    # - Serialise first: opening the file in "w" mode truncates it, so a
    #   failure in `to_json()` must happen before the file is touched
    json_text = config.to_json()

    # - Write the configuration to file, with an explicit encoding
    with open(filename, "w", encoding="utf-8") as f:
        f.write(json_text)