DaDRA

Source code for dadra.dyn_sys

import numpy as np

from collections.abc import Callable
from functools import partial
from inspect import signature, Parameter
from numpy.random import default_rng

from dadra.disturbance import Disturbance
from dadra.sampling import make_sample_n
from scipy.integrate import odeint


class System:
    """Parent class that serves as an interface for dynamic systems objects

    :param dyn_func: The function defining the dynamics of the system
    :type dyn_func: function
    :param state_dim: The degrees of freedom of the system
    :type state_dim: int
    :param intervals: The intervals corresponding to the possible values of the
        initial states of the variables in the system
    :type intervals: list
    """

    def __init__(self, dyn_func, state_dim, intervals):
        """Constructor method; stores the arguments without validating them"""
        self.dyn_func = dyn_func
        self.state_dim = state_dim
        self.intervals = intervals

    def check_intervals(self):
        """Checks whether the list of intervals in this instance is valid

        Python ``int``/``float`` values and numpy integer/floating scalars are
        accepted as bounds; booleans are rejected.

        :raises ValueError: If there is not an interval defined for each variable
        :raises ValueError: If not all intervals include an upper and a lower bound
        :raises ValueError: If not all intervals consist of two numbers
        """
        if len(self.intervals) < self.state_dim:
            raise ValueError("There must be an interval defined for each variable")

        numeric_types = (int, float, np.integer, np.floating)
        for interval in self.intervals:
            if len(interval) != 2:
                raise ValueError(
                    "Each interval must include an upper and a lower bound"
                )
            for v in interval:
                # bool is a subclass of int, so it has to be excluded explicitly
                if isinstance(v, bool) or not isinstance(v, numeric_types):
                    raise ValueError("Each interval must consist of two numbers")

    def check_dyn_func(self):
        """Checks whether the dynamic function in this instance is valid

        A parameter counts as required when it has no default value and is
        neither ``*args`` nor ``**kwargs``. At most two required parameters are
        allowed, plus one for every interval beyond ``state_dim``.

        :raises TypeError: If the provided dynamic function is not callable
        :raises ValueError: If there is an invalid number of required arguments
            in the dynamic function
        """
        if not callable(self.dyn_func):
            raise TypeError("A callable dynamic function must be provided")

        variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
        # Identity comparison: ``!=`` against an array-valued default would be
        # evaluated elementwise and fail when used as a condition.
        num_required = sum(
            1
            for p in signature(self.dyn_func).parameters.values()
            if p.default is Parameter.empty and p.kind not in variadic_kinds
        )

        num_extra_intervs = len(self.intervals) - self.state_dim
        if num_required > (2 + num_extra_intervs):
            raise ValueError("Invalid number of required arguments in dynamic function")

    def system_sampler(self, x=None):
        """Obtains sample from specified system

        This base implementation does nothing and returns ``None``.

        :param x: Placeholder variable over which to evaluate, defaults to None
        :type x: NoneType, optional
        :return: The sample from the specified system at the last timestep
        :rtype: numpy.ndarray
        """
        pass

    def sample_system(self, N):
        """Draws ``N`` samples from the specified system

        This base implementation does nothing and returns ``None``.

        :param N: The number of samples to be drawn
        :type N: int
        :return: Array of ``N`` samples
        :rtype: numpy.ndarray
        """
        pass
class SimpleSystem(System):
    """Class implementation of a dynamical system that allows for parallelized sampling.

    :param dyn_func: The function defining the dynamics of the system
    :type dyn_func: function
    :param intervals: The intervals corresponding to the possible values of the
        initial states of the variables in the system
    :type intervals: list
    :param state_dim: The degrees of freedom of the system
    :type state_dim: int
    :param timesteps: The number of timesteps over which to compute the sample,
        defaults to 100
    :type timesteps: int, optional
    :param parts: The number of parts to partition the time interval into for
        computing the sample, defaults to 1001
    :type parts: int, optional
    :param all_time: If True, each sample will include all timesteps from the
        system, rather than only the last timestep, defaults to False
    :type all_time: bool, optional
    """

    def __init__(
        self, dyn_func, intervals, state_dim, timesteps=100, parts=1001, all_time=False
    ):
        """Constructor method"""
        self.dyn_func = dyn_func
        self.intervals = intervals
        self.state_dim = state_dim
        self.timesteps = timesteps
        self.parts = parts
        self.all_time = all_time
        # Validate eagerly so a bad configuration fails at construction time
        self.check_intervals()
        self.check_dyn_func()

    @classmethod
    def from_list(cls, func_list, intervals, timesteps=100, parts=1001, all_time=False):
        """Class method that allows for an instance of this class to be initialized
        using a list of functions, one for each variable, to define the dynamics of
        a system rather than a single function

        :param func_list: The list of functions, one for each variable, that define
            the dynamics of the system
        :type func_list: list
        :param intervals: The intervals corresponding to the possible values of the
            initial states of the variables in the system
        :type intervals: list
        :param timesteps: The number of timesteps over which to compute the sample,
            defaults to 100
        :type timesteps: int, optional
        :param parts: The number of parts to partition the time interval into for
            computing the sample, defaults to 1001
        :type parts: int, optional
        :param all_time: If True, each sample will include all timesteps from the
            system, rather than only the last timestep, defaults to False
        :type all_time: bool, optional
        :raises TypeError: If not all functions in ``func_list`` are callable
        :raises ValueError: If not all functions in ``func_list`` include parameters
            for the initial state and time
        :raises ValueError: If not all functions in ``func_list`` include parameters
            for the initial state, time, and the provided additional arguments
        :return: An instance of the class this method is called on
        :rtype: :class:`SimpleSystem`
        """
        state_dim = len(func_list)
        num_extra_intervs = len(intervals) - state_dim

        for func in func_list:
            if not callable(func):
                raise TypeError(
                    "There must be a callable function defined for each variable"
                )
            num_params = len(signature(func).parameters)
            if num_extra_intervs == 0 and num_params != 2:
                raise ValueError(
                    "Each dynamic function must be a function of the initial state and time"
                )
            elif num_extra_intervs > 0 and num_params != (2 + num_extra_intervs):
                raise ValueError(
                    "Each dynamic function must be a function of the initial state, time, and the provided additional arguments"
                )

        # ``func_list`` is bound by keyword onto an adapter that takes it
        # keyword-only. Binding it onto ``dyn_to_sys`` directly would collide
        # with the extra positional arguments the integrator forwards, because
        # ``func_list`` is the third positional parameter there.
        dyn_func = partial(cls._dyn_from_func_list, func_list=func_list)
        return cls(dyn_func, intervals, state_dim, timesteps, parts, all_time)

    @staticmethod
    def _dyn_from_func_list(arr, t, *args, func_list):
        """Adapter around :meth:`dyn_to_sys` with ``func_list`` keyword-only."""
        return SimpleSystem.dyn_to_sys(arr, t, func_list, *args)

    @staticmethod
    def dyn_to_sys(arr, t, func_list, *args):
        """Static method which allows for defining the dynamics of a system from an
        initial state, the time, and a list of functions for each dimension

        :param arr: Array input, corresponding to the initial state
        :type arr: numpy.ndarray
        :param t: The time
        :type t: float
        :param func_list: A list of functions, one for each dimension
        :type func_list: list
        :param args: Additional positional arguments forwarded to every function
        :return: The functional dynamics of the system
        :rtype: list
        """
        # Unpacking an empty ``args`` is the same as calling ``f(arr, t)``
        return [f(arr, t, *args) for f in func_list]

    def system_sampler(self, x=None):
        """Obtains sample from specified system

        One value is drawn uniformly from every interval. The first ``state_dim``
        draws form the initial state and the remaining draws are passed to the
        dynamic function as additional arguments.

        :param x: Placeholder variable over which to evaluate, defaults to None
        :type x: NoneType, optional
        :return: The sample from the specified system at the last timestep if
            ``all_time`` is False and including all timesteps otherwise
        :rtype: numpy.ndarray
        """
        t = np.linspace(0, self.timesteps, self.parts)
        ru = default_rng().uniform
        draws = [ru(lower, upper) for lower, upper in self.intervals]
        initial_state = np.array(draws[: self.state_dim])
        extra_args = tuple(draws[self.state_dim :])

        # An empty tuple is odeint's own default for ``args``
        sol = odeint(self.dyn_func, initial_state, t, args=extra_args)
        return sol if self.all_time else sol[-1]

    def sample_system(self, N):
        """Draws ``N`` samples from the specified system

        :param N: The number of samples to be drawn
        :type N: int
        :return: Array of ``N`` samples
        :rtype: numpy.ndarray
        """
        return make_sample_n(self.system_sampler)(N)
class DisturbedSystem(System):
    """Class implementation of a dynamical system with disturbance that allows for
    parallelized sampling

    :param dyn_func: The function defining the dynamics of the system; it is called
        with the state, the time, and the disturbance object
    :type dyn_func: function
    :param intervals: The intervals corresponding to the possible values of the
        initial states of the variables in the system
    :type intervals: list
    :param state_dim: The degrees of freedom of the system
    :type state_dim: int
    :param disturbance: The disturbance added to the system
    :type disturbance: :class:`dadra.Disturbance`
    :param timesteps: The number of timesteps over which to compute the sample,
        defaults to 100
    :type timesteps: int, optional
    :param parts: The number of parts to partition the time interval into for
        computing the sample, defaults to 1001
    :type parts: int, optional
    :param all_time: If True, each sample will include all timesteps from the
        system, rather than only the last timestep, defaults to False
    :type all_time: bool, optional
    """

    def __init__(
        self,
        dyn_func,
        intervals,
        state_dim,
        disturbance: Disturbance,
        timesteps=100,
        parts=1001,
        all_time=False,
    ):
        """Constructor method"""
        self.dyn_func = dyn_func
        self.intervals = intervals
        self.state_dim = state_dim
        self.disturbance = disturbance
        self.timesteps = timesteps
        self.parts = parts
        self.all_time = all_time
        # Validate eagerly so a bad configuration fails at construction time
        self.check_intervals()
        self.check_dyn_func()

    def check_dyn_func(self):
        """Checks whether the dynamic function in this instance is valid

        The sampler always passes the disturbance as a third positional argument,
        so one more required parameter is allowed here than in the parent check:
        state, time, disturbance, plus one for every interval beyond ``state_dim``.

        :raises TypeError: If the provided dynamic function is not callable
        :raises ValueError: If there is an invalid number of required arguments
            in the dynamic function
        """
        if not callable(self.dyn_func):
            raise TypeError("A callable dynamic function must be provided")

        variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
        num_required = sum(
            1
            for p in signature(self.dyn_func).parameters.values()
            if p.default is Parameter.empty and p.kind not in variadic_kinds
        )

        num_extra_intervs = len(self.intervals) - self.state_dim
        if num_required > (3 + num_extra_intervs):
            raise ValueError("Invalid number of required arguments in dynamic function")

    def system_sampler(self, x=None):
        """Obtains sample from specified system with disturbance

        :param x: Placeholder variable over which to evaluate, defaults to None
        :type x: NoneType, optional
        :return: The sample from the specified system at the last timestep if
            ``all_time`` is False and including all timesteps otherwise
        :rtype: numpy.ndarray
        """
        t = np.linspace(0, self.timesteps, self.parts)
        ru = default_rng().uniform
        # NOTE(review): every interval contributes to the initial state here, not
        # only the first ``state_dim`` as in SimpleSystem - confirm this is intended.
        initial_state = np.array([ru(lower, upper) for lower, upper in self.intervals])

        self.disturbance.draw_alphas()
        sol = odeint(self.dyn_func, initial_state, t, args=(self.disturbance,))
        return sol if self.all_time else sol[-1]

    def sample_system(self, N):
        """Draws ``N`` samples from the specified system

        :param N: The number of samples to be drawn
        :type N: int
        :return: Array of ``N`` samples
        :rtype: numpy.ndarray
        """
        return make_sample_n(self.system_sampler)(N)
class Sampler(System):
    """Class implementation of a dynamical system with a means of sampling the
    system specified by the user

    :param sample_fn: A function to sample from
    :type sample_fn: function
    :param state_dim: The degrees of freedom of the system
    :type state_dim: int
    :param timesteps: The number of timesteps over which to compute the sample,
        defaults to 100
    :type timesteps: int, optional
    :param parts: The number of parts to partition the time interval into for
        computing the sample, defaults to 1001
    :type parts: int, optional
    :param all_time: If True, each sample will include all timesteps from the
        system, rather than only the last timestep, defaults to False
    :type all_time: bool, optional
    """

    def __init__(self, sample_fn, state_dim, timesteps=100, parts=1001, all_time=False):
        """Constructor method; stores the arguments without validating them"""
        self.sample_fn = sample_fn
        self.state_dim = state_dim
        self.timesteps = timesteps
        self.parts = parts
        self.all_time = all_time

    def sample_system(self, N):
        """Draws ``N`` samples from the specified system

        :param N: The number of samples to be drawn
        :type N: int
        :return: Array of ``N`` samples
        :rtype: numpy.ndarray
        """
        return make_sample_n(self.sample_fn)(N)