# Source code for cuqi.problem._problem

from __future__ import annotations
import numpy as np
import time
from typing import Tuple

import cuqi
from cuqi import config
from cuqi.distribution import Distribution, Gaussian, InverseGamma, LMRF, GMRF, Lognormal, Posterior, Beta, JointDistribution, Gamma, ModifiedHalfNormal, CMRF
from cuqi.implicitprior import RegularizedGaussian, RegularizedGMRF
from cuqi.density import Density
from cuqi.model import LinearModel, Model
from cuqi.likelihood import Likelihood
from cuqi.geometry import _DefaultGeometry
from cuqi.utilities import ProblemInfo
from cuqi.array import CUQIarray
import warnings
import matplotlib.pyplot as plt

from copy import copy

class BayesianProblem(object):
    """ Representation of a Bayesian inverse problem defined by any number of densities (distributions and likelihoods), e.g.

    .. math::

        \\begin{align*}
        \mathrm{density}_1 &\sim \pi_1 \\newline
        \mathrm{density}_2 &\sim \pi_2 \\newline
        &\\vdots
        \end{align*}

    The main goal of this class is to provide fully automatic methods for computing samples or point estimates of the Bayesian problem.

    This class uses :class:`~cuqi.distribution.JointDistribution` to model the Bayesian problem, and to condition on observed data. We term the resulting distribution the *target distribution*.

    Parameters
    ----------
    \*densities: Density
        The densities that represent the Bayesian Problem. Each density is passed as comma-separated arguments. Can be Distribution, Likelihood etc.

    \**data: ndarray, Optional
        Any potential observed data. The data should be passed as keyword arguments, where the keyword is the name of the density that the data is associated with. Data can alternatively be set using the :meth:`set_data` method, after the problem has been initialized.

    Examples
    --------

    **Basic syntax**

    Given distributions for ``x``, ``y`` and ``z``, we can define a Bayesian problem and set the observed data ``y=y_data`` as follows:

    .. code-block:: python

        BP = BayesianProblem(x, y, z).set_data(y=y_data)

    **Complete example**

    Consider a Bayesian inverse problem with a Gaussian prior and a Gaussian likelihood. Assume that the forward model is a linear model with a known matrix :math:`\mathbf{A}: \mathbf{x} \mapsto \mathbf{y}` and that we have observed data :math:`\mathbf{y}=\mathbf{y}^\mathrm{obs}`. The Bayesian model can be summarized as

    .. math::

        \\begin{align*}
        \mathbf{x} &\sim \mathcal{N}(\mathbf{0}, 0.1^2 \mathbf{I}) \\newline
        \mathbf{y} &\sim \mathcal{N}(\mathbf{A}\mathbf{x}, 0.05^2 \mathbf{I}).
        \end{align*}

    Using the :class:`BayesianProblem` class, we can define the problem, set the data and compute samples from the posterior distribution associated with the problem, as well as estimates such as the Maximum Likelihood (ML) and Maximum A Posteriori (MAP).

    .. note::

        In this case, we use a forward model from the :mod:`~cuqi.testproblem` module, but any custom forward model can be added via the :mod:`~cuqi.model` module.

    .. code-block:: python

        # Import modules
        import cuqi
        import numpy as np
        import matplotlib.pyplot as plt

        # Deterministic forward model and data (1D convolution)
        A, y_data, probInfo = cuqi.testproblem.Deconvolution1D().get_components()

        # Bayesian model
        x = cuqi.distribution.Gaussian(np.zeros(A.domain_dim), 0.1)
        y = cuqi.distribution.Gaussian(A@x, 0.05)

        # Define Bayesian problem and set data
        BP = cuqi.problem.BayesianProblem(y, x).set_data(y=y_data)

        # Compute MAP estimate
        x_MAP = BP.MAP()

        # Compute samples from posterior
        x_samples = BP.sample_posterior(1000)

        # Plot results
        x_samples.plot_ci(exact=probInfo.exactSolution)
        plt.show()

        # Plot difference between MAP and sample mean
        (x_MAP - x_samples.mean()).plot()
        plt.title("MAP estimate - sample mean")
        plt.show()

    Notes
    -----
    In the simplest form the Bayesian problem represents a posterior distribution defined by two densities, i.e.,

    .. math::

        \pi_\mathrm{posterior}(\\boldsymbol{\\theta} \mid \mathbf{y}) \propto \pi_1(\mathbf{y} \mid \\boldsymbol{\\theta}) \pi_2(\\boldsymbol{\\theta}),

    where :math:`\pi_1(\mathbf{y} \mid \\boldsymbol{\\theta})` is a :class:`~cuqi.likelihood.Likelihood` function and :math:`\pi_2(\\boldsymbol{\\theta})` is a :class:`~cuqi.distribution.Distribution`.

    In this two-density case, the joint distribution reduces to a :class:`~cuqi.distribution.Posterior` distribution. Most functionality is currently only implemented for this simple case.
    """
    def get_components(self) -> Tuple[Model, CUQIarray, ProblemInfo]:
        """ Method that returns the model, the data and additional information to be used in formulating the Bayesian problem. """
        problem_info = ProblemInfo() # Instead of a dict, we use our ProblemInfo dataclass.

        for key, value in vars(problem_info).items():
            if hasattr(self, key):
                setattr(problem_info, key, vars(self)[key])

        return self.model, self.data, problem_info
    def __init__(self, *densities: Density, **data: np.ndarray):
        self._target = JointDistribution(*densities)(**data)
    def set_data(self, **kwargs) -> BayesianProblem:
        """ Set the data of the problem. This conditions the underlying joint distribution on the data. """
        if not isinstance(self._target, JointDistribution):
            raise ValueError("Unable to set data for this problem. Maybe data is already set?")
        self._target = self._target(**kwargs)
        return self
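    # Usage sketch for set_data (illustrative comment only; assumes `x`, `y`
    # and `y_data` as in the class docstring example):
    #
    #   BP = BayesianProblem(x, y)   # target is the joint distribution p(x, y)
    #   BP.set_data(y=y_data)        # target becomes the posterior p(x | y=y_data)
    #
    # Calling set_data a second time raises ValueError, since the target is
    # then a Posterior rather than a JointDistribution.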
    @property
    def data(self):
        """Extract the observed data from likelihood"""
        return self.likelihood.data

    @property
    def likelihood(self) -> Likelihood:
        """The likelihood function."""
        if not isinstance(self._target, Posterior):
            raise ValueError(f"Unable to extract likelihood from this problem. Current target is: \n {self._target}")
        return self._target.likelihood

    @likelihood.setter
    def likelihood(self, likelihood):
        if not isinstance(self._target, Posterior):
            raise ValueError(f"Unable to set likelihood for this problem. Current target is: \n {self._target}")
        self._target.likelihood = likelihood

    @property
    def prior(self) -> Distribution:
        """The prior distribution"""
        if not isinstance(self._target, Posterior):
            raise ValueError(f"Unable to extract prior from this problem. Current target is: \n {self._target}")
        return self._target.prior

    @prior.setter
    def prior(self, prior):
        if not isinstance(self._target, Posterior):
            raise ValueError(f"Unable to set prior for this problem. Current target is: \n {self._target}")
        self._target.prior = prior

    @property
    def model(self) -> Model:
        """Extract the cuqi model from likelihood."""
        return self.likelihood.model

    @property
    def posterior(self) -> Posterior:
        """Create posterior distribution from likelihood and prior."""
        if not isinstance(self._target, Posterior):
            raise ValueError(f"Unable to extract posterior for this problem. Current target is: \n {self._target}")
        return self._target
    def ML(self, disp=True, x0=None) -> CUQIarray:
        """ Compute the Maximum Likelihood (ML) estimate, i.e. the maximizer of the likelihood function.

        Parameters
        ----------
        disp : bool
            display info messages? (True or False).

        x0 : CUQIarray or ndarray
            User-specified initial guess for the solver. Defaults to a ones vector.

        Returns
        -------
        CUQIarray
            ML estimate. Solver info is stored in the `info` attribute of the returned CUQIarray.

        """
        if disp:
            # Print warning to user about the automatic solver selection
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("!!! Automatic solver selection is a work-in-progress !!!")
            print("!!! Always validate the computed results.            !!!")
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("")

        x_ML, solver_info = self._solve_max_point(self.likelihood, disp=disp, x0=x0)

        # Wrap the result in a CUQIarray and add solver info
        x_ML = cuqi.array.CUQIarray(x_ML, geometry=self.likelihood.geometry)
        x_ML.info = solver_info

        return x_ML
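    # Illustrative ML call (a sketch; assumes `BP` is set up with data as in
    # the class docstring example, and `A` is its forward model):
    #
    #   x_ML = BP.ML(disp=False, x0=np.zeros(A.domain_dim))
    #   print(x_ML.info)   # solver diagnostics attached by _solve_max_point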
    def MAP(self, disp=True, x0=None) -> CUQIarray:
        """ Compute the Maximum A Posteriori (MAP) estimate of the posterior.

        Parameters
        ----------
        disp : bool
            display info messages? (True or False).

        x0 : CUQIarray or ndarray
            User-specified initial guess for the solver. Defaults to a ones vector. Not used when a direct closed-form solve applies.

        Returns
        -------
        CUQIarray
            MAP estimate of the posterior. Solver info is stored in the `info` attribute of the returned CUQIarray.

        """
        if disp:
            # Print warning to user about the automatic solver selection
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("!!! Automatic solver selection is a work-in-progress !!!")
            print("!!! Always validate the computed results.            !!!")
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("")

        if self._check_posterior(self, Gaussian, Gaussian, LinearModel, max_dim=config.MAX_DIM_INV):
            if disp: print(f"Using direct MAP of Gaussian posterior. Only works for small-scale problems with dim<={config.MAX_DIM_INV}.")
            b  = self.data
            A  = self.model.get_matrix()
            Ce = self.likelihood.distribution.cov
            x0 = self.prior.mean # Prior mean (any user-supplied x0 is ignored here, since the direct solve needs no initial guess)
            Cx = self.prior.cov

            # If Ce and Cx are scalar, make them into matrices
            if np.size(Ce) == 1:
                Ce = Ce.ravel()[0]*np.eye(self.model.range_dim)
            if np.size(Cx) == 1:
                Cx = Cx.ravel()[0]*np.eye(self.model.domain_dim)

            # Basic MAP estimate using closed-form expression Tarantola 2005 (3.37-3.38)
            rhs = b - A@x0
            sysm = A@Cx@A.T + Ce
            x_MAP = x0 + Cx@(A.T@np.linalg.solve(sysm, rhs))
            solver_info = {"solver": "direct"}

        else: # If no specific implementation exists, use numerical optimization.
            x_MAP, solver_info = self._solve_max_point(self.posterior, disp=disp, x0=x0)

        # Wrap the result in a CUQIarray and add solver info
        x_MAP = cuqi.array.CUQIarray(x_MAP, geometry=self.posterior.geometry)
        x_MAP.info = solver_info

        return x_MAP
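    # Background for the direct MAP branch above: with likelihood
    # y ~ N(Ax, Ce) and prior x ~ N(x0, Cx), the closed-form MAP estimate of
    # Tarantola 2005 (3.37-3.38) is
    #
    #   x_MAP = x0 + Cx A^T (A Cx A^T + Ce)^{-1} (b - A x0),
    #
    # which the code evaluates via np.linalg.solve with sysm = A Cx A^T + Ce
    # and rhs = b - A x0, avoiding an explicit matrix inverse.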
    def sample_posterior(self, Ns, Nb=None, callback=None, legacy=False) -> cuqi.samples.Samples:
        """Sample the posterior. Sampler choice and tuning is handled automatically.

        Parameters
        ----------
        Ns : int
            Number of samples to draw.

        Nb : int or None, *Optional*
            Number of burn-in samples. If not provided, 20% of the samples will be used for burn-in.

        callback : callable, *Optional*
            If set, this function will be called after every sample.
            If the parameter `legacy` is set to False (the default), the callback function should take three arguments: the sampler object, the index of the current sampling step, and the total number of requested samples. The last two arguments are integers. An example of the callback function signature in this case is: `callback(sampler, sample_index, num_of_samples)`.
            If the parameter `legacy` is set to True, the signature of the callback function is `callback(sample, sample_index)`, where `sample` is the current sample and `sample_index` is the index of the sample. An example is shown in demos/demo31_callback.py.

        legacy : bool, *Optional*
            Default is False. If set to True, the sampler selection will use the samplers from the legacy sampler module :mod:`cuqi.legacy.sampler`.

        Returns
        -------
        samples : cuqi.samples.Samples
            Samples from the posterior.

        """
        # Print warning to user about the automatic sampler selection
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("!!! Automatic sampler selection is a work-in-progress. !!!")
        print("!!! Always validate the computed results.              !!!")
        print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
        print("")

        if not legacy:
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("!!! Using samplers from cuqi.sampler             !!!")
            print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
            print("")

        # Set up burn-in if not provided
        if Nb is None:
            Nb = int(0.2*Ns)

        # If target is a joint distribution, try Gibbs sampling
        # This is still very experimental!
        if isinstance(self._target, JointDistribution):
            return self._sampleGibbs(Ns, Nb, callback=callback, legacy=legacy)

        # For Gaussian small-scale we can use direct sampling
        if self._check_posterior(self, Gaussian, Gaussian, LinearModel, config.MAX_DIM_INV) and not self._check_posterior(self, GMRF):
            return self._sampleMapCholesky(Ns, callback)

        # For larger-scale Gaussian we use Linear RTO. TODO: Improve checking once we have a common Gaussian class.
        elif hasattr(self.prior, "sqrtprecTimesMean") and hasattr(self.likelihood.distribution, "sqrtprec") and isinstance(self.model, LinearModel):
            return self._sampleLinearRTO(Ns, Nb, callback, legacy=legacy)

        # For LMRF priors we use the Unadjusted Gaussian Laplace Approximation (UGLA)
        elif self._check_posterior(self, LMRF, Gaussian):
            return self._sampleUGLA(Ns, Nb, callback, legacy=legacy)

        # If we have gradients, use NUTS!
        # TODO: Fix cases where we have gradients but NUTS fails (see checks)
        elif self._check_posterior(self, must_have_gradient=True) and not self._check_posterior(self, (Beta, InverseGamma, Lognormal)):
            return self._sampleNUTS(Ns, Nb, callback, legacy=legacy)

        # For Gaussians with non-linear model we use pCN
        elif self._check_posterior(self, (Gaussian, GMRF), Gaussian):
            return self._samplepCN(Ns, Nb, callback, legacy=legacy)

        # For Regularized Gaussians with linear models we use RegularizedLinearRTO
        elif self._check_posterior(self, (RegularizedGaussian, RegularizedGMRF), Gaussian, LinearModel):
            return self._sampleRegularizedLinearRTO(Ns, Nb, callback, legacy=legacy)

        else:
            raise NotImplementedError(f"Automatic sampler choice is not implemented for model: {type(self.model)}, likelihood: {type(self.likelihood.distribution)} and prior: {type(self.prior)} and dim {self.prior.dim}. Manual sampler choice can be done via the 'sampler' module. Posterior distribution can be extracted via '.posterior' of any testproblem (BayesianProblem).")
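    # Illustrative callback for sample_posterior with the default (non-legacy)
    # samplers (a sketch; `BP` as in the class docstring example):
    #
    #   def progress(sampler, sample_index, num_of_samples):
    #       if sample_index % 100 == 0:
    #           print(f"{sample_index}/{num_of_samples} samples drawn")
    #
    #   x_samples = BP.sample_posterior(1000, callback=progress)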
    def sample_prior(self, Ns, callback=None) -> cuqi.samples.Samples:
        """ Sample the prior distribution. Sampler choice and tuning is handled automatically. """

        # Try sampling prior directly
        try:
            return self.prior.sample(Ns)
        except NotImplementedError:
            pass

        # If no direct method exists, redefine the problem with a constant likelihood and sample from the resulting posterior
        print("Using MCMC methods from sampler module to sample prior.")
        print("Make sure enough samples are drawn for convergence.")
        print("")

        # Create a copy of self
        prior_problem = copy(self)

        # Set likelihood to constant
        model = cuqi.model.LinearModel(lambda x: 0*x, lambda y: 0*y, self.model.range_geometry, self.model.domain_geometry)
        likelihood = cuqi.distribution.Gaussian(model, 1).to_likelihood(np.zeros(self.model.range_dim)) # p(y|x)=constant
        prior_problem.likelihood = likelihood

        # Set up burn-in
        Nb = int(0.2*Ns)

        # Now sample prior problem
        return prior_problem.sample_posterior(Ns, Nb, callback)
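    # Why the constant-likelihood trick above works: with the zero forward
    # model, the Gaussian likelihood evaluated at the data y=0 is the same for
    # every x, so p(x|y) ∝ p(y|x) p(x) ∝ p(x), and the posterior samplers
    # effectively draw from the prior.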
    def UQ(self, Ns=1000, Nb=None, percent=95, exact=None, legacy=False) -> cuqi.samples.Samples:
        """ Run an Uncertainty Quantification (UQ) analysis on the Bayesian problem and provide a summary of the results.

        Parameters
        ----------
        Ns : int, *Optional*
            Number of samples to draw. Defaults to 1000.

        Nb : int, *Optional*
            Number of burn-in samples. If not provided, 20% of the samples will be used for burn-in.

        exact : ndarray or dict[str, ndarray], *Optional*
            Exact solution to the problem. If provided, the summary will include a comparison to the exact solution. If a dict is provided, the keys should be the names of the variables and the values should be the exact solution for each variable.

        percent : float, *Optional*
            The credible interval to plot. Defaults to 95%.

        legacy : bool, *Optional*
            Default is False. If set to True, the sampler selection will use the samplers from the legacy sampler module :mod:`cuqi.legacy.sampler`.

        Returns
        -------
        samples : cuqi.samples.Samples
            Samples from the posterior. The samples can be used to compute further statistics and plots.

        """
        print(f"Computing {Ns} samples")
        samples = self.sample_posterior(Ns, Nb, legacy=legacy)

        print("Plotting results")
        # Gibbs case
        if isinstance(samples, dict):
            for key, value in samples.items():
                if exact is not None and key in exact:
                    self._plot_UQ_for_variable(value, percent=percent, exact=exact[key])
                else:
                    self._plot_UQ_for_variable(value, percent=percent, exact=None)
        # Single parameter case
        else:
            self._plot_UQ_for_variable(samples, percent=percent, exact=exact)

        return samples
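    # Illustrative UQ call (a sketch; assumes the Deconvolution1D setup from
    # the class docstring, where probInfo.exactSolution is the true signal):
    #
    #   samples = BP.UQ(Ns=500, exact=probInfo.exactSolution)
    #
    # For hierarchical (Gibbs) problems, pass a dict keyed by variable name,
    # e.g. exact={"x": probInfo.exactSolution}.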
    def _plot_UQ_for_variable(self, samples: cuqi.samples.Samples, percent=None, exact=None):
        """ Do a fitting UQ plot for a single variable given by samples. """
        # Potentially extract exact solution
        if exact is None and hasattr(self, 'exactSolution'):
            exact = self.exactSolution

        # Plot traces for single parameters
        if samples.shape[0] == 1:
            samples.plot_trace(exact=exact)
        else: # Else plot credible intervals
            # If plot_ci throws a NotImplementedError (likely coming from the
            # _plot_envelope method), we try to plot the CI for the parameters
            # instead and plot the mean and the variance using the function
            # representation of the samples geometry.
            try:
                samples.plot_ci(percent=percent, exact=exact)
            except NotImplementedError:
                print(
                    "Unable to plot CI for samples with the underlying " +
                    f"geometry: {samples.geometry}. Plotting the CI for the " +
                    "parameters instead.")
                self._alternative_plot_UQ_for_variable(samples, percent=percent, exact=exact)

    def _alternative_plot_UQ_for_variable(self, samples: cuqi.samples.Samples, percent=None, exact=None):
        """ Alternative visualization for UQ analysis used when the plot_ci method fails for the given samples geometry. """
        samples.plot_ci(percent=percent, exact=exact, plot_par=True)
        plt.figure()
        samples.plot_mean()
        plt.title("Sample parameter mean converted\nto function representation")
        plt.figure()
        samples.funvals.vector.plot_mean()
        plt.title("Sample mean of function representation")
        plt.figure()
        samples.plot_variance()
        plt.title("Sample parameter variance converted\nto function representation")
        plt.figure()
        samples.funvals.vector.plot_variance()
        plt.title("Sample variance of function representation")

    def _sampleLinearRTO(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler LinearRTO sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")
            sampler = cuqi.sampler.LinearRTO(self.posterior, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            samples = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler LinearRTO sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")

            # Start timing
            ti = time.time()

            # Sample
            sampler = cuqi.legacy.sampler.LinearRTO(self.posterior, callback=callback)
            samples = sampler.sample(Ns, Nb)

            # Print timing
            print('Elapsed time:', time.time() - ti)

        return samples

    def _sampleMapCholesky(self, Ns, callback=None):
        print(f"Using direct sampling of Gaussian posterior. Only works for small-scale problems with dim<={config.MAX_DIM_INV}.")
        print("No burn-in needed for direct sampling.")

        # Start timing
        ti = time.time()

        b  = self.data
        A  = self.model.get_matrix()
        Ce = self.likelihood.distribution.cov
        x0 = self.prior.mean
        Cx = self.prior.cov

        # If Ce and Cx are scalar, make them into matrices
        if np.size(Ce) == 1:
            Ce = Ce.ravel()[0]*np.eye(self.model.range_dim)
        if np.size(Cx) == 1:
            Cx = Cx.ravel()[0]*np.eye(self.model.domain_dim)

        # Preallocate samples
        n = self.prior.dim
        x_s = np.zeros((n, Ns))

        x_map = self.MAP(disp=False) # Compute MAP estimate
        C = np.linalg.inv(A.T@(np.linalg.inv(Ce)@A) + np.linalg.inv(Cx))
        L = np.linalg.cholesky(C)
        for s in range(Ns):
            x_s[:, s] = x_map.parameters + L@np.random.randn(n)
            # display iterations
            if (s % 5e2) == 0:
                print("\r", 'Sample', s, '/', Ns, end="")

            # Callback function
            if callback is not None:
                callback(x_s[:, s], s)

        print("\r", 'Sample', s+1, '/', Ns)
        print('Elapsed time:', time.time() - ti)

        return cuqi.samples.Samples(x_s, self.model.domain_geometry)

    def _sampleCWMH(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler Component-wise Metropolis-Hastings (CWMH) sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%, scale: 0.05, x0: 0.5 (vector)")
            scale = 0.05*np.ones(self.prior.dim)
            x0 = 0.5*np.ones(self.prior.dim)
            sampler = cuqi.sampler.CWMH(self.posterior, scale, x0, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            x_s = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler Component-wise Metropolis-Hastings (CWMH) sampler (sample_adapt)")
            print(f"burn-in: {Nb/Ns*100:g}%, scale: 0.05, x0: 0.5 (vector)")

            # Dimension
            n = self.prior.dim

            # Set up target and proposal
            def proposal(x_t, sigma):
                return np.random.normal(x_t, sigma)

            # Set up sampler
            scale = 0.05*np.ones(n)
            x0 = 0.5*np.ones(n)
            MCMC = cuqi.legacy.sampler.CWMH(self.posterior, proposal, scale, x0, callback=callback)

            # Run sampler
            ti = time.time()
            x_s = MCMC.sample_adapt(Ns, Nb) # TODO: Make results class
            print('Elapsed time:', time.time() - ti)

        return x_s

    def _samplepCN(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler preconditioned Crank-Nicolson (pCN) sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%, scale: 0.02")
            scale = 0.02
            sampler = cuqi.sampler.PCN(self.posterior, scale, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            x_s = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler preconditioned Crank-Nicolson (pCN) sampler (sample_adapt)")
            print(f"burn-in: {Nb/Ns*100:g}%, scale: 0.02")
            scale = 0.02
            MCMC = cuqi.legacy.sampler.pCN(self.posterior, scale, callback=callback)

            # Run sampler
            ti = time.time()
            x_s = MCMC.sample_adapt(Ns, Nb)
            print('Elapsed time:', time.time() - ti)

        return x_s

    def _sampleNUTS(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler No-U-Turn (NUTS) sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")
            sampler = cuqi.sampler.NUTS(self.posterior, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            x_s = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler No-U-Turn (NUTS) sampler")
            print(f"burn-in: {Nb/Ns*100:g}%")
            MCMC = cuqi.legacy.sampler.NUTS(self.posterior, callback=callback)

            # Run sampler
            ti = time.time()
            x_s = MCMC.sample_adapt(Ns, Nb)
            print('Elapsed time:', time.time() - ti)

        return x_s

    def _sampleUGLA(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler Unadjusted Gaussian Laplace Approximation (UGLA) sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")
            sampler = cuqi.sampler.UGLA(self.posterior, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            samples = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler UGLA sampler")
            print(f"burn-in: {Nb/Ns*100:g}%")

            # Start timing
            ti = time.time()

            # Sample
            sampler = cuqi.legacy.sampler.UGLA(self.posterior, callback=callback)
            samples = sampler.sample(Ns, Nb)

            # Print timing
            print('Elapsed time:', time.time() - ti)

        return samples

    def _sampleRegularizedLinearRTO(self, Ns, Nb, callback=None, legacy=False):
        if not legacy:
            print("Using cuqi.sampler Regularized LinearRTO sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")
            sampler = cuqi.sampler.RegularizedLinearRTO(self.posterior, maxit=100, stepsize="automatic", abstol=1e-10, callback=callback)
            ti = time.time()
            sampler.warmup(Nb)
            sampler.sample(Ns)
            samples = sampler.get_samples().burnthin(Nb)
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using cuqi.legacy.sampler Regularized LinearRTO sampler.")
            print(f"burn-in: {Nb/Ns*100:g}%")

            # Start timing
            ti = time.time()

            # Sample
            sampler = cuqi.legacy.sampler.RegularizedLinearRTO(self.posterior, maxit=100, stepsize="automatic", abstol=1e-10, callback=callback)
            samples = sampler.sample(Ns, Nb)

            # Print timing
            print('Elapsed time:', time.time() - ti)

        return samples

    def _solve_max_point(self, density, disp=True, x0=None):
        """ This is a helper function for point estimation of the maximum of a density (e.g. posterior or likelihood) using the solver module.

        Parameters
        ----------
        density : Density (Distribution or Likelihood)
            The density or likelihood to compute the maximum point of the negative log.

        disp : bool
            display info messages? (True or False).

        """
        # Get the function to minimize (negative log-likelihood or negative log-posterior)
        def func(x):
            return -density.logd(x)

        # Initial value if not given (defaults to a ones vector)
        x0_is_default = x0 is None
        if x0_is_default:
            x0 = np.ones(self.model.domain_dim)

        # Get the gradient (if available)
        try:
            density.gradient(x0)
            def gradfunc(x):
                return -density.gradient(x)
            if disp: print("Optimizing with exact gradients.")
        except (NotImplementedError, AttributeError):
            gradfunc = None
            if disp: print("Optimizing with approximate gradients.")

        # Compute point estimate
        if self._check_posterior(self, CMRF, must_have_gradient=True):
            # Use L-BFGS-B for CMRF prior as it has better performance for this multi-modal posterior
            if disp: print(f"Using scipy.optimize.L_BFGS_B on negative log of {density.__class__.__name__}")
            if disp and x0_is_default: print("x0: ones vector")
            solver = cuqi.solver.ScipyLBFGSB(func, x0, gradfunc=gradfunc)
            solver_name = "L-BFGS-B"
        else:
            if disp: print(f"Using scipy.optimize.minimize on negative log of {density.__class__.__name__}")
            if disp and x0_is_default: print("x0: ones vector")
            solver = cuqi.solver.ScipyMinimizer(func, x0, gradfunc=gradfunc)
            solver_name = "scipy.optimize.minimize"

        x_max, solver_info = solver.solve()

        # Record which solver was used
        solver_info["solver"] = solver_name

        return x_max, solver_info

    def _check_geometries_consistency(self, geom1, geom2, fail_msg):
        """Checks geom1 and geom2 for consistency.
        If both are of type `_DefaultGeometry`, they need to be equal.
        If only one of them is of `_DefaultGeometry` type, it will take the value of the other one.
        If both of them are user defined, they need to be consistent."""
        if isinstance(geom1, _DefaultGeometry):
            if isinstance(geom2, _DefaultGeometry):
                if geom1 == geom2:
                    return geom1, geom2
            else:
                return geom2, geom2
        else:
            if isinstance(geom2, _DefaultGeometry):
                return geom1, geom1
            else:
                if geom1 == geom2:
                    return geom1, geom2
        raise Exception(fail_msg)

    @staticmethod
    def _check_posterior(posterior, prior_type=None, likelihood_type=None, model_type=None, max_dim=None, must_have_gradient=False):
        """Returns true if the components of the posterior match the types (which can be tuples of types) given as input."""
        # Prior check
        if prior_type is None:
            P = True
        else:
            P = isinstance(posterior.prior, prior_type)

        # Likelihood check
        if likelihood_type is None:
            L = True
        else:
            L = isinstance(posterior.likelihood.distribution, likelihood_type)

        # Model check
        if model_type is None:
            M = True
        else:
            M = isinstance(posterior.model, model_type)

        # Dimension check
        if max_dim is None:
            D = True
        else:
            D = posterior.model.domain_dim <= max_dim and posterior.model.range_dim <= max_dim

        # Require gradient?
        if must_have_gradient:
            try:
                posterior.posterior.gradient(np.zeros(posterior.posterior.dim))
                G = True
            except (NotImplementedError, AttributeError):
                G = False
        else:
            G = True

        return L and P and M and D and G

    def _sampleGibbs(self, Ns, Nb, callback=None, legacy=False):
        """ This is a helper function for sampling from the posterior using a Gibbs sampler. """
        if not legacy:
            print("Using cuqi.sampler HybridGibbs sampler")
            print(f"burn-in: {Nb/Ns*100:g}%")
            print("")

            # Start timing
            ti = time.time()

            # Sampling strategy
            sampling_strategy = self._determine_sampling_strategy(legacy=False)

            sampler = cuqi.sampler.HybridGibbs(self._target, sampling_strategy, callback=callback)

            sampler.warmup(Nb)
            sampler.sample(Ns)

            samples = sampler.get_samples() # Dict with Samples objects for each parameter

            # Now apply burnthin to each value in dict
            for key, value in samples.items():
                samples[key] = value.burnthin(Nb)

            # Print timing
            print('Elapsed time:', time.time() - ti)
        else:
            print("Using Gibbs sampler")
            print(f"burn-in: {Nb/Ns*100:g}%")
            print("")

            if callback is not None:
                raise NotImplementedError("Callback not implemented for the legacy Gibbs sampler. It is only implemented for the cuqi.sampler Gibbs (cuqi.sampler.HybridGibbs) sampler.")

            # Start timing
            ti = time.time()

            # Sampling strategy
            sampling_strategy = self._determine_sampling_strategy()

            sampler = cuqi.legacy.sampler.Gibbs(self._target, sampling_strategy)
            samples = sampler.sample(Ns, Nb)

            # Print timing
            print('Elapsed time:', time.time() - ti)

        return samples

    def _determine_sampling_strategy(self, legacy=True):
        """ This is a helper function for determining the sampling strategy for the Gibbs sampler. It is still very experimental and not very robust. """

        # We determine the sampling strategy by sequentially conditioning each variable on the others.
        # We then re-use the _check_posterior method to select the best sampler for each variable.
        # In the future we may consider refactoring these methods into one more robust way of
        # determining the sampling strategy.

        # Joint distribution and parameters
        joint = self._target
        par_names = joint.get_parameter_names()

        # Go through each parameter and condition on the others, then select the best sampler
        sampling_strategy = {}
        for par_name in par_names:

            # Dict of all other parameters to condition on with ones vector as initial value
            other_params = {par_name_: np.ones(joint.get_density(par_name_).dim) for par_name_ in par_names if par_name_ != par_name}

            # Condition on all other parameters to get target conditional distribution
            cond_target = joint(**other_params)

            # If not Posterior, we can't get a sampling strategy (for now)
            if not isinstance(cond_target, Posterior):
                raise NotImplementedError(f"Unable to determine sampling strategy for {par_name} with target {cond_target}")

            # Gamma or ModifiedHalfNormal prior, Gaussian or RegularizedGaussian likelihood -> Conjugate
            if self._check_posterior(cond_target, (Gamma, ModifiedHalfNormal), (Gaussian, GMRF, RegularizedGaussian, RegularizedGMRF)):
                if not legacy:
                    sampling_strategy[par_name] = cuqi.sampler.Conjugate()
                else:
                    sampling_strategy[par_name] = cuqi.legacy.sampler.Conjugate

            # Gamma prior, LMRF likelihood -> ConjugateApprox
            elif self._check_posterior(cond_target, Gamma, LMRF):
                if not legacy:
                    sampling_strategy[par_name] = cuqi.sampler.ConjugateApprox()
                else:
                    sampling_strategy[par_name] = cuqi.legacy.sampler.ConjugateApprox

            # Gaussian prior, Gaussian likelihood, Linear model -> LinearRTO
            elif self._check_posterior(cond_target, (Gaussian, GMRF), Gaussian, LinearModel):
                if not legacy:
                    sampling_strategy[par_name] = cuqi.sampler.LinearRTO()
                else:
                    sampling_strategy[par_name] = cuqi.legacy.sampler.LinearRTO

            # Implicit Regularized Gaussian prior, Gaussian likelihood, Linear model -> RegularizedLinearRTO
            elif self._check_posterior(cond_target, (RegularizedGaussian, RegularizedGMRF), Gaussian, LinearModel):
                if not legacy:
                    sampling_strategy[par_name] = cuqi.sampler.RegularizedLinearRTO()
                else:
                    sampling_strategy[par_name] = cuqi.legacy.sampler.RegularizedLinearRTO

            # LMRF prior, Gaussian likelihood, Linear model -> UGLA
            elif self._check_posterior(cond_target, LMRF, Gaussian, LinearModel):
                if not legacy:
                    sampling_strategy[par_name] = cuqi.sampler.UGLA()
                else:
                    sampling_strategy[par_name] = cuqi.legacy.sampler.UGLA

            else:
                raise NotImplementedError(f"Unable to determine sampling strategy for {par_name} with target {cond_target}")

        print("Automatically determined sampling strategy:")
        for dist_name, strategy in sampling_strategy.items():
            if not legacy:
                print(f"\t{dist_name}: {strategy.__class__.__name__} (cuqi.sampler)")
            else:
                print(f"\t{dist_name}: {strategy.__name__}")
        print("")

        return sampling_strategy

    def __repr__(self):
        return f"BayesianProblem with target: \n {self._target}"
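# Illustrative hierarchical setup that exercises _determine_sampling_strategy
# (a sketch under assumed names; `A` and `y_data` as in the class docstring
# example, and the callable-covariance syntax assumed from cuqi's
# distribution module):
#
#   d = cuqi.distribution.Gamma(1, 1e-4)                         # noise precision
#   x = cuqi.distribution.Gaussian(np.zeros(A.domain_dim), 0.1)  # prior
#   y = cuqi.distribution.Gaussian(A@x, lambda d: 1/d)           # data distribution
#   BP = cuqi.problem.BayesianProblem(y, x, d).set_data(y=y_data)
#   samples = BP.sample_posterior(1000)
#
# Here the automatic strategy assigns Conjugate to d (Gamma prior, Gaussian
# likelihood) and LinearRTO to x (Gaussian prior, Gaussian likelihood, linear
# model), and HybridGibbs alternates between the two.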