策略的结合——Actor-Critic算法求解冰湖游戏
策略的结合——Actor-Critic算法求解冰湖游戏
Actor-Critic算法是一种结合了策略优化方法和值函数方法的强化学习算法。它由两个主要部分组成:Actor和Critic。
-
Actor部分:
- 负责根据当前策略选择动作。
- 其输入是环境的状态信息,输出则是各个动作的选择概率。
- 随后,根据这些概率来随机选择将要执行的动作。
-
Critic部分:
- 评估Actor选择的动作的好坏。
- 其输入也是环境的状态信息,但输出是一个价值评估,即该状态或动作状态值。
- 这个评估值用于指导Actor调整其动作选择策略。
Actor-Critic算法的优势主要包括:
- 单步更新:与传统的Policy Gradient方法相比,Actor-Critic可以实现单步更新,而不是基于整个回合的更新,从而大大提高了学习效率。
- 结合了两种方法的优点:该算法融合了策略优化方法(如Policy Gradient)和值函数方法(如Q-Learning)的优势,使得它能够在处理大型状态空间或连续动作空间时表现出较高的效率和可扩展性。
- 适应性强:Actor-Critic方法不需要对环境的内部机制进行建模,因此它可以适应任意复杂的连续和离散动作环境。
- 完整的观测信息利用:在每个时间步,算法都可以利用完整的观测信息(包括状态和奖励),而不需要依赖记忆库。
注意:本文用到了PyTorch库,gym强化学习环境库,需要提前安装。
- gym环境安装:https://github.com/Farama-Foundation/Gymnasium
- gym环境介绍文档:https://gymnasium.farama.org/environments/classic_control/mountain_car/
- pytorch官网:https://pytorch.org/
本文所使用的python版本为3.11.8
step1:冰湖(Frozen Lake)游戏介绍
官方地址:https://gymnasium.farama.org/environments/toy_text/frozen_lake/
FrozenLake(冰湖)涉及从起点穿过冰湖到达目标而不掉入任何洞中。由于冰湖的滑溜特性,玩家可能无法总是按照预期的方向移动。
import gymnasium as gym # 导入gym包
env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True) # 创建冰湖游戏环境
observation, info = env.reset() # 初始化环境for _ in range(1):action = env.action_space.sample() # 随机选择一个动作observation, reward, terminated, truncated, info = env.step(action) # 执行动作if terminated or truncated: # 当达到终点时,重置环境observation, info = env.reset()env.close() # 关闭环境
action, observation
(1, 4)
创建环境
gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=True)
代码中:
desc=None
: 用于指定非预加载的地图。可以指定一个自定义地图,例如:
desc=["SFFF", "FHFH", "FFFH", "HFFG"]
,则可以创建一个自定义的冰湖地图,其中S位置代表起点,F表示冰面,H表示冰洞,G表示终点。
可以通过调用generate_random_map函数来指定一个随机生成的地图:
from gymnasium.envs.toy_text.frozen_lake import generate_random_map
gym.make('FrozenLake-v1', desc=generate_random_map(size=8))
map_name="4x4"
: 使用任何预加载的地图的ID。如果desc=None
,则会使用map_name
。如果desc
和map_name
都为None
,则会生成一个随机的8x8地图,其中80%的位置是冻结的。
"4x4":["SFFF","FHFH","FFFH","HFFG"
]"8x8": ["SFFFFFFF","FFFFFFFF","FFFHFFFF","FFFFFHFF","FFFHFFFF","FHHFFFHF","FHFFHFHF","FFFHFFFG",
]
is_slippery=True
: 如果为True
,玩家将以1/3的概率朝预期方向移动,否则将以相等的概率1/3朝任一垂直方向移动。
例如,如果动作为向左并且is_slippery
为True
,那么:
- P(向左移动)=1/3
- P(向上移动)=1/3
- P(向下移动)=1/3
环境动作状态
动作空间:动作的形状是(1,)的int64变量,在范围{0,3}内,表示玩家应该向哪个方向移动。
- 0:向左移动
- 1:向下移动
- 2:向右移动
- 3:向上移动
观察空间:观察值是一个int值,代表玩家的当前位置,计算方式为 current_row * nrows + current_col(其中行和列都从0开始)。
例如,在4x4地图中的目标位置可以这样计算:3 * 4 + 3 = 15。可能的观察数量取决于地图的大小。
初始状态:情节开始时,玩家处于状态[0](位置[0,0]),即S字母所在位置。
奖励:
- 达到目标:+1
- 掉入洞中:0
- 到达冰冻区域:0
结束:如果出现以下情况,则一次游戏结束:
终止:
- 玩家掉入一个洞中。
- 玩家到达目标位置,即max(nrow) * max(ncol) - 1(位置[max(nrow)-1,max(ncol)-1])。
截断(当使用时间限制包装器时):
- 对于4x4环境,情节的长度为100;对于FrozenLake8x8-v1环境,情节的长度为200。
step2:Actor-Critic算法架构的搭建
Actor-Critic框架如下图所示:
对于Actor网络的更新,其主要结合了策略梯度算法。策略梯度算法通俗简单的来说,就是利用某些参数,让能够获得较大奖励的动作更容易被选中,换言之,如果把策略梯度网络看成一个函数,我们要想办法用梯度上升的方法更新网络,如果某个动作获得了较大的收益,我们则会通过不同动作收益的不同,利用梯度上升更新网络,让收益更大的动作梯度上升的更快一点。在策略梯度中,可以把梯度写成下面这个更加一般的形式:
g = E [ ∑ t = 0 T ψ t ∇ θ log π θ ( a t ∣ s t ) ] g=\mathbb{E}\left[\sum_{t=0}^T \psi_t \nabla_\theta \log \pi_\theta\left(a_t \mid s_t\right)\right] g=E[t=0∑Tψt∇θlogπθ(at∣st)]
其中, ψ t \psi_t ψt就是文中所说的网络执行动作所带来的收益相关的参数。有以下几种常见的形式进行策略梯度更新,在Actor-Critic算法中,我们使用的是时序差分的方法。
- ∑ t ′ = t T γ t ′ − t r t ′ \sum_{t^{\prime}=t}^T \gamma^{t^{\prime}-t} r_{t^{\prime}} ∑t′=tTγt′−trt′
- ∑ t ′ = t T γ t ′ − t r t ′ − b ( s t ) \sum_{t^{\prime}=t}^T \gamma^{t^{\prime}-t} r_{t^{\prime}}-b\left(s_t\right) ∑t′=tTγt′−trt′−b(st)
- Q π θ ( s t , a t ) Q^{\pi_\theta}\left(s_t, a_t\right) Qπθ(st,at)
- r t + γ V π θ ( s t + 1 ) − V π θ ( s t ) r_t+\gamma V^{\pi_\theta}\left(s_{t+1}\right)-V^{\pi_\theta}\left(s_t\right) rt+γVπθ(st+1)−Vπθ(st) 时序差分方法
此时,Critic网络也可通过时序差分的方法更新网络。
首先我们定义Actor网络和Critic网络:
import torch
import torch.nn.functional as Fclass Actor(torch.nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):'''Actor网络:state_dim表示状态空间维度,hidden_dim表示隐藏层维度,action_dim表示动作空间维度'''super(Actor, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):x = F.relu(self.fc1(x))return F.softmax(self.fc2(x), dim=1)class Critic(torch.nn.Module):def __init__(self, state_dim, hidden_dim):'''Critic网络:state_dim表示状态空间维度,hidden_dim表示隐藏层维度,输出为1,表示当前状态价值'''super(Critic, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):x = F.relu(self.fc1(x))return self.fc2(x)
接着,我们设计一个Actor-Critic算法,该算法包含Actor网络和Critic网络,并可以使用时序差分算法更新网络。
def to_tensor(data, dtype=torch.float, device=None): """将numpy数组转换为tensor,并移动到指定设备""" tensor = torch.tensor(data, dtype=dtype) if device is not None: tensor = tensor.to(device) return tensorclass ActorCritic(torch.nn.Module): # 该类继承torch.nn.Moduledef __init__(self, state_dim, hidden_dim, action_dim, gamma=0.99): # 初始化函数super(ActorCritic, self).__init__() # 继承父类初始化函数self.actor = Actor(state_dim, hidden_dim, action_dim) # 初始化Actor网络self.critic = Critic(state_dim, hidden_dim) # 初始化Critic网络self.optimizer_actor = torch.optim.Adam(self.actor.parameters(), lr=0.001) # 初始化Actor网络的优化器self.optimizer_critic = torch.optim.Adam(self.critic.parameters(), lr=0.001) # 初始化Critic网络的优化器self.gamma = gamma # 定义折扣因子self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # 定义设备self.to(self.device) # 将模型移动到设备上def take_actionAsProbs(self, state):'''按照Actor网络输出的动作概率执行动作'''state = torch.FloatTensor(state).to(self.device).view(1, -1) # 将状态数据放入GPUaction_probs = self.actor(state) # 获取动作概率action = torch.multinomial(action_probs, 1) # 根据动作概率采样动作return action.item() # 返回动作def take_action(self, state):'''按照Actor网络输出的动作概率的最大值执行动作'''state = torch.FloatTensor(state).to(self.device).view(1, -1) # 将状态数据放入GPUaction_probs = self.actor(state) # 获取动作概率action = torch.argmax(action_probs, dim=1) # 根据动作概率采样动作return action.item() # 返回动作def update(self, transition_dict):'''更新网络参数,其中传入的transition_dict是一个字典,包含以下字段:states: 状态列表actions: 动作列表rewards: 奖励列表next_states: 下一个状态列表dones: 是否结束列表每个列表中元素都是一一对应的'''# 将数据转换为tensor并移至设备 states_tensor = to_tensor(transition_dict['states'], device=self.device) actions_tensor = to_tensor(transition_dict['actions'], dtype=torch.int64, device=self.device).view(-1, 1) rewards_tensor = to_tensor(transition_dict['rewards'], device=self.device).view(-1, 1) next_states_tensor = to_tensor(transition_dict['next_states'], device=self.device) dones_tensor = to_tensor(transition_dict['dones'], dtype=torch.int64, device=self.device).view(-1, 1) # 计算时序差分目标 td_target = rewards_tensor + self.gamma * self.critic(next_states_tensor) * (1 - dones_tensor) # 计算时序差分误差 td_delta = td_target - self.critic(states_tensor) # 计算行动者的对数概率 log_probs = torch.log(self.actor(states_tensor).gather(1, actions_tensor)) # 计算行动者损失 actor_loss = torch.mean(-log_probs * td_delta.detach()) # 计算评论家损失(均方误差) critic_loss = F.mse_loss(self.critic(states_tensor), td_target.detach()) critic_loss = torch.mean(critic_loss) # 如果需要对多个样本取平均的话 # 优化行动者和评论家网络 self.optimizer_actor.zero_grad() self.optimizer_critic.zero_grad() actor_loss.backward() # 反向传播以计算行动者网络的梯度 critic_loss.backward() # 反向传播以计算评论家网络的梯度 self.optimizer_actor.step() # 更新行动者网络的参数 self.optimizer_critic.step() # 更新评论家网络的参数
接下来,我们就可以进行强化学习训练过程了:
import numpy as np
from tqdm import tqdm
def one_hot_encode(k, n): """ 生成一个one-hot编码列表 """ # 创建一个长度为n的全0列表 one_hot_vector = [0] * n # 在第k个位置设置为1 one_hot_vector[k] = 1 return one_hot_vector num_episodes = 3000 # 总的训练轮数
hidden_dim = 32 # 隐含层数量
gamma = 0.99 # 折扣因子env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False) # 创建冰湖游戏环境
state_dim = 16 # 状态维度,这里我们将使用one hot的编码方式将状态编码为一个16维的向量
action_dim = env.action_space.n # 动作维度agent = ActorCritic(state_dim, hidden_dim, action_dim, gamma) # 创建Actor-Critic算法对象return_list = [] # 记录每一轮的回报
for i in range(10):with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:for i_episode in range(int(num_episodes / 10)):episode_return = 0transition_dict = {'states': [],'actions': [],'next_states': [],'rewards': [],'dones': []}state, info = env.reset() # 初始化环境state = one_hot_encode(state, state_dim) # 将状态编码为一个16维的向量done = Falsewhile not done:action = agent.take_actionAsProbs(state)next_state, reward, terminated, truncated, info = env.step(action) # 执行动作next_state = one_hot_encode(next_state, state_dim)done = terminated or truncated # 判断是否结束transition_dict['states'].append(state)transition_dict['actions'].append(action)transition_dict['next_states'].append(next_state)transition_dict['rewards'].append(reward)transition_dict['dones'].append(done)state = next_stateepisode_return += rewardreturn_list.append(episode_return)agent.update(transition_dict)if (i_episode + 1) % 10 == 0:pbar.set_postfix({'episode':'%d' % (num_episodes / 10 * i + i_episode + 1),'return':'%.3f' % np.mean(return_list[-10:])})pbar.update(1)
Iteration 0: 100%|██████████| 300/300 [00:01<00:00, 186.02it/s, episode=300, return=0.000]
Iteration 1: 100%|██████████| 300/300 [00:01<00:00, 210.48it/s, episode=600, return=1.000]
Iteration 2: 100%|██████████| 300/300 [00:01<00:00, 218.73it/s, episode=900, return=1.000]
Iteration 3: 100%|██████████| 300/300 [00:01<00:00, 226.33it/s, episode=1200, return=1.000]
Iteration 4: 100%|██████████| 300/300 [00:01<00:00, 227.92it/s, episode=1500, return=1.000]
Iteration 5: 100%|██████████| 300/300 [00:01<00:00, 220.93it/s, episode=1800, return=1.000]
Iteration 6: 100%|██████████| 300/300 [00:01<00:00, 229.02it/s, episode=2100, return=1.000]
Iteration 7: 100%|██████████| 300/300 [00:01<00:00, 230.06it/s, episode=2400, return=1.000]
Iteration 8: 100%|██████████| 300/300 [00:01<00:00, 228.16it/s, episode=2700, return=1.000]
Iteration 9: 100%|██████████| 300/300 [00:01<00:00, 226.62it/s, episode=3000, return=1.000]
可以看到智能体可以较快收敛,让我们看看智能体在游戏上的表现吧!
import matplotlib.pyplot as plt
%matplotlib inline
from IPython import display
import timedef show_state(env, step=0, info=""):plt.figure(3)plt.clf()plt.imshow(env.render())plt.title("Step: %d %s" % (step, info))plt.axis('off')display.clear_output(wait=True)display.display(plt.gcf())env = gym.make('FrozenLake-v1', desc=None, map_name="4x4", is_slippery=False, render_mode='rgb_array') # 创建冰湖游戏环境
state, info = env.reset()
for _ in range(12):state = one_hot_encode(state, state_dim) # 将状态编码为一个16维的向量action = agent.take_action(state)state, reward, terminated, truncated, info = env.step(action)done = truncated or terminatedshow_state(env, action, info)time.sleep(0.5)if done:state, info = env.reset()
env.close()
可以看到,智能体在冰湖环境中可以表现的较好,读者可以尝试将冰湖游戏环境中的is_slippery参数设置为True,看看智能体在游戏中的表现;另外,当冰湖环境为动态变化时,比如随机生成起点,终点,冰洞的位置时,此时又该如何设计网络呢?