深度学习基础理论————分布式训练(模型并行/数据并行/流水线并行/张量并行)
主要介绍Pytorch分布式训练代码以及原理以及一些简易的Demo代码模型并行 是指将一个模型的不同部分(如层或子模块)分配到不同的设备上运行。它通常用于非常大的模型,这些模型无法完整地放入单个设备的内存中。在模型并行中,数据会顺序通过各个层,即一层处理完所有数据之后再传递给下一层。这意味着,在任何时刻,只有当前正在处理的数据位于相应的设备上。
graph LR A[输入数据] --> B B --> C C --> D D --> E[输出数据]
流水线并行 是一种特殊的模型并行形式,它不仅拆分模型的不同层,还将输入数据流分为多个微批次(micro-batches)。这样可以实现多批次数据的同时处理,提高了设备利用率和训练效率。比如\(t_0\)时刻再\(layer_0\)处理\(data_0\)在\(t_1\)时刻会有\(layer_0\)处理\(data_1\)并且\(layer_1\)处理\(data_0\)
graph LR subgraph 时间点 t0 A --> B end subgraph 时间点 t1 B --> C D --> B end subgraph 时间点 t2 C --> E B --> F end subgraph 时间点 t3 E --> G F --> H end
数据并行 是最常用的分布式训练策略之一,它通过复制整个模型到多个设备上来实现。每个设备处理一小批数据,并在每次迭代结束时同步梯度。这种方法简单且易于实现,适用于大多数情况。
graph LR A[输入数据] --> B{Split} B --> C B --> D B --> E C --> F D --> F E --> F F --> G
张量并行 是一种更精细的并行策略,将矩阵运算中\(x,A\)拆分,并分配到不同的设备上。这使得单个层可以在多个设备上并行执行,从而提高了训练速度。根据拆分的方式不同,可以分为列并行(Column-wise Parallelism)和行并行(Row-wise Parallelism)等。
graph LR A[输入张量 X] --> B{Split} B --> C B --> D C --> E D --> F E --> G{All-Gather/All-Reduce} F --> G G --> H[最终输出]
1、并行训练
Image From: https://github.com/hkproj/pytorch-transformer-distributed
一、数据并行(DP/DDP(主要介绍DDP))
核心思想: 将输入数据拆分成多个小批次(mini-batch),分别分配到多个设备(如 GPU)上进行计算,每个设备计算一个小批次的梯度,最后在主设备上合并梯度并更新模型参数。
[*]二、模型并行*
核心思想: 将模型的不同部分放在不同设备上,适用于模型过大无法单个设备存储的情况(如 GPT-3)。
实现方法: 1、手动将模型的不同部分分配到不同设备。2、前向传播时按设备顺序计算,反向传播时按相反顺序回传梯度。
1、数据并行
DP流程
缺点也是显而易见:
[*]1、数据副本会冗余(因为要把数据先复制,然后进行分布);
[*]2、前向传递前在 GPU 上复制模型(由于模型参数是在主 GPU 上更新的,因此必须在每次前向传递开始时重新同步模型);
[*]3、GPU 利用率不均衡(损失计算在主 GPU 上进行,在主 GPU 上进行梯度降低和参数更新
DDP流程
对比DP和DDP
1、DP是一种集中-分发机制(优化器/梯度计算都是再master进程上处理好之后,然后分发到不同的进程中)
2、DDP是一种独立-运行机制(每个进程都有自己的优化器,并且在计算梯度过程中:各进程需要将梯度进行汇总规约到主进程,主进程用梯度来更新模型权重,然后其 broadcast 模型到所有进程(其他GPU)进行下一步训练)
Image From: https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/
整体流程:
1、加载模型阶段。每个GPU都拥有模型的一个副本,所以不需要拷贝模型。rank为0的进程会将网络初始化参数broadcast到其它每个进程中,确保每个进程中的模型都拥有一样的初始化值。
2、加载数据阶段。DDP 不需要广播数据,而是使用多进程并行加载数据。在 host 之上,每个worker进程都会把自己负责的数据从硬盘加载到 page-locked memory。DistributedSampler 保证每个进程加载到的数据是彼此不重叠的。
3、前向传播阶段。在每个GPU之上运行前向传播,计算输出。每个GPU都执行同样的训练,所以不需要有主 GPU。
4、计算损失。在每个GPU之上计算损失。
5、反向传播阶段。运行后向传播来计算梯度,在计算梯度同时也对梯度执行all-reduce操作。
由于数据实在不同设备上,但是是一个模型,对于梯度的计算可以:直接将不同设备之间梯度相互传播(每个设备的数据是不一样的,但是模型是相同的,这样计算梯度会不同),然后计算平均(alll-reduce计算方法)
6、更新模型参数阶段。因为每个GPU都从完全相同的模型开始训练,并且梯度被all-reduced,因此每个GPU在反向传播结束时最终得到平均梯度的相同副本,所有GPU上的权重更新都相同,也就不需要模型同步了。注意,在每次迭代中,模型中的Buffers 需要从rank为0的进程广播到进程组的其它进程上
DDP代码实现
pytorch中实现DDP大致流程如下:
[*]1、初始化不同进程组。这里主要是使用init_process_group进行实现
[*]2、将数据/模型进行并行。主要主要是使用nn.parallel.DistributedDataParallel处理模型,DistributedSampler处理数据
DistributedSampler:作用是将数据集划分为多个子集,每个子集分配给一个 GPU,使每个 GPU 在训练时处理不同的样本,从而避免重复计算和数据冗余
[*]3、运行完毕之后,通过destroy_process_group()销毁进程
简易Demo如下(单机多卡):
更加详细代码:https://pytorch.org/tutorials/intermediate/ddp_tutorial.html
import osimport datetimeimport torchimport torch.nn as nnimport torch.optim as optimimport torch.distributed as distfrom torch.utils.data.distributed import DistributedSamplerfrom torch.utils.data import DataLoaderdist.init_process_group(backend='nccl', timeout= datetime.timedelta(minutes= 5)) # 初始化不同进程local_rank = int(os.environ['LOCAL_RANK']) # 这里主要是设置GPU数量device = torch.device('cuda', local_rank)torch.cuda.set_device(device)class Model(nn.Module): ...def train(..): ...def val(..): ...def main(): # 加载数据 train_dataset = ... val_dataset = ... train_sampler = DistributedSampler(train_dataset) # 对数据进行采样 train_loader = DataLoader(train_dataset, batch_size=.., shuffle=False, sampler=train_sampler, num_workers=16, pin_memory=True) val_loader = DataLoader(val_dataset, batch_size=.., shuffle=False, num_workers=16, pin_memory=True) # 加载模型 model = Model(..) model.to(device) # 分布式处理模型 model = nn.parallel.DistributedDataParallel(model, device_ids=, output_device=local_rank) # 优化器/loss criterion = .. optimizer = .. for epoch in range(epochs): train_sampler.set_epoch(epoch) train(..) if dist.get_rank()== 0: # 只在主进程上做检验 val(..) dist.barrier() # 同步所有进程 # 销毁所有进程 dist.destroy_process_group()if __name__ == '__main__': main()# 运行代码# python -m torch.distributed.launch --nproc_per_node GPU数量 train.py<hr>
[*]代码解释
1、dist.init_process_group(backend='nccl', timeout= datetime.timedelta(minutes= 5))
[*]backend: 通信后端, nccl(GPU)或 gloo(CPU)
[*]init_method: 用于初始化进程通信的方法,可以是 env://、tcp://hostname:port 或 file://
[*]world_size: 参与训练的进程总数
[*]rank: 当前进程的 ID(从 0 开始),用于区分各个进程
# 单机多卡dist.init_process_group( backend='nccl',# 使用 NCCL 后端进行 GPU 通信 init_method='env://',# 使用环境变量初始化进程组 world_size=2,# 总进程数 rank=int(os.environ['LOCAL_RANK']),# 当前进程的 rank timeout=datetime.timedelta(minutes=5)# 设置超时时间)# 多机多卡dist.init_process_group( backend='nccl',# 使用 NCCL 后端进行 GPU 通信 init_method='tcp://<MASTER_IP>:23456',# 指定 master 节点的 IP 和端口 world_size=8,# 例如 4 台机器,每台 2 卡,world_size大小为 8 rank=int(os.environ['RANK']),# 当前进程的 rank timeout=datetime.timedelta(minutes=5)# 设置超时时间)# 获取本地设备和设置local_rank = int(os.environ['LOCAL_RANK'])device = torch.device('cuda', local_rank)torch.cuda.set_device(device)
[*]backend
nccl:适用于 GPU 上的分布式训练,基于 NVIDIA NCCL 库,专门优化了 GPU 间的通信。通常在多 GPU 训练时使用。
gloo:适用于 CPU 或 GPU 上的分布式训练,支持多种设备间的通信。在没有 NVIDIA GPU 的环境下,可以使用 gloo 后端。
mpi:使用 MPI(Message Passing Interface)库进行分布式训练,适用于跨节点训练。
tcp:通过 TCP 进行通信,较少使用,通常用于调试
From: https://pytorch.org/docs/stable/distributed.html
2、流水线并行
<hr>补充1: pytorch实现流水线并行
From:
1、https://pytorch.org/docs/stable/distributed.pipelining.html
2、https://pytorch.org/tutorials/intermediate/pipelining_tutorial.html
3、https://zhuanlan.zhihu.com/p/658773834
[*]第一步:定义模型结构,以及初始化
# Piplineimport torchimport torch.nn as nnfrom dataclasses import dataclassimport osimport torch.distributed as distfrom torch.distributed.pipelining import pipeline, SplitPoint, PipelineStage, ScheduleGPipeglobal rank, device, pp_group, stage_index, num_stagesdef init_distributed(): global rank, device, pp_group, stage_index, num_stages # 显卡数量 rank = int(os.environ["LOCAL_RANK"]) # 训练总共进程数 world_size = int(os.environ["WORLD_SIZE"]) # 指定设备 device = torch.device(f"cuda:{rank}") if torch.cuda.is_available() else torch.device("cpu") dist.init_process_group() pp_group = dist.new_group() stage_index = rank num_stages = world_size@dataclassclass ModelArgs: dim: int = 512 n_layers: int = 8 n_heads: int = 8 vocab_size: int = 10000class Transformer(nn.Module): def __init__(self, model_args: ModelArgs): super().__init__() self.tok_embeddings = nn.Embedding(model_args.vocab_size, model_args.dim) # 因为流水线并行要用到将不同的模型放到不同的显卡上,因此可以先用 ModuleDict 将不同模型分 # 为不同的层,然后分配到不同设备 self.layers = torch.nn.ModuleDict() for layer_id in range(model_args.n_layers): self.layers = nn.TransformerDecoderLayer(model_args.dim, model_args.n_heads) self.norm = nn.LayerNorm(model_args.dim) self.output = nn.Linear(model_args.dim, model_args.vocab_size) def forward(self, tokens: torch.Tensor): h = self.tok_embeddings(tokens) if self.tok_embeddings else tokens for layer in self.layers.values(): h = layer(h, h) h = self.norm(h) if self.norm else h output = self.output(h).clone() if self.output else h return output
[*]第二步:切割模型
这里假设两张显卡,将前4层放到显卡A。后四层放到显卡B
def manual_model_split(model) -> PipelineStage: if stage_index == 0: # prepare the first stage model for i in range(4, 8): del model.layers model.norm = None model.output = None elif stage_index == 1: # prepare the second stage model for i in range(4): del model.layers model.tok_embeddings = None stage = PipelineStage( model, stage_index, num_stages, device, ) return stage
[*]第三步:训练
if __name__ == "__main__": init_distributed() num_microbatches = 4 model_args = ModelArgs() model = Transformer(model_args) # Dummy data x = torch.ones(32, 500, dtype=torch.long) y = torch.randint(0, model_args.vocab_size, (32, 500), dtype=torch.long) example_input_microbatch = x.chunk(num_microbatches) # Option 1: Manual model splitting stage = manual_model_split(model) model.to(device) x = x.to(device) y = y.to(device) def tokenwise_loss_fn(outputs, targets): loss_fn = nn.CrossEntropyLoss() outputs = outputs.reshape(-1, model_args.vocab_size) targets = targets.reshape(-1) return loss_fn(outputs, targets) schedule = ScheduleGPipe(stage, n_microbatches=num_microbatches, loss_fn=tokenwise_loss_fn) if rank == 0: schedule.step(x) elif rank == 1: losses = [] output = schedule.step(target=y, losses=losses) print(f"losses: {losses}") dist.destroy_process_group()# --nproc_per_node 2 两台设备# torchrun --nnodes 1 --nproc_per_node 2 pipelining_tutorial.py<hr>2.1 GPipe实现流水线并行: https://torchgpipe.readthedocs.io
GPipe 将一个小批量(mini-batch)分割成多个微批量(micro-batch),使设备尽可能并行工作。这就是所谓的流水线并行。基本上,流水线并行是一个小型数据并行的堆栈。当每个分区处理完一个微型批次后,可以将输出扔给下一个分区,并立即开始处理下一个微型批次。现在,分区可以重叠。
Image From: https://arxiv.org/pdf/1811.06965
上图中b、c分别表示为模型并行和流水线并行都会有一个“拆分”的处理(模型并行和流水线并行都会对模型进行拆分,但模型并行主要关注模型的计算任务如何分布到不同设备,而流水线并行还结合了微批次化的数据处理,用于提升并行效率。)那么就会有存在一个问题:如何去处理梯度?(模型并行好理解,前向处理之后我直接反过来再去处理梯度即可)
对比数据并行(From: https://www.cnblogs.com/rossiXYZ/p/15172816.html)
micro-batch 跟数据并行有高度的相似性:
[*]数据并行是空间上的,数据被拆分成多个 tensor,同时喂给多个设备并行计算,然后将梯度累加在一起更新。
[*]micro-batch 是时间上的数据并行,数据被拆分成多个 tensor,这些 tensor 按照时序依次进入同一个设备串行计算,然后将梯度累加在一起更新。
当总的 batch size 一致,且数据并行的并行度和 micro-batch 的累加次数相等时,数据并行和 Gradient Accumulation 在数学上完全等价。Gradient Accumulation 通过多个 micro-batch的梯度累加使得下一个 micro-batch 的前向计算不需要依赖上一个 micro-batch 的反向计算,因此可以畅通无阻的进行下去(当然在一个大 batch 的最后一次 micro-batch 还是会触发这个依赖)
理解两种方式,我们假设数据数量:10,然后设备个数:5,同时假设我们也将模型分布到这5个设备上
数据并行:每个设备会处理2个数据(10/5)
流水线并行:因为模型分布在不同设备上(假设:\(ld_1, ld_2, ld_3, ld_4, ld_5\)),会有一个操作:将数据在拆分为不同micro-batch(这里假设为5,得到:$md_1,md_2,md_3,md_4,md_5 \(),这样一来随着前向传播:\)t_0\(时:\)ld_1\(处理\)md_1\(;\)t_2\(时:\)(ld_1, md_2), (ld_2, md_1)\(。对比 **模型并行** :假设\)t_4\(恰好`流水线并行`处理完数据,那么\)t_0\rightarrow t_5\(:\)(ld_1, (md_1...md_5))$
这个过程,因为要等所有的模型处理完数据,就会不断将梯度进行累积。
<hr>梯度累计
pytorch代码:一般就是在loss.backward()(反向传播,计算当前梯度)计算之后不去使用optimizer.step()(更新网络参数)和optimizer.zero_grad()(清空过往梯度)
单卡训练,梯度累计代码示范:
for i, (images, target) in enumerate(train_loader): # 1. input output images = images.cuda(non_blocking=True) target = torch.from_numpy(np.array(target)).float().cuda(non_blocking=True) outputs = model(images) # 前向传播 loss = criterion(outputs, target) # 计算损失 # 2. backward loss.backward() # 反向传播,计算当前梯度 # 3. update parameters of net if ((i+1)%accumulation)==0: # optimizer the net optimizer.step() # 更新网络参数 optimizer.zero_grad() # reset grdient # 清空过往梯度数据并行梯度累计:
for i, (inputs, targets) in enumerate(data_loader): inputs, targets = inputs.to(device), targets.to(device) # 前向传播 outputs = model(inputs) loss = criterion(outputs, targets) / accumulation_steps# 累计时按步数归一化 # 反向传播 loss.backward() # 每 accumulation_steps 次更新一次参数 if (i + 1) % accumulation_steps == 0: optimizer.step()# 更新参数 optimizer.zero_grad()# 清空梯度 # 如果最后的 mini-batch 不足以凑齐 accumulation_steps if (i + 1) % accumulation_steps != 0: optimizer.step() optimizer.zero_grad()<hr>简单使用:
from torchgpipe import GPipemodel = nn.Sequential(a, b, c, d)model = GPipe(model, balance=, chunks=8)# 1st partition: nn.Sequential(a, b) on cuda:0# 2nd partition: nn.Sequential(c, d) on cuda:1for input in data_loader: output = model(input)在GPipe中会存在一个Bubbles问题(有进行任何有效工作的点)(比如说上图:\(F_{3,0}\)执行完成之前,\(F_{4,i}\)都只能等待,就会造成一个空档期)参考知乎上描述,可以通过增加microbatch来实现降低Bubbles的比例
2.2 PipeDream并行(https://zhuanlan.zhihu.com/p/617087561)
下图是PipeDream的调度图,4个GPU和8个microbatchs。蓝色的方块表示前向传播,绿色表示反向传播,数字则是microbatch的id。
GPipe需要等所有的microbatch前向传播完成后,才会开始反向传播。PipeDream则是当一个microbatch的前向传播完成后,立即进入反向传播阶段
<hr>补充1:gradient-checkpoint方法
是一种减少深度学习模型训练过程中显存使用的技术(用时间换内存)。它通过在前向传播中有选择地存储部分中间激活值,并在需要反向传播时重新计算丢弃的激活值,显著降低显存占用,同时代价是额外的计算时间。
补充1.1:
显存:1、静态内存:模型自身显存占用(模型的参数量);2、动态内存:训练过程中的计算过程。gradient-checkpoint做的就是减少动态内存占用
比如说:\(x \xrightarrow{x_1} a_1 \xrightarrow{x_2} a_2 \xrightarrow{x_3} a_3 \xrightarrow{x_4} a_4\)
\(loss=(a_4- y)^2\)
那么在计算梯度过程 \(\frac{dloss}{dw_1}=2(a_4-y)w_4w_3w_2x\)
使用gradient-checkpoint人为放弃部分中间过程值,比如说\(a_1, a_2\),如果放弃那就意味着后续在反向传播过程重新再计算\(a_1, a_2\)值即可
From:https://www.bilibili.com/video/BV1nJ4m1M7Qw/?vd_source=881c4826193cfb648b5cdd0bad9f19f0
from torch.utils.checkpoint import checkpointclass LargeModel1(nn.Module): def __init__(self): super(LargeModel1, self).__init__() self.block1 = nn.Sequential( nn.Linear(1024, 2048), nn.ReLU() ) self.block2 = ... self.block3 = ... def forward(self, x): x = checkpoint(self.block1, x)# 仅存储 block1 的输入和输出 x = checkpoint(self.block2, x)# 仅存储 block2 的输入和输出 x = self.block3(x)# 最后一个 block 不使用 checkpoint return x'''使用Checkpoint显存占用:2.00 MB使用Checkpoint耗时:0.0015211105346679688 秒不使用Checkpoint显存占用:34.77 MB不使用Checkpoint耗时:0.0011744499206542969 秒'''其他例子:https://github.com/prigoyal/pytorch_memonger/blob/master/tutorial/Checkpointing_for_PyTorch_models.ipynb
具体原理:
梯度检查点(gradient checkpointing) 的工作原理是从计算图中省略一些激活值(由前向传播产生,其中这里的”一些“是指可以只省略模型中的部分激活值,折中时间和空间,陈天奇在它的论文使用了如下动图的方法,即前向传播的时候存一个节点释放一个节点,空的那个等需要用的时候再backword的时候重新计算)。这减少了计算图使用的内存,降低了总体内存压力(并允许在处理过程中使用更大的批次大小)。
From: https://zhuanlan.zhihu.com/p/448395808
补充2:数据并行+流水线并行
数据并行:将数据拆分然后分配到不同设备上
流水线并行:将模型进行拆分,然后处理所有数据
两种在“进程”上是垂直的,也就是可以叠加。比如说:假设有一个 4 层的神经网络,批量大小为 256,模型拆分为 2 个阶段,设备数量为 4。
[*]第一步:假设对不同的设备分配模型(流水线并行拆分模型):
GPU 0+ GPU 1:处理第 1 和第 2 层
GPU 2+ GPU 3:处理第 3 和第 4 层
[*]第二步:分配数据(数据并行分配数据),分配为2个不同的mini-batch
GPU 0+ GPU 2:处理第 1 个 mini-batch
GPU 1+ GPU 3:处理第 2 个 mini-batch
[*]第三步:前向传播+ 反向传播
流水线并行过程发生在:GPU 0+ GPU 2 和 GPU 1+ GPU 3
数据并行过程发生在:GPU 0+ GPU 1 和 GPU 2+ GPU 3
# 数据并行+流水线并行 简单 demoimport osimport datetimeimport torchimport torch.nn as nnfrom torch.utils.data import DataLoaderimport torch.distributed as distfrom torch.utils.data.distributed import DistributedSamplerfrom torch.distributed.pipelining import PipelineStage, SplitPoint, pipeline, ScheduleGPipe# 初始化dist.init_process_group(backend='nccl', timeout= datetime.timedelta(minutes= 5)) # 初始化不同进程local_rank = int(os.environ['LOCAL_RANK']) # 这里主要是设置GPU数量device = torch.device('cuda', local_rank)torch.cuda.set_device(device)class Model(nn.Module): def __init__(self): super(Model, self).__init__() self.layer1 = ... # F1 self.layer2 = ... # F2 self.layer3 = ... self.layer4 = ... def forward(self, x): x = self.layer1(x) ... out = self.layer4(x) return outclass PipeLineModel(nn.Module): def __init__(self, model): super(PipeLineModel, self).__init__() self.stages = nn.ModuleList([ PipelineStage(model.layer1, device), ..., PipelineStage(model.layer1, device) ]) self.split_points = def forward(self, x): schedule = ScheduleGPipe(self.stages, self.split_points) return schedule(x)def train(): ...def val(): ...def main(): train_dataset = ... val_dataset = ... train_sampler = DistributedSampler(train_dataset) # 对数据进行采样 train_loader = DataLoader(train_dataset, batch_size=..., shuffle=False, sampler=train_sampler, num_workers=16, pin_memory=True) val_loader = DataLoader(val_dataset, batch_size=..., shuffle=False, num_workers=16, pin_memory=True) model = Model() model = PipeLineModel(model= model) ddp_model = nn.parallel.DistributedDataParallel(model, device_ids=, output_device=local_rank) criterion = ... optimizer = ... for epoch in range(...): train_sampler.set_epoch(epoch) train(...) if dist.get_rank()== 0: val(...) dist.barrier() dist.destroy_process_group()if __name__ == "__main__": main()# python -m torch.distributed.launch --nproc_per_node 4 train.py补充3: 流水线并行那么batch-norm如何使用?
因为data被拆分不同的micro-batch,数据小,用batch-norm不稳定,可以用SyncBatchNorm
建议还是不用正如Gpipe中描述:
But in the current implementation, it is slower than the vanilla batch normalization. That is why we turn off by default.
如果要去用全局就会导致速度上慢了(要进行同步)
https://cloud.tencent.com/developer/article/2126838
<hr>3、张量并行
张量并行是针对模型中的张量进行拆分,将其放置到不同的GPU上。张量切分方式分为按行进行切分和按列进行切分,分别对应行并行(Row Parallelism)(权重矩阵按行分割)与列并行(Column Parallelism)(权重矩阵按列分割)。假设计算过程为:\(y=Ax\)其中\(A\)为权重
From:https://github.com/wdndev/llm_interview_note/blob/main/04.分布式训练/4.张量并行/4.张量并行.md
对于方向传播过程中梯度处理:\(y=Ax\)
代码初始化:
dist.init_process_group(backend='nccl', timeout=datetime.timedelta(minutes=5))local_rank = int(os.environ['LOCAL_RANK'])device = torch.device('cuda', local_rank)torch.cuda.set_device(device)
[*]列并行
反向传播1:
\[\frac{\partial L}{\partial X}=\frac{\partial L}{\partial X}|_{A_1}+\frac{\partial L}{\partial X}|_{A_2} (\text{all-reduce})\]
反向传播2:\(Y=\text{cat}\)
\[\frac{\partial L}{\partial Y_1} \\\frac{\partial L}{\partial Y_2}\]
# 列并行class ColumnParallelLinear(nn.Module): def __init__(self, input_size, output_size): super(ColumnParallelLinear, self).__init__() world_size = dist.get_world_size() self.linear = nn.Linear(input_size, output_size // world_size) # 按列进行拆分 def forward(self, x): local_output = self.linear(x) # All-Gather outputs = dist.all_gather(outputs, local_output) return torch.cat(outputs, dim=-1) def backward(self, grad_output): # 通过All-Reduce同步梯度 dist.all_reduce(self.linear.weight.grad) if self.linear.bias is not None: dist.all_reduce(self.linear.bias.grad)
[*]行并行
反向传播1:
\[\frac{\partial L}{\partial X}=[\frac{\partial L}{\partial X_1}+\frac{\partial L}{\partial X_2}] (\text{all-gather})\]
反向传播2:\(Y= Y_1+ Y_2\)
\[\frac{\partial L}{\partial Y_1}= \frac{\partial L}{\partial Y}\]
class RowParallelLinear(nn.Module): def __init__(self, input_size, output_size): super(RowParallelLinear, self).__init__() # 获取当前分布式环境中的进程总数(world size)和当前进程的秩(rank) world_size = dist.get_world_size() rank = dist.get_rank() # 设置输出大小,并初始化线性层。每个设备负责一部分输入特征到所有输出特征的映射 self.output_size = output_size self.linear = nn.Linear(input_size // world_size, output_size, bias=False) # 初始化权重矩阵 with torch.no_grad(): # 创建一个完整的权重矩阵,并使用 Kaiming 均匀分布进行初始化 full_weight = torch.empty(input_size, output_size) nn.init.kaiming_uniform_(full_weight, a=math.sqrt(5)) # 将权重矩阵按行分割成多个部分,每个部分对应一个设备 weight_chunks = list(full_weight.chunk(world_size, dim=0)) # 将对应的权重部分复制到当前设备上的线性层中 self.linear.weight.data.copy_(weight_chunks) # 如果有偏置项,则也需要按行分割并广播给所有进程 if self.linear.bias is not None: with torch.no_grad(): # 将偏置项按行分割 bias_chunks = list(self.linear.bias.chunk(world_size, dim=0)) # 将对应的偏置部分复制到当前设备上的线性层中 self.linear.bias.data.copy_(bias_chunks) # 广播偏置到所有进程,确保所有设备上的偏置一致 dist.broadcast(self.linear.bias, src=0) def forward(self, x): """ 前向传播: - 对输入张量 x 按最后一维(通常是特征维度)切片,每个设备只处理它负责的那一部分输入。 - 计算局部输出后,使用 All-Reduce 操作将所有设备的局部输出相加以获得完整的输出张量。 """ # 对x根据设备切片 input_chunks = list(x.chunk(dist.get_world_size(), dim=-1)) local_input = input_chunks local_output = self.linear(local_input) # 使用 All-Reduce 收集所有设备的输出以获得完整的输出张量 dist.all_reduce(local_output, op=dist.ReduceOp.SUM) return local_output def backward(self, grad_output): """ 反向传播: - 在每个设备上本地计算输入梯度。 - 通过 All-Reduce 同步输入梯度、权重梯度以及偏置梯度(如果有)。 """ # 本地计算输入梯度 local_grad_input = grad_output @ self.linear.weight.T # 通过 All-Reduce 同步梯度 dist.all_reduce(local_grad_input) dist.all_reduce(self.linear.weight.grad) if self.linear.bias is not None: dist.all_reduce(self.linear.bias.grad)参考
1、https://www.telesens.co/2019/04/04/distributed-data-parallel-training-using-pytorch-on-aws/
2、https://github.com/hkproj/pytorch-transformer-distributed
3、https://mp.weixin.qq.com/s/WdLpHfWLRvDLLxeanFduxA
4、https://torchgpipe.readthedocs.io
5、https://arxiv.org/pdf/1811.06965
6、https://www.cnblogs.com/rossiXYZ/p/15172816.html
7、https://www.bilibili.com/video/BV1nJ4m1M7Qw/?vd_source=881c4826193cfb648b5cdd0bad9f19f0
页:
[1]