"""
Classes to manage time series data
"""
## -- Import statements
# - Built-ins
import copy
import collections.abc
from pathlib import Path
from tempfile import TemporaryFile
from rockpool.utilities.backend_management import backend_available
from typing import (
Union,
List,
Tuple,
Optional,
Iterable,
TypeVar,
Type,
Dict,
Hashable,
Any,
)
from warnings import warn
# - Third party libraries
import numpy as np
import scipy.interpolate as spint
# - Plotting backends
_global_plotting_backend = None
if backend_available("matplotlib"):
import matplotlib as mpl
from matplotlib import pyplot as plt
_MPL_AVAILABLE = True
_global_plotting_backend = "matplotlib"
else:
_MPL_AVAILABLE = False
if backend_available("holoviews"):
import holoviews as hv
_HV_AVAILABLE = True
if not _MPL_AVAILABLE:
_global_plotting_backend = "holoviews"
else:
_HV_AVAILABLE = False
if not _MPL_AVAILABLE:
_global_plotting_backend = None
# - Define exports
__all__ = [
"TimeSeries",
"TSEvent",
"TSContinuous",
"TSDictOnDisk",
"set_global_ts_plotting_backend",
"get_global_ts_plotting_backend",
"load_ts_from_file",
]
TS = TypeVar("TS", bound="TimeSeries")
# - Type alias for array-like objects
ArrayLike = Union[np.ndarray, List, Tuple]
### -- Code for setting plotting backend
# - Absolute tolerance, e.g. for comparing float values
_TOLERANCE_ABSOLUTE = 1e-9
_TOLERANCE_RELATIVE = 1e-6
# - Global plotting backend
def set_global_ts_plotting_backend(backend: Union[str, None], verbose=True):
"""
Set the plotting backend for use by :py:class:`TimeSeries` classes
:param str backend: One of {"holoviews", "matplotlib"}
:param bool verbose: If ``True``, print feedback about the backend. Default: ``True``
"""
global _global_plotting_backend
if backend in ("holoviews", "holo", "Holoviews", "HoloViews", "hv"):
if _HV_AVAILABLE:
_global_plotting_backend = "holoviews"
if verbose:
print("Global plotting backend has been set to holoviews.")
else:
raise RuntimeError("Holoviews is not available.")
elif backend in ("matplotlib", "mpl", "mp", "pyplot", "plt"):
if _MPL_AVAILABLE:
_global_plotting_backend = "matplotlib"
if verbose:
print("Global plotting backend has been set to matplotlib.")
else:
raise RuntimeError("Matplotlib is not available.")
elif backend is None:
_global_plotting_backend = None
if verbose:
print("No global plotting backend is set.")
else:
raise ValueError("Plotting backend not recognized.")
def get_global_ts_plotting_backend() -> str:
"""
Return a string representing the current plotting backend
:return str: Current plotting backend. One of {"holoviews", "matplotlib"}
"""
global _global_plotting_backend
return _global_plotting_backend
def load_ts_from_file(
path: Union[str, Path], expected_type: Optional[str] = None
) -> "TimeSeries":
"""
Load a timeseries object from an ``npz`` file
:param Union[str, Path] path: Filepath to load file
:param Optional[str] expected_type: Specify expected type of timeseires (:py:class:`TSContinuous` or py:class:`TSEvent`). Default: ``None``, use whichever type is loaded.
:return TimeSeries: Loaded time series object
:raises TypeError: Unsupported or unexpected type
"""
# - Load npz file from specified path
try:
# Loading from temporary files may require "rewinding".
path.seek(0)
except AttributeError:
pass
loaded_data = np.load(path)
# - Check for expected type
try:
loaded_type = loaded_data["str_type"].item()
except KeyError:
try:
loaded_type = loaded_data["strType"].item()
except KeyError:
type_key = [k for k in loaded_data if k.startswith("type")]
if type_key:
loaded_type = type_key[0][5:]
else:
if expected_type is not None:
loaded_type = expected_type
warn(
f"Cannot determine type of Timeseries at {path}. "
+ f"Will assume expected type ('{expected_type}')."
)
else:
raise KeyError(f"Cannot determine type of Timeseries at {path}.")
if expected_type is not None:
if not loaded_type == expected_type:
raise TypeError(
"Timeseries at `{}` is of type `{}`, which does not match expected type `{}`.".format(
path, loaded_type, expected_type
)
)
if "name" in loaded_data:
name = loaded_data["name"].item()
else:
name_keys = [k for k in loaded_data if k.startswith("name")]
if name_keys:
name = name_keys[0][5:]
else:
name = "unnamed"
if loaded_type == "TSContinuous":
if "interp_kind" in loaded_data:
interp_kind = loaded_data["interp_kind"].item()
else:
interp_kind_keys = [k for k in loaded_data if k.startswith("interp_kind")]
if interp_kind_keys:
interp_kind = interp_kind_keys[0][12:]
else:
interp_kind = "linear"
ts = TSContinuous(
times=loaded_data["times"],
samples=loaded_data["samples"],
t_start=loaded_data["t_start"].item(),
t_stop=loaded_data["t_stop"].item(),
interp_kind=interp_kind,
periodic=loaded_data["periodic"].item(),
name=name,
)
if "trial_start_times" in loaded_data:
ts.trial_start_times = loaded_data["trial_start_times"]
elif loaded_type == "TSEvent":
ts = TSEvent(
times=loaded_data["times"],
channels=loaded_data["channels"],
t_start=loaded_data["t_start"].item(),
t_stop=loaded_data["t_stop"].item(),
periodic=loaded_data["periodic"].item(),
num_channels=loaded_data["num_channels"].item(),
name=name,
)
if "trial_start_times" in loaded_data:
ts.trial_start_times = loaded_data["trial_start_times"]
else:
raise TypeError("Type `{}` not supported.".format(loaded_type))
return ts
def _extend_periodic_times(
t_start: float, t_stop: float, series: "TimeSeries"
) -> np.ndarray:
"""
Replicate out a periodic time base for later trimming, to ensure that the original time base is repeated correctly
:param float t_start: Desired start time of the new series
:param float t_stop: Desired end time of the new series
:param TimeSeries series: The periodic :py:class:`TimeSeries` to replicate
:return np.array: A vector of times corresponding to the replicated time base
"""
# - Repeat events sufficiently often
# Number of additional required repetitions to append before and after
num_reps_after = (
int(np.ceil((t_stop - series.t_start) / series.duration))
if t_stop > series.t_stop
else 1
)
num_reps_before = (
int(np.ceil((series.t_start - t_start) / series.duration))
if t_start < series.t_start
else 0
)
num_reps_total = num_reps_before + num_reps_after
# - Correct times so they extend over the prolongued period and do not repeat
# Enumerate periods so that originally defined period is 0
periods = np.arange(num_reps_total) - num_reps_before
correct_periods = series.duration * np.repeat(periods, series.times.size)
return np.tile(series.times, num_reps_total) + correct_periods
## - Convenience method to return a nan array
def full_nan(shape: Union[tuple, int]) -> np.array:
"""
Build an all-NaN array
:param ArrayLike[int] shape: The desired shape of the NaN matrix
:return np.array: The all-NaN matrix
"""
a = np.empty(shape)
a.fill(np.nan)
return a
### --- TimeSeries base class
[docs]class TimeSeries:
"""
Base class to represent a continuous or event-based time series. You should use the subclasses `.TSContinuous` and
`.TSEvent` to represent continuous-time and event-based time series, respectively. See :ref:`/basics/time_series.ipynb` for futher explanation and examples.
"""
[docs] def __init__(
self,
times: ArrayLike = [],
periodic: bool = False,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
plotting_backend: Optional[str] = None,
name: str = "unnamed",
):
"""
Represent a continuous or event-based time series
:param ArrayLike times: [Tx1] vector of time samples
:param bool periodic: Treat the time series as periodic around the end points. Default: ``False``
:param Optional[float] t_start: If not ``None``, the series start time is ``t_start``, otherwise ``times[0]``
:param Optional[float] t_stop: If not ``None``, the series stop time is ``t_stop``, otherwise ``times[-1]``
:param Optional[str] plotting_backend: Determines plotting backend. If ``None``, backend will be chosen automatically based on what is available.
:param str name: Name of the `.TimeSeries` object. Default: "unnamed"
"""
# - Convert time trace to numpy arrays
times = np.atleast_1d(times).flatten().astype(float)
if (np.diff(times) < 0).any():
raise ValueError(
f"TimeSeries `{name}`: The time trace must be not decreasing"
)
# - Assign attributes
self._times = times
self.periodic = periodic
self.name = name
self._t_start = (
(0 if np.size(times) == 0 else times[0])
if t_start is None
else float(t_start)
)
self.t_stop = (
(self.t_start if np.size(times) == 0 else times[-1])
if t_stop is None
else float(t_stop)
)
self.set_plotting_backend(
plotting_backend if plotting_backend is not None else None, verbose=False
)
[docs] def delay(self: TS, offset: Union[int, float], inplace: bool = False) -> TS:
"""
Return a copy of ``self`` that is delayed by an offset
For delaying self, use the ``inplace`` argument, or ``.times += ...`` instead.
:param float Offset: Time by which to offset this time series
:param bool inplace: If ``True``, conduct operation in-place (Default: ``False``; create a copy)
:return TimeSeries: New `.TimeSeries`, delayed
"""
if not inplace:
series = self.copy()
else:
series = self
# - Store previous t_start and t_stop
t_start_old = series.t_start
t_stop_old = series.t_stop
# - Shift time trace
if not self.isempty():
series.times += offset
# - Shift t_start and t_stop
series._t_start = t_start_old + offset
series._t_stop = t_stop_old + offset
return series
[docs] def start_at_zero(self: TS, inplace: bool = False) -> TS:
"""
Convenience function that calls the `~.TimeSeries.delay` method such that ``self.t_start`` falls at ``0``.
:return TimeSeries: New TimeSeries, with t_start at 0
"""
return self.delay(offset=-self.t_start, inplace=inplace)
[docs] def start_at(self: TS, t_start: float, inplace: bool = False) -> TS:
"""
Convenience function that calls the `~.TimeSeries.delay` method such that ``self.t_start`` falls at ``t_start``.
:param float t_start: Time to which ``self.t_start`` should be shifted;
:param bool inplace: If ``True``, conduct operation in-place (Default: ``False``; create a copy)
:return TimeSeries: New `.TimeSeries`, delayed
"""
return self.delay(offset=t_start - self.t_start, inplace=inplace)
[docs] def isempty(self) -> bool:
"""
Test if this `.TimeSeries` object is empty
:return bool: ``True`` iff the `.TimeSeries` object contains no samples
"""
return np.size(self.times) == 0
[docs] def print(self):
"""Print an overview of the time series"""
print(self.__repr__())
[docs] def set_plotting_backend(self, backend: Union[str, None], verbose: bool = True):
"""
Set which plotting backend to use with the `~.TimeSeries.plot` method
:param str backend: Specify a backend to use. Supported: {"holoviews", "matplotlib"}
:param bool verbose: If True, print feedback about which backend has been set
"""
if backend in ("holoviews", "holo", "Holoviews", "HoloViews", "hv"):
if _HV_AVAILABLE:
self._plotting_backend = "holoviews"
if verbose:
print(
"{} `{}`: Plotting backend has been set to holoviews.".format(
type(self).__name__, self.name
)
)
else:
raise RuntimeError("Holoviews is not available.")
elif backend in ("matplotlib", "mpl", "mp", "pyplot", "plt"):
if _MPL_AVAILABLE:
self._plotting_backend = "matplotlib"
if verbose:
print(
"{} `{}`: Plotting backend has been set to matplotlib.".format(
type(self).__name__, self.name
)
)
else:
raise RuntimeError("Matplotlib is not available.")
elif backend is None:
self._plotting_backend = None
if verbose:
print(
"{} `{}`: Using global plotting backend.".format(
type(self).__name__, self.name
)
)
else:
raise ValueError("Plotting backend not recognized.")
[docs] def copy(self: TS) -> TS:
"""
Return a deep copy of this time series
:return TimeSeries: copy of `self`
"""
return copy.deepcopy(self)
[docs] def contains(self, times: Union[int, float, ArrayLike]) -> bool:
"""
Does the time series contain the time range specified in the given time trace?
Always true for periodic series
:param ArrayLike times: Array-like containing time points
:return bool: True iff all specified time points are contained within this time series
"""
return self.periodic or (
self.t_start <= np.min(times) and self.t_stop >= np.max(times)
)
[docs] def _modulo_period(
self, times: Union[ArrayLike, float, int]
) -> Union[ArrayLike, float, int]:
"""_modulo_period - Calculate provided times modulo `self.duration`"""
return self.t_start + np.mod(times - self.t_start, self.duration)
def __len__(self):
return self._times.size
[docs] @classmethod
def concatenate_t(
cls: Type[TS],
series: Iterable[TS],
offset: Union[None, float, Iterable[Union[float, None]]] = None,
) -> TS:
"""
Append multiple TimeSeries objects in time to a new series
:param Iterable series: Time series to be tacked at the end of each other. These series must have the same number of channels.
:param Union[None, float, Iterable] offset: Offset to be introduced between time traces. First value corresponds to delay of first time series.
:return TimeSeries: Time series with data from series in ``series``
"""
# - Convert `series` to list, to be able to extract information about objects
if isinstance(series, cls):
series = [series]
elif isinstance(series, collections.abc.Iterable):
series = list(series)
else:
cls_name = str(cls).split("'")[1].split(".")[-1]
raise TypeError(f"{cls_name}: `series` must be of type {cls_name}.")
# - Determine t_start of first series, to avoid wrong delays.
# Determine class to enable calling the method through `TimeSeries` parent class.
try:
t_start = series[0].t_start
subclass = series[0].__class__
except IndexError: # `series` is empty
return cls()
new_series = subclass(t_start=t_start)
return new_series.append_t(series, offset=offset, inplace=False)
[docs] @classmethod
def load(
cls: Type[TS], path: Union[str, Path], expected_type: Optional[str] = None
) -> TS:
"""
Load TimeSeries object from file. If called from a subclass of :py:class'TimeSeries`,
the type of the stored object must match that of the method class.
:param Union[str, Path] path: Path to load from.
:param Optional[str] expected_type: Specify expected type of timeseires (:py:class:`TSContinuous` or py:class:`TSEvent`). Can only be set if method is called from py:class:`TimeSeries` class. Default: ``None``, use whichever type is loaded.
:return TimeSeries: Loaded time series object
:raises TypeError: Unsupported or unexpected type
:raises TypeError: Argument `expected_type` is defined if class is not `TimeSeries`.
"""
if cls != TimeSeries:
if expected_type is not None:
raise TypeError(
"Argument `expected_type` can only be provided when calling this "
+ "method from `TimeSeries` class."
)
# - Extract class name as string
expected_type = str(cls).strip("'<>").split(".")[-1]
return load_ts_from_file(path, expected_type)
@property
def times(self):
"""(ArrayLike[float]) Array of sample times"""
return self._times
@times.setter
def times(self, new_times: ArrayLike):
# - Check time trace for correct size
if np.size(new_times) != np.size(self._times):
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "New time trace must have the same number of elements as the original trace."
)
# - Make sure time trace is sorted
if (np.diff(new_times) < 0).any():
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "The time trace must be sorted and not decreasing"
)
# - Store new time trace
self._times = np.atleast_1d(new_times).flatten().astype(float)
if np.size(self._times) > 0:
# - Fix t_start and t_stop
self._t_start = min(self._t_start, new_times[0])
self._t_stop = max(self._t_stop, new_times[-1])
@property
def t_start(self) -> float:
"""(float) Start time of time series"""
return self._t_start
@t_start.setter
def t_start(self, new_start):
try:
# - Largest allowed value for new_start
max_start = self._times[0] if self._times.size > 0 else self._t_stop
if new_start < max_start:
self._t_start = new_start
elif new_start - max_start < _TOLERANCE_ABSOLUTE:
self._t_start = max_start
else:
raise ValueError(
"TimeSeries `{}`: t_start must be less or equal to {}. It was {}.".format(
self.name, max_start, new_start
)
)
except AttributeError:
# - If self._t_stop is not defined yet (instantiation)
self._t_start = float(new_start)
@property
def t_stop(self) -> float:
"""(float) Stop time of time series (final sample)"""
return self._t_stop
@t_stop.setter
def t_stop(self, new_stop):
# - Smallest allowed value for new_stop
min_stop = self._times[-1] if self._times.size > 0 else self._t_start
if new_stop >= min_stop:
self._t_stop = new_stop
elif min_stop - new_stop < _TOLERANCE_ABSOLUTE:
self._t_stop = min_stop
else:
raise ValueError(
"TimeSeries `{}`: t_stop must be greater or equal to {}. It was {}.".format(
self.name, min_stop, new_stop
)
)
@property
def duration(self) -> float:
"""(float) Duration of TimeSeries"""
return self._t_stop - self._t_start
@property
def plotting_backend(self):
"""(str) Current plotting backend"""
return (
self._plotting_backend
if self._plotting_backend is not None
else _global_plotting_backend
)
@property
def name(self) -> str:
return self._name
@name.setter
def name(self, new_name: Optional[str] = None):
# - Default name: 'unnamed'
self._name = "unnamed" if new_name is None else new_name
### --- Continuous-valued time series
[docs]class TSContinuous(TimeSeries):
"""
Represents a continuously-sampled time series. Mutliple time series can be represented by a single `.TSContinuous` object, and have identical time bases. Temporal periodicity is supported. See :ref:`/basics/time_series.ipynb` for further explanation and examples.
:Examples:
Build a linearly-increasing time series that extends from 0 to 1 second
>>> time_base = numpy.linspace(0, 1, 100)
>>> samples = time_base
>>> ts = TSContinuous(time_base, samples)
Build a periodic time series as a sinusoid
>>> time_base = numpy.linspace(0, 2 * numpy.pi, 100)
>>> samples = numpy.sin(time_base)
>>> ts = TSContinuous(time_base, samples, periodic = True)
Build an object containing five random time series
>>> time_base = numpy.linspace(0, 1, 100)
>>> samples = numpy.random.rand((100, 5))
>>> ts = TSContinuous(time_base, samples)
Manipulate time series using standard operators
>>> ts + 5
>>> ts - 3
>>> ts * 2
>>> ts / 7
>>> ts // 3
>>> ts ** 2
>>> ts1 + ts2
...
Manipulate time series data in time
>>> ts.delay(4)
>>> ts.clip(start, stop, [channel1, channel2, channel3])
Combine time series data
>>> ts1.append_t(ts2) # Appends the second time series, along the time axis
>>> ts1.append_c(ts2) # Appends the second time series as an extra channel
.. note:: All :py:class:`TSContinuous` manipulation methods return a copy by default. Most methods accept an optional `inplace` flag, which if ``True`` causes the operation to be performed in place.
Resample a time series using functional notation, list notation, or using the :py:func:`.resample` method.
>>> ts(0.5)
>>> ts([0, .1, .2, .3])
>>> ts(numpy.array([0, .1, .2, .3]))
>>> ts[0.5]
>>> ts[0, .1, .2, .3]
>>> ts.resample(0.5)
>>> ts.resample([0, .1, .2, .3])
Resample using slice notation
>>> ts[0:.1:1]
Resample and select channels simultaneously
>>> ts[0:.1:1, :3]
"""
_samples = []
[docs] def __init__(
self,
times: Optional[ArrayLike] = None,
samples: Optional[ArrayLike] = None,
num_channels: Optional[int] = None,
periodic: bool = False,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
name: str = "unnamed",
units: Optional[str] = None,
interp_kind: str = "previous",
fill_value: str = "extrapolate",
):
"""
TSContinuous - Represents a continuously-sample time series, supporting interpolation and periodicity.
:param ArrayLike times: [Tx1] vector of time samples
:param ArrayLike samples: [TxM] matrix of values corresponding to each time sample
:param Optional[in] num_channels: If ``samples`` is None, determines the number of channels of ``self``. Otherwise it has no effect at all.
:param bool periodic: Treat the time series as periodic around the end points. Default: ``False``
:param Optional[float] t_start: If not ``None``, the series start time is ``t_start``, otherwise ``times[0]``
:param Optional[float] t_stop: If not ``None``, the series stop time is ``t_stop``, otherwise ``times[-1]``
:param str name: Name of the `.TSContinuous` object. Default: ``"unnamed"``
:param Optional[str] units: Units of the `.TSContinuous` object. Default: ``None``
:param str interp_kind: Specify the interpolation type. Default: ``"previous"``
:param str fill_value: Specify the method to fill values outside sample times. Default: ``"extrapolate"``. **Sampling beyond `.t_stop` is still not permitted**
If the time series is not periodic (the default), then NaNs will be returned for any values outside `t_start` and `t_stop`.
"""
if times is None:
times = np.array([])
if samples is None:
num_channels = 0 if num_channels is None else num_channels
samples = np.zeros((0, num_channels))
# - Convert everything to numpy arrays
times = np.atleast_1d(times).flatten().astype(float)
samples = np.atleast_1d(samples)
# - Check arguments
if np.any(np.diff(times) < 0):
raise ValueError(
f"TSContinuous `{name}`: The time trace must be sorted and not decreasing."
)
# - Initialize superclass
super().__init__(
times=times, periodic=periodic, t_start=t_start, t_stop=t_stop, name=name
)
# - Assign attributes
self.interp = None
self.fill_value = fill_value
self._interp_kind = interp_kind
self.samples = samples.astype("float") # Also creates an interpolator
self.units = units
# - Default: Throw exceptions when sampled output contains `NaN`s
self.beyond_range_exception = True
# - Default: Change sample times that are slightly out of range
self.approx_limit_times = True
## -- Alternative constructor for clocked time series
[docs] @staticmethod
def from_clocked(
samples: np.ndarray,
dt: float,
t_start: float = 0.0,
periodic: bool = False,
name: str = None,
interp_kind: str = "previous",
) -> "TSContinuous":
"""
Convenience method to create a new continuous time series from a clocked sample.
``samples`` is an array of clocked samples, sampled at a regular interval ``dt``. Each sample is assumed to occur at the **start** of a time bin, such that the first sample occurs at ``t = 0`` (or ``t = t_start``). A continuous time series will be returned, constructed using ``samples``, and filling the time ``t = 0`` to ``t = N*dt``, with ``t_start`` and ``t_stop`` set appropriately.
:param np.ndarray samples: A clocked set of contiguous-time samples, with a sample interval of ``dt``. ``samples`` must be of shape ``[T, C]``, where ``T`` is the number of time bins, and ``C`` is the number of channels.
:param float dt: The sample interval for ``samples``
:param float t_start: The time of the first sample.
:param bool periodic: Flag specifying whether or not the time series will be generated as a periodic series. Default:``False``, do not generate a periodic time series.
:param Optional[str] name: Optional string to set as the name for this time series. Default: ``None``
:param str interp_kind: String specifying the interpolation method to be used for the returned time series. Any string accepted by `scipy.interp1d` is accepted. Default: `"previous"`, sample-and-hold interpolation.
:return `.TSContinuous` : A continuous time series containing ``samples``.
"""
if samples is None or np.size(samples) == 0:
raise TypeError(
"TSContinuous.from_clocked: `samples` must not be empty or `None`."
)
# - Ensure that `samples` is an ndarray
samples = np.atleast_1d(samples)
# - Build a time base
time_base = np.arange(0, np.shape(samples)[0]) * dt + t_start
# - Return a continuous time series
return TSContinuous(
time_base,
samples,
t_stop=time_base[-1] + dt,
periodic=periodic,
name=name,
interp_kind=interp_kind,
)
## -- Methods for plotting and printing
[docs] def plot(
self,
times: Optional[Union[int, float, ArrayLike]] = None,
target: Optional[Union["mpl.axes.Axes", "hv.Curve", "hv.Overlay"]] = None,
channels: Optional[Union[ArrayLike, int]] = None,
stagger: Optional[Union[float, int]] = None,
skip: Optional[int] = None,
dt: Optional[float] = None,
*args,
**kwargs,
):
"""
Visualise a time series on a line plot
:param Optional[ArrayLike] times: Time base on which to plot. Default: time base of time series
:param Optional target: Axes (or other) object to which plot will be added.
:param Optional[ArrayLike] channels: Channels of the time series to be plotted.
:param Optional[float] stagger: Stagger to use to separate each series when plotting multiple series. (Default: `None`, no stagger)
:param Optional[int] skip: Skip several series when plotting multiple series
:param Optiona[float] dt: Resample time series to this timestep before plotting
:return: Plot object. Either holoviews Layout, or matplotlib plot
"""
if dt is not None and times is None:
times = np.arange(self.t_start, self.t_stop, dt)
samples = self(times)
elif times is not None:
samples = self(times)
else:
times = self.times
samples = self.samples
if channels is not None:
samples = samples[:, channels]
if skip is not None and skip != 0:
samples = samples[:, ::skip]
if stagger is not None and stagger != 0:
samples = samples + np.arange(0, samples.shape[1] * stagger, stagger)
if target is None:
# - Determine plotting backend
if self._plotting_backend is None:
backend = _global_plotting_backend
else:
backend = self._plotting_backend
# - Handle holoviews plotting
if backend == "holoviews":
if kwargs == {}:
vhCurves = [
hv.Curve((times, data)).redim(x="Time") for data in samples.T
]
else:
vhCurves = [
hv.Curve((times, data)).redim(x="Time").options(*args, **kwargs)
for data in samples.T
]
if len(vhCurves) > 1:
return hv.Overlay(vhCurves).relabel(group=self.name)
else:
return vhCurves[0].relabel(self.name)
elif backend == "matplotlib":
# - Add `self.name` as label only if a label is not already present
kwargs["label"] = kwargs.get("label", self.name)
# - Get current axes
ax = plt.gca()
# - Set the ylabel, if it isn't already set
if ax.get_ylabel() == "" and self.units is not None:
ax.set_ylabel(self.units)
# - Set the xlabel, if it isn't already set
if ax.get_xlabel() == "":
ax.set_xlabel("Time (s)")
# - Set the title, if it isn't already set
if ax.get_title() == "" and self.name != "unnamed":
ax.set_title(self.name)
# - Set the extent of the time axis
ax.set_xlim(times[0], times[-1])
# - Plot the curves
return ax.plot(times, samples, **kwargs)
else:
raise RuntimeError(
f"TSContinuous: `{self.name}`: No plotting back-end set."
)
else:
# - Infer current plotting backend from type of `target`
if _HV_AVAILABLE and isinstance(target, (hv.Curve, hv.Overlay)):
if kwargs == {}:
for data in samples.T:
target *= hv.Curve((times, data)).redim(x="Time")
else:
for data in samples.T:
target *= (
hv.Curve((times, data))
.redim(x="Time")
.options(*args, **kwargs)
)
return target.relabel(group=self.name)
elif _MPL_AVAILABLE and isinstance(target, mpl.axes.Axes):
# - Add `self.name` as label only if a label is not already present
kwargs["label"] = kwargs.get("label", self.name)
target.plot(times, samples, **kwargs)
return target
else:
raise TypeError(
f"TSContinuous: `{self.name}`: Unrecognized type for `target`. "
+ "It must be matplotlib Axes or holoviews Curve or Overlay and "
+ "the corresponding backend must be installed in your environment."
)
[docs] def print(
self,
full: bool = False,
num_first: int = 4,
num_last: int = 4,
limit_shorten: int = 10,
):
"""
Print an overview of the time series and its values
:param bool full: Print all samples of ``self``, no matter how long it is
:param int num_first: Shortened version of printout contains samples at first `num_first` points in `.times`
:param int num_last: Shortened version of printout contains samples at last `num_last` points in `.times`
:param int limit_shorten: Print shortened version of self if it comprises more than `limit_shorten` time points and if `full` is False
"""
s = "\n"
if len(self.times) <= 10 or full:
summary = s.join(
[
"{}: \t {}".format(t, samples)
for t, samples in zip(self.times, self.samples)
]
)
else:
summary0 = s.join(
[
"{}: \t {}".format(t, samples)
for t, samples in zip(
self.times[:num_first], self.samples[:num_first]
)
]
)
summary1 = s.join(
[
"{}: \t {}".format(t, samples)
for t, samples in zip(
self.times[-num_last:],
self.samples[-num_last + int(self.periodic) :],
)
]
)
if self.periodic:
summary1 += f"\n{self.times[-1]}: \t {self(self.times[-1])}"
summary1 += "\n\t (...periodic...)"
summary = summary0 + "\n\t...\n" + summary1
print(self.__repr__() + "\n" + summary)
[docs] def to_dict(
self,
dtype_times: Union[None, str, type, np.dtype] = None,
dtype_samples: Union[None, str, type, np.dtype] = None,
) -> Dict:
"""
Store data and attributes of this :py:class:`.TSContinuous` in a :py:class:`dict`.
:param Union[None, str, type, np.dtype] dtype_times: Data type in which `times` are to be returned, for example to save space.
:param Union[None, str, type, np.dtype] dtype_samples: Data type in which `samples` are to be returned, for example to save space.
:return: Dict with data and attributes of this :py:class:`.TSContinuous`.
"""
if dtype_times is not None:
# Make sure that broadcast values in `times` are not beyond `t_stop` and `t_start`
times = self.times.astype(dtype_times)
# Cannot simply clip `times` because of rounding issues.
t_start = np.clip(self.t_start, None, np.min(times))
t_stop = np.clip(self.t_stop, np.max(times), None)
else:
times = self.times
t_start = self.t_start
t_stop = self.t_stop
if dtype_samples is not None:
samples = self.samples.astype(dtype_samples)
else:
samples = self.samples
# - Collect attributes in dict
attributes = {
"times": times,
"samples": samples,
"t_start": np.array(t_start),
"t_stop": np.array(t_stop),
f"interp_kind_{self._interp_kind}": np.array([]),
"periodic": np.array(self.periodic),
f"name_{self.name}": np.array([]),
f"type_TSContinuous": np.array(
[]
), # Indicate that this object is TSContinuous
}
# - Some modules add a `trial_start_times` attribute to the object.
if hasattr(self, "trial_start_times"):
attributes["trial_start_times"] = np.asarray(self.trial_start_times)
return attributes
[docs] def save(
self,
path: Union[str, Path],
verbose: bool = False,
dtype_times: Union[None, str, type, np.dtype] = None,
dtype_samples: Union[None, str, type, np.dtype] = None,
):
"""
Save this time series as an ``npz`` file using np.savez
:param str path: Path to save file
:param Union[None, str, type, np.dtype] dtype_times: Data type in which `times` are to be stored, for example to save space.
:param Union[None, str, type, np.dtype] dtype_samples: Data type in which `samples` are to be stored, for example to save space.
"""
# - Collect attributes in dict
attributes = self.to_dict(dtype_times=dtype_times, dtype_samples=dtype_samples)
# - Write the file
np.savez(path, **attributes)
if verbose:
missing_ending = path.split(".")[-1] != "npz" # np.savez will add ending
print(
"TSContinuous `{}` has been stored in `{}`.".format(
self.name, path + missing_ending * ".npz"
)
)
## -- Methods for manipulating timeseries
[docs] def clip(
self,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
channels: Union[int, ArrayLike, None] = None,
include_stop: bool = True,
sample_limits: bool = True,
inplace: bool = False,
) -> "TSContinuous":
"""
Return a TSContinuous which is restricted to given time limits and only contains events of selected channels
:param float t_start: Time from which on events are returned
:param float t_stop: Time until which events are returned
:param ArrayLike channels: Channels of which events are returned
:param bool include_stop: True -> If there are events with time ``t_stop`` include them. False -> Exclude these samples. Default: True.
:param bool sample_limits: If True, make sure that a sample exists at ``t_start`` and, if ``include_stop`` is True, at ``t_stop``, as long as not both are None.
:param bool inplace: Conduct operation in-place (Default: False; create a copy)
:return TSContinuous: clipped_series: New TSContinuous clipped to bounds
"""
# - Create a new time series, or modify this time series
if not inplace:
clipped_series = self.copy()
else:
clipped_series = self
# Handle `None` time limits
t_start: float = self.t_start if t_start is None else t_start
include_stop = True if t_stop is None else include_stop
t_stop: float = self.t_stop if t_stop is None else t_stop
# - Ensure time bounds are sorted
t_start, t_stop = sorted((t_start, t_stop))
# - Handle periodic time series
times_to_choose: np.ndarray = (
_extend_periodic_times(t_start, t_stop, clipped_series)
if clipped_series.periodic
else clipped_series.times
)
# - Mark which times lie within bounds
times_in_limits: np.ndarray = np.logical_and(
times_to_choose >= t_start, times_to_choose < t_stop
)
if include_stop:
# - Include samples at time `t_stop`
times_in_limits[times_to_choose == t_stop] = True
# - Pick matching times
times: np.ndarray = times_to_choose[times_in_limits]
if sample_limits:
add_start: bool = times.size == 0 or times[0] > t_start
if not clipped_series.contains(t_start):
warn(
f"TSContinuous '{self.name}`: `t_start` beyond interpolation-range. "
)
add_stop: bool = times.size == 0 or (times[-1] < t_stop and include_stop)
if not clipped_series.contains(t_stop):
warn(
f"TSContinuous '{self.name}`: `t_stop` beyond interpolation-range. "
)
# - Only generate new array once
if add_start and add_stop:
times = np.r_[t_start, times, t_stop]
elif add_start:
times = np.r_[t_start, times]
elif add_stop:
times = np.r_[times, t_stop]
# - Sample at the chosen time points and return
clipped_series.resample(times, channels, inplace=True)
# - Update t_start and t_stop
clipped_series._t_start = t_start
clipped_series._t_stop = t_stop
return clipped_series
[docs] def resample(
self,
times: Union[int, float, ArrayLike],
channels: Union[int, float, ArrayLike, None] = None,
inplace: bool = False,
) -> "TSContinuous":
"""
Return a new time series sampled to the supplied time base
:param ArrayLike times: T desired time points to resample
:param Optional[ArrayLike] channels: Channels to be used. Default: None (use all channels)
:param bool inplace: True -> Conduct operation in-place (Default: False; create a copy)
:return TSContinuous: Time series resampled to new time base and with desired channels.
"""
if not inplace:
resampled_series = self.copy()
else:
resampled_series = self
# - Make sure `times` is array
times = np.atleast_1d(times)
# - Resample time series
if channels is None:
resampled_series._samples = self(times)
else:
# - Convert to 1d array so that integer as index also results in 2D samples
channels = np.atleast_1d(channels)
if self.num_channels == 0 and channels.size > 0:
# - Handle empty series
raise IndexError(
f"TSContinuous `{self.name}` does not have any channels."
)
try:
resampled_series._samples = self(times)[:, channels]
except IndexError:
raise IndexError(
f"TSContinuous `{self.name}`: "
+ f"Channels must be between 0 and {self.num_channels - 1}."
)
resampled_series._times = times
if times.size > 0:
resampled_series._t_start = times[0]
resampled_series._t_stop = times[-1]
resampled_series._create_interpolator()
return resampled_series
[docs] def to_clocked(
self,
dt: float,
) -> np.ndarray:
"""
Resample this time series to a synchronous clock and return the samples
This method will generate a time base that begins at :py:attr:`.t_start` and extends to at least :py:attr:`.t_stop`, sampled on a clock defined by ``dt``. The time series will be resampled to that time base, using the defined interpolation method, and the clocked samples will be returned as a raster.
Args:
dt (float): The desired clock time step, in seconds
Returns:
np.ndarray: The samples from the clocked time series
"""
# - Build a time base that spans the extend of the time series
num_samples = np.ceil((self.t_stop - self.t_start) / dt)
time_base = np.arange(num_samples) * dt + self.t_start
# - Resample the time series to the supplied time base and return
return self.resample(time_base).samples
## -- Methods for combining time series
[docs] def merge(
self,
other_series: Union["TSContinuous", Iterable["TSContinuous"]],
remove_duplicates: bool = True,
inplace: bool = False,
) -> "TSContinuous":
"""
Merge other time series to this one, by interleaving in time. Maintain each time series' time values and channel IDs.
:param Union["TSContinuous", Iterable["TSContinuous"]] other_series: time series that is merged to self or iterable thereof to merge multiple series
:param bool remove_duplicates: If ``True``, time points in other series that are also in ``self.times`` are discarded. Otherwise they are included in the new time trace and come after the corresponding points of self.times.
:param bool inplace: Conduct operation in-place (Default: ``False``; create a copy)
:return TSContinuous: The merged time series
"""
# - Create a new time series, or modify this time series
if not inplace:
merged_series = self.copy()
else:
merged_series = self
# - Ensure there is a list of timeseries to work on
if isinstance(other_series, TSContinuous):
series_list = [merged_series, other_series]
else:
try:
series_list = [merged_series] + list(other_series)
except TypeError:
raise TypeError(
f"TSContinuous `{self.name}`: `other_series` must be `TSContinuous`"
" or iterable thereof."
)
# - Check series class
if not all(isinstance(series, TSContinuous) for series in series_list):
raise TypeError(
f"TSContinuous `{self.name}`: Can only merge with `TSContinuous` objects."
)
# - Handle empty `self`
if merged_series.num_channels == 0 and len(merged_series) == 0:
num_channels = max(series.num_channels for series in series_list)
merged_series._samples = np.zeros((0, num_channels))
# - Handle empty other series (without changing original object) and channel numbers
for i_series, series in enumerate(series_list[1:]):
if series.num_channels == 0 and len(series) == 0:
series = series.copy()
series._samples = np.zeros((0, merged_series.num_channels))
series_list[i_series + 1] = series.copy()
elif series.num_channels != merged_series.num_channels:
raise ValueError(
f"TSContinuous `{self.name}`: `other_series` must include "
f"the same number of traces ({merged_series.num_channels}). "
f"Other series number {i_series} has {series.num_channels}."
)
if remove_duplicates:
# - For each time point in each series a boolean array indicating whether points are used or removed
use_points_list = [
np.ones(series.times.size, bool) for series in series_list
]
# - Iterate over series pairwise and remove duplicate time points in second series of pair
for i_s0, series0 in enumerate(series_list[:-1]):
for series1, use_points1 in zip(
series_list[i_s0 + 1 :], use_points_list[i_s0 + 1 :]
):
self._mask_duplicate_time_points(series0, series1, use_points1)
times_series = [
series._times[use] for series, use in zip(series_list, use_points_list)
]
samples_series = [
series._samples[use]
for series, use in zip(series_list, use_points_list)
]
else:
times_series = [series._times for series in series_list]
samples_series = [series._samples for series in series_list]
# - Merge time traces and samples
times_new: np.ndarray = np.concatenate(times_series)
samples_new: np.ndarray = np.concatenate(samples_series, axis=0)
# - Indices for sorting new time trace and samples. Use mergesort as stable sorting algorithm.
idcs_sorted: np.ndarray = np.argsort(times_new, kind="mergesort")
# - Update data of new time series
merged_series._times = times_new[idcs_sorted]
merged_series._samples = samples_new[idcs_sorted]
merged_series._t_start: float = min(series.t_start for series in series_list)
merged_series._t_stop: float = max(series.t_stop for series in series_list)
# - Create new interpolator
merged_series._create_interpolator()
# - Return merged TS
return merged_series
[docs] @staticmethod
def _mask_duplicate_time_points(series0, series1, use_points1):
if not (series0.t_start > series1.t_stop or series0.t_stop < series1.t_start):
# Determine region of overlap
overlap: np.ndarray = np.where(
(series0.times >= series1.t_start) & (series0.times <= series1.t_stop)
)[0]
# Array of bools indicating which sampled time points of series1 do not occur in series0
not_unique = [(t == series0.times[overlap]).any() for t in series1.times]
# Update which points of series1 are to be used
use_points1[not_unique] = False
[docs] def append_c(
self, other_series: "TSContinuous", inplace: bool = False
) -> "TSContinuous":
"""
Append another time series to this one, along the samples axis (i.e. add new channels)
:param TSContinuous other_series: Another time series. Will be resampled to the time base of ``self``
:param bool inplace: Conduct operation in-place (Default: ``False``; create a copy)
:return `TSContinuous`: Current time series, with new channels appended
"""
# - Check other_series
if not isinstance(other_series, TSContinuous):
raise TypeError(
f"TSContinuous `{self.name}`: "
+ "`other_series` must be a TSContinuous object."
)
# - Resample other_series to own time base
other_samples = other_series(self.times)
# - Create a new time series, or modify this time series
if not inplace:
appended_series = self.copy()
else:
appended_series = self
# - Combine samples
appended_series.samples = np.concatenate(
(np.atleast_2d(appended_series.samples), other_samples), 1
)
# - Create new interpolator
appended_series._create_interpolator()
# - Return appended TS
return appended_series
[docs] def append_t(
self,
other_series: Union["TSContinuous", Iterable["TSContinuous"]],
offset: Union[float, Iterable[Union[float, None]], None] = None,
inplace: bool = False,
) -> "TSContinuous":
"""
Append another time series to this one, along the time axis
:param Union["TSContinuous", Iterable[TSContinuous]] other_series: Time series to be tacked on to the end of the called series object. These series must have the same number of channels as ``self`` or be empty.
:param Union[float, Iterable[float], Iterable[None], None] offset: If not None, defines distance between last sample of one series and first sample of the next. Otherwise the offset will be the median of all timestep sizes of the first of the two series, or 0 if that series has len < 2.
:param bool inplace: Conduct operation in-place (Default: ``False``; create a copy)
:return TSContinuous: Time series containing data from ``self``, with the other series appended in time
"""
# - Ensure there is a list of timeseries to work on
if isinstance(other_series, TSContinuous):
other_series = [other_series]
else:
try:
other_series = list(other_series)
except TypeError:
raise TypeError(
f"TSContinuous `{self.name}`: `other_series` must be `TSContinuous`"
" or iterable thereof."
)
# - Check series class
if not all(isinstance(series, TSContinuous) for series in other_series):
raise TypeError(
f"TSContinuous `{self.name}`: Can only merge with `TSContinuous` objects."
)
# - Same for offsets
if not isinstance(offset, collections.abc.Iterable):
offset_list = [offset] * len(other_series)
else:
offset_list = list(offset)
if len(offset_list) != len(other_series):
warn(
f"TSContinuous `{self.name}`: Numbers of provided offsets and "
+ "TSContinuous objects do not match. Will ignore excess elements."
)
for i, (prev_series, series, offset) in enumerate(
zip([self] + other_series[:-1], other_series, offset_list)
):
if offset is None:
if len(series) > 0 and len(prev_series) > 1:
# - If ``self`` is empty then append new elements directly. Otherwise leave an offset
# corresponding to the median distance between time points in `self._times`.
offset_list[i] = np.median(np.diff(prev_series._times))
else:
# - No offset with empty series
offset_list[i] = 0
# - Translate offsets so that they correspond to indiviual delays for each series
# Delay for first appended series:
delay1 = offset_list[0] + self.t_stop - other_series[0].t_start
delay_list = [delay1]
# Add delays for other lists
for prev_series, curr_series, offset in zip(
other_series[:-1], other_series[1:], offset_list[1:]
):
# Time at which previous series stops
stop_previous = delay_list[-1] + prev_series.t_stop
# Delay for current series
delay_list.append(stop_previous + offset - curr_series.t_start)
other_series = [
series.delay(delay) for series, delay in zip(other_series, delay_list)
]
# - Let ``self.merge()`` do the rest
try:
return self.merge(
other_series=other_series, remove_duplicates=False, inplace=inplace
)
except TypeError:
# - Provide matching exception
raise TypeError(
f"TSContinuous `{self.name}`: Can only append `TSContinuous` objects."
)
## -- Internal methods
[docs] def _create_interpolator(self):
"""
Build an interpolator for the samples in this TimeSeries.
Replaces the current interpolator.
"""
if np.size(self.times) == 0:
self.interp = lambda t: None
elif np.size(self.times) == 1:
# - Handle sample for single time step (`interp1d` would cause error)
def single_sample(t):
times = np.array(t).flatten()
samples = np.empty((times.size, self.num_channels))
samples.fill(np.nan)
if self._fill_value == "extrapolate":
assign_val = np.logical_and(
self.t_start <= times, times <= self.t_stop
)
else:
assign_val = times == self.times[0]
samples[assign_val] = self.samples[0]
return samples
self.interp = single_sample
else:
# - Construct interpolator
self.interp = spint.interp1d(
self._times,
self._samples,
kind=self._interp_kind,
axis=0,
assume_sorted=True,
bounds_error=False,
copy=False,
fill_value=self._fill_value,
)
[docs] def _interpolate(self, times: Union[int, float, ArrayLike]) -> np.ndarray:
"""
Interpolate the time series to the provided time points
:param ArrayLike times: Array of ``T`` desired interpolated time points
:return np.ndarray: Array of interpolated values. Will have the shape ``TxN``, where ``N`` is the number of channels in ``self``
"""
# Make sure `times` is an array
times = np.array(times)
# - Handle empty series
if self.isempty():
return np.zeros((np.size(times), 0))
# - Enforce periodicity
if self.periodic and self.duration > 0:
times = (times - self._t_start) % self.duration + self._t_start
# Time points that define the range of the interpolator
t_first = self.times[0]
t_last = self.t_stop if self._interp_kind == "previous" else self.times[-1]
# Time points outside of this range
is_early = np.array(times < t_first)
is_late = np.array(times > t_last)
# - Correct time points that are slightly out of range
if self.approx_limit_times:
tol = min(_TOLERANCE_ABSOLUTE, _TOLERANCE_RELATIVE * self.duration)
# Find values in `times` that are slightly before first or slightly after
# last sample
t_first_approx = t_first - tol
t_last_approx = t_last + tol
set_t_first = np.logical_and(is_early, times >= t_first_approx)
set_t_last = np.logical_and(is_late, times <= t_last_approx)
times = np.where(set_t_first, t_first, times)
times = np.where(set_t_last, t_last, times)
if np.logical_or(set_t_first, set_t_last).any():
warn(
f"TSContinuous `{self.name}`: Some of the requested time points "
+ "were slightly outside the time range of this series (by at "
+ f"most {tol} s) and were approximated by "
+ f"the first or last time point of this series. To prevent this "
+ f"behavior, set the `approx_limit_times` attribute to `False`."
)
is_early[set_t_first] = False
is_late[set_t_last] = False
# - Warn or throw exception if output contains `NaN`s
if is_early.any() or is_late.any():
error_msg = (
f"TSContinuous `{self.name}`: Some of the requested time points are "
+ "beyond the first and last time points of this series and cannot "
+ "be sampled.\n"
+ "If you think that this is due to rounding errors, try setting "
+ "the `approx_limit_times` attribute to `True`.\n"
)
if self.beyond_range_exception:
raise ValueError(
error_msg
+ "If you want to sample at these time points anyway, you can "
+ "set the `beyond_range_exception` attribute of this time series "
+ "to `False` and will receive `NaN` as values."
)
else:
warn(
error_msg
+ "Will return `NaN` for these time points."
+ "To raise a ValueError in situations like this, set the "
+ "`beyond_range_exception` attribute of this time series to `True`."
)
samples = np.reshape(self.interp(times), (-1, self.num_channels))
# - Catch invalid times, replace with NaN
invalid_times = np.logical_or(
np.array(times) < self.t_start, np.array(times) > self.t_stop
)
samples[invalid_times, :] = np.nan
# - Return the sampled data
return samples
[docs] def _compatible_shape(self, other_samples) -> np.ndarray:
"""
Attempt to make ``other_samples`` a compatible shape to ``self.samples``.
:param ArrayLike other_samples: Samples to convert
:return np.ndarray: Array the same shape as ``self.samples``
:raises: ValueError if broadcast fails
"""
try:
return np.broadcast_to(other_samples, self.samples.shape).copy()
except ValueError:
raise ValueError(
f"TSContinuous `{self.name}`: Input data (shape {other_samples.shape})"
f" could not be broadcast to samples shape ({self.samples.shape})."
)
## -- Magic methods
def __call__(self, times: Union[int, float, ArrayLike]):
"""
ts(tTime1, tTime2, ...) - Interpolate the time series to the provided time points
:param ArrayLike t_time: Scalar, list or ``np.array`` of ``T`` desired interpolated time points
:return np.array: Array of interpolated values. Will have the shape ``TxN``
"""
return self._interpolate(times)
def __getitem__(
self,
# indices_time: Union[ArrayLike, float, slice, None] = None,
# indices_channel: Union[ArrayLike, int, slice, None] = None,
indices: Union[
Tuple[
Union[ArrayLike, float, slice, None], Union[ArrayLike, int, slice, None]
],
ArrayLike,
float,
slice,
None,
] = None,
) -> "TSContinuous":
"""
ts[indices_time, indices_channel] - Interpolate the time series to the provided time points or, if a slice is provided between given limits with given step size. Use channels provided in indices_channel, in matching order.
:param ArrayLike[float] indices_time: float, array-like or slice of T desired interpolated time points
:param ArrayLike[int] indices_channel: int, array-like or slice of desired channels in desired order
:return TSContinuous: TSContinuous with chosen time points and channels
"""
# - Handle format of funciton argument
if isinstance(indices, tuple):
if len(indices) == 0:
indices_time = indices_channel = None
elif len(indices) == 1:
# Assume indices refer to time
indices_time = indices[0]
indices_channel = None
elif len(indices) == 2:
# Both time and channel indices are given
indices_time, indices_channel = indices
else:
raise IndexError(
f"TSContinuous: `{self.name}`: Supports at most 2 indices"
+ " (times and channels)."
)
else:
# Assume indices refer to time
indices_time = indices
indices_channel = None
# - Handle channel indices
if isinstance(indices_channel, slice):
ch_start = 0 if indices_channel.start is None else indices_channel.start
ch_stop = (
self.num_channels
if indices_channel.stop is None
else indices_channel.stop
)
ch_step = 1 if indices_channel.step is None else indices_channel.step
channels = np.arange(ch_start, ch_stop, abs(ch_step))
if ch_step < 0:
# - Invert order of channels
channels = channels[::-1]
else:
channels = indices_channel
# - Handle time indices
if indices_time is None:
indices_time = slice(None)
if isinstance(indices_time, slice):
if indices_time.step is None:
# - Use `self.clip`
return self.clip(
t_start=indices_time.start,
t_stop=indices_time.stop,
channels=channels,
inplace=False,
include_stop=indices_time.stop is None,
sample_limits=False,
)
else:
# - Prepare time points for `self.resample`
t_start: float = (
self.t_start if indices_time.start is None else indices_time.start
)
t_stop: float = (
self.t_stop if indices_time.stop is None else indices_time.stop
)
# - Determine time points at which series is sampled
time_points: np.ndarray = np.arange(
t_start, t_stop, abs(indices_time.step)
)
# - Make sure time points are within limits
time_points = time_points[
np.logical_and(
time_points >= self.t_start, time_points < self.t_stop
)
]
# - Invert order if step is negative
if indices_time.step < 0:
time_points = time_points[::-1]
else:
time_points = indices_time
return self.resample(time_points, channels, inplace=False)
def __repr__(self) -> str:
"""
Return a string representation of this object
:return str: String description
"""
if self.isempty():
beginning: str = f"Empty TSContinuous object `{self.name}` "
else:
beginning: str = "{}periodic TSContinuous object `{}` ".format(
int(not self.periodic) * "non-", self.name
)
return beginning + "from t={} to {}. Samples: {}. Channels: {}".format(
self.t_start, self.t_stop, self.samples.shape[0], self.num_channels
)
# - Iteration
def __iter__(self):
"""
Yield tuples of sample times and values
:yield Tuple[float, float]: Yields a tuple [times, samples]
"""
for t, val in zip(self.times, self.samples):
yield (t, val)
## -- Operator overloading
# - Addition
def __add__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__iadd__(other_samples)
def __radd__(self, other_samples: TimeSeries) -> TimeSeries:
return self + other_samples
def __iadd__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(other_samples, TSContinuous):
other_samples = self._compatible_shape(other_samples(self.times))
else:
other_samples = self._compatible_shape(other_samples)
# - Treat NaNs as zero
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(other_samples)
self.samples[is_nan_self] = 0
other_samples[is_nan_other] = 0
# - Perform addition
new_samples = self.samples + other_samples
self.samples = new_samples
# - Fill in nans
self.samples[np.logical_and(is_nan_self, is_nan_other)] = np.nan
return self
# - Subtraction
def __sub__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__isub__(other_samples)
def __rsub__(self, other_samples: TimeSeries) -> TimeSeries:
return -(self - other_samples)
def __isub__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(other_samples, TSContinuous):
other_samples = self._compatible_shape(other_samples(self.times))
else:
other_samples = self._compatible_shape(other_samples)
# - Treat NaNs as zero
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(other_samples)
self.samples[is_nan_self] = 0
other_samples[is_nan_other] = 0
# - Perform subtraction
self.samples -= other_samples
# - Fill in nans
self.samples[np.logical_and(is_nan_self, is_nan_other)] = np.nan
return self
# - Multiplication
def __mul__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__imul__(other_samples)
def __rmul__(self, other_samples: TimeSeries) -> TimeSeries:
return self * other_samples
def __imul__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(other_samples, TSContinuous):
other_samples = self._compatible_shape(other_samples(self.times))
else:
other_samples = self._compatible_shape(other_samples)
# - Propagate NaNs
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(other_samples)
# - Perform multiplication
self.samples *= other_samples
# - Fill in nans
self.samples[np.logical_or(is_nan_self, is_nan_other)] = np.nan
return self
# - Division
def __truediv__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__itruediv__(other_samples)
def __rtruediv__(self, other_samples: TimeSeries) -> TimeSeries:
self_copy = self.copy()
self_copy.samples = 1 / self_copy.samples
return self_copy * other_samples
def __itruediv__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(other_samples, TSContinuous):
other_samples = self._compatible_shape(
np.reshape(other_samples(self.times), (np.size(self.times), -1))
)
else:
other_samples = self._compatible_shape(other_samples)
# - Propagate NaNs
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(other_samples)
# - Perform division
self.samples /= other_samples
# - Fill in nans
self.samples[np.logical_or(is_nan_self, is_nan_other)] = np.nan
# - Re-create interpolator
self._create_interpolator()
return self
# - Floor division
def __floordiv__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__ifloordiv__(other_samples)
def __rfloordiv__(self, other_samples: TimeSeries) -> TimeSeries:
self_copy = self.copy()
self_copy.samples = 1 / self_copy.samples
return self_copy // (1 / other_samples)
def __ifloordiv__(self, other_samples: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(other_samples, TSContinuous):
other_samples = self._compatible_shape(other_samples(self.times))
else:
other_samples = self._compatible_shape(other_samples)
# - Propagate NaNs
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(other_samples)
# - Perform division
self.samples //= other_samples
# - Fill in nans
self.samples[np.logical_or(is_nan_self, is_nan_other)] = np.nan
return self
# - Matrix multiplication
def __matmul__(self, matrix) -> TimeSeries:
return self.copy().__imatmul__(matrix)
def __rmatmul__(self, matrix) -> TimeSeries:
raise NotImplementedError
def __imatmul__(self, matrix) -> TimeSeries:
self.samples = self.samples @ matrix
return self
# - Exponentiation
def __pow__(self, exponent: Union[TimeSeries, Any]) -> TimeSeries:
return self.copy().__ipow__(exponent)
def __rpow__(self, base: Union[np.ndarray, Any]) -> TimeSeries:
new_series = self.copy()
base = new_series._compatible_shape(base)
# - Propagate NaNs
is_nan_self = np.isnan(new_series.samples)
is_nan_other = np.isnan(base)
# - Perform exponentiation
new_series.samples = base**new_series.samples
# - Fill in nans
new_series.samples[np.logical_or(is_nan_self, is_nan_other)] = np.nan
return new_series
def __ipow__(self, exponent: Union[TimeSeries, Any]) -> TimeSeries:
if isinstance(exponent, TSContinuous):
exponent = self._compatible_shape(exponent(self.times))
else:
exponent = self._compatible_shape(exponent)
# - Propagate NaNs
is_nan_self = np.isnan(self.samples)
is_nan_other = np.isnan(exponent)
# - Perform exponentiation
self.samples **= exponent
# - Fill in nans
self.samples[np.logical_or(is_nan_self, is_nan_other)] = np.nan
return self
# - Absolute
def __abs__(self) -> TimeSeries:
self_copy = self.copy()
self_copy.samples = np.abs(self_copy.samples)
return self_copy
# - Negative
def __neg__(self) -> TimeSeries:
self_copy = self.copy()
self_copy.samples = -self_copy.samples
return self_copy
## -- Properties
@property
def samples(self):
"""(ArrayLike[float]) Value of time series at sampled times"""
return self._samples
@samples.setter
def samples(self, new_samples: ArrayLike):
# - Make sure that if assigned empty samples array, number of traces is implicityly
# with as second dimension of `new_samples`
if np.size(new_samples) == 0 and np.ndim(new_samples) < 2:
new_samples = new_samples.reshape(0, 0)
# - Promote to 2d
new_samples = np.atleast_2d(new_samples)
# - Permit a one-dimensional sample input, promote to 2d
if (new_samples.shape[0] == 1) and (np.size(self.times) > 1):
new_samples = np.reshape(new_samples, (np.size(self.times), -1))
# - Check samples for correct size
if new_samples.shape[0] != np.size(self.times):
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "New samples matrix must have the same number of samples as `.times`."
)
# - Store new time trace
self._samples = new_samples
# - Create a new interpolator
self._create_interpolator()
# - Extend setter of times to update interpolator
@property
def times(self):
"""(ArrayLike[float]) Array of sample times"""
return self._times
@times.setter
def times(self, new_times: ArrayLike):
super(TSContinuous, self.__class__).times.fset(self, new_times)
# - Create a new interpolator
self._create_interpolator()
@property
def num_traces(self):
"""(int) Synonymous to ``num_channels``"""
return self.samples.shape[1]
@property
def num_channels(self):
"""(int) Number of channels (dimension of sample vectors) in this TimeSeries object"""
return self.samples.shape[1]
@property
def max(self):
"""(float) Maximum value of time series"""
return np.nanmax(self.samples)
@property
def min(self):
"""(float) Minimum value of time series"""
return np.nanmin(self.samples)
@property
def beyond_range_exception(self):
return self._nan_exception
@beyond_range_exception.setter
def beyond_range_exception(self, raise_exception: bool):
try:
self._nan_exception = bool(raise_exception)
except (TypeError, ValueError):
raise TypeError(
f"TSContinuous `{self.name}`: `beyond_range_exception` must be of boolean type."
)
@property
def approx_limit_times(self):
return self._approx_limit_times
@approx_limit_times.setter
def approx_limit_times(self, approx: bool):
try:
self._approx_limit_times = bool(approx)
except (TypeError, ValueError):
raise TypeError(
f"TSContinuous `{self.name}`: `approx_limit_times` must be of boolean type."
)
@property
def fill_value(self):
return self._fill_value
@fill_value.setter
def fill_value(self, value):
if isinstance(value, str):
if value != "extrapolate":
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "Fill_value` must be either `extrapolate` or a fill value to pass to `scipy.interpolate`."
)
self._fill_value = value
if self.interp is not None:
self._create_interpolator()
### --- Event time series
[docs]class TSEvent(TimeSeries):
"""
Represents a discrete time series, composed of binary events (present or absent). This class is primarily used to represent spike trains or event trains to communicate with spiking neuron layers, or to communicate with event-based computing systems. See :ref:`/basics/time_series.ipynb` for further explanation and examples.
`.TSEvent` supports multiple channels of event time series encapsulated by a single object, as well as periodic time series.
:Examples:
Build a series of several random event times
>>> times = numpy.cumsum(numpy.random.rand(10))
>>> ts = TSEvent(times)
"""
[docs] def __init__(
self,
times: Optional[ArrayLike] = None,
channels: Optional[Union[int, ArrayLike]] = None,
periodic: bool = False,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
name: Optional[str] = None,
num_channels: Optional[int] = None,
):
"""
Represent discrete events in time
:param Optional[ArrayLike[float]] times: ``Tx1`` vector of event times
:param Optional[ArrayLike[int]] channels: ``Tx1`` vector of event channels (Default: all events are in channel 0)
:param bool periodic: Is this a periodic TimeSeries (Default: False; non-periodic)
:param float t_start: Explicitly specify the start time of this series. If ``None``, then ``times[0]`` is taken to be the start time
:param float t_stop: Explicitly specify the stop time of this series. If ``None``, then ``times[-1]`` is taken to be the stop time
:param Optional[str] name: Name of the time series (Default: None)
:param Optional[int] num_channels: Total number of channels in the data source. If ``None``, the total channel number is taken to be ``max(channels)``
"""
# - Default time trace: empty
if times is None:
times = np.array([])
else:
times = np.atleast_1d(times).flatten().astype(float)
# - Make sure `t_stop` is larger than any provided time point.
if times.size > 0:
if t_stop is None:
raise TypeError(
"If `times` is not `None`, `t_stop` must be a float strictly "
+ "greater than the largest entry in `times`."
)
if np.max(times) >= t_stop:
raise ValueError(
f"`t_stop` (here {t_stop}) must be strictly greater than the "
+ f"largest entry in `times` (here {np.max(times)})."
)
# - Default name: 'unnamed'
name = "unnamed" if name is None else name
# - Default channel: zero
if channels is None or np.size(channels) == 0:
channels = np.zeros(np.size(times))
min_num_ch = min(np.size(times), 1)
# - Handle scalar channel
elif isinstance(channels, int):
min_num_ch = channels + 1
channels = np.array([channels for _ in times])
# - Array-like of channels
else:
if np.size(channels) != np.size(times):
# - Make sure item sizes match
raise ValueError(
f"TSEvent `{name}`: `channels` must have the same number of "
+ "elements as `times`, be an integer or None."
)
else:
min_num_ch = np.amax(channels) + 1
if num_channels is None:
# - Infer number of channels from maximum channel id in channels
num_channels = min_num_ch
else:
if num_channels < min_num_ch:
raise ValueError(
f"TSEvent `{name}`: num_channels must be None or greater than the highest channel ID."
)
# - Initialize superclass
super().__init__(
times=times, periodic=periodic, t_start=t_start, t_stop=t_stop, name=name
)
# - Store total number of channels
self._num_channels = int(num_channels)
# - Store channels
self.channels = np.array(channels, "int").flatten()
[docs] def print(
self,
full: bool = False,
num_first: int = 4,
num_last: int = 4,
limit_shorten: int = 10,
):
"""
Print an overview of the time series and its values
:param bool full: Print all samples of ``self``, no matter how long it is. Default: ``False``
:param int limit_shorten: Print shortened version of ``self`` if it comprises more than ``limit_shorten`` time points and ``full`` is ``False``. Default: 4
:param int num_first: Shortened version of printout contains samples at first ``num_first`` points in ``self.times``. Default: 4
:param int num_last: Shortened version of printout contains samples at last ``num_last`` points in ``self.times``. Default: 4
"""
s = "\n"
if len(self.times) <= 10 or full:
summary = s.join(
[f"{t}: \t {ch}" for t, ch in zip(self.times, self.channels)]
)
else:
summary0 = s.join(
[
f"{t}: \t {ch}"
for t, ch in zip(self.times[:num_first], self.channels[:num_first])
]
)
summary1 = s.join(
[
f"{t}: \t {ch}"
for t, ch in zip(self.times[-num_last:], self.channels[-num_last:])
]
)
summary = summary0 + "\n\t...\n" + summary1
print(self.__repr__() + "\nTime \t Ch.-ID" + "\n" + summary)
## -- Methods for plotting and printing
[docs] def plot(
self,
time_limits: Optional[Tuple[Optional[float], Optional[float]]] = None,
target: Union["mpl.axes.Axes", "hv.Scatter", "hv.Overlay", None] = None,
channels: Union[ArrayLike, int, None] = None,
*args,
**kwargs,
):
"""
Visualise this time series on a scatter plot
:param Optional[float, float] time_limits: Tuple with times between which to plot. Default: plot all times
:param Optional[axis] target: Object to which plot will be added. Default: new plot
:param ArrayLike[int] channels: Channels that are to be plotted. Default: plot all channels
:param args, kwargs: Optional arguments to pass to plotting function
:return: Plot object. Either holoviews Layout, or matplotlib plot
"""
# - Filter spikes by time
if time_limits is None:
t_start = self.t_start
t_stop = self.t_stop
else:
exception_limits = (
f"TSEvent `{self.name}`: `time_limits` must be None or tuple "
+ "of length 2."
)
try:
# - Make sure `time_limits` has correct length
if len(time_limits) != 2:
raise ValueError(exception_limits)
else:
t_start = self.t_start if time_limits[0] is None else time_limits[0]
t_stop = self.t_stop if time_limits[1] is None else time_limits[1]
except TypeError:
raise TypeError(exception_limits)
# - Choose matching events
times, channels = self(t_start, t_stop, channels)
if target is None:
if self._plotting_backend is None:
backend = _global_plotting_backend
else:
backend = self._plotting_backend
if backend == "holoviews":
return (
hv.Scatter((times, channels), *args, **kwargs)
.redim(x="Time", y="Channel")
.relabel(self.name)
)
elif backend == "matplotlib":
# - Add `self.name` as label only if a label is not already present
kwargs["label"] = kwargs.get("label", self.name)
# - Get current axes
ax = plt.gca()
# - Set the ylabel, if it isn't already set
if ax.get_ylabel() == "":
ax.set_ylabel("Channels")
# - Set the xlabel, if it isn't already set
if ax.get_xlabel() == "":
ax.set_xlabel("Time (s)")
# - Set the title, if it isn't already set
if ax.get_title() == "" and self.name != "unnamed":
ax.set_title(self.name)
# - Set the extent of the time axis
ax.set_xlim(self.t_start, self.t_stop)
# - Set the extent of the channels axis
ax.set_ylim(-1, self.num_channels)
# - Plot the curves
return ax.scatter(times, channels, *args, **kwargs)
else:
raise RuntimeError(f"TSEvent: `{self.name}`: No plotting back-end set.")
else:
# - Infer current plotting backend from type of `target`
if _HV_AVAILABLE and isinstance(target, (hv.Curve, hv.Overlay)):
target *= (
hv.Scatter((times, channels), *args, **kwargs)
.redim(x="Time", y="Channel")
.relabel(self.name)
)
return target.relabel(group=self.name)
elif _MPL_AVAILABLE and isinstance(target, mpl.axes.Axes):
# - Add `self.name` as label only if a label is not already present
kwargs["label"] = kwargs.get("label", self.name)
target.scatter(times, channels, *args, **kwargs)
return target
else:
raise TypeError(
f"TSEvent: `{self.name}`: Unrecognized type for `target`. "
+ "It must be matplotlib Axes or holoviews Curve or Overlay and "
+ "the corresponding backend must be installed in your environment."
)
## -- Methods for manipulating timeseries
[docs] def clip(
self,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
channels: Union[int, ArrayLike, None] = None,
remap_channels: bool = False,
inplace: bool = False,
) -> "TSEvent":
"""
Return a `TSEvent` which is restricted to given time limits and only contains events of selected channels
If time limits are provided, `.t_start` and `.t_stop` attributes of the new time series will correspond to those. If `remap_channels` is ``True``, channels IDs will be mapped to a continuous sequence of integers starting from 0 (e.g. [1, 3, 6]->[0, 1, 2]). In this case `.num_channels` will be set to the number of different channels in ``channels``. Otherwise `.num_channels` will keep its original values, which is also the case for all other attributes. If `inplace` is True, modify ``self`` accordingly.
:param Optional[float] t_start: Time from which on events are returned. Default: `.t_start`
:param Optional[float] t_stop: Time until which events are returned. Default: `.t_stop`
:param Optional[ArrayLike[int]] channels: Channels of which events are returned. Default: All channels
:param bool remap_channels: Map channel IDs to continuous sequence starting from 0. Set `num_channels` to largest new ID + 1. Default: ``False``, do not remap channels
:param bool inplace: Iff ``True``, the operation is performed in place (Default: False)
:return `.TSEvent`: `.TSEvent` containing events from the requested channels
"""
if not inplace:
new_series = self.copy()
else:
new_series = self
# - Extract matching events
time_data, channel_data = new_series(t_start, t_stop, channels)
# - Update new timeseries
new_series._times = time_data
if t_start is not None:
new_series._t_start = t_start
if t_stop is not None:
new_series._t_stop = t_stop
if remap_channels:
if channel_data.size > 0:
# - Set channel IDs to sequence starting from 0
unique_channels, channel_indices = np.unique(
channel_data, return_inverse=True
)
num_channels = unique_channels.size
new_series._channels = np.arange(num_channels)[channel_indices]
else:
new_series._channels = channel_data
new_series._num_channels = (
np.unique(channels).size
if channels is not None
else np.unique(channel_data).size
)
else:
new_series._channels = channel_data
return new_series
[docs] def remap_channels(
self, channel_map: ArrayLike, inplace: bool = False
) -> "TSEvent":
"""
Renumber channels in the :py:class:`.TSEvent`
Maps channels 0..``self.num_channels-1`` to the channels in ``channel_map``.
:param ArrayLike[int] channel_map: List of channels that existing channels should be mapped to, in order. Must be of size ``self.num_channels``.
:param bool inplace: Specify whether operation should be performed in place (Default: ``False``, a copy is returned)
"""
if not inplace:
new_series = self.copy()
else:
new_series = self
channel_map = np.asarray(channel_map)
if not channel_map.size == new_series.num_channels:
raise ValueError(
f"TSEvent `{new_series.name}`: "
+ f"`channel_map` must be of size {new_series.num_channels}."
)
new_series.channels = channel_map[new_series.channels]
return new_series
## -- Methods for finding and extracting data
[docs] def raster(
self,
dt: float,
t_start: float = None,
t_stop: float = None,
num_timesteps: int = None,
channels: np.ndarray = None,
add_events: bool = False,
) -> np.ndarray:
"""
Return a rasterized version of the time series data, where each data point represents a time step
Events are represented in a boolean matrix, where the first axis corresponds to time, the second axis to the channel. Events that happen between time steps are projected to the preceding step. If two events happen during one time step within a single channel, they are counted as one, unless ``add_events`` is ``True``.
Time bins for the raster extend ``[t, t+dt)``, that is **explicitly excluding events that occur at** ``t+dt``. Such events would be included in the following time bin.
To generate a time trace that corresponds to the raster, you can use :py:func:`numpy.arange` as follows::
num_timesteps = np.ceil((t_stop - t_start) / dt)
bin_starts = np.arange(num_timesteps) * dt + t_start
bin_stops = bin_starts + dt
bin_mid = bin_starts + dt/2
Note that the modulo computation is numerically unstable as expressed above. Internally we use a numerically more stable version, with::
def mod(num, div):
return (num - div * np.floor(num/div))
num_timesteps = int(np.ceil((t_stop - t_start) / dt)
:param float dt: Duration of single time step in raster
:param Optional[float] t_start: Time where to start raster. Default: None (use ``self.t_start``)
:param Optional[float] t_stop: Time where to stop raster. This time point is not included in the raster. Default: ``None`` (use ``self.t_stop``. If ``num_timesteps`` is provided, ``t_stop`` is ignored.
:param Optional[int] num_timesteps: Specify number of time steps directly, instead of providing ``t_stop``. Default: ``None`` (use ``t_start``, ``t_stop`` and ``dt`` to determine raster size)
:param Optional[ArrayLike[int]] channels: Channels from which data is to be used. Default: ``None`` (use all channels)
:param bool add_events: If ``True``, return an integer raster containing number of events for each time step and channel. Default: ``False``, merge simultaneous events in a single channel, and return a boolean raster
:param bool endpoint: If ``True``, an extra time bin is added to the raster after ``t_stop``, to ensure that any events occurring at ``t_stop`` are included in the raster. Default: ``False``, do not include events occurring at ``t_stop``.
:return ArrayLike: event_raster Boolean matrix with ``True`` indicating presence of events for each time step and channel. If ``add_events == True``, the raster consists of integers indicating the number of events per time step and channel. First axis corresponds to time, second axis to channel.
"""
# - Numerically stable modulo function
def mod(num, div):
return num - div * np.floor(num / div)
# - Filter time and channels
t_start = self.t_start if t_start is None else t_start
if channels is None:
channels = channels_clip = np.arange(self.num_channels)
elif np.amax(channels) >= self.num_channels:
# - Only use channels that are within range of channels of this timeseries
channels_clip = np.intersect1d(channels, np.arange(self.num_channels))
# - Channels for which series is not defined
channels_undefined = np.setxor1d(channels_clip, channels)
warn(
f"TSEvent `{self.name}` is not defined for some of the channels provided "
+ f"in `channels` argument ({', '.join(channels_undefined.astype(str))}). "
+ f"Will assume that there are no events for these channels."
)
else:
channels_clip = channels
# - Determine t_stop and num_timesteps
if num_timesteps is not None and t_stop is not None:
# - Check that only one of `t_stop` and `num_timesteps` is provided
raise ValueError(
"Only one of `t_stop` and `num_timesteps` may be provided."
)
elif num_timesteps is not None:
# - Use `num_timesteps` to determine `t_stop`
t_stop = t_start + num_timesteps * dt
elif t_stop is None:
# - Use own `t_stop`
t_stop = self.t_stop
# - Compute number of raster timesteps
num_timesteps = int(np.ceil((t_stop - t_start) / dt))
# - Clip the time series to include only the events of interest
series = self.clip(
t_start=t_start,
t_stop=t_stop,
channels=channels_clip,
remap_channels=False,
)
# - Create raster for storing event data
raster_type = int if add_events else bool
event_raster = np.zeros((num_timesteps, channels.size), raster_type)
# - Handle empty time series
if len(series) == 0:
return event_raster
# - Select data according to time base
event_times = series.times
event_channels = series.channels
## -- Convert input events and samples to boolean or integer raster
# - Only consider rasters that have non-zero length
if num_timesteps > 0:
# - Compute indices for event times and filter to valid time bins
time_indices = np.floor((event_times - t_start) / dt).astype(int)
time_indices = time_indices[time_indices < num_timesteps]
if add_events:
# - Accumulate events per time step and channel
for idx_t, idx_ch in zip(time_indices, event_channels):
event_raster[idx_t, idx_ch] += 1
else:
# - Print a warning if there are multiple spikes in one time step and channel
if (
(
np.diff(np.c_[time_indices, event_channels], axis=0)
== np.zeros(2)
)
.all(axis=1)
.any(axis=0)
):
print(
f"TSEvent `{self.name}`: There are channels with multiple events"
+ " per time step. Consider using a smaller `dt` or setting `add_events = True`."
)
# - Mark spiking indices with True
event_raster[time_indices, event_channels] = True
# - Return the raster
return event_raster
[docs] def xraster(
self,
dt: float,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
num_timesteps: Optional[int] = None,
channels: Optional[np.ndarray] = None,
add_events: Optional[bool] = None,
endpoint: Optional[bool] = None,
) -> np.ndarray:
"""
Generator which ``yield`` s a rasterized time series data, where each data point represents a time step
Events are represented in a boolean matrix, where the first axis corresponds to time, the second axis to the channel. Events that happen between time steps are projected to the preceding one. If two events happen during one time step within a single channel, they are counted as one.
:param float dt: Duration of single time step in raster
:param Optional[float] t_start: Time where to start raster. Default: ``None`` (use ``self.t_start``)
:param Optional[float] t_stop: Time where to stop raster. This time point is not included in the raster. Default: ``None`` (use ``self.t_stop``. If ``num_timesteps`` is provided, ``t_stop`` is ignored.
:param Optional[int] num_timesteps: Specify number of time steps directly, instead of providing ``t_stop``. Default: ``None`` (use ``t_start``, ``t_stop`` and ``dt`` to determine raster size.
:param Optional[ArrayLike[int]] channels: Channels from which data is to be used. Default: ``None`` (use all channels)
:param Optional[bool] add_events: If ``True``, return an integer raster containing number of events for each time step and channel. Default: ``False``, merge simultaneous events in a single channel, and return a boolean raster
:param Optional[bool] endpoint: If ``True``, an extra time bin is added to the raster after ``t_stop``, to ensure that any events occurring at ``t_stop`` are included in the raster. Default: ``False``, do not include events occurring at ``t_stop``.
:yields ArrayLike: event_raster - Boolean matrix with ``True`` indicating presence of events for each time step and channel. If ``add_events == True``, the raster consists of integers indicating the number of events per time step and channel. First axis corresponds to time, second axis to channel.
"""
event_raster = self.raster(
dt=dt,
t_start=t_start,
t_stop=t_stop,
num_timesteps=num_timesteps,
channels=channels,
add_events=add_events,
endpoint=endpoint,
)
yield from event_raster # Yield one row at a time
[docs] @staticmethod
def from_raster(
raster: np.ndarray,
dt: float = 1.0,
t_start: float = 0.0,
t_stop: Optional[float] = None,
name: Optional[str] = None,
periodic: bool = False,
num_channels: Optional[int] = None,
spikes_at_bin_start: bool = False,
) -> "TSEvent":
"""
Create a `.TSEvent` object from a raster array
Given a rasterised event time series, with dimensions [TxC], `~.TSEvent.from_raster` will generate a event
time series as a `.TSEvent` object.
.. rubric:: Example
The following code will generate a Poisson event train with 200 time steps of 1ms each, and 20 channels, with a spiking probability of 10% per time bin::
T = 200
C = 20
dt = 1e-3
spike_prob = 0.1
raster = np.random.rand((T, C)) > spike_prob
spikes_ts = TSEvent.from_raster(raster, dt)
:param np.ndarray raster: An array of events ``(T, C)``. Each row corresponds to a clocked time step of `dt` duration. Each bin contains the number of spikes present in that bin
:param float dt: Duration of each time bin in seconds
:param float t_start: The start time of the first bin in ``raster``. Default: ``0.``
:param float t_stop: The stop time of the time series. Default: the total duration of the provided raster
:param Optional[str] name: The name of the returned time series. Default: ``None``
:param bool periodic: The ``periodic`` flag passed to the new time series
:param Optional[int] num_channels: The ``num_channels`` argument passed to the new time series. Default: ``None``, use the number of channels ``C`` in ``raster``
:param bool spikes_at_bin_start: Iff ``True``, then spikes in ``raster`` are considered to occur at the start of the time bin. If ``False``, then spikes occur half-way through each time bin. Default: ``False``, spikes occur half-way through each time bin.
:return TSEvent: A new `.TSEvent` containing the events in ``raster``
"""
# - Make sure ``raster`` is a numpy array of integer type
raster = np.asarray(raster, int)
# - Reshape if the array is 1d
if len(raster.shape) == 1:
raster = np.atleast_2d(raster).T
# - Compute `t_stop` if not provided
if t_stop is None:
t_stop = raster.shape[0] * dt + t_start
# - Find spike events
spike_present = raster > 0
spikes_per_bin = raster[spike_present]
spikes = np.repeat(np.argwhere(raster), spikes_per_bin, axis=0)
# - Determine the number of channels
num_channels = raster.shape[1] if num_channels is None else num_channels
# - Convert to a new TSEvent object and return
return TSEvent(
spikes[:, 0] * dt + t_start + dt / 2 * int(not spikes_at_bin_start),
spikes[:, 1],
name=name,
periodic=periodic,
num_channels=num_channels,
t_start=t_start,
t_stop=t_stop,
)
[docs] def to_dict(
self,
dtype_times: Union[None, str, type, np.dtype] = None,
dtype_channels: Union[None, str, type, np.dtype] = None,
) -> Dict:
"""
Store data and attributes of this :py:class:`.TSEvent` in a ``Dict``.
:param Union[None, str, type, np.dtype] dtype_times: Data type in which ``times`` are to be returned, for example to save space.
:param Union[None, str, type, np.dtype] dtype_channels: Data type in which ``channels`` are to be returned, for example to save space.
:return: Dict with data and attributes of this :py:class:`.TSEvent`.
"""
if dtype_times is not None:
# Make sure that broadcast values in `times` are not beyond `t_stop` and `t_start`
times = self.times.astype(dtype_times)
# Cannot simply clip `times` because of rounding issues.
t_start = np.clip(self.t_start, None, np.min(times))
res = np.finfo(dtype_times).resolution
t_stop = np.clip(self.t_stop, np.max(times) + res, None)
else:
times = self.times
t_start = self.t_start
t_stop = self.t_stop
if dtype_channels is not None:
if np.iinfo(dtype_channels).max < self.num_channels:
raise ValueError(
f"TSEvent `{self.name}`: type `{dtype_channels}` not sufficient "
+ f"for number of channels in this series ({self.num_channels})."
)
channels = self.channels.astype(dtype_channels)
else:
channels = self.channels
# - Collect attributes in dict
attributes = {
"times": times,
"channels": channels,
"t_start": np.array(t_start),
"t_stop": np.array(t_stop),
"periodic": np.array(self.periodic),
"num_channels": np.array(self.num_channels),
f"name_{self.name}": np.array([]),
f"type_TSEvent": np.array([]), # Indicate that the object is TSEvent
}
# - Some modules add a `trial_start_times` attribute to the object.
if hasattr(self, "trial_start_times"):
attributes["trial_start_times"] = self.trial_start_times
return attributes
[docs] def save(
self,
path: Union[str, Path],
verbose: bool = False,
dtype_times: Union[None, str, type, np.dtype] = None,
dtype_channels: Union[None, str, type, np.dtype] = None,
):
"""
Save this :py:class:`.TSEvent` as an ``npz`` file using :py:meth:`np.savez`
:param str path: Path to save file
:param bool verbose: Print path information after successfully saving.
:param Union[None, str, type, np.dtype] dtype_times: Data type in which `times` are to be stored, for example to save space.
:param Union[None, str, type, np.dtype] dtype_channels: Data type in which `channels` are to be stored, for example to save space.
"""
# - Collect attributes in dict
attributes = self.to_dict(
dtype_times=dtype_times, dtype_channels=dtype_channels
)
# - Write the file
np.savez(path, **attributes)
if verbose:
missing_ending = path.split(".")[-1] != "npz" # np.savez will add ending
print(
"TSEvent `{}` has been stored in `{}`.".format(
self.name, path + missing_ending * ".npz"
)
)
## -- Methods for combining time series
[docs] def append_c(self, other_series: "TSEvent", inplace: bool = False) -> "TSEvent":
"""
Append another time series to ``self`` along the channels axis
The channel IDs in ``other_series`` are shifted by ``self.num_channels``. Event times remain the same.
:param TSEvent other_series: :py:class:`TSEvent` or list of :py:class:`TSEvent` that will be appended to ``self``.
:param bool inplace: Conduct operation in-place (Default: ``False``; create a copy)
:return TSEvent: :py:class:`TSEvent` containing data in ``self``, with other TS appended along the channels axis
"""
# - Create a new time series, or modify this time series
if not inplace:
appended_series = self.copy()
else:
appended_series = self
# - Ensure we have a list of timeseries to work on
if isinstance(other_series, TSEvent):
series_list = [appended_series, other_series]
else:
try:
series_list = [appended_series] + list(other_series)
except TypeError:
raise TypeError(
f"TSEvent `{self.name}`: `other_series` must be `TSEvent` or list thereof."
)
# - Check series class
if not all(isinstance(series, TSEvent) for series in series_list):
raise TypeError(
f"TSEvent `{self.name}`: Can only append `TSEvent` objects."
)
# - Determine t_start and t_stop
t_start_new = min(series.t_start for series in series_list)
t_stop_new = max(series.t_stop for series in series_list)
# - Determine number of channels for each series
nums_channels = [series.num_channels for series in series_list]
# - Shift for each TSEvent's channels
channel_shifts = np.cumsum([0] + nums_channels[:-1])
# - Stop if no non-empty series is left
if not series_list:
return appended_series
# - Merge all samples
times_new = np.concatenate([series.times for series in series_list])
channels_new = np.concatenate(
[
series.channels + shift
for series, shift in zip(series_list, channel_shifts)
]
)
# - Sort on time and merge
sort_indices = np.argsort(times_new)
appended_series._times = times_new[sort_indices]
appended_series._channels = channels_new[sort_indices].astype(int)
appended_series._t_start = t_start_new
appended_series._t_stop = t_stop_new
appended_series._num_channels = int(np.sum(nums_channels))
return appended_series
[docs] def append_t(
self,
other_series: Union[TimeSeries, Iterable[TimeSeries]],
offset: Union[float, Iterable[Union[float, None]], None] = None,
remove_duplicates: bool = False,
inplace: bool = False,
) -> "TSEvent":
"""
Append another time series to this one along the time axis
``t_start`` from ``other_series`` is shifted to ``self.t_stop + offset``.
:param TSEvent other_series: :py:class:`TSEvent` or list of :py:class:`TSEvent` that will be appended to ``self`` along the time axis
:param Optional[float] offset: Scalar or iterable with at least the same number of elements as ``other_series``. If scalar, use same value for all timeseries. Event times from ``other_series`` will be shifted by ``self.t_stop + offset``. Default: 0
:param bool remove_duplicates: If ``True``, duplicate events will be removed from the resulting timeseries. Duplicates can occur if ``offset`` is negative. Default: ``False``, do not remove duplicate events.
:param bool inplace: If ``True``, conduct operation in-place (Default: ``False``; return a copy)
:return TSEvent: :py:class:`TSEvent` containing events from ``self``, with other TS appended in time
"""
# - Ensure we have a list of timeseries to work on
if isinstance(other_series, TSEvent):
other_series = [other_series]
else:
try:
other_series = list(other_series)
except TypeError:
raise TypeError(
f"TSEvent `{self.name}`: `other_series` must be `TSEvent` or list thereof."
)
# - Same for offsets
if not isinstance(offset, collections.abc.Iterable):
# - Handle `None` offsets
if offset is None:
offset = 0
offset_list = [offset] * len(other_series)
else:
offset_list = list(offset)
if len(offset_list) != len(other_series):
warn(
f"TSEvent `{self.name}`: Numbers of provided offsets and TSEvent "
+ "objects do not match. Will ignore excess elements."
)
# - Handle `None` offsets
for i, os in enumerate(offset_list):
if os is None:
offset_list[i] = 0
# - Translate offsets so that they correspond to indiviual delays for each series
# Delay for first appended series:
delay1 = offset_list[0] + self.t_stop - other_series[0].t_start
delay_list = [delay1]
# Add delays for other lists
for prev_series, curr_series, offset in zip(
other_series[:-1], other_series[1:], offset_list[1:]
):
# Time at which previous series stops
stop_previous = delay_list[-1] + prev_series.t_stop
# Delay for current series
delay_list.append(stop_previous + offset - curr_series.t_start)
# - Let self.merge do the rest
try:
return self.merge(
other_series=other_series,
delay=delay_list,
remove_duplicates=remove_duplicates,
inplace=inplace,
)
except TypeError:
# - Provide matching exception
raise TypeError(
f"TSEvent `{self.name}`: Can only append `TSEvent` objects."
)
[docs] def merge(
self,
other_series: Union[TimeSeries, Iterable[TimeSeries]],
delay: Union[float, Iterable[float]] = 0,
remove_duplicates: bool = False,
inplace: bool = False,
) -> "TSEvent":
"""
Merge another :py:class:`TSEvent` into this one so that they may overlap in time
:param TSEvent other_series: :py:class:`TSEvent` or list of :py:class:`TSEvent` to merge into ``self``
:param Union[float, Iterable[float]] delay: Scalar or iterable with at least the number of elements as other_series. If scalar, use same value for all timeseries. Delay ``other_series`` series by this value before merging.
:param bool remove_duplicates: If ``True``, remove duplicate events in resulting timeseries. Default: ``False``, do not remove duplicates.
:param bool inplace: If ``True``, operation will be performed in place (Default: ``False``, return a copy)
:return TSEvent: ``self`` with new samples included
"""
# - Create a new time series, or modify this time series
if not inplace:
merged_series = self.copy()
else:
merged_series = self
# - Ensure we have a list of timeseries to work on
if isinstance(other_series, TSEvent):
series_list = [merged_series, other_series]
else:
try:
series_list = [merged_series] + list(other_series)
except TypeError:
raise TypeError(
f"TSEvent `{self.name}`: `other_series` must be `TSEvent` or list thereof."
)
# - Same for offsets
if not isinstance(delay, collections.abc.Iterable):
delay_list = [0] + [delay] * (len(series_list) - 1)
else:
delay_list = [0] + list(delay)
if len(delay_list) != len(series_list):
warn(
f"TSEvent `{self.name}`: Numbers of provided offsets and TSEvent "
+ "objects do not match. Will ignore excess elements."
)
# - Check series class
if not all(isinstance(series, TSEvent) for series in series_list):
raise TypeError(
f"TSEvent `{self.name}`: Can only merge with `TSEvent` objects."
)
# - Apply delay
series_list = [
series.delay(delay) for series, delay in zip(series_list, delay_list)
]
# - Determine number of channels
merged_series._num_channels = max(series.num_channels for series in series_list)
# - Determine t_start and t_stop
t_start_new = min(series.t_start for series in series_list)
t_stop_new = max(series.t_stop for series in series_list)
# - Merge all samples
times_new = np.concatenate([series.times for series in series_list])
channels_new = np.concatenate([series.channels for series in series_list])
# - Remove events with same times and channels
if remove_duplicates:
times_new, channels_new = np.unique((times_new, channels_new), axis=1)
# - Sort on time and merge
sort_indices = np.argsort(times_new)
merged_series._times = times_new[sort_indices]
merged_series._channels = channels_new[sort_indices].astype(int)
merged_series._t_start = t_start_new
merged_series._t_stop = t_stop_new
return merged_series
## -- Internal methods
[docs] def _matching_channels(
self,
channels: Union[int, ArrayLike, None] = None,
event_channels: Union[int, ArrayLike, None] = None,
) -> np.ndarray:
"""
Return boolean array of which events match a given channel selection
:param ArrayLike[int] channels: Channels of which events are to be indicated ``True``. Default: ``None``, use all channels
:params ArrayLike[int] event_channels: Channel IDs for each event. If not provided (Default: ``None``), then use self._channels
:return ArrayLike[bool]: A matrix ``TxC`` indicating which events match the requested channels
"""
if channels is None:
channels = np.arange(self.num_channels)
else:
# - Check `channels` for validity
if np.asarray(channels).size > 0 and not (
np.min(channels) >= 0 and np.max(channels) < self.num_channels
):
raise IndexError(
f"TSEvent `{self.name}`: `channels` must be between 0 and {self.num_channels}."
)
# - Make sure elements in `channels` are unique for better performance
channels = np.unique(channels)
# - Use a defined list of event channels, if provided
if event_channels is None:
event_channels = self._channels
# - Boolean array of which events match selected channels
include_events = np.isin(event_channels, channels)
return include_events
## -- Magic methods
def __call__(
self,
t_start: Optional[float] = None,
t_stop: Optional[float] = None,
channels: Optional[Union[int, ArrayLike]] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""
ts(...) - Return events in interval between indicated times, ignoring
events at `t_stop`.
:param Optional[float] t_start: Time from which on events are returned
:param Optional[float] t_stop: Time until which events are returned
:param Optional[Union[int, ArrayLike]] channels: Channels of which events are returned
:return:
np.ndarray Times of events
np.ndarray Channels of events
"""
# - Get default start and end values from time series data
if t_start is None:
t_start: float = self.t_start
if t_stop is None:
t_stop: float = self.t_stop
# - Permit unsorted bounds
if t_stop < t_start:
t_start, t_stop = t_stop, t_start
# - Handle a periodic time series
if self.periodic:
# - Repeat events sufficiently often
all_times = _extend_periodic_times(t_start, t_stop, self)
num_reps = int(np.round(all_times.size / self.channels.size))
all_channels = np.tile(self.channels, num_reps)
else:
all_times = self.times
all_channels = self.channels
# - Events with matching channels
channel_matches = self._matching_channels(channels, all_channels)
# - Ignore events from stop time onwards
choose_events_stop: np.ndarray = all_times < t_stop
# - Extract matching events and return
choose_events: np.ndarray = (
(all_times >= t_start) & (choose_events_stop) & channel_matches
)
return all_times[choose_events], all_channels[choose_events]
def __getitem__(self, ind: Union[ArrayLike, slice, int]) -> "TSEvent":
"""
ts[tTime1, tTime2, ...] - Index the events of `self` by with the argument
and return TSEvent with corresponding events.
Other attributes, including `num_channels` and `duration`
are the same as in `self`.
:return:
np.array of indexed event times
np.array of indexed event channels
"""
indexed_times: np.ndarray = np.atleast_1d(self.times[ind])
indexed_channels: np.ndarray = np.atleast_1d(self.channels[ind])
# - New TSEvent with the selected events
new_series = self.copy()
new_series._times = indexed_times
new_series._channels = indexed_channels
return new_series
def __repr__(self):
"""
__repr__() - Return a string representation of this object
:return: str String description
"""
if self.isempty():
return "Empty `TSEvent` object `{}` from t={} to t={}.".format(
self.name, self.t_start, self.t_stop
)
else:
return "{}periodic `TSEvent` object `{}` from t={} to {}. Channels: {}. Events: {}".format(
int(not self.periodic) * "non-",
self.name,
self.t_start,
self.t_stop,
self.num_channels,
self.times.size,
)
# - Iteration
def __iter__(self):
"""
Yield tuples of event times and channels
:yield Tuple[float, int]: Yields a tuple [times, channels]
"""
for t, ch in zip(self.times, self.channels):
yield (t, ch)
## -- Properties
@property
def channels(self):
"""
(ArrayLike[int]) Event channel indices. A ``Tx1`` vector, where each element ``t`` corresponds to the event time in ``self.times[t]``.
"""
return self._channels
@channels.setter
def channels(self, new_channels: np.ndarray):
new_channels = np.asarray(new_channels)
# - Check size of new data
if np.size(new_channels) != 1 and np.size(new_channels) != np.size(self.times):
raise ValueError(
f"TSEvent `{self.name}`: `new_channels` must be the same size as `times`."
)
# - Handle scalar channel
if np.size(new_channels) == 1:
new_channels = np.repeat(new_channels, np.size(self._times))
# - Update self.num_channels
if new_channels.size > 0:
highest_channel = np.amax(new_channels)
if self.num_channels <= highest_channel:
self.num_channels = highest_channel + 1
# print(
# f"TSEvent `{self.name}`: `num_channels` has been increased "
# + f"to {self.num_channels}."
# )
# - Assign channels
self._channels = new_channels
@property
def num_channels(self):
"""
(int) The maximum number of channels represented by this :py:class:`TSEvent`
"""
return self._num_channels
@num_channels.setter
def num_channels(self, new_num_ch):
if self.channels.size > 0:
min_num_ch = np.amax(self.channels)
else:
min_num_ch = 0
if new_num_ch < min_num_ch:
raise ValueError(
f"TSContinuous `{self.name}`: `num_channels` must be at least {min_num_ch}."
)
else:
self._num_channels = new_num_ch
@property
def times(self):
"""(ArrayLike[float]) Array of sample times"""
return self._times
@times.setter
def times(self, new_times: ArrayLike):
# - Check time trace for correct size
if np.size(new_times) != np.size(self._times):
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "New time trace must have the same number of elements as the original trace."
)
# - Make sure time trace is sorted
if (np.diff(new_times) < 0).any():
raise ValueError(
f"TSContinuous `{self.name}`: "
+ "The time trace must be sorted and not decreasing"
)
# - Store new time trace
self._times = np.atleast_1d(new_times).flatten().astype(float)
if np.size(self._times) > 0:
# - Fix t_start and t_stop
self._t_start = min(self._t_start, new_times[0])
res = np.finfo(self._times.dtype).resolution
self._t_stop = max(self._t_stop, new_times[-1] + res)
@property
def t_stop(self) -> float:
"""(float) Stop time of time series (final sample)"""
return self._t_stop
@t_stop.setter
def t_stop(self, new_stop):
# - Smallest allowed value for new_stop
res = np.finfo(self._times.dtype).resolution
min_stop = self._times[-1] + res if self._times.size > 0 else self._t_start
if new_stop >= min_stop:
self._t_stop = new_stop
elif min_stop - new_stop < _TOLERANCE_ABSOLUTE:
self._t_stop = min_stop
else:
raise ValueError(
"TimeSeries `{}`: t_stop must be greater or equal to {}. It was {}.".format(
self.name, min_stop, new_stop
)
)
### --- Dict-like object to store TimeSeries on disk
class TSDictOnDisk(collections.abc.MutableMapping):
"""
Behaves like a dict. However, if a `TimeSeries` is added, it will be stored in a temporary file to reduce main memory usage.
"""
def __init__(self, data: Union[Dict, "TSDictOnDisk"] = {}):
"""
TSDictOnDisk - dict-like container that stores TimeSeries in temporary files to save memory.
:param Union[Dict, TSDictOnDisk] data: Data with which the object should be instantiatied.
"""
# - Dict to hold non-`TimeSeries` objects, to emulate behavior of a normal dict
self._mapping = {}
# - Dict for handles to temporary files that store `TimeSeries`
self._mapping_ts = {}
# - Add provided data to `self`.
self.update(data)
def insert(self, data: Union[Dict, "TSDictOnDisk"]):
"""
insert - Similar to 'self.update'. The difference is that `update` would store any TimeSeries in `data` in a temporary file, whereas `insert` keeps TimeSeries from `data` in the memory if they are not already in a temporary file (e.g. when including a dict).
:data: Dict or TSDictOnDisk that is to be inserted.
"""
# - Make sure existing keys are overwritten
include_keys = set(data.keys())
remove_keys = include_keys.intersection(self.keys())
for k in remove_keys:
del self[k]
# - Insert `TSDictOnDisk`
if isinstance(data, TSDictOnDisk):
self._mapping.update(data._mapping)
self._mapping_ts.update(data._mapping_ts)
# - Insert dict
else:
self._mapping.update(data)
def __getitem__(self, key: Hashable) -> Any:
"""
dod[key] - Access an object of `self` by its key.
:return:
The object to which the `key` corresponds.
"""
if key in self._mapping_ts:
# - Load `TimeSeries` from temporary file
return load_ts_from_file(self._mapping_ts[key])
else:
# - Return value stored under `key`.
return self._mapping[key]
def __setitem__(self, key: Hashable, value: Any):
"""
dod[key] = value - Add an object to self together with a (hashable) key.
"""
if isinstance(value, TimeSeries):
# - Store `TimeSeries` in a temporary file, whose handle is added as value to `self._mapping_ts`.
self._mapping_ts[key] = TemporaryFile()
value.save(self._mapping_ts[key])
# - Make sure existing keys are overwritten, also in `self._mapping`.
if key in self._mapping:
del self._mapping[key]
else:
# - Store `value` in `self._mapping`.
self._mapping[key] = value
# - Make sure existing keys are overwritten, also in `self._mapping_ts`.
if key in self._mapping_ts:
del self._mapping_ts[key]
def __delitem__(self, key: Hashable):
"""del dod[key] - Delete an object from self by its key."""
# - Delete the object corresponding to `key` from the correct dict.
if key in self._mapping_ts:
del self._mapping_ts[key]
else:
del self._mapping[key]
def __len__(self):
"""len(dod) - Return the total number of objects stored in `self`."""
return len(self._mapping) + len(self._mapping_ts)
def __iter__(self):
"""
for x in dod:... - First iterate over non-`TimeSeries` objects then over
`TimeSeries` that are stored in temporary files.
:yield Any: The objects stored in `self`.
"""
for obj in iter(self._mapping):
yield obj
for obj in iter(self._mapping_ts):
yield obj
def __repr__(self):
"""
Return a string representation of this object
:return str: String description
"""
return (
f"{type(self).__name__}.\nKeys of stored TimeSeries objects:\n"
+ f"{list(self._mapping_ts)}\nOther keys:\n"
+ f"{list(self._mapping)}"
)