钓鱼网站网址大全,注册商标符号,做广告的公司,检查网站的跳转路径是否清晰 哪里要优化简介
本文详细介绍了LLM推理中的三种并行计算方法#xff1a;数据并行通过在多设备上复制模型并并行处理不同批次数据提升速度#xff1b;模型并行将模型拆分到多设备上解决单设备显存不足问题#xff1b;流水线并行通过微批次调度实现GPU并行计算提高利用率。文章对比分析…简介本文详细介绍了LLM推理中的三种并行计算方法数据并行通过在多设备上复制模型并并行处理不同批次数据提升速度模型并行将模型拆分到多设备上解决单设备显存不足问题流水线并行通过微批次调度实现GPU并行计算提高利用率。文章对比分析了三者在显存占用、吞吐量和性能上的权衡指出需根据模型规模和硬件限制选择合适策略组合使用能进一步释放扩展潜力。本文将分析数据并行Data Parallelism、模型并行Model Parallelism与流水线并行Pipeline Parallelism在推理引擎中的实现方式并讨论它们对显存占用、吞吐量及整体性能权衡的影响。在数据并行Data Parallel范式中我们将数据集划分到多个计算设备上而每个设备均保留一份完整的模型副本。当模型规模能够轻松装入单个设备显存时该方法尤为高效。通过并行处理不同批次的数据理论上可将推理速度提升为可用设备数量的倍数。然而当模型体积过大以至于无法完整放入单个 GPU 时这一策略便不再可行。Figure 1: Data Parallel Diagram(credits : eraser.io)图 1数据并行示意图来源eraser.io在多设备环境下数据集需均匀划分。例如若共有 100 条数据并使用 2 张 GPU理想策略是按 50/50 比例切分。为保证随机性并避免偏差通常先打乱索引再将其平均分配到各 GPU。下面给出具体实现。#Dataset file : dataset.pyimport torchfrom random import Randomimport torch.distributed as distfrom torch.utils.data import DataLoaderclassPartition(): #Standard pytorch dataset class(containing len and getitem methods) def__init__(self, data, index): self.data data #Entire Dataset(list) self.index index #Indices(list) : Different for each device def__len__(self): returnlen(self.index) #Partition represents the chunk of data(not entire data) def__getitem__(self, index): data_idx self.index[index] returnself.data[data_idx]classDataPartitioner(): def__init__(self, data, sizes[0.5, 0.5], seed1234): self.data data self.partitions partitions rng Random() rng.seed(seed) #Get the length of the entire dataset data_len len(data) indices list(range(data_len)) #Shuffle the indices rng.shuffle(indices) #Partition the indices for the devices start_idx 0 for size in sizes: part_len int(size * data_len) self.partitions.append(indices[start_idx:start_idx part_len]) start_idx part_len defuse(self, partition): return Partition(self.data, self.partitions[partition]) #Dataset, List of Indices defpartition_dataset(rank, world_size, dataset, batch_size128, collate_fnNone): partitioned_batch_size batch_size // world_size sizes [1/ world_size for _ inrange(world_size)] #world_size : The number of devices partitioner DataPartitioner(dataset, sizessizes) partition partitioner.use(rank) #Wrap this in a dataloader dataloader DataLoader( partition, batch_sizepartitioned_batch_size, collate_fncollate_fn ) return dataloader逻辑清晰对吧若对world_size与rank的含义存疑可简单理解为•world_size表示参与训练的设备总数如 GPU 数量。•rank用于标识其中的某一具体设备。例如使用 4 张 GPU 时world_size设为 4而rank取值 0–3分别对应每张 GPU。大规模训练需两大核心组件多进程并发torch.multiprocessing与设备间高效通信torch.distributed。思考题为何需要通信请先行思考。分布式数据加载器已就绪接下来实现训练流程。#importsimport tqdmimport torchimport dataset #The dataloader that we wroteimport numpy as npimport torch.nn as nnfrom functools import partialimport torch.distributed as distfrom torch.utils.data import DataLoaderfrom torch.multiprocessing import Processfrom transformers import AutoConfig, GPT2LMHeadModelfrom utils import get_tokenizer, collate_batch#You can write your own tokenizer, train and generate functionsdefaverage_gradients(model): world_size dist.get_world_size() for param in model.parameters(): if param.grad isnotNone: dist.all_reduce(param.grad.data, opdist.ReduceOp.SUM) #Communication overhead across gpus param.grad.data / world_sizedeftrain(model, optimizer, examples, batch_size, collate_fn, desc, rank0, average_gradients_fnNone): model.train() tokens_per_sec [] tokens_num [] for i, batch inenumerate(prog_bar : tqdm.tqdm(examples, descfTraining ({desc}))): t0 time.time() optimizer.zero_grad() logits model(input_idsbatch[input_ids]).logits loss torch.nn.functional.cross_entropy( inputlogits.reshape((-1, logits.shape[-1])), targetbatch[labels].reshape(-1), reductionnone) loss (torch.sum(loss * batch[label_token_weights].reshape(-1)) / torch.sum(batch[label_token_weights])) loss.backward() #Calculates the gradients if average_gradients_fn isnotNone: average_gradients_fn(model) optimizer.step() #Updates the weights batch_time time.time() - t0 tokens np.prod(batch[input_ids].shape) tokens_per_sec.append(tokens / batch_time) tokens_num.append(tokens) prog_bar.set_postfix( tokens_per_sectokens / batch_time, lossloss.item()) return np.mean(tokens_per_sec), tokens_num defsetup(rank, world_size, backend): #Sets up the communication between multiple devices(GPUs) os.environ[MASTER_ADDRESS] localhost os.environ[MASTER_PORT] 33333 dist.init_process_group(backendbackend, rankrank, world_sizeworld_size)#This function will be run by each process concurrently, the id for the process is rankdefrun_dp(rank, world_size, backend, dataset_name, model_max_length, n_epochs, batch_size, learning_rate): setup(rank, world_size, backend) config AutoConfig.from_pretrained(gpt2) model GPT2LMHeadModel(configconfig).to(rank) #This is great! We are loading it on each device, thats why rank!!! optimizer torch.optim.AdamW(model.parameters(), lrlearning_rate) #Surprisingly, AdamW works great for LLMs #We will use german(deutsch) to english translation dataset dataset { split: datasets.load_dataset(dataset_name, splitsplit)[translation] for split in [train, validation, test] } src_key, tgt_key de, en dataset[train] dataset[train][:5000] dataset[validation] dataset[validation][:1000] dataset[test] dataset[test][:100] #tokenization tokenizer get_tokenizer(examplesdataset[train], vocab_sizeconfig.vocab_size, src_keysrc_key, tgt_keytgt_key) #collate function : partial pre-fills some of the arguments collate_fn partial(collate_batch, src_keysrc_key, tgt_keytgt_key, tokenizertokenizer, model_max_lengthmodel_max_length, devicerank) train_loader partition_dataset(rank, world_size, dataset[train], batch_sizebatch_size, collate_fncollate_fn) val_loader DataLoader(dataset[validation], batch_sizebatch_size, shuffleFalse, collate_fncollate_fn) test_loader DataLoader(dataset[test], batch_sizebatch_size, shuffleFalse, collate_fncollate_fn) total_time [] total_tokens_per_sec [] for epoch_idx inrange(n_epochs): start time.time() avg_tokens_per_sec, _ train( modelmodel, optimizeroptimizer, examplestrain_loader, batch_sizebatch_size, collate_fncollate_fn, descdesc, rankrank, average_gradients_fnaverage_gradients) end time.time()if __name__ __main__: import torch.multiprocessing as mp mp.set_start_method(spawn, forceTrue) parser argparse.ArgumentParser() parser.add_argument(--pytest, typebool, defaultFalse) parser.add_argument(--dataset, typestr) parser.add_argument(--model_max_length, typeint, default128) parser.add_argument(--n_epochs, typeint, default10) parser.add_argument(--batch_size, typeint, default128) parser.add_argument(--learning_rate, typefloat, default1e-4) parser.add_argument(--world_size, typeint, default2) args parser.parse_args() backend nccl#for cpu choose gloo for rank inrange(world_size): p Process( targetrun_dp, args(rank, world_size, backend, args.dataset, args.model_max_length, args.n_epochs, args.batch_size, args.learning_rate) ) p.start() processes.append(p) # Wait for all processes to finish for p in processes: p.join()在更新模型权重之前所有设备上的梯度会被求平均以保证各副本的一致性。这意味着在训练的每一步每张 GPU 都拥有完全相同的模型副本。 *图 2用于梯度平均的 All-reduce 方法*下面做一个快速实验。笔者手头有两张 H10080 GBGPU对于 GPT-2 这种小模型来说实属“杀鸡用牛刀”。然而正如俗语所言“若手中只有火箭筒也只能用它打蚊子”。因此本文将测试在该配置下可获得的训练吞吐率与训练耗时。 *图 3单卡与双卡训练期间的吞吐率对比*结果显示双卡方案的吞吐率训练过程中处理的 token 数几乎达到单卡的 2 倍但这一速度提升以硬件数量翻倍为代价。 *图 4单卡与双卡训练耗时对比*尽管预期每个 epoch 的平均训练时间应约为单卡方案的一半但多次实验表明情况略有不同大多数 epoch 确实提速明显但总有 1–2 个 epoch 出现显著波动。这种峰值可能源于多种开销例如数据加载器取下一批数据的延迟、Python 垃圾回收的触发或 NCCL 跨设备通信带来的同步延迟。## 模型并行Model Parallel模型并行Model Parallel的核心思想是将模型本身拆分到多个设备上。例如当模型包含 12 层且拥有 2 张 GPU 时可把前 6 层置于 GPU 0后 6 层置于 GPU 1。每张设备仅负责前向与反向传播的一部分从而允许训练单张 GPU 无法容纳的超大模型。 *图 5模型并行流程示意图*在传统模型并行中执行是**顺序**的任一时刻只有一张 GPU 处于活跃计算状态。因此即便拥有 4 张 GPU 并将模型切分也只能让一张 GPU 参与计算其余 GPU 处于空闲等待。这导致 GPU 利用率低下设备数量增加时收益递减。pythonimport mathdefget_device_map(n_layers, devices): Returns a dictionary of layers distributed evenly across all devices. layers list(range(n_layers)) n_blocks int(math.ceil(n_layers / len(devices))) layers_list [layers[i : i n_blocks] for i inrange(0, n_layers, n_blocks)] returndict(zip(devices, layers_list))defparallelize(self, device_mapNone): Distribute the model layers across the devices based on the device_map. self.device_map ( get_device_map(len(self.h), range(torch.cuda.device_count())) if device_map isNoneelse device_map ) self.model_parallel True self.first_device cpuifcpuinself.device_map.keys() elsecuda: str(min(self.device_map.keys())) self.last_device cuda: str(max(self.device_map.keys())) self.wte self.wte.to(self.first_device) self.wpe self.wpe.to(self.first_device) # Load onto devices for k, v inself.device_map.items(): for block in v: cuda_device cuda: str(k) self.h[block] self.h[block].to(cuda_device) # ln_f to last self.ln_f self.ln_f.to(self.last_device)为简化说明以 GPT-2 为例模型共 12 层平均分布在 2 张 GPU 上。表 1不同层在 GPU 上的分布示意表 1不同层在 GPU 上的分布示意• wte、wpe → “cuda:0”迭代 1k0, v[0,1,2,3,4,5]└── self.h[0].to(“cuda:0”)└── self.h[1].to(“cuda:0”)└── self.h[2].to(“cuda:0”)└── self.h[3].to(“cuda:0”)└── self.h[4].to(“cuda:0”)└── self.h[5].to(“cuda:0”)迭代 2k1, v[6,7,8,9,10,11]└── self.h[6].to(“cuda:1”)└── self.h[7].to(“cuda:1”)└── self.h[8].to(“cuda:1”)└── self.h[9].to(“cuda:1”)└── self.h[10].to(“cuda:1”)└── self.h[11].to(“cuda:1”)• ln_f、lm_head → “cuda:1”为进一步验证我们使用 7B 参数的 LLaMA-2 进行实验。在 2 张与 4 张 V10016 GBGPU 上以相同配置运行模型并行评估其性能与可扩展性。表 2LLaMA-2–7B 模型并行参数与结果表 2LLaMA-2–7B 模型并行参数与结果实验再次证明任一时刻仅有一张 GPU 处于活跃状态。数据像水流一样顺序经过每张 GPU增加 GPU 数量并不能加快“水流”速度只是把管道切成更多段。然而它确实使得原本无法单卡加载的超大模型得以训练。流水线并行Pipeline Parallel流水线并行Pipeline Parallel通过将深度学习模型划分为若干顺序阶段并让不同 GPU 并行处理输入批次中的不同微批次micro-batch从而在多 GPU 上分布模型。与传统模型并行每次仅让一张 GPU 计算不同流水线并行允许每张 GPU 负责模型的一部分并在不同微批次上并行工作。具体而言假设模型共 32 层可用 GPU 为 4 张可将模型切分为每 8 层一段。当一批输入数据到达时先将其进一步拆分为微批次。第一个微批次在 GPU 0层 0–7上开始处理一旦完成立即把中间激活传递给 GPU 1层 8–15同时 GPU 0 开始处理第二个微批次。依此类推GPU 2 在收到 GPU 1 的输出后立即处理第一个微批次GPU 3 接收 GPU 2 的结果继续计算形成持续的数据流。经过短暂的“预热”阶段称为pipeline bubble后所有 GPU 保持活跃每张 GPU 在任意时刻都在处理不同的微批次。与模型并行相比这种重叠执行显著提高了 GPU 利用率类似于流水线装配每个阶段GPU专注特定任务一旦流水线填满系统效率大幅提升。图 6模型并行 vs 流水线并行图 6模型并行 vs 流水线并行调度器负责在每个时钟周期决定哪张 GPU 处理哪个微批次。def _clock_cycles(num_batches: int, num_partitions: int) - Iterable[List[Tuple[int, int]]]: Key insight: batch i is at partition j when: clock i j total_clocks num_batches num_partitions - 1 # Fill Run Drain for clock in range(total_clocks): schedule [] for i in range(num_batches): j clock - i # Which partition is batch i at? if 0 j num_partitions: # Is it a valid partition? schedule.append((i, j)) # (batch_idx, partition_idx) yield schedule示例3 个批次3 个分区• Clock 0: [(0,0)] → GPU 0 处理批次 0• Clock 1: [(1,0), (0,1)] → GPU 0 处理批次 1GPU 1 处理批次 0• Clock 2: [(2,0), (1,1), (0,2)] → 3 张 GPU 全部忙碌• Clock 3: [(2,1), (1,2)] → GPU 1 处理批次 2GPU 2 处理批次 1• Clock 4: [(2,2)] → GPU 2 处理批次 2def forward(self, x): # 1. SPLIT: Divide batch into micro-batches batches list(x.split(self.split_size, dim0)) # 2. SCHEDULE: Generate the clock cycle schedule schedules _clock_cycles(num_batches, num_partitions) # 3. EXECUTE: Process each clock cycle for schedule in schedules: self.compute(batches, schedule) # ← This runs GPUs in parallel! # 4. COMBINE: Concatenate results output torch.cat(batches, dim0) return output.to(last_device) plaintext def compute(self, batches, schedule): # PHASE 1: Submit ALL tasks in parallel (non-blocking) for batch_idx, partition_idx in schedule: batch batches[batch_idx].to(devices[partition_idx]) defcompute_fn(): return partition(batch) # Run layers on this GPU task Task(compute_fn) self.in_queues[partition_idx].put(task) # Send to worker thread # PHASE 2: Collect ALL results for batch_idx, partition_idx in schedule: success, result self.out_queues[partition_idx].get() # Wait for result batches[batch_idx] output # Store result for next stage或许有人会问为何不能像数据并行那样使用进程流水线并行需要在每个时钟周期进行高频、紧密的 GPU 间协调。线程与共享内存队列的开销极小而独立进程则会引入过多通信延迟。数据并行、模型并行与流水线并行各自提供了在多 GPU 上扩展深度学习负载的不同思路但它们的权衡决定了适用场景。表 3数据并行、模型并行与流水线并行对比总结表 3数据并行、模型并行与流水线并行对比总结最终不存在放之四海而皆准的策略。若模型可装入内存数据并行通常最为直接若模型规模巨大则模型并行或流水线并行必不可少。将策略组合如流水线并行 数据并行更能进一步释放扩展潜力。关键在于理解模型规模、硬件限制以及每种方法在内存、通信与性能上的权衡。如何学习AI大模型大模型时代火爆出圈的LLM大模型让程序员们开始重新评估自己的本领。 “AI会取代那些行业”“谁的饭碗又将不保了”等问题热议不断。不如成为「掌握AI工具的技术人」毕竟AI时代谁先尝试谁就能占得先机想正式转到一些新兴的 AI 行业不仅需要系统的学习AI大模型。同时也要跟已有的技能结合辅助编程提效或上手实操应用增加自己的职场竞争力。但是LLM相关的内容很多现在网上的老课程老教材关于LLM又太少。所以现在小白入门就只能靠自学学习成本和门槛很高那么针对所有自学遇到困难的同学们我帮大家系统梳理大模型学习脉络将这份LLM大模型资料分享出来包括LLM大模型书籍、640套大模型行业报告、LLM大模型学习视频、LLM大模型学习路线、开源大模型学习教程等, 有需要的小伙伴可以扫描下方二维码领取↓↓↓学习路线第一阶段 从大模型系统设计入手讲解大模型的主要方法第二阶段 在通过大模型提示词工程从Prompts角度入手更好发挥模型的作用第三阶段 大模型平台应用开发借助阿里云PAI平台构建电商领域虚拟试衣系统第四阶段 大模型知识库应用开发以LangChain框架为例构建物流行业咨询智能问答系统第五阶段 大模型微调开发借助以大健康、新零售、新媒体领域构建适合当前领域大模型第六阶段 以SD多模态大模型为主搭建了文生图小程序案例第七阶段 以大模型平台应用与开发为主通过星火大模型文心大模型等成熟大模型构建大模型行业应用。学会后的收获• 基于大模型全栈工程实现前端、后端、产品经理、设计、数据分析等通过这门课可获得不同能力• 能够利用大模型解决相关实际项目需求 大数据时代越来越多的企业和机构需要处理海量数据利用大模型技术可以更好地处理这些数据提高数据分析和决策的准确性。因此掌握大模型应用开发技能可以让程序员更好地应对实际项目需求• 基于大模型和企业数据AI应用开发实现大模型理论、掌握GPU算力、硬件、LangChain开发框架和项目实战技能 学会Fine-tuning垂直训练大模型数据准备、数据蒸馏、大模型部署一站式掌握• 能够完成时下热门大模型垂直领域模型训练能力提高程序员的编码能力 大模型应用开发需要掌握机器学习算法、深度学习框架等技术这些技术的掌握可以提高程序员的编码能力和分析能力让程序员更加熟练地编写高质量的代码。1.AI大模型学习路线图2.100套AI大模型商业化落地方案3.100集大模型视频教程4.200本大模型PDF书籍5.LLM面试题合集6.AI产品经理资源合集获取方式有需要的小伙伴可以保存图片到wx扫描二v码免费领取【保证100%免费】