24/11/10 算法笔记 强化学习A3C
A3C算法的原理
A3C算法采用Actor-Critic结构,由Actor和Critic两个网络组成。Actor网络的目标是学习策略函数,即在给定状态下选择动作的概率分布。Critic网络的目标是学习状态值函数或者状态-动作值函数,用于评估不同状态或状态-动作对的价值。
A3C算法的训练过程可以分为以下几个步骤:
初始化神经网络参数。
创建多个并行的训练线程,每个线程独立运行一个智能体与环境交互,并使用Actor和Critic网络实现策略和价值的近似。
每个线程根据当前的策略网络选择动作,并观测到新的状态和奖励,将这些信息存储在经验回放缓冲区中。
当一个线程达到一定的时间步数或者轨迹结束时,该线程将经验回放缓冲区中的数据抽样出来,并通过计算优势函数进行梯度更新。
每个线程进行一定次数的梯度更新后,将更新的参数传递给主线程进行整体参数更新。
重复上述步骤直到达到预定的训练轮次或者达到终止条件为止。
A3C算法采用了Asynchronous(异步)的训练方式,每个线程独立地与环境交互,并通过参数共享来实现梯度更新。这种异步训练的方式可以提高训练的效率和稳定性,并且能够学习到更好的策略和价值函数。
A3C算法的功能
A3C算法具有以下功能和特点:
支持连续动作空间和高维状态空间的强化学习;
通过多个并行的智能体实现快速而稳定的训练;
利用Actor和Critic两个网络分别学习策略和价值函数,具有更好的学习效果和收敛性;
通过异步训练的方式提高了训练的效率和稳定性。
A3C代码:
1. 导入必要的库
import torch
import torch.nn.functional as F
import torch.multiprocessing as mp
import gymnasium as gym
import numpy as np
from queue import Empty
2. 定义全局参数
GLOBAL_MAX_EPISODE = 1000
GAMMA = 0.98
3. 定义共享的Adam优化器
SharedAdam
优化器被设计为允许在多进程环境中共享优化器的状态,这在分布式训练或多线程训练中非常有用。
class SharedAdam(torch.optim.Adam):def __init__(self, params, lr=1e-3, betas=(0.9, 0.99), eps=1e-8,weight_decay=0):super(SharedAdam, self).__init__(params, lr=lr, betas=betas, eps=eps, weight_decay=weight_decay)#为每个参数初始化Adam优化器所需的状态。for group in self.param_groups:for p in group['params']:state = self.state[p]state['step'] = 0state['exp_avg'] = torch.zeros_like(p.data)state['exp_avg_sq'] = torch.zeros_like(p.data)# 这两行代码使得 exp_avg 和 exp_avg_sq 可以在多个进程间共享。这是通过调用 PyTorch 的 .share_memory_() 方法实现的,该方法将张量移动到共享内存中,使其能够在多个进程间共享。state['exp_avg'].share_memory_()state['exp_avg_sq'].share_memory_()
4. 定义策略网络(Actor)
class PolicyNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):super(PolicyNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):x = F.relu(self.fc1(x))return self.fc2(x)
5. 定义价值网络(Critic)
class ValueNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim):super(ValueNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):x = F.relu(self.fc1(x))return self.fc2(x)
6. 定义A3C智能体
class A3Cagent:def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, env, numOfCPU):self.global_actor = PolicyNet(state_dim, hidden_dim, action_dim)self.global_critic = ValueNet(state_dim, hidden_dim)#这两行代码使得全局Actor和Critic网络的参数能够在多个进程间共享。self.global_actor.share_memory()self.global_critic.share_memory()#创建了两个优化器,分别用于更新Critic和Actor网络的参数。这里使用的是自定义的SharedAdam优化器,它允许在多个进程间共享优化器的状态。self.global_critic_optimizer = SharedAdam(self.global_critic.parameters(), lr=critic_lr, betas=(0.92, 0.999))self.global_actor_optimizer = SharedAdam(self.global_actor.parameters(), lr=actor_lr, betas=(0.92, 0.999))#这些行代码初始化了环境、全局剧集计数器、全局剧集奖励和结果队列self.env = envself.global_episode = mp.Value('i', 0)self.global_episode_reward = mp.Value('d', 0.)self.res_queue = mp.Queue()#创建了一个工作进程列表,每个工作进程都是Worker类的实例。每个工作进程将在不同的CPU核心上运行,并行地进行训练。self.workers = [Worker(i, self.global_actor, self.global_critic, self.global_critic_optimizer,self.global_actor_optimizer, self.env, state_dim, hidden_dim, action_dim,self.global_episode, self.global_episode_reward, self.res_queue) for i in range(numOfCPU)]def train(self):
#这行代码启动所有工作进程。[w.start() for w in self.workers]res = []while True:r = self.res_queue.get()if r is not None:res.append(r)else:break
#这行代码等待所有工作进程完成。[w.join() for w in self.workers]
state_dim
: 状态空间的维度。hidden_dim
: 隐藏层的维度。action_dim
: 动作空间的维度。actor_lr
: Actor网络的学习率。critic_lr
: Critic网络的学习率。env
: 环境对象,智能体将在此环境中进行交互。numOfCPU
: 用于并行训练的CPU核心数。
总的来说,A3Cagent
类负责初始化A3C算法所需的所有组件,并协调多个工作进程进行并行训练。这种方法可以显著加速训练过程,因为它允许智能体在多个环境副本上同时进行探索和学习。
7. 定义Worker类,Worker
类代表一个独立的工作者,它负责与环境交互,收集经验,并更新智能体的行为。
class Worker(object):def __init__(self, name, globalAC, sess):
#创建了一个连续动作空间的MountainCar环境,并将其解包,以便直接访问环境的底层接口。self.env = gym.make('MountainCarContinuous-v0').unwrappedself.name = name
#初始化了一个ActorCritic对象,这是智能体的核心,负责学习策略(Actor)和价值函数(Critic)。self.AC = ActorCritic(name, sess, globalAC)
#保存了TensorFlow会话,用于执行计算图。self.sess = sessdef work(self): total_step = 1 #初始化全局步数为1buffer_s, buffer_a, buffer_r = [], [], [] #初始化三个空列表,用于存储状态、动作和奖励。while not coord.should_stop() and global_episodes < no_of_episodes:
# 这是一个循环,只要协调器没有停止且全局的episode数没有达到预设值,工作者就会继续工作s = self.env.reset() #重置环境,获取初始状态。ep_r = 0for ep_t in range(no_of_ep_steps):if self.name == 'W_0' and render: #检查是否是第一个工作者并且是否需要渲染环境。self.env.render() #如果需要,渲染环境。a = self.AC.choose_action(s) #智能体选择一个动作。s_, r, done, info = self.env.step(a) #执行动作,获取新的状态、奖励、是否结束和额外信息。done = True if ep_t == no_of_ep_steps - 1 else Falseep_r += rbuffer_s.append(s) #将当前状态添加到缓冲区。buffer_a.append(a) buffer_r.append((r+8)/8)
#如果episode结束,则价值函数的值为0,否则计算新状态的价值。if total_step % update_global == 0 or done:if done:v_s_ = 0else:v_s_ = self.sess.run(self.AC.v, {self.AC.s: s_[np.newaxis, :]})[0, 0]buffer_v_target = []for r in buffer_r[::-1]: #反向遍历缓冲区中的奖励。v_s_ = r + gamma * v_s_ #使用TD(lambda)更新公式计算价值目标。buffer_v_target.append(v_s_)buffer_v_target.reverse() #将价值目标列表反转,以便与状态、动作和奖励列表匹配。