Source code for cuqi.legacy.sampler._gibbs

from cuqi.distribution import JointDistribution
from cuqi.legacy.sampler import Sampler
from cuqi.samples import Samples
from typing import Dict, Union
import numpy as np
import sys
import warnings

[docs] class Gibbs: """ Gibbs sampler for sampling a joint distribution. Gibbs sampling samples the variables of the distribution sequentially, one variable at a time. When a variable represents a random vector, the whole vector is sampled simultaneously. The sampling of each variable is done by sampling from the conditional distribution of that variable given the values of the other variables. This is often a very efficient way of sampling from a joint distribution if the conditional distributions are easy to sample from. Parameters ---------- target : cuqi.distribution.JointDistribution Target distribution to sample from. sampling_strategy : dict Dictionary of sampling strategies for each parameter. Keys are parameter names. Values are sampler objects. Example ------- .. code-block:: python import cuqi import numpy as np # Model and data A, y_obs, probinfo = cuqi.testproblem.Deconvolution1D(phantom='square').get_components() n = A.domain_dim # Define distributions d = cuqi.distribution.Gamma(1, 1e-4) l = cuqi.distribution.Gamma(1, 1e-4) x = cuqi.distribution.GMRF(np.zeros(n), lambda d: d) y = cuqi.distribution.Gaussian(A, lambda l: 1/l) # Combine into a joint distribution and create posterior joint = cuqi.distribution.JointDistribution(d, l, x, y) posterior = joint(y=y_obs) # Define sampling strategy sampling_strategy = { 'x': cuqi.legacy.sampler.LinearRTO, ('d', 'l'): cuqi.legacy.sampler.Conjugate, } # Define Gibbs sampler sampler = cuqi.legacy.sampler.Gibbs(posterior, sampling_strategy) # Run sampler samples = sampler.sample(Ns=1000, Nb=200) # Plot results samples['x'].plot_ci(exact=probinfo.exactSolution) samples['d'].plot_trace(figsize=(8,2)) samples['l'].plot_trace(figsize=(8,2)) """
[docs] def __init__(self, target: JointDistribution, sampling_strategy: Dict[Union[str,tuple], Sampler]): warnings.warn(f"\nYou are using the legacy sampler '{self.__class__.__name__}'.\n" f"This will be removed in a future release of CUQIpy.\n" f"Please consider using the new samplers in the 'cuqi.sampler' module.\n", UserWarning, stacklevel=2) # Store target and allow conditioning to reduce to a single density self.target = target() # Create a copy of target distribution (to avoid modifying the original) # Parse samplers and split any keys that are tuple into separate keys self.samplers = {} for par_name in sampling_strategy.keys(): if isinstance(par_name, tuple): for par_name_ in par_name: self.samplers[par_name_] = sampling_strategy[par_name] else: self.samplers[par_name] = sampling_strategy[par_name] # Store parameter names self.par_names = self.target.get_parameter_names()
# ------------ Public methods ------------
[docs] def sample(self, Ns, Nb=0): """ Sample from target distribution """ # Initial points current_samples = self._get_initial_points() # Compute how many samples were already taken previously at_Nb = self._Nb at_Ns = self._Ns # Allocate memory for samples self._allocate_samples_warmup(Nb) self._allocate_samples(Ns) # Sample tuning phase for i in range(at_Nb, at_Nb+Nb): current_samples = self.step_tune(current_samples) self._store_samples(self.samples_warmup, current_samples, i) self._print_progress(i+1+at_Nb, at_Nb+Nb, 'Warmup') # Sample phase for i in range(at_Ns, at_Ns+Ns): current_samples = self.step(current_samples) self._store_samples(self.samples, current_samples, i) self._print_progress(i+1, at_Ns+Ns, 'Sample') # Convert to samples objects and return return self._convert_to_Samples(self.samples)
[docs] def step(self, current_samples): """ Sequentially go through all parameters and sample them conditionally on each other """ # Extract par names par_names = self.par_names # Sample from each conditional distribution for par_name in par_names: # Dict of all other parameters to condition on other_params = {par_name_: current_samples[par_name_] for par_name_ in par_names if par_name_ != par_name} # Set up sampler for current conditional distribution sampler = self.samplers[par_name](self.target(**other_params)) # Take a MCMC step current_samples[par_name] = sampler.step(current_samples[par_name]) # Ensure even 1-dimensional samples are 1D arrays current_samples[par_name] = current_samples[par_name].reshape(-1) return current_samples
[docs] def step_tune(self, current_samples): """ Perform a single MCMC step for each parameter and tune the sampler """ # Not implemented. No tuning happening here yet. Requires samplers to be able to be modified after initialization. return self.step(current_samples)
# ------------ Private methods ------------ def _allocate_samples(self, Ns): """ Allocate memory for samples """ # Allocate memory for samples samples = {} for par_name in self.par_names: samples[par_name] = np.zeros((self.target.get_density(par_name).dim, Ns)) # Store samples in self if hasattr(self, 'samples'): # Append to existing samples (This makes a copy) for par_name in self.par_names: samples[par_name] = np.hstack((self.samples[par_name], samples[par_name])) self.samples = samples def _allocate_samples_warmup(self, Nb): """ Allocate memory for samples """ # If we already have warmup samples and more are requested raise error if hasattr(self, 'samples_warmup') and Nb != 0: raise ValueError('Sampler already has run warmup phase. Cannot run warmup phase again.') # Allocate memory for samples samples = {} for par_name in self.par_names: samples[par_name] = np.zeros((self.target.get_density(par_name).dim, Nb)) self.samples_warmup = samples def _get_initial_points(self): """ Get initial points for each parameter """ initial_points = {} for par_name in self.par_names: if hasattr(self, 'samples'): initial_points[par_name] = self.samples[par_name][:, -1] elif hasattr(self, 'samples_warmup'): initial_points[par_name] = self.samples_warmup[par_name][:, -1] elif hasattr(self.target.get_density(par_name), 'init_point'): initial_points[par_name] = self.target.get_density(par_name).init_point else: initial_points[par_name] = np.ones(self.target.get_density(par_name).dim) return initial_points def _store_samples(self, samples, current_samples, i): """ Store current samples at index i of samples dict """ for par_name in self.par_names: samples[par_name][:, i] = current_samples[par_name] def _convert_to_Samples(self, samples): """ Convert each parameter in samples dict to cuqi.samples.Samples object with correct geometry """ samples_object = {} for par_name in self.par_names: samples_object[par_name] = Samples(samples[par_name], self.target.get_density(par_name).geometry) return samples_object def _print_progress(self, s, Ns, phase): """Prints sampling progress""" if Ns < 2: # Don't print progress if only one sample return if (s % (max(Ns//100,1))) == 0: msg = f'{phase} {s} / {Ns}' sys.stdout.write('\r'+msg) if s==Ns: msg = f'{phase} {s} / {Ns}' sys.stdout.write('\r'+msg+'\n') # ------------ Private properties ------------ @property def _Ns(self): """ Number of samples already taken """ if hasattr(self, 'samples'): return self.samples[self.par_names[0]].shape[-1] else: return 0 @property def _Nb(self): """ Number of samples already taken in warmup phase """ if hasattr(self, 'samples_warmup'): return self.samples_warmup[self.par_names[0]].shape[-1] else: return 0