Skip to content

Instantly share code, notes, and snippets.

@aflaxman
Forked from apatil/history_steps.py
Created April 15, 2011 00:02
Show Gist options
  • Save aflaxman/920869 to your computer and use it in GitHub Desktop.
Save aflaxman/920869 to your computer and use it in GitHub Desktop.
import pymc as pm
import numpy as np
# FIXME: Need to store duplicates too, when jumps are rejected. That means some mechanism
# for making sure the history is full-rank needs to be employed.
class HistoryCovarianceStepper(pm.StepMethod):
_state = ['n_points','history','tally','verbose']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False):
"""
n_points : int, optional
init_history : (n,d)-array, optional
if init_history is included, n_points parameter is ignored and self.n_points == len(init_history)
"""
if not np.iterable(stochastic) or isinstance(stochastic, pm.Variable):
stochastic = [stochastic]
pm.StepMethod.__init__(self, stochastic, verbose, tally)
# Initialization methods
self.check_type()
self.dimension()
self.index = 0
if init_history is None:
self.n_points = n_points or max(self.dim, 20)
self.history = np.empty((self.n_points, self.dim))
for i in xrange(self.n_points):
for s in self.stochastics:
self.history[i,self._slices[s]] = s._random(**s.parents.value)
else:
self.history = init_history
self.n_points = len(init_history)
self.history_mean = np.mean(self.history, axis=0)
self._state = HistoryCovarianceStepper._state
def unstore(self):
index = (self.index-1)%self.n_points
self.history_mean += (self.last_value - self.history[index])/self.n_points
self.history[index] = self.last_value
def get_current_value(self):
current_value = np.empty(self.dim)
for s in self.stochastics:
current_value[self._slices[s]] = s.value
return current_value
def set_current_value(self, v):
for s in self.stochastics:
s.value = v[self._slices[s]]
def store(self, v):
self.last_value = self.history[self.index].copy()
self.history[self.index] = v
self.index = (self.index + 1) % self.n_points
self.history_mean += (v-self.last_value)/self.n_points
def check_type(self):
"""Make sure each stochastic has a correct type, and identify discrete stochastics."""
self.isdiscrete = {}
for stochastic in self.stochastics:
if stochastic.dtype in pm.StepMethods.integer_dtypes:
self.isdiscrete[stochastic] = True
elif stochastic.dtype in pm.StepMethods.bool_dtypes:
raise 'Binary stochastics not supported by AdaptativeMetropolis.'
else:
self.isdiscrete[stochastic] = False
def choose_direction(self, norm=True):
direction = np.dot(np.random.normal(size=self.n_points), self.history) - self.history_mean
if norm:
direction /= np.dot(direction, direction)
return direction
def dimension(self):
"""Compute the dimension of the sampling space and identify the slices
belonging to each stochastic.
"""
self.dim = 0
self._slices = {}
for stochastic in self.stochastics:
if isinstance(stochastic.value, np.matrix):
p_len = len(stochastic.value.A.ravel())
elif isinstance(stochastic.value, np.ndarray):
p_len = len(stochastic.value.ravel())
else:
p_len = 1
self._slices[stochastic] = slice(self.dim, self.dim + p_len)
self.dim += p_len
class HistoryAM(HistoryCovarianceStepper, pm.Metropolis):
_state = HistoryCovarianceStepper._state + ['accepted','rejected','adaptive_scale_factor','proposal_distribution','proposal_sd']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False, proposal_sd=1):
HistoryCovarianceStepper.__init__(self, stochastic, n_points, init_history, verbose, tally)
self.proposal_distribution = "None"
self.adaptive_scale_factor = 1
self.accepted = 0
self.rejected = 0
self.proposal_sd=proposal_sd
self._state = HistoryAM._state
def reject(self):
for stochastic in self.stochastics:
stochastic.revert()
self.unstore()
def propose(self):
direction = self.choose_direction()
proposed_value = self.get_current_value() + direction * np.random.normal()*self.adaptive_scale_factor*self.proposal_sd
self.set_current_value(proposed_value)
self.store(proposed_value)
class HRAM(HistoryCovarianceStepper):
_state = HistoryCovarianceStepper._state + ['xprime_n', 'xprime_sds']
def __init__(self, stochastic, n_points=None, init_history=None, verbose=None, tally=False, xprime_sds=5, xprime_n=101):
HistoryCovarianceStepper.__init__(self, stochastic, n_points, init_history, verbose, tally)
self.xprime_sds = xprime_sds
self.xprime_n = xprime_n
self._state = HRAM._state
def step(self):
direction = self.choose_direction(norm=False)
direction += .5*np.random.normal(size=self.dim) # perturb direction to ensure full-rank exploration (TODO: make size of perturbation adaptive, too)
current_value = self.get_current_value()
x_prime = np.outer(np.linspace(-self.xprime_sds,self.xprime_sds,self.xprime_n),direction) + current_value
lps = np.empty(self.xprime_n)
for i in xrange(self.xprime_n):
self.set_current_value(x_prime[i])
try:
lps[i] = self.logp_plus_loglike
except pm.ZeroProbability:
lps[i] = -np.inf
next_value = x_prime[pm.rcategorical(np.exp(lps-pm.flib.logsum(lps)))]
self.set_current_value(next_value)
self.store(next_value)
if __name__ == '__main__':
import time
for kls in [HistoryAM, HRAM, pm.AdaptiveMetropolis]:
x = pm.Normal('x',0,1,size=1000)
y = pm.Normal('y',x,100,size=1000)
M = pm.MCMC([x,y])
# M.use_step_method(HistoryAM, [x,y])
if kls is pm.AdaptiveMetropolis:
M.use_step_method(kls, [x,y], delay=10, interval=10)
else:
M.use_step_method(kls, [x,y])
sm = M.step_method_dict[x][0]
if kls is HRAM:
sm.accepted = 0
sm.rejected = 0
t1 = time.time()
M.sample(100)
t2 = time.time()
print 'Class %s: %s seconds, accepted %i, rejected %i'%(kls.__name__, t2-t1, sm.accepted, sm.rejected)
import pylab as pl
pl.clf()
pl.plot(M.trace('x')[:,0], M.trace('y')[:,0],linestyle='none',marker='.',markersize=8,color=(.1,0,.8,),alpha=10./len(M.trace('x')[:]),markeredgewidth=0)
pl.title(kls.__name__)
from IPython.Debugger import Pdb
Pdb(color_scheme='Linux').set_trace()
# a = np.empty((1000,2))
# for i in xrange(1000):
# M.step_method_dict[x][0].step()
# a[i,0] = x.value
# a[i,1] = y.value
#
# import pylab as pl
# pl.plot(a[:,0], a[:,1], 'b.')
\documentclass[a4paper]{article}
\usepackage{fullpage}
\usepackage{epsfig}
\usepackage{pdfsync}
\usepackage{amsfonts}
\usepackage{amsmath}
\usepackage{amsthm}
\newtheorem{lemma}{Lemma}
\newtheorem*{definition}{Definition}
\newtheorem*{conjecture}{Conjecture}
\newtheorem*{theorem}{Theorem}
\begin{document}
\title{History-dependent MCMC}
\author{Anand}
\maketitle
You want to sample from target distribution $\pi$ on finite state space $\mathcal X$. You have a non-Markov kernel $K:\mathcal X^T\rightarrow(\mathcal X\rightarrow \mathbb R)$, so $K(x_{1\ldots T},\cdot)$ for length-$T$ history $x_{1\ldots T}$ is a probability distribution on $\mathcal X$. Define the $j$-step-ahead kernel $K^j$ by
\begin{eqnarray*}
K^j(x_{1\ldots T},x_{T+j})=\sum_{x_T+1}\ldots\sum_{x_{T+j-1}} \prod_{i=1}^j K(x_{i\ldots T+i-1}, x_{T+i}).
\end{eqnarray*}
\begin{definition}\label{def:marginally stationary}
$K$ is \emph{marginally stationary} with respect to $\pi$ if, for any distribution $\mu$ on $\mathcal X^T$ with marginals $\pi$,
\begin{eqnarray*}
E_{\mu(x_{1\ldots T})}\left[K(x_{1\ldots T}, x_{T+1})\right]=\pi(x_{t+1}).
\end{eqnarray*}
\end{definition}
That is, if $x_{1\ldots T}$ are distributed such that their marginals are all $\pi$, then after application of $x$ $x_{T+1}$ is distributed as $\pi$ as well. Note that for $T=1$, $K$ is a Markov kernel whose stationary distribution is $\pi$.
\begin{lemma}\label{lem:one}
If $K$ is marginally stationary with respect to $\pi$, and the distribution $\mu$ of $x_{1\ldots T}$ has marginals $\pi$, then the distribution $\mu^1$ of $x_{2\ldots T+1}$ after one iteration of $K$ has all of its marginals equal to $\pi$.
\end{lemma}
\begin{proof}
$\mu^1$ is
\begin{eqnarray*}
\mu^1(x_{2\ldots T+1})=\sum_{x_1}\mu(x_{1\ldots T})K(x_{1\ldots T},x_{T+1}).
\end{eqnarray*}
By marginally stationarity of $K$, the marginal of $x_{T+1}$ is $\pi$. For $i\in [2,T]$, the marginal can be computed as
\begin{eqnarray*}
\sum_{x_1}\ldots\sum_{x_{i-1}}\sum_{x_{i+1}}\ldots \sum_{x_T}\mu(x_{1\ldots T})\sum_{x_{T+1}}K(x_{1\ldots T},x_{T+1})
\end{eqnarray*}
which is simply equal to the $i$'th marginal of $\mu$, which is $\pi$ by hypothesis.
\end{proof}
\begin{lemma}\label{lem:two}
If $K$ is marginally stationary with respect to $\pi$, then the distribution $\mu^j$ of $x_{j+1\ldots T+j}$ after $j>1$ iterations of $K$ has all of its marginals equal to $\pi$.
\end{lemma}
\begin{proof}
$\mu^j$ is
\begin{eqnarray*}
\sum_{x_1}\ldots \sum_{x_j} \mu(x_{1\ldots T}) \prod_{i=1}^j K(x_{i\ldots T+i-1}, x_{T+i})\\
= \sum_{x_j} \left[\sum_{x_1}\ldots \sum_{x_{j-1}} \mu(x_{1\ldots T}) \prod_{i=1}^{j-1} K(x_{i\ldots T+i-1}, x_{T+i}) \right] K(x_{j\ldots T+j-1}, x_{T+j})\\
=\sum_{x_j} \mu^{j-1}(x_{j\ldots T+j-1}) K(x_{j\ldots T+j-1}, x_{T+j})
\end{eqnarray*}
A similar argument to lemma \ref{lem:one} shows that, if $\mu^{j-1}$'s marginals are all $\pi$, then $\mu^j$'s marginals are also all $\pi$. Since lemma \ref{lem:one} says that $\mu^1$'s marginals are all $\pi$, the claim follows by induction.
\end{proof}
\begin{theorem}\label{thm:marginals-preserved}
If $K$ is marginally stationary with respect to $\pi$, then $\pi$ is the stationary distribution for $K$ in the sense that, if the distribution $\mu$ of $x_{1\ldots T}$ has marginals $\pi$, then the marginal distribution $K^j\mu$ of $x_{T+j}$ is equal to $\pi$.
\end{theorem}
\begin{proof}\label{pf:marginals-preserved}
This is a special case of lemma \ref{lem:two}: after $K^j$ is applied to $\mu$, the marginals of the distribution of $x_{j+1\ldots T+j}$ are all equal to $\pi$.
\end{proof}
That means that many iterations of $K$ produce a usable MCMC trace for $x$ if the distribution of the initial history has the correct marginals. Note that all the theorems can be converted to `if and only if $K$ is marginally stationary' by considering all $\mu$ with marginals $\pi$, rather than particular distributions. % That makes me think it should be possible to prove the following, which says that if $K$ is marginally stationary with respect to $\pi$ then it is a valid jumping strategy.
% \begin{conjecture}
% If $K$ is marginally stationary with respect to $\pi$, then $K^j\eta$ converges to $\pi$ in some sense as $j\rightarrow \infty$ from any starting distribution $\eta$.
% \end{conjecture}
% \section*{Another class}
% Suppose $K$ is of the form $K(x_{1\ldots T},x_{T+1})=Q(x_{T},x_{T+1};\theta(x_{1\ldots T-1}))$, and that $Q$ is a reversible Markov kernel with respect to $\pi$ regardless of $\theta$. Wikipedia says that, for any probability distribution $\eta$ and tuning parameter $\theta$, Kullback-Leibler distance to $\pi$ decreases:
% \begin{eqnarray*}
% D\left(\eta(x_T)||\pi(x_T)\right)\ge D\left(\sum_{x_T\in\mathcal X}\eta(x_T)Q(x_T,x_{T+1};\theta)||\pi(x_T)\right)
% \end{eqnarray*}
% with equality only if $\eta=\pi$.
%
% % FIXME: This may not be true.
% \begin{lemma}\label{lem:Blah}
% For any starting distribution $\mu$,
% \begin{eqnarray*}
% D(\mu_T(x_T)||\pi(x_T))\ge D\left(\sum_{x_T\in\mathcal X}\mu_T(x_T)\sum_{x_1\in\mathcal X}\sum_{x_{T-1}\in\mathcal X}Q(x_T,x_{T+1};\theta)||\pi(x_T)\right)
% \end{eqnarray*}
% where $\mu_T$ denotes the $T$'th marginal of $\mu$. That is, if $K$ is used to generate an MCMC trace, the Kullback-Leibler divergence between the marginal of the head of the trace and $\pi$ does not increase. \textbf{NOTE:} this may not be true.
% \end{lemma}
% \begin{proof}\label{pf:kl-decreases}
% Expanding the left-hand term yields
% \begin{eqnarray*}
% D(\mu_T(x_T)||\pi(x_T))=\sum_{x_1\in\mathcal X}\ldots \sum_{x_T\in\mathcal X} \mu(x_{1\ldots T})\log \frac{\sum_{x_1\in\mathcal X}\ldots\sum_{x_{T-1}\in\mathcal X} \mu(x_{1\ldots T})}{\pi(x_T)}
% \end{eqnarray*}
% \end{proof}
\end{document}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment