Source code for pararealml.operators.ml.auto_regression.auto_regression_operator

from __future__ import annotations

from typing import Tuple, Callable, Optional, Protocol, List

import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

from pararealml.constrained_problem import ConstrainedProblem
from pararealml.initial_condition import DiscreteInitialCondition
from pararealml.initial_value_problem import InitialValueProblem
from pararealml.operator import Operator, discretize_time_domain
from pararealml.solution import Solution


[docs]class AutoRegressionOperator(Operator): """ A supervised machine learning operator that uses auto regression to model a high fidelity operator for solving initial value problems. """ def __init__( self, d_t: float, vertex_oriented: bool): """ :param d_t: the temporal step size to use :param vertex_oriented: whether the operator is to evaluate the solutions of IVPs at the vertices or cell centers of the spatial meshes """ super(AutoRegressionOperator, self).__init__(d_t, vertex_oriented) self._model: Optional[SKLearnRegressor] = None @property def model(self) -> Optional[SKLearnRegressor]: """ The regression model behind the operator. """ return self._model @model.setter def model(self, model: Optional[SKLearnRegressor]): self._model = model
[docs] def solve( self, ivp: InitialValueProblem, parallel_enabled: bool = True) -> Solution: if self._model is None: raise ValueError('operator has no model') cp = ivp.constrained_problem diff_eq = cp.differential_equation y_shape = cp.y_shape(self._vertex_oriented) inputs = self._create_input_placeholder(cp) t = discretize_time_domain(ivp.t_interval, self._d_t) y = np.empty((len(t) - 1,) + y_shape) y_i = ivp \ .initial_condition \ .discrete_y_0(self._vertex_oriented) \ .reshape((-1, diff_eq.y_dimension)) for i, t_i in enumerate(t[:-1]): inputs[:, -diff_eq.x_dimension - 1] = t_i inputs[:, :-diff_eq.x_dimension - 1] = y_i.reshape((1, -1)) y_i = self._model.predict(inputs) y[i, ...] = y_i.reshape(y_shape) return Solution( ivp, t[1:], y, vertex_oriented=self._vertex_oriented, d_t=self._d_t)
[docs] def generate_data( self, ivp: InitialValueProblem, oracle: Operator, iterations: int, perturbation_function: Callable[[float, np.ndarray], np.ndarray], isolate_perturbations: bool = False ) -> Tuple[np.ndarray, np.ndarray]: """ Generates data to train an operator model by using the oracle to repeatedly solve sub-IVPs with perturbed initial conditions and a time domain extent matching the step size of this operator. :param ivp: the IVP to train the regression model on :param oracle: the operator providing the training data :param iterations: the number of data generation iterations :param perturbation_function: a function that takes a time argument, representing the start of a sub-IVP's time domain, and the discrete initial conditions for the sub-IVP and returns a perturbed version of the initial conditions :param isolate_perturbations: whether to stop perturbations from propagating through to the subsequent sub-IVPs :return: a tuple of the inputs and the target outputs """ if iterations <= 0: raise ValueError('number of iterations must be greater than 0') cp = ivp.constrained_problem diff_eq = cp.differential_equation x_dim = diff_eq.x_dimension y_dim = diff_eq.y_dimension t = discretize_time_domain(ivp.t_interval, self._d_t)[:-1] y_0 = ivp.initial_condition.discrete_y_0(self._vertex_oriented) unperturbed_sub_y0s: List[Optional[np.ndarray]] = [None] * (len(t) - 1) single_time_point_inputs = self._create_input_placeholder(cp) n_spatial_points = single_time_point_inputs.shape[0] single_epoch_inputs = np.tile(single_time_point_inputs, (len(t), 1)) single_epoch_inputs[:, -x_dim - 1] = np.repeat(t, n_spatial_points) inputs = np.tile(single_epoch_inputs, (iterations, 1)) targets = np.empty((inputs.shape[0], y_dim)) for epoch in range(iterations): offset = epoch * n_spatial_points * len(t) y_i = y_0 for i, t_i in enumerate(t): perturbed_y_i = perturbation_function(t_i, y_i) if perturbed_y_i.shape != y_i.shape: raise ValueError( f'perturbed y shape {perturbed_y_i.shape} must match ' f'input y shape {y_i.shape}') sub_ivp = InitialValueProblem( cp, (t_i, t_i + self._d_t), DiscreteInitialCondition( cp, perturbed_y_i, self._vertex_oriented)) y_next = oracle.solve(sub_ivp) \ .discrete_y(self._vertex_oriented)[-1, ...] t_offset = offset + i * n_spatial_points inputs[t_offset:t_offset + n_spatial_points, :-x_dim - 1] = \ perturbed_y_i.reshape((1, -1)) targets[t_offset:t_offset + n_spatial_points, :] = \ y_next.reshape((-1, y_dim)) if isolate_perturbations and i < len(t) - 1: y_next = unperturbed_sub_y0s[i] if y_next is None: sub_ivp = InitialValueProblem( cp, (t_i, t_i + self._d_t), DiscreteInitialCondition( cp, y_i, self._vertex_oriented)) y_next = oracle.solve(sub_ivp) \ .discrete_y(self._vertex_oriented)[-1, ...] unperturbed_sub_y0s[i] = y_next y_i = y_next return inputs, targets
[docs] def fit_model( self, model: SKLearnRegressor, data: Tuple[np.ndarray, np.ndarray], test_size: float = .2, score_func: Callable[[np.ndarray, np.ndarray], float] = mean_squared_error ) -> Tuple[float, float]: """ Fits the regression model to the training share of the provided data points using random splitting, it stores the fitted model as a member variable for solving IVPs, and it returns the loss of the model evaluated on both the training and test data sets. :param model: the regression model to train :param data: a tuple of the inputs and the target outputs :param test_size: the fraction of all data points that should be used for testing :param score_func: the prediction scoring function to use :return: the training and test losses """ x_train, x_test, y_train, y_test = train_test_split( data[0], data[1], train_size=1. - test_size, test_size=test_size) model.fit(x_train, y_train) self._model = model y_train_hat = model.predict(x_train) y_test_hat = model.predict(x_test) train_score = score_func(y_train, y_train_hat) test_score = score_func(y_test, y_test_hat) return train_score, test_score
[docs] def train( self, ivp: InitialValueProblem, oracle: Operator, model: SKLearnRegressor, iterations: int, perturbation_function: Callable[[float, np.ndarray], np.ndarray], isolate_perturbations: bool = False, test_size: float = .2, score_func: Callable[[np.ndarray, np.ndarray], float] = mean_squared_error) -> Tuple[float, float]: """ Fits a regression model to training data generated by the oracle. The inputs of the model are time t, spatial coordinates x, and the values of the solution at all points of the IVP's mesh at time t (for ODEs, no spatial coordinates are included in the features and the solution is evaluated merely at t). The model outputs are the predicted values of the solution at x and t + d_t (for ODEs, it is again just the solution at t + d_t). The training data is generated by using the oracle to repeatedly solve sub-IVPs with perturbed initial conditions and a time domain extent matching the step size of this operator. :param ivp: the IVP to train the regression model on :param oracle: the operator providing the training data :param model: the model to fit to the training data :param iterations: the number of data generation iterations :param perturbation_function: a function that takes a time argument, representing the start of a sub-IVP's time domain, and the discrete initial values for the sub-IVP's solution and returns a perturbed version of the initial values :param isolate_perturbations: whether to stop perturbations from propagating through to the subsequent sub-IVPs :param test_size: the fraction of all data points that should be used for testing :param score_func: the prediction scoring function to use :return: the training and test losses """ data = self.generate_data( ivp, oracle, iterations, perturbation_function, isolate_perturbations=isolate_perturbations) return self.fit_model(model, data, test_size, score_func)
def _create_input_placeholder( self, cp: ConstrainedProblem) -> np.ndarray: """ Creates a placeholder array for the model inputs. If the constrained problem is a PDE, it pre-populates the first x_dimension columns corresponding to the spatial coordinates. :param cp: the constrained problem to base the inputs on :return: the placeholder array for the model inputs """ diff_eq = cp.differential_equation if not diff_eq.x_dimension: return np.empty((1, 1 + diff_eq.y_dimension)) x = cp.mesh.all_index_coordinates(self._vertex_oriented, flatten=True) t = np.empty((len(x), 1)) y = np.empty((len(x), diff_eq.y_dimension * len(x))) return np.hstack([y, t, x])
[docs]class SKLearnRegressor(Protocol): """A protocol class for scikit-learn regression models."""
[docs] def fit( self, x: np.typing.ArrayLike, y: np.typing.ArrayLike, sample_weight: Optional[np.typing.ArrayLike] = None ) -> SKLearnRegressor: ...
[docs] def predict(self, x: np.typing.ArrayLike) -> np.ndarray: ...
[docs] def score( self, x: np.typing.ArrayLike, y: np.typing.ArrayLike, sample_weight: Optional[np.typing.ArrayLike] = None ) -> float: ...