Source code for cuqi.legacy.sampler._mh

import numpy as np
import cuqi
from cuqi.legacy.sampler import ProposalBasedSampler


[docs] class MH(ProposalBasedSampler): """Metropolis Hastings sampler. Allows sampling of a target distribution by random-walk sampling of a proposal distribution along with an accept/reject step. Parameters ---------- target : `cuqi.distribution.Distribution` or lambda function The target distribution to sample. Custom logpdfs are supported by using a :class:`cuqi.distribution.UserDefinedDistribution`. proposal : `cuqi.distribution.Distribution` or callable method The proposal to sample from. If a callable method it should provide a single independent sample from proposal distribution. Defaults to a Gaussian proposal. *Optional*. scale : float Scale parameter used to define correlation between previous and proposed sample in random-walk. *Optional*. x0 : ndarray Initial parameters. *Optional* dim : int Dimension of parameter space. Required if target and proposal are callable functions. *Optional*. callback : callable, *Optional* If set this function will be called after every sample. The signature of the callback function is `callback(sample, sample_index)`, where `sample` is the current sample and `sample_index` is the index of the sample. An example is shown in demos/demo31_callback.py. Example ------- .. code-block:: python # Parameters dim = 5 # Dimension of distribution mu = np.arange(dim) # Mean of Gaussian std = 1 # standard deviation of Gaussian # Logpdf function logpdf_func = lambda x: -1/(std**2)*np.sum((x-mu)**2) # Define distribution from logpdf as UserDefinedDistribution (sample and gradients also supported) target = cuqi.distribution.UserDefinedDistribution(dim=dim, logpdf_func=logpdf_func) # Set up sampler sampler = cuqi.legacy.sampler.MH(target, scale=1) # Sample samples = sampler.sample(2000) """ #target, proposal=None, scale=1, x0=None, dim=None # super().__init__(target, proposal=proposal, scale=scale, x0=x0, dim=dim)
[docs] def __init__(self, target, proposal=None, scale=None, x0=None, dim=None, **kwargs): """ Metropolis-Hastings (MH) sampler. Default (if proposal is None) is random walk MH with proposal that is Gaussian with identity covariance""" super().__init__(target, proposal=proposal, scale=scale, x0=x0, dim=dim, **kwargs)
@ProposalBasedSampler.proposal.setter def proposal(self, value): fail_msg = "Proposal should be either None, symmetric cuqi.distribution.Distribution or a lambda function." if value is None: self._proposal = cuqi.distribution.Gaussian(np.zeros(self.dim), 1) elif not isinstance(value, cuqi.distribution.Distribution) and callable(value): raise NotImplementedError(fail_msg) elif isinstance(value, cuqi.distribution.Distribution) and value.is_symmetric: self._proposal = value else: raise ValueError(fail_msg) self._proposal.geometry = self.target.geometry def _sample(self, N, Nb): if self.scale is None: raise ValueError("Scale must be set to sample without adaptation. Consider using sample_adapt instead.") Ns = N+Nb # number of simulations # allocation samples = np.empty((self.dim, Ns)) target_eval = np.empty(Ns) acc = np.zeros(Ns, dtype=int) # initial state samples[:, 0] = self.x0 target_eval[0] = self.target.logd(self.x0) acc[0] = 1 # run MCMC for s in range(Ns-1): # run component by component samples[:, s+1], target_eval[s+1], acc[s+1] = self.single_update(samples[:, s], target_eval[s]) self._print_progress(s+2,Ns) #s+2 is the sample number, s+1 is index assuming x0 is the first sample self._call_callback(samples[:, s+1], s+1) # remove burn-in samples = samples[:, Nb:] target_eval = target_eval[Nb:] accave = acc[Nb:].mean() print('\nAverage acceptance rate:', accave, '\n') # return samples, target_eval, accave def _sample_adapt(self, N, Nb): # Set intial scale if not set if self.scale is None: self.scale = 0.1 Ns = N+Nb # number of simulations # allocation samples = np.empty((self.dim, Ns)) target_eval = np.empty(Ns) acc = np.zeros(Ns) # initial state samples[:, 0] = self.x0 target_eval[0] = self.target.logd(self.x0) acc[0] = 1 # initial adaptation params Na = int(0.1*N) # iterations to adapt hat_acc = np.empty(int(np.floor(Ns/Na))) # average acceptance rate of the chains lambd = self.scale star_acc = 0.234 # target acceptance rate RW i, idx = 0, 0 # run MCMC for s in range(Ns-1): # run component by component samples[:, s+1], target_eval[s+1], acc[s+1] = self.single_update(samples[:, s], target_eval[s]) # adapt prop spread using acc of past samples if ((s+1) % Na == 0): # evaluate average acceptance rate hat_acc[i] = np.mean(acc[idx:idx+Na]) # d. compute new scaling parameter zeta = 1/np.sqrt(i+1) # ensures that the variation of lambda(i) vanishes lambd = np.exp(np.log(lambd) + zeta*(hat_acc[i]-star_acc)) # update parameters self.scale = min(lambd, 1) # update counters i += 1 idx += Na # display iterations self._print_progress(s+2,Ns) #s+2 is the sample number, s+1 is index assuming x0 is the first sample # remove burn-in samples = samples[:, Nb:] target_eval = target_eval[Nb:] accave = acc[Nb:].mean() print('\nAverage acceptance rate:', accave, 'MCMC scale:', self.scale, '\n') return samples, target_eval, accave
[docs] def single_update(self, x_t, target_eval_t): # propose state xi = self.proposal.sample(1) # sample from the proposal x_star = x_t + self.scale*xi.flatten() # MH proposal # evaluate target target_eval_star = self.target.logd(x_star) # ratio and acceptance probability ratio = target_eval_star - target_eval_t # proposal is symmetric alpha = min(0, ratio) # accept/reject u_theta = np.log(np.random.rand()) if (u_theta <= alpha): x_next = x_star target_eval_next = target_eval_star acc = 1 else: x_next = x_t target_eval_next = target_eval_t acc = 0 return x_next, target_eval_next, acc