京东自营 + 国补 iPhone 历史最低价          国家补贴 享8折

从零开始200行python代码实现LLM

前言

大语言模型(LLM)很火,讨论的文章铺天盖地,但对于没有机器学习背景的人来说,看多了只是粗浅了解了一堆概念,疑惑只增不减。

本文尝试从零开始,用python实现一个极简但完整的大语言模型,在过程中把各种概念“具象化”,让大家亲眼看到、亲手写出self-attention机制、transformer模型,亲自感受下训练、推理中会遇到的一些问题。

本文适用范围及目标:

‒✅只需会写基本的python代码;

‒✅尝试实现完整的语言模型(但由于层数、dataset限制,只会写诗词);

‒❌不解释数学、机器学习原理性的知识,只做到“能用”为止;

‒❌不依赖抽象层次高的框架,用到的部分也会做解释;

声明:文章绝大部分内容来自ak大神的nanoGPT[1]。

相关代码都在Github仓库:

simpx/buildyourownllm [2]上,建议先clone下来,并通过pip install torch 安装唯一的依赖后,在仓库目录下运行各个代码体验过程。

动手写代码最容易把抽象的概念具象化,非常建议使用vscode + ipynb的组合调试文中的代码,鉴于篇幅,不额外介绍工具。

本文先介绍“从零基础到Bigram模型”,下一篇文章再介绍“从Bigram模型到LLM”。


先用传统方式实现一个“诗词生成器”

让我们忘记机器学习,用传统思路来实现一个“诗词生成器”。

观察一下我们的数据集 ci.txt ,里面包含了宋和南唐的词,我们的目标是实现一个生成类似诗词的工具。

$ head -n 8 ci.txt
虞美人 李煜
春花秋月何时了,往事知多少?
小楼昨夜又东风,故国不堪回首月明中。
雕栏玉砌应犹在,只是朱颜改。
问君能有几多愁?恰似一江春水向东流。


乌夜啼 李煜
昨夜风兼雨,帘帏飒飒秋声。

词是由一堆字组成的,那么一个简单的想法,我们可以通过计算每个字后面出现各个字的概率。

然后根据这些概率,不断的递归生成“下一个字”,生成的字多了,截断一部分,就是一首词了。

具体思路为:

  • 准备词汇表:将ci.txt 出现的所有字去重,得到我们的词汇表,长度为vocab_size;
  • 统计频率:准备一个vocab_size * vocab_size 的字典,统计每个词后出现别的词的频率;
  • 计算概率,生成新“字”:根据频率计算概率,并随机采样,生成下一个字;

完整的代码如下(带注释版的见simplemodel_with_comments.py[3]):

simplemodel.py

import random


random.seed(42) # 去掉此行,获得随机结果


prompt = "春江"
max_new_token = 100


with open('ci.txt', 'r', encoding='utf-8') as f:
    text = f.read()


chars = sorted(list(set(text)))
vocab_size = len(chars)
stoi = { ch:i for i,ch in enumerate(chars) }
itos = { i:ch for i,ch in enumerate(chars) }
encode = lambda s: [stoi[c] for c in s]
decode = lambda l: ''.join([itos[i] for i in l])


transition = [[0 for _ in range(vocab_size)] for _ in range(vocab_size)]


for i in range(len(text) - 1):
    current_token_id = encode(text[i])[0]
    next_token_id = encode(text[i + 1])[0]
    transition[current_token_id][next_token_id] += 1


generated_token = encode(prompt)


for i in range(max_new_token - 1):
    current_token_id = generated_token[-1]
    logits = transition[current_token_id]
    total = sum(logits)
    logits = [logit / total for logit in logits]
    next_token_id = random.choices(range(vocab_size), weights=logits, k=1)[0]
    generated_token.append(next_token_id)
    current_token_id = next_token_id


print(decode(generated_token))

直接通过python simplemodel.py 即可运行,去掉random.seed(42) 可以看到不同的输出结果。

在我的mac电脑上耗时2秒,效果如下:

$ python simplemodel.py
春江月 张先生疑被。


倦旅。
清歌声月边、莼鲈清唱,尽一卮酒红蕖花月,彩笼里繁蕊珠玑。
只今古。
浣溪月上宾鸿相照。
乞团,烟渚澜翻覆古1
半吐,还在蓬瀛烟沼。
木兰花露弓刀,更任东南楼缥缈。
黄柳,

这像是一首名为“春江月”、作者为“张先生疑被。”的词,但其实我们只是实现了一个“下一个词预测器”。

在代码的眼里,只不过“春”字后面大概率是“江”,而“江”字后面大概率是“月”而已,它不知道什么是词,甚至不知道什么是一首词的开头、结尾。

这个字符序列层面的“意义”,实际上是由读者赋予的。

词汇表 - tokenizer

我们的“词汇表”,相当于LLM里的tokenizer,只不过我们直接使用ci.txt 里出现过的所有字符当做词汇表用。我们的词汇表只有6418个词汇,而真正的LLM有更大的vocab_size,以及更高效的编码,一些常用词组直接对应1个token,比如下面是qwen2.5的tokenizer。

>>> from transformers import AutoTokenizer
>>> tokenizer = AutoTokenizer.from_pretained('Qwen/Qwen2.5-0.5B')
>>> tokenizer.vocab_size
151643
>>> tokenizer.encode("春江花月夜")
[99528, 69177, 99232, 9754, 99530]
>>> tokenizer.encode("阿里巴巴")
[107076]
>>> tokenizer.encode("阿里妈妈")
[102661, 101935]
>>> tokenizer.encode("人工智能")
[104455]
>>> tokenizer.decode([102661, 104455, 101935])
'阿里人工智能妈妈'

qwen2.5使用了一个大小为151643的词汇表,其中常见的词汇“阿里巴巴”、“人工智能”都只对应1个token,而在我们的词汇表里,1个字符永远对应1个token,编码效率较低。

模型、训练、推理

我们刚刚实现的“模型”,实际是就是自然语言N-gram模型中的“Bigram模型”。这是一种基于统计的语言模型,用于预测一个词出现的概率,在这个模型中,假设句子中的每个字只依赖于其前面的一个字。具体的实现就是一个词频字典transition,而所谓的“训练”过程就是遍历所有数据,统计“下一个词”出现的频率。但我们的“推理”过程还是非常像真正的LLM的,步骤如下:

1.我们从transition 中获取下一个token的logits(logits是机器学习中常用的术语,表示最后一层的原始输出值),我们可以把logits[i]简单理解为“下一个token_id是i的得分”,因此logits肯定是长度为vocab_size的字典;

2.获得“得分字典”后,使用[logit / total for logit in logits] 做归一化处理,这是为了下一步更好的做随机采样。在这里我们使用最简单的线性归一,不考虑total为0的情况;

3.根据归一后的“得分字典”,使用random.choices 随机获取一个token id并返回;

4.循环反复,直到获得足够多的token。


进行重构,更加有“机器学习风格”

接下来我们把Bigram模型的实现变得更加“机器学习风格”,以便帮助我们理解后面真实的pytorch代码,有pytorch背景的同学可以直接跳过本节。

完整的代码码如下(带注释版的见simplebigrammodel_with_comments.py[4]):

simplebigrammodel.py

import random
from typing import List


random.seed(42) # 去掉此行,获得随机结果
prompts = ["春江", "往事"]
max_new_token = 100
max_iters = 8000
batch_size = 32
block_size = 8
with open('ci.txt', 'r', encoding='utf-8') as f:
    text = f.read()
class Tokenizer:
    def __init__(self, text: str):
        self.chars = sorted(list(set(text)))
        self.vocab_size = len(self.chars)
        self.stoi = {ch: i for i, ch in enumerate(self.chars)}
        self.itos = {i: ch for i, ch in enumerate(self.chars)}
    
    def encode(self, s: str) -> List[int]:
        return [self.stoi[c] for c in s]
    
    def decode(self, l: List[int]) -> str:
        return''.join([self.itos[i] for i in l])
class BigramLanguageModel():
    def __init__(self, vocab_size: int):
        self.vocab_size = vocab_size
        self.transition = [[0 for _ in range(vocab_size)] 
                          for _ in range(vocab_size)]
        
    def __call__(self, x):
        # 方便直接调用model(x)
        return self.forward(x)
    
    def forward(self, idx: List[List[int]]) -> List[List[List[float]]]:
        '''
        输入idx,是一个二维数组,如[[1, 2, 3],
                                  [4, 5, 6]]
        表示同时希望推理的多个序列
        输出是一个三维数组,如[[[0.1, 0.2, 0.3, .. (vocab_size)],
                                [0.4, 0.5, 0.6, .. (vocab_size)],
                                [0.7, 0.8, 0.9, .. (vocab_size)]],
                               [[0.2, 0.3, 0.4, .. (vocab_size)],
                                [0.5, 0.6, 0.7, .. (vocab_size)],
                                [0.8, 0.9, 1.0, .. (vocab_size)]]]
        
        '''
        B = len(idx)  # 批次大小
        T = len(idx[0])  # 每一批的序列长度
        
        logits = [
            [[0.0 for _ in range(self.vocab_size)] 
             for _ in range(T)]
            for _ in range(B)
        ]
        
        for b in range(B):
            for t in range(T):
                current_token = idx[b][t]
                # 计算了每一个token的下一个token的概率
                logits[b][t] = self.transition[current_token]
                
        return logits
    def generate(self, idx: List[List[int]], max_new_tokens: int) -> List[int]:
        for _ in range(max_new_tokens):
            logits_batch = self(idx)
            for batch_idx, logits in enumerate(logits_batch):
                # 我们计算了每一个token的下一个token的概率
                # 但实际上我们只需要最后一个token的“下一个token的概率”
                logits = logits[-1]
                total = max(sum(logits),1)
                # 归一化
                logits = [logit / total for logit in logits]
                # 根据概率随机采样
                next_token = random.choices(
                    range(self.vocab_size),
                    weights=logits,
                    k=1
                )[0]
                idx[batch_idx].append(next_token)
        return idx
    
def get_batch(tokens, batch_size, block_size):
    '''
    随机获取一批数据x和y用于训练
    x和y都是二维数组,可以用于并行训练
    其中y数组内的每一个值,都是x数组内对应位置的值的下一个值
    格式如下:
    x = [[1, 2, 3],
         [9, 10, 11]]
    y = [[2, 3, 4],
         [10, 11, 12]]
    '''
    ix = random.choices(range(len(tokens) - block_size), k=batch_size)
    x, y = [], []
    for i in ix:
        x.append(tokens[i:i+block_size])
        y.append(tokens[i+1:i+block_size+1])
    return x, y
tokenizer = Tokenizer(text)
vocab_size = tokenizer.vocab_size
tokens = tokenizer.encode(text)
model = BigramLanguageModel(vocab_size)
# 训练
for iter in range(max_iters):
    x_batch, y_batch = get_batch(tokens, batch_size, block_size)
    for i in range(len(x_batch)):
        for j in range(len(x_batch[i])):
            x = x_batch[i][j]
            y = y_batch[i][j]
            model.transition[x][y] += 1
prompt_tokens = [tokenizer.encode(prompt) for prompt in prompts]
# 推理
result = model.generate(prompt_tokens, max_new_token)
# decode
for tokens in result:
    print(tokenizer.decode(tokens))
    print('-'*10)

虽然有100多行代码,但实际上功能和上一个50行代码几乎是一样的,稍微运行、调试一下就能明白。

直接通过python simplebigrammodel.py 即可运行,这一次会生成2个字符序列:

$ python simplebigrammodel.py
春江红紫霄效颦。


怎。
兰修月。
两个事对西风酒伴寄我登临,看雪惊起步,总不与泪满南园春来。
最关上阅。
信断,名姝,夜正坐认旧武仙 朱弦。


岁,回。




看一丝竹。
愿皇受风,当。


妆一笑时,不堪
----------
往事多闲田舍、十三楚珪
酒困不须紫芝兰花痕皱,青步虹。
暗殿人物华高层轩者,临江渌池塘。
三峡。
天、彩霞冠
燕翻云垂杨、一声羌笛罢瑶觥船窗幽园春生阵。
长桥。
无恙,中有心期。


开处。
燕姹绿遍,烂□
----------

解释一下这100多行代码的实现:

机器学习风格的一些约定

我们用Tokenizer 类封装了词汇表,以便它能像qwen的词汇表一样被使用。

同时,我们实现了一个BigramLanguageModel 类,这模仿pytorch里的nn.Module 写法,即:

1.参数在__init__ 中初始化;

2.推理在forward 函数中实现,并通过__call__ 允许对象被直接调用;

3.序列生成在generate 函数中实现;

最后,我们修改了数据加载的机制,如下:

def get_batch(tokens, batch_size, block_size):
    ix = random.choices(range(len(tokens) - block_size), k=batch_size)
    x, y = [], []
    for i in ix:
        x.append(tokens[i:i+block_size])
        y.append(tokens[i+1:i+block_size+1])
    return x, y

每次调用get_batch 的时候,会随机返回两份数据,其中y 数组中的每一个token,都是x 数组内对应位置的token的下一个token。采用这样的写法,是为了方便后续操作。

批处理in,批处理out

这一个版本最难懂的地方,是数据都以多维数组的方式呈现,连推理结果返回的都是2个。

实际上,我们这里的“多维数组”,就是机器学习中的“张量”(Tensor),是为了最终方便GPU处理而准备的。

张量(Tensor)是数学和物理学中用于表示多维数据的对象,广泛应用于机器学习、深度学习和计算机视觉等领域。在深度学习框架(如 TensorFlow 和 PyTorch)中,张量是数据的基本结构。

而我们代码中低效的for循环,未来在GPU中都会被高效的并行计算。

我们先以传统思维来仔细看一下forward 函数的实现,以进一步理解“张量”和“批处理”。

    def forward(self, idx: List[List[int]]) -> List[List[List[float]]]:
        B = len(idx)  # 批次大小
        T = len(idx[0])  # 每一批的序列长度
        
        logits = [
            [[0.0for _ in range(self.vocab_size)] 
             for _ in range(T)]
            for _ in range(B)
        ]
        
        for b in range(B):
            for t in range(T):
                current_token = idx[b][t]
                # 计算了每一个token的下一个token的概率
                logits[b][t] = self.transition[current_token]
                
        return logits

forward 函数的入参是一个大小为B * T的二维数组,按照机器学习领域的说法,就是一个形状为(B, T)的“张量”,表示输入了“B”批次的数据,每个批次包含“T”个token。

这里B、T、C都是机器学习里的常用变量名,B(Batch Size)代表批量大小、T(Time Steps or Sequence Length)对于序列数据来说代表序列的长度、C(Channels)在图像处理中代表通道数,在语言模型中可以表示特征维度。

返回值logits 是一个形状为(B, T, C)的张量(C等于vocab_size),它表示了“每个批次”的序列中,“每个token”的下一个token的频率。这么说起来很绕,其实只要想象成:“所有B*T个数的token,都有一张独立的表,表中记录了下一个出现的token是X的频率”。

logits 的大小为B * T * C,由于我们是Bigram模型,每个token的概率只和它上一个token有关,所以实际上我们只需要计算批次中最后一个token的logit就可以了,但为了和以后的模型统一,依旧保留了这些冗余计算。

好消息,我们现在已经有了一个能跑的玩具“模型”,它能根据概率预测下一个词,但却缺乏了真正的训练过程。

坏消息,在实现真正的机器学习之前,我们还是绕不开pytorch。不过幸运的是,我们只需要做到“知其然”即可。


5分钟简明pytorch教程

PyTorch 是一个开源的深度学习库,提供一系列非常方便的基础数据结构和函数,简化我们的操作。

下面是一个使用pytorch实现线性回归的简单例子:

pytorch_5min.py

import torch
from torch import nn
from torch.nn import functional as F


torch.manual_seed(42) # 随机数种子,方便复现


# 判断环境中是否有GPU
device = 'cuda'if torch.cuda.is_available() else'mps'if torch.mps.is_available() else'cpu'
print(f"Using {device} device")


# 1. 创建tensor演示
x = torch.tensor([1.0, 2.0, 3.0])
y = torch.tensor([2.0, 4.0, 6.0])


# 2. 基本运算演示
print(x + y)                # 加法: tensor([3., 6., 9.])
print(x * y)                # 点乘: tensor([2., 8., 18.])
print(torch.matmul(x, y))   # 矩阵乘法: tensor(28.)
print(x @ y)                # 另一种矩阵乘写法: tensor(28.)
print(x.shape)              # tensor的形状: torch.Size([3])


# 3. 定义模型
class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(1, 1)  # 输入维度=1,输出维度=1
    
    def forward(self, x):
        return self.linear(x)


# 4. 生成训练数据
# 真实关系: y = 2x + 1
x_train = torch.rand(100, 1) * 10  # 生成 0-10 之间的随机数
y_train = 2 * x_train + 1 + torch.randn(100, 1) * 0.1  # 真实函数:y = 2x + 1 加上一些噪声
# 将数据移动到指定设备
x_train = x_train.to(device)
y_train = y_train.to(device)


# 5. 创建模型和优化器
model = SimpleNet().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.MSELoss()


# 6. 训练循环
epochs = 5000


print("\n训练开始...")
for epoch in range(epochs):
    # 前向传播,预测结果
    y_pred = model(x_train)
    
    # 计算预测值和真实值之间的损失
    loss = criterion(y_pred, y_train)
    
    # 反向传播,修改模型参数
    optimizer.zero_grad() # 清除旧的梯度
    loss.backward() # 计算新的梯度 
    optimizer.step() # 更新参数:参数 -= 学习率 * 梯度
    
    if (epoch + 1) % 100 == 0:
        w = model.linear.weight.item()
        b = model.linear.bias.item()
        print(f'Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}, w: {w:.2f}, b: {b:.2f}')


# 7. 打印结果
w = model.linear.weight.item()
b = model.linear.bias.item()
print(f'\n训练完成!')
print(f'学习到的函数: y = {w:.2f}x + {b:.2f}')
print(f'实际函数: y = 2.00x + 1.00')


# 8. 测试模型
test_x = torch.tensor([[0.0], [5.0], [10.0]]).to(device)
with torch.no_grad():
    test_y = model(test_x)
    print("\n预测结果:")
    for x, y in zip(test_x, test_y):
        print(f'x = {x.item():.1f}, y = {y.item():.2f}')

通过python pytorch_5min.py 即可运行:

$ python pytorch_5min.py 
Using mps device
tensor([3., 6., 9.])
tensor([ 2.,  8., 18.])
tensor(28.)
tensor(28.)
torch.Size([3])


训练开始...
Epoch [100/5000], Loss: 0.0988, w: 2.09, b: 0.41
Epoch [200/5000], Loss: 0.0420, w: 2.05, b: 0.64
...
Epoch [5000/5000], Loss: 0.0066, w: 2.00, b: 1.02


训练完成!
学习到的函数: y = 2.00x + 1.02
实际函数: y = 2.00x + 1.00


预测结果:
x = 0.0, y = 1.02
x = 5.0, y = 11.00
x = 10.0, y = 20.98

这个例子中,最特别的是有真正的“训练”过程,“训练”究竟是什么?我们经常听到的“反向传播”、“梯度下降”、“学习率”又是什么?

鉴于这只是5分钟教程,我们只要记住后面我们所有的机器学习代码都是这样的结构即可。

tensor操作

这一部分详见代码,看完代码后才发现,大学时候的《线性代数》课程是多么重要。

这里最值得注意的是“矩阵相乘”,即“点积”、matmul操作,简写为“@”符号,是后面self-attention机制的核心。

矩阵乘还经常用作张量形状的变换,如形状为(B, T, embd)的张量和形状为(embd, C)的张量相乘,结果为(B, T, C)的张量 —— 这一点也经常被用到。

此外,tensor.to(device) 可以把tensor数据移动到指定的设备,如GPU。

模型、神经网络的layer

我们的模型内部只有一个简单的线性层nn.Linear(1, 1) ,它输入输出都是一维张量。(1,1)的线性层实际上内部就是一个线性方程,对于输入任何数字x,它会输出x * w + b,实际上神经网络中的“layer”就是内含了一系列参数、可被训练的单元。通过输出nn.Linear 可以更清晰的看出实现。

>>> layer = nn.Linear(1, 1)
>>> layer.weight.item(), layer.bias.item()
(0.8262009620666504, 0.9049515724182129)
>>> torch.tensor([[1.0],[2.]])
tensor([[1.],
        [2.]])
>>> layer(_)
tensor([[1.7312],
        [2.5574]], grad_fn=<AddmmBackward0>)

手动计算一下就能发现,实际上layer的输出值,就是输入x * weight + bias的结果。

其中grad_fn 是pytorch用来反向传播的关键,pytorch记住了这个tensor是怎么计算出来的,在后面的反向传播中被使用,对pytorch用户不可见。

反向传播和梯度下降

5分钟的教程只需要我们先硬记住一点,机器学习的“训练”就是这样一个过程:

1.先“前向传播”,计算出输出(如Linear层输出结果)。

2.再“反向传播”。

a.通过“损失函数”计算出模型的输出和真实数据之间的“损失值”loss(如例子中的MSELoss损失函数);

b.计算“梯度”,利用损失函数对输出层的梯度进行计算,接着向前传播(反向传播)计算前一层的梯度,直到输入层(这一步pytorch能自动处理,不需要我们关心。可以简单理解为,“梯度”就是损失函数对各个参数的导数。核心目的就是为了计算出“如何调整w和b的值来减少损失”);

c.更新参数,“梯度”是一个向量,把“梯度”乘上我们的“学习率”再加上原来的参数,就是我们新的参数了。如果学习率大,那么每次更新的多,学习率小,每次更新的就少。“梯度下降”,就是我们通过迭代更新参数,以寻找到损失函数最小的过程;

这中间最复杂的求导、算梯度、更新每一层参数的操作,pytorch都自动完成了(前面看到的grad_fn 就是用于这个过程),我们只需要知道在这个结构下,选择不同的优化器算法、损失函数实现、模型结构即可,剩下的交给pytorch。

而“推理”,就只有“前向传播”,计算出输出即可。


实现一个真正的Bigram模型

5分钟“精通”完pytorch,接下来我们来实现真正的pytorch版Bigram模型。

首先,我们把前面的simplebigrammodel.py ,用pytorch的tensor数据结构改造成一个新版本,代码见simplebigrammodel_torch.py [5],这里不再展开。通过这份代码,能在熟悉算法的基础上,进一步深刻理解tensor。

然后,我们基于它进一步实现Bigram模型,后续我们的代码都将基于这个为基础,逐渐改出完整的gpt。

完整代码如下,也可以看babygpt_v1.py[6]。

babygpt_v1.py

展开阅读全文

本文系作者在时代Java发表,未经许可,不得转载。

如有侵权,请联系nowjava@qq.com删除。

编辑于

关注时代Java

关注时代Java