torch.distributed.init_process_group()详细说明(RANK/WORLD_SIZE)


Vincent
发布于 2024-08-03 / 25 阅读 / 0 评论 /
torch.distributed.init_process_group()详细说明(RANK/WORLD_SIZE) 一、torch.distributed.init_process_group函数定义 torch.distributed.init_process_group( back

torch.distributed.init_process_group()详细说明(RANK/WORLD_SIZE)

一、torch.distributed.init_process_group函数定义

torch.distributed.init_process_group(

    backend,

    init_method=None,

    timeout=datetime.timedelta(seconds=1800),

    world_size=-1,

    rank=-1,

    store=None,

    group_name='default',

    **kwargs

)

参数说明:

  • backend:指定分布式后端的名称,例如 ‘nccl’、‘gloo’ 或 ‘mpi’。

  • init_method:初始化方法的 URL 或文件路径。默认为 None,表示使用默认的初始化方法。

  • timeout:初始化过程的超时时间,默认为 1800 秒。

  • world_size:参与分布式训练的总进程数。默认为 -1,表示从环境变量中自动获取。

  • rank:当前进程的排名。默认为 -1,表示从环境变量中自动获取。

  • store:用于存储进程组信息的存储对象。默认为 None,表示使用默认存储。

  • group_name:进程组的名称,默认为 ‘default’。

  • **kwargs:其他可选参数,根据不同的分布式后端而定。

二、RANK、WORLD_SIZE 和 LOCAL_RANK

在分布式训练中,RANK、WORLD_SIZE 和 LOCAL_RANK 都是用于标识进程的环境变量,但它们的含义略有不同:

1、RANK说明

RANK 表示当前进程在所有进程中的排名。例如,如果有 4 个进程,它们的 RANK 分别为 0、1、2 和 3。在分布式训练中,我们通常需要使用 RANK 来决定当前进程的角色和任务,例如是否是主进程、是否需要保存模型等。

2、WORLD_SIZE说明

WORLD_SIZE 表示所有进程的总数。例如,如果有 4 个进程,它们的 WORLD_SIZE 均为 4。在分布式训练中,我们通常需要使用 WORLD_SIZE 来决定数据并行的方式和分配任务的方式等。

3、LOCAL_RANK说明

LOCAL_RANK 表示当前进程在同一台计算机上的排名。例如,如果有 4 个进程,其中 2 个运行在计算机 A 上,另外 2 个运行在计算机 B 上,那么在计算机 A 上运行的两个进程的 LOCAL_RANK 分别为 0 和 1,在计算机 B 上运行的两个进程的 LOCAL_RANK 分别为 0 和 1。在分布式训练中,我们通常需要使用 LOCAL_RANK 来决定如何分配 GPU 设备和数据等。

在分布式训练中,这三个环境变量通常需要在所有进程中保持一致,并且需要在初始化分布式训练环境时设置。例如,在 PyTorch 中,可以使用 torch.distributed.init_process_group() 函数来初始化分布式训练环境,并自动设置这三个环境变量。

三、环境变量与应用

我将在这里介绍只有2个进程的world_size与rank的使用方法,在启动2个进程前,我先介绍1个进程的环境配置等方法,特别是结合torch的init_process_group搭配使用方式。

但使用分布式环境前,都需要在os.environ系统环境变量声明ip与port端口,如下:

# 设置主进程的 IP 地址和端口号

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29501'

当然,你也可以执行代码借助命令给定,我这里是在py文件中直接配置。

1、使用系统环境配置

直接使用os.environ系统环境变量指定rank与world_size,init_process_group能自动获取系统配置,我在这里rank=0,world_size=1分别表示进程rank0与共1个进程,如下:

os.environ['RANK'] = '0'

os.environ['WORLD_SIZE'] = '1'

# 初始化分布式训练环境

dist.init_process_group(backend='nccl')

完整的示列代码如下:

import torch.distributed as dist

import os

# 设置主进程的 IP 地址和端口号

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29501'

os.environ['RANK'] = '0'

os.environ['WORLD_SIZE'] = '1'

dist.init_process_group(backend='nccl') # 初始化分布式训练环境

# 获取当前进程的排名和总进程数

rank = dist.get_rank()

world_size = dist.get_world_size()

print(f"Rank: {rank}, World size: {world_size}") # 在分布式训练中使用排名和总进程数

# 执行分布式训练代码

# ...

dist.destroy_process_group()  # 释放资源

2、init_process_group直接配置

你也可以不使用os.environ系统环境变量指定rank与world_size,直接使用init_process_group指定,效果和上面一致,如下:

dist.init_process_group(backend='nccl',rank=0,world_size=1)  # 初始化分布式训练环境

完整的示列代码如下:

import torch.distributed as dist

import os

# 设置主进程的 IP 地址和端口号

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29501'

dist.init_process_group(backend='nccl',rank=0,world_size=1)  # 初始化分布式训练环境

# 获取当前进程的排名和总进程数

rank = dist.get_rank()

world_size = dist.get_world_size()

print(f"Rank: {rank}, World size: {world_size}") # 在分布式训练中使用排名和总进程数

# 执行分布式训练代码

# ...

dist.destroy_process_group()  # 释放资源

3、多个进程应用(world_size=2)

我们继续设置2个进程world_size=2,则使用2个rank分别运行各自子进程,每个子进程我们会有一个py文件代码,主要更改地方为rank与world_size值,其中2个py文件分别为

try_rank0.py与try_rank1.py,其代码如下:

try_rank0.py:

import torch.distributed as dist

import os

# 设置主进程的 IP 地址和端口号

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29501'

dist.init_process_group(backend='nccl', rank=0, world_size=2)  # 初始化分布式训练环境

# 获取当前进程的排名和总进程数

rank = dist.get_rank()

world_size = dist.get_world_size()

print(f"Rank: {rank}, World size: {world_size}") # 在分布式训练中使用排名和总进程数

# 执行分布式训练代码

# ...

dist.destroy_process_group()  # 释放资源

try_rank1.py:

import torch.distributed as dist

import os

# 设置主进程的 IP 地址和端口号

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29501'

# 设置当前进程的局部排名(local_rank)

os.environ['RANK'] = '1'

os.environ['WORLD_SIZE'] = '2'

dist.init_process_group(backend='nccl') # 初始化分布式训练环境

# 获取当前进程的排名和总进程数

rank = dist.get_rank()

world_size = dist.get_world_size()

# 在分布式训练中使用排名和总进程数

print(f"Rank: {rank}, World size: {world_size}")

# 执行分布式训练代码

# ...

dist.destroy_process_group()  # 释放资源

需先执行运行主节点try_rank0.py文件,在执行try_rank1.py文件。

四、模型应用

这里,我们首先调用 dist.init_process_group() 来初始化分布式训练环境。然后定义了一个简单的线性模型,并使用随机生成的数据进行训练。最后,我们调用 dist.destroy_process_group() 来释放资源。

请注意,这只是一个简单的示例,实际使用中可能需要更复杂的模型和数据。同时,请确保在每个进程中都正确地调用了 torch.distributed.init_process_group(),并根据实际情况设置其他参数。

import torch

import torch.distributed as dist

import torch.nn as nn

import torch.optim as optim

import os

# 设置环境变量 MASTER_ADDR 和 MASTER_PORT

os.environ['MASTER_ADDR'] = '127.0.0.1'

os.environ['MASTER_PORT'] = '29505'

# 设置环境变量 RANK

os.environ['RANK'] = '0'

os.environ['WORLD_SIZE'] = '2'

# 初始化分布式训练环境,尽管系统环境给了world_size为2,但调用给的是1,不冲突

dist.init_process_group(backend='nccl', rank=0,world_size=1)

# 定义线性模型

class LinearModel(nn.Module):

    def init(self):

        super(LinearModel, self).__init__()

        self.linear = nn.Linear(1, 1)

    def forward(self, x):

        return self.linear(x)

# 创建模型实例

model = LinearModel()

# 定义损失函数和优化器

criterion = nn.MSELoss()

optimizer = optim.SGD(model.parameters(), lr=0.01)

model = model.to(torch.device('cuda'))

# 生成随机数据

x = torch.randn(100, 1)

y = 3  x + 2 + torch.randn(100, 1)  0.1

# 将数据分发到各个进程

x = x.to(torch.device('cuda'))

y = y.to(torch.device('cuda'))

# 在每个进程上进行训练

for epoch in range(10):

    optimizer.zero_grad()

    outputs = model(x)

    loss = criterion(outputs, y)

    loss.backward()

    optimizer.step()

    print(loss)

# 释放资源

dist.destroy_process_group()