Setting up distributed training of models
Distributed training is needed when a model or its data does not fit in the memory of a single GPU, or when training must be sped up by running on multiple devices in parallel. Several parallelism strategies exist, and the choice among them determines the system architecture.
Parallelism strategies
Data Parallelism – each GPU contains a copy of the entire model and processes different parts of the batch. Gradients are aggregated (all-reduce) after each step. Suitable for models that fit in the memory of a single GPU.
Tensor Parallelism (a form of model parallelism) – the weight tensors inside individual layers are split across GPUs, so each GPU computes a slice of every layer. This is necessary when the model is too large for a single GPU. Implemented in Megatron-LM and supported by DeepSpeed.
Pipeline Parallelism – consecutive groups of layers (stages) are placed on different GPUs. Each batch is split into micro-batches so that different stages can work on different micro-batches at the same time. Used in GPipe and PipeDream.
3D Parallelism – a combination of all three strategies. DeepSpeed and Megatron-LM use it to train LLMs with hundreds of billions of parameters.
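To make the combination concrete, here is a minimal sketch of how a pool of GPUs might be factored into data-, pipeline-, and tensor-parallel groups. The function name `parallel_layout` and the rank ordering (tensor-parallel ranks adjacent) are assumptions made for illustration; real frameworks expose their own configuration for this.

```python
from itertools import product


def parallel_layout(world_size: int, tensor_parallel: int, pipeline_parallel: int):
    """Return the data-parallel degree and a rank -> coordinates mapping."""
    model_parallel = tensor_parallel * pipeline_parallel
    if world_size % model_parallel != 0:
        raise ValueError("world_size must be divisible by tensor_parallel * pipeline_parallel")
    data_parallel = world_size // model_parallel

    # Assumed ordering: tensor-parallel ranks vary fastest, so neighbouring
    # ranks (usually GPUs in the same node) form one tensor-parallel group.
    coords = {}
    grid = product(range(data_parallel), range(pipeline_parallel), range(tensor_parallel))
    for rank, (dp, pp, tp) in enumerate(grid):
        coords[rank] = {"dp": dp, "pp": pp, "tp": tp}
    return data_parallel, coords


dp_degree, layout = parallel_layout(world_size=32, tensor_parallel=4, pipeline_parallel=2)
print(dp_degree)   # 4
print(layout[5])   # {'dp': 0, 'pp': 1, 'tp': 1}
```

The key constraint is that the three degrees multiply to the total number of GPUs, so increasing tensor or pipeline parallelism reduces the number of data-parallel replicas.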
Data Parallel with PyTorch DDP
DistributedDataParallel (DDP) is the recommended approach for data parallelism in PyTorch:
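
    import os

    import torch
    import torch.distributed as dist
    from torch.nn.parallel import DistributedDataParallel as DDP
    from torch.utils.data import DataLoader
    from torch.utils.data.distributed import DistributedSampler


    def setup(rank, world_size):
        dist.init_process_group(
            backend="nccl",  # nccl for GPU, gloo for CPU
            rank=rank,
            world_size=world_size,
        )
        # Select the GPU by local rank: in multi-node runs the global rank
        # is larger than the number of GPUs on a node.
        local_rank = int(os.environ.get("LOCAL_RANK", rank))
        torch.cuda.set_device(local_rank)
        return local_rank


    def train(rank, world_size, model, dataset, num_epochs):
        local_rank = setup(rank, world_size)
        model = model.to(local_rank)
        ddp_model = DDP(model, device_ids=[local_rank])

        # Each process reads its own shard of the dataset
        sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
        loader = DataLoader(dataset, sampler=sampler, batch_size=32)
        optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-4)

        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)  # needed for a different shuffle each epoch
            for batch in loader:
                batch = batch.to(local_rank)  # assumes each batch is a single tensor
                optimizer.zero_grad()
                loss = ddp_model(batch)  # assumes the model's forward returns the loss
                loss.backward()  # gradients are all-reduced automatically
                optimizer.step()

        dist.destroy_process_group()
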
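The all-reduce that DDP performs during the backward pass can be illustrated without any GPUs. The following is a minimal sketch in plain Python, not DDP's actual implementation: `all_reduce_mean` is a hypothetical stand-in for the collective, and the toy model is a single scalar weight. It shows that averaging per-worker gradients over equally sized shards reproduces the full-batch gradient.

```python
def grad_mse(w, xs, ys):
    """Gradient of mean((w*x - y)^2) with respect to the scalar weight w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def all_reduce_mean(values):
    """Stand-in for an all-reduce that averages one value per worker."""
    mean = sum(values) / len(values)
    return [mean] * len(values)  # every worker receives the same result


w = 0.5
xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0]

world_size = 4
# Strided sharding, similar in spirit to DistributedSampler without shuffling
shards = [(xs[r::world_size], ys[r::world_size]) for r in range(world_size)]
local_grads = [grad_mse(w, sx, sy) for sx, sy in shards]
synced = all_reduce_mean(local_grads)

full_batch_grad = grad_mse(w, xs, ys)
print(abs(synced[0] - full_batch_grad) < 1e-9)  # True
```

Because every replica ends up with the same averaged gradient, the optimizer step keeps all model copies identical without any explicit weight broadcast after initialization.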
Running on a single node (8 GPUs):

    torchrun --nproc_per_node=8 train.py

Running on multiple nodes:

    # On node 0 (master):
    torchrun --nnodes=4 --nproc_per_node=8 \
        --node_rank=0 \
        --master_addr="10.0.0.1" --master_port=29500 \
        train.py

    # On nodes 1-3 (workers); set --node_rank to 1, 2, and 3 respectively:
    torchrun --nnodes=4 --nproc_per_node=8 \
        --node_rank=1 \
        --master_addr="10.0.0.1" --master_port=29500 \
        train.py

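Inside `train.py`, each process can discover its place in the job from the environment variables that torchrun exports. The sketch below assumes the `RANK`, `LOCAL_RANK`, and `WORLD_SIZE` variables; the helper names are hypothetical, and the rank formula is an assumption that holds for a static launch with the same number of processes on every node.

```python
import os


def read_torchrun_env(env=os.environ):
    """Read the rank information that torchrun exports to each worker process."""
    return {
        "rank": int(env["RANK"]),              # global rank across all nodes
        "local_rank": int(env["LOCAL_RANK"]),  # rank within this node, used as GPU index
        "world_size": int(env["WORLD_SIZE"]),  # total number of processes
    }


def expected_global_rank(node_rank, nproc_per_node, local_rank):
    """Assumed rank layout for a static, homogeneous launch."""
    return node_rank * nproc_per_node + local_rank


# Hypothetical environment for the last process on node 3 of a 4-node, 8-GPU launch
fake_env = {"RANK": "31", "LOCAL_RANK": "7", "WORLD_SIZE": "32"}
info = read_torchrun_env(fake_env)
print(info["world_size"] == 4 * 8)                                     # True
print(expected_global_rank(3, 8, info["local_rank"]) == info["rank"])  # True
```

Using the local rank rather than the global rank as the CUDA device index matters as soon as more than one node is involved, since global ranks run from 0 to 31 while each node only has devices 0 to 7.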
Accelerate by Hugging Face
For a simpler setup with support for mixed precision, gradient accumulation, and various distributed backends:

    from accelerate import Accelerator

    accelerator = Accelerator(
        mixed_precision="bf16",
        gradient_accumulation_steps=4,
    )

    # model, optimizer, and train_dataloader are assumed to be created beforehand
    model, optimizer, train_dataloader = accelerator.prepare(
        model, optimizer, train_dataloader
    )

    for batch in train_dataloader:
        # Gradients are synchronized only at the end of each accumulation window
        with accelerator.accumulate(model):
            outputs = model(**batch)
            loss = outputs.loss
            accelerator.backward(loss)
            optimizer.step()
            optimizer.zero_grad()

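Gradient accumulation changes the number of samples behind each optimizer update, which in turn affects the learning rate you may want to use. The following sketch shows the arithmetic; the helper names are hypothetical, and the per-device batch size of 32 and the 8 processes are illustrative values, not something Accelerate fixes.

```python
import math


def effective_batch_size(per_device_batch, grad_accum_steps, num_processes):
    """Samples that contribute to one optimizer update."""
    return per_device_batch * grad_accum_steps * num_processes


def optimizer_updates_per_epoch(batches_per_process, grad_accum_steps):
    """Assumes a final, shorter accumulation window still triggers an update."""
    return math.ceil(batches_per_process / grad_accum_steps)


print(effective_batch_size(32, 4, 8))        # 1024
print(optimizer_updates_per_epoch(1000, 4))  # 250
print(optimizer_updates_per_epoch(1002, 4))  # 251
```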
Choosing a strategy by model size
| Model size | Recommended strategy |
|---|---|
| < 1B parameters | DDP (Data Parallel) |
| 1B - 10B parameters | DDP + ZeRO-2/3 (DeepSpeed) |
| 10B - 100B parameters | Tensor + Pipeline Parallel (Megatron) |
| > 100B parameters | 3D Parallelism (DeepSpeed + Megatron) |
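The table can be encoded as a small lookup function, for example to pick a default launch configuration. This is a sketch only: `recommend_strategy` is a hypothetical helper, and the table does not say which row an exact boundary such as 10B belongs to, so the code assumes it falls into the larger bucket.

```python
def recommend_strategy(num_params: float) -> str:
    """Map a parameter count to the strategy suggested by the table."""
    billion = 1e9
    if num_params < 1 * billion:
        return "DDP (Data Parallel)"
    # Assumption: exactly 10B is treated as part of the 10B - 100B row.
    if num_params < 10 * billion:
        return "DDP + ZeRO-2/3 (DeepSpeed)"
    if num_params <= 100 * billion:
        return "Tensor + Pipeline Parallel (Megatron)"
    return "3D Parallelism (DeepSpeed + Megatron)"


for size in (350e6, 7e9, 70e9, 175e9):
    print(f"{size / 1e9:g}B -> {recommend_strategy(size)}")
```

In practice the thresholds also depend on GPU memory, sequence length, and interconnect, so the result is a starting point rather than a fixed rule.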
Scaling efficiency and optimization
Key metrics: GPU utilization (target > 85%) and MFU (Model FLOPs Utilization), the ratio of the FLOPs the model actually performs per second to the theoretical peak of the hardware. Common bottlenecks:
- I/O-bound: data is read more slowly than the GPU can process it. Solution: prefetching, increasing num_workers, NVMe storage.
- Communication-bound: all-reduce takes too long. Solution: gradient compression, increasing batch size.
- Memory-bound: weights, activations, and optimizer state do not fit in GPU memory. Solution: gradient checkpointing, mixed precision (BF16/FP16), activation offloading.
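To see when training becomes memory-bound, a rough per-GPU estimate of the model state helps. The sketch below uses the common mixed-precision Adam accounting of 2 + 2 + 12 bytes per parameter and the way ZeRO stages 1 to 3 partition that state. The function name is hypothetical, and activations, temporary buffers, and fragmentation are deliberately left out, so real usage will be higher.

```python
def training_state_gb(num_params, num_gpus=1, zero_stage=0):
    """Per-GPU memory (GB) for weights, gradients, and Adam state in mixed precision.

    Accounting per parameter: 2 bytes of 16-bit weights, 2 bytes of 16-bit
    gradients, 12 bytes of fp32 master weights plus two Adam moments.
    Activations and buffers are NOT included.
    """
    weights, grads, optim = 2.0, 2.0, 12.0  # bytes per parameter
    if zero_stage >= 1:
        optim /= num_gpus    # ZeRO-1 partitions optimizer state
    if zero_stage >= 2:
        grads /= num_gpus    # ZeRO-2 also partitions gradients
    if zero_stage >= 3:
        weights /= num_gpus  # ZeRO-3 also partitions the weights
    return num_params * (weights + grads + optim) / 1e9


print(training_state_gb(7e9))                            # 112.0
print(training_state_gb(7e9, num_gpus=8, zero_stage=2))  # 26.25
print(training_state_gb(7e9, num_gpus=8, zero_stage=3))  # 14.0
```

Under these assumptions a 7B-parameter model needs about 112 GB of state with plain DDP, which exceeds a single 80 GB GPU, while ZeRO-2 across 8 GPUs brings it down to roughly 26 GB per GPU.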
Training a 7B-parameter model with DDP + ZeRO-2 on an 8x A100 80GB node with NVLink reaches an MFU of about 40-50%, which is a typical figure for a well-tuned setup.
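MFU can be estimated from measured throughput. The sketch below uses the widely used approximation of about 6 FLOPs per parameter per token for a forward and backward pass (it ignores attention FLOPs) and the A100's published dense BF16 peak of 312 TFLOPS. The function name and the throughput value are hypothetical and only illustrate the arithmetic.

```python
A100_PEAK_BF16_FLOPS = 312e12  # dense BF16 peak per A100, from NVIDIA's published specs


def mfu(num_params, tokens_per_second, num_gpus, peak_flops_per_gpu=A100_PEAK_BF16_FLOPS):
    """Model FLOPs Utilization using the ~6 * N FLOPs-per-token approximation."""
    achieved_flops = 6 * num_params * tokens_per_second
    return achieved_flops / (num_gpus * peak_flops_per_gpu)


# Hypothetical aggregate throughput across all 8 GPUs
utilization = mfu(num_params=7e9, tokens_per_second=27_000, num_gpus=8)
print(f"{utilization:.1%}")  # 45.4%
```

Unlike raw GPU utilization, this figure drops when time is lost to communication or data loading, which makes it a more honest measure of scaling efficiency.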







