Source code for cuqi.pde._pde

from abc import ABC, abstractmethod
import scipy
from inspect import getsource
from scipy.interpolate import interp1d
import numpy as np
from cuqi.utilities import get_non_default_args


[docs] class PDE(ABC): """ Parametrized PDE abstract base class Parameters ----------- PDE_form : callable function Callable function which returns a tuple of the needed PDE components (expected components are explained in the subclasses) observation_map: a function handle A function that takes the PDE solution as input and the returns the observed solution. e.g. `observation_map=lambda u: u**2` or `observation_map=lambda u: u[0]` grid_sol: np.ndarray The grid on which solution is defined grid_obs: np.ndarray The grid on which the observed solution should be interpolated (currently only supported for 1D problems). """
[docs] def __init__(self, PDE_form, grid_sol=None, grid_obs=None, observation_map=None): self.PDE_form = PDE_form self.grid_sol = grid_sol self.grid_obs = grid_obs self.observation_map = observation_map self._stored_non_default_args = None
[docs] @abstractmethod def assemble(self, *args, **kwargs): pass
[docs] @abstractmethod def solve(self): pass
[docs] @abstractmethod def observe(self,solution): pass
def __repr__(self) -> str: msg = "CUQI {}.".format(self.__class__.__name__) if hasattr(self,"PDE_form"): msg+="\nPDE form expression:\n{}".format(getsource(self.PDE_form)) return msg @staticmethod def _compare_grid(grid1,grid2): """Compares two grids and returns if they are equal""" # If one of the grids are none, we assume they are equal if grid1 is None or grid2 is None: return True m, n = len(grid1), len(grid2) if (m == n): equal_arrays = (grid1 == grid2).all() else: equal_arrays = False return equal_arrays @property def _non_default_args(self): """Returns the non-default arguments of the PDE_form function""" if self._stored_non_default_args is None: self._stored_non_default_args = get_non_default_args(self.PDE_form) return self._stored_non_default_args @property def grid_sol(self): if hasattr(self,"_grid_sol"): return self._grid_sol else: return None @grid_sol.setter def grid_sol(self,value): self._grids_equal = self._compare_grid(value,self.grid_obs) self._grid_sol = value @property def grid_obs(self): if hasattr(self,"_grid_obs"): return self._grid_obs else: return None @grid_obs.setter def grid_obs(self,value): if value is None: value = self.grid_sol self._grids_equal = self._compare_grid(value,self.grid_sol) self._grid_obs = value @property def grids_equal(self): return self._grids_equal def _parse_args_add_to_kwargs( self, *args, map_name, **kwargs): """ Private function that parses the input arguments and adds them as keyword arguments matching (the order of) the non default arguments of the pde class. """ # If any args are given, add them to kwargs if len(args) > 0: if len(kwargs) > 0: raise ValueError( + map_name.lower() + " input is specified both as positional and keyword arguments. This is not supported." ) # Check if the number of args does not match the number of # non_default_args of the model if len(args) != len(self._non_default_args): raise ValueError( "The number of positional arguments does not match the number of non-default arguments of " + map_name.lower() + "." ) # Add args to kwargs following the order of non_default_args for idx, arg in enumerate(args): kwargs[self._non_default_args[idx]] = arg # Check kwargs matches non_default_args if set(list(kwargs.keys())) != set(self._non_default_args): error_msg = ( map_name.lower() + f" input is specified by keywords arguments {list(kwargs.keys())} that does not match the non_default_args of " + map_name + f" {self._non_default_args}." ) raise ValueError(error_msg) # Make sure order of kwargs is the same as non_default_args kwargs = {k: kwargs[k] for k in self._non_default_args} return kwargs
[docs] class LinearPDE(PDE): """ Parametrized Linear PDE base class Parameters ----------- PDE_form : callable function Callable function which returns a tuple of the needed PDE components (expected components are explained in the subclasses) linalg_solve: lambda function or function handle linear system solver function to solve the arising linear system with the signature :meth:`x, val1, val2, ...=linalg_solve(A,b,**linalg_solve_kwargs)` where A is the linear operator and b is the right hand side. `linalg_solve_kwargs` is any keywords arguments that the function :meth:`linalg_solve` can take. x is the solution of A*x=b of type `numpy.ndarray`. val1, val2, etc. are optional and can be a one or more values the solver return, e.g. information and number of iterations (for iterative solvers). If linalg_solve is None, :meth:`scipy.linalg.solve` will be used. linalg_solve_kwargs: a dictionary A dictionary of the keywords arguments that linalg_solve can take. kwargs: See :class:`~cuqi.pde.PDE` for the remaining keyword arguments. """
[docs] def __init__(self, PDE_form, linalg_solve=None, linalg_solve_kwargs=None, **kwargs): super().__init__(PDE_form, **kwargs) if linalg_solve is None: linalg_solve = scipy.linalg.solve if linalg_solve_kwargs is None: linalg_solve_kwargs = {} self._linalg_solve = linalg_solve self._linalg_solve_kwargs = linalg_solve_kwargs
def _solve_linear_system(self, A, b, linalg_solve, kwargs): """Helper function that solves the linear system `A*x=b` using the provided solve method `linalg_solve` and its keyword arguments `kwargs`. It then returns the output in the format: `solution`, `info`""" returned_values = linalg_solve(A, b, **kwargs) if isinstance(returned_values, tuple): solution = returned_values[0] info = returned_values[1:] else: solution = returned_values info = None return solution, info
[docs] class SteadyStateLinearPDE(LinearPDE): """Linear steady state PDE. Parameters ----------- PDE_form : callable function Callable function with signature `PDE_form(parameter1, parameter2, ...)` where `parameter1`, `parameter2`, etc. are the Bayesian unknown parameters (the user can choose any names for these parameters, e.g. `a`, `b`, etc.). The function returns a tuple with the discretized differential operator A and right-hand-side b. The types of A and b are determined by what the method :meth:`linalg_solve` accepts as first and second parameters, respectively. kwargs: See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments. Example -------- See demo demos/demo24_fwd_poisson.py for an illustration on how to use SteadyStateLinearPDE with varying solver choices. And demos demos/demo25_fwd_poisson_2D.py and demos/demo26_fwd_poisson_mixedBC.py for examples with mixed (Dirichlet and Neumann) boundary conditions problems. demos/demo25_fwd_poisson_2D.py also illustrates how to observe on a specific boundary, for example. """
[docs] def __init__(self, PDE_form, **kwargs): super().__init__(PDE_form, **kwargs)
[docs] def assemble(self, *args, **kwargs): """Assembles differential operator and rhs according to PDE_form""" kwargs = self._parse_args_add_to_kwargs( *args, map_name="assemble", **kwargs ) self.diff_op, self.rhs = self.PDE_form(**kwargs)
[docs] def solve(self): """Solve the PDE and returns the solution and an information variable `info` which is a tuple of all variables returned by the function `linalg_solve` after the solution.""" if not hasattr(self, "diff_op") or not hasattr(self, "rhs"): raise Exception("PDE is not assembled.") return self._solve_linear_system(self.diff_op, self.rhs, self._linalg_solve, self._linalg_solve_kwargs)
[docs] def observe(self, solution): if self.grids_equal: solution_obs = solution else: solution_obs = interp1d(self.grid_sol, solution, kind='quadratic')(self.grid_obs) if self.observation_map is not None: solution_obs = self.observation_map(solution_obs) return solution_obs
[docs] class TimeDependentLinearPDE(LinearPDE): """Time Dependent Linear PDE with fixed time stepping using Euler method (backward or forward). Parameters ----------- PDE_form : callable function Callable function with signature `PDE_form(parameter1, parameter2, ..., t)` where `parameter1`, `parameter2`, etc. are the Bayesian unknown parameters (the user can choose any names for these parameters, e.g. `a`, `b`, etc.) and `t` is the time at which the PDE form is evaluated. The function returns a tuple of (`differential_operator`, `source_term`, `initial_condition`) where `differential_operator` is the linear operator at time `t`, `source_term` is the source term at time `t`, and `initial_condition` is the initial condition. The types of `differential_operator` and `source_term` are determined by what the method :meth:`linalg_solve` accepts as linear operator and right-hand side, respectively. The type of `initial_condition` should be the same type as the solution returned by :meth:`linalg_solve`. time_steps : ndarray An array of the discretized times corresponding to the time steps that starts with the initial time and ends with the final time time_obs : array_like or str If passed as an array_like, it is an array of the times at which the solution is observed. If passed as a string it can be set to `final` to observe at the final time step, or `all` to observe at all time steps. Default is `final`. method: str Time stepping method. Currently two options are available `forward_euler` and `backward_euler`. kwargs: See :class:`~cuqi.pde.LinearPDE` for the remaining keyword arguments Example ----------- See demos/demo34_TimeDependentLinearPDE.py for 1D heat and 1D wave equations. """
[docs] def __init__(self, PDE_form, time_steps, time_obs='final', method='forward_euler', **kwargs): super().__init__(PDE_form, **kwargs) self.time_steps = time_steps self.method = method # Set time_obs if time_obs is None: raise ValueError("time_obs cannot be None") elif isinstance(time_obs, str): if time_obs.lower() == 'final': time_obs = time_steps[-1:] elif time_obs.lower() == 'all': time_obs = time_steps else: raise ValueError("if time_obs is a string, it can only be set " +"to `final` or `all`") self._time_obs = time_obs
@property def method(self): return self._method @property def _non_default_args(self): """Returns the non-default arguments of the PDE_form function""" if self._stored_non_default_args is None: self._stored_non_default_args = get_non_default_args(self.PDE_form) # Remove the time argument from the non-default arguments # since it is provided automatically by `solve` method and is not # an argument to be inferred in Bayesian inference setting. if 't' in self._stored_non_default_args: self._stored_non_default_args.remove('t') return self._stored_non_default_args @method.setter def method(self, value): if value.lower() != 'forward_euler' and value.lower() != 'backward_euler': raise ValueError( "method can be set to either `forward_euler` or `backward_euler`") self._method = value
[docs] def assemble(self, *args, **kwargs): """Assemble PDE""" kwargs = self._parse_args_add_to_kwargs(*args, map_name="assemble", **kwargs) self._parameter_kwargs = kwargs
[docs] def assemble_step(self, t): """Assemble time step at time t""" self.diff_op, self.rhs, self.initial_condition = self.PDE_form( **self._parameter_kwargs, t=t )
[docs] def solve(self): """Solve PDE by time-stepping""" # initialize time-dependent solution self.assemble_step(self.time_steps[0]) u = np.empty((len(self.initial_condition), len(self.time_steps))) u[:, 0] = self.initial_condition if self.method == 'forward_euler': for idx, t in enumerate(self.time_steps[:-1]): dt = self.time_steps[idx+1] - t self.assemble_step(t) u_pre = u[:, idx] u[:, idx+1] = (dt*self.diff_op + np.eye(len(u_pre)))@u_pre + dt*self.rhs # from u at time t, gives u at t+dt info = None if self.method == 'backward_euler': for idx, t in enumerate(self.time_steps[1:]): dt = t - self.time_steps[idx] self.assemble_step(t) u_pre = u[:, idx] A = np.eye(len(u_pre)) - dt*self.diff_op # from u at time t-dt, gives u at t u[:, idx+1], info = self._solve_linear_system( A, u_pre + dt*self.rhs, self._linalg_solve, self._linalg_solve_kwargs) return u, info
[docs] def observe(self, solution): # If observation grid is the same as solution grid and observation time # is the final time step then no need to interpolate if self.grids_equal and np.all(self.time_steps[-1:] == self._time_obs): solution_obs = solution[..., -1] # Interpolate solution in time and space to the observation # time and space else: # Raise error if solution is 2D or 3D in space if len(solution.shape) > 2: raise ValueError("Interpolation of solutions of 2D and 3D "+ "space dimensions based on the provided "+ "grid_obs and time_obs are not supported. "+ "You can, instead, pass a custom "+ "observation_map and pass grid_obs and "+ "time_obs as None.") # Interpolate solution in space and time to the observation # time and space solution_obs = scipy.interpolate.RectBivariateSpline( self.grid_sol, self.time_steps, solution)(self.grid_obs, self._time_obs) # Apply observation map if self.observation_map is not None: solution_obs = self.observation_map(solution_obs) # squeeze if only one time observation if len(self._time_obs) == 1: solution_obs = solution_obs.squeeze() return solution_obs