Source code for rockpool.devices.dynapse.mapping.mapper

"""
Dynap-SE2 graph mapper package
"""

from __future__ import annotations
from typing import Dict, List, Optional, Tuple

import numpy as np

from copy import deepcopy

from rockpool.graph import GraphModule
from rockpool.typehints import FloatVector

from rockpool.devices.dynapse.lookup import NUM_NEURONS

from .utils import lifnet_to_dynapsim
from .container import DynapseGraphContainer

__all__ = ["mapper"]


[docs]def mapper( graph: GraphModule, in_place=False, clustering_method: str = "greedy", n_cluster: Optional[int] = None, max_neuron_per_cluster: int = NUM_NEURONS, ) -> Dict[str, float]: """ mapper maps a computational graph onto Dynap-SE2 architecture. Each neural core shares a parameter group in Dynap-SE2. Therefore neurons inside the same core operates with the same current setting. `mapper` function cluster parameters in groups and allocates an hardware posititon for neurons. See Also: The tutorials in :ref:`/devices/DynapSE/post-training.ipynb` :param graph: Any graph (constraints apply) aimed to be deployed to Dynap-SE2 :type graph: GraphModule :param clustering_method: the clustering approach. ``"random"`` or ``"greedy"``, defaults to ``"greedy"`` :type clustering_method: str, optional :param n_cluster: number of clusters. minimum number of clusters possible if None, defaults to None :type n_cluster: int, optional :param max_neuron_per_cluster: maximum number of neurons allowed per cluster, defaults to ``NUM_NEURONS`` :type max_neuron_per_cluster: int, optional :return: a specification object which can be used to create a config object :rtype: Dict[str, float] """ ## -- Convert the network if necessary -- ## try: wrapper = DynapseGraphContainer.from_graph_holder(graph) except: graph = ( lifnet_to_dynapsim(deepcopy(graph)) if not in_place else lifnet_to_dynapsim(graph) ) wrapper = DynapseGraphContainer.from_graph_holder(graph) ## -- Compute and check the number of neurons and number of clusters -- ## n_neurons = __get_num_neurons(*wrapper.current_dict.values()) n_cluster, core_map = __get_cluster_map( n_neurons, clustering_method, n_cluster, max_neuron_per_cluster ) ## -- Compute the centers -- ## def __cluster_center(param: FloatVector) -> FloatVector: """ __cluster_center computes the cluster center of a single parameter :param param: any parameter, each index indicating a neuron's individual current :type param: FloatVector :return: a list of cluster centers :rtype: FloatVector """ param = np.array(param) centers = [np.mean(param[core_map == c]) for c in range(n_cluster)] return centers clustered_currents = { key: __cluster_center(val) for key, val in wrapper.current_dict.items() } ## -- Create the spec dictionary -- ## specs = { "mapped_graph": graph, "weights_in": wrapper.w_in, "weights_rec": wrapper.w_rec, "Iscale": wrapper.Iscale, "n_cluster": n_cluster, "core_map": core_map, "n_neuron": n_neurons, "unclustered": wrapper.current_dict, } specs.update(clustered_currents) return specs
### --- Private Section --- ### def __get_num_neurons(*args: List[FloatVector]) -> int: """ __get_num_neurons process the given list of FloatVectors and obtains the common length :raises ValueError: Number of neurons indicated does not match! :return: the elements of all the vectors provided :rtype: int """ n_neurons = len(args[0]) for arg in args: if len(arg) != n_neurons: raise ValueError(f"Number of neurons indicated does not match!") return n_neurons def __get_cluster_map( n_neurons: int, clustering_method: str, n_cluster: Optional[int], max_neuron_per_cluster: int = NUM_NEURONS, ) -> Tuple[int, np.ndarray]: """ __get_cluster_map _summary_ :param n_neurons: _description_ :type n_neurons: int :param clustering_method: the clustering approach. "random" or "greedy" :type clustering_method: str :param n_cluster: number of clusters. minimum number of clusters possible if None :type n_cluster: int :param max_neuron_per_cluster: maximum number of neurons allowed per cluster, defaults to NUM_NEURONS :type max_neuron_per_cluster: int, optional :raises ValueError: At least {min_cluster} clusters required to allocate {n_neurons} neurons! :raises ValueError: Number of clusters {n_cluster} cannot be greater than number of neurons {n_neurons} :raises ValueError: Unknown clustering method! :return: _description_ :rtype: Tuple[int, np.ndarray] """ min_cluster = n_neurons // max_neuron_per_cluster + 1 ### --- Check if the cluster size and maximum number of neurons per cluster match --- ### if n_cluster is None: n_cluster = min_cluster elif n_cluster < min_cluster: raise ValueError( f"At least {min_cluster} clusters required to allocate {n_neurons} neurons!" ) if n_cluster > n_neurons: raise ValueError( f"Number of clusters {n_cluster} cannot be greater than number of neurons {n_neurons}" ) ## -- Get a clustered core map in accordance with the clustering method chosen -- ## if clustering_method == "greedy": neuron_per_cluster = np.floor(n_neurons / n_cluster) core_map = np.array([n // neuron_per_cluster for n in range(n_neurons)]) elif clustering_method == "random": core_map = np.random.permutation([n % n_cluster for n in range(n_neurons)]) else: raise ValueError("Unknown clustering method!") return n_cluster, core_map