Source code for cuqi.sampler._cwmh
import numpy as np
import cuqi
from cuqi.sampler import ProposalBasedSampler
from cuqi.array import CUQIarray
from numbers import Number
[docs]
class CWMH(ProposalBasedSampler):
"""Component-wise Metropolis Hastings sampler.
Allows sampling of a target distribution by a component-wise random-walk
sampling of a proposal distribution along with an accept/reject step.
Parameters
----------
target : `cuqi.distribution.Distribution` or lambda function
The target distribution to sample. Custom logpdfs are supported by using
a :class:`cuqi.distribution.UserDefinedDistribution`.
proposal : `cuqi.distribution.Distribution` or callable method
The proposal to sample from. If a callable method it should provide a
single independent sample from proposal distribution. Defaults to a
Gaussian proposal. *Optional*.
scale : float or ndarray
Scale parameter used to define correlation between previous and proposed
sample in random-walk. *Optional*. If float, the same scale is used for
all dimensions. If ndarray, a (possibly) different scale is used for
each dimension.
initial_point : ndarray
Initial parameters. *Optional*
callback : callable, optional
A function that will be called after each sampling step. It can be useful for monitoring the sampler during sampling.
The function should take three arguments: the sampler object, the index of the current sampling step, the total number of requested samples. The last two arguments are integers. An example of the callback function signature is: `callback(sampler, sample_index, num_of_samples)`.
kwargs : dict
Additional keyword arguments to be passed to the base class
:class:`ProposalBasedSampler`.
Example
-------
.. code-block:: python
import numpy as np
import cuqi
# Parameters
dim = 5 # Dimension of distribution
mu = np.arange(dim) # Mean of Gaussian
std = 1 # standard deviation of Gaussian
# Logpdf function
logpdf_func = lambda x: -1/(std**2)*np.sum((x-mu)**2)
# Define distribution from logpdf as UserDefinedDistribution (sample
# and gradients also supported as inputs to UserDefinedDistribution)
target = cuqi.distribution.UserDefinedDistribution(
dim=dim, logpdf_func=logpdf_func)
# Set up sampler
sampler = cuqi.sampler.CWMH(target, scale=1)
# Sample
samples = sampler.sample(2000).get_samples()
"""
_STATE_KEYS = ProposalBasedSampler._STATE_KEYS.union(['_scale_temp'])
[docs]
def __init__(self, target:cuqi.density.Density=None, proposal=None, scale=1,
initial_point=None, **kwargs):
super().__init__(target, proposal=proposal, scale=scale,
initial_point=initial_point, **kwargs)
def _initialize(self):
if isinstance(self.scale, Number):
self.scale = np.ones(self.dim)*self.scale
self._acc = [np.ones((self.dim))] # Overwrite acc from ProposalBasedSampler with list of arrays
# Handling of temporary scale parameter due to possible bug in old CWMH
self._scale_temp = self.scale.copy()
@property
def scale(self):
""" Get the scale parameter. """
return self._scale
@scale.setter
def scale(self, value):
""" Set the scale parameter. """
if self._is_initialized and isinstance(value, Number):
value = np.ones(self.dim)*value
self._scale = value
[docs]
def validate_target(self):
if not isinstance(self.target, cuqi.density.Density):
raise ValueError(
"Target should be an instance of "+\
f"{cuqi.density.Density.__class__.__name__}")
# Fail when there is no log density, which is currently assumed to be the case in case NaN is returned.
if np.isnan(self.target.logd(self._get_default_initial_point(self.dim))):
raise ValueError("Target does not have valid logd")
[docs]
def validate_proposal(self):
if not isinstance(self.proposal, cuqi.distribution.Distribution):
raise ValueError("Proposal must be a cuqi.distribution.Distribution object")
if not self.proposal.is_symmetric:
raise ValueError("Proposal must be symmetric")
@property
def proposal(self):
if self._proposal is None:
self._proposal = cuqi.distribution.Normal(
mean=lambda location: location,
std=lambda scale: scale,
geometry=self.dim,
)
return self._proposal
@proposal.setter
def proposal(self, value):
self._proposal = value
[docs]
def step(self):
# Initialize x_t which is used to store the current CWMH sample
x_t = self.current_point.copy()
# Initialize x_star which is used to store the proposed sample by
# updating the current sample component-by-component
x_star = self.current_point.copy()
# Propose a sample x_all_components from the proposal distribution
# for all the components
target_eval_t = self.current_target_logd
if isinstance(self.proposal,cuqi.distribution.Distribution):
x_all_components = self.proposal(
location= self.current_point, scale=self.scale).sample()
else:
x_all_components = self.proposal(self.current_point, self.scale)
# Initialize acceptance rate
acc = np.zeros(self.dim)
# Loop over all the components of the sample and accept/reject
# each component update.
for j in range(self.dim):
# propose state x_star by updating the j-th component
x_star[j] = x_all_components[j]
# evaluate target
target_eval_star = self.target.logd(x_star)
# compute Metropolis acceptance ratio
alpha = min(0, target_eval_star - target_eval_t)
# accept/reject
u_theta = np.log(np.random.rand())
if (u_theta <= alpha) and \
(not np.isnan(target_eval_star)) and \
(not np.isinf(target_eval_star)):
x_t[j] = x_all_components[j]
target_eval_t = target_eval_star
acc[j] = 1
x_star = x_t.copy()
self.current_target_logd = target_eval_t
self.current_point = x_t
return acc
[docs]
def tune(self, skip_len, update_count):
# Store update_count in variable i for readability
i = update_count
# Optimal acceptance rate for CWMH
star_acc = 0.21/self.dim + 0.23
# Mean of acceptance rate over the last skip_len samples
hat_acc = np.mean(self._acc[i*skip_len:(i+1)*skip_len], axis=0)
# Compute new intermediate scaling parameter scale_temp
# Factor zeta ensures that the variation of the scale update vanishes
zeta = 1/np.sqrt(update_count+1)
scale_temp = np.exp(
np.log(self._scale_temp) + zeta*(hat_acc-star_acc))
# Update the scale parameter
self.scale = np.minimum(scale_temp, np.ones(self.dim))
self._scale_temp = scale_temp