Huggingface|Accelerate分布式训练加速
PyTorch 很灵活。 它允许您根据需要自定义它。 这意味着您还必须处理所有低级硬件自定义,而在 95% 的项目中您实际上并不关心这些。 PyTorch 的主要痛点之一是使代码适应各种硬件配置(CPU/GPU/TPU)。 您必须维护大量用于混合精度训练、梯度累积等的模版代码。尽管一些高级库完全抽象了所有工程组件(包括训练循环),但您仍然需要熟悉它们的 API 。 您还必须学习特定的方法和函数来覆盖它们以注入您的自定义行为。 但是,如果有一个库仅抽象出多 GPU/TPU/fp16 所需的样板代码并允许您完全按原样使用原始 PyTorch 代码呢? HuggingFace Accelerate 就是专门为此目的而创建的! 在本文中,我们将了解 HuggingFace Accelerate 所提供的功能以及执行分布式训练/评估以及权重和偏差集成是多么简单。 这是我们将要介绍的内容:
你为什么要使用Huggingface Accelerate?
安装并配置Huggingface Accelerate
对比:典型pytorch训练循环
对比:Huggingface Accelerate训练循环
Accelerate类
梯度累积
梯度裁剪
分布式评估
执行过程
打印
延迟执行
保存加载模型
日志
运行分布式代码
为什么应该使用 HuggingFace Accelerate?
在继续阅读本文之前,您可能会对为什么应该首先使用 Accelerate 有疑问。 它实际上解决了什么问题? Accelerate 解决的主要问题是分布式训练。 例如,在项目开始时,您可能会在单个 GPU 上运行模型来测试某些内容,但随着项目的发展加速您的训练,您可能会觉得需要将现有代码扩展到多 GPU 系统 。 在这种情况下,您实际上可以使用完全相同的代码通过 HuggingFace Accelerate 在 CPU/GPU/多 GPU/TPU 上进行训练,而这对于纯 PyTorch 来说是不可能的。 在那里,您必须编写一堆 if-else 语句,以使您的管道足够强大,可以在任何类型的训练设置上运行。 如果您想调试 PyTorch 代码,那么在 CPU 上运行代码通常会很有帮助,因为它会产生更有意义的错误,而不是在 GPU 上。 啊,但是等等,还有更多。 以下列出了使用 Accelerate 的其他一些优势:
您可以删除处理不同训练设置(CPU/GPU/TPU)所需的样板代码。
您还可以使用相同的代码在 CPU、GPU、多 GPU 和多节点上进行训练。
这是进行分布式评估的便捷方法。
允许您删除混合精度和梯度累积所需的样板。
增强分布式系统中的日志记录和跟踪。
可以方便地保存分布式系统中的训练状态。
完全分片的数据并行训练。
集成DeepSpeed。
集成各种实验跟踪器(ex: Weights & Biases),以便方便地记录分布式系统。
附带一个方便的 CLI 命令,用于启动分布式训练。
在 Jupyter Notebook 中启动分布式训练的便捷功能。
我们将在本文中一一研究这些功能。
安装和配置 HuggingFace Accelerate
当然,在使用 HuggingFace Accelerate 之前,您必须安装它。 您可以通过 pip 或 conda 来完成:
pip install accelerate
OR
conda install -c conda-forge accelerate
Accelerate 是一个快速更新的库,每天都会添加新功能。 我更喜欢从 GitHub 存储库安装它以使用尚未发布的功能。 您可以通过在终端中运行以下命令来执行此操作:
pip install git+https://github.com/huggingface/accelerate
安装 Accelerate 后,您应该为当前系统配置它。 为此,请运行以下命令并回答提示的问题:
accelerate config
如果您不想配置这些文件,accelerate config default
可以快速完成对accelerate的配置。
完成后,要检查您的配置是否正常,您可以运行:
accelerate env
下面是一个示例输出,它描述了一台机器上使用混合精度的两个 GPU。
- `Accelerate` version: 0.11.0.dev0
- Platform: Linux-5.10.0-15-cloud-amd64-x86_64-with-debian-11.3
- Python version: 3.7.12
- Numpy version: 1.19.5
- PyTorch version (GPU?): 1.12.0+cu102 (True)
- `Accelerate` default config:
- compute_environment: LOCAL_MACHINE
- distributed_type: MULTI_GPU
- mixed_precision: fp16
- use_cpu: False
- num_processes: 2
- machine_rank: 0
- num_machines: 1
- main_process_ip: None
- main_process_port: None
- main_training_function: main
- deepspeed_config: {}
- fsdp_config: {}
比较与对比:典型的 PyTorch 训练循环
以下是您必须熟悉的基本 PyTorch 训练循环:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
for batch in training_dataloader:
optimizer.zero_grad()
inputs, targets = batch
inputs = inputs.to(device)
targets = targets.to(device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
optimizer.step()
scheduler.step()
这是一个基本的训练循环,只能在 CPU 或单个 GPU 上运行(而且,它不支持混合精度和梯度累积等现代技术)。 为了实现分布式训练和fp16/梯度累积,你需要添加一堆if-else语句,这使得代码难以维护并且容易出错。 接下来,您将看到 Accelerate 如何让您无缝集成多 GPU/TPU/多节点训练,同时只需几行额外代码即可支持混合精度和梯度累积。
比较和对比:HuggingFace 加速训练循环
from accelerate import Accelerator
accelerator = Accelerator()
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
上面的训练循环能够在CPU/GPU/多GPU/TPU/多节点上运行。 请注意,只需很少的改变即可将现有的原始 PyTorch 代码转换为更强大的形式,可以轻松扩展到您想要的任何硬件! 让我们更详细地看看上面的训练循环。 首先,我们导入 Accelerator 主类并实例化它:
from accelerate import Accelerator
accelerator = Accelerator()
注意:Accelerator 类应该在脚本开始时实例化,或者尽早实例化,以便在整个脚本中使用 Accelerator 提供的方便的函数/方法。
您应该删除所有现有的 .cuda() 或 .to(device) 调用。 加速器对象将自动为您处理此问题,并将这些对象放置在正确的设备上。 如果您出于某种原因想要停用自动设备放置并且想要自己执行此操作,则可以通过在初始化 Accelerator 类时传递 device_placement=False 来停用它。 接下来,您需要将模型、优化器、训练/验证数据加载器和学习率调度程序传递给 Accelerator.prepare() 方法。 对象提供给prepare() 方法的顺序并不重要——重要的是它们按照传递的顺序解包。 这将为训练做好一切准备。 例如,HuggingFace Accelerate 会将数据加载器分片到所有可用的 GPU/TPU 核心上,以便每个核心看到训练数据集的不同部分。 此外,所有进程的随机状态将在每次迭代开始时同步。
实际批量大小将是所使用的设备数量乘以您在脚本中设置的批量大小。 例如:在创建训练数据加载器时使用 4 个 GPU 进行训练,批量大小为 16,这将有效地在实际批量大小为 16*4 = 64 的情况下进行训练。
仅当调度器(scheduler)需要在每个优化器步骤中逐步执行时,才应将学习率调度程序传递给prepare()。
此外,如果您正在分布式设置上进行训练,您的训练数据加载器长度可能会发生变化。 因此,任何使用训练数据加载器长度的指令(例如:如果您想记录训练步骤的总数)都应该在prepare()方法之后调用。 您可能会注意到,Accelerator 是将完整的 HuggingFace Accelerate 框架绑定在一起的主类。 事实上,让我们详细看看。
Accelerate类
Accelerator 类是这个完整框架的关键,它还有一些有用的方法,您将在本文的后续部分中看到。 Accelerate 类在实例化时采用的最重要的参数如下所述:
device_placement
:如果您希望 Accelerate 自动将对象放置在适当的设备上,请设置为 True。 理想情况下,应该打开此功能。 您可以将其关闭以手动放置对象。split_batches
:如果设置为 True,则批次将跨设备拆分。 例如:如果您在启动数据加载器时在 4-GPU 机器上进行训练,批量大小为1有4个数据的数据集,则每个 GPU 上的实际批量大小将为 4/4 = 1。如果设置为 False,则有效 所有 GPU 的批量大小将为 4*4 = 16。理想情况下,应将其设置为 False。mix_ precision
: Accelerate 自动处理混合精度逻辑,您无需编写 if-else 语句即可在混合精度和全精度之间切换。 传递“no”以禁用混合精度。 要启用混合精度,只需传递“fp16”即可。 Accelerate 还支持 bf16(通过“bf16”来启用它)。gradient_accumulation_steps
: Accelerate 还会自动处理梯度累积逻辑,减少大量样板代码。 只需传递梯度累积步骤的数量,Accelerate 将通过上下文管理器(with accelerator.accumulate)完成其余的工作,正如您将在本文后面看到的那样。cpu
:如果为 True,则即使 GPU 可用,也会强制在 CPU 上进行训练。 对于调试目的很有用。log_with
:用于记录的实验跟踪器。 要使用W&B ,请传递 wandb,您就可以使用 W&B 进行实验跟踪。Accelerate
的方法还有很多,这里不可能一一涵盖。 官方文档非常好,您可以查看参数。
进行梯度累积
如果您需要训练更大的批量大小但 GPU 内存有限,梯度累积是一个不错的策略。 梯度累积通过累积指定步数的梯度来模拟更大的批量大小。 要在 HuggingFace Accelerate 中使用梯度累积,您只需在启动 Accelerator 类时将gradient_accumulation_steps 传递到所需的数字,并将训练循环包装在accumulate() 上下文管理器中。 这是启用梯度累积的 HuggingFace Accelerate 训练循环:
from accelerate import Accelerator
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
# accumulate context manager
with accelerate.accumulate(model): # you can also use accelerator.accumulate
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
执行梯度裁剪
梯度裁剪是一种有用的技术,可以避免神经网络中梯度爆炸问题。 如果您要执行混合精度的梯度裁剪,则应首先取消缩放梯度。 以下是 PyTorch 文档中的声明:
所有由scaler.scale(loss).backward()产生的梯度都会被缩放。 如果你想修改或检查backward()和scaler.step(optimizer)之间参数的.grad属性,你应该首先取消缩放它们。 例如,梯度裁剪操作一组梯度,使其全局范数(参见 torch.nn.utils.clip_grad_norm_())或最大幅度(参见 torch.nn.utils.clip_grad_value_())<=某个用户施加的阈值 。 如果您尝试在不取消缩放的情况下进行clip,则梯度的范数/最大幅度也会缩放,因此您请求的阈值(这意味着未缩放梯度的阈值)将无效。
unscale_ 只能在每个优化器的每个step调用中调用一次,并且只能在该优化器分配的参数的所有梯度都已累积之后调用。 在每个step之间对给定优化器调用 unscale_ 两次会触发运行时错误。
纯 PyTorch 中混合精度和梯度裁剪的训练循环如下所示:
scaler = GradScaler()
for epoch in epochs:
for input, target in data:
optimizer.zero_grad()
with autocast():
output = model(input)
loss = loss_fn(output, target)
scaler.scale(loss).backward()
# Unscales the gradients of optimizer's assigned params in-place
scaler.unscale_(optimizer)
# Since the gradients of optimizer's assigned params are unscaled, clips as usual:
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
# optimizer's gradients are already unscaled, so scaler.step does not unscale them,
# although it still skips optimizer.step() if the gradients contain infs or NaNs.
scaler.step(optimizer)
# Updates the scale for next iteration.
scaler.update()
正如您所看到的,在我们的训练循环中同时使用混合精度、梯度裁剪和梯度累积可能会导致大量的样板代码。 结果,代码维护变得更加困难。 幸运的是,使用 HuggingFace Accelerate 可以更有效地完成梯度裁剪:
from accelerate import Accelerator
max_grad_norm = 1.0
accelerator = Accelerator(mixed_precision="fp16", gradient_accumulation_steps=2)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
# accumulate context manager (for gradient accumulation)
with accelerate.accumulate(model):
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
# gradient clipping
if accelerator.sync_gradients:
accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
optimizer.step()
scheduler.step()
上面的训练循环执行混合精度训练、梯度累积和梯度裁剪。 请注意,与纯 PyTorch 训练循环相比,训练循环看起来多么干净。 让我们深入研究一下梯度裁剪部分发生了什么: Accelerator.sync_gradients
检查梯度当前是否在所有进程之间同步。 您应该使用 Accelerator.clip_grad_norm_
而不是 torch.nn.utils.clip_grad_norm_
。 在幕后,Accelerate 的 Clip_grad_norm_
在剪切梯度之前执行梯度缩放。 您可以查看源代码,并查看上述所讨论概念的问题。
执行分布式评估
如果您以前曾经尝试过使用纯 PyTorch 执行分布式评估,那么您就会知道它有多么具有挑战性。 HuggingFace Accelerate 提供了一种尽可能轻松地执行分布式评估的便捷方法。 您可以使用 HuggingFace Accelerate 的 Gather_for_metrics()
方法从所有进程中收集所有预测和标签以计算指标。 此外,gather_for_metrics()
会删除最后一批中的重复项,因为数据集末尾的某些数据可能会重复,以便批次可以在所有workers之间平均分配。 Gather_for_metrics() 会在收集时自动删除重复的数据,以便您的指标计算正确。 这是演示分布式评估的简短代码片段。
for inputs, targets in validation_dataloader:
predictions = model(inputs)
# Gather all predictions and targets
all_predictions, all_targets = accelerator.gather_for_metrics((predictions, targets))
metrics = calculate_metrics(all_predictions, all_targets)
如果您不想执行分布式评估而只想执行分布式训练,那么您可以将验证数据加载器保留在prepare()方法之外。
执行流程
HuggingFace Accelerate 还提供了一些方便的方法来在分布式系统上执行您可能喜欢的流程。 以下大部分内容取自此处的 Accelerate 文档。
在每个server上执行
如果您使用多个server并且希望某些内容在每个server(节点)上执行一次,那么您可以使用 is_local_main_process
。
if accelerator.is_local_main_process:
do_thing_once_per_server()
大多数加速器方法也有一个可供您使用的装饰器对应项。 例如,您可以使用 on_local_main_process()
装饰器包装函数,以在函数执行时实现相同的行为:
@accelerator.on_local_main_process
def do_my_thing():
"Something done once per server"
do_thing_once_per_server()
仅在一个server执行
对于只应在所有servers上执行一次的语句(即多节点情况下只有一个节点上的一个进程是主进程),请使用 is_main_process
:
if accelerator.is_main_process:
do_thing_once()
同样,您可以使用装饰器对应部分来包装函数的执行:
@accelerator.on_main_process
def do_my_thing():
"Something done once per server"
do_thing_once()
关于具体流程 如果一个函数应该在特定的硬件或本地进程index上运行,则有类似的装饰器可以实现此目的:
@accelerator.on_local_process(local_process_idx=0)
def do_my_thing():
"Something done on process index 0 on each server"
do_thing_on_index_zero_on_each_server()
@accelerator.on_process(process_index=0)
def do_my_thing():
"Something done on process index 0"
do_thing_on_index_zero()
打印
在每个进程上打印并不是一个好主意,因为它会堵塞控制台日志并使它们无法读取。 为了每个进程只打印一次,HuggingFace Accelerate 有它的 print 方法,这是一种打印东西的便捷方法。 用 Accelerate 的打印方法替换常用的打印。 在底层,它只是检查进程是否是本地主进程。
accelerator.print("My thing I want to print!")
等待(Deferring)执行
有时您可能需要推迟(推迟)一些执行。 当您运行 Python 脚本时,指令会按顺序执行。 当您处于分布式设置(即在多个 GPU 上运行脚本)时,每个进程(或 GPU)将按顺序执行所有指令。 某些进程可能比其他进程更快地执行指令。 您可能需要等待所有进程到达某个点,然后再执行进一步的指令。 例如,在保存模型之前,您应该确保所有进程都已执行指令(即所有进程都应该完成训练)。 要等待所有进程到达脚本中的某个点,您可以在该特定点使用 Accelerate 的 wait_for_everyone()
。
accelerator.wait_for_everyone()
该指令将阻塞所有首先到达的进程,直到所有其他进程都到达该点。
保存和加载状态
如果您想在脚本开始时保存在prepare()方法中传递的任何对象/模型,您应该使用unwrap_model()
来删除在分布式过程中添加的所有特殊模型包装器。 您应该使用 Accelerate 的 save()
而不是 torch.save()
。 在幕后,Accelerate 的 save()
方法为每台机器或服务器保存一次对象。 您可以查看源代码。 此外,在使用 wait_for_everyone()
保存模型之前,暂停已完成的进程直到所有进程完成(如上一节所述)也很有用。 这是一个严格遵循上述要点的简短示例。
model = MyModel()
model = accelerator.prepare(model)
accelerator.wait_for_everyone()
# Unwrap
model = accelerator.unwrap_model(model)
state_dict = model.state_dict()
# Use accelerator.save()
accelerator.save(state_dict, "my_state.pkl")
您可能经常想保存并在之后继续训练状态。 这样做需要保存和加载模型、优化器、RNG 生成器和 GradScaler。 HuggingFace Accelerate 内部有两个方便的功能可以快速实现这一目标:
使用
save_state()
将上述所有内容保存到文件夹位置使用
load_state()
加载从早期save_state
存储的所有内容
您还可以通过 register_for_checkpointing()
方法注册自定义对象来保存它们。 只要对象具有 state_dict
和 load_state_dict
功能并且已注册检查点,HuggingFace Accelerate 就可以使用上述两种方法保存和加载任何对象。 这是一个在训练期间使用检查点保存和重新加载状态的示例(从 HuggingFace Accelerate 文档获取和修改)
from accelerate import Accelerator
import torch
accelerator = Accelerator()
my_scheduler = torch.optim.lr_scheduler.StepLR(my_optimizer, step_size=1, gamma=0.99)
my_model, my_optimizer, my_training_dataloader = accelerate.prepare(my_model, my_optimizer, my_training_dataloader)
# Register the LR scheduler
accelerate.register_for_checkpointing(my_scheduler)
# Save the starting state
accelerate.save_state("my/save/path")
# Perform training
# training loop here ...
# Restore previous state
accelerate.load_state("my/save/path")
记录
HuggingFace Accelerate 有自己的日志记录实用程序来处理分布式系统中的日志记录。 您应该使用 Accelerate 的日志记录实用程序替换标准 Python 日志记录模块。 这是一个简短的例子:
from accelerate.logging import get_logger
logger = get_logger(__name__)
# logs on all processes
logger.info("My log", main_process_only=False)
# logs only on main process
logger.debug("My log", main_process_only=True)
使用Weights & Biases进行实验跟踪
在分布式设置中使用实验跟踪器可能有点复杂,但 HuggingFace Accelerate 使我们变得相当容易。 要将Weights & Biases与 HuggingFace Accelerate 结合使用,您应该在启动 Accelerator 类时首先将 wandb 传递给 log_with
参数。
from accelerate import Accelerator
accelerator = Accelerator(log_with="wandb")
在实验开始时,应使用 Accelerator.init_trackers()
来设置您的项目。 init_trackers()
接受以下参数。
项目名称:项目的名称。 这将被传递到后台的 wandb.init() 项目参数中。
config:要记录的配置。 传递到 wandb.init() 的配置参数中。
init_kwargs:要传递给特定跟踪器的
__init__
函数的嵌套 kwargs 字典。 您可以传递wandb.init()
在此参数中作为键值对的任何其他参数。
以下是如何使用 HuggingFace Accelerate 初始化 W&B 运行的示例。
from accelerate import Accelerator
accelerator = Accelerator()
hps = {"num_epochs": 5, "learning_rate": 1e-4, "batch_size": 16}
accelerator.init_trackers(
"my_project",
config=hps,
init_kwargs={
"wandb": {
"notes": "testing accelerate pipeline",
"tags": ["tag_a", "tag_b"],
"entity": "gladiator",
}
},
)
初始化 W&B 跟踪后,您现在可以使用 Accelerate 的 log()
方法记录任何数据,就像之前使用 wandb.log()
所做的那样。 您还可以传递当前step编号,将记录的数据与训练循环中的特定步骤相关联。
accelerator.log({"train_loss": 1.12, "valid_loss": 0.8}, step=1)
完成训练后,请确保运行 Accelerator.end_training()
,以便所有跟踪器都可以运行其完成功能(如果有)。 这类似于调用 wandb.finish()
来完成运行并上传所有数据。
accelerator.end_training()
如果您想了解 HuggingFace Accelerate 在幕后的作用,可以查看WandBTracker 类。
启动分布式代码
现在您已经了解了如何使用 HuggingFace Accelerate 来训练分布式设置,现在是时候启动我们已调整用于训练分布式设置的实际代码了。 第一步是将所有代码包装到 main() 函数中。
from accelerate import Accelerator
def main():
accelerator = Accelerator()
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
optimizer.zero_grad()
inputs, targets = batch
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
optimizer.step()
scheduler.step()
if __name__ == "__main__":
main()
您也可以将其他中间函数包装在主函数中:
def main():
function_which_does_data_processing()
function_which_does_training()
function_which_does_evaluation()
if __name__ == "__main__":
main()
接下来,您需要通过 accelerate launch
来启动它。
建议您在使用加速启动之前运行accelerate config来根据您的喜好配置环境。 否则, Accelerate 将根据您的系统设置使用非常基本的默认值。
HuggingFace Accelerate 有一个特殊的 CLI 命令,可帮助您通过加速启动在系统中启动代码。 该命令包含在各种平台上启动脚本所需的所有不同命令,而您不必记住每个命令是什么。 您可以使用以下命令快速启动脚本:
accelerate launch {script_name.py} --arg1 --arg2 ...
只需将 accelerate launch
放在命令的开头,然后像平常一样将其他参数传递给脚本即可!
由于这会运行各种torch生成方法,因此也可以在此处修改所有预期的环境变量。 例如,以下是如何使用单个 GPU 加速启动:
CUDA_VISIBLE_DEVICES="0" accelerate launch {script_name.py} --arg1 --arg2 ...