"""
Simple (explicit) Runge-Kutta integrators to forward integrate dynamic forward models
"""
from __future__ import print_function
from builtins import zip
from builtins import range
from builtins import object
from abc import ABCMeta, abstractmethod
import torch
from . import utils
import numpy as np
from future.utils import with_metaclass
[docs]class RKIntegrator(with_metaclass(ABCMeta, object)):
"""
Abstract base class for Runge-Kutta integration: x' = f(x(t),u(t),t)
"""
def __init__(self,f,u,pars,params):
"""
Constructor
:param f: function to be integrated
:param u: input to the function
:param pars: parameters to be passed to the integrator
:param params: general ParameterDict() parameters for setting
"""
self.nrOfTimeSteps_perUnitTimeInterval = params[('number_of_time_steps', 10,
'Number of time-steps to per unit time-interval integrate the PDE')]
"""number of time steps for the integration"""
self.f = f
"""Function to be integrated"""
if pars is None:
self.pars = []
else:
self.pars = pars
"""parameters for the integrator"""
if u is None:
self.u = lambda t,pars,vo: []
else:
self.u = u
"""input for integration"""
[docs] def set_pars(self,pars):
self.pars = pars
[docs] def set_number_of_time_steps(self, nr):
"""
Sets the number of time-steps per unit time-interval the integration
:param nr: number of timesteps per unit time-interval
"""
self.nrOfTimeSteps_perUnitTimeInterval = nr
[docs] def get_number_of_time_steps(self):
"""
Returns the number of time steps per unit time-interval that are used for the integration
:return: number of time steps per unit time-interval
"""
return self.nrOfTimeSteps_perUnitTimeInterval
[docs] def get_dt(self):
"""
Returns the time-step
:return: timestep dt
"""
dt = 1.0/self.nrOfTimeSteps_perUnitTimeInterval
return dt
[docs] def solve(self,x,fromT,toT,variables_from_optimizer=None):
"""
Solves the differential equation.
:param x: initial condition for state of the equation
:param fromT: time to start integration from
:param toT: time to end integration
:param variables_from_optimizer: allows passing variables from the optimizer (for example an iteration count)
:return: Returns state, x, at time toT
"""
# arguments need to be list so we can pass multiple variables at the same time
assert type(x)==list
dT = toT-fromT
nr_of_timepoints = int(round(self.nrOfTimeSteps_perUnitTimeInterval*dT))
timepoints = np.linspace(fromT, toT, nr_of_timepoints + 1)
dt = timepoints[1]-timepoints[0]
currentT = fromT
#iter = 0
for i in range(0, nr_of_timepoints):
#print('RKIter = ' + str( iter ) )
#iter+=1
x = self.solve_one_step(x, currentT, dt, variables_from_optimizer)
currentT += dt
#print( x )
return x
def _xpyts(self, x, y, v):
# x plus y times scalar
return [a+b*v for a,b in zip(x,y)]
def _xts(self, x, v):
# x times scalar
return [a*v for a in x]
def _xpy(self, x, y):
return [a+b for a,b in zip(x,y)]
[docs] @abstractmethod
def solve_one_step(self, x, t, dt, variables_from_optimizer=None):
"""
Abstract method to be implemented by the different Runge-Kutta methods to advance one step.
Both x and the output of f are expected to be lists
:param x: initial state
:param t: initial time
:param dt: time increment
:param variables_from_optimizer: allows passing variables from the optimizer (for example an iteration count)
:return: returns the state at t+dt
"""
pass
[docs]class EulerForward(RKIntegrator):
"""
Euler-forward integration
"""
[docs] def solve_one_step(self, x, t, dt, vo=None):
"""
One step for Euler-forward
:param x: state at time t
:param t: initial time
:param dt: time increment
:param vo: variables from optimizer
:return: state at x+dt
"""
#xp1 = [a+b*dt for a,b in zip(x,self.f(t,x,self.u(t)))]
xp1 = self._xpyts(x, self.f(t, x, self.u(t, self.pars, vo), self.pars, vo), dt)
return xp1
[docs]class RK4(RKIntegrator):
"""
Runge-Kutta 4 integration
"""
[docs] def debugging(self,input,t,k):
x = utils.checkNan(input)
if np.sum(x):
print("find nan at {} step".format(t))
print("flag m: {}, location k{}".format(x[0],k))
print("flag phi: {}, location k{}".format(x[1],k))
raise ValueError("nan error")
[docs] def solve_one_step(self, x, t, dt, vo=None):
"""
One step for Runge-Kutta 4
:param x: state at time t
:param t: initial time
:param dt: time increment
:param vo: variables from optimizer
:return: state at x+dt
"""
k1 = self._xts(self.f(t, x, self.u(t, self.pars, vo), self.pars, vo), dt)
#self.debugging(k1,t,1)
k2 = self._xts(self.f(t + 0.5 * dt, self._xpyts(x, k1, 0.5), self.u(t + 0.5 * dt, self.pars, vo), self.pars, vo), dt)
#self.debugging(k2, t, 2)
k3 = self._xts(self.f(t + 0.5 * dt, self._xpyts(x, k2, 0.5), self.u(t + 0.5 * dt, self.pars, vo), self.pars, vo), dt)
#self.debugging(k3, t, 3)
k4 = self._xts(self.f(t + dt, self._xpy(x, k3), self.u(t + dt, self.pars, vo), self.pars, vo), dt)
#self.debugging(k4, t, 4)
# now iterate over all the elements of the list describing state x
xp1 = []
for i in range(len(x)):
xp1.append( x[i] + k1[i]/6. + k2[i]/3. + k3[i]/3. + k4[i]/6. )
return xp1