强化学习DQN实践(gymnasium+pytorch)
Pytorch官方教程中有强化学习教程,但是很多中文翻译都太老了,里面的代码也不能跑了
这篇blog按照官方最新教程实现,并加入了一些个人理解
工具
- gymnasium:由gym升级而来,官方定义:An API standard for reinforcement learning with a diverse collection of reference environments。提供强化学习的“环境”
pip install gymnasium
- pytorch
任务
倒立摆模型,使用强化学习控制小车来使倒立摆稳定,有小车向左和向右两个action
gym中提供了很多类似的强化学习“环境”
代码
准备工作
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import countimport torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as Fenv = gym.make("CartPole-v1")# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:from IPython import displayplt.ion()# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else"mps" if torch.backends.mps.is_available() else"cpu"
)
定义transition,包括state,action,next_state,reward:
Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))class ReplayMemory(object):'''样本经验池,用于存储过往的transition,这些信息会被用来训练模型'''def __init__(self, capacity):self.memory = deque([], maxlen=capacity)def push(self, *args):"""Save a transition"""self.memory.append(Transition(*args))def sample(self, batch_size):return random.sample(self.memory, batch_size)def __len__(self):return len(self.memory)
DQN算法
class DQN(nn.Module):def __init__(self, n_observations, n_actions):super(DQN, self).__init__()self.layer1 = nn.Linear(n_observations, 128)self.layer2 = nn.Linear(128, 128)self.layer3 = nn.Linear(128, n_actions)# Called with either one element to determine next action, or a batch# during optimization. Returns tensor([[left0exp,right0exp]...]).def forward(self, x):x = F.relu(self.layer1(x))x = F.relu(self.layer2(x))return self.layer3(x)
根据上篇强化学习理论blog,强化学习的流程是通过贝尔曼最优公式得到最优策略 π ( s ) \pi(s) π(s),求解需要知道reward函数 r ( s , a ) r(s,a) r(s,a),action value函数 Q ( s , a ) Q(s,a) Q(s,a),其中reward函数可以自行定义,action value如何得到?
在Q-learning算法中,设定策略为贪心策略,即每次都是选择action value最大的action执行,这样就可以将原本原贝尔曼公式中的state value变为action value中,将 V ( s ′ ) V(s^{\prime}) V(s′)用 Q π ( s ′ , π ( s ′ ) ) Q^{\pi}(s^{\prime},\pi (s^{\prime})) Qπ(s′,π(s′))来代替:
Q π ( s , a ) = r + γ Q π ( s ′ , π ( s ′ ) ) Q^{\pi}(s,a) = r + \gamma Q^{\pi}(s^{\prime},\pi (s^{\prime})) Qπ(s,a)=r+γQπ(s′,π(s′))
实际更新流程变为:
- 初始化 Q 0 Q_0 Q0
- 使用结果估计 Q ~ ( s , a ) = r + γ Q 0 ( s ′ , π ( s ′ ) ) \tilde{Q}(s,a) = r + \gamma Q_0(s^{\prime},\pi (s^{\prime})) Q~(s,a)=r+γQ0(s′,π(s′))
- 计算误差 δ \delta δ: δ = Q 0 ( s , a ) − Q ~ ( s , a ) \delta = Q_0(s,a)-\tilde{Q}(s,a) δ=Q0(s,a)−Q~(s,a)
- 使用损失函数 L L L: Δ = L ( δ ) \Delta = L(\delta) Δ=L(δ)
- 更新 Q Q Q: Q 1 = Q 0 − α Δ Q_1 = Q_0-\alpha \Delta Q1=Q0−αΔ
- …
经过迭代后便会收敛到最优策略。
DNQ将Q-learning中的Q表改进为了神经网络,对于连续状态的环境更合适。而且会将过往的决策过程记录下来维护一个样本池,方便一次更新多个样本,而不是按时间一次次运行更新,这样回放机制就会减少应用于高度相关的状态序列时因为前后样本存在关联导致的强化学习震荡和发散的问题。
强化学习分类:
通过学习目标分类:
- 基于价值的方法,训练agent学习行为价值函数,隐式学习了策略,如Q-learning
- 基于策略的方法,训练agent直接学习策略
- 把 value-based 和 policy-based 结合起来就是 演员-评论家(Actor-Critic)方法。这一类 agent 需要显式地学习价值函数和策略。如DDPG
通过交互策略和更新策略间的关系分类
- on-policy方法:目标策略和交互策略是同一个策略 π ( s ) \pi(s) π(s)
- off-policy方法:使用一种行为策略 μ ( s ) \mu(s) μ(s)来与环境交互,学习的目标策略是 π ( s ) \pi(s) π(s)
训练准备
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA 折扣因子,用于计算discounted
# EPS_START:随机选择action的概率初始值。
# EPS_END 随机选择action的概率末尾值
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR 优化器的学习率
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)steps_done = 0def select_action(state):'''选择一个action,随机选择or使用模型输出,这么做是为了能够全面学习所有行为,而不陷入局部最优随着迭代随机选择的概率会逐渐减小至EPS_END'''global steps_donesample = random.random()eps_threshold = EPS_END + (EPS_START - EPS_END) * \math.exp(-1. * steps_done / EPS_DECAY)steps_done += 1if sample > eps_threshold:with torch.no_grad():# t.max(1) will return the largest column value of each row.# second column on max result is index of where max element was# found, so we pick action with the larger expected reward.return policy_net(state).max(1).indices.view(1, 1)else:return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)episode_durations = []def plot_durations(show_result=False):'''画过去的随机概率的变化过程'''plt.figure(1)durations_t = torch.tensor(episode_durations, dtype=torch.float)if show_result:plt.title('Result')else:plt.clf()plt.title('Training...')plt.xlabel('Episode')plt.ylabel('Duration')plt.plot(durations_t.numpy())# Take 100 episode averages and plot them tooif len(durations_t) >= 100:means = durations_t.unfold(0, 100, 1).mean(1).view(-1)means = torch.cat((torch.zeros(99), means))plt.plot(means.numpy())plt.pause(0.001) # pause a bit so that plots are updatedif is_ipython:if not show_result:display.display(plt.gcf())display.clear_output(wait=True)else:display.display(plt.gcf())
train loop
def optimize_model():'''模型更新函数,从memory中抽取一个batch,然后更新'''if len(memory) < BATCH_SIZE:returntransitions = memory.sample(BATCH_SIZE)# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for# detailed explanation). This converts batch-array of Transitions# to Transition of batch-arrays.batch = Transition(*zip(*transitions))# Compute a mask of non-final states and concatenate the batch elements# (a final state would've been the one after which simulation ended)non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)), device=device, dtype=torch.bool)non_final_next_states = torch.cat([s for s in batch.next_stateif s is not None])state_batch = torch.cat(batch.state)action_batch = torch.cat(batch.action)reward_batch = torch.cat(batch.reward)# 计算Q(s,a),模型输出是每个action的概率,根据这个输出到action_batch中获取action value# These are the actions which would've been taken# for each batch state according to policy_netstate_action_values = policy_net(state_batch).gather(1, action_batch)# Compute V(s_{t+1}) for all next states.# Expected values of actions for non_final_next_states are computed based# on the "older" target_net; selecting their best reward with max(1).values# This is merged based on the mask, such that we'll have either the expected# state value or 0 in case the state was final.next_state_values = torch.zeros(BATCH_SIZE, device=device)with torch.no_grad():next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values# Compute the expected Q valuesexpected_state_action_values = (next_state_values * GAMMA) + reward_batch# Compute Huber losscriterion = nn.SmoothL1Loss()loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))# Optimize the modeloptimizer.zero_grad()loss.backward()# In-place gradient clippingtorch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)optimizer.step()if torch.cuda.is_available() or torch.backends.mps.is_available():num_episodes = 600
else:num_episodes = 50for i_episode in range(num_episodes):# Initialize the environment and get its statestate, info = env.reset()state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)for t in count():action = select_action(state)observation, reward, terminated, truncated, _ = env.step(action.item())reward = torch.tensor([reward], device=device)done = terminated or truncatedif terminated:next_state = Noneelse:next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)# Store the transition in memorymemory.push(state, action, next_state, reward)# Move to the next statestate = next_state# Perform one step of the optimization (on the policy network)optimize_model()# 软更新,即老的权重使用新的权重进行加权更新# θ′ ← τ θ + (1 −τ )θ′target_net_state_dict = target_net.state_dict()policy_net_state_dict = policy_net.state_dict()for key in policy_net_state_dict:target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)target_net.load_state_dict(target_net_state_dict)if done:episode_durations.append(t + 1)plot_durations()breakprint('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()
Ref
强化学习基本原理
Pytorch官方tutorial