"""
Samna-backed bridge to Xylo dev kit for SYNS63300 Xylo IMU
"""
import numpy as np
import samna
from samna.xyloImu.configuration import XyloConfiguration, InputInterfaceConfig
from rockpool.nn.modules.module import Module
from rockpool.parameters import SimulationParameter
from . import xylo_imu_devkit_utils as hdkutils
from .xylo_imu_devkit_utils import XyloIMUHDK
# - Typing
from typing import Optional, Union, Callable, List, Tuple
from warnings import warn
import time
try:
from rich import print
except:
pass
# - Configure exports
__all__ = [
"config_from_specification",
"save_config",
"load_config",
"XyloSamna",
]
[docs]def config_from_specification(
weights_in: np.ndarray,
weights_out: np.ndarray,
weights_rec: Optional[np.ndarray] = None,
dash_mem: Optional[np.ndarray] = None,
dash_mem_out: Optional[np.ndarray] = None,
dash_syn: Optional[np.ndarray] = None,
dash_syn_out: Optional[np.ndarray] = None,
threshold: Optional[np.ndarray] = None,
threshold_out: Optional[np.ndarray] = None,
bias_hidden: Optional[np.ndarray] = None,
bias_out: Optional[np.ndarray] = None,
weight_shift_in: int = 0,
weight_shift_rec: int = 0,
weight_shift_out: int = 0,
aliases: Optional[List[List[int]]] = None,
*args,
**kwargs,
) -> XyloConfiguration:
"""
Convert a full network specification to a xylo config and validate it
See Also:
For detailed information about the networks supported on Xylo, see :ref:`/devices/xylo-overview.ipynb`
Args:
weights_in (np.ndarray): A quantised 8-bit input weight matrix ``(Nin, Nin_res, 1)``. The third dimension specifies connections onto the second input synapse for each neuron. ``Nin_res`` indicates the number of hidden-layer neurons that receive input from the input channels.
weights_out (np.ndarray): A quantised 8-bit output weight matrix ``(Nhidden, Nout)``.
weights_rec (np.ndarray): A quantised 8-bit recurrent weight matrix ``(Nhidden, Nhidden, 1)``. The third dimension specified connections onto the second input synapse for each neuron. Default: ``0``
dash_mem (np.ndarray): A vector or list ``(Nhidden,)`` specifing decay bitshift for neuron state for each hidden layer neuron. Default: ``1``
dash_mem_out (np.ndarray): A vector or list ``(Nout,)`` specifing decay bitshift for neuron state for each output neuron. Default: ``1``
dash_syn (np.ndarray): A vector or list ``(Nhidden,)`` specifing decay bitshift for synapse 1 state for each hidden layer neuron. Default: ``1``
dash_syn_out (np.ndarray): A vector or list ``(Nout,)`` specifing decay bitshift for synapse state for each output layer neuron. Default: ``1``
threshold (np.ndarray): A vector or list ``(Nhidden,)`` specifing the firing threshold for each hidden layer neuron. Default: ``0``
threshold_out (np.ndarray): A vector or list ``(Nout,)`` specifing the firing threshold for each output layer neuron. Default: ``0``
bias_hidden (np.ndarray): A vector or list ``(Nhidden,)`` specifing the bias for each hidden layer neuron. Default: ``0``
bias_out (np.ndarray): A vector or list ``(Nout,)`` specifing the bias for each output layer neuron. Default: ``0``
weight_shift_in (int): The number of bits to left-shift each input weight. Default: ``0``
weight_shift_rec (int): The number of bits to left-shift each recurrent weight. Default: ``0``
weight_shift_out (int): The number of bits to left-shift each output layer weight. Default: ``0``
aliases (Optional[List[List[int]]]): For each neuron in the hidden population, a list containing the alias targets for that neuron
Returns: (:py:class:`.samna.xyloImu.XyloConfiguration`, bool, str): config, is_valid, message
``config`` will be a `XyloConfiguration`.
``is_valid`` will be a boolean flag ``True`` iff the configuration is valid.
``message`` will be an empty string if the configuration is valid, or a message indicating why the configuration is invalid.
"""
# - Check input weights
if weights_in.ndim < 2:
raise ValueError(
f"Input weights must be at least 2 dimensional `(Nin, Nin_res [, 1])`. Found {weights_in.shape}"
)
# - Check output weights
if weights_out.ndim != 2:
raise ValueError("Output weights must be 2 dimensional `(Nhidden, Nout)`")
# - Get network shape
if weights_in.ndim < 3:
weights_in = np.expand_dims(weights_in, -1)
Nin, NIEN, Nsyn = weights_in.shape
if weights_rec.ndim < 3:
weights_rec = np.expand_dims(weights_rec, -1)
Nhidden, _, Nsyn = weights_rec.shape
NOEN, Nout = weights_out.shape
# - Check number of input synapses
if Nsyn > 1:
raise ValueError(
f"Only 1 input synapse is supported on Xylo-IMU. Found {Nsyn}."
)
# - Check input and hidden weight sizes
if NIEN > Nhidden:
raise ValueError(
f"Input expansion weight dimension `NIEN` ({NIEN}) must be <= `Nhidden` ({Nhidden})."
)
# - Check output and hidden weight sizes
if NOEN > Nhidden:
raise ValueError(
f"Output expansion weight dimension `NOEN` ({NOEN}) must be <= `Nhidden` ({Nhidden})."
)
# - Provide default `weights_rec`
weights_rec = (
np.zeros((Nhidden, Nhidden, 1), "int") if weights_rec is None else weights_rec
)
# - Check `weights_rec`
if weights_rec.ndim != 3 or weights_rec.shape[0] != weights_rec.shape[1]:
raise ValueError(
"Recurrent weights must be of shape `(Nhidden, Nhidden [, 1])`"
)
# - Check aliases
if aliases is not None and len(aliases) != Nhidden:
raise ValueError(
f"Aliases list must have `Nhidden` entries (`Nhidden` = {Nhidden})"
)
# - Check bitshift TCs, assign defaults
dash_mem = np.ones(Nhidden, "int") if dash_mem is None else np.array(dash_mem)
dash_syn = np.ones(Nhidden, "int") if dash_syn is None else np.array(dash_syn)
if bias_hidden is not None:
bias_hidden = np.round(np.array(bias_hidden)).astype("int")
if bias_out is not None:
bias_out = np.round(np.array(bias_out)).astype("int")
if np.size(dash_mem) != Nhidden or np.size(dash_syn) != Nhidden:
raise ValueError(
f"`dash_mem`, `dash_syn` need `Nhidden` entries (`Nhidden` = {Nhidden})"
+ f" found {np.size(dash_mem)}, {np.size(dash_syn)}"
)
dash_mem_out = (
np.ones(Nout, "int") if dash_mem_out is None else np.array(dash_mem_out)
)
dash_syn_out = (
np.ones(Nout, "int") if dash_syn_out is None else np.array(dash_syn_out)
)
if np.size(dash_mem_out) != Nout or np.size(dash_syn_out) != Nout:
raise ValueError(
f"`dash_mem_out` and `dash_syn_out` need `Nout` entries (`Nout` = {Nout})"
)
# - Check thresholds, assign defaults
threshold = np.zeros(Nhidden, "int") if threshold is None else np.array(threshold)
threshold_out = (
np.zeros(Nout, "int") if threshold_out is None else np.array(threshold_out)
)
if threshold.size != Nhidden:
raise ValueError(
f"`thresholds` needs `Nhidden` entries (`Nhidden` = {Nhidden})"
)
if threshold_out.size != Nout:
raise ValueError(f"`thresholds_out` needs `Nout` entries (`Nout` = {Nout})")
# - Check data types
if (
weights_in.dtype.kind not in "ui"
or weights_rec.dtype.kind not in "ui"
or weights_out.dtype.kind not in "ui"
):
warn(
"`weights...` arguments should be provided as `int` data types. I am rounding and casting these to `int`."
)
if (
threshold.dtype.kind not in "ui"
or dash_syn.dtype.kind not in "ui"
or dash_syn_out.dtype.kind not in "ui"
or dash_mem.dtype.kind not in "ui"
or dash_mem_out.dtype.kind not in "ui"
):
warn(
"Neuron and synapse parameter arguments should be provided as `int` data types. I am rounding and casting these to `int`."
)
# - Round and cast all parameters to integer
weights_in = np.round(weights_in).astype("int8")
weights_out = np.round(weights_out).astype("int8")
weights_rec = np.round(weights_rec).astype("int8")
dash_mem = np.round(dash_mem).astype("int8")
dash_mem_out = np.round(dash_mem_out).astype("int8")
dash_syn = np.round(dash_syn).astype("int8")
dash_syn_out = np.round(dash_syn_out).astype("int8")
threshold = np.round(threshold).astype("int")
threshold_out = np.round(threshold_out).astype("int")
weight_shift_in = np.round(weight_shift_in).astype("int8")
weight_shift_rec = np.round(weight_shift_rec).astype("int8")
weight_shift_out = np.round(weight_shift_out).astype("int8")
if aliases is not None:
aliases = [np.round(a).astype("int") for a in aliases]
# - Build the configuration
config = samna.xyloImu.configuration.XyloConfiguration()
# general
config.imu_if_input_enable = False
config.debug.always_update_omp_stat = True
if bias_hidden is not None or bias_out is not None:
config.bias_enable = True
config.hidden.aliasing = aliases is not None
config.input.weight_bit_shift = weight_shift_in
config.hidden.weight_bit_shift = weight_shift_rec
config.readout.weight_bit_shift = weight_shift_out
if weights_in.shape[1] > 128:
warn(
"More than 128 input expansion neurons (IEN) detected. Only the first 128 will be used."
)
config.input.weights = weights_in[:, :128, 0]
else:
config.input.weights = weights_in[:, :, 0]
config.hidden.weights = weights_rec[:, :, 0]
if weights_out.shape[0] > 128:
warn(
"More than 128 output expansion neurons (OEN) detected. Only the last 128 will be used."
)
config.readout.weights = weights_out[-128:, :]
else:
config.readout.weights = weights_out
hidden_neurons = []
for i in range(len(weights_rec)):
neuron = samna.xyloImu.configuration.HiddenNeuron()
if aliases is not None and len(aliases[i]) > 0:
neuron.alias_target = aliases[i][0]
neuron.i_syn_decay = dash_syn[i]
neuron.v_mem_decay = dash_mem[i]
neuron.threshold = threshold[i]
if bias_hidden is not None:
neuron.v_mem_bias = bias_hidden[i]
hidden_neurons.append(neuron)
config.hidden.neurons = hidden_neurons
readout_neurons = []
for i in range(np.shape(weights_out)[1]):
neuron = samna.xyloImu.configuration.OutputNeuron()
neuron.i_syn_decay = dash_syn_out[i]
neuron.v_mem_decay = dash_mem_out[i]
neuron.threshold = threshold_out[i]
if bias_out is not None:
neuron.v_mem_bias = bias_out[i]
readout_neurons.append(neuron)
config.readout.neurons = readout_neurons
# - Validate the configuration and return
is_valid, message = samna.xyloImu.validate_configuration(config)
return config, is_valid, message
[docs]def save_config(config: XyloConfiguration, filename: str) -> None:
"""
Save a Xylo configuration to disk in JSON format
Args:
config (XyloConfiguration): The configuration to write
filename (str): The filename to write to
"""
with open(filename, "w") as f:
f.write(config.to_json())
[docs]def load_config(filename: str) -> XyloConfiguration:
"""
Read a Xylo configuration from disk in JSON format
Args:
filename (str): The filename to read from
Returns:
`.XyloConfiguration`: The configuration loaded from disk
"""
# - Create a new config object
conf = XyloConfiguration()
# - Read the configuration from file
with open(filename) as f:
conf.from_json(f.read())
# - Return the configuration
return conf
[docs]class XyloSamna(Module):
"""
A spiking neuron :py:class:`.Module` backed by the Xylo hardware, via `samna`.
Use :py:func:`.config_from_specification` to build and validate a configuration for Xylo.
See Also:
See the tutorials :ref:`/devices/xylo-imu/xylo-imu-intro.ipynb` and :ref:`/devices/torch-training-spiking-for-xylo.ipynb` for a high-level overview of building and deploying networks for Xylo.
"""
[docs] def __init__(
self,
device: XyloIMUHDK,
config: XyloConfiguration = None,
dt: float = 1e-3,
output_mode: str = "Spike",
power_frequency: Optional[float] = 5.0,
*args,
**kwargs,
):
"""
Instantiate a Module with Xylo dev-kit backend
Args:
device (XyloIMUHDK): An opened `samna` device to a Xylo dev kit
config (XyloConfiguration): A Xylo configuration from `samna`
dt (float): The simulation time-step to use for this Module
output_mode (str): The readout mode for the Xylo device. This must be one of ``["Spike", "Isyn", "Vmem"]``. Default: "Spike", return events from the output layer.
power_frequency (float): The frequency of power measurement. Default: 5.0
"""
# - Check input arguments
if device is None:
raise ValueError("`device` must be a valid, opened Xylo HDK device.")
# - Check output mode specification
if output_mode not in ["Spike", "Vmem", "Isyn"]:
raise ValueError(
f'{output_mode} is not supported. Must be one of `["Spike", "Vmem", "Isyn"]`.'
)
self._output_mode = output_mode
# - Get a default configuration
if config is None:
config = samna.xyloImu.configuration.XyloConfiguration()
# - Get the network shape
Nin, Nhidden = np.shape(config.input.weights)
_, Nout = np.shape(config.readout.weights)
# - Initialise the superclass
super().__init__(
shape=(Nin, Nhidden, Nout), spiking_input=True, spiking_output=True
)
# - Store the device
self._device: XyloIMUHDK = device
""" `.XyloHDK`: The Xylo HDK used by this module """
# - Register buffers to read and write events
self._read_buffer = hdkutils.new_xylo_read_buffer(device)
""" `.XyloIMUReadBuffer`: The read buffer for the connected HDK """
self._write_buffer = hdkutils.new_xylo_write_buffer(device)
""" `.XyloIMUWriteBuffer`: The write buffer for the connected HDK """
# - Store the timestep
self.dt: Union[float, SimulationParameter] = dt
""" float: Simulation time-step of the module, in seconds """
# - Store the configuration (and apply it)
self.config: Union[
XyloConfiguration, SimulationParameter
] = SimulationParameter(shape=(), init_func=lambda _: config)
""" `.XyloConfiguration`: The HDK configuration applied to the Xylo module """
# - Keep a registry of the current recording mode, to save unnecessary reconfiguration
self._last_record_mode: Optional[bool] = None
""" bool: The most recent (and assumed still valid) recording mode """
# - Store the timestep
self.dt: Union[float, SimulationParameter] = dt
""" float: Simulation time-step of the module, in seconds """
# - Set power measurement module
self._power_buf, self.power = hdkutils.set_power_measure(
self._device, power_frequency
)
@property
def config(self):
# - Return the configuration stored on Xylo HDK
return self._device.get_model().get_configuration()
@config.setter
def config(self, new_config):
# - Test for a valid configuration
is_valid, msg = samna.xyloImu.validate_configuration(new_config)
if not is_valid:
raise ValueError(f"Invalid configuration for the Xylo HDK: {msg}")
# - Write the configuration to the device
hdkutils.apply_configuration(self._device, new_config)
self._config = new_config
[docs] def _config_hibernation_mode(self):
"""
Configure the Xylo HDK to use hibernation mode
"""
self.config = hdkutils.config_hibernation_mode(self._config, True)
[docs] def evolve(
self,
input: np.ndarray,
record: bool = False,
record_power: bool = False,
read_timeout: Optional[float] = None,
*args,
**kwargs,
) -> Tuple[np.ndarray, dict, dict]:
"""
Evolve a network on the Xylo HDK in accelerated-time mode
Sends a series of events to the Xylo HDK, evolves the network over the input events, and returns the output events produced during the input period. Optionally record internal state of the network, selectable with the ``record`` flag.
Args:
input (np.ndarray): A raster ``(T, Nin)`` specifying for each bin the number of input events sent to the corresponding input channel on Xylo, at the corresponding time point. Up to 15 input events can be sent per bin.
record (bool): Iff ``True``, record and return all internal state of the neurons and synapses on Xylo. Default: ``False``, do not record internal state.
record_power (bool): Iff ``True``, record the power consumption during each evolve.
read_timeout (Optional[float]): Set an explicit read timeout for the entire simulation time. This should be sufficient for the simulation to complete, and for data to be returned. Default: ``None``, set a reasonable default timeout.
Returns:
(np.ndarray, dict, dict): ``output``, ``new_state``, ``rec_dict``.
``output`` is a raster ``(T, Nout)``, containing events for each channel in each time bin. Time bins in ``output`` correspond to the time bins in ``input``.
``new_state`` is an empty dictionary. The Xylo HDK does not permit querying or setting state.
``rec_dict`` is a dictionary containing recorded internal state of Xylo during evolution, if the ``record`` argument is ``True``. Otherwise this is an empty dictionary.
"""
# - Get the network size
Nin, Nhidden, Nout = self.shape[:]
# - Configure the recording mode
self._configure_accel_time_mode(Nhidden, Nout, record)
Nhidden_monitor = Nhidden if record else 0
Nout_monitor = Nout if record or self._output_mode == "Isyn" else 0
# - Manage RAM access
if Nhidden_monitor > 0 or Nout_monitor > 0:
hdkutils.enable_ram_access(self._device, True)
else:
hdkutils.enable_ram_access(self._device, False)
start_timestep = (
hdkutils.get_current_timestep(self._read_buffer, self._write_buffer) + 1
)
final_timestep = start_timestep + len(input) - 1
# -- Encode input events
input_events_list = []
# - Locate input events
spikes = np.argwhere(input)
counts = input[np.nonzero(input)]
# - Generate input events
for timestep, channel, count in zip(spikes[:, 0], spikes[:, 1], counts):
for _ in range(count):
event = samna.xyloImu.event.Spike(
neuron_id=channel, timestep=start_timestep + timestep
)
input_events_list.append(event)
# - Add a `TriggerProcessing` event to ensure all time-steps are processed
event = samna.xyloImu.event.TriggerProcessing(
target_timestep=final_timestep + 1
)
input_events_list.append(event)
# - Clear the read and state buffers
self._read_buffer.get_events()
# - Clear the power recording buffer, if recording power
self._power_buf.clear_events()
# - Write the events and trigger the simulation
self._write_buffer.write(input_events_list)
# - Determine a reasonable read timeout
if read_timeout is None:
read_timeout = 2 * len(input) * self.dt * Nhidden / 100.0
read_timeout = read_timeout * 30.0 if record else read_timeout
# - Wait until the simulation is finished
start_time = time.time()
read_events, is_timeout = hdkutils.blocking_read(
self._read_buffer,
timeout=max(read_timeout, 1.0),
target_timestep=final_timestep,
)
inf_duration = time.time() - start_time
if is_timeout:
message = f"Processing didn't finish for {read_timeout}s. Read {len(read_events)} events"
readout_events = [e for e in read_events if hasattr(e, "timestep")]
if len(readout_events) > 0:
message += f", first timestep: {readout_events[0].timestep}, final timestep: {readout_events[-1].timestep}, target timestep: {final_timestep}"
raise TimeoutError(message)
# - Read the simulation output data
xylo_data = hdkutils.decode_accel_mode_data(
read_events,
Nin,
Nhidden_monitor,
Nout_monitor,
Nout,
start_timestep,
final_timestep,
)
if record_power:
# - Get all recent power events from the power measurement
ps = self._power_buf.get_events()
# - Separate out power meaurement events by channel
channels = samna.xyloImuBoards.MeasurementChannels
io_power = np.array([e.value for e in ps if e.channel == int(channels.Io)])
core_power = np.array(
[e.value for e in ps if e.channel == int(channels.Core)]
)
if record:
rec_dict = {
"Vmem": np.array(xylo_data.V_mem_hid),
"Isyn": np.array(xylo_data.I_syn_hid),
"Spikes": np.array(xylo_data.Spikes_hid),
"Vmem_out": np.array(xylo_data.V_mem_out),
"Isyn_out": np.array(xylo_data.I_syn_out),
"times": np.arange(start_timestep, final_timestep + 1),
"inf_duration": inf_duration,
}
else:
rec_dict = {}
# - Return power recordings if requested
if record_power:
rec_dict.update(
{
"io_power": io_power,
"core_power": core_power,
"inf_duration": inf_duration,
}
)
# - This module holds no state
new_state = {}
# - Return spike output, new state and record dictionary
if self._output_mode == "Spike":
return xylo_data.Spikes_out, new_state, rec_dict
elif self._output_mode == "Isyn":
return xylo_data.I_syn_out, new_state, rec_dict
elif self._output_mode == "Vmem":
return xylo_data.V_mem_out, new_state, rec_dict