+++ {"id": "Pq7u0kdRwDje"}
(reinforcement_learning)=
:::{post} Aug 5, 2022
:tags: Aesara, Reinforcement Learning
:category: advanced, how-to
:author: Ricardo Vieira
:::
Reinforcement Learning models are commonly used in behavioral research to model how animals and humans learn, in situations where they get to make repeated choices that are followed by some form of feedback, such as a reward or a punishment.

In this notebook we will consider the simplest learning scenario, where there are only two possible actions. When an action is taken, it is always followed by an immediate reward. Finally, the outcome of each action is independent of the previous actions taken. This scenario is sometimes referred to as the multi-armed bandit problem.
Let's say that the two actions (e.g., left and right buttons) are associated with a unit reward 40% and 60% of the time, respectively. At the beginning the learning agent does not know which action $a$ is better, so it may start by assuming both actions have a mean value of 0.5. These values can be stored in a table, usually referred to as a $Q$ table:

$$
Q = \begin{cases}
0.5, & a = \text{left} \\
0.5, & a = \text{right}
\end{cases}
$$

When an action is chosen and a reward $r \in \{0, 1\}$ is observed, the estimated value of that action is updated as

$$
Q_a \leftarrow Q_a + \alpha (r - Q_a),
$$

where $\alpha \in [0, 1]$ is a learning-rate parameter that controls how much the value of an action is shifted towards the observed reward in each trial. The $Q$ values are then converted into action probabilities via the softmax transformation

$$
P(a = \text{right}) = \frac{\exp(\beta Q_{\text{right}})}{\exp(\beta Q_{\text{right}}) + \exp(\beta Q_{\text{left}})},
$$

where the $\beta \in (0, +\infty)$ parameter determines the level of noise in the agent's choices: larger values lead to more deterministic choices and smaller values to increasingly random ones.
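As a quick numeric illustration of the update rule (values chosen purely for this example): with $\alpha = 0.5$, if the agent picks the left action while $Q_{\text{left}} = 0.5$ and observes a reward $r = 1$, then

$$
Q_{\text{left}} \leftarrow 0.5 + 0.5\,(1 - 0.5) = 0.75,
$$

i.e., the estimate moves halfway from its current value toward the observed reward. Had the reward been $r = 0$, the same step would have lowered the estimate to $0.25$.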
:id: QTq-0HMw7dBK
import aesara
import aesara.tensor as at
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pymc as pm
import scipy
from matplotlib.lines import Line2D
seed = sum(map(ord, "RL_PyMC"))
rng = np.random.default_rng(seed)
az.style.use("arviz-darkgrid")
%config InlineBackend.figure_format = "retina"
+++ {"id": "aG_Nxvr5wC4B"}
:id: hcPVL7kZ8Zs2
def generate_data(rng, alpha, beta, n=100, p_r=None):
if p_r is None:
p_r = [0.4, 0.6]
actions = np.zeros(n, dtype="int")
rewards = np.zeros(n, dtype="int")
Qs = np.zeros((n, 2))
# Initialize Q table
Q = np.array([0.5, 0.5])
for i in range(n):
# Apply the Softmax transformation
exp_Q = np.exp(beta * Q)
prob_a = exp_Q / np.sum(exp_Q)
# Simulate choice and reward
a = rng.choice([0, 1], p=prob_a)
r = rng.random() < p_r[a]
# Update Q table
Q[a] = Q[a] + alpha * (r - Q[a])
# Store values
actions[i] = a
rewards[i] = r
Qs[i] = Q.copy()
return actions, rewards, Qs
:id: ceNagbmsZXW6
true_alpha = 0.5
true_beta = 5
n = 150
actions, rewards, Qs = generate_data(rng, true_alpha, true_beta, n)
---
colab:
base_uri: https://localhost:8080/
height: 208
id: MDhJI8vOXZeU
outputId: 60f7ee37-2d1f-44ad-afff-b9ba7d82a8d8
tags: [hide-input]
---
_, ax = plt.subplots(figsize=(12, 5))
x = np.arange(len(actions))
ax.plot(x, Qs[:, 0] - 0.5 + 0, c="C0", lw=3, alpha=0.3)
ax.plot(x, Qs[:, 1] - 0.5 + 1, c="C1", lw=3, alpha=0.3)
s = 7
lw = 2
cond = (actions == 0) & (rewards == 0)
ax.plot(x[cond], actions[cond], "o", ms=s, mfc="None", mec="C0", mew=lw)
cond = (actions == 0) & (rewards == 1)
ax.plot(x[cond], actions[cond], "o", ms=s, mfc="C0", mec="C0", mew=lw)
cond = (actions == 1) & (rewards == 0)
ax.plot(x[cond], actions[cond], "o", ms=s, mfc="None", mec="C1", mew=lw)
cond = (actions == 1) & (rewards == 1)
ax.plot(x[cond], actions[cond], "o", ms=s, mfc="C1", mec="C1", mew=lw)
ax.set_yticks([0, 1], ["left", "right"])
ax.set_ylim(-1, 2)
ax.set_ylabel("action")
ax.set_xlabel("trial")
reward_artist = Line2D([], [], c="k", ls="none", marker="o", ms=s, mew=lw, label="Reward")
no_reward_artist = Line2D(
[], [], ls="none", marker="o", mfc="w", mec="k", ms=s, mew=lw, label="No reward"
)
Qvalue_artist = Line2D([], [], c="k", ls="-", lw=3, alpha=0.3, label="Qvalue (centered)")
ax.legend(handles=[no_reward_artist, Qvalue_artist, reward_artist], fontsize=12, loc=(1.01, 0.27));
+++ {"id": "6RNLAtqDXgG_"}
The plot above shows a simulated run of 150 trials, with parameters $\alpha = 0.5$ and $\beta = 5$.

Solid and empty dots indicate actions followed by rewards and no-rewards, respectively. The solid line shows the estimated $Q$ value of each action, centered around the corresponding row of dots: the line sits above its dots when the estimated value is above 0.5 and below them otherwise. It can be seen that this value increases after rewards (solid dots) and decreases after non-rewards (empty dots).

The change in line height following each outcome is directly related to the $\alpha$ parameter, while the $\beta$ parameter determines how consistently the agent favors the action with the higher estimated value.
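To get a feel for the role of $\beta$, the following small standalone computation (the Q values here are hypothetical, chosen only for illustration) shows the softmax probability of choosing the right action under a low and a high $\beta$:

```python
import numpy as np

Q = np.array([0.4, 0.6])  # hypothetical Q values after some learning

for beta in (0.5, 5.0):
    exp_Q = np.exp(beta * Q)
    p_right = exp_Q[1] / exp_Q.sum()
    print(f"beta = {beta}: P(right) = {p_right:.2f}")
```

With these numbers the right action is chosen only about 52% of the time when $\beta = 0.5$, but about 73% of the time when $\beta = 5$, even though the underlying Q values are identical.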
+++ {"id": "LUTfha8Hc1ap"}
Having generated the data, the goal is now to 'invert the model' and estimate the learning parameters $\alpha$ and $\beta$. I start by doing it via Maximum Likelihood Estimation (MLE), which requires writing a custom function that computes the likelihood of the observed actions and rewards given candidate values of $\alpha$ and $\beta$ (in practice, the function computes the negative log likelihood, to avoid numerical underflow).

I employ the handy `scipy.optimize.minimize` function to quickly retrieve the values of $\alpha$ and $\beta$ that maximize the likelihood of the data (or, equivalently, minimize the negative log likelihood).

This was also helpful when I later wrote the Aesara function that computes the choice probabilities in PyMC. First, the underlying logic is the same; the only thing that changes is the syntax. Second, it provides a way to be confident that I did not mess up, and that what I was actually computing was what I intended to.
:id: lWGlRE3BjR0E
def llik_td(x, *args):
# Extract the arguments as they are passed by scipy.optimize.minimize
alpha, beta = x
actions, rewards = args
# Initialize values
Q = np.array([0.5, 0.5])
logp_actions = np.zeros(len(actions))
for t, (a, r) in enumerate(zip(actions, rewards)):
# Apply the softmax transformation
Q_ = Q * beta
logp_action = Q_ - scipy.special.logsumexp(Q_)
# Store the log probability of the observed action
logp_actions[t] = logp_action[a]
# Update the Q values for the next trial
Q[a] = Q[a] + alpha * (r - Q[a])
# Return the negative log likelihood of all observed actions
return -np.sum(logp_actions[1:])
+++ {"id": "xXZgywFIgz6J"}
The function `llik_td` is strikingly similar to the `generate_data` one, except that instead of simulating an action and reward in each trial, it stores the log probability of the observed action.

The function `scipy.special.logsumexp` is used to compute the term $\log\left(\exp(\beta Q_{\text{right}}) + \exp(\beta Q_{\text{left}})\right)$ in a numerically stable way.

In the end, the function returns the negative sum of all the log probabilities, which is equivalent to multiplying the probabilities in their original scale.

(The first action is ignored just to make the output comparable to the later Aesara function. It doesn't actually change the estimation, as the initial action probabilities are fixed and do not depend on either the $\alpha$ or $\beta$ parameters.)
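As a side note, the following minimal snippet (not part of the original analysis) illustrates that `scipy.special.logsumexp` returns the same value as the naive `log(sum(exp(...)))`, just computed more stably:

```python
import numpy as np
import scipy.special

Q_ = np.array([2.5, 3.0])  # hypothetical beta * Q values
naive = np.log(np.sum(np.exp(Q_)))
stable = scipy.special.logsumexp(Q_)
print(np.isclose(naive, stable))  # True
```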
---
colab:
base_uri: https://localhost:8080/
height: 34
id: -E8B-rrBgy0j
outputId: 7c18b426-8d50-4706-f940-45ec716877f4
---
llik_td([true_alpha, true_beta], *(actions, rewards))
+++ {"id": "WT2UwuKWvRCq"}
Above, I computed the negative log likelihood of the data given the true $\alpha$ and $\beta$ parameters.
Below, I let scipy find the MLE values for the two parameters:
---
colab:
base_uri: https://localhost:8080/
height: 260
id: W1MOBxvw4Zl9
outputId: 39a73f7a-2362-4ef7-cc03-1e9aeda35ecf
---
x0 = [true_alpha, true_beta]
result = scipy.optimize.minimize(llik_td, x0, args=(actions, rewards), method="BFGS")
print(result)
print("")
print(f"MLE: alpha = {result.x[0]:.2f} (true value = {true_alpha})")
print(f"MLE: beta = {result.x[1]:.2f} (true value = {true_beta})")
+++ {"id": "y_cXP93QeVVM"}
The estimated MLE values are relatively close to the true ones. However, this procedure does not give any idea of the plausible uncertainty around these parameter values. To get that, I'll turn to PyMC for Bayesian posterior estimation.

But before that, I will apply a simple vectorization optimization to the log-likelihood function, which will make it more similar to the Aesara counterpart. The reason for this is to speed up the slow Bayesian inference engine down the road.
:id: 4knb5sKW9V66
def llik_td_vectorized(x, *args):
# Extract the arguments as they are passed by scipy.optimize.minimize
alpha, beta = x
actions, rewards = args
# Create a list with the Q values of each trial
Qs = np.ones((n, 2), dtype="float64")
Qs[0] = 0.5
for t, (a, r) in enumerate(
zip(actions[:-1], rewards[:-1])
): # The last Q values were never used, so there is no need to compute them
Qs[t + 1, a] = Qs[t, a] + alpha * (r - Qs[t, a])
Qs[t + 1, 1 - a] = Qs[t, 1 - a]
# Apply the softmax transformation in a vectorized way
Qs_ = Qs * beta
logp_actions = Qs_ - scipy.special.logsumexp(Qs_, axis=1)[:, None]
# Return the logp_actions for the observed actions
logp_actions = logp_actions[np.arange(len(actions)), actions]
return -np.sum(logp_actions[1:])
---
colab:
base_uri: https://localhost:8080/
height: 34
id: w9Z_Ik7AlBQC
outputId: 445a7838-29d0-4f21-bfd8-5b65606af286
---
llik_td_vectorized([true_alpha, true_beta], *(actions, rewards))
---
colab:
base_uri: https://localhost:8080/
height: 34
id: bDPZJe7RqCZX
outputId: a90fbb47-ee9b-4390-87ff-f4b39ece8fca
---
%timeit llik_td([true_alpha, true_beta], *(actions, rewards))
---
colab:
base_uri: https://localhost:8080/
height: 34
id: Dvrqf878swBX
outputId: 94bf3268-0eab-4ce9-deb9-5d1527b3c19d
---
%timeit llik_td_vectorized([true_alpha, true_beta], *(actions, rewards))
+++ {"id": "YAs_zpPZyopT"}
The vectorized function gives the same results, but runs almost one order of magnitude faster.
When implemented as an Aesara function, the difference between the vectorized and standard versions was not this drastic. Still, it ran twice as fast, which meant the model also sampled at twice the speed it would otherwise have!
+++ {"id": "tC7xbCCIL7K4"}
The most challenging part was to create an Aesara function/loop to estimate the Q values when sampling our parameters with PyMC.
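If `aesara.scan` is unfamiliar, the following toy example (not part of the model, using the `aesara`, `at`, and `np` imports from above) may help: it uses `scan` to accumulate a running sum over a vector, which is the same looping pattern used below to update the Q values trial by trial.

```python
# Toy illustration of aesara.scan: a running sum over a vector
values = at.vector("values", dtype="float64")

result, _ = aesara.scan(
    fn=lambda value, running_total: running_total + value,  # one step of the loop
    sequences=[values],
    outputs_info=[at.zeros((), dtype="float64")],  # initial value of the running total
)

running_sum = aesara.function(inputs=[values], outputs=result)
running_sum(np.array([1.0, 2.0, 3.0]))  # array([1., 3., 6.])
```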
:id: u8L_FAB4hle1
def update_Q(action, reward, Qs, alpha):
    """
    This function updates the Q table according to the RL update rule.
    It will be called by aesara.scan to do so recursively, given the observed data and the alpha parameter.

    This could have been replaced by the following lambda expression in the aesara.scan fn argument:
        fn=lambda action, reward, Qs, alpha: at.set_subtensor(Qs[action], Qs[action] + alpha * (reward - Qs[action]))
    """
    Qs = at.set_subtensor(Qs[action], Qs[action] + alpha * (reward - Qs[action]))
    return Qs
:id: dHzhTy20g4vh
# Transform the variables into appropriate Aesara objects
rewards_ = at.as_tensor_variable(rewards, dtype="int32")
actions_ = at.as_tensor_variable(actions, dtype="int32")
alpha = at.scalar("alpha")
beta = at.scalar("beta")
# Initialize the Q table
Qs = 0.5 * at.ones((2,), dtype="float64")
# Compute the Q values for each trial
Qs, _ = aesara.scan(
fn=update_Q, sequences=[actions_, rewards_], outputs_info=[Qs], non_sequences=[alpha]
)
# Apply the softmax transformation
Qs = Qs * beta
logp_actions = Qs - at.logsumexp(Qs, axis=1, keepdims=True)
# Calculate the negative log likelihood of the observed actions
logp_actions = logp_actions[at.arange(actions_.shape[0] - 1), actions_[1:]]
neg_loglike = -at.sum(logp_actions)
+++ {"id": "C9Ayn6-kzhPN"}
Let's wrap it up in a function to test whether it's working as expected.
---
colab:
base_uri: https://localhost:8080/
height: 89
id: g1hkTd75xxwo
outputId: a2310fd3-cac2-48c6-9d22-3c3b72410427
---
aesara_llik_td = aesara.function(
inputs=[alpha, beta], outputs=neg_loglike, on_unused_input="ignore"
)
result = aesara_llik_td(true_alpha, true_beta)
float(result)
+++ {"id": "AmcoU1CF5ix-"}
The same result is obtained, so we can be confident that the Aesara loop is working as expected. We are now ready to implement the PyMC model.
:id: c70L4ZBT7QLr
def aesara_llik_td(alpha, beta, actions, rewards):
rewards = at.as_tensor_variable(rewards, dtype="int32")
actions = at.as_tensor_variable(actions, dtype="int32")
# Compute the Qs values
Qs = 0.5 * at.ones((2,), dtype="float64")
Qs, updates = aesara.scan(
fn=update_Q, sequences=[actions, rewards], outputs_info=[Qs], non_sequences=[alpha]
)
    # Apply the softmax transformation
Qs = Qs[:-1] * beta
logp_actions = Qs - at.logsumexp(Qs, axis=1, keepdims=True)
# Calculate the log likelihood of the observed actions
logp_actions = logp_actions[at.arange(actions.shape[0] - 1), actions[1:]]
return at.sum(logp_actions) # PyMC expects the standard log-likelihood
---
colab:
base_uri: https://localhost:8080/
height: 245
id: XQNBZLMvAdbo
outputId: 65d7a861-476c-4598-985c-e0b0fcd744c4
---
with pm.Model() as m:
alpha = pm.Beta(name="alpha", alpha=1, beta=1)
beta = pm.HalfNormal(name="beta", sigma=10)
like = pm.Potential(name="like", var=aesara_llik_td(alpha, beta, actions, rewards))
tr = pm.sample(random_seed=rng)
---
colab:
base_uri: https://localhost:8080/
height: 539
id: vgSumt-oATfN
outputId: eb3348a4-3092-48c8-d8b4-678af0173079
---
az.plot_trace(data=tr);
---
colab:
base_uri: https://localhost:8080/
height: 408
id: BL84iT_RAzEL
outputId: dcd4174b-4148-45cb-f72d-973f1487d8c2
---
az.plot_posterior(data=tr, ref_val=[true_alpha, true_beta]);
+++ {"id": "1FtAp76PBLCr"}
In this example, the obtained posteriors are nicely centered around the MLE values. What we have gained is an idea of the plausible uncertainty around these values.
In this last section I provide an alternative implementation of the model using a Bernoulli likelihood.
+++
:::{Note}
One reason why it's useful to use the Bernoulli likelihood is that one can then do prior and posterior predictive sampling as well as model comparison. With `pm.Potential` you cannot do that, because PyMC does not know what is likelihood and what is prior, nor how to generate random draws. Neither of these is a problem when using a `pm.Bernoulli` likelihood.
:::
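For example, once the alternative model defined below (`m_alt`) has been sampled into `tr_alt`, posterior predictive draws of the actions could be generated with something like the following sketch:

```python
# Illustrative sketch only: posterior predictive sampling with the Bernoulli model
# (assumes the `m_alt` model and `tr_alt` trace defined in the cells below)
with m_alt:
    ppc = pm.sample_posterior_predictive(tr_alt, random_seed=rng)
```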
:id: pQdszDk_qYCX
def right_action_probs(alpha, beta, actions, rewards):
rewards = at.as_tensor_variable(rewards, dtype="int32")
actions = at.as_tensor_variable(actions, dtype="int32")
# Compute the Qs values
Qs = 0.5 * at.ones((2,), dtype="float64")
Qs, updates = aesara.scan(
fn=update_Q, sequences=[actions, rewards], outputs_info=[Qs], non_sequences=[alpha]
)
    # Apply the softmax transformation
Qs = Qs[:-1] * beta
logp_actions = Qs - at.logsumexp(Qs, axis=1, keepdims=True)
# Return the probabilities for the right action, in the original scale
return at.exp(logp_actions[:, 1])
---
colab:
base_uri: https://localhost:8080/
height: 121
id: S55HgqZiTfpa
outputId: a2db2d68-8bf3-4773-8368-5b6dff310e4b
---
with pm.Model() as m_alt:
alpha = pm.Beta(name="alpha", alpha=1, beta=1)
beta = pm.HalfNormal(name="beta", sigma=10)
action_probs = right_action_probs(alpha, beta, actions, rewards)
like = pm.Bernoulli(name="like", p=action_probs, observed=actions[1:])
tr_alt = pm.sample(random_seed=rng)
---
colab:
base_uri: https://localhost:8080/
height: 452
id: zjXW103JiDRQ
outputId: aafc1b1e-082e-414b-cac7-0ad805097057
---
az.plot_trace(data=tr_alt);
:id: SDJN2w117eox
az.plot_posterior(data=tr_alt, ref_val=[true_alpha, true_beta]);
%load_ext watermark
%watermark -n -u -v -iv -w -p aeppl,xarray
:::{bibliography}
:filter: docname in docnames
:::
+++
- Authored by Ricardo Vieira in June 2022
- Adapted PyMC code from Maria Eckstein (GitHub, PyMC Discourse)
- Adapted MLE code from Robert Wilson and Anne Collins {cite:p}`collinswilson2019` (GitHub)
- Re-executed by Juan Orduz in August 2022 (pymc-examples#410)
+++
:::{include} ../page_footer.md
:::