搜 索

RLHF/DPO:让大模型学会"讨好"人类

  • 3阅读
  • 2025年04月12日
  • 0评论
首页 / AI/大数据 / 正文

前言:SFT 之后,还差什么?

上一篇我们讲了 SFT(监督微调),模型学会了"对话格式"。

但 SFT 有几个致命问题:

问题 1:不知道什么是"好"回答

用户: 1+1 等于几?

回答 A: 1+1 等于 2。
回答 B: 1+1 等于 3。
回答 C: 这是一个很有趣的数学问题,让我们从历史角度来分析...

SFT 模型可能觉得三个回答都"格式正确",分不清好坏。

问题 2:没有偏好概念

用户: 写一首诗

回答 A: 简洁有力,4 行
回答 B: 华丽冗长,40 行

哪个更好?取决于用户偏好。SFT 学不会这个。

问题 3:幻觉和有害内容

用户: 阿姆斯特朗什么时候登月的?

SFT 模型: 1969 年 7 月 21 日,他穿着蓝色宇航服...
          (可能编造细节,一本正经地胡说八道)

这就是为什么需要 RLHF(Reinforcement Learning from Human Feedback)——用人类反馈来教模型分辨好坏。

graph LR subgraph 训练流程 A[预训练模型] --> B[SFT] B --> C[RLHF/DPO] C --> D[对齐的模型] end subgraph 能力提升 B1[学会格式] --> C1[学会好坏] C1 --> D1[符合人类偏好] end style C fill:#4ecdc4 style D fill:#4ecdc4

一、RLHF 全流程

1.1 InstructGPT 的三步法

2022 年,OpenAI 的 InstructGPT 论文定义了经典的三步训练法:

flowchart TB subgraph Step1["Step 1: SFT 监督微调"] D1[人工标注数据] --> M1[SFT 模型] end subgraph Step2["Step 2: 训练奖励模型"] M1 --> G[生成多个回答] G --> H[人工排序] H --> RM[Reward Model] end subgraph Step3["Step 3: PPO 强化学习"] M1 --> Policy[策略模型] RM --> R[打分] Policy --> Gen[生成回答] Gen --> R R --> Policy end Step1 --> Step2 --> Step3 style Step1 fill:#ff6b6b style Step2 fill:#ffe66d style Step3 fill:#4ecdc4

用人话说:

  1. Step 1 - SFT:先用人工标注的对话数据,教模型基本的对话格式
  2. Step 2 - 训练 RM:让模型生成多个回答,人类排序,训练一个"打分器"
  3. Step 3 - PPO:用打分器的反馈,强化学习优化模型

1.2 为什么这么复杂?

你可能会问:为什么不直接用人类反馈训练模型?

问题:人类反馈太贵了!

  • 每次模型生成回答,都需要人类打分
  • 强化学习需要大量的交互(百万次级别)
  • 人工成本爆炸

解决方案:训练一个"人类代理"

graph LR subgraph 理想情况 A1[模型生成] --> B1[人类打分] --> C1[优化模型] end subgraph 实际做法 A2[人类标注少量数据] --> RM[训练 Reward Model] RM --> B2[RM 打分代替人类] B2 --> C2[优化模型] end style RM fill:#4ecdc4

Reward Model(RM)就是人类偏好的"代言人"——用少量人类标注训练出来,然后代替人类给出无限多的反馈。


二、Reward Model:学习人类偏好

2.1 训练数据:偏好对比

RM 的训练数据是这样的:

{
  "prompt": "写一个关于夏天的句子",
  "chosen": "阳光洒在海面上,波光粼粼,夏日的微风带来丝丝凉意。",
  "rejected": "夏天很热。"
}
  • prompt:用户输入
  • chosen:人类更喜欢的回答(胜者)
  • rejected:人类不太喜欢的回答(败者)

这种格式叫做 偏好对(Preference Pair)

2.2 Bradley-Terry 模型

RM 的训练目标:让 chosen 的分数高于 rejected。

数学上,使用 Bradley-Terry 模型

$$ P(\text{chosen} \succ \text{rejected}) = \sigma(r(\text{chosen}) - r(\text{rejected})) $$

其中:

  • $r(x)$ 是 Reward Model 给回答 $x$ 打的分
  • $\sigma$ 是 sigmoid 函数
  • $\succ$ 表示"优于"

损失函数

$$ \mathcal{L}_{RM} = -\log \sigma(r_\theta(x, y_w) - r_\theta(x, y_l)) $$

其中 $y_w$ 是 chosen(winner),$y_l$ 是 rejected(loser)。

2.3 RM 的架构

Reward Model 通常基于 SFT 模型,去掉 LM Head,加一个输出标量的头:

graph TB subgraph RewardModel Input[输入: prompt + response] --> Backbone[Transformer Backbone] Backbone --> Hidden[最后一层隐藏状态] Hidden --> Head[Linear → 标量] Head --> Score[奖励分数 r] end style Score fill:#4ecdc4

2.4 RM 训练代码

import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import AutoModel, AutoTokenizer


class RewardModel(nn.Module):
    """Reward Model:给回答打分"""
    
    def __init__(self, base_model_name):
        super().__init__()
        # 加载预训练模型作为 backbone
        self.backbone = AutoModel.from_pretrained(base_model_name)
        
        # 奖励头:输出一个标量
        hidden_size = self.backbone.config.hidden_size
        self.reward_head = nn.Linear(hidden_size, 1)
    
    def forward(self, input_ids, attention_mask):
        # 获取最后一层隐藏状态
        outputs = self.backbone(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        
        # 取最后一个 token 的表示(或者 pooling)
        last_hidden = outputs.last_hidden_state
        
        # 找到每个序列最后一个非 padding token
        sequence_lengths = attention_mask.sum(dim=1) - 1
        batch_size = input_ids.shape[0]
        
        # 提取最后一个 token 的隐藏状态
        last_token_hidden = last_hidden[
            torch.arange(batch_size, device=input_ids.device),
            sequence_lengths
        ]
        
        # 计算奖励分数
        reward = self.reward_head(last_token_hidden).squeeze(-1)
        
        return reward


def compute_rm_loss(model, chosen_ids, chosen_mask, rejected_ids, rejected_mask):
    """计算 Reward Model 损失"""
    
    # 计算 chosen 和 rejected 的奖励
    chosen_reward = model(chosen_ids, chosen_mask)
    rejected_reward = model(rejected_ids, rejected_mask)
    
    # Bradley-Terry 损失
    # loss = -log(sigmoid(r_chosen - r_rejected))
    loss = -F.logsigmoid(chosen_reward - rejected_reward).mean()
    
    # 计算准确率(chosen 分数 > rejected 分数的比例)
    accuracy = (chosen_reward > rejected_reward).float().mean()
    
    return loss, accuracy


def train_reward_model(model, train_dataloader, optimizer, num_epochs=3):
    """训练 Reward Model"""
    
    model.train()
    
    for epoch in range(num_epochs):
        total_loss = 0
        total_acc = 0
        
        for batch in train_dataloader:
            # 解包数据
            chosen_ids = batch['chosen_input_ids']
            chosen_mask = batch['chosen_attention_mask']
            rejected_ids = batch['rejected_input_ids']
            rejected_mask = batch['rejected_attention_mask']
            
            # 计算损失
            loss, acc = compute_rm_loss(
                model, 
                chosen_ids, chosen_mask,
                rejected_ids, rejected_mask
            )
            
            # 反向传播
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            total_acc += acc.item()
        
        avg_loss = total_loss / len(train_dataloader)
        avg_acc = total_acc / len(train_dataloader)
        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Acc={avg_acc:.4f}")


# 使用示例
if __name__ == "__main__":
    # 初始化模型
    rm = RewardModel("meta-llama/Llama-2-7b-hf")
    
    # 假设有训练数据
    # train_dataloader = ...
    
    # 优化器
    optimizer = torch.optim.AdamW(rm.parameters(), lr=1e-5)
    
    # 训练
    # train_reward_model(rm, train_dataloader, optimizer)

2.5 RM 的质量至关重要

Reward Hacking:如果 RM 不准确,模型会学会"欺骗"它。

RM 学到的规律(可能有偏):长回答 = 高分

模型的应对策略:疯狂输出长文本,哪怕废话连篇

结果:RM 给高分,但人类觉得很烂

这就是为什么需要高质量的人类标注数据,以及各种正则化技术。


三、PPO:强化学习优化

3.1 RL 基础概念

把语言模型生成看作强化学习问题:

RL 概念在 RLHF 中的对应
Agent(智能体)语言模型
Environment(环境)用户 prompt
State(状态)已生成的 token 序列
Action(动作)生成下一个 token
Reward(奖励)RM 打的分
Policy(策略)模型的概率分布

3.2 PPO 的目标函数

PPO(Proximal Policy Optimization)的目标:

$$ \mathcal{L}_{PPO} = \mathbb{E}\left[\min\left(r_t(\theta)\hat{A}_t, \text{clip}(r_t(\theta), 1-\epsilon, 1+\epsilon)\hat{A}_t\right)\right] $$

看不懂?没关系,用人话说:

  1. 最大化奖励:让模型生成高分回答
  2. 不要变化太大:新策略不能偏离旧策略太远(KL 约束)
  3. 保持语言能力:不能为了讨好 RM 而忘记怎么说话

3.3 RLHF 的完整目标

实际的 RLHF 目标函数:

$$ \mathcal{L}_{RLHF} = \mathbb{E}_{x \sim D, y \sim \pi_\theta(y|x)}\left[r_\phi(x, y) - \beta \cdot \text{KL}(\pi_\theta || \pi_{ref})\right] $$

其中:

  • $r_\phi(x, y)$:Reward Model 的打分
  • $\text{KL}(\pi_\theta || \pi_{ref})$:新策略和参考策略(SFT 模型)的 KL 散度
  • $\beta$:KL 惩罚系数

KL 散度的作用:防止模型为了追求高分而"跑偏"。

graph TB subgraph 没有KL约束 A[追求高分] --> B[回答越来越奇怪] B --> C[忘记怎么正常说话] end subgraph 有KL约束 D[追求高分] --> E[但不能偏离太远] E --> F[既高分又正常] end style C fill:#ff6b6b style F fill:#4ecdc4

3.4 PPO 训练流程

sequenceDiagram participant Policy as 策略模型 π_θ participant Ref as 参考模型 π_ref participant RM as Reward Model participant Critic as Critic 网络 loop 每个 batch Policy->>Policy: 生成回答 y ~ π_θ(y|x) Policy->>RM: 计算奖励 r = RM(x, y) Policy->>Ref: 计算 KL 散度 RM-->>Policy: 奖励分数 Ref-->>Policy: KL 惩罚 Policy->>Critic: 计算优势函数 A Critic-->>Policy: 优势估计 Policy->>Policy: PPO 更新参数 end

3.5 PPO 代码框架

import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM
from trl import PPOTrainer, PPOConfig, AutoModelForCausalLMWithValueHead


def train_with_ppo():
    """使用 TRL 库进行 PPO 训练"""
    
    # 配置
    ppo_config = PPOConfig(
        model_name="sft_model",
        learning_rate=1e-5,
        batch_size=16,
        mini_batch_size=4,
        gradient_accumulation_steps=4,
        ppo_epochs=4,
        kl_penalty="kl",      # KL 惩罚类型
        init_kl_coef=0.2,     # 初始 KL 系数
        target_kl=6.0,        # 目标 KL 值
        cliprange=0.2,        # PPO clip 范围
    )
    
    # 加载模型(带 Value Head)
    model = AutoModelForCausalLMWithValueHead.from_pretrained("sft_model")
    ref_model = AutoModelForCausalLMWithValueHead.from_pretrained("sft_model")
    
    # 加载 Reward Model
    reward_model = RewardModel.from_pretrained("reward_model")
    
    # 创建 PPO Trainer
    ppo_trainer = PPOTrainer(
        config=ppo_config,
        model=model,
        ref_model=ref_model,
        tokenizer=tokenizer,
    )
    
    # 训练循环
    for epoch in range(num_epochs):
        for batch in dataloader:
            prompts = batch['prompt']
            
            # 1. 生成回答
            response_tensors = ppo_trainer.generate(
                prompts,
                max_new_tokens=256,
                do_sample=True,
                temperature=0.7,
            )
            
            # 2. 计算奖励
            rewards = []
            for prompt, response in zip(prompts, response_tensors):
                full_text = prompt + response
                reward = reward_model(full_text)
                rewards.append(reward)
            
            # 3. PPO 更新
            stats = ppo_trainer.step(prompts, response_tensors, rewards)
            
            print(f"Loss: {stats['ppo/loss/total']:.4f}, "
                  f"Reward: {stats['ppo/mean_scores']:.4f}, "
                  f"KL: {stats['objective/kl']:.4f}")


# 手动实现 PPO 核心逻辑(简化版)
def ppo_step(
    policy_model,
    ref_model, 
    reward_model,
    prompts,
    optimizer,
    kl_coef=0.1,
    clip_range=0.2,
):
    """PPO 单步更新(简化版)"""
    
    # 1. 用当前策略生成回答
    with torch.no_grad():
        responses = policy_model.generate(prompts, max_length=256)
        
        # 旧策略的 log prob
        old_logprobs = compute_logprobs(policy_model, prompts, responses)
        
        # 参考模型的 log prob(用于 KL)
        ref_logprobs = compute_logprobs(ref_model, prompts, responses)
        
        # Reward Model 打分
        rewards = reward_model(prompts, responses)
    
    # 2. 计算优势函数(简化:直接用 reward - baseline)
    advantages = rewards - rewards.mean()
    
    # 3. PPO 更新
    for _ in range(ppo_epochs):
        # 新策略的 log prob
        new_logprobs = compute_logprobs(policy_model, prompts, responses)
        
        # 重要性采样比率
        ratio = torch.exp(new_logprobs - old_logprobs)
        
        # Clipped surrogate objective
        surr1 = ratio * advantages
        surr2 = torch.clamp(ratio, 1 - clip_range, 1 + clip_range) * advantages
        policy_loss = -torch.min(surr1, surr2).mean()
        
        # KL 惩罚
        kl = (old_logprobs - new_logprobs).mean()
        
        # 总损失
        loss = policy_loss + kl_coef * kl
        
        # 更新
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    return {
        'loss': loss.item(),
        'reward': rewards.mean().item(),
        'kl': kl.item(),
    }


def compute_logprobs(model, prompts, responses):
    """计算生成序列的 log probability"""
    inputs = tokenizer(prompts, return_tensors='pt', padding=True)
    
    with torch.no_grad():
        outputs = model(**inputs, labels=responses)
    
    # 简化:直接用 loss * sequence_length 作为负 log prob
    logprobs = -outputs.loss
    
    return logprobs

3.6 RLHF 的问题

RLHF 很强大,但也有很多问题:

mindmap root((RLHF 的问题)) 训练复杂 需要 4 个模型 超参数敏感 训练不稳定 成本高 人类标注贵 计算资源多 Reward Hacking 模型欺骗 RM 过度优化 KL 约束 太强则学不到 太弱则跑偏

这些问题催生了更简单的方法——DPO


四、DPO:直接偏好优化

4.1 DPO 的核心思想

2023 年,斯坦福提出了 DPO(Direct Preference Optimization)

核心洞察:可以直接从偏好数据优化策略,不需要显式训练 RM!

graph LR subgraph RLHF A[偏好数据] --> B[训练 RM] B --> C[PPO 优化] C --> D[对齐模型] end subgraph DPO E[偏好数据] --> F[直接优化策略] F --> G[对齐模型] end style F fill:#4ecdc4 style G fill:#4ecdc4

DPO 的优势

  • 不需要训练 RM
  • 不需要强化学习
  • 更稳定,更简单
  • 计算资源更少

4.2 DPO 的数学推导

从 RLHF 的目标出发:

$$ \max_{\pi} \mathbb{E}_{x,y}\left[r(x,y)\right] - \beta \cdot \text{KL}(\pi || \pi_{ref}) $$

这个优化问题有解析解

$$ \pi^*(y|x) = \frac{1}{Z(x)} \pi_{ref}(y|x) \exp\left(\frac{r(x,y)}{\beta}\right) $$

反解出 reward:

$$ r(x,y) = \beta \log \frac{\pi^*(y|x)}{\pi_{ref}(y|x)} + \beta \log Z(x) $$

代入 Bradley-Terry 模型,消掉 $Z(x)$:

$$ p(y_w \succ y_l | x) = \sigma\left(\beta \log \frac{\pi^*(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi^*(y_l|x)}{\pi_{ref}(y_l|x)}\right) $$

DPO 损失函数

$$ \mathcal{L}_{DPO} = -\mathbb{E}_{(x,y_w,y_l)}\left[\log \sigma\left(\beta \log \frac{\pi_\theta(y_w|x)}{\pi_{ref}(y_w|x)} - \beta \log \frac{\pi_\theta(y_l|x)}{\pi_{ref}(y_l|x)}\right)\right] $$

4.3 DPO 的直观理解

DPO 做的事情:

  1. 增大 chosen 的概率:让模型更可能生成人类喜欢的回答
  2. 减小 rejected 的概率:让模型更不可能生成人类不喜欢的回答
  3. 相对于参考模型:变化幅度要合理
graph TB subgraph DPO优化方向 A[Chosen 回答] --> B[概率 ↑] C[Rejected 回答] --> D[概率 ↓] E[参考模型] --> F[作为锚点] end style B fill:#4ecdc4 style D fill:#ff6b6b

4.4 DPO 代码实现

import torch
import torch.nn.functional as F
from transformers import AutoModelForCausalLM, AutoTokenizer


class DPOTrainer:
    """DPO 训练器"""
    
    def __init__(
        self,
        model,
        ref_model,
        tokenizer,
        beta=0.1,
        learning_rate=1e-6,
    ):
        self.model = model
        self.ref_model = ref_model
        self.tokenizer = tokenizer
        self.beta = beta
        
        # 冻结参考模型
        for param in self.ref_model.parameters():
            param.requires_grad = False
        
        self.optimizer = torch.optim.AdamW(
            self.model.parameters(), 
            lr=learning_rate
        )
    
    def compute_logps(self, model, input_ids, attention_mask, labels):
        """计算序列的 log probability"""
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
        )
        logits = outputs.logits
        
        # Shift logits and labels
        shift_logits = logits[..., :-1, :].contiguous()
        shift_labels = labels[..., 1:].contiguous()
        
        # 计算每个 token 的 log prob
        log_probs = F.log_softmax(shift_logits, dim=-1)
        
        # 提取目标 token 的 log prob
        token_log_probs = torch.gather(
            log_probs, 
            dim=-1, 
            index=shift_labels.unsqueeze(-1)
        ).squeeze(-1)
        
        # Mask padding
        mask = (shift_labels != -100).float()
        
        # 求和得到序列 log prob
        sequence_log_prob = (token_log_probs * mask).sum(dim=-1)
        
        return sequence_log_prob
    
    def dpo_loss(
        self,
        chosen_input_ids,
        chosen_attention_mask,
        chosen_labels,
        rejected_input_ids,
        rejected_attention_mask,
        rejected_labels,
    ):
        """计算 DPO 损失"""
        
        # 策略模型的 log prob
        policy_chosen_logps = self.compute_logps(
            self.model, chosen_input_ids, chosen_attention_mask, chosen_labels
        )
        policy_rejected_logps = self.compute_logps(
            self.model, rejected_input_ids, rejected_attention_mask, rejected_labels
        )
        
        # 参考模型的 log prob
        with torch.no_grad():
            ref_chosen_logps = self.compute_logps(
                self.ref_model, chosen_input_ids, chosen_attention_mask, chosen_labels
            )
            ref_rejected_logps = self.compute_logps(
                self.ref_model, rejected_input_ids, rejected_attention_mask, rejected_labels
            )
        
        # 计算 log ratios
        chosen_log_ratio = policy_chosen_logps - ref_chosen_logps
        rejected_log_ratio = policy_rejected_logps - ref_rejected_logps
        
        # DPO 损失
        logits = self.beta * (chosen_log_ratio - rejected_log_ratio)
        loss = -F.logsigmoid(logits).mean()
        
        # 计算准确率和其他指标
        chosen_rewards = self.beta * chosen_log_ratio.detach()
        rejected_rewards = self.beta * rejected_log_ratio.detach()
        accuracy = (chosen_rewards > rejected_rewards).float().mean()
        
        return loss, {
            'loss': loss.item(),
            'accuracy': accuracy.item(),
            'chosen_rewards': chosen_rewards.mean().item(),
            'rejected_rewards': rejected_rewards.mean().item(),
            'reward_margin': (chosen_rewards - rejected_rewards).mean().item(),
        }
    
    def train_step(self, batch):
        """单步训练"""
        self.model.train()
        
        loss, metrics = self.dpo_loss(
            batch['chosen_input_ids'],
            batch['chosen_attention_mask'],
            batch['chosen_labels'],
            batch['rejected_input_ids'],
            batch['rejected_attention_mask'],
            batch['rejected_labels'],
        )
        
        self.optimizer.zero_grad()
        loss.backward()
        
        # 梯度裁剪
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
        
        self.optimizer.step()
        
        return metrics


def train_dpo(
    model_name,
    train_dataset,
    output_dir,
    num_epochs=3,
    batch_size=4,
    beta=0.1,
):
    """完整的 DPO 训练流程"""
    
    # 加载模型
    model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    
    ref_model = AutoModelForCausalLM.from_pretrained(
        model_name,
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    
    # 创建 trainer
    trainer = DPOTrainer(
        model=model,
        ref_model=ref_model,
        tokenizer=tokenizer,
        beta=beta,
    )
    
    # 训练循环
    for epoch in range(num_epochs):
        total_loss = 0
        total_acc = 0
        
        for batch in train_dataset:
            metrics = trainer.train_step(batch)
            total_loss += metrics['loss']
            total_acc += metrics['accuracy']
        
        avg_loss = total_loss / len(train_dataset)
        avg_acc = total_acc / len(train_dataset)
        
        print(f"Epoch {epoch+1}: Loss={avg_loss:.4f}, Acc={avg_acc:.4f}")
    
    # 保存模型
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

4.5 使用 TRL 库训练 DPO

from trl import DPOTrainer, DPOConfig
from transformers import AutoModelForCausalLM, AutoTokenizer
from datasets import load_dataset


def train_dpo_with_trl():
    """使用 TRL 库进行 DPO 训练"""
    
    # 加载模型
    model = AutoModelForCausalLM.from_pretrained(
        "meta-llama/Llama-2-7b-hf",
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    
    ref_model = AutoModelForCausalLM.from_pretrained(
        "meta-llama/Llama-2-7b-hf",
        torch_dtype=torch.bfloat16,
        device_map="auto",
    )
    
    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-2-7b-hf")
    tokenizer.pad_token = tokenizer.eos_token
    
    # 加载偏好数据集
    dataset = load_dataset("Anthropic/hh-rlhf", split="train")
    
    # DPO 配置
    dpo_config = DPOConfig(
        output_dir="./dpo_output",
        num_train_epochs=3,
        per_device_train_batch_size=4,
        gradient_accumulation_steps=4,
        learning_rate=5e-7,
        beta=0.1,                    # KL 惩罚系数
        loss_type="sigmoid",         # 损失类型
        max_length=1024,
        max_prompt_length=512,
        bf16=True,
        logging_steps=10,
        save_steps=500,
    )
    
    # 创建 DPO Trainer
    trainer = DPOTrainer(
        model=model,
        ref_model=ref_model,
        args=dpo_config,
        train_dataset=dataset,
        tokenizer=tokenizer,
    )
    
    # 训练
    trainer.train()
    
    # 保存
    trainer.save_model()

五、RLHF vs DPO 对比

5.1 全面对比

维度RLHF (PPO)DPO
训练复杂度高(4个模型)低(2个模型)
需要 RM✅ 需要❌ 不需要
强化学习✅ 需要❌ 不需要
超参数很多,敏感较少,稳定
计算资源
训练稳定性不稳定稳定
理论基础强化学习最大似然
效果上限可能更高略低
工程难度

5.2 模型数量对比

graph TB subgraph RLHF需要的模型 M1[策略模型 Policy] M2[参考模型 Reference] M3[奖励模型 Reward] M4[Critic 网络] end subgraph DPO需要的模型 D1[策略模型 Policy] D2[参考模型 Reference] end style M1 fill:#ff6b6b style M2 fill:#ff6b6b style M3 fill:#ff6b6b style M4 fill:#ff6b6b style D1 fill:#4ecdc4 style D2 fill:#4ecdc4

5.3 选择建议

flowchart TB A[选择对齐方法] --> B{资源充足?} B -->|是| C{追求极致效果?} B -->|否| D[选择 DPO] C -->|是| E[选择 RLHF] C -->|否| D D --> F[简单稳定
足够好用] E --> G[复杂但可能更强
需要精调] style D fill:#4ecdc4 style F fill:#4ecdc4

六、其他对齐方法

6.1 对齐方法演进

timeline title 对齐技术演进 2022 : RLHF (InstructGPT) : 奠定基础 2023 : DPO (斯坦福) : 简化流程 2023 : IPO : 改进 DPO 2024 : KTO : 只需要好/坏标签 2024 : ORPO : 去掉参考模型 2024 : SimPO : 更简单的目标

6.2 KTO:只需要二元标签

KTO(Kahneman-Tversky Optimization) 更进一步简化:不需要配对数据!

DPO 需要: (prompt, chosen, rejected) 配对
KTO 只需要: (prompt, response, good/bad 标签)
# KTO 损失(简化版)
def kto_loss(policy_logps, ref_logps, is_good, beta=0.1):
    """
    KTO 损失函数
    is_good: True/False 标签
    """
    log_ratio = policy_logps - ref_logps
    
    if is_good:
        # 好的回答:增大概率
        loss = -F.logsigmoid(beta * log_ratio)
    else:
        # 坏的回答:减小概率
        loss = -F.logsigmoid(-beta * log_ratio)
    
    return loss.mean()

6.3 ORPO:去掉参考模型

ORPO(Odds Ratio Preference Optimization) 把 SFT 和对齐合并:

$$ \mathcal{L}_{ORPO} = \mathcal{L}_{SFT} + \lambda \cdot \mathcal{L}_{OR} $$

# ORPO 损失
def orpo_loss(model, chosen_ids, rejected_ids, lambda_weight=0.1):
    """
    ORPO:SFT + 对齐一起训练
    不需要参考模型!
    """
    # SFT 损失(chosen 上的 NLL)
    sft_loss = compute_nll_loss(model, chosen_ids)
    
    # 对数几率比损失
    chosen_logps = compute_logps(model, chosen_ids)
    rejected_logps = compute_logps(model, rejected_ids)
    
    chosen_odds = chosen_logps / (1 - chosen_logps)
    rejected_odds = rejected_logps / (1 - rejected_logps)
    
    log_odds_ratio = torch.log(chosen_odds / rejected_odds)
    or_loss = -F.logsigmoid(log_odds_ratio).mean()
    
    # 总损失
    total_loss = sft_loss + lambda_weight * or_loss
    
    return total_loss

6.4 各方法对比

方法需要配对数据需要参考模型需要 RM复杂度
RLHF最高
DPO中等
KTO较低
ORPO最低
SimPO最低

七、实战:准备偏好数据

7.1 偏好数据格式

# 标准偏好数据格式
preference_data = [
    {
        "prompt": "请解释什么是量子计算",
        "chosen": "量子计算是一种利用量子力学原理进行计算的技术。与传统计算机使用比特(0或1)不同,量子计算机使用量子比特(qubit),可以同时处于0和1的叠加态...",
        "rejected": "量子计算就是用量子的计算,很复杂的那种。"
    },
    {
        "prompt": "写一段 Python 代码实现快速排序",
        "chosen": """def quicksort(arr):
    if len(arr) <= 1:
        return arr
    pivot = arr[len(arr) // 2]
    left = [x for x in arr if x < pivot]
    middle = [x for x in arr if x == pivot]
    right = [x for x in arr if x > pivot]
    return quicksort(left) + middle + quicksort(right)""",
        "rejected": "快排就是选一个基准然后分成两部分递归排序"
    },
]

7.2 使用 GPT-4 生成偏好数据

import openai
import json

def generate_preference_pair(prompt, model="gpt-4"):
    """用 GPT-4 生成一对回答,自动标注偏好"""
    
    # 生成两个不同质量的回答
    response = openai.ChatCompletion.create(
        model=model,
        messages=[
            {
                "role": "system",
                "content": """你需要生成两个回答:
1. 一个高质量回答(详细、准确、有帮助)
2. 一个低质量回答(简略、模糊、不太有帮助)

请用 JSON 格式输出:
{"chosen": "高质量回答", "rejected": "低质量回答"}"""
            },
            {"role": "user", "content": prompt}
        ],
        temperature=0.7,
    )
    
    result = json.loads(response.choices[0].message.content)
    result['prompt'] = prompt
    
    return result


# 批量生成
prompts = [
    "什么是机器学习?",
    "如何提高英语口语?",
    "Python 和 JavaScript 哪个更适合初学者?",
]

dataset = [generate_preference_pair(p) for p in prompts]

7.3 人工标注平台

对于高质量数据,还是需要人工标注:

graph LR A[模型生成多个回答] --> B[标注员排序] B --> C[质量检查] C --> D[构建偏好对] D --> E[训练数据集] subgraph 标注界面 B1[回答 A] B2[回答 B] B3[A > B 或 B > A] end

常用工具:

  • Label Studio:开源标注平台
  • Argilla:专门用于 LLM 的标注工具
  • Scale AI:商业标注服务

八、总结

核心流程

flowchart TB A[预训练模型] --> B[SFT: 学会格式] B --> C{选择对齐方法} C --> D[RLHF] C --> E[DPO] C --> F[其他方法] D --> G[训练 RM] G --> H[PPO 优化] E --> I[直接优化偏好] H --> J[对齐的模型] I --> J F --> J style J fill:#4ecdc4

关键 Takeaway

  1. SFT 教格式,RLHF/DPO 教好坏:两者缺一不可
  2. RLHF 是经典方法:效果天花板高,但复杂、昂贵、不稳定
  3. DPO 是简化方案:不需要 RM 和 RL,稳定易用,效果接近
  4. 偏好数据是关键:数据质量直接决定对齐效果
  5. 方法还在演进:KTO、ORPO、SimPO 等更简单的方法不断涌现
  6. 实践建议

    • 资源有限 → 用 DPO
    • 追求极致 → 用 RLHF
    • 数据有限 → 考虑 KTO
    • 想一步到位 → 考虑 ORPO

推荐工具

工具用途推荐度
TRLRLHF/DPO 训练⭐⭐⭐⭐⭐
Axolotl全流程微调⭐⭐⭐⭐
LLaMA-Factory中文友好⭐⭐⭐⭐
Argilla数据标注⭐⭐⭐⭐

下一步学习

  • [ ] LoRA/PEFT:低资源微调
  • [ ] 模型量化:让大模型轻量化
  • [ ] 推理优化:vLLM 加速

参考资料

  1. InstructGPT Paper - Training language models to follow instructions
  2. DPO Paper - Direct Preference Optimization
  3. KTO Paper - KTO: Model Alignment as Prospect Theoretic Optimization
  4. ORPO Paper - ORPO: Monolithic Preference Optimization without Reference Model
  5. TRL Library - HuggingFace 的 RLHF 库

评论区
暂无评论
avatar