Source code for cuqi.testproblem._testproblem

import numpy as np
from scipy.linalg import toeplitz
from scipy.sparse import csc_matrix
from scipy.integrate import quad_vec
from scipy.signal import fftconvolve
from scipy.ndimage import convolve1d

import cuqi
from cuqi.model import LinearModel
from cuqi.distribution import Gaussian
from cuqi.problem import BayesianProblem
from cuqi.geometry import Geometry, MappedGeometry, StepExpansion, KLExpansion, KLExpansion_Full, CustomKL, Continuous1D, Continuous2D, Image2D
from cuqi.array import CUQIarray
import warnings



#=============================================================================
class _Deblur(BayesianProblem):
    """
    1D Deblur test problem.

    Parameters
    ------------
    dim : int, default 128
        size of the (dim,dim) deblur problem.

    bounds : len=2 list of int's, default [0,1]
        Lower and upper bounds for the mesh.

    blur_size : int, default 48
        size of blur.

    noise_std : scalar, default 0.1
        Standard deviation of the noise.

    prior : cuqi.distribution.Distribution, default Gaussian
        Distribution of the prior.

    Attributes
    ----------
    data : ndarray
        Generated (noisy) data

    model : cuqi.model.Model
        Deblur forward model

    likelihood : cuqi.likelihood.Likelihood
        Likelihood function.

    prior : cuqi.distribution.Distribution, Default Gaussian
        Distribution of the prior

    exactSolution : ndarray
        Exact solution (ground truth)

    exactData : ndarray
        Noise free data

    mesh : ndarray
        The mesh the model is defined on.

    meshsize : float
        Size of each mesh element.

    Methods
    ----------
    MAP()
        Compute MAP estimate of posterior.
        NB: Requires prior to be defined.

    sample_posterior(Ns)
        Sample Ns samples of the posterior.
        NB: Requires prior to be defined.

    """
    def __init__(self, dim=128, bounds=[0, 1], blur_size=48, noise_std=0.1, prior=None):
        
        warnings.warn("DEPRECATED: Use Deconvolution1D instead.")
        
        # mesh
        mesh = np.linspace(bounds[0], bounds[1], dim)
        meshsize = mesh[1] - mesh[0]

        # set-up computational model kernel
        kernel = lambda x, y, blur_size: blur_size / 2*np.exp(-blur_size*abs((x-y)))   # blurring kernel

        # convolution matrix
        T1, T2 = np.meshgrid(mesh, mesh)
        A = meshsize*kernel(T1, T2, blur_size)
        maxval = A.max()
        A[A < 5e-3*maxval] = 0
        A = csc_matrix(A)   # make A sparse

        # Store forward model
        model = LinearModel(A)
        
        # Prior
        if prior is None:
            prior = Gaussian(np.zeros(dim), 1, name="x")

        # Store data distribution
        data_dist = Gaussian(model(prior), noise_std**2, name="y")
        
        # Generate inverse-crime free data (still same blur size)
        data, f_true, g_true = self._generateData(mesh, kernel, blur_size, data_dist)

        # Likelihood
        likelihood = data_dist.to_likelihood(data)

        #Initialize deblur as BayesianProblem cuqi problem
        super().__init__(likelihood, prior)
        
        #Store other properties
        self.meshsize = meshsize
        self.exactSolution = f_true
        self.exactData = g_true
        self.mesh = mesh
        
    def _generateData(self,mesh,kernel,blur_size,data_dist):

        # f is piecewise constant
        x_min, x_max = mesh[0], mesh[-1]
        vals = np.array([0, 2, 3, 2, 0, 1, 0])
        conds = lambda x: [(x_min <= x) & (x < 0.1), (0.1 <= x) & (x < 0.15), (0.15 <= x) & (x < 0.2),  \
                (0.20  <= x) & (x < 0.25), (0.25 <= x) & (x < 0.3), (0.3 <= x) & (x < 0.6), \
                (0.6 <= x) & (x <= x_max)]
        f_signal = lambda x: np.piecewise(x, conds(x), vals)

        # numerically integrate the convolution
        g_conv = lambda x: quad_vec(lambda y: f_signal(y)*kernel(x, y, blur_size), x_min, x_max)
        # se also np.convolve(kernel(...), f_true, mode='same')

        # true values
        f_true = f_signal(mesh)
        g_true = g_conv(mesh)[0]

        # noisy data
        data = g_true + data_dist(np.zeros(len(mesh))).sample() #np.squeeze(noise.sample(1)) #np.random.normal(loc=0, scale=self.sigma_obs, size=(self.dim))

        return data, f_true, g_true

#=============================================================================
[docs] class Deconvolution1D(BayesianProblem): """ Create a 1D periodic deconvolution test problem defined by the inverse problem .. math:: \mathbf{b} = \mathbf{A}\mathbf{x}, where :math:`\mathbf{b}` is a (noisy) convolved signal, :math:`\mathbf{x}` is a sharp (clean) signal and :math:`\mathbf{A}` is a convolution operator. The convolution operator is defined by specifying a point spread function and boundary conditions and is computed via scipy.ndimage.convolve1D. By default, the matrix representation of the convolution operator is computed and stored. The inputs are padded to fit the boundary conditions. https://docs.scipy.org/doc/scipy/reference/generated/scipy.ndimage.convolve1d.html Parameters ------------ dim : int, default 128 size of the (dim,dim) deconvolution problem PSF : string or ndarray, default 'Gauss' | Determines type of the underlying point spread function (PSF). | Depending if use_legacy is True or False, the following options are available: | 'Gauss' - a 1D Gaussian blur function | 'Moffat' - a 1D Moffat blur function (non-LEGACY only) | 'Defocus' - an out-of-focus 1D blur function (non-LEGACY only) | 'sinc' or 'prolate' - a sinc function (LEGACY only) | 'vonMises' - a periodic version of the Gauss function (LEGACY only) | ndarray - a custom PSF represented as a 1D ndarray. PSF_param : scalar, default depends on PSF | A parameter that determines the shape of the PSF; | the larger the parameter, the larger the blur on the signal. | Ignored if PSF is a ndarray. PSF_size : int, default equal to dim | The size of the PSF. | Ignored if PSF is a ndarray. BC : string, default 'periodic' | Boundary conditions for the convolution. | 'zero' - zero boundary conditions | 'periodic' - periodic boundary conditions | 'Mirror' - Reflected around center of last pixel boundary conditions | 'Reflect' - Reflected around edge of last pixel boundary conditions | 'Nearest' - Replicates last element of boundary phantom : string or ndarray, default 'sinc' | The phantom that is sampled to produce the exact solution (signal). | 'Gauss' - a Gaussian function | 'sinc' - a sinc function | 'vonMises' - a periodic version of the Gauss function | 'square' - a "top hat" function | 'hat' - a triangular hat function | 'bumps' - two bumps | 'derivGauss' - the first derivative of Gauss function | 'pc' - Piece-wise constant phantom | 'skyscraper' - Piece-wise constant phantom with multiple peaks | ndarray - a custom phantom phantom_param : scalar, default depends on phantom | A parameter that determines the horizontal scaling of the | function; the larger the parameter the more horizontally | compressed. Does not apply to phantom = 'bumps', 'pc', 'skyscraper' or ndarray. noise_type : string, default 'gaussian' | The type of noise | "Gaussian" - Additive Gaussian white noise¨ | "scaledGaussian" - Gaussian noise with standard deviation | scaled by magnitude of the data for each | point. noise_std : scalar, default 0.01 Standard deviation of the noise prior : cuqi.distribution.Distribution, Default Gaussian Distribution of the prior use_legacy : bool, Default False If True, use the legacy matrix representation of the forward model. The legacy representation has different choices for the PSF. The data is scaled differently than the non-legacy representation. """
[docs] def __init__(self, dim=128, PSF="gauss", PSF_param=None, PSF_size=None, BC="periodic", phantom="sinc", phantom_param=None, noise_type="gaussian", noise_std=0.01, prior=None, use_legacy=False, ): # Set up forward model if use_legacy: # Legacy matrix representation if BC != "periodic": raise ValueError("Legacy matrix representation only supports periodic boundary conditions") if PSF_size is not None: raise ValueError("Legacy matrix representation does not support PSF_size") A = _getCirculantMatrix(dim, PSF, PSF_param) model = cuqi.model.LinearModel(A, range_geometry=Continuous1D(dim), domain_geometry=Continuous1D(dim)) else: # New convolution operator based on scipy.ndimage.convolve1d Afun = _getConvolutionOperator(dim, PSF, PSF_param, PSF_size, BC) # For efficiency, we create a sparse matrix representation of the forward model, # instead of using the matrix-free representation. For 1D problems, there is # no need to use the matrix-free representation, since the matrix is small. Id = np.eye(dim) A = np.array([Afun(Id[:, i]) for i in range(dim)]) A = csc_matrix(A) # make it sparse model = cuqi.model.LinearModel(A, range_geometry=Continuous1D(dim), domain_geometry=Continuous1D(dim)) # Set up exact solution if isinstance(phantom, np.ndarray): if phantom.ndim != 1 or phantom.shape[0] != dim: raise ValueError("phantom must be a 1D array of length dim") if phantom_param is not None: warnings.warn("phantom_param is ignored when phantom is a ndarray") x_exact = phantom elif isinstance(phantom, str): x_exact = _getExactSolution(dim, phantom, phantom_param) else: raise ValueError(f"Unknown phantom type {phantom}") x_exact = CUQIarray(x_exact, geometry=model.domain_geometry) # Generate exact data y_exact = model.forward(x_exact) # Set up prior if prior is None: prior = cuqi.distribution.Gaussian(np.zeros(dim), 1, name="x") # Define and add noise #TODO: Add Poisson and logpoisson if noise_type.lower() == "gaussian": data_dist = cuqi.distribution.Gaussian(model(prior), noise_std**2, name="y") elif noise_type.lower() == "scaledgaussian": data_dist = cuqi.distribution.Gaussian(model(prior), (y_exact*noise_std)**2, name="y") else: raise NotImplementedError("This noise type is not implemented") # Generate data data = data_dist(x_exact).sample() # Likelihood likelihood = data_dist.to_likelihood(data) # Set up as Bayesian problem super().__init__(likelihood, prior) # Store exact values self.exactSolution = x_exact self.exactData = y_exact self.infoString = "Noise type: Additive {} with std: {}".format(noise_type.capitalize(),noise_std)
def _getConvolutionOperator(dim, PSF, PSF_param, PSF_size, BC): # Boundary condition translation if BC.lower() == "zero": mode = "constant" elif BC.lower() == "periodic": mode = "wrap" elif BC.lower() == "mirror": mode = "mirror" elif BC.lower() == "reflect": mode = "reflect" elif BC.lower() == "nearest": mode = "nearest" else: raise ValueError("Unknown boundary condition") if PSF_size is None: PSF_size = dim # PSF setup if isinstance(PSF, np.ndarray): if PSF.ndim != 1: raise ValueError("PSF must be a 1D array") P = PSF elif isinstance(PSF, str): if PSF.lower() == "gauss": P, _ = _GaussPSF_1D(PSF_size, PSF_param) elif PSF.lower() == "moffat": P, _ = _MoffatPSF_1D(PSF_size, PSF_param) elif PSF.lower() == "defocus": P, _ = _DefocusPSF_1D(PSF_size, PSF_param) else: raise ValueError("Unknown PSF type") # Convolution matrix return lambda x: convolve1d(x, P, mode=mode) def _createPSF_1D(PSF_size, PSF_func): """ Create a 1D normalized PSF of size PSF_size using the function PSF_func. """ # Set up grid points x = np.arange(-np.fix(PSF_size/2), np.ceil(PSF_size/2)) # Compute the PSF PSF = PSF_func(x) # Normalize the PSF. PSF /= PSF.sum() # find the center center = np.where(PSF == PSF.max())[0][0] return PSF, center.astype(int) def _GaussPSF_1D(PSF_size, PSF_param): """ Create a 1D normalized Gaussian PSF of size PSF_size with standard deviation PSF_param. """ # Set default value for PSF_param if PSF_param is None: PSF_param = 10 # Set up Gaussian function PSF_func = lambda x: np.exp( -0.5*((x**2)/(PSF_param**2)) ) return _createPSF_1D(PSF_size, PSF_func) def _MoffatPSF_1D(PSF_size, PSF_param, beta=1): """ Create a 1D normalized Moffat PSF of size PSF_size with standard deviation PSF_param. """ # Set default value for PSF_param if PSF_param is None: PSF_param = 10 # Set up Moffat function PSF_func = lambda x: ( 1 + (x**2)/(PSF_param**2) )**(-beta) return _createPSF_1D(PSF_size, PSF_func) def _DefocusPSF_1D(PSF_size, PSF_param): """ Create a 1D normalized defocus PSF of size PSF_size with standard deviation PSF_param. """ # Set default value for PSF_param if PSF_param is None: PSF_param = 10 center = np.fix(int(PSF_size/2)) if (PSF_param == 0): # the PSF is a delta function and so the blurring matrix is I PSF = np.zeros(PSF_size) PSF[center] = 1 else: PSF = np.ones(PSF_size) / (np.pi * PSF_param**2) k = np.arange(1, PSF_size+1) aa = (k-center)**2 idx = np.array((aa > (PSF_param**2))) PSF[idx] = 0 PSF = PSF / PSF.sum() return PSF, center.astype(int) def _getCirculantMatrix(dim, PSF, PSF_param): """ GetCircMatrix Create a circulant matrix for deconvolution examples _getCirculantMatrix(dim, PSF, PSF_param) Input: dim = size of the circulant matrix A PSF = string that determined type of the underlying PSF 'Gauss' - a Gaussian function 'sinc' or 'prolate' - a sinc function 'vonMises' - a periodic version of the Gauss function ndarray - a custom PSF. PSF_param = a parameter that determines the shape of the PSF; the larger the parameter, to slower the initial decay of the singular values of A Output: circulant matrix Based on Matlab code by Per Christian Hansen, DTU Compute, April 7, 2021 """ if not (dim % 2) == 0: raise NotImplementedError("Circulant matrix not implemented for odd numbers") if isinstance(PSF, np.ndarray): if PSF_param is not None: warnings.warn("PSF_param is ignored when PSF is a ndarray") if PSF.ndim != 1 or PSF.shape[0] != dim: raise ValueError("kernel must be a 1D array of length dim") h = np.roll(PSF, -int(dim/2)) #h = h/np.linalg.norm(h)**2 # TODO: Normalize hflip = np.concatenate((h[0:1], np.flipud(h[1:]))) return toeplitz(hflip,h) dim_half = dim/2 grid = np.arange(dim_half+1)/dim if PSF.lower() == "gauss": if PSF_param is None: PSF_param = 10 h = np.exp(-(PSF_param*grid)**2) h = np.concatenate((h,np.flipud(h[1:-1]))) hflip = np.concatenate((h[0:1], np.flipud(h[1:]))) return toeplitz(hflip,h) elif PSF.lower() == "sinc" or PSF.lower() == "prolate": if PSF_param is None: PSF_param = 15 h = np.sinc(PSF_param*grid) h = np.concatenate((h,np.flipud(h[1:-1]))) hflip = np.concatenate((h[0:1], np.flipud(h[1:]))) return toeplitz(hflip,h) elif PSF.lower() == "vonmises": if PSF_param is None: PSF_param = 5 h = np.exp(np.cos(2*np.pi*grid)) h = (h/h[0])**PSF_param h = np.concatenate((h,np.flipud(h[1:-1]))) hflip = np.concatenate((h[0:1], np.flipud(h[1:]))) return toeplitz(hflip,h) else: raise NotImplementedError(f"PSF {PSF.lower()} is not implemented for the legacy convolution operator.") def _getExactSolution(dim, phantom, phantom_param): """ GetExactSolution Create a periodic solution Input: dim = length of the vector x phantom = the phantom that is sampled to produce x 'Gauss' - a Gaussian function 'sinc' - a sinc function 'vonMises' - a periodic version of the Gauss function 'square' - a "top hat" function 'hat' - a triangular hat function 'bumps' - two bumps 'derivGauss' - the first derivative of Gauss function 'pc' - Piece-wise constant phantom with one tall peak and one wide peak 'skyscraper' - Piece-wise constant phantom with multiple peaks of varying height param = A parameter that determines the horizontal scaling of the function; the larger the parameter the more horizontally compressed. Does not apply to phantom = 'bumps' or ndarray. Output: x = column vector, scaled such that max(x) = 1 Based on Matlab code by Per Christian Hansen, DTU Compute, April 7, 2021 """ if phantom.lower() == "gauss": if phantom_param is None: phantom_param = 5 return np.exp(-(phantom_param*np.linspace(-1,1,dim))**2) elif phantom.lower() == "sinc": if phantom_param is None: phantom_param = 5 return np.sinc(phantom_param*np.linspace(-1,1,dim)) elif phantom.lower() == "vonmises": if phantom_param is None: phantom_param = 5 x = np.exp(np.cos(np.pi*np.linspace(-1,1,dim))) return (x/np.max(x))**phantom_param elif phantom.lower() == "square": if phantom_param is None: phantom_param = 15 if phantom_param < 3: raise ValueError("The 'square' phantom_param must be larger than or equal to 3") x = np.zeros(dim) dimh = int(np.round(dim/2)) w = int(np.round(dim/phantom_param)) x[(dimh-w):(dimh+w)] = 1 return x elif phantom.lower() == "hat": if phantom_param is None: phantom_param = 15 if phantom_param < 3: raise ValueError("The 'hat' phantom_param must be larger than or equal to 3") x = np.zeros(dim) dimh = int(np.round(dim/2)) w = int(np.round(dim/phantom_param)) x[(dimh-w-1):(dimh)] = np.arange(w+1)/w x[(dimh-1):(dimh+w)] = np.flipud(np.arange(w+1))/w return x elif phantom.lower() == "bumps": if phantom_param is not None: warnings.warn("phantom_param is not used for phantom = 'bumps'") h = np.pi/dim a1 = 1; c1 = 12; t1 = 0.8 a2 = 0.5; c2 = 5; t2 = -0.5 grid = np.linspace(0.5,dim-0.5,dim) x = a1*np.exp(-c1*(-np.pi/2 + grid*h - t1)**2)+a2*np.exp(-c2*(-np.pi/2 + grid*h - t2)**2) return x elif phantom.lower() == "derivgauss": if phantom_param is None: phantom_param = 5 x = np.diff(_getExactSolution(dim+1,'gauss',phantom_param)) return x/np.max(x) elif phantom.lower() == 'pc': if phantom_param is not None: warnings.warn("phantom_param is not used for phantom = 'pc'") mesh = np.linspace(0,1,dim) x_min, x_max = mesh[0], mesh[-1] vals = np.array([0, 2, 3, 2, 0, 1, 0]) conds = lambda x: [(x_min <= x) & (x < 0.1), (0.1 <= x) & (x < 0.15), (0.15 <= x) & (x < 0.2), \ (0.20 <= x) & (x < 0.25), (0.25 <= x) & (x < 0.3), (0.3 <= x) & (x < 0.6), \ (0.6 <= x) & (x <= x_max)] f_signal = lambda x: np.piecewise(x, conds(x), vals) x = f_signal(mesh) return x elif phantom.lower() == 'skyscraper': if phantom_param is not None: warnings.warn("phantom_param is not used for phantom = 'skyscraper'") x_min = 0 x_max = 1 vals = np.array([0, 1.5, 0, 1.3, 0, 0.75, 0, 0.25, 0, 1, 0]) conds = lambda x: [(x_min <= x) & (x < 0.10), (0.10 <= x) & (x < 0.15), (0.15 <= x) & (x < 0.20), \ (0.20 <= x) & (x < 0.25), (0.25 <= x) & (x < 0.35), (0.35 <= x) & (x < 0.38),\ (0.38 <= x) & (x < 0.45), (0.45 <= x) & (x < 0.55), \ (0.55 <= x) & (x < 0.75), (0.75 <= x) & (x < 0.8), (0.8 <= x) & (x <= x_max)] f_fun = lambda x: np.piecewise(x, conds(x), vals) mesh = np.linspace(x_min, x_max, dim) x = f_fun(mesh) return x else: raise NotImplementedError("This phantom is not implemented")
[docs] class Poisson1D(BayesianProblem): """ 1D Poisson test problem. Discretized 1D Poisson equation (steady-state linear PDE). Parameters ------------ dim : int | size of the grid for the poisson problem endpoint : float | Location of end-point of grid. source : lambda function | Function for source term. field_type : str or cuqi.geometry.Geometry | Field type of domain. The accepted values are: | a Geometry object. | "KL": a :class:`cuqi.geometry.KLExpansion` geometry object will be created and set as a domain geometry. | "KL_Full": a :class:`cuqi.geometry.KLExpansion_Full` geometry object will be created and set as a domain geometry. | "Step": a :class:`cuqi.geometry.StepExpansion` geometry object will be created and set as a domain geometry. | "CustomKL": a :class:`cuqi.geometry.CustomKL` geometry object will be created and set as a domain geometry. | None: a :class:`cuqi.geometry.Continuous1D` geometry object will be created and set as a domain geometry. field_params : dict | A dictionary of key word arguments that the underlying geometry accepts. (Passed to the underlying geometry when field type is `KL`, `KL_Full`, `CustomKL`, `Step`). For example, for `Step` field type, the dictionary can be `{"n_steps": 3}`. map : lambda function | Mapping used to modify field. imap : lambda function | Inverse of KL map. SNR : int | Signal-to-noise ratio observation_grid_map : lambda function | Function that takes the grid as input and returns a sub-grid of the nodes where observations are available, e.g. `observation_grid_map = lambda x: x[np.where(x>5.0)]`. Attributes ---------- data : ndarray Generated (noisy) data model : cuqi.model.PDEModel Poisson 1D model prior : cuqi.distribution.Distribution Distribution of the prior likelihood : cuqi.likelihood.Likelihood Likelihood function exactSolution : ndarray Exact solution (ground truth) exactData : ndarray Noise free data Methods ---------- MAP() Compute MAP estimate of posterior. NB: Requires prior to be defined. sample_posterior(Ns) Sample Ns samples of the posterior. NB: Requires prior to be defined. """
[docs] def __init__(self, dim=128, endpoint=1, source=lambda xs: 10*np.exp( -( (xs - 0.5)**2 ) / 0.02), field_type=None, field_params=None, map=None, imap=None, SNR=200, observation_grid_map=None, exactSolution=None): # Prepare PDE form N = dim-1 # Number of solution nodes dx = endpoint/N # step size grid = np.linspace(dx, endpoint, N, endpoint=False) Dx = - np.diag(np.ones(N), 0) + np.diag(np.ones(N-1), 1) #Dx vec = np.zeros(N) vec[0] = 1 Dx = np.concatenate([vec.reshape([1, -1]), Dx], axis=0) Dx /= dx # FD derivative matrix rhs = source(grid) # Grids for model grid_domain = np.linspace(0, endpoint, dim, endpoint=True) grid_range = np.linspace(1./(dim-1), endpoint, dim-1, endpoint=False) # PDE form: LHS(x)u=rhs(x) grid_obs = grid_range if observation_grid_map is not None: grid_obs = observation_grid_map(grid_range) PDE_form = lambda x: (Dx.T @ np.diag(x) @ Dx, rhs) PDE = cuqi.pde.SteadyStateLinearPDE(PDE_form, grid_sol=grid_range, grid_obs=grid_obs) # Set up geometries for model if field_params is None: field_params = {} if isinstance(field_type,Geometry): domain_geometry = field_type elif field_type=="KL": domain_geometry = KLExpansion(grid_domain, **field_params) elif field_type=="KL_Full": domain_geometry = KLExpansion_Full(grid_domain, **field_params) elif field_type=="Step": domain_geometry = StepExpansion(grid_domain, **field_params) elif field_type=="CustomKL": domain_geometry = CustomKL(grid_domain, **field_params) else: domain_geometry = Continuous1D(grid_domain) if map is not None: domain_geometry = MappedGeometry(domain_geometry, map, imap) range_geometry = Continuous1D(grid_obs) # Prepare model model = cuqi.model.PDEModel(PDE,range_geometry,domain_geometry) # Set up exact solution if exactSolution is None: x_exact = np.exp( 5*grid_domain*np.exp(-2*grid_domain)*np.sin(endpoint-grid_domain) ) else: x_exact = exactSolution x_exact = CUQIarray(x_exact, is_par=False, geometry=domain_geometry) # Generate exact data y_exact = model.forward(x_exact,is_par=False) # Add noise to data sigma = np.linalg.norm(y_exact)/SNR sigma2 = sigma*sigma # variance of the observation Gaussian noise data = y_exact + np.random.normal(0, sigma, y_exact.shape) # Bayesian model x = cuqi.distribution.Gaussian(np.zeros(model.domain_dim), 1) y = cuqi.distribution.Gaussian(model, sigma2) # Initialize Deconvolution as BayesianProblem problem super().__init__(y, x, y=data) # Store exact values self.exactSolution = x_exact self.exactData = y_exact
[docs] class Heat1D(BayesianProblem): """ 1D Heat test problem. Discretized Heat equation (time-dependent linear PDE). Parameters ------------ dim : int | size of the grid for the heat problem endpoint : float | Location of end-point of grid. max_time : float | The last time step. field_type : str or cuqi.geometry.Geometry | Field type of domain. The accepted values are: | a Geometry object. | "KL": a :class:`cuqi.geometry.KLExpansion` geometry object will be created and set as a domain geometry. | "KL_Full": a :class:`cuqi.geometry.KLExpansion_Full` geometry object will be created and set as a domain geometry. | "Step": a :class:`cuqi.geometry.StepExpansion` geometry object will be created and set as a domain geometry. | "CustomKL": a :class:`cuqi.geometry.CustomKL` geometry object will be created and set as a domain geometry. | None: a :class:`cuqi.geometry.Continuous1D` geometry object will be created and set as a domain geometry. field_params : dict | A dictionary of key word arguments that the underlying geometry accepts. (Passed to the underlying geometry when field type is `KL`, `KL_Full`, `CustomKL`, `Step`). For example, for `Step` field type, the dictionary can be `{"n_steps": 3}`. map : lambda function | If given, an underlying `MappedGeometry` geometry object is created which applies the mapping on the field, e.g. for log parameterization: `map = lambda x:np.exp(x)`. imap : lambda function | The inverse of the provided map. SNR : int | Signal-to-noise ratio exactSolution : ndarray or CUQIarray | The exact solution of the problem which is the heat model initial condition in this test problem. If provided as None, an exact solution is generated, if provided as ndarray, it is assumed to be function values and it is converted to a CUQIarray. observation_grid_map : lambda function | Function that takes the grid as input and returns a sub-grid of the nodes where observations are available, e.g. `observation_grid_map = lambda x: x[np.where(x>5.0)]`. Attributes ---------- data : ndarray Generated (noisy) data model : cuqi.model.PDEModel Heat 1D model prior : cuqi.distribution.Distribution Distribution of the prior likelihood : cuqi.likelihood.Likelihood Likelihood function exactSolution : ndarray Exact solution (ground truth) exactData : ndarray Noise free data Methods ---------- MAP() Compute MAP estimate of posterior. NB: Requires prior to be defined. sample_posterior(Ns) Sample Ns samples of the posterior. NB: Requires prior to be defined. """
[docs] def __init__(self, dim=128, endpoint=1, max_time=0.2, field_type=None, field_params=None,map=None, imap=None, SNR=200, exactSolution=None, observation_grid_map=None): # Prepare PDE form N = dim # Number of solution nodes dx = endpoint/(N+1) # space step size cfl = 5/11 # the cfl condition to have a stable solution dt_approx = cfl*dx**2 # defining approximate time step size max_iter = int(max_time/dt_approx) # number of time steps Dxx = (np.diag( -2*np.ones(N) ) + np.diag(np.ones(N-1),-1) + np.diag(np.ones(N-1),1))/dx**2 # FD diffusion operator # Grids for model grid_domain = np.linspace(dx, endpoint, N, endpoint=False) grid_range = np.linspace(dx, endpoint, N, endpoint=False) time_steps = np.linspace(0,max_time,max_iter+1,endpoint=True) # PDE form (diff_op, IC, time_steps) grid_obs = grid_range if observation_grid_map is not None: grid_obs = observation_grid_map(grid_obs) def PDE_form(IC, t): return (Dxx, np.zeros(N), IC) PDE = cuqi.pde.TimeDependentLinearPDE( PDE_form, time_steps, grid_sol=grid_domain, grid_obs=grid_obs) # Set up geometries for model if field_params is None: field_params = {} if isinstance(field_type,Geometry): domain_geometry = field_type elif field_type=="KL": domain_geometry = KLExpansion(grid_domain, **field_params) elif field_type=="KL_Full": domain_geometry = KLExpansion_Full(grid_domain, **field_params) elif field_type=="Step": domain_geometry = StepExpansion(grid_domain, **field_params) elif field_type=="CustomKL": domain_geometry = CustomKL(grid_domain, **field_params) else: domain_geometry = Continuous1D(grid_domain) if map is not None: domain_geometry = MappedGeometry(domain_geometry,map,imap) range_geometry = Continuous1D(grid_obs) # Prepare model model = cuqi.model.PDEModel(PDE,range_geometry,domain_geometry) if exactSolution is not None: x_exact = CUQIarray(exactSolution, is_par = False, geometry=domain_geometry) # Set up exact solution else: if field_type=="Step": n_steps = domain_geometry.n_steps x_exact = CUQIarray(domain_geometry.par2fun(np.array(range(n_steps))), is_par=False, geometry=domain_geometry) else: grid_domain = model.domain_geometry.grid x_exact = grid_domain*np.exp(-2*grid_domain)*np.sin(endpoint-grid_domain) x_exact = CUQIarray(x_exact, is_par=False, geometry=domain_geometry) #x_exact = 100*grid_domain*np.exp(-5*grid_domain)*np.sin(endpoint-grid_domain) # Generate exact data y_exact = model.forward(x_exact, is_par=False) # Add noise to data sigma = np.linalg.norm(y_exact)/SNR sigma2 = sigma*sigma # variance of the observation Gaussian noise data = y_exact + np.random.normal(0, sigma, y_exact.shape) # Bayesian model x = cuqi.distribution.Gaussian(np.zeros(model.domain_dim), 1) y = cuqi.distribution.Gaussian(model(x), sigma2) # Initialize Heat1D as BayesianProblem problem super().__init__(y, x, y=data) # Store exact values self.exactSolution = x_exact self.exactData = y_exact self.infoString = f"Noise type: Additive i.i.d. noise with mean zero and signal to noise ratio: {SNR}"
[docs] class Abel1D(BayesianProblem): """ 1D Abel test problem. 1D model of rotationally symmetric computed tomography. Parameters ------------ dim : int size of the grid for the problem endpoint : float Location of end-point of grid. field_type : str or cuqi.geometry.Geometry Field type of domain. KL_map : lambda function Mapping used to modify field. KL_imap : lambda function Inverse of KL map. SNR : int Signal-to-noise ratio Attributes ---------- data : ndarray Generated (noisy) data model : cuqi.model.LinearModel Abel 1D model prior : cuqi.distribution.Distribution Distribution of the prior likelihood : cuqi.likelihood.Likelihood Likelihood function exactSolution : ndarray Exact solution (ground truth) exactData : ndarray Noise free data Methods ---------- MAP() Compute MAP estimate of posterior. NB: Requires prior to be defined. sample_posterior(Ns) Sample Ns samples of the posterior. NB: Requires prior to be defined. """
[docs] def __init__(self, dim=128, endpoint=1, field_type=None, field_params=None, KL_map=None, KL_imap=None, SNR=100): N = dim # number of quadrature points h = endpoint/N # quadrature weight tvec = np.linspace(h/2, endpoint-h/2, N).reshape(1, -1) svec = tvec.reshape(-1, 1) + h/2 tmat = np.tile( tvec, [N, 1] ) smat = np.tile( svec, [1, N] ) idx = np.where(tmat<smat) # only applying the quadrature on 0<x<1 A = np.zeros([N,N]) # Abel integral operator A[idx[0], idx[1]] = h/np.sqrt( np.abs( smat[idx[0], idx[1]] - tmat[idx[0], idx[1]] ) ) # discretization grid = np.linspace(0, endpoint, N) # Geometry if field_params is None: field_params = {} if isinstance(field_type,Geometry): domain_geometry = field_type elif field_type=="KL": domain_geometry = KLExpansion(grid,**field_params) elif field_type=="Step": domain_geometry = StepExpansion(grid,**field_params) elif field_type=="CustomKL": domain_geometry = CustomKL(grid,**field_params) else: domain_geometry = Continuous1D(grid) if KL_map is not None: domain_geometry = MappedGeometry(domain_geometry,KL_map,KL_imap) range_geometry = Continuous1D(grid) # Set up model model = LinearModel(A,range_geometry=range_geometry, domain_geometry=domain_geometry) # Set up exact solution x_exact = np.sin(tvec*np.pi)*np.exp(-2*tvec) x_exact.shape = (dim,) x_exact = CUQIarray(x_exact, is_par=False, geometry=domain_geometry) # Generate exact data y_exact = model.forward(x_exact,is_par=False) # Add noise to data sigma = np.linalg.norm(y_exact)/SNR sigma2 = sigma*sigma # variance of the observation Gaussian noise data = y_exact + np.random.normal(0, sigma, y_exact.shape ) # Bayesian model x = cuqi.distribution.Gaussian(np.zeros(model.domain_dim), 1) y = cuqi.distribution.Gaussian(model(x), sigma2) # Initialize Deconvolution as BayesianProblem problem super().__init__(y, x, y=data) # Store exact values self.exactSolution = x_exact self.exactData = y_exact
class _Deconv_1D(BayesianProblem): """ 1D Deconvolution test problem. Discreate linear problem from blurring kernel. Parameters ------------ dim : int size of the grid for the problem endpoint : float Location of end-point of grid. field_type : str or cuqi.geometry.Geometry Field type of domain. KL_map : lambda function Mapping used to modify field. KL_imap : lambda function Inverse of KL map. SNR : int Signal-to-noise ratio Attributes ---------- data : ndarray Generated (noisy) data model : cuqi.model.LinearModel Deconvolution 1D model prior : cuqi.distribution.Distribution Distribution of the prior likelihood : cuqi.likelihood.Likelihood Likelihood function exactSolution : ndarray Exact solution (ground truth) exactData : ndarray Noise free data Methods ---------- MAP() Compute MAP estimate of posterior. NB: Requires prior to be defined. sample_posterior(Ns) Sample Ns samples of the posterior. NB: Requires prior to be defined. """ def __init__(self, dim=128, endpoint=1, kernel=None, blur_size=48, field_type=None, field_params=None, KL_map=None, KL_imap=None, SNR=100): warnings.warn("DEPRECATED: Use Deconvolution1D instead.") N = dim # number of quadrature points h = endpoint/N # quadrature weight grid = np.linspace(0, endpoint, N) if kernel is None: kernel = lambda x, y, blur_size_var: blur_size_var / 2*np.exp(-blur_size*abs((x-y))) # blurring kernel # convolution matrix T1, T2 = np.meshgrid(grid, grid) A = h*kernel(T1, T2, blur_size) maxval = A.max() A[A < 5e-3*maxval] = 0 A = csc_matrix(A) # make A sparse # discretization if isinstance(field_type,Geometry): domain_geometry = field_type elif field_type=="KL": domain_geometry = KLExpansion(grid,field_params) elif field_type=="Step": domain_geometry = StepExpansion(grid) elif field_type=="CustomKL": domain_geometry = CustomKL(grid,field_params) else: domain_geometry = Continuous1D(grid) if KL_map is not None: domain_geometry = MappedGeometry(domain_geometry,KL_map,KL_imap) range_geometry = Continuous1D(grid) # Set up model model = LinearModel(A,range_geometry=range_geometry, domain_geometry=domain_geometry) # Prior prior = cuqi.distribution.Gaussian(np.zeros(model.domain_dim), 1, geometry=model.domain_geometry, name="x") # Set up exact solution x_exact = prior.sample() # Generate exact data y_exact = model.forward(x_exact) # Add noise to data sigma = np.linalg.norm(y_exact)/SNR sigma2 = sigma*sigma # variance of the observation Gaussian noise data = y_exact + np.random.normal(0, sigma, y_exact.shape) likelihood = cuqi.distribution.Gaussian(model(prior), sigma2, name="y").to_likelihood(data) # Initialize Deconvolution as BayesianProblem problem super().__init__(likelihood, prior) # Store exact values self.exactSolution = x_exact self.exactData = y_exact #=============================================================================
[docs] class Deconvolution2D(BayesianProblem): """ Create a 2D Deconvolution test problem defined by the inverse problem .. math:: \mathbf{b} = \mathbf{A}\mathbf{x}, where :math:`\mathbf{b}` is a (noisy) blurred image, :math:`\mathbf{x}` is a sharp image and :math:`\mathbf{A}` is a convolution operator. The convolution operator is defined by specifying a point spread function and boundary conditions and is computed (matrix-free) via scipy.signal.fftconvolve. The inputs are padded to fit the boundary conditions. https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.fftconvolve.html. Parameters ---------- dim : int | size of the (dim,dim) deconvolution problem PSF : string or ndarray | Determines type of the underlying point spread function (PSF). | 'Gauss' - a Gaussian blur | 'Moffat' - a Moffat blur blur | 'Defocus' - an out-of-focus blur | ndarray - Custom user-specified PSF PSF_param : scalar | A parameter that determines the shape of the PSF; | the larger the parameter, the larger blur on the image. | Ignored if PSF is given as ndarray PSF_size : int | Defines the size of the PSF. The size becomes (PSF_size, PSF_size). | Ignored if PSF is given as ndarray BC : string | Boundary conditions of convolution | 'zero' - Zero boundary | 'periodic' - Periodic boundary | 'Neumann' - Neumann (reflective) boundary | 'Mirror' - Mirrored boundary | 'Nearest' - Replicates last element of boundary phantom : string or ndarray | The phantom (sharp image) that is convolved. | If ndarray it should be a square 2D array representing an image. | The image will automatically be resized to fit the problem size. | If string it should be any 2D phantom defined in cuqi.data. | The string is lowercased and any hyphens are replaced | with underscores to match a method name in cuqi.data. | Examples: | 'astronaut' - a photo of an astronaut. | 'camera' - a photo of a man with a camera. | 'cat' - a photo of a cat. | 'cookie' - cartoon cookie. | 'satellite' - a photo of a satellite. noise_type : string | The type of noise | "Gaussian" - Gaussian white noise¨ | "scaledGaussian" - Gaussian noise where standard deviation is scaled by by data. noise_std : scalar | Standard deviation of the noise. prior : Distribution | Option to set the prior distribution. | If set all components of the posterior | are defined and sampling can be achieved. Attributes ---------- data : CUQIarray Generated (noisy) data model : Model Deconvolution forward model prior : Distribution Distribution of the prior likelihood : Likelihood The likelihood function (automatically computed from data distribution) exactSolution : CUQIarray Exact solution (ground truth) exactData : CUQIarray Noise free data. Methods ------- MAP() Compute MAP estimate of posterior. NB: Requires prior to be defined sample_posterior(Ns) Sample Ns samples of the posterior. NB: Requires prior to be defined """
[docs] def __init__(self, dim=128, PSF="gauss", PSF_param=2.56, PSF_size=21, BC="periodic", phantom="satellite", noise_type="gaussian", noise_std=0.0036, prior = None): # setting up the geometry domain_geometry = Image2D((dim, dim)) range_geometry = Image2D((dim, dim)) # set boundary conditions if (BC.lower() == "neumann"): BC = "symmetric" elif (BC.lower() == "zero"): BC = "constant" # cval = 0 elif (BC.lower() == "nearest"): BC = "edge" # the input is extended by replicating the last pixel elif (BC.lower() == "mirror"): BC = "reflect" # reflecting about the center of the last pixel elif (BC.lower() == "periodic"): BC = "wrap" else: raise TypeError("Unknown BC type") # Set up PSF. if isinstance(PSF, np.ndarray): P = PSF elif isinstance(PSF, str): if PSF.lower() == "gauss": P, _ = _GaussPSF(np.array([PSF_size, PSF_size]), PSF_param) elif PSF.lower() == "moffat": P, _ = _MoffatPSF(np.array([PSF_size, PSF_size]), PSF_param, 1) elif PSF.lower() == "defocus": P, _ = _DefocusPSF(np.array([PSF_size, PSF_size]), PSF_param) else: raise TypeError(f"Unknown PSF: {PSF}.") # build forward model model = cuqi.model.LinearModel(lambda x: _proj_forward_2D(x, P, BC), lambda x: _proj_backward_2D(x, P, BC), range_geometry, domain_geometry) # User provided phantom as ndarray if isinstance(phantom, np.ndarray): if phantom.ndim > 2: raise ValueError("Input phantom image must be an image (matrix) or vector") if phantom.ndim == 1: N = int(round(np.sqrt(len(phantom)))) phantom = phantom.reshape(N,N) phantom = cuqi.data.imresize(phantom, dim) # Resize phantom (if wrong size) x_exact2D = phantom # If phantom is string its a specific case elif isinstance(phantom, str): # lowercase and replace hyphens with underscores to match library method names phantom = phantom.lower().replace("-", "_") if hasattr(cuqi.data, phantom): x_exact2D = getattr(cuqi.data, phantom)(size=dim) else: raise ValueError(f"Phantom {phantom} not found in cuqi.data phantom library.") else: raise TypeError("Unknown phantom type. Must be ndarray or string.") x_exact = x_exact2D.flatten() x_exact = CUQIarray(x_exact, is_par=True, geometry=domain_geometry) # Generate exact data (blurred) y_exact = model@x_exact # Create prior if prior is None: prior = cuqi.distribution.Gaussian(np.zeros(model.domain_dim), 1, geometry=domain_geometry, name="x") # Data distribution if noise_type.lower() == "gaussian": data_dist = cuqi.distribution.Gaussian(model(prior), noise_std**2, geometry=range_geometry, name="y") elif noise_type.lower() == "scaledgaussian": data_dist = cuqi.distribution.Gaussian(model(prior), (y_exact*noise_std)**2, geometry=range_geometry, name="y") else: raise NotImplementedError("This noise type is not implemented") # Generate noisy data data = data_dist(x_exact).sample() # Likelihood likelihood = data_dist.to_likelihood(data) # Initialize Deconvolution as BayesianProblem problem super().__init__(likelihood, prior) self.exactSolution = x_exact self.exactData = y_exact self.infoString = "Noise type: Additive {} with std: {}".format(noise_type.capitalize(),noise_std) self.Miscellaneous = {"PSF" : P, "BC": BC}
#========================================================================= def _proj_forward_2D(X, P, BC): PSF_size = max(P.shape) X_padded = np.pad(X, PSF_size//2, mode=BC) Ax = fftconvolve(X_padded, P, mode='valid') if not PSF_size & 0x1: # If PSF_size is even Ax = Ax[1:, 1:] # Remove first row and column to fit convolve math return Ax #========================================================================= def _proj_backward_2D(B, P, BC): P = np.flipud(np.fliplr(P)) # Flip PSF return _proj_forward_2D(B, P, BC) # =================================================================== # Array with PSF for Gaussian blur (astronomic turbulence) # =================================================================== def _GaussPSF(dim, s): if hasattr(dim, "__len__"): m, n = dim[0], dim[1] else: m, n = dim, dim s1, s2 = s, s # Set up grid points to evaluate the Gaussian function x = np.arange(-np.fix(n/2), np.ceil(n/2)) y = np.arange(-np.fix(m/2), np.ceil(m/2)) X, Y = np.meshgrid(x, y) # Compute the Gaussian, and normalize the PSF. PSF = np.exp( -0.5* ((X**2)/(s1**2) + (Y**2)/(s2**2)) ) PSF /= PSF.sum() # find the center mm, nn = np.where(PSF == PSF.max()) center = np.array([mm[0], nn[0]]) return PSF, center.astype(int) # =================================================================== # Array with PSF for Moffat blur (astronomical telescope) # =================================================================== def _MoffatPSF(dim, s, beta): if hasattr(dim, "__len__"): m, n = dim[0], dim[1] else: m, n = dim, dim s1, s2 = s, s # Set up grid points to evaluate the Gaussian function x = np.arange(-np.fix(n/2), np.ceil(n/2)) y = np.arange(-np.fix(m/2), np.ceil(m/2)) X, Y = np.meshgrid(x, y) # Compute the Gaussian, and normalize the PSF. PSF = ( 1 + (X**2)/(s1**2) + (Y**2)/(s2**2) )**(-beta) PSF = PSF / PSF.sum() # find the center mm, nn = np.where(PSF == PSF.max()) center = np.array([mm[0], nn[0]]) return PSF, center.astype(int) # =================================================================== # Array with PSF for out-of-focus blur # =================================================================== def _DefocusPSF(dim, R): if hasattr(dim, "__len__"): m, n = dim[0], dim[1] else: m, n = dim, dim center = np.fix((np.array([m, n]))/2) if (R == 0): # the PSF is a delta function and so the blurring matrix is I PSF = np.zeros((m, n)) PSF[center[0], center[1]] = 1 else: PSF = np.ones((m, n)) / (np.pi * R**2) k = np.arange(1, max(m, n)+1) aa, bb = (k-center[0])**2, (k-center[1])**2 A, B = np.meshgrid(aa, aa), np.meshgrid(bb, bb) idx = np.array(((A[0].T + B[0]) > (R**2))) PSF[idx] = 0 PSF = PSF / PSF.sum() return PSF, center.astype(int) #=============================================================================
[docs] class WangCubic(BayesianProblem): """ Two parameters and one observation cubic test problem. Parameters ------------ noise_std : scalar Standard deviation of the noise prior : cuqi.distribution.Distribution Distribution of the prior data : scalar Observed data Notes ----- Based on Section 3.3.2 in Wang (2015): Z. Wang, "An Optimization Based Algorithm for Bayesian Inference". Master thesis. MIT. 2015 https://dspace.mit.edu/bitstream/handle/1721.1/98815/921147308-MIT.pdf?sequence=1&isAllowed=y """
[docs] def __init__(self, noise_std=1, prior=None, data=None): # forward model and gradient def forward(x): return 10*x[1] - 10*x[0]**3 + 5*x[0]**2 + 6*x[0] def jacobian(x): return np.array([[-30*x[0]**2 + 10*x[0] + 6, 10]]) model = cuqi.model.Model(forward, range_geometry=1, domain_geometry=2, jacobian=jacobian) # define prior if prior is None: prior = cuqi.distribution.Gaussian(np.array([1, 0]), 1, name="x") # data if data is None: data = 1 # data distribution is Gaussian data_dist = cuqi.distribution.Gaussian(model(prior), noise_std**2, name="y") # Define Gaussian likelihood likelihood = data_dist.to_likelihood(data) # Set up Bayesian Problem super().__init__(likelihood, prior) # Store exact values self.exactSolution = None self.exactData = None self.infoString = "Noise type: Additive {} with std: {}".format('Gaussian', noise_std)