Source code for cuqi.distribution._truncated_normal
import numpy as np
from scipy.special import erf
from cuqi.utilities import force_ndarray
from cuqi.distribution import Distribution
from cuqi.distribution import Normal
[docs]
class TruncatedNormal(Distribution):
"""
Truncated Normal probability distribution.
Generates instance of cuqi.distribution.TruncatedNormal.
It allows the user to specify upper and lower bounds on random variables
represented by a Normal distribution. This distribution is suitable for a
small dimension setup (e.g. `dim`=3 or 4). Using TruncatedNormal
Distribution with a larger dimension can lead to a high rejection rate when
used within MCMC samplers.
The variables of this distribution are iid.
Parameters
------------
mean : float or array_like of floats
mean of distribution
std : float or array_like of floats
standard deviation
low : float or array_like of floats
lower bound of the distribution
high : float or array_like of floats
upper bound of the distribution
Example
-----------
.. code-block:: python
#Generate Normal with mean 0, standard deviation 1 and bounds [-2,2]
p = cuqi.distribution.TruncatedNormal(mean=0, std=1, low=-2, high=2)
samples = p.sample(5000)
"""
[docs]
def __init__(self, mean=None, std=None, low=-np.inf, high=np.inf, is_symmetric=False, **kwargs):
# Init from abstract distribution class
super().__init__(is_symmetric=is_symmetric, **kwargs)
# Init specific to this distribution
self.mean = mean
self.std = std
self.low = low
self.high = high
# Init underlying normal distribution
self._normal = Normal(self.mean, self.std, is_symmetric=True, **kwargs)
@property
def mean(self):
""" Mean of the distribution """
return self._mean
@mean.setter
def mean(self, value):
self._mean = force_ndarray(value, flatten=True)
if hasattr(self, '_normal'):
self._normal.mean = self._mean
@property
def std(self):
""" Std of the distribution """
return self._std
@std.setter
def std(self, value):
self._std = force_ndarray(value, flatten=True)
if hasattr(self, '_normal'):
self._normal.std = self._std
@property
def low(self):
""" Lower bound of the distribution """
return self._low
@low.setter
def low(self, value):
self._low = force_ndarray(value, flatten=True)
@property
def high(self):
""" Higher bound of the distribution """
return self._high
@high.setter
def high(self, value):
self._high = force_ndarray(value, flatten=True)
[docs]
def logpdf(self, x):
"""
Computes the unnormalized logpdf at the given values of x.
"""
# the unnormalized logpdf
# check if x falls in the range between np.array a and b
if np.any(x < self.low) or np.any(x > self.high):
return -np.inf
else:
return self._normal.logpdf(x)
def _gradient(self, x, *args, **kwargs):
"""
Computes the gradient of the unnormalized logpdf at the given values of x.
"""
# check if x falls in the range between np.array a and b
if np.any(x < self.low) or np.any(x > self.high):
return np.nan*np.ones_like(x)
else:
return self._normal.gradient(x, *args, **kwargs)
def _sample(self, N=1, rng=None):
"""
Generates random samples from the distribution.
"""
max_iter = 1e9 # maximum number of trials to avoid infinite loop
samples = []
for i in range(int(max_iter)):
if len(samples) == N:
break
sample = self._normal.sample(1,rng)
if np.all(sample >= self.low) and np.all(sample <= self.high):
samples.append(sample)
# raise a error if the number of iterations exceeds max_iter
if i == max_iter-1:
raise RuntimeError("Failed to generate {} samples within {} iterations".format(N, max_iter))
return np.array(samples).T.reshape(-1,N)