跳转至

实验任务二: 预训练语言模型

预训练语言模型

实验目标

通过本次实验,你将掌握以下内容:

  1. 使用GPU训练模型
  2. 了解预训练语言模型
  3. 使用预训练语言模型进行文本分类
  4. 比较不同句子聚合方式的效果
  5. 对分类错误样本进行简要分析

本次实验所用的预训练模型(BERT)下载链接如下:

预训练模型(BERT)下载链接:

https://box.nju.edu.cn/d/2710380144234ce78fe3/


1. 使用GPU训练模型

在PyTorch中,可以使用以下代码来检测当前环境是否有可用的GPU:

import torch

# 检查是否有可用的GPU
if torch.cuda.is_available():
    print(f"CUDA is available. Number of GPUs: {torch.cuda.device_count()}")
    print(f"Current device: {torch.cuda.current_device()}")
    print(f"Device name: {torch.cuda.get_device_name(torch.cuda.current_device())}")
else:
    print("CUDA is not available. Using CPU.")

如果显示'CUDA is not available. Using CPU.'请确认启动的环境是否正确或者尝试重新安装pytorch或者与助教联系。

GPU训练提示

如果要用GPU训练,则需要把数据和模型都放到GPU上才能训练。如果一个在CPU一个在GPU,则会报错。

定义模型后,通过model = model.to(device)把模型放到GPU上。 把模型放到GPU上的代码示例:

# 检查是否有可用的GPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 创建模型
model = SimpleModel()

# 将模型放到GPU(如果可用)
model = model.to(device)

由于模型在GPU上,所以数据也必须在GPU上才能送入模型。通过inputs = inputs.to(device)把input放到GPU上。值得说明的是由于模型的输出也在GPU上,所以标签也需要放到GPU上以便于计算损失,通过labels = labels.to(device)。

把数据放到GPU上的代码示例:

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

# 训练示例
num_epochs = 3
for epoch in range(num_epochs):
    model.train()
    for inputs, labels in train_loader:
        # 将数据放到GPU(如果可用)
        inputs, labels = inputs.to(device), labels.to(device)

        # 前向传播
        outputs = model(inputs)

通过上述过程,我们可以把数据和模型都放到GPU上从而加速训练。

你可以使用以下命令查看是否使用了GPU并且观察的GPU利用率:

watch -n 5 nvidia-smi

这个命令会每5秒(-n 5)更新一次NVIDIA GPU的状态信息。

2. 了解预训练语言模型

预训练语言模型简介

预训练语言模型(pre-trained language models)是指在大规模数据集上预先训练过的语言模型。这些模型已经学习到了一些基础的特征或知识,并可以被迁移到特定的任务上进行微调(fine-tuning)。

下面我们以BERT为例,用的bert-base-uncased版本进行实验。我们首先用AutoModel和AutoTokenizer加载模型和分词器。分词器是把文本的每个词元映射到对应的索引,以便于BERT的embedding层完成索引到嵌入的映射。

完整代码如下:

import torch
from transformers import AutoModel, AutoTokenizer

# 指定模型名称
model_name = 'bert-base-uncased'

# 读取模型对应的tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 载入模型
model = AutoModel.from_pretrained(model_name)

# 输入文本
input_text = "Here is some text to encode"

# 通过tokenizer把文本变成 token_id
input_ids = tokenizer.encode(input_text, add_special_tokens=True)
print(input_ids)

# 转换为Tensor
input_ids = torch.tensor([input_ids])

# 获得BERT的输出
with torch.no_grad():
    output = model(input_ids)

# 获得BERT模型最后一个隐层结果
output_hidden_states = output.last_hidden_state
output_hidden_states.shape
分词(tokenizer)的过程会在文本的头尾添加特殊token,即会在文本的开头加入词元[CLS]并且在文本的结尾加入词元[SEP]。你可以调整input_text和设置add_special_tokens=False,观察到这两个词元分别被编码为101和102。

除此之外,由于批处理过程需要一个批次中文本长度相同,因此额外引入了padding。所以,我们需要使用了attention_mask屏蔽这些padding token,不让其参与自注意力的计算。

最终的输出是文本中所有词元的隐藏状态(hidden states)。

我们可以用model.named_parameters(): 观察模型的所有参数及其形状,完整代码如下:

import torch
from transformers import AutoModel, AutoTokenizer

# 指定模型名称
model_name = 'bert-base-uncased'

# 读取模型对应的tokenizer
tokenizer = AutoTokenizer.from_pretrained(model_name)

# 载入模型
model = AutoModel.from_pretrained(model_name)

# 打印模型所有参数的名称和形状
for name, param in model.named_parameters():
    print(f"Parameter Name: {name}, Shape: {param.shape}")

3. 使用预训练模型进行文本分类

可能需要安装transformers包

   pip install transformers

在本章节中,你将基于上面的BERT代码和 AG NEWS 数据集完成基于预训练模型 BERT 的文本分类。你需要自己实现 BERTClassifier,并通过一个参数切换不同的句子聚合方式。需要完成的聚合方式包括:

  1. 直接使用 [CLS] 的嵌入表示当做句子嵌入。
  2. 使用 mean pooling 对一个句子中的有效词元做平均,得到句子嵌入。
  3. 使用注意力机制给每个词元分配一个权重,通过加权求和的方式得到句子嵌入。

学习率可以参考设置为 2e-5

实验要求

  1. BERTClassifier 需要通过参数切换 clsmeanattention 三种聚合方式,而不是写三个完全独立的模型。
  2. max_length 默认改为 64,并在报告中记录你最终使用的 max_lengthbatch_size
  3. 报告中至少比较三种聚合方式的测试结果。
  4. 报告中需要给出至少 2 条分类错误样本,并简要分析原因。
import torch
import torch.nn as nn
import pandas as pd
from torch.optim import AdamW
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModel
from sklearn.metrics import accuracy_score
from tqdm import tqdm

# **1. 加载 AG NEWS 数据集**
df = pd.read_csv("train.csv")  # 请替换成你的文件路径
df.columns = ["label", "title", "description"]
df["text"] = df["title"] + " " + df["description"]
df["label"] = df["label"] - 1
train_texts, train_labels = df["text"].tolist(), df["label"].tolist()
number = int(0.3 * len(train_texts))
train_texts, train_labels = train_texts[: number], train_labels[: number]

df = pd.read_csv("test.csv")  # 请替换成你的文件路径
df.columns = ["label", "title", "description"]
df["text"] = df["title"] + " " + df["description"]
df["label"] = df["label"] - 1
test_texts, test_labels = df["text"].tolist(), df["label"].tolist()

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# **2. 加载 BERT Tokenizer**
model_name = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_name)

# **3. 处理数据**
class AGNewsDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=64):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_tensors="pt",
        )
        return {
            "input_ids": encoding["input_ids"].squeeze(0),
            "attention_mask": encoding["attention_mask"].squeeze(0),
            "labels": torch.tensor(label, dtype=torch.long),
        }


train_dataset = AGNewsDataset(train_texts, train_labels, tokenizer)
test_dataset = AGNewsDataset(test_texts, test_labels, tokenizer)

train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# **4. 定义和加载 BERT 分类模型**
class BERTClassifier(nn.Module):
    def __init__(self, model_name, num_labels, pooling="cls"):
        super(BERTClassifier, self).__init__()
        self.bert = ...
        hidden_size = ...
        self.pooling = pooling
        self.attn_score = nn.Linear(hidden_size, 1)
        self.classifier = ...

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)
        hidden_states = outputs.last_hidden_state  # (batch, seq_len, hidden_size)

        if self.pooling == "cls":
            sentence_embedding = ...
        elif self.pooling == "mean":
            mask = attention_mask.unsqueeze(-1)
            sentence_embedding = ...
        elif self.pooling == "attention":
            # 提示:如果使用 self.attn_score(hidden_states),其输出 shape 是 (batch, seq_len, 1)
            # 通常需要先去掉最后一个维度,再与 attention_mask 对齐
            scores = ...
            scores = scores.masked_fill(attention_mask == 0, -1e9)
            weights = torch.softmax(scores, dim=1).unsqueeze(-1)
            sentence_embedding = ...
        else:
            raise ValueError(f"Unsupported pooling: {self.pooling}")

        logits = self.classifier(sentence_embedding)
        return logits


# **5. 设置优化器和损失函数**
def build_model(pooling):
    model = BERTClassifier(model_name, num_labels=4, pooling=pooling).to(device)
    optimizer = AdamW(model.parameters(), lr=2e-5)
    criterion = nn.CrossEntropyLoss()
    return model, optimizer, criterion


def train_one_epoch(model, optimizer, criterion):
    model.train()
    total_loss = 0
    loop = tqdm(train_dataloader, desc="Training", leave=False)

    for batch in loop:
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        optimizer.zero_grad()
        logits = model(input_ids, attention_mask)
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        loop.set_postfix(loss=loss.item())

    return total_loss / len(train_dataloader)


def evaluate(model):
    model.eval()
    preds, true_labels = [], []

    with torch.no_grad():
        for batch in test_dataloader:
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            logits = model(input_ids, attention_mask)
            batch_preds = logits.argmax(dim=1)

            preds.extend(batch_preds.cpu().tolist())
            true_labels.extend(labels.cpu().tolist())

    acc = accuracy_score(true_labels, preds)
    return acc, preds, true_labels


results = []
for pooling in ["cls", "mean", "attention"]:
    model, optimizer, criterion = build_model(pooling)

    for epoch in range(3):
        loss = train_one_epoch(model, optimizer, criterion)
        acc, preds, true_labels = evaluate(model)
        print(f"pooling={pooling}, epoch={epoch+1}, loss={loss:.4f}, acc={acc:.4f}")

    results.append({"pooling": pooling, "acc": acc})

print(results)

# 额外要求:结合测试集文本,任选2条分类错误样本,在报告中分析可能原因

训练速度

你如果觉得训练速度慢,可以尝试增大 batch size,不过注意不要炸显存。

思考题

思考题1:为什么使用 mean pooling 时,最好结合 attention_mask 只对有效 token 做平均?

思考题

思考题2:[CLS] 表示和平均池化表示分别更适合什么场景?请结合本实验中的文本分类任务简要分析。

思考题

思考题3:如果你的训练资源有限,你会优先调哪些超参数(例如 max_lengthbatch_size、epoch 数、学习率),为什么?