Source code for cuqi.diagnostics

# ===================================================================
# Created by:
# Felipe Uribe @ DTU
# ===================================================================
# Version 2021-04
# ===================================================================
import numpy as np
import scipy as sp
import scipy.stats as sps
import warnings



# ===================================================================
# ===================================================================
# ===================================================================
[docs] def Geweke(X, A = 0.1, B = 0.5): # Geweke's MCMC convergence diagnostic: # Test for equality of the means of the first A% (default 10%) # and last B (50%) of a Markov chain Ns, _ = X.shape nA = int(np.floor(A*Ns)) nB = int(Ns - np.floor(B*Ns)) # extract sub chains X_A, X_B = X[:nA, :], X[nB:, :] # compute the mean mean_X_A, mean_X_B = X_A.mean(axis=0), X_B.mean(axis=0) # Spectral estimates for variance var_X_A, var_X_B = spectrum0(X_A), spectrum0(X_B) # params of the geweke z = (mean_X_A - mean_X_B) / (np.sqrt((var_X_A/nA) + (var_X_B/(Ns-nB+1)))) p = 2*(1-sps.norm.cdf(abs(z))) # show idx1 = np.where(p >= 0.95) idx2 = np.where(p < 0.95) warnings.warn("Geweke's diagnostics is a work-in-progress") print('Geweke test passed at indices ', idx1, '\n') print('Geweke test NOT passed at indices ', idx2, '\n') return z, p
# ===================================================================
[docs] def spectrum0(x): # Spectral density at frequency zero # Marko Laine <marko.laine@fmi.fi> m, n = x.shape s = np.empty(n) for i in range(n): # check this later: sp.signal.welch(x[:, i])[0] to avoid 'spectrum' fun spec, _ = spectrum(x[:, i], m) s[i] = spec[0] return s
# ===================================================================
[docs] def spectrum(x, nfft): # Power spectral density using Hanning window # Marko Laine <marko.laine@fmi.fi> n = len(x) nw = int(np.fix(nfft/4)) noverlap = int(np.fix(nw/2)) if (n < nw): x[nw], n = 0, nw # Hanning window idx = np.arange(1, nw+1, 1) w = 0.5*(1 - np.cos(2*np.pi*idx/(nw+1))) # check this later: np.hanning(nw) # estimate PSD k = int(np.fix((n-noverlap)/(nw-noverlap))) # number of windows kmu = k*np.linalg.norm(w)**2 # normalizing scale factor y = np.zeros(nfft) for _ in range(k): xw = w*x[idx-1] idx += (nw - noverlap) Xx = abs(np.fft.fft(xw, nfft))**2 y += Xx y = y*(1/kmu) # normalize n2 = int(np.floor(nfft/2)) idx2 = np.arange(0, n2, 1) y = y[idx2] f = 1/(n*idx2) return y, f
# =================================================================== # =================================================================== # ===================================================================