更新于:2024-05-18T11:08:43+08:00
推理和训练大模型通常需要巨大的计算资源和时间。微软推出 DeepSpeed 深度学习优化库,旨在解决这一问题。从本篇博客中,我们将深入了解 deepspeed,并理解微软的工程师们是如何通过对内存、并行度、通信的优化,从而极大地加速了大模型的推理和训练过程。
注:本篇博文的源码分析基于 deepspeed-0.14.2。
DeepSpeed 简介
DeepSpeed 是一个专门为深度学习模型训练设计的优化库。它实现了 ZeRO 论文 中描述的所有内容。它是目前开源社区中广泛使用的训练大模型的框架。一般地,它支持以下三个阶段(stage):
ZeRO stage 1:优化器状态分区
ZeRO stage 2:梯度分区
ZeRO stage 3:参数分区
上述三个阶段给了用户更多更灵活的训练选择。用户可参考官方文档 ,选择适配自己硬件条件的训练方式。一般而言,花费的内存越少,训练时间越长;花费的内存越多,训练时间越短。
Fastest
Memory efficient
ZeRO-1
ZeRO-3 + offload
ZeRO-2
ZeRO-3
ZeRO-2 + offload
ZeRO-2 + offload
ZeRO-3
ZeRO-2
ZeRO-3 + offload
ZeRO-1
此外,它还支持了以下功能:
自定义混合精度训练处理。使用类似 PyTorch AMP 的方式,也可以选择使用类似 Apex 的方式
基于 CUDA 扩展的快速优化器。主要优化器包括 Adam、AdamW、OneBitAdam 和 Lamb
将部分训练参数卸载到 CPU 主存或者 SSD 上,适合于显存空间不足的用户。详细可参考ZeRO-Offload 到 CPU 和 NVMe 这两篇论文。
deepspeed 之大模型推理
网络上使用 deepspeed 做训练的博客汗牛充栋,但使用它做推理的博客就比较少,因此我先从推理开始探索 deepspeed 的内部机制。从官网博客上 可以了解到,deepspeed 推理有如下几个特点:
deepspeed 将推理的多个算子融合为单一的算子 kernel,从而减少 kernel 间的启动开销和访问内存的延迟。与 JIT、XLA 或其他项目的算子融合相比,deepspeed 的算子融合力度更猛,它融合了element-wise、矩阵乘、转置、归约到一个 kernel。与未融合相比,以上几个部分的加速比可分别达到 1.5x, 2.9x, 3x, 1.2x
为推理定制化的 GeMM 。小 batch size 会导致维度更瘦小 GeMM 运算操作:即参与 Gemm 运算的矩阵都是权重参数比激活大得多的矩阵,并且每个参数的总计算量受批量大小的限制。此时,GeMM 的性能主要取决于从主内存读取参数所花费的时间,即内存瓶颈,而不是 GPU 的计算瓶颈。因此,为了达到最佳性能,deepspeed 对推理内核进行了微调,以最大限度地利用内存带宽来加载参数。这项优化使得 DeepSpeed 推理内核在批量大小为 1-10 的推理工作负载上,实现比 NVIDIA cuBLAS 库高出 20% 的性能。
使用通用和专用的 Transformer 内核。deepspeed 推理部分使用了两种 transformers 内核来实现前文提到的两个优化方案:
Generic Transformer:使用深度融合技术,将 Transformer 中的各个 PyTorch 操作(如 LayerNorm、Softmax 和 bias-add)替换为高度优化的 DeepSpeed 版本。
Specialized Transformer:进一步利用深度融合技术,创建了融合调度,不仅在PyTorch的宏操作符(如Softmax)内部融合微操作符,还在多个宏操作符(如 Softmax 和 LayerNorm,以及转置操作和甚至 GeMM)之间进行融合。Specialized Transformer 内核的融合结构上图所示。
但在深究这些高大上的优化原理和实现之前,我们需要先用 deepspeed 跑一个模型,以此作为一个驱动例子,带领我们一步步深入下去:
使用 deepspeed 完成模型推理
具体步骤
首先,需要进入 NGC docker,再安装 deepspeed transformers 等库:
1 2 docker run --init -it --name ${NAME} nvcr.io/nvidia/pytorch:23.03-py3 /bin/bash pip install deepspeed transformers sentencepiece mpi4py
然后,准备好模型和权重数据。敲入以下推理代码,以模型 t5-v1_1-small
为例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from transformers import pipelinefrom transformers.models.t5.modeling_t5 import T5Blockimport osimport torchimport deepspeed local_rank = int (os.environ["LOCAL_RANK" ]) world_size = int (os.environ["WORLD_SIZE" ]) deepspeed.init_distributed() pipe = pipeline("text2text-generation" , model="google/t5-v1_1-small" , device=local_rank) pipe.model = deepspeed.init_inference( pipe.model, tensor_parallel={"tp_size" : world_size}, dtype=torch.float , injection_policy={T5Block: ('SelfAttention.o' , 'EncDecAttention.o' , 'DenseReluDense.wo' )} ) output = pipe('Input String' )
然后就是启动 deepspeed 完成推理:
1 deepspeed --num_gpus 2 inference.py
init_inference
简单地解释一下 deepspeed.init_inference
这个重要的 API。该 API 装入并初始化推理模型。第一个参数是传入的推理模型;tensor_parallel
传入推理时用的张量并行度参数,一般就是显卡的数量;第三个参数是运算类型;第四个 injection_policy
参数给那些不支持 deepspeed 内核的模型准备的,用户需要手动指定模型的“注入策略”,即 Transformer 层的两个特定线性层:1)attention output GeMM 和 2)layer output GeMM。deepspeed 内部会根据用户提供的层,增加必要的 all-reduce 通信以便将各个 GPU 上的计算结果合并起来。上面的 t5-v1_1-small
就是例子。当然不是所有的 transformers 库内的模型都可以这样做,官方文档中列出了目前支持的模型
但具体到 injection_policy
是如何确定的,又是如何实现并行的,我们还要接着往下看。但机敏的读者可以猜到,多半是根据 t5 模型源码里的 T5Block
类 内的 layer(nn.Module) 确定,实现 SelfAttention.o
, EncDecAttention.o
, DenseReluDense.wo
的计算并行。
此外,还有一些比较重要的参数,但在这个例子中没有体现,比如 replace_with_kernel_inject=True
可以将模型内的部分 kernel 替换成 deepspeed 内开发的高性能 kernel。
有时还需要完成 DeepSpeed 配置文件(通常为ds_config.json
),指定使用的推理优化策略、参数等资源等,详细的配置说明可参考官方文档 。
deepspeed 推理引擎
简单尝试过使用 deepspeed 推理模型后,接下来我们就要分析底层原理,探究更深的奥秘。我们紧接上文,从 init_inference
这个 API 出发,发现其内部简化后也就是很简单的几句 python:
1 2 3 4 def init_inference (model, config=None , **kwargs ): ds_inference_config = DeepSpeedInferenceConfig(**config_dict) engine = InferenceEngine(model, config=ds_inference_config) return engine
DeepSpeedInferenceConfig 就是配置 deepspeed 推理时的 config 数据。真正要启动的 InferenceEngine
是我们比较关心的。
InferenceEngine 之 forward
找到 InferenceEngine
实现后,首先关注一下它的前推函数(有删减):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 def forward (self, *inputs, **kwargs ): """Execute forward propagation Arguments: *inputs: Variable length input list **kwargs: variable length keyword arguments """ start = None if self.model_profile_enabled: get_accelerator().synchronize() start = time.time() if self._config.enable_cuda_graph and not self.local_cuda_graph: if self.cuda_graph_created: outputs = self._graph_replay(*inputs, **kwargs) else : self._create_cuda_graph(*inputs, **kwargs) outputs = self._graph_replay(*inputs, **kwargs) else : outputs = self.module(*inputs, **kwargs) if self.model_profile_enabled and self._config.enable_cuda_graph: get_accelerator().synchronize() duration = (time.time() - start) * 1e3 self._model_times.append(duration) return outputs
代码中提到的 cuda graph 是 cuda10 中为了加速模型计算流程而提出的优化特性,简单地说,CUDA Graphs 将整个计算流程定义为一个图,通过提供一种由单个 CPU 操作来启动图上的多个 GPU kernel 的方式减少 kernel 的启动开销。
另外从代码中看到的比较有用的东西,在这里插一句:transformers 中定义的模型基本都有 register_forward_pre_hook
和 register_forward_hook
这两个 hook 函数,可以很方便地让程序员在前推模型之前和之后插入自己的函数,方便调试或计时。
1 2 3 4 def profile_model_time (self, use_cuda_events=True ): if not self.model_profile_enabled and not self._config.enable_cuda_graph: self.module.register_forward_pre_hook(self._pre_forward_hook) self.module.register_forward_hook(self._post_forward_hook)
tensor_parallelism 与 injection_policy
三种模式
看了上小节的推理代码,你可能会想,这和一般的推理过程也没区别啊。别急,deepspeed 的优雅之处就是在于,它可以近似无伤地优化模型,但模型内部已经被 deepspeed 改动过了。就如同我们上文的 t5-v1_1-small
模型,它的很多 layer 层已经被注入高性能的 deepspeed kernel。
总的来讲,deepspeed 推理引擎支持三种模式:
用户指定的张量并行策略 user specified policy for tensor-parallelism.
高性能 kernel 注入 kernel injection (replace_with_kernel_inject)
自动化张量并行 automatic tensor parallelism if tp_size > 1.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 for client_module, injection_policy in self.injection_dict.items(): config.injection_policy_tuple = injection_policy self._apply_injection_policy(config, client_module)if config.replace_with_kernel_inject: self._apply_injection_policy(config)elif config.tensor_parallel.tp_size > 1 : parser_dict = AutoTP.tp_parser(model) for client_module, injection_policy in parser_dict: config.injection_policy_tuple = injection_policy self._apply_injection_policy(config, client_module)
明显看得出来,_apply_injection_policy
函数在支持张量并行时起到了关键作用。三种模式都离不开该函数的支持 。它获取了 injection_dict
中的数据,以前文 t5-v1_1-small
模型为例,injection_dict
就是 T5Block: ('SelfAttention.o', 'EncDecAttention.o', 'DenseReluDense.wo')
。再次回顾一下,injection_dict
中输入的线性层要求是两个特定线性层中的一种:1)attention output GeMM 和 2)layer output GeMM。
我在运行 t5-v1_1-small
模型时将它的 layer 信息打印了出来,我们可以看看到底是哪些 kernel 会被 inject:
从打印信息看到,encoder/decoder 的 attention 层计算,以及 DenseFFN 网络会被完成注入替换。
接下来的重头戏就是,我们要搞清楚它是如何完成注入替换的:
1 2 3 4 5 6 7 8 9 10 def _apply_injection_policy (self, config, client_module=None ): checkpoint_dir = config.checkpoint checkpoint = SDLoaderFactory.get_sd_loader_json(checkpoint_dir, self.checkpoint_engine) if checkpoint_dir is not None else None generic_injection(self.module, dtype=config.dtype, enable_cuda_graph=config.enable_cuda_graph) if isinstance (self.module, torch.nn.Module): replace_transformer_layer(client_module, self.module, checkpoint, config, self.config)
从上面代码可以了解到,该函数将 kernel inject 任务划分为两部分:
replace_transformer_layer
完成 nn.Module
类的替换
generic_injection
完成非 nn.Module
类的替换
多数情况下大家的模型代码继承自 nn.Module
,因此我们先来看看 replace_transformer_layer
的源码 ,看它是怎么处理 nn.Module
类的替换。
先回顾一下前文,避免被弄晕。在运行模型推理时,我们需要通过 init_inference
API 中的 injection_policy
参数将模型内的某些 layer 层替换为 deepspeed 提供的高性能层。而用户传入的类名就是 _apply_injection_policy(config, client_module)
中的 client_module
,也即是 replace_transformer_layer
的 orig_layer_impl
。用 t5-v1_1-small
为例,orig_layer_impl
就是 T5Block
,其中的 injection_policy
则是 ‘SelfAttention.o’, ‘EncDecAttention.o’, ‘DenseReluDense.wo’ 。而 model
是用户传入的整个模型。
上面列出的函数调用栈(从上往下)可以看到,用户传入的 injection_policy
字典中,其键表示要执行 inject 的类,而其值对应了如何对该类进行优化的策略 policy
:它在用户提供的 layer 层和 deepspeed 内有的高度优化的推理 transformer 层之间搭建起了一个映射。deepspeed 官方提供了不少基于 transformer 的高度优化过的推理层,如果用户提供的 layer 层恰好与这些相匹配,那么可以直接借鉴官方内部的优化策略。
下面来看一下这部分的源码(有删减):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 def replace_transformer_layer (orig_layer_impl, model, checkpoint_dict, config, model_config ): mp_replace = ReplaceWithTensorSlicing(mp_group=config.tensor_parallel.tp_group, mp_size=config.tensor_parallel.tp_size) if checkpoint_dict is not None and not config.replace_with_kernel_inject: checkpoint = checkpoint_dict["checkpoints" ] pbar = tqdm.tqdm(total=len (checkpoint), desc=f"Loading {len (checkpoint)} checkpoint shards" ) for i in range (len (checkpoint)): checkpoint_file = os.path.join(config.base_dir, checkpoint[i]) replaced_module = replace_module(model=model, orig_class=orig_layer_impl, replace_fn=replace_fn, _replace_policy=config.injection_policy_tuple, checkpoint=checkpoint_file) replaced_module = set_lm_head(replaced_module) else : replaced_module = replace_module(model=model, orig_class=orig_layer_impl, replace_fn=replace_fn, _replace_policy=config.injection_policy_tuple)
上面的源码中,最重要的就是这个函数:replace_module
。它负责扫描整个模型,找到所有是 orig_layer_impl
类的实例,然后调用 replace_fn
函数将它们替换掉,_replace_policy
则是针对该 layer 层的优化策略。
这些源码篇幅较长,不适合全部粘贴放在此处,建议读者将本博客与 deepspeed replace_module的源码 一起服用,效果更佳。
1 2 3 4 5 6 7 8 9 10 def replace_module (model, orig_class, replace_fn, _replace_policy, checkpoint=None ): """ Scan the model for instances of ``orig_clas:`` to replace using ``replace_fn``. Arguments: model (torch.nn.Module): the model to augment orig_class (torch.nn.Module): the module to search for replace_fn (method): a method to convert instances of ``orig_class`` to the desired type and return a new instance. Returns: A modified ``model``. """
总的来说,replace_module
函数的流程是这样的:
根据输入的 orig_class
类名,寻找 deepspeed 中支持的 policy,如果有 deepspeed 能支持的 policy 分割法,那么直接就使用,否则保留。
进入 _replace_module
函数,开始递归处理模型。它会遍历 model
中所有的 layer,并找到 policy 中 layer 层
对于每个找到的 layer,调用 replace_fn
函数做替换
对于不在 policy 中的 layer 层,使用自动化张量并行(AutoTP)的方式处理,并递归调用 _replace_module
,处理其子层。
那么 replace_fn
函数如何将原来的 layer 层做替换的呢?对于“高性能 kernel 注入”的模式,使用 replace_with_policy
;对于“用户指定的张量并行策略”模式,使用 replace_wo_policy
处理:
replace_wo_policy
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def replace_fn (child, _policy, layer_id=0 , prefix="" , state_dict=None ): training = False if training: new_module = replace_with_policy(child, _policy, config.triangular_masking) else : if config.replace_with_kernel_inject: new_module = replace_with_policy(child, _policy, config.triangular_masking, inference=True , layer_id=layer_id) else : new_module = replace_wo_policy(child, _policy, prefix=prefix, state_dict=state_dict) return new_module
我们来看一下 replace_wo_policy
如何实现的,以及它是如何处理我们上面给出的例子的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 def replace_wo_policy (module, all_reduce_linears, prefix="" , state_dict=None ): _autotp = AutoTP(module, all_reduce_linears, prefix, state_dict, linear_layer_setting, orig_layer_impl) _autotp.set_tensor_parallel_config(config.tensor_parallel.tp_size, config.tensor_parallel.tp_group) num_kv_heads = _autotp.get_model_num_kv_heads(model_config) set_num_kv_heads(num_kv_heads) n_embd = None multi_query_n_embd_names = ['n_embd' ] for name in multi_query_n_embd_names: if hasattr (model_config, name): n_embd = getattr (model_config, name) if n_embd != None : break set_n_embd(n_embd) _autotp.update_linear_policies() if "lm_head" in all_reduce_linears or "embed_out" in all_reduce_linears: return _autotp._replace_last_linear_module(module) return _autotp._replace_module(module)
关于 AutoTP 是怎么实现的,这里不再赘述。但我可以简单地描述一下这里的操作。replace_wo_policy
的参数中,module
就是输入的要被替换的网络层的类名,all_reduce_linear
表示输入的 policy。仍以 t5-v1_1-small
为例,module
就是 T5Block
,其中的 all_reduce_linear
则是 ‘SelfAttention.o’, ‘EncDecAttention.o’, ‘DenseReluDense.wo’。_autotp.update_linear_policies
中会将 all_reduce_linear
中记录的 nn.linear
层用自己内部的函数做张量切分,并使用 inference_all_reduce
做运算,因此需要被替换为 LinearAllreduce
类(见下)。因为这是一个 all reduce 操作,所以取名为 all_reduce_linear
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from deepspeed import comm as distclass LinearAllreduce (nn.Module): def __init__ (self, weight, bias=None , mp_group=None ): super (LinearAllreduce, self).__init__() self.weight = weight self.bias = bias self.mp_group = mp_group def forward (self, input ): output = torch.matmul(input , self.weight.transpose(-1 , -2 )) if self.mp_group is not None : dist.inference_all_reduce(output, group=self.mp_group) if self.bias is not None : output += self.bias return output
replace_with_policy
最后,我们来看一下 replace_with_policy
如何实现的。因为这里的网络层替换有对应的 policy 类,所以 deepspeed 会根据对应的 model,构建出相应的 policy 类和包含它的 container 类。在 container 类做好张量初始化和拆分后,准备好数据和 config 信息,再实例化一个内部的高性能 model 类,最后返回:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 def replace_with_policy (child, policy_cls, triangular_masking, inference=False , layer_id=0 ): policy = policy_cls(child, inference=inference) _container = policy_to_ds_container(policy=policy, config=config, model_config=model_config, layer_id=layer_id, child=child) _container.set_tensor_parallel_config(config.tensor_parallel.tp_size, config.tensor_parallel.tp_group) _container.initialize_tensors() if config.dtype in [torch.float16, torch.bfloat16, torch.int8]: _container.convert_to_required_dtype() quantizer = GroupQuantizer(q_int8=quantize) _container.set_quantization_config(quantizer) _container.create_ds_model_config() _container.create_module() _container.transpose() _container.apply_tensor_parallelism(mp_replace) _container.copy_data_to_new_module() global container_g if container_g is None : container_g = _container return _container.module
这里以 llama2 模型为例。首先程序会构造与 llama 对应的 LLAMA2LayerPolicy
类和 DS_LLAMA2Container
类。然后,程序开始依顺序准备模型的 TP、量化等 config,再创建内部的 llama 模型 DeepSpeedLlama2Inference
,并将张量数据切分后拷贝到新模型中,最后返回。DeepSpeedLlama2Inference
就是 deepspeed 针对 llama2 开发的高性能推理模型。
generic_injection
再顺便看看 generic_injection
怎么处理非 nn.Module
类。从 generic_injection
的源码可以看出 ,它要替换的注意力块来自于 diffusers 库 。
1 2 3 4 5 6 7 8 9 10 import diffusersif hasattr (diffusers.models.attention, 'CrossAttention' ): cross_attention = diffusers.models.attention.CrossAttentionelse : cross_attention = diffusers.models.attention_processor.Attention attention_block = diffusers.models.attention.BasicTransformerBlock new_policies = { cross_attention: replace_attn, attention_block: replace_attn_block, }
使用 deepspeed 自己实现的 DSClipEncoder
替换模型的 text_encoder
部分。
1 2 3 from ..model_implementations.transformers.clip_encoder import DSClipEncoder cg_encoder = DSClipEncoder(module.text_encoder, enable_cuda_graph=enable_cuda_graph)setattr (module, 'text_encoder' , cg_encoder)
然后就是遍历模型的所有子模块,并一一检查它们是否与先前定义的 new_policies
中的替换策略匹配:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 for name in module.__dict__.keys(): sub_module = getattr (module, name) policy = _module_match(sub_module) if policy is not None : def _replace_module (module, policy ): for name, child in module.named_children(): _replace_module(child, policy) if child.__class__ in new_policies: replaced_module = new_policies[child.__class__](child, policy) setattr (module, name, replaced_module) _replace_module(sub_module, policy) new_module = policy.apply(sub_module, enable_cuda_graph=enable_cuda_graph) print (f"**** found and replaced {name} w. {type (new_module)} " ) setattr (module, name, new_module)
对于每个匹配的子模块,使用 _replace_module
函数递归地替换其子模块。在这里,new_policies
是一个字典,其中键是类(如 CrossAttention
或 BasicTransformerBlock
),而值是一个函数:replace_attn
或者 replace_attn_block
:
1 2 3 4 5 6 7 8 def replace_attn (child, policy ): policy_attn = policy.attention(child) return attn_moduledef replace_attn_block (child, policy ): config = Diffusers2DTransformerConfig() return DeepSpeedDiffusersTransformerBlock(child, config)
该函数接受原始模块和策略作为参数,并返回替换后的模块。最后,policy
内实现的新模块完成对模型的进一步替换:
1 new_module = policy.apply(sub_module, enable_cuda_graph=enable_cuda_graph)
目前 deepspeed 支持的 replace_policy
有很多,具体见其内部实现的 replace_policy.py 文件 ,而 generic_policies
适用的有 UNetPolicy
, VAEPolicy
。它们具体实现有兴趣的读者可自己查阅代码。
总结
DeepSpeed 作为一款强大的深度学习优化库,为研究人员和开发者提供了高效的训练解决方案。网上许多极客对该框架进行了研究,但并不是很深入,多是翻译搬运,少了自己的理解和对源码的深入挖掘。接下来我还会对 deepspeed 展开详细的研究。
下一步,我会继续沿着 DeepSpeed 推理的路径出发,研究其推理的高性能的算子融合,包括其专用和通用的 transformer kernel 实现。