from abc import ABC, abstractmethod
from typing import Callable, Sequence, Optional, Union
import numpy as np
from scipy.optimize import newton
from pararealml.constraint import Constraint, apply_constraints_along_last_axis
[docs]class NumericalIntegrator(ABC):
"""
A base class for numerical integrators.
"""
[docs] @abstractmethod
def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
"""
Estimates the value of y(t + d_t).
:param y: the value of y(t)
:param t: the value of t
:param d_t: the amount of increase in t
:param d_y_over_d_t: a function that returns the value of y'(t) given
t and y
:param y_constraint_function: a function that, given t, returns a
sequence of constraints on the values of the solution containing a
constraint for each element of y
:return: the value of y(t + d_t).
"""
[docs]class ForwardEulerMethod(NumericalIntegrator):
"""
The forward Euler method, an explicit first order Runge-Kutta method.
"""
[docs] def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
y_next_constraints = y_constraint_function(t + d_t)
return apply_constraints_along_last_axis(
y_next_constraints,
y + d_t * d_y_over_d_t(t, y))
[docs]class ExplicitMidpointMethod(NumericalIntegrator):
"""
The explicit midpoint method, a second order Runge-Kutta method.
"""
[docs] def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
half_d_t = d_t / 2.
y_half_next_constraints = y_constraint_function(t + half_d_t)
y_next_constraints = y_constraint_function(t + d_t)
y_hat = apply_constraints_along_last_axis(
y_half_next_constraints,
y + half_d_t * d_y_over_d_t(t, y))
return apply_constraints_along_last_axis(
y_next_constraints,
y + d_t * d_y_over_d_t(t + half_d_t, y_hat))
[docs]class RK4(NumericalIntegrator):
"""
The RK4 method, an explicit fourth order Runge-Kutta method.
"""
[docs] def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
half_d_t = d_t / 2.
y_half_next_constraints = y_constraint_function(t + half_d_t)
y_next_constraints = y_constraint_function(t + d_t)
k1 = d_t * d_y_over_d_t(t, y)
k2 = d_t * d_y_over_d_t(
t + half_d_t,
apply_constraints_along_last_axis(
y_half_next_constraints,
y + k1 / 2.))
k3 = d_t * d_y_over_d_t(
t + half_d_t,
apply_constraints_along_last_axis(
y_half_next_constraints,
y + k2 / 2.))
k4 = d_t * d_y_over_d_t(
t + d_t,
apply_constraints_along_last_axis(
y_next_constraints,
y + k3))
return apply_constraints_along_last_axis(
y_next_constraints,
y + (k1 + 2. * k2 + 2. * k3 + k4) / 6.)
[docs]class ImplicitMethod(NumericalIntegrator, ABC):
"""
A base class for implicit numerical integrators.
"""
def __init__(self, tol: float = 1.48e-8, max_iterations: int = 50):
"""
:param tol: the tolerance value to use for solving the equation for y
at the next time step through the secant method
:param max_iterations: the maximum allowed number of secant method
iterations
"""
if tol < 0.:
raise ValueError('tolerance must be non-negative')
if max_iterations <= 0:
raise ValueError(
'number of maximum iterations must be greater than 0')
self._tol = tol
self._max_iterations = max_iterations
def _solve(
self,
y_next_residual_function: Callable[[np.ndarray], np.ndarray],
y_next_init: np.ndarray) -> np.ndarray:
"""
Solves the implicit equation for y at the next time step.
:param y_next_residual_function: the difference of the left and the
right-hand sides of the equation as a function of y at the next
time step
:param y_next_init: the initial guess for the value of y at the next
time step
:return: y at the next time step
"""
return newton(
y_next_residual_function,
y_next_init,
tol=self._tol,
maxiter=self._max_iterations)
[docs]class BackwardEulerMethod(ImplicitMethod):
"""
The backward Euler method, an implicit first order Runge-Kutta method.
"""
def __init__(self, tol: float = 1.48e-8, max_iterations: int = 50):
"""
:param tol: the tolerance value to use for solving the equation for y
at the next time step through the secant method
:param max_iterations: the maximum allowed number of secant method
iterations
"""
super(BackwardEulerMethod, self).__init__(tol, max_iterations)
[docs] def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
t_next = t + d_t
y_next_constraints = y_constraint_function(t_next)
y_next_init = apply_constraints_along_last_axis(
y_next_constraints,
y + d_t * d_y_over_d_t(t, y))
def y_next_residual_function(y_next: np.ndarray) -> np.ndarray:
return y_next - apply_constraints_along_last_axis(
y_next_constraints,
y + d_t * d_y_over_d_t(t_next, y_next))
return self._solve(y_next_residual_function, y_next_init)
[docs]class CrankNicolsonMethod(ImplicitMethod):
"""
A first order implicit-explicit method combining the forward and backward
Euler methods.
"""
def __init__(
self,
a: float = .5,
tol: float = 1.48e-8,
max_iterations: int = 50):
"""
:param a: the weight of the backward Euler term of the update; the
forward Euler term's weight is 1 - a
:param tol: the tolerance value to use for solving the equation for y
at the next time step through the secant method
:param max_iterations: the maximum allowed number of secant method
iterations
"""
if not (0. <= a <= 1.):
raise ValueError('the value of \'a\' must be between 0 and 1')
self._a = a
self._b = 1. - a
super(CrankNicolsonMethod, self).__init__(tol, max_iterations)
[docs] def integral(
self,
y: np.ndarray,
t: float,
d_t: float,
d_y_over_d_t: Callable[[float, np.ndarray], np.ndarray],
y_constraint_function: Callable[
[Optional[float]],
Optional[Union[Sequence[Constraint], np.ndarray]]
]) -> np.ndarray:
t_next = t + d_t
forward_update = d_t * d_y_over_d_t(t, y)
y_next_constraints = y_constraint_function(t_next)
y_next_init = apply_constraints_along_last_axis(
y_next_constraints, y + forward_update)
def y_next_residual_function(y_next: np.ndarray) -> np.ndarray:
return y_next - apply_constraints_along_last_axis(
y_next_constraints,
y +
self._a * d_t * d_y_over_d_t(t_next, y_next) +
self._b * forward_update)
return self._solve(y_next_residual_function, y_next_init)