强化学习入门与理论知识学习. 强化学习(Reinforcement Learning, RL)是机器学习的一个分支,它关注的是智能体(agent)如何在环境中采取行动以最大化某种累积奖励(reward)。与监督学习和无监督学习不同,强化学习不要求智能体有明确的训练数据,而是通过与环境的交互来学习最优的行为策略.

背景知识

对数概率

采样的对数概率是指从某个概率分布中采样得到一个特定值时,该值的对数概率。对数概率通常用于数值稳定性和计算效率。

假设有一个概率分布 P, 其概率质量函数 (PMF) 或概率密度函数 (PDF) 为 $p(x)$, 对于从该分布中采样得到的值 $x$, 其对数概率定义为:

\[\log_e{p(x)}\]
  • 为什么使用对数概率?

    1. 数值稳定性:在处理非常小的概率值时,直接计算概率可能会导致数值下溢(underflow)。对数概率可以避免这种情况,因为对数函数可以将非常小的数值转换为较大的负数。

    2. 计算效率:对数概率在计算梯度时更加稳定,尤其是在优化算法中,如策略梯度方法。

分布的熵是衡量概率分布不确定性的指标。熵越高,表示分布的不确定性越大;熵越低,表示分布的不确定性越小。

对于离散概率分布 $P$, 其熵 $H(P)$ 定义为:

\[H(P) = - \sum_i p_i \log_e{p_i}\]

其中, $p_i$ 是第 $i$ 个类别的概率.

  • 为什么需要熵?

    1. 衡量不确定性:熵可以用来衡量一个概率分布的不确定性。在强化学习中,熵可以用来鼓励探索,避免策略过早收敛到某个动作。

    2. 正则化:在训练过程中,添加熵项作为正则化项可以防止模型过度自信,提高模型的泛化能力。

假设有一个概率分布 $P = [0.1, 0.3, 0.6]$ , 那么其熵为 $H(P) = -(0.1\log{0.1} + 0.3\log{0.3} + 0.6\log{0.6})$

我们使用 pytorch 计算最终结果:

import torch
from torch.distributions import Categorical

# 定义一个概率分布
probs = torch.tensor([0.1, 0.3, 0.6])
dist = Categorical(probs)

# 计算分布的熵
entropy = dist.entropy()
print("分布的熵:", entropy.item())  # 输出熵
分布的熵: 0.897945761680603

KL 散度

  1. 定义

    KL 散度(Kullback-Leibler Divergence),也称为相对熵,是衡量两个概率分布差异的一种方法。它在信息论、统计学和机器学习等领域有广泛的应用。

    KL 散度衡量的是两个概率分布 $P$ 和 $Q$ 之间差异的非对称距离

    • 对于离散分布

      \[D_{KL}(P | Q) = \sum P(x) log( \frac {P(x)} {Q(x)} )\]
    • 对于连续分布

      \[D_{KL}(P | Q) = D_{KL}(P | Q) = \int P(x) \log \left( \frac{P(x)}{Q(x)} \right) dx\]
  2. 性质

    1. 非负性

      $D_{KL}(P \parallel Q) \ge 0$ 当且仅当 $P = Q$ 时, KL 散度为0

    2. 非对称性

      KL 散度是非对称的, $D_{KL}(P \parallel Q) \neq D_{KL}(Q \parallel P)$, 从 $P$ 到 $Q$ 的 KL 散度与 $Q$ 到 $P$ 的 KL 散度是不同的

  3. 计算示例

    假设我们有两个离散概率分布 $P$ 和 $Q$:

    \(P = [0.4, 0.2, 0.3]\) \(Q = [0.3, 0.5, 0.2]\)

    则 KL 散度 $D_{KL}(P \parallel Q)$ 为:

    \[D_{KL}(P | Q) = 0.4log(\frac {0.4} {0.3}) + 0.2log(\frac {0.2} {0.5}) + 0.3log(\frac {0.3} {0.2}) = 0.05345\]
  • KL 散度损失

    KL散度损失(Kullback-Leibler Divergence Loss)是一种基于KL散度的损失函数,广泛应用于机器学习和深度学习中,尤其是在需要衡量两个概率分布差异的场景中。KL散度损失可以用于正则化、模型训练、生成模型(如变分自编码器VAE)和强化学习等领域。以下是对KL散度损失的详细解析,包括其定义、计算方法、应用场景和实现方式。

    在强化学习中,KL散度损失可以用于限制策略更新的幅度。例如,在PPO(Proximal Policy Optimization)中,KL散度损失用于确保策略更新不会偏离旧策略太远。

    pytorch 实现:

      import torch
      import torch.nn as nn
    
      # 定义两个概率分布
      P = torch.tensor([0.4, 0.2, 0.4], dtype=torch.float32)
      Q = torch.tensor([0.3, 0.5, 0.2], dtype=torch.float32)
    
      # 计算KL散度损失
      kl_loss = nn.KLDivLoss(reduction="sum")
      loss = kl_loss(torch.log(Q), P)
    
      print(f"KL散度损失: {loss.item()}")
    

    自定义实现:

      import numpy as np
    
      # 定义两个概率分布
      P = np.array([0.4, 0.2, 0.4])
      Q = np.array([0.3, 0.5, 0.2])
    
      # 计算KL散度损失
      kl_divergence = np.sum(P * np.log(P / Q))
    
      print(f"KL散度损失: {kl_divergence}")
    

随机梯度上升, SGA

随机梯度上升是一种优化算法,用于最大化目标函数。它通过在每次迭代中随机选择一个样本(或小批量样本)来近似计算目标函数的梯度,并沿着梯度方向更新参数。

假设目标函数为 $J(\theta)$, 其中 $\theta$ 是模型参数, 随机梯度上升的目标是最大化 $J(\theta)$, 在每次迭代中, 随机选择一个样本 $(x^(i), y^(i))$, 计算梯度:

\[\nabla_{\theta} J(\theta) \approx \nabla_{\theta} J_i(\theta)\]

更新参数:

\[\theta_{k+1} = \theta_k + \alpha \cdot \nabla_{\theta} J_i(\theta_k)\]

如果以上更新式是 $\theta_{k+1} = \theta_k - \alpha \cdot \nabla_{\theta} J_i(\theta_k)$ 则是随机梯度下降法, 参数更新的方向相反

其中:

  • $\alpha$ 是学习率

  • $J_i(\theta)$ 是基于样本 $i$ 的目标函数

SGA 和 随机梯度下降 SGD 的区别

  • SGA 用于最大化似然函数, 策略梯度等

  • SGD 用于最小化损失函数, 如线性回归, 逻辑回归等

Episode

episode 是指从初始状态开始,到结束状态(终止状态)为止的一系列状态、动作和奖励的序列。在这个过程中,智能体(agent)会根据当前的状态选择动作,环境(environment)会根据智能体的动作返回新的状态和奖励,这个过程会一直持续,直到达到终止状态。在强化学习的训练过程中,通常会通过多个 episode 来让智能体学习。智能体会在每个 episode 中尝试不同的策略,通过观察奖励来调整自己的行为,从而逐渐学习到更好的策略。也可以通过 episode 来评估智能体的性能。例如,统计智能体在多个 episode 中获得的平均奖励,或者统计智能体成功完成任务(达到终止状态)的比例等,以此来衡量智能体的学习效果。

  • 性质

    • 独立性

      不同的 episode 之间通常是独立的。虽然智能体会根据之前 episode 的经验来调整自己的策略,但每个 episode 的过程本身是独立的,不会受到其他 episode 的直接影响。

    • 有限状态

      每个 episode 都有明确的起点(初始状态)和终点(终止状态)。初始状态通常是预先设定的,终止状态则是满足某个特定条件的状态,比如游戏结束、任务完成等。

PPO

PPO(Proximal Policy Optimization,近端策略优化)是一种基于策略优化的强化学习算法,由 OpenAI 于 2017 年提出。它旨在解决传统策略梯度方法中训练不稳定、效率低下的问题。PPO 通过引入一个截断的目标函数,限制新策略和旧策略之间的差异,避免策略更新过快导致的不稳定性,从而提高训练的稳定性和效率。

  • PPO 的主要组成部分

    1. 策略网络 (Actor)

      用于根据当前状态选择动作, 学习策略 $\pi(a \parallel s)$, 即在给定状态下可选择动作的概率分布.

    2. 价值网络 (Critic)

      用于估计状态价值函数 $V(s)$, 衡量从当前状态开始能够获得的预期累积奖励值.

    3. 奖励函数 (Reward)

      用于衡量智能体在特定状态下采取特定动作后获得的即时奖励.

    4. 优势函数 (Advantage)

      衡量在特定状态下采取特定动作相对于平均策略的优势程度.

    5. 超参数 (Hyper Parameters)

      包括截断范围参数 $\epsilon$, 折扣因子 $\gamma$, 学习率, 熵正则化系数等, 用于干预算法的运行细节.

  • PPO 中各网络的的损失函数

    1. 策略网络 (Actor) 损失

      策略网络的损失函数由截断的目标函数(Clipped Surrogate Objective, CSO)和策略熵正则化项(Entropy Regularization, ER)构成.

      • 其中, 截断的目标函数的数学表示为:

        \[L^{CLIP}(\theta) = E[min(r_t(\theta)A_t,clip(r_t(\theta), 1 - \epsilon, 1 + \epsilon)A_t)]\]

        其中:

        • $E$ 是数学期望函数

        • $min$ 是求最小值的函数

        • $clip$ 是用于将 $r_t(\theta)$ 限制在 $[1-\epsilon, 1+\epsilon]$ 范围内的函数

        • $r_t(\theta) = \frac {\pi_{\theta}(a \parallel s)} {\pi_{\theta_{old}}(a \parallel s)}$ 是新旧策略对动作选择概率的比率

        • $A_t$ 是优势函数

          优势函数的计算见后文

        • $\epsilon$ 是控制截断范围的参数, 用于控制策略更新的幅度

        其主要作用是:

        • 限制策略更新幅度: 截断的目标函数通过限制新旧策略之间的差异,防止策略更新过大而导致训练不稳定。它确保策略在每次更新时不会发生剧烈变化,从而保持训练的稳定性.

        • 优化策略方向: 该函数利用优势函数 $A_t$ 来引导策略的更新方向,使得策略更倾向于选择那些能够获得更高累计奖励的动作.

      • 而策略熵正则化项的数学表示为:

        \[L^{ENT}(\theta) = E_t[H(\pi_\theta(s_t))]\]

        其中, $H(\pi_\theta(s_t))$ 是策略的熵, 具体计算式为:

        \[H(\pi_\theta(s_t)) = - \sum_a \pi_\theta(a | s_t)log_e\pi_\theta(a | s_t)\]

        其主要作用是:

        • 鼓励探索: 策略熵正则化项通过增加策略的熵,鼓励策略在选择动作时更加随机,从而促进智能体对环境的充分探索。这有助于避免策略过早收敛到局部最优解.

        • 平衡策略更新: 在总损失函数中,策略熵正则化项与截断的目标函数一起,通过权重系数 $c_2$ 来平衡策略的探索与稳定性.

      综上, 我们可以得到策略网络的总损失函数:

      \[L^{policy}(\theta) = L^{CLIP}(\theta) - c_2L^{ENT}(\theta)\]

      其中, $c_2$ 是权重系数, 用于平衡截断的目标函数和熵正则化项.

    2. 价值网络 (Critic) 损失

      价值网络的损失函数通常采用均方误差 (MSE) 来衡量预测值和目标值之间的差异, 数学表示为:

      \[L^{VF}(\theta) = E[(V(s_t) - V^{target}(s_t))^2]\]

      其中:

      • $V(s_t)$ 是价值网络对状态 $s_t$ 的预测值, 而 $V^{target}(s_t)$ 是目标值, 它可以由不同的方法计算得出:

        1. 蒙特卡洛估计 (Monte Carlo Estimation)

          蒙特卡洛估计通过采样整个序列的回报来计算目标值, 如果从状态 $s_t$ 开始, 直到终止状态的回报为 $G_t$, 则有:

          \[V^{target}(s_t) = G_t\]

          其中, $G_t$ 通常定义为: $G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2R_{t+3} + … + \gamma^{T-t-1}R_T$, 这里, $R_i$ 是在时间步 $i$ 获得的即时奖励, $\gamma$ 是折扣因子, $T$ 是序列的终止时间步.

        2. 时间差分 (Temporal Difference, TD) 目标

          TD 目标结合即时奖励和下一个状态的预测值来计算目标值. 常见的 TD 目标如下:

          • $TD(0)$:

            $V^{target}(s_t) = R_{t+1} + \gamma V(s_{t+1})$

            其中, $R_{t+1}$ 是智能体在时间步 $t$ 处于状态 $s_t$ 并执行动作 $a_t$ 后, 环境(奖励函数)反馈的奖励值, $\gamma$ 是折扣因子, 范围在 0 到 1 之间, 用于决定未来奖励的当前价值, $V(s_{t+1})$ 是对下一个状态 $s_{t+1}$ 的价值预测.

          • $TD(\lambda)$:

            $V^{target}(s_t) = (1 - \lambda)V^{target}(s_t) + \lambda(R_{t + 1} + \gamma V(s_{t+1}))$

            其中, $\lambda$ 是一个介于 0 和 1 之间的参数, 用于控制目标值的平滑程度.

          在这里我给出我的理解, 这可能有点绕, 使用 TD 方法时, 目标值 $V^{target}(s_t)$ 是会基于当前的价值函数 $V$ 计算得出的, 而价值函数会通过参数更新向 $V^{target}$ 逼近, 以更好地拟合目标值. 这种更新方法的有效性基于以下几点:

          • 引导: TD 方法使用当前价值函数对下一个状态的价值估计引导目标值计算, 能够在不需要等待整个 episode.

          • 逐步改进: 每次更新都基于当前的估计和实际的环境反馈, 逐步改进价值函数的准确性.

          • 平衡探索与利用: 通过不断调整价值函数, 智能体能够在探索新策略和利用已知信息之间找到平衡, 更有效地学习更优策略.

    3. 总损失函数

      虽然策略网络和价值网络的损失函数是分开计算的,但在某些深度神经网络实现中,它们共享部分网络结构(例如,使用一个共同的特征提取部分)。通过将两个损失函数组合成一个总损失函数,可以同时优化整个网络的参数,提高训练效率和稳定性。

      总损失函数通常如下构造:

      \[L(\theta) = E_t[L^{CLIP}(\theta) - c_1L^{VF}(\theta) + c_2L^{ENT}(\theta)]\]

      其中, $c_1, c_2$ 是权重系数, 用于平衡策略优化, 值函数更新和熵正则化.

  • 计算优势函数

    在 PPO 算法中, 优势函数 $A_t$ 是一个关键组件, 用于衡量智能体在状态 $s_t$ 下采取动作 $a_t$ 相对于平均策略的优势程度, 并指导策略的更新方向. 优势函数有多种计算方法:

    1. 蒙特卡洛方法 (MC)

      直接使用采样得到的回报减去状态价值函数的估计.

      \[A_t = G_t - V(s_t)\]

      $G_t$ 是从时间步 $t$ 开始的累计折扣奖励, $V(s_t)$ 是状态 $s_t$ 的价值函数估计值.

    2. 时序差分方法 (TD)

      利用 TD 误差, 来估计优势函数

      \[A_t = \delta_t + \gamma \lambda \delta_{t+1} + (\gamma \lambda)^2 \delta_{t+2} + \ldots\]

      其中, $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$ 是时序差分误差, $\gamma$ 是折扣因子, $\lambda$ 是一个介于 0 和 1 之间的参数, 用于控制优势函数的时间范围.

    3. 广义优势估计 (GAE, 最常见)

      其结合了 MC 和 TD 的优点, 数学表示为:

      \[A_t = \sum_{l=0}^{n-1}(\gamma\lambda)^l\delta_{t+l}\]

      其中, $n$ 是采样步数, GAE 通过调整 $\gamma$ 参数在偏差和方差之间取得平衡, 提供更稳定的优势函数估计.

      GAE 这一块我不理解, 后续补充

    • 优势函数的具体作用

      1. 指导策略更新

        优势函数 $A_t$ 用于衡量智能体在状态 $s_t$ 下采取动作 $a_t$ 相对于当前策略的平均表现有多好, 如果 $A_t > 0$, 表示该动作比平均策略更好, 如果 $A_t < 0$ 表示该动作比平均策略更差, 策略更新时, 会倾向于增加 $A_t > 0$ 的动作的概率, 减少 $A_t < 0$ 的动作的概率.

      2. 减少方差

        通过使用价值函数作为基线(Baseline),优势函数能够减少策略梯度估计的方差,提高学习的稳定性.

  • pytorch 实现 PPO

      import gymnasium as gym
      import torch
      import torch.nn as nn
      import torch.optim as optim
      import torch.nn.functional as F
    
    
      # 定义策略网络,用于输出动作的概率分布
      class PolicyNet(nn.Module):
          def __init__(self, state_dim, hidden_dim, action_dim):
              super(PolicyNet, self).__init__()
              self.fc1 = nn.Linear(state_dim, hidden_dim)  # 输入层到隐藏层的全连接层
              self.fc2 = nn.Linear(hidden_dim, action_dim)  # 隐藏层到输出层的全连接层
    
          def forward(self, x):
              x = F.relu(self.fc1(x))  # 使用ReLU激活函数对隐藏层输出进行激活
              return F.softmax(self.fc2(x), dim=1)  # 使用softmax函数将输出层的值转换为概率分布
    
    
      # 定义价值网络,用于输出状态的价值
      class ValueNet(nn.Module):
          def __init__(self, state_dim, hidden_dim):
              super(ValueNet, self).__init__()
              self.fc1 = nn.Linear(state_dim, hidden_dim)  # 输入层到隐藏层的全连接层
              self.fc2 = nn.Linear(hidden_dim, 1)  # 隐藏层到输出层的全连接层
    
          def forward(self, x):
              x = F.relu(self.fc1(x))  # 使用ReLU激活函数对隐藏层输出进行激活
              return self.fc2(x)  # 输出状态的价值
    
    
      # 定义PPO算法类
      class PPO:
          def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                      lmbda, epochs, eps, gamma, device):
              # 初始化策略网络和价值网络
              self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
              self.critic = ValueNet(state_dim, hidden_dim).to(device)
              # 定义优化器,用于更新策略网络和价值网络的参数
              self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
              self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr)
              # 设置超参数
              self.gamma = gamma  # 折扣因子
              self.lmbda = lmbda  # 优势函数的通用优势估计参数
              self.epochs = epochs  # 每次更新时的训练轮数
              self.eps = eps  # PPO算法中的截断范围参数
              self.device = device  # 设备(CPU或GPU)
    
          # 定义策略网络的采样函数,根据当前状态选择动作
          def take_action(self, state):
              state = torch.tensor([state], dtype=torch.float).to(self.device)  # 将状态转换为张量
              probs = self.actor(state)  # 计算动作的概率分布
              action_dist = torch.distributions.Categorical(probs)  # 定义分类分布
              action = action_dist.sample()  # 从分布中采样动作
              return action.item()  # 返回动作的值
    
          # 定义PPO算法的更新函数
          def update(self, transition_dict):
              # 将数据转换为张量
              states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
              actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
              rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
              next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
              dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
    
              # 计算时序差分误差和优势函数
              td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)  # 计算时序差分目标
              td_delta = td_target - self.critic(states)  # 计算时序差分误差
              advantage = compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)  # 计算优势函数
    
              # 计算旧策略的概率
              old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()  # 计算旧策略下动作的概率
    
              # 更新策略网络和价值网络
              for _ in range(self.epochs):
                  log_probs = torch.log(self.actor(states).gather(1, actions))  # 计算新策略下动作的概率
                  ratio = torch.exp(log_probs - old_log_probs)  # 计算概率比值
                  surr1 = ratio * advantage  # 计算目标函数的第一个部分
                  surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # 计算目标函数的第二个部分
                  actor_loss = torch.mean(-torch.min(surr1, surr2))  # 计算策略网络的损失函数
                  critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))  # 计算价值网络的损失函数
                  self.actor_optimizer.zero_grad()  # 清空策略网络的梯度
                  self.critic_optimizer.zero_grad()  # 清空价值网络的梯度
                  actor_loss.backward()  # 计算策略网络的梯度
                  critic_loss.backward()  # 计算价值网络的梯度
                  self.actor_optimizer.step()  # 更新策略网络的参数
                  self.critic_optimizer.step()  # 更新价值网络的参数
    
    
      # 定义计算优势函数的函数
      def compute_advantage(gamma, lmbda, td_delta):
          advantage_list = []  # 存储优势函数的列表
          advantage = 0.0  # 初始化优势函数
          # 从后往前计算优势函数
          for delta in reversed(td_delta):
              advantage = gamma * lmbda * advantage + delta.item()  # 计算优势函数
              advantage_list.append(advantage)  # 将优势函数添加到列表中
          advantage_list.reverse()  # 将列表反转,恢复顺序
          return torch.tensor(advantage_list, dtype=torch.float)  # 返回优势函数的张量
    
    
      # 设置超参数
      actor_lr = 1e-3  # 策略网络的学习率
      critic_lr = 1e-2  # 价值网络的学习率
      num_episodes = 500  # 训练的轮数
      hidden_dim = 128  # 隐藏层的维度
      gamma = 0.98  # 折扣因子
      lmbda = 0.95  # 优势函数的通用优势估计参数
      epochs = 10  # 每次更新时的训练轮数
      eps = 0.2  # PPO算法中的截断范围参数
      device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")  # 设置设备
    
      # 创建环境
      env_name = 'CartPole-v1'
      env = gym.make(env_name)
    
      # 初始化PPO算法
      state_dim = env.observation_space.shape[0]  # 状态的维度
      action_dim = env.action_space.n  # 动作的维度
      agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
    
      # 训练过程
      for episode in range(num_episodes):
          state = env.reset()[0]  # 重置环境,获取初始状态
          done = False  # 是否完成标志
          episode_return = 0  # 当前回合的回报
          transition_dict = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}  # 存储数据的字典
          while not done:
              action = agent.take_action(state)  # 根据当前策略选择动作
              next_state, reward, terminated, truncated, _ = env.step(action)  # 执行动作,获取下一个状态、奖励等信息
              done = terminated or truncated  # 判断是否完成
              transition_dict['states'].append(state)  # 添加当前状态
              transition_dict['actions'].append(action)  # 添加当前动作
              transition_dict['next_states'].append(next_state)  # 添加下一个状态
              transition_dict['rewards'].append(reward)  # 添加奖励
              transition_dict['dones'].append(done)  # 添加是否完成标志
              state = next_state  # 更新当前状态
              episode_return += reward  # 累加回报
          agent.update(transition_dict)  # 更新策略网络和价值网络
          print(f"Episode: {episode+1}, Return: {episode_return}")  # 打印当前回合的信息
    

GRPO

…待续

DPO

…待续

RLHF

…待续