
[DTensor][FSDP2][DDP] benchmark dtensor cpu overhead for adam optimizer #159169

@weifengpy

Description


🚀 The feature, motivation and pitch

In production, a model can have on the order of 3000 parameter tensors. Some of those params are DTensors because of FSDP2, while the others are plain tensors because of DDP.
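
The split is easy to inspect (a minimal sketch, assuming `model` is wrapped the way the script below wraps it):

```python
from torch.distributed.tensor import DTensor

# Count params FSDP2 turned into DTensors vs. those replicate left as plain tensors.
num_dtensor = sum(isinstance(p, DTensor) for p in model.parameters())
num_plain = sum(not isinstance(p, DTensor) for p in model.parameters())
print(f"DTensor params: {num_dtensor}, plain params: {num_plain}")
```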

I'm creating this script to generate profiler traces that illustrate the CPU overhead of DTensor + the Adam optimizer.

[profiler trace screenshot omitted]

```python
# python benchmark_dtensor_cpu_overhead.py

import contextlib
import os

import torch
import torch.nn as nn
from torch.distributed.fsdp import fully_shard
from torch.distributed._composable import replicate
from torch.distributed.tensor.experimental import implicit_replication


import torch.multiprocessing as mp


@contextlib.contextmanager
def enable_profiling(enable=False):
    if not enable:
        # Profiling disabled: yield None (callers that disable profiling
        # must skip prof.step()).
        yield None
    else:
        trace_dir = "./profilers"
        rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0

        def trace_handler(prof):
            # Export one Chrome trace per profiling cycle, keyed by step number.
            curr_trace_dir = os.path.join(trace_dir, f"iteration_{prof.step_num}")
            os.makedirs(curr_trace_dir, exist_ok=True)
            prof.export_chrome_trace(f"{curr_trace_dir}/rank{rank}_trace.json")

        os.makedirs(trace_dir, exist_ok=True)
        # One profiling cycle: skip 1 step, warm up for 1, record 2.
        wait, warmup, active = 1, 1, 2
        with torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
                torch.profiler.ProfilerActivity.CUDA,
            ],
            schedule=torch.profiler.schedule(wait=wait, warmup=warmup, active=active),
            on_trace_ready=trace_handler,
            record_shapes=True,
        ) as torch_profiler:
            yield torch_profiler


def run(rank, world_size) -> None:
    # Each spawned worker joins a single-node NCCL process group.
    os.environ["MASTER_ADDR"] = "127.0.0.1"
    os.environ["MASTER_PORT"] = "29500"
    torch.cuda.set_device(rank)
    torch.distributed.init_process_group(backend="nccl", rank=rank, world_size=world_size)

    # 3000 small layers mimic a production-scale parameter count; alternate
    # FSDP2 (fully_shard -> DTensor params) with DDP-style replicate (plain params).
    model = nn.Sequential(*[nn.Linear(512, 512, device="cuda") for _ in range(3000)])
    inp = torch.rand(512, 512, device="cuda")
    for i, layer in enumerate(model):
        if i % 2 == 0:
            fully_shard(layer)
        else:
            replicate(layer)
    optim = torch.optim.Adam(model.parameters(), lr=1e-2)
    with enable_profiling(True) as prof:
        for _ in range(10):
            loss = model(inp).sum()
            loss.backward()
            # Treat the plain-tensor params as replicated so a single Adam
            # instance can step over the mixed DTensor / plain parameter list.
            with implicit_replication():
                optim.step()
            optim.zero_grad()
            prof.step()

    torch.distributed.destroy_process_group()


def main() -> None:
    world_size = 4
    mp.spawn(run, args=(world_size,), nprocs=world_size, join=True)

if __name__ == "__main__":
    main()
```
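
For context on the wrapper (a minimal sketch of the documented DTensor behavior, not part of the original script; it assumes an already-initialized process group, e.g. inside run() above): within implicit_replication(), a plain torch.Tensor mixed into a DTensor op is treated as replicated across the mesh instead of raising a mixed-tensor error.

```python
import torch
from torch.distributed.device_mesh import init_device_mesh
from torch.distributed.tensor import Shard, distribute_tensor
from torch.distributed.tensor.experimental import implicit_replication

# Assumes torch.distributed is already initialized (e.g. inside run() above).
mesh = init_device_mesh("cuda", (torch.distributed.get_world_size(),))
dt = distribute_tensor(torch.ones(8, 8, device="cuda"), mesh, [Shard(0)])
plain = torch.ones(8, 8, device="cuda")

with implicit_replication():
    out = dt + plain  # plain is implicitly treated as a replicated DTensor
```

The traces land under ./profilers/iteration_<step>/rank<rank>_trace.json and can be inspected in chrome://tracing or Perfetto.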

Alternatives

No response

Additional context

No response

cc @robieta @chaekit @guotuofeng @guyang3532 @dzhulgakov @davidberard98 @briancoutinho @sraikund16 @sanrise
