Langevin Diffusion Tutorial: Hands-on Introduction
Diffusion models have generated a lot of excitement in recent years as powerful generative models that allow us to generate novel images from text prompts (see Stable Diffusion), molecules, or proteins. Diffusion models rely on stochastic differential equations (SDEs), a mathematical concept that is hard to digest for someone new to the field. Therefore, I want to provide a bit more intuition for SDEs. In this tutorial, we study a specific stochastic differential equation: the Langevin diffusion - a fundamental SDE for understanding diffusion models. In the next tutorial, we will dive deeper into Ito SDEs - the basis for most concepts.
Introduction to Langevin diffusion. The Langevin diffusion was developed in physics to describe the motion of particles driven by random and deterministic forces. Due to the random forces, it is a stochastic process, and in generative AI it describes the evolution of a probability distribution over time. It is named after the French physicist Paul Langevin, who first introduced the concept in the early 20th century.
In the context of generative modeling, the Langevin diffusion is often used as a way to sample from a probability distribution $p(\mathbf{x})$ that is difficult to sample from directly. For example, $p$ can be the distribution of portraits of people - and we can generate a novel image from it. The following are examples of images generated by Langevin dynamics from Song and Ermon (2019), trained on the MNIST dataset (left), the CelebA dataset (middle), and CIFAR-10 (right):
The idea of the Langevin diffusion is to start with an initial "seed" sample $X_{0}$ and then iteratively update it (i.e. compute $X_{t+s}$ from $X_t$) using a stochastic differential equation that pushes $X_t$ towards regions where $p$ is high. If we run the updates long enough ($t\to\infty$), we have
$$\mathbb{P}(X_t\in A) \approx p(A)$$
In other words, by evolving $X_t$ over time long enough, we can approximately generate samples from a probability distribution $p$. If $p$ describes a distribution over images or text, we can generate novel images, text, and many other data types. Here is an example from Nvidia that shows the evolution of an initial white-noise seed $X_0$ into a sample $X_t$ that is a realistic image of a cat:
What is this post about? The goal of this post is to give a hands-on introduction to the Langevin diffusion - to understand why it works and how it can be used. We focus on explaining the mathematical concept to help you understand the literature on diffusion models.
What's next? In my next tutorial, I will give you a thorough introduction to the world of SDEs and how they are used for diffusion models.
import numpy as np
import math
import matplotlib.pyplot as plt
from typing import List, Callable
from itertools import product
import sys
import seaborn as sns
1. Preparation: Define tools to visualize and construct distributions
Let's begin by constructing a probability distribution $p(\mathbf{x})$ that we would like to sample from. In this post, we consider 2-dimensional Gaussian mixture distributions. A Gaussian mixture is given by a weighted sum of Gaussian distributions: $$p(\mathbf{x}) = \sum\limits_{j=1}^{n}q_j \mathcal{N}(\mathbf{x};\mu_j,\Sigma_{j}), \quad \text{where } q_j\geq 0,\ \sum\limits_{j=1}^{n}q_j = 1,\ \mu_j\in\mathbb{R}^2,\ \Sigma_{j}\in\mathbb{R}^{2\times 2}$$
This chapter describes how we construct such distributions. The rushed reader can safely skip it.
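As a side note: a Gaussian mixture can also be sampled directly by picking a component $j$ with probability $q_j$ and then drawing from $\mathcal{N}(\mu_j,\Sigma_j)$. Here is a minimal sketch of that baseline (the helper name sample_gaussian_mixture is my own); the Langevin diffusion becomes interesting precisely when such direct sampling is not available and we only have access to the score.
def sample_gaussian_mixture(n_samples: int, mean_list: List[np.array], cov_list: List[np.array], mixture_probs: np.array, seed: int = 0):
    """Draw samples directly from a Gaussian mixture (baseline for comparison)."""
    rng = np.random.default_rng(seed)
    #1. Pick a mixture component for every sample according to the weights q_j:
    components = rng.choice(len(mean_list), size=n_samples, p=mixture_probs)
    #2. Draw from the chosen Gaussian N(mu_j, Sigma_j):
    return np.stack([rng.multivariate_normal(mean_list[j], cov_list[j]) for j in components])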
1.1. Visualize probability densities
Let's build a utility function that allows us to plot probability densities.
def plot_density(density_func: Callable, fpath: str = None, min_x: float = -5.0, max_x: float = 5.0, n_grid_points: int = 50, plot_contours: bool = False, my_axis=None, **kwargs):
"""
Function to plot a probability density.
Arguments:
density_func: probability density function.
fpath: save image to this file path (if given)
min_x,max_x: float - borders of grid
n_grid_points: int - number of grid points,
**kwargs: arguments for density function
Returns:
axis object with plot.
"""
oned_grid = np.linspace(min_x, max_x, n_grid_points)
twod_grid = np.array([[x,y] for x,y in product(oned_grid,oned_grid)])
extent = [oned_grid.min(), oned_grid.max(), oned_grid.min(), oned_grid.max()]
result = density_func(x=twod_grid,**kwargs)
if my_axis is None:
fig,my_axis = plt.subplots()
my_axis.imshow(result.reshape(n_grid_points,n_grid_points).transpose(),interpolation='bilinear',origin='lower', extent = extent, cmap=plt.get_cmap('YlOrRd'))
if plot_contours:
        my_axis.contour(result.reshape(n_grid_points,n_grid_points).transpose(),origin='lower', extent = extent, colors='black')
if fpath is not None:
plt.savefig(fpath)
return my_axis
1.2. Gaussian distributions
Let's build a module that allows us to compute various representations of a Gaussian distribution, namely the density $\mathcal{N}(\mathbf{x};\mu,\Sigma)$, log-density $\log \mathcal{N}(\mathbf{x};\mu,\Sigma)$, score $\nabla_{\mathbf{x}} \log \mathcal{N}(\mathbf{x};\mu,\Sigma)$, and gradient of the density $\nabla_{\mathbf{x}}\mathcal{N}(\mathbf{x};\mu,\Sigma)$.
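For reference, these are the standard closed-form expressions that the code below implements (with $k$ the dimension): $$\log \mathcal{N}(\mathbf{x};\mu,\Sigma) = -\frac{1}{2}(\mathbf{x}-\mu)^T\Sigma^{-1}(\mathbf{x}-\mu) - \frac{1}{2}\log\big((2\pi)^{k}\det\Sigma\big), \qquad \nabla_{\mathbf{x}} \log \mathcal{N}(\mathbf{x};\mu,\Sigma) = -\Sigma^{-1}(\mathbf{x}-\mu)$$ and the gradient of the density follows via $\nabla_{\mathbf{x}}\mathcal{N}(\mathbf{x};\mu,\Sigma) = \mathcal{N}(\mathbf{x};\mu,\Sigma)\,\nabla_{\mathbf{x}}\log\mathcal{N}(\mathbf{x};\mu,\Sigma)$.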
def get_gaussian(representation: str, x: np.array, mean: np.array = None, cov: np.array = None):
"""
Function to compute Gaussian.
representation - str: how to represent the Gaussian distribution.
Can be either "density", "log_density", "gradient_of_density", or "score"
x - np.array: shape (n,k) - data, i.e. n locations in k-dimensional euclidean space.
mean - shape (k): mean of Gaussian
cov - shape (k,k): covariance matrix
"""
#Get dimensions:
n = x.shape[0]
k = x.shape[1]
#Center data:
if mean is not None:
assert len(mean) == k
x = x - mean[None,:]
#Check covariance matrix:
if cov is None:
cov = np.eye(k)
else:
assert cov.shape == (k,k)
#Invert covariance matrix:
inv_cov = np.linalg.inv(cov)
#Compute exponent of the normal distribution:
quadratic_product = (x*np.dot(x,inv_cov)).sum(axis=1)
    #Normalizing constant: sqrt((2*pi)^k * det(cov))
    denominator = math.sqrt(((2*math.pi)**k)*np.linalg.det(cov))
#Compute formulas for various representations:
if representation == "density":
#1. The density of the normal distribution is given by the well-known formula:
return np.exp(-0.5*quadratic_product)/denominator
elif representation == "log_density":
#2. The log-density simplifies to:
return -0.5*quadratic_product - np.log(denominator)
elif representation == "gradient_of_density":
#3. By applying the chain rule, one can infer that the gradient of the density is:
return -np.dot(x,inv_cov)*np.exp(-0.5*quadratic_product[:,None])/denominator
elif representation == "score":
#4. The score is the gradient of log_density, i.e.:
return -np.dot(x,inv_cov)
def gaussian_density(**kwargs):
"""Function to get Gaussian density."""
return get_gaussian(representation="density", **kwargs)
def gaussian_log_density(**kwargs):
"""Function to get Gaussian log density."""
return get_gaussian(representation="log_density", **kwargs)
def gaussian_score(**kwargs):
"""Function to get Gaussian score."""
return get_gaussian(representation="score", **kwargs)
def gaussian_grad_density(**kwargs):
"""Function to get Gaussian gradient of density."""
return get_gaussian(representation="gradient_of_density", **kwargs)
def plot_gaussian_density(**kwargs):
"""Function to plot a Gaussian density"""
return plot_density(density_func=gaussian_density,**kwargs)
Let's plot an example distribution $\mathcal{N}(\mathbf{x};\mu,\Sigma)$ for $\mu=(1,2)^T$ and $$\Sigma = \begin{pmatrix} 2 & 1 \\ 1 & 2\end{pmatrix}$$
plot_gaussian_density(mean=np.array([1,2]),cov=np.array([[2,1],[1,2]]));
1.3. Gaussian mixtures
Let's build a function that allows us to construct Gaussian mixture distributions.
def get_gaussian_mixture(representation: str,
x: np.array,
mean_list: List[np.array],
cov_list: List[np.array] = None,
mixture_probs: np.array = None):
"""Function to compute Gaussian mixtures.
representation - str: how to represent the Gaussian mixture.
Can be either "density" or "score"
x - np.array: shape (n,k) - data, i.e. n locations in k-dimensional euclidean space.
mean_list - list of mean arrays of shape (k): means of individual Gaussian distributions
    cov_list - list of covariance matrices of shape (k,k): covariance matrices of individual Gaussian distributions
    mixture_probs - np.array of shape (n,): mixture weights q_j (defaults to uniform weights)
    """
if cov_list is None:
cov_list = len(mean_list)*[None]
if mixture_probs is None:
mixture_probs = np.ones(len(mean_list))/len(mean_list)
if representation == "density":
#1. The density of a Gaussian mixture is the weighted sum of its individual
#Gaussian distributions.
density = np.zeros(shape=len(x))
for mean,cov,prob in zip(mean_list,cov_list,mixture_probs):
density += prob*gaussian_density(x=x,mean=mean,cov=cov)
return density
elif representation == "score":
#2. The score of a Gaussian mixture is given by the weighted sum of the individual
#gradients of the probability density divided by the density.
        #Initialize:
score = np.zeros(shape=(len(x),x.shape[1]))
        #Numerator of the score is the mixture of the gradients of the densities:
for mean,cov,prob in zip(mean_list,cov_list,mixture_probs):
score += prob*get_gaussian(representation="gradient_of_density",x=x,mean=mean,cov=cov)
#Denominator of the score is the density:
density = get_gaussian_mixture("density",x,mean_list,cov_list,mixture_probs)
#Compute fraction:
score = score/density[:,None]
return score
    else:
        raise ValueError(f"Invalid representation: {representation}.")
def gaussian_mixture_density(**kwargs):
"""Function to get density of a Gaussian mixture distribution."""
return get_gaussian_mixture(representation="density", **kwargs)
def gaussian_mixture_score(**kwargs):
"""Function to get the score of a Gaussian mixture distribution."""
return get_gaussian_mixture(representation="score", **kwargs)
def plot_gaussian_mixture_density(**kwargs):
"""Function to plot a Gaussian mixture density."""
return plot_density(density_func=gaussian_mixture_density,**kwargs)
Let's plot an example mixture distribution
MEAN_LIST = [np.array([-2,3]),np.array([0,0]), np.array([3,-4]),np.array([-3,-3]),np.array([6,4])]
COV_LIST = [np.array([[2,1],[1,2]]),np.array([[1,-1],[-1,4]]),np.array([[1,0.2],[0.2,1.0]]),np.eye(2),np.eye(2)]
MIXTURE_PROBS = [0.225,0.3,0.125,0.15,0.20]
plot_gaussian_mixture_density(mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS,
min_x=-8.0,
max_x=8.0,
plot_contours=True);
Our goal is to build a Langevin diffusion that samples from the above probability distribution.
2. The Langevin Stochastic Differential Equation
The Langevin diffusion is described by a stochastic differential equation (SDE), i.e. an equation for the evolution of a random variable $X_t\in\mathbb{R}^d$ over time $t\geq 0$:
\begin{align*} dX_t = \nabla\log p(X_t)\,dt + \sqrt{2}\,dW_t \end{align*} where $W_t$ is a Brownian motion. If you are not familiar with SDEs, you are probably wondering at this point what such a notation means or even what a Brownian motion is. For our purposes, the above equation simply means that the process can be approximately simulated with the following discretization: \begin{align*} X_{t+s} = X_{t}+s\nabla\log p(X_{t})+\sqrt{2s}\epsilon \end{align*} where $s>0$ is the step size and $\epsilon\sim\mathcal{N}(0,\mathbf{I}_d)$. In other words, the update from $X_t$ to $X_{t+s}$ is performed by (see the minimal code sketch after this list):
- Going into the direction of the gradient: $\nabla\log p(X_{t})$.
- Adding some random Gaussian noise $\epsilon\sim\mathcal{N}(0,I)$.
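To make the discretization concrete, here is a minimal sketch of a single update step (the helper name langevin_step is mine; the full implementation with trajectories follows in Section 3):
def langevin_step(x: np.array, score_func: Callable, step_size: float, **kwargs):
    """One discretized Langevin update: X_{t+s} = X_t + s*score(X_t) + sqrt(2s)*eps."""
    eps = np.random.normal(size=x.shape)  #eps ~ N(0, I)
    return x + step_size * score_func(x=x, **kwargs) + np.sqrt(2 * step_size) * eps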
2.1. Intuitively understanding the Langevin SDE
Intuitively understanding the Langevin diffusion is simpler than you might think. Remember that our goal is to sample from $p(\mathbf{x})$, i.e. we want $X_t$ to evolve over time such that $\mathbb{P}(X_t\in A)\approx p(A)$. The gradient $\nabla\log p(X_{t})$ points in directions of high probability, i.e. towards regions where $p(\mathbf{x})$ is high - following it ensures that we move towards regions of high probability. Let's plot the score $\nabla\log p(X_{t})$ and look at it.
def plot_score_vector_field(score_func: Callable, density_func: Callable, fpath: str = None, min_x: float = -5.0, max_x: float = 5.0, n_grid_points: int = 50, plot_contours: bool = False, **kwargs):
"""
    Function to plot the score vector field on top of a probability density.
    Arguments:
    score_func: score function, i.e. the gradient of the log-density.
    density_func: probability density function.
    fpath: save image to this file path (if given)
    min_x,max_x: float - borders of grid
    n_grid_points: int - number of grid points,
    **kwargs: arguments for the density and score functions
Returns:
axis object with plot.
"""
oned_grid = np.linspace(min_x, max_x, n_grid_points)
twod_grid = np.array([[x,y] for x,y in product(oned_grid,oned_grid)])
extent = [oned_grid.min(), oned_grid.max(), oned_grid.min(), oned_grid.max()]
result = density_func(x=twod_grid,**kwargs)
score = score_func(x=twod_grid,**kwargs)
fig,my_axis = plt.subplots(figsize=(12,12))
my_axis.imshow(result.reshape(n_grid_points,n_grid_points).transpose(),interpolation='bilinear',origin='lower', extent = extent, cmap=plt.get_cmap('YlOrRd'))
if plot_contours:
        my_axis.contour(result.reshape(n_grid_points,n_grid_points).transpose(),origin='lower', extent = extent, colors='black')
    my_axis.quiver(twod_grid[:,0],twod_grid[:,1],score[:,0],score[:,1],scale=60.0)
    if fpath is not None:
        plt.savefig(fpath)
return my_axis
plot_score_vector_field(score_func=gaussian_mixture_score,
density_func=gaussian_mixture_density,
mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS,
min_x=-8.0,
max_x=8.0,
plot_contours=False)
As one can see above, the score vector field $\nabla\log p(X_{t})$ points towards regions of high probability. If we only moved in the direction of $\nabla\log p(X_{t})$, we would simply perform gradient ascent (the sister algorithm of the famous gradient descent algorithm). In particular, we would quickly end up in a local maximum of $p(\mathbf{x})$ - once there, there would be no further movement and we would be stuck.
Therefore, we inject some Gaussian noise into the diffusion $X_t$ to keep randomness in the process. With the noise injected, the Langevin diffusion still moves towards regions of high probability $p(\mathbf{x})$ on average, but it never gets stuck. This makes it plausible that the process allows us to sample from $p(\mathbf{x})$.
Intuitively, the Langevin diffusion is gradient ascent maximizing the log-probability $\log p(\mathbf{x})$, injected with random Gaussian noise $\epsilon\sim\mathcal{N}(0,I)$.
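To illustrate the difference, here is a small sketch (my own addition, reusing gaussian_mixture_score and the mixture parameters from Section 1.3): pure gradient ascent on $\log p$ stalls once the score vanishes near a mode, while the extra $\sqrt{2s}\,\epsilon$ noise of the Langevin update keeps the process moving.
#Pure gradient ascent on log p - no noise term:
x = np.array([[1.0, 2.0]])
for _ in range(2000):
    score = gaussian_mixture_score(x=x, mean_list=MEAN_LIST, cov_list=COV_LIST, mixture_probs=MIXTURE_PROBS)
    x = x + 0.1 * score
#x ends up near a local mode of p, where the score is (numerically) zero - without noise we are stuck:
print(x, np.linalg.norm(score))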
2.2. A Mathematical Derivation of the Langevin Diffusion
Let's try to understand mathematically why the Langevin diffusion converges to the probability distribution $p(\mathbf{x})$ over time. Our goal is to show that $p$ is an equilibrium distribution of the Langevin diffusion in the limit of vanishing step size $\tau\to 0$, i.e. if $X_t\sim p$, then also $X_{t+s}\sim p$ for every $s>0$.
Let $f:\mathbb{R}^d\to\mathbb{R}$ be a test function, i.e. a smooth (infinitely differentiable) function with compact support. It then suffices to show that $\mu_{t}(f)=\mathbb{E}[f(X_{t})]$ is constant in time $t$ in the limit $\tau\to 0$ (assuming regularity of $p$, in particular $p>0$ everywhere).
Let's do a Taylor approximation of order 2: \begin{align*} \mu_{t+\tau}(f)=& \mathbb{E}[f(X_{t+\tau})]\\ =&\mathbb{E}[f(X_{t}+\tau\nabla\log p(X_{t})+\sqrt{2\tau}\epsilon)]\\ \approx&\mathbb{E}[f(X_t)]+\mathbb{E}[\nabla f(X_t)^T(\tau\nabla\log p(X_{t})+\sqrt{2\tau}\epsilon)]\\ &+ \frac{1}{2}\mathbb{E}[(\tau\nabla\log p(X_{t})+\sqrt{2\tau}\epsilon)^T\nabla^2 f(X_t)(\tau\nabla\log p(X_{t})+\sqrt{2\tau}\epsilon)]\\ =&\mathbb{E}[f(X_t)]+\tau\mathbb{E}[\nabla f(X_t)^T\nabla\log p(X_{t})]+\sqrt{2\tau}\mathbb{E}[\nabla f(X_t)^T\epsilon]\\ &+\frac{\tau^2}{2}\mathbb{E}[\nabla\log p(X_{t})^T\nabla^2 f(X_t)\nabla\log p(X_{t})]\\ &+\sqrt{2\tau}\,\tau\,\mathbb{E}[\epsilon^T\nabla^2 f(X_t)\nabla\log p(X_{t})]\\ &+\tau\mathbb{E}[\epsilon^T\nabla^2 f(X_t)\epsilon]\\ =&\mu_t(f)+A+B+C+D+E \end{align*} Let's compute the summands $A,B,C,D,E$ step by step. We know that $B=D=0$ because $\mathbb{E}[\epsilon]=0$ and $\epsilon$ is independent of $X_t$. Let's compute $A$ by integration by parts and show that it equals $-E$, using that $\mathbb{E}[\epsilon^T M\epsilon]=\text{tr}(M)$ for $\epsilon\sim\mathcal{N}(0,\mathbf{I}_d)$ and any matrix $M$ independent of $\epsilon$: \begin{align*} A=&\tau\mathbb{E}[\nabla f(X_t)^T\nabla\log p(X_{t})] \\ =&\tau \int \nabla f(x)^T\nabla\log p(x)p(x)dx \\ =&\tau \int \nabla f(x)^T\nabla p(x)dx \\ =&-\tau \int \text{tr}(\nabla^2 f(x))p(x)dx \\ =&-\tau\mathbb{E}[\text{tr}(\nabla^2 f(X_t))]\\ =&-\tau\mathbb{E}[\epsilon^T\nabla^2 f(X_t)\epsilon]\\ =&-E \end{align*} where we have used $X_t\sim p$ by assumption and $\nabla\log p(x)\,p(x)=\nabla p(x)$. Therefore, $\mu_{t+\tau}(f)-\mu_{t}(f)\approx A+C+E=-E+C+E=C$ and hence, \begin{align*} \frac{|\mu_{t+\tau}(f)-\mu_t(f)|}{\tau} \approx & \frac{|C|}{\tau}\\ \leq & \frac{\tau^2}{2\tau}\mathbb{E}[|\nabla\log p(X_{t})^T\nabla^2 f(X_t)\nabla\log p(X_{t})|]\\ = & \frac{\tau}{2}\mathbb{E}[|g(X_t)|] \leq \frac{\tau}{2}\|g\|_{\max} \end{align*} where we use that $g(x)=\nabla\log p(x)^T\nabla^2 f(x)\nabla\log p(x)$ is bounded because it is a continuous function with compact support (it inherits the compact support of $f$). Letting $\tau\to 0$, it follows that \begin{align*} \frac{d}{dt}\mu_t(f)=0 \end{align*} In other words, we have for all $t,s>0$: $$\mathbb{E}[f(X_{t})]=\mu_t(f)=\mu_{t+s}(f)=\mathbb{E}[f(X_{t+s})]$$
By approximating the indicator function $\mathbb{1}_{A}$ of a rectangle $A=[a_1,b_1]\times\dots\times[a_d,b_d]\subset\mathbb{R}^d$ with smooth functions $f_k$, we can derive that:
$$\mathbb{P}(X_t\in A) = \mathbb{E}[\mathbb{1}_A(X_{t})] = \lim\limits_{k\to\infty} \mathbb{E}[f_k(X_{t})] = \lim\limits_{k\to\infty} \mathbb{E}[f_k(X_{t+s})] = \mathbb{P}(X_{t+s}\in A) $$
In other words, the distribution of $X_t$ is constant over time - $p$ is an equilibrium distribution of the Langevin diffusion.
Remark: Strictly speaking, the fact that $p$ is an equilibrium distribution of an SDE is not sufficient for $X_t$ to converge to $p$, but it is sufficient in many cases. The goal of this post is to give mathematical intuition for the Langevin diffusion; for a rigorous proof, we refer to the mathematical literature.
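As a quick numerical sanity check of this argument (my own addition, not part of the derivation): if we start with samples that already follow $p$ (drawn directly from the mixture) and apply a single Langevin step with a small step size $\tau$, the mean of a smooth test function should barely change.
rng = np.random.default_rng(0)
n, tau = 20000, 1e-3
#Draw X_0 ~ p directly: pick a component j with probability q_j, then draw from N(mu_j, Sigma_j):
comps = rng.choice(len(MEAN_LIST), size=n, p=MIXTURE_PROBS)
x0 = np.stack([rng.multivariate_normal(MEAN_LIST[j], COV_LIST[j]) for j in comps])
#One discretized Langevin step with step size tau:
x1 = (x0
      + tau * gaussian_mixture_score(x=x0, mean_list=MEAN_LIST, cov_list=COV_LIST, mixture_probs=MIXTURE_PROBS)
      + np.sqrt(2 * tau) * rng.normal(size=x0.shape))
#A smooth test function f - its mean should be approximately unchanged after the step:
f = lambda x: np.exp(-0.25 * (x**2).sum(axis=1))
print(f(x0).mean(), f(x1).mean())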
3. Implement the Langevin Diffusion
Finally, let's implement the Langevin diffusion: \begin{align*} X_{t+s} = X_{t}+s\nabla\log p(X_{t})+\sqrt{2s}\epsilon \end{align*}
def langevin_diffusion(x_start: np.array, score_func: Callable, step_size: float, n_steps: int,**kwargs):
"""Function to compute Langevin diffusion from n starting points.
Arguments:
x_start - shape (n,k): starting points
score_func: scoring function
step_size: step size
n_steps: number of diffusion steps
Returns:
np.array of shape (n_steps,n,k)"""
#Data dimensions:
dim_data = x_start.shape[1]
n_traj = x_start.shape[0]
#Init trajectory:
x_traj = [x_start]
for i in range(n_steps):
x_curr = x_traj[-1] #Get last point
score = score_func(x=x_curr,**kwargs) #Get score
noise = np.random.normal(size=(n_traj,dim_data)) #Get noise
x_next = x_curr + step_size*score+np.sqrt(2*step_size)*noise #Update position
x_traj.append(x_next) #Append new position
return np.stack(x_traj)
4. Run Langevin Diffusion on Target Distribution
Let's run it:
N_STEPS = 100000
STEP_SIZE = 0.4
x_start = np.array([[1,2]])
x_traj = langevin_diffusion(x_start=x_start,
score_func=gaussian_mixture_score,
step_size=STEP_SIZE,
n_steps=N_STEPS,
mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS)
x_traj = x_traj.squeeze()
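Before plotting, a quick quantitative sanity check (my own addition): if the chain has mixed, the empirical mean of the trajectory should be close to the true mixture mean $\sum_j q_j\mu_j$ (up to Monte-Carlo error and the discretization bias introduced by the finite step size).
#Compare the true mixture mean with the empirical mean of the chain (discarding a short burn-in):
true_mean = sum(q * mu for q, mu in zip(MIXTURE_PROBS, MEAN_LIST))
print("true mixture mean:    ", true_mean)
print("empirical chain mean: ", x_traj[1000:].mean(axis=0))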
5. Plot Trajectory
Let's plot the trajectory up to step 1000 - we can see that it roughly follows the probability distribution. However, at step 1000, it has not yet visited all regions of high probability. By step 5000, the chain seems to have converged.
ax = plot_gaussian_mixture_density(mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS,
min_x=min(x_traj[:,0].min(),x_traj[:,1].min()),
max_x=max(x_traj[:,0].max(),x_traj[:,1].max()));
ax.plot(x_traj[:1000,0],x_traj[:1000,1],color='black',alpha=1.0,linewidth=0.5);
ax.set_title("Trajectory of Langevin diffusion up to step 1000")
ax = plot_gaussian_mixture_density(mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS,
min_x=min(x_traj[:,0].min(),x_traj[:,1].min()),
max_x=max(x_traj[:,0].max(),x_traj[:,1].max()));
ax.plot(x_traj[:5000,0],x_traj[:5000,1],color='black',alpha=1.0,linewidth=0.5);
ax.set_title("Trajectory of Langevin diffusion up to step 5000")
6. Plot the convergence of the distribution over time
Let's plot how the empirical distribution of the trajectory $X_1,X_2,\dots,X_n$ converges over time for increasing $n$:
from celluloid import Camera # getting the camera
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import HTML # to show the animation in Jupyter
# the camera gets the fig we'll plot
fig, my_axis = plt.subplots(figsize=(12,12))
camera = Camera(fig)
steps = [10,20,40,80]+(80+np.exp(np.linspace(0,np.log(N_STEPS),10)).astype('int')).tolist()
steps = steps + 4*[steps[-1]]
for idx,step in enumerate(steps):
plot_gaussian_mixture_density(mean_list=MEAN_LIST,
cov_list=COV_LIST,
mixture_probs=MIXTURE_PROBS,
min_x=min(x_traj[:,0].min(),x_traj[:,1].min()),
max_x=max(x_traj[:,0].max(),x_traj[:,1].max()),
plot_contours=False, my_axis=my_axis);
sns.kdeplot(x=x_traj[:step,0],y=x_traj[:step,1],ax=my_axis,color='black')
camera.snap()
animation = camera.animate() # animation ready
animation.save('langevin_convergence.mp4')
plt.close()
HTML(animation.to_html5_video()) # displaying the animation
7. Conclusion
We are done! The Langevin diffusion is a fundamental concept for sampling from a probability distribution and hence for generative AI. In my next tutorial, I will give you a thorough introduction to the world of SDEs and how they are used for diffusion models.