Different categories of fine-tuning 不同类别的微调#
微调语言模型最常见的方法是指令微调与分类微调。指令微调涉及在一组任务上训练语言模型,使用特定的指令来提高其理解和执行自然语言提示中描述的任务的能力

Preparing the dataset 准备数据集#
import urllib.request
import zipfile
import os
from pathlib import Path
# Source archive of the UCI SMS Spam Collection dataset.
url = "https://archive.ics.uci.edu/static/public/228/sms+spam+collection.zip"
# Local download target and extraction directory.
zip_path = "sms_spam_collection.zip"
extracted_path = "sms_spam_collection"
# Final renamed data file (tab-separated values).
data_file_path = Path(extracted_path) / "SMSSpamCollection.tsv"
def download_and_unzip_spam_data(
        url, zip_path, extracted_path, data_file_path):
    """Fetch the SMS spam zip archive, extract it, and rename the data file.

    Skips every step if the target file already exists on disk.
    """
    if data_file_path.exists():
        print(
            f"{data_file_path} already exists. Skipping download "
            "and extraction."
        )
        return

    # Download the archive to zip_path.
    with urllib.request.urlopen(url) as response, open(zip_path, "wb") as fh:
        fh.write(response.read())

    # Unpack everything into the extraction directory.
    with zipfile.ZipFile(zip_path, "r") as archive:
        archive.extractall(extracted_path)

    # The archive ships the file without an extension; give it a .tsv name.
    extracted_file = Path(extracted_path) / "SMSSpamCollection"
    os.rename(extracted_file, data_file_path)
    print(f"File downloaded and saved as {data_file_path}")
download_and_unzip_spam_data(url, zip_path, extracted_path, data_file_path)

数据集以制表符分隔的文本文件形式保存在 sms_spam_collection 文件夹中的 SMSSpamCollection.tsv,使用 pandas 读取:
import pandas as pd

# The TSV file has no header row; assign explicit column names.
df = pd.read_csv(
    data_file_path, sep="\t", header=None, names=["Label", "Text"]
)
df

因为数据类别不均衡(ham 远多于 spam),使用下面的函数将其均衡:
def create_balanced_dataset(df):
    """Return a class-balanced copy of *df* by down-sampling the majority class.

    Counts the "spam" rows, draws an equally sized random sample of "ham"
    rows (fixed seed for reproducibility), and concatenates ham-then-spam.
    """
    spam_rows = df[df["Label"] == "spam"]
    n_spam = spam_rows.shape[0]
    # Down-sample ham to exactly the number of spam messages.
    sampled_ham = df[df["Label"] == "ham"].sample(n_spam, random_state=123)
    return pd.concat([sampled_ham, spam_rows])
balanced_df = create_balanced_dataset(df)
print(balanced_df["Label"].value_counts())

接下来将数据拆分为训练集、验证集和测试集:
def random_split(df, train_frac, validation_frac):
    """Shuffle *df* (fixed seed) and split into train/validation/test frames.

    The test split receives whatever remains after the train and validation
    fractions are taken, so the three parts always cover the whole frame.
    """
    # Full-frame shuffle with a fixed seed, then renumber the index.
    shuffled = df.sample(frac=1, random_state=123).reset_index(drop=True)
    n = len(shuffled)
    train_end = int(n * train_frac)
    validation_end = train_end + int(n * validation_frac)
    # Three contiguous, non-overlapping partitions.
    return (
        shuffled[:train_end],
        shuffled[train_end:validation_end],
        shuffled[validation_end:],
    )
# Split 70% / 10% / 20% (train / validation / test).
train_df, validation_df, test_df = random_split(
    balanced_df, 0.7, 0.1)  #4 save the resulting splits to local CSV files
train_df.to_csv("train.csv", index=None)
validation_df.to_csv("validation.csv", index=None)
test_df.to_csv("test.csv", index=None)

Creating data loaders 创建数据加载器#
此前,我们利用滑动窗口技术生成大小统一的文本块,然后将这些文本块分组为批次,以便更高效地进行模型训练。每个文本块都作为一个单独的训练实例。然而,我们现在处理的是一个垃圾邮件数据集,其中包含长度各异的文本消息。为了像处理文本块那样对这些消息进行分批,我们主要有两个选择:
- 将所有消息截断为数据集中或批次中最短消息的长度:计算开销更小,但会丢失较长消息中的信息
- 将所有消息填充到数据集中或批次中最长消息的长度:计算开销更大,但能完整保留所有消息的内容
我们使用 “<|endoftext|>” 作为填充标记。
import torch
from torch.utils.data import Dataset
class SpamDataset(Dataset):
    """Tokenized SMS spam dataset: fixed-length token ids + binary labels."""

    def __init__(self, csv_file, tokenizer, max_length=None,
                 pad_token_id=50256):
        self.data = pd.read_csv(csv_file)
        # Pre-tokenize every message once, up front.
        self.encoded_texts = [
            tokenizer.encode(text) for text in self.data["Text"]
        ]
        # Either adopt the caller-supplied length or measure the longest text.
        self.max_length = (
            self._longest_encoded_length() if max_length is None
            else max_length
        )
        # Truncate over-long sequences, then right-pad all of them so every
        # item is exactly max_length tokens.
        truncated = [ids[:self.max_length] for ids in self.encoded_texts]
        self.encoded_texts = [
            ids + [pad_token_id] * (self.max_length - len(ids))
            for ids in truncated
        ]

    def __getitem__(self, index):
        token_ids = self.encoded_texts[index]
        # Map the string label to an integer class id (spam=1, ham=0);
        # torch.tensor() on the raw string would raise a dtype error.
        is_spam = 1 if self.data.iloc[index]["Label"] == "spam" else 0
        return (
            torch.tensor(token_ids, dtype=torch.long),
            torch.tensor(is_spam, dtype=torch.long),
        )

    def __len__(self):
        return len(self.data)

    def _longest_encoded_length(self):
        # Token length of the longest encoded message in the file.
        return max((len(ids) for ids in self.encoded_texts), default=0)

# Next, load the train/validation/test datasets:
# The training set determines max_length; validation/test reuse it so that
# all batches share the same sequence length.
train_dataset = SpamDataset(
    csv_file="train.csv",
    max_length=None,
    tokenizer=tokenizer
)
val_dataset = SpamDataset(
    csv_file="validation.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)
test_dataset = SpamDataset(
    csv_file="test.csv",
    max_length=train_dataset.max_length,
    tokenizer=tokenizer
)

接下来创建并测试 DataLoader:
from torch.utils.data import DataLoader

num_workers = 0  #1 load data in the main process (portable default)
batch_size = 8
torch.manual_seed(123)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=batch_size,
    shuffle=True,  # reshuffle training batches each epoch
    num_workers=num_workers,
    drop_last=True,  # drop the final partial batch to keep shapes stable
)
val_loader = DataLoader(
    dataset=val_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    drop_last=False,
)
test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=batch_size,
    num_workers=num_workers,
    drop_last=False,
)

Initializing a model with pretrained weights 使用预训练权重初始化模型#

import pandas as pd
import tiktoken
import torch
from torch.utils.data import Dataset, DataLoader
from ch04.ch04 import generate_text_simple
from ch05.gpt_download import download_and_load_gpt2
from ch05.pretrain import GPTModel, load_weights_into_gpt
from ch05.pretrain import text_to_token_ids, token_ids_to_text
class SpamDataset(Dataset):
    """Tokenized SMS spam dataset yielding (token_ids, label) tensor pairs.

    Messages are tokenized once in __init__, truncated to max_length, and
    right-padded with pad_token_id so every item has the same length.
    """

    def __init__(self, csv_file, tokenizer, max_length=None,
                 pad_token_id=50256):
        self.data = pd.read_csv(csv_file)  # 1 read the CSV file
        self.encoded_texts = [
            tokenizer.encode(text) for text in self.data["Text"]  # 2 encode texts
        ]
        if max_length is None:
            # Use the longest message in this file as the sequence length.
            self.max_length = self._longest_encoded_length()
        else:
            self.max_length = max_length
        # Truncate sequences that exceed max_length.
        self.encoded_texts = [
            encoded_text[:self.max_length]
            for encoded_text in self.encoded_texts
        ]
        # 3 right-pad the remainder up to max_length
        self.encoded_texts = [
            encoded_text + [pad_token_id] *
            (self.max_length - len(encoded_text))
            for encoded_text in self.encoded_texts
        ]

    def __getitem__(self, index):
        encoded = self.encoded_texts[index]
        label = self.data.iloc[index]["Label"]
        # Fix: convert the string label to an int class id (spam=1, ham=0).
        # Passing the raw string to torch.tensor raises
        # "invalid data type 'str'" (same fix as the first SpamDataset above).
        label = 1 if label == "spam" else 0
        return (
            torch.tensor(encoded, dtype=torch.long),
            torch.tensor(label, dtype=torch.long)
        )

    def __len__(self):
        return len(self.data)

    def _longest_encoded_length(self):
        """Return the token length of the longest encoded message."""
        max_length = 0
        for encoded_text in self.encoded_texts:
            encoded_length = len(encoded_text)
            if encoded_length > max_length:
                max_length = encoded_length
        return max_length
if __name__ == '__main__':
    # GPT-2 byte-pair tokenizer; <|endoftext|> doubles as the padding token
    # and must be explicitly whitelisted.
    tokenizer = tiktoken.get_encoding("gpt2")
    print(tokenizer.encode("<|endoftext|>", allowed_special={"<|endoftext|>"}))

    # Datasets: validation/test reuse the training split's sequence length.
    train_dataset = SpamDataset(
        csv_file="train.csv",
        max_length=None,
        tokenizer=tokenizer
    )
    val_dataset = SpamDataset(
        csv_file="validation.csv",
        max_length=train_dataset.max_length,
        tokenizer=tokenizer
    )
    test_dataset = SpamDataset(
        csv_file="test.csv",
        max_length=train_dataset.max_length,
        tokenizer=tokenizer
    )

    # Data loaders; only training shuffles and drops the last partial batch.
    num_workers = 0  # 1 keep loading in the main process
    batch_size = 8
    torch.manual_seed(123)
    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        drop_last=True,
    )
    val_loader = DataLoader(
        dataset=val_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        drop_last=False,
    )
    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        drop_last=False,
    )

    # Architecture settings shared by every GPT-2 size.
    CHOOSE_MODEL = "gpt2-small (124M)"
    INPUT_PROMPT = "Every effort moves"
    BASE_CONFIG = {
        "vocab_size": 50257,     # 1 BPE vocabulary size
        "context_length": 1024,  # 2 maximum sequence length
        "drop_rate": 0.0,        # 3 dropout disabled for fine-tuning
        "qkv_bias": True         # 4 OpenAI checkpoints use Q/K/V biases
    }
    model_configs = {
        "gpt2-small (124M)": {"emb_dim": 768, "n_layers": 12, "n_heads": 12},
        "gpt2-medium (355M)": {"emb_dim": 1024, "n_layers": 24, "n_heads": 16},
        "gpt2-large (774M)": {"emb_dim": 1280, "n_layers": 36, "n_heads": 20},
        "gpt2-xl (1558M)": {"emb_dim": 1600, "n_layers": 48, "n_heads": 25},
    }
    BASE_CONFIG.update(model_configs[CHOOSE_MODEL])

    # "gpt2-small (124M)" -> "124M", the size key the downloader expects.
    model_size = CHOOSE_MODEL.split(" ")[-1].strip("()")
    settings, params = download_and_load_gpt2(
        model_size=model_size, models_dir="gpt2"
    )

    # Build the model and copy the pretrained weights into it.
    model = GPTModel(BASE_CONFIG)
    load_weights_into_gpt(model, params)
    model.eval()

    # Sanity check: the pretrained model should still generate coherent text.
    text_1 = "Every effort moves you"
    token_ids = generate_text_simple(
        model=model,
        idx=text_to_token_ids(text_1, tokenizer),
        max_new_tokens=15,
        context_size=BASE_CONFIG["context_length"]
    )
print(token_ids_to_text(token_ids, tokenizer))

Adding a classification head 添加分类头#

我们用一个较小的输出层替换原始输出层,该输出层将隐藏表示映射到 50,257 个词汇表,而新的输出层将映射到两个类: 0 (“非垃圾邮件”)和 1 (“垃圾邮件”)
当前的模型结构是
GPTModel(
(tok_emb): Embedding(50257, 768)
(pos_emb): Embedding(1024, 768)
(drop_emb): Dropout(p=0.0, inplace=False)
(trf_blocks): Sequential(
... 省略一些重复结构
(11): TransformerBlock(
(att): MultiHeadAttention(
(W_query): Linear(in_features=768, out_features=768, bias=True)
(W_key): Linear(in_features=768, out_features=768, bias=True)
(W_value): Linear(in_features=768, out_features=768, bias=True)
(out_proj): Linear(in_features=768, out_features=768, bias=True)
(dropout): Dropout(p=0.0, inplace=False)
)
(ff): FeedForward(
(layers): Sequential(
(0): Linear(in_features=768, out_features=3072, bias=True)
(1): GELU()
(2): Linear(in_features=3072, out_features=768, bias=True)
)
)
(norm1): LayerNorm()
(norm2): LayerNorm()
(drop_resid): Dropout(p=0.0, inplace=False)
)
)
(final_norm): LayerNorm()
(out_head): Linear(in_features=768, out_features=50257, bias=False)
)

GPTModel 由嵌入层组成,其后是 12 个相同的 Transformer 模块,接着是最终的 LayerNorm 和输出层 out_head。
在基于神经网络的语言模型中,较低层通常捕捉适用于广泛任务和数据集的基本语言结构和语义。因此,仅对最后几层(即靠近输出的层)进行微调,这些层更针对细微的语言模式和特定任务的特征,通常就足以使模型适应新任务。
# Freeze all pretrained parameters so they receive no gradient updates.
for param in model.parameters():
    param.requires_grad = False

# Replace the output layer (model.out_head), which originally mapped the
# hidden representation to 50,257 vocabulary logits, with a 2-class head.
torch.manual_seed(123)
num_classes = 2
model.out_head = torch.nn.Linear(
    in_features=BASE_CONFIG["emb_dim"],
    out_features=num_classes
)

这个新的 model.out_head 输出层默认将其 requires_grad 属性设置为 True,这意味着它是模型中在训练期间唯一会更新的层。但是微调其他层可以显著提高模型的预测性能。我们还将最后一个 Transformer 模块以及连接该模块与输出层的最终 LayerNorm 模块配置为可训练的。

# Also unfreeze the last transformer block and the final LayerNorm so they
# are fine-tuned together with the new classification head.
for param in model.trf_blocks[-1].parameters():
    param.requires_grad = True
for param in model.final_norm.parameters():
param.requires_grad = True

我们只对最后一个输出词元特别感兴趣:在因果注意力机制下,序列中的最后一个词元积累的信息最多。我们将最后一个词元的输出转换为类别标签预测,并计算模型的初始预测准确率;随后再针对垃圾邮件分类任务对模型进行微调。
Calculating the classification loss and accuracy 计算分类损失和准确率#

定义损失计算函数
def calc_accuracy_loader(data_loader, model, device, num_batches=None):
    """Fraction of correctly classified examples over up to *num_batches*.

    Only the logits at the final token position are used, since that token
    has attended to the entire sequence.
    """
    model.eval()
    batch_limit = (
        len(data_loader) if num_batches is None
        else min(num_batches, len(data_loader))
    )
    correct, total = 0, 0
    for step, (inputs, targets) in enumerate(data_loader):
        if step >= batch_limit:
            break
        inputs, targets = inputs.to(device), targets.to(device)
        with torch.no_grad():
            # Last-token logits only: shape (batch, num_classes).
            last_logits = model(inputs)[:, -1, :]
        predictions = last_logits.argmax(dim=-1)
        total += predictions.shape[0]
        correct += (predictions == targets).sum().item()
    return correct / total

# Run the following to measure the model's initial accuracy:
# Move the model to GPU if available, then measure the accuracy of the
# untrained classification head on a few batches of each split.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
torch.manual_seed(123)
train_accuracy = calc_accuracy_loader(
    train_loader, model, device, num_batches=10
)
val_accuracy = calc_accuracy_loader(
    val_loader, model, device, num_batches=10
)
test_accuracy = calc_accuracy_loader(
    test_loader, model, device, num_batches=10
)
print(f"Training accuracy: {train_accuracy*100:.2f}%")
print(f"Validation accuracy: {val_accuracy*100:.2f}%")
print(f"Test accuracy: {test_accuracy*100:.2f}%")

损失函数定义如下:
def calc_loss_batch(input_batch, target_batch, model, device):
    """Cross-entropy loss for one batch, using only the last token's logits."""
    inputs = input_batch.to(device)
    targets = target_batch.to(device)
    # 1 keep only the final position: shape (batch, num_classes)
    last_token_logits = model(inputs)[:, -1, :]
    return torch.nn.functional.cross_entropy(last_token_logits, targets)
def calc_loss_loader(data_loader, model, device, num_batches=None):
    """Average calc_loss_batch over up to *num_batches* batches.

    Returns NaN for an empty loader so callers can detect "no data".
    """
    if len(data_loader) == 0:
        return float("nan")
    # 1 cap the batch count at what the loader can actually supply
    limit = (
        len(data_loader) if num_batches is None
        else min(num_batches, len(data_loader))
    )
    running = 0.
    for step, (inputs, targets) in enumerate(data_loader):
        if step >= limit:
            break
        running += calc_loss_batch(inputs, targets, model, device).item()
    return running / limit
# Initial losses before fine-tuning.
with torch.no_grad():  #1 disable gradient tracking — evaluation only
    train_loss = calc_loss_loader(
        train_loader, model, device, num_batches=5
    )
    val_loss = calc_loss_loader(val_loader, model, device, num_batches=5)
    test_loss = calc_loss_loader(test_loader, model, device, num_batches=5)
print(f"Training loss: {train_loss:.3f}")
print(f"Validation loss: {val_loss:.3f}")
print(f"Test loss: {test_loss:.3f}")

Fine-tuning the model on supervised data 在监督数据上微调模型#
训练函数对预训练的 LLM 进行微调,以提高其垃圾邮件分类准确率。

训练循环与我们用于预训练的整体训练循环相同;唯一的区别在于,我们计算分类准确率,而不是生成样本输出来评估模型。
def train_classifier_simple(
        model, train_loader, val_loader, optimizer, device,
        num_epochs, eval_freq, eval_iter):
    """Fine-tune *model* for classification; returns loss/accuracy histories.

    NOTE(review): evaluate_model is not defined anywhere in this file —
    presumably imported from the pretraining chapter; verify before running.
    """
    train_losses, val_losses, train_accs, val_accs = [], [], [], []  #1 history
    examples_seen, global_step = 0, -1
    for epoch in range(num_epochs):  #2 main training loop
        model.train()  #3 switch to training mode (enables dropout etc.)
        for input_batch, target_batch in train_loader:
            optimizer.zero_grad()  #4 clear gradients from the previous step
            loss = calc_loss_batch(
                input_batch, target_batch, model, device
            )
            loss.backward()  #5 backpropagate
            optimizer.step()  #6 apply the weight update
            examples_seen += input_batch.shape[0]  #7 count examples, not tokens
            global_step += 1
            #8 periodic loss evaluation every eval_freq steps
            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(
                    model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                print(f"Ep {epoch+1} (Step {global_step:06d}): "
                      f"Train loss {train_loss:.3f}, "
                      f"Val loss {val_loss:.3f}"
                      )
        #9 per-epoch accuracy on eval_iter batches
        train_accuracy = calc_accuracy_loader(
            train_loader, model, device, num_batches=eval_iter
        )
        val_accuracy = calc_accuracy_loader(
            val_loader, model, device, num_batches=eval_iter
        )
        print(f"Training accuracy: {train_accuracy*100:.2f}% | ", end="")
        print(f"Validation accuracy: {val_accuracy*100:.2f}%")
        train_accs.append(train_accuracy)
        val_accs.append(val_accuracy)
return train_losses, val_losses, train_accs, val_accs, examples_seen

(注:以下 train_classifier_simple 的定义与上文完全重复。)

def train_classifier_simple(
# NOTE(review): this entire definition duplicates train_classifier_simple
# above — likely a copy-paste artifact; keep only one copy.
        model, train_loader, val_loader, optimizer, device,
        num_epochs, eval_freq, eval_iter):
    train_losses, val_losses, train_accs, val_accs = [], [], [], []  #1 history
    examples_seen, global_step = 0, -1
    for epoch in range(num_epochs):  #2 main training loop
        model.train()  #3 training mode
        for input_batch, target_batch in train_loader:
            optimizer.zero_grad()  #4 clear previous gradients
            loss = calc_loss_batch(
                input_batch, target_batch, model, device
            )
            loss.backward()  #5 backpropagate
            optimizer.step()  #6 apply the weight update
            examples_seen += input_batch.shape[0]  #7 count examples
            global_step += 1
            #8 periodic loss evaluation
            if global_step % eval_freq == 0:
                train_loss, val_loss = evaluate_model(
                    model, train_loader, val_loader, device, eval_iter)
                train_losses.append(train_loss)
                val_losses.append(val_loss)
                print(f"Ep {epoch+1} (Step {global_step:06d}): "
                      f"Train loss {train_loss:.3f}, "
                      f"Val loss {val_loss:.3f}"
                      )
        #9 per-epoch accuracy
        train_accuracy = calc_accuracy_loader(
            train_loader, model, device, num_batches=eval_iter
        )
        val_accuracy = calc_accuracy_loader(
            val_loader, model, device, num_batches=eval_iter
        )
        print(f"Training accuracy: {train_accuracy*100:.2f}% | ", end="")
        print(f"Validation accuracy: {val_accuracy*100:.2f}%")
        train_accs.append(train_accuracy)
        val_accs.append(val_accuracy)
return train_losses, val_losses, train_accs, val_accs, examples_seen

然后使用如下代码启动训练:
import time

start_time = time.time()
torch.manual_seed(123)
# AdamW with weight decay; a small learning rate since we are fine-tuning.
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5, weight_decay=0.1)
num_epochs = 5
train_losses, val_losses, train_accs, val_accs, examples_seen = \
    train_classifier_simple(
        model, train_loader, val_loader, optimizer, device,
        num_epochs=num_epochs, eval_freq=50,
        eval_iter=5
    )
end_time = time.time()
execution_time_minutes = (end_time - start_time) / 60
print(f"Training completed in {execution_time_minutes:.2f} minutes.")

Using the LLM as a spam classifier 将 LLM 用作垃圾邮件分类器#
def classify_review(
        text, model, tokenizer, device, max_length=None,
        pad_token_id=50256):
    """Classify a single text as "spam" / "not spam" with the fine-tuned model."""
    model.eval()
    input_ids = tokenizer.encode(text)  #1 tokenize the input text
    # Model's maximum sequence length, read from the position embedding.
    supported_context_length = model.pos_emb.weight.shape[1]
    # NOTE(review): with the default max_length=None, min(None, ...) below
    # raises TypeError — callers must pass max_length explicitly
    # (e.g. train_dataset.max_length).
    input_ids = input_ids[:min(  #2 truncate to the shorter of the two limits
        max_length, supported_context_length
    )]
    input_ids += [pad_token_id] * (max_length - len(input_ids))  #3 pad to max_length
    input_tensor = torch.tensor(
        input_ids, device=device
    ).unsqueeze(0)  #4 add a batch dimension
    with torch.no_grad():  #5 inference only
        logits = model(input_tensor)[:, -1, :]  #6 last-token logits
    predicted_label = torch.argmax(logits, dim=-1).item()
return "spam" if predicted_label == 1 else "not spam" #7

Summary 总结#
- 分类微调涉及通过一个小型分类层替换 LLM 的输出层。
- 在将短信分类为“垃圾短信”或“非垃圾短信”的情况下,新的分类层仅由两个输出节点组成。此前,我们使用的输出节点数量与词汇表中唯一词元的数量相等(即 50,257 个)。
- 与预训练中预测文本中的下一个词元不同,分类微调训练模型输出正确的类别标签,例如 “垃圾邮件” 或 “非垃圾邮件”。