Source code for pararealml.operators.fdm.fdm_operator

from typing import Optional, Tuple, Callable, Dict

import numpy as np

from pararealml.constrained_problem import ConstrainedProblem
from pararealml.constraint import apply_constraints_along_last_axis
from pararealml.differential_equation import Lhs
from pararealml.initial_value_problem import InitialValueProblem
from pararealml.operator import Operator, discretize_time_domain
from pararealml.operators.fdm.numerical_differentiator import \
    NumericalDifferentiator
from pararealml.operators.fdm.fdm_symbol_mapper import FDMSymbolMapper, \
    FDMSymbolMapArg
from pararealml.operators.fdm.numerical_integrator import \
    NumericalIntegrator
from pararealml.solution import Solution

BoundaryConstraintsCache = Dict[
    Optional[float],
    Tuple[Optional[np.ndarray], Optional[np.ndarray]]
]

YConstraintsCache = Dict[Optional[float], Optional[np.ndarray]]


[docs]class FDMOperator(Operator): """ A finite difference method based conventional differential equation solver. """ def __init__( self, integrator: NumericalIntegrator, differentiator: NumericalDifferentiator, d_t: float): """ :param integrator: the differential equation integrator to use :param differentiator: the differentiator to use :param d_t: the temporal step size to use """ super(FDMOperator, self).__init__(d_t, True) self._integrator = integrator self._differentiator = differentiator
[docs] def solve( self, ivp: InitialValueProblem, parallel_enabled: bool = True) -> Solution: cp = ivp.constrained_problem t = discretize_time_domain(ivp.t_interval, self._d_t) y = np.empty((len(t) - 1,) + cp.y_vertices_shape) y_i = ivp.initial_condition.discrete_y_0(True) if not cp.are_all_boundary_conditions_static: init_boundary_constraints = cp.create_boundary_constraints( True, t[0]) init_y_constraints = cp.create_y_vertex_constraints( init_boundary_constraints[0]) apply_constraints_along_last_axis(init_y_constraints, y_i) y_constraints_cache: YConstraintsCache = {} boundary_constraints_cache: BoundaryConstraintsCache = {} y_next = self._create_y_next_function( ivp, y_constraints_cache, boundary_constraints_cache) for i, t_i in enumerate(t[:-1]): y[i] = y_i = y_next(t_i, y_i) if not cp.are_all_boundary_conditions_static: y_constraints_cache.clear() boundary_constraints_cache.clear() return Solution(ivp, t[1:], y, vertex_oriented=True, d_t=self._d_t)
def _create_y_next_function( self, ivp: InitialValueProblem, y_constraints_cache: YConstraintsCache, boundary_constraints_cache: BoundaryConstraintsCache ) -> Callable[[float, np.ndarray], np.ndarray]: """ Creates a function that returns the value of y(t + d_t) given t and y. :param ivp: the initial value problem :param boundary_constraints_cache: a cache for boundary constraints for different t values :param y_constraints_cache: a cache for overall y constraints for different t values :return: the function defining the value of y at the next time point """ cp = ivp.constrained_problem eq_sys = cp.differential_equation.symbolic_equation_system symbol_mapper = FDMSymbolMapper(cp, self._differentiator) d_y_over_d_t_eq_indices = \ eq_sys.equation_indices_by_type(Lhs.D_Y_OVER_D_T) y_eq_indices = eq_sys.equation_indices_by_type(Lhs.Y) y_laplacian_eq_indices = \ eq_sys.equation_indices_by_type(Lhs.Y_LAPLACIAN) y_constraint_func, d_y_constraint_func = \ self._create_constraint_functions( cp, y_constraints_cache, boundary_constraints_cache) def d_y_over_d_t_function(t: float, y: np.ndarray) -> np.ndarray: d_y_over_d_t = np.zeros(y.shape) d_y_over_d_t_rhs = symbol_mapper.map_concatenated( FDMSymbolMapArg(t, y, d_y_constraint_func), Lhs.D_Y_OVER_D_T) d_y_over_d_t[..., d_y_over_d_t_eq_indices] = d_y_over_d_t_rhs return d_y_over_d_t def y_next_function(t: float, y: np.ndarray) -> np.ndarray: y_next = self._integrator.integral( y, t, self._d_t, d_y_over_d_t_function, y_constraint_func) if len(y_eq_indices): y_constraint = y_constraint_func(t + self._d_t) y_constraint = None if y_constraint is None \ else y_constraint[y_eq_indices] y_rhs = symbol_mapper.map_concatenated( FDMSymbolMapArg(t, y, d_y_constraint_func), Lhs.Y) y_next[..., y_eq_indices] = \ apply_constraints_along_last_axis(y_constraint, y_rhs) if len(y_laplacian_eq_indices): y_constraint = y_constraint_func(t + self._d_t) y_constraint = None if y_constraint is None \ else y_constraint[y_laplacian_eq_indices] d_y_constraint = d_y_constraint_func(t + self._d_t) d_y_constraint = None if d_y_constraint is None \ else d_y_constraint[:, y_laplacian_eq_indices] y_laplacian_rhs = symbol_mapper.map_concatenated( FDMSymbolMapArg(t, y, d_y_constraint_func), Lhs.Y_LAPLACIAN) y_next[..., y_laplacian_eq_indices] = \ self._differentiator.anti_laplacian( y_laplacian_rhs, cp.mesh, y_constraint, d_y_constraint) return y_next return y_next_function @staticmethod def _create_constraint_functions( cp: ConstrainedProblem, y_constraints_cache: YConstraintsCache, boundary_constraints_cache: BoundaryConstraintsCache ) -> Tuple[ Callable[[float], Optional[np.ndarray]], Callable[[float], Optional[np.ndarray]] ]: """ Creates two functions that return the constraints on y and the boundary constraints on the spatial derivatives of y with respect to the normals of the boundaries respectively. :param cp: the constrained problems to create the constraint functions for :param boundary_constraints_cache: a cache for boundary constraints for different t values :param y_constraints_cache: a cache for overall y constraints for different t values :return: a tuple of two functions that return the two different constraints given t """ if not cp.differential_equation.x_dimension: return lambda _: None, lambda _: None if cp.are_all_boundary_conditions_static: return lambda _: cp.static_y_vertex_constraints, \ lambda _: cp.static_boundary_vertex_constraints[1] def d_y_constraints_function( t: Optional[float]) -> Optional[np.ndarray]: if t in boundary_constraints_cache: return boundary_constraints_cache[t][1] boundary_constraints = cp.create_boundary_constraints(True, t) boundary_constraints_cache[t] = boundary_constraints return boundary_constraints[1] if not cp.are_there_boundary_conditions_on_y: return lambda _: cp.static_y_vertex_constraints, \ d_y_constraints_function def y_constraints_function( t: Optional[float]) -> Optional[np.ndarray]: if t in y_constraints_cache: return y_constraints_cache[t] if t in boundary_constraints_cache: boundary_constraints = boundary_constraints_cache[t] else: boundary_constraints = \ cp.create_boundary_constraints(True, t) boundary_constraints_cache[t] = boundary_constraints y_constraints = \ cp.create_y_vertex_constraints(boundary_constraints[0]) y_constraints_cache[t] = y_constraints return y_constraints return y_constraints_function, d_y_constraints_function