强化学习入门与理论知识学习. 强化学习(Reinforcement Learning, RL)是机器学习的一个分支,它关注的是智能体(agent)如何在环境中采取行动以最大化某种累积奖励(reward)。与监督学习和无监督学习不同,强化学习不要求智能体有明确的训练数据,而是通过与环境的交互来学习最优的行为策略.
背景知识
对数概率
采样的对数概率是指从某个概率分布中采样得到一个特定值时,该值的对数概率。对数概率通常用于数值稳定性和计算效率。
假设有一个概率分布 P, 其概率质量函数 (PMF) 或概率密度函数 (PDF) 为 $p(x)$, 对于从该分布中采样得到的值 $x$, 其对数概率定义为:
\[\log_e{p(x)}\]-
为什么使用对数概率?
-
数值稳定性:在处理非常小的概率值时,直接计算概率可能会导致数值下溢(underflow)。对数概率可以避免这种情况,因为对数函数可以将非常小的数值转换为较大的负数。
-
计算效率:对数概率在计算梯度时更加稳定,尤其是在优化算法中,如策略梯度方法。
-
熵
分布的熵是衡量概率分布不确定性的指标。熵越高,表示分布的不确定性越大;熵越低,表示分布的不确定性越小。
对于离散概率分布 $P$, 其熵 $H(P)$ 定义为:
\[H(P) = - \sum_i p_i \log_e{p_i}\]其中, $p_i$ 是第 $i$ 个类别的概率.
-
为什么需要熵?
-
衡量不确定性:熵可以用来衡量一个概率分布的不确定性。在强化学习中,熵可以用来鼓励探索,避免策略过早收敛到某个动作。
-
正则化:在训练过程中,添加熵项作为正则化项可以防止模型过度自信,提高模型的泛化能力。
-
假设有一个概率分布 $P = [0.1, 0.3, 0.6]$ , 那么其熵为 $H(P) = -(0.1\log{0.1} + 0.3\log{0.3} + 0.6\log{0.6})$
我们使用 pytorch 计算最终结果:
import torch
from torch.distributions import Categorical
# 定义一个概率分布
probs = torch.tensor([0.1, 0.3, 0.6])
dist = Categorical(probs)
# 计算分布的熵
entropy = dist.entropy()
print("分布的熵:", entropy.item()) # 输出熵
分布的熵: 0.897945761680603
KL 散度
-
定义
KL 散度(Kullback-Leibler Divergence),也称为相对熵,是衡量两个概率分布差异的一种方法。它在信息论、统计学和机器学习等领域有广泛的应用。
KL 散度衡量的是两个概率分布 $P$ 和 $Q$ 之间差异的非对称距离
-
对于离散分布
\[D_{KL}(P | Q) = \sum P(x) log( \frac {P(x)} {Q(x)} )\] -
对于连续分布
\[D_{KL}(P | Q) = D_{KL}(P | Q) = \int P(x) \log \left( \frac{P(x)}{Q(x)} \right) dx\]
-
-
性质
-
非负性
$D_{KL}(P \parallel Q) \ge 0$ 当且仅当 $P = Q$ 时, KL 散度为0
-
非对称性
KL 散度是非对称的, $D_{KL}(P \parallel Q) \neq D_{KL}(Q \parallel P)$, 从 $P$ 到 $Q$ 的 KL 散度与 $Q$ 到 $P$ 的 KL 散度是不同的
-
-
计算示例
假设我们有两个离散概率分布 $P$ 和 $Q$:
\(P = [0.4, 0.2, 0.3]\) \(Q = [0.3, 0.5, 0.2]\)
则 KL 散度 $D_{KL}(P \parallel Q)$ 为:
\[D_{KL}(P | Q) = 0.4log(\frac {0.4} {0.3}) + 0.2log(\frac {0.2} {0.5}) + 0.3log(\frac {0.3} {0.2}) = 0.05345\]
-
KL 散度损失
KL散度损失(Kullback-Leibler Divergence Loss)是一种基于KL散度的损失函数,广泛应用于机器学习和深度学习中,尤其是在需要衡量两个概率分布差异的场景中。KL散度损失可以用于正则化、模型训练、生成模型(如变分自编码器VAE)和强化学习等领域。以下是对KL散度损失的详细解析,包括其定义、计算方法、应用场景和实现方式。
在强化学习中,KL散度损失可以用于限制策略更新的幅度。例如,在PPO(Proximal Policy Optimization)中,KL散度损失用于确保策略更新不会偏离旧策略太远。
pytorch 实现:
import torch import torch.nn as nn # 定义两个概率分布 P = torch.tensor([0.4, 0.2, 0.4], dtype=torch.float32) Q = torch.tensor([0.3, 0.5, 0.2], dtype=torch.float32) # 计算KL散度损失 kl_loss = nn.KLDivLoss(reduction="sum") loss = kl_loss(torch.log(Q), P) print(f"KL散度损失: {loss.item()}")
自定义实现:
import numpy as np # 定义两个概率分布 P = np.array([0.4, 0.2, 0.4]) Q = np.array([0.3, 0.5, 0.2]) # 计算KL散度损失 kl_divergence = np.sum(P * np.log(P / Q)) print(f"KL散度损失: {kl_divergence}")
随机梯度上升, SGA
随机梯度上升是一种优化算法,用于最大化目标函数。它通过在每次迭代中随机选择一个样本(或小批量样本)来近似计算目标函数的梯度,并沿着梯度方向更新参数。
假设目标函数为 $J(\theta)$, 其中 $\theta$ 是模型参数, 随机梯度上升的目标是最大化 $J(\theta)$, 在每次迭代中, 随机选择一个样本 $(x^(i), y^(i))$, 计算梯度:
\[\nabla_{\theta} J(\theta) \approx \nabla_{\theta} J_i(\theta)\]更新参数:
\[\theta_{k+1} = \theta_k + \alpha \cdot \nabla_{\theta} J_i(\theta_k)\]如果以上更新式是 $\theta_{k+1} = \theta_k - \alpha \cdot \nabla_{\theta} J_i(\theta_k)$ 则是随机梯度下降法, 参数更新的方向相反
其中:
-
$\alpha$ 是学习率
-
$J_i(\theta)$ 是基于样本 $i$ 的目标函数
SGA 和 随机梯度下降 SGD 的区别
-
SGA 用于最大化似然函数, 策略梯度等
-
SGD 用于最小化损失函数, 如线性回归, 逻辑回归等
Episode
episode 是指从初始状态开始,到结束状态(终止状态)为止的一系列状态、动作和奖励的序列。在这个过程中,智能体(agent)会根据当前的状态选择动作,环境(environment)会根据智能体的动作返回新的状态和奖励,这个过程会一直持续,直到达到终止状态。在强化学习的训练过程中,通常会通过多个 episode 来让智能体学习。智能体会在每个 episode 中尝试不同的策略,通过观察奖励来调整自己的行为,从而逐渐学习到更好的策略。也可以通过 episode 来评估智能体的性能。例如,统计智能体在多个 episode 中获得的平均奖励,或者统计智能体成功完成任务(达到终止状态)的比例等,以此来衡量智能体的学习效果。
-
性质
-
独立性
不同的 episode 之间通常是独立的。虽然智能体会根据之前 episode 的经验来调整自己的策略,但每个 episode 的过程本身是独立的,不会受到其他 episode 的直接影响。
-
有限状态
每个 episode 都有明确的起点(初始状态)和终点(终止状态)。初始状态通常是预先设定的,终止状态则是满足某个特定条件的状态,比如游戏结束、任务完成等。
-
PPO
PPO(Proximal Policy Optimization,近端策略优化)是一种基于策略优化的强化学习算法,由 OpenAI 于 2017 年提出。它旨在解决传统策略梯度方法中训练不稳定、效率低下的问题。PPO 通过引入一个截断的目标函数,限制新策略和旧策略之间的差异,避免策略更新过快导致的不稳定性,从而提高训练的稳定性和效率。
-
PPO 的主要组成部分
-
策略网络 (Actor)
用于根据当前状态选择动作, 学习策略 $\pi(a \parallel s)$, 即在给定状态下可选择动作的概率分布.
-
价值网络 (Critic)
用于估计状态价值函数 $V(s)$, 衡量从当前状态开始能够获得的预期累积奖励值.
-
奖励函数 (Reward)
用于衡量智能体在特定状态下采取特定动作后获得的即时奖励.
-
优势函数 (Advantage)
衡量在特定状态下采取特定动作相对于平均策略的优势程度.
-
超参数 (Hyper Parameters)
包括截断范围参数 $\epsilon$, 折扣因子 $\gamma$, 学习率, 熵正则化系数等, 用于干预算法的运行细节.
-
-
PPO 中各网络的的损失函数
-
策略网络 (Actor) 损失
策略网络的损失函数由截断的目标函数(Clipped Surrogate Objective, CSO)和策略熵正则化项(Entropy Regularization, ER)构成.
-
其中, 截断的目标函数的数学表示为:
\[L^{CLIP}(\theta) = E[min(r_t(\theta)A_t,clip(r_t(\theta), 1 - \epsilon, 1 + \epsilon)A_t)]\]其中:
-
$E$ 是数学期望函数
-
$min$ 是求最小值的函数
-
$clip$ 是用于将 $r_t(\theta)$ 限制在 $[1-\epsilon, 1+\epsilon]$ 范围内的函数
-
$r_t(\theta) = \frac {\pi_{\theta}(a \parallel s)} {\pi_{\theta_{old}}(a \parallel s)}$ 是新旧策略对动作选择概率的比率
-
$A_t$ 是优势函数
优势函数的计算见后文
-
$\epsilon$ 是控制截断范围的参数, 用于控制策略更新的幅度
其主要作用是:
-
限制策略更新幅度: 截断的目标函数通过限制新旧策略之间的差异,防止策略更新过大而导致训练不稳定。它确保策略在每次更新时不会发生剧烈变化,从而保持训练的稳定性.
-
优化策略方向: 该函数利用优势函数 $A_t$ 来引导策略的更新方向,使得策略更倾向于选择那些能够获得更高累计奖励的动作.
-
-
而策略熵正则化项的数学表示为:
\[L^{ENT}(\theta) = E_t[H(\pi_\theta(s_t))]\]其中, $H(\pi_\theta(s_t))$ 是策略的熵, 具体计算式为:
\[H(\pi_\theta(s_t)) = - \sum_a \pi_\theta(a | s_t)log_e\pi_\theta(a | s_t)\]其主要作用是:
-
鼓励探索: 策略熵正则化项通过增加策略的熵,鼓励策略在选择动作时更加随机,从而促进智能体对环境的充分探索。这有助于避免策略过早收敛到局部最优解.
-
平衡策略更新: 在总损失函数中,策略熵正则化项与截断的目标函数一起,通过权重系数 $c_2$ 来平衡策略的探索与稳定性.
-
综上, 我们可以得到策略网络的总损失函数:
\[L^{policy}(\theta) = L^{CLIP}(\theta) - c_2L^{ENT}(\theta)\]其中, $c_2$ 是权重系数, 用于平衡截断的目标函数和熵正则化项.
-
-
价值网络 (Critic) 损失
价值网络的损失函数通常采用均方误差 (MSE) 来衡量预测值和目标值之间的差异, 数学表示为:
\[L^{VF}(\theta) = E[(V(s_t) - V^{target}(s_t))^2]\]其中:
-
$V(s_t)$ 是价值网络对状态 $s_t$ 的预测值, 而 $V^{target}(s_t)$ 是目标值, 它可以由不同的方法计算得出:
-
蒙特卡洛估计 (Monte Carlo Estimation)
蒙特卡洛估计通过采样整个序列的回报来计算目标值, 如果从状态 $s_t$ 开始, 直到终止状态的回报为 $G_t$, 则有:
\[V^{target}(s_t) = G_t\]其中, $G_t$ 通常定义为: $G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2R_{t+3} + … + \gamma^{T-t-1}R_T$, 这里, $R_i$ 是在时间步 $i$ 获得的即时奖励, $\gamma$ 是折扣因子, $T$ 是序列的终止时间步.
-
时间差分 (Temporal Difference, TD) 目标
TD 目标结合即时奖励和下一个状态的预测值来计算目标值. 常见的 TD 目标如下:
-
$TD(0)$:
$V^{target}(s_t) = R_{t+1} + \gamma V(s_{t+1})$
其中, $R_{t+1}$ 是智能体在时间步 $t$ 处于状态 $s_t$ 并执行动作 $a_t$ 后, 环境(奖励函数)反馈的奖励值, $\gamma$ 是折扣因子, 范围在 0 到 1 之间, 用于决定未来奖励的当前价值, $V(s_{t+1})$ 是对下一个状态 $s_{t+1}$ 的价值预测.
-
$TD(\lambda)$:
$V^{target}(s_t) = (1 - \lambda)V^{target}(s_t) + \lambda(R_{t + 1} + \gamma V(s_{t+1}))$
其中, $\lambda$ 是一个介于 0 和 1 之间的参数, 用于控制目标值的平滑程度.
在这里我给出我的理解, 这可能有点绕, 使用 TD 方法时, 目标值 $V^{target}(s_t)$ 是会基于当前的价值函数 $V$ 计算得出的, 而价值函数会通过参数更新向 $V^{target}$ 逼近, 以更好地拟合目标值. 这种更新方法的有效性基于以下几点:
-
引导: TD 方法使用当前价值函数对下一个状态的价值估计引导目标值计算, 能够在不需要等待整个 episode.
-
逐步改进: 每次更新都基于当前的估计和实际的环境反馈, 逐步改进价值函数的准确性.
-
平衡探索与利用: 通过不断调整价值函数, 智能体能够在探索新策略和利用已知信息之间找到平衡, 更有效地学习更优策略.
-
-
-
-
总损失函数
虽然策略网络和价值网络的损失函数是分开计算的,但在某些深度神经网络实现中,它们共享部分网络结构(例如,使用一个共同的特征提取部分)。通过将两个损失函数组合成一个总损失函数,可以同时优化整个网络的参数,提高训练效率和稳定性。
总损失函数通常如下构造:
\[L(\theta) = E_t[L^{CLIP}(\theta) - c_1L^{VF}(\theta) + c_2L^{ENT}(\theta)]\]其中, $c_1, c_2$ 是权重系数, 用于平衡策略优化, 值函数更新和熵正则化.
-
-
计算优势函数
在 PPO 算法中, 优势函数 $A_t$ 是一个关键组件, 用于衡量智能体在状态 $s_t$ 下采取动作 $a_t$ 相对于平均策略的优势程度, 并指导策略的更新方向. 优势函数有多种计算方法:
-
蒙特卡洛方法 (MC)
直接使用采样得到的回报减去状态价值函数的估计.
\[A_t = G_t - V(s_t)\]$G_t$ 是从时间步 $t$ 开始的累计折扣奖励, $V(s_t)$ 是状态 $s_t$ 的价值函数估计值.
-
时序差分方法 (TD)
利用 TD 误差, 来估计优势函数
\[A_t = \delta_t + \gamma \lambda \delta_{t+1} + (\gamma \lambda)^2 \delta_{t+2} + \ldots\]其中, $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 是时序差分误差, $\gamma$ 是折扣因子, $\lambda$ 是一个介于 0 和 1 之间的参数, 用于控制优势函数的时间范围.
-
广义优势估计 (GAE, 最常见)
其结合了 MC 和 TD 的优点, 数学表示为:
\[A_t = \sum_{l=0}^{n-1}(\gamma\lambda)^l\delta_{t+l}\]其中, $n$ 是采样步数, GAE 通过调整 $\gamma$ 参数在偏差和方差之间取得平衡, 提供更稳定的优势函数估计.
GAE 这一块我不理解, 后续补充
-
优势函数的具体作用
-
指导策略更新
优势函数 $A_t$ 用于衡量智能体在状态 $s_t$ 下采取动作 $a_t$ 相对于当前策略的平均表现有多好, 如果 $A_t > 0$, 表示该动作比平均策略更好, 如果 $A_t < 0$ 表示该动作比平均策略更差, 策略更新时, 会倾向于增加 $A_t > 0$ 的动作的概率, 减少 $A_t < 0$ 的动作的概率.
-
减少方差
通过使用价值函数作为基线(Baseline),优势函数能够减少策略梯度估计的方差,提高学习的稳定性.
-
-
-
pytorch 实现 PPO
import gymnasium as gym import torch import torch.nn as nn import torch.optim as optim import torch.nn.functional as F # 定义策略网络,用于输出动作的概率分布 class PolicyNet(nn.Module): def __init__(self, state_dim, hidden_dim, action_dim): super(PolicyNet, self).__init__() self.fc1 = nn.Linear(state_dim, hidden_dim) # 输入层到隐藏层的全连接层 self.fc2 = nn.Linear(hidden_dim, action_dim) # 隐藏层到输出层的全连接层 def forward(self, x): x = F.relu(self.fc1(x)) # 使用ReLU激活函数对隐藏层输出进行激活 return F.softmax(self.fc2(x), dim=1) # 使用softmax函数将输出层的值转换为概率分布 # 定义价值网络,用于输出状态的价值 class ValueNet(nn.Module): def __init__(self, state_dim, hidden_dim): super(ValueNet, self).__init__() self.fc1 = nn.Linear(state_dim, hidden_dim) # 输入层到隐藏层的全连接层 self.fc2 = nn.Linear(hidden_dim, 1) # 隐藏层到输出层的全连接层 def forward(self, x): x = F.relu(self.fc1(x)) # 使用ReLU激活函数对隐藏层输出进行激活 return self.fc2(x) # 输出状态的价值 # 定义PPO算法类 class PPO: def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device): # 初始化策略网络和价值网络 self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device) self.critic = ValueNet(state_dim, hidden_dim).to(device) # 定义优化器,用于更新策略网络和价值网络的参数 self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr) self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr) # 设置超参数 self.gamma = gamma # 折扣因子 self.lmbda = lmbda # 优势函数的通用优势估计参数 self.epochs = epochs # 每次更新时的训练轮数 self.eps = eps # PPO算法中的截断范围参数 self.device = device # 设备(CPU或GPU) # 定义策略网络的采样函数,根据当前状态选择动作 def take_action(self, state): state = torch.tensor([state], dtype=torch.float).to(self.device) # 将状态转换为张量 probs = self.actor(state) # 计算动作的概率分布 action_dist = torch.distributions.Categorical(probs) # 定义分类分布 action = action_dist.sample() # 从分布中采样动作 return action.item() # 返回动作的值 # 定义PPO算法的更新函数 def update(self, transition_dict): # 将数据转换为张量 states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device) actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device) rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device) next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device) dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device) # 计算时序差分误差和优势函数 td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones) # 计算时序差分目标 td_delta = td_target - self.critic(states) # 计算时序差分误差 advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device) # 计算优势函数 # 计算旧策略的概率 old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach() # 计算旧策略下动作的概率 # 更新策略网络和价值网络 for _ in range(self.epochs): log_probs = torch.log(self.actor(states).gather(1, actions)) # 计算新策略下动作的概率 ratio = torch.exp(log_probs - old_log_probs) # 计算概率比值 surr1 = ratio * advantage # 计算目标函数的第一个部分 surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage # 计算目标函数的第二个部分 actor_loss = torch.mean(-torch.min(surr1, surr2)) # 计算策略网络的损失函数 critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach())) # 计算价值网络的损失函数 self.actor_optimizer.zero_grad() # 清空策略网络的梯度 self.critic_optimizer.zero_grad() # 清空价值网络的梯度 actor_loss.backward() # 计算策略网络的梯度 critic_loss.backward() # 计算价值网络的梯度 self.actor_optimizer.step() # 更新策略网络的参数 self.critic_optimizer.step() # 更新价值网络的参数 # 定义计算优势函数的函数 def compute_advantage(gamma, lmbda, td_delta): advantage_list = [] # 存储优势函数的列表 advantage = 0.0 # 初始化优势函数 # 从后往前计算优势函数 for delta in reversed(td_delta): advantage = gamma * lmbda * advantage + delta.item() # 计算优势函数 advantage_list.append(advantage) # 将优势函数添加到列表中 advantage_list.reverse() # 将列表反转,恢复顺序 return torch.tensor(advantage_list, dtype=torch.float) # 返回优势函数的张量 # 设置超参数 actor_lr = 1e-3 # 策略网络的学习率 critic_lr = 1e-2 # 价值网络的学习率 num_episodes = 500 # 训练的轮数 hidden_dim = 128 # 隐藏层的维度 gamma = 0.98 # 折扣因子 lmbda = 0.95 # 优势函数的通用优势估计参数 epochs = 10 # 每次更新时的训练轮数 eps = 0.2 # PPO算法中的截断范围参数 device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu") # 设置设备 # 创建环境 env_name = 'CartPole-v1' env = gym.make(env_name) # 初始化PPO算法 state_dim = env.observation_space.shape[0] # 状态的维度 action_dim = env.action_space.n # 动作的维度 agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device) # 训练过程 for episode in range(num_episodes): state = env.reset()[0] # 重置环境,获取初始状态 done = False # 是否完成标志 episode_return = 0 # 当前回合的回报 transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []} # 存储数据的字典 while not done: action = agent.take_action(state) # 根据当前策略选择动作 next_state, reward, terminated, truncated, _ = env.step(action) # 执行动作,获取下一个状态、奖励等信息 done = terminated or truncated # 判断是否完成 transition_dict['states'].append(state) # 添加当前状态 transition_dict['actions'].append(action) # 添加当前动作 transition_dict['next_states'].append(next_state) # 添加下一个状态 transition_dict['rewards'].append(reward) # 添加奖励 transition_dict['dones'].append(done) # 添加是否完成标志 state = next_state # 更新当前状态 episode_return += reward # 累加回报 agent.update(transition_dict) # 更新策略网络和价值网络 print(f"Episode: {episode+1}, Return: {episode_return}") # 打印当前回合的信息
GRPO
…待续
DPO
…待续
RLHF
…待续