pytorch - ✅(Solved) Fix Conflict between PyTorch CUDACachingAllocator and NCCL symmetric memory (ncclCommWindowRegister) causing incorrect all-reduce results [1 pull requests, 6 comments, 3 participants]

pytorch/pytorch#178138 · Fetched 2026-04-08 01:16:21
Comments: 6 · Participants: 3 · Timeline events: 42 · Reactions: 0
Timeline (top): mentioned ×16, subscribed ×16, commented ×6, labeled ×2
Fix Action

Fixed

PR fix notes

PR #178362: [CUDA][Mempool] use allocation-time counter instead of address for Block ordering to fix NCCL symmetric memory mismatch

Description (problem / solution / changelog)

Previously, blocks were ordered by their memory address. This caused problems because different ranks may receive memory at different addresses, so the address-based block ordering could differ between ranks. That inconsistency could lead to misaligned tensor reuse during communication, producing incorrect or corrupted results.

To fix this, we replace address-based sorting with an allocation-time counter, which yields a consistent block order across all ranks when they perform the same allocation sequence. This keeps tensor block reuse aligned and deterministic, eliminating communication errors caused by block misalignment.

This PR is based on #167662 and the comments in issue #178138.
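
The effect of the ordering change can be sketched with a toy model in plain Python. This is only an illustration under stated assumptions, not the actual CUDACachingAllocator code: the `Block` class, the `reuse_order` helper, and the sample addresses are hypothetical, and the real sort key also includes the stream. Both ranks allocate three equally sized segments in the same logical order, but the driver returns addresses in a different relative order on rank 1.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Block:
    alloc_id: int  # position in the allocation sequence (identical on every rank)
    size: int      # block size in bytes
    address: int   # device address (relative order may differ between ranks)


def reuse_order(blocks, key):
    """Return alloc_ids in the order a sorted free pool would hand the blocks out."""
    return [b.alloc_id for b in sorted(blocks, key=key)]


def by_address(b):
    # Old behaviour in this toy model: ties on size are broken by address.
    return (b.size, b.address)


def by_counter(b):
    # New behaviour in this toy model: ties on size are broken by allocation order.
    return (b.size, b.alloc_id)


MB = 1 << 20
rank0 = [Block(0, MB, 0x7000), Block(1, MB, 0x8000), Block(2, MB, 0x9000)]
rank1 = [Block(0, MB, 0x9000), Block(1, MB, 0x7000), Block(2, MB, 0x8000)]

addr_orders = (reuse_order(rank0, by_address), reuse_order(rank1, by_address))
ctr_orders = (reuse_order(rank0, by_counter), reuse_order(rank1, by_counter))

print("address ordering:", addr_orders)  # the two ranks disagree
print("counter ordering:", ctr_orders)   # the two ranks agree
```

With address ordering, the first reused block on rank 1 is a different logical segment than on rank 0, which is the kind of mismatch the PR description refers to. With the counter, both ranks hand out segments in allocation order.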

Changed files

  • c10/core/CachingDeviceAllocator.h (modified, +0/-1)
  • c10/cuda/CUDACachingAllocator.cpp (modified, +13/-2)
  • torch/csrc/distributed/c10d/ProcessGroupNCCL.cpp (modified, +0/-9)

Code Example

import json
import os
import random
import tempfile
import time
from dataclasses import dataclass
from functools import partial

import torch
import torch.distributed as dist
from sglang.srt.distributed.device_communicators.pynccl import \
    PyNcclCommunicator
from torch.cuda.memory import CUDAPluggableAllocator
from torch.distributed.distributed_c10d import _get_default_group
from torch.utils import cpp_extension

"""
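NCCL symmetric memory allocation and all_reduce test tool
================================================================================

Overview:
  This script allocates NCCL symmetric memory and tests all_reduce on it.
  Main features:

  1. Custom NCCL memory allocator:
     - Allocates memory with ncclMemAlloc and registers it through
       ncclCommWindowRegister as an NCCL_WIN_COLL_SYMMETRIC window to obtain
       a symmetric memory layout

  2. Validation over multiple test cases:
     - Creates 8 test cases of different sizes and memory types
       (symmetric / non-symmetric)
     - Each test case runs all_reduce(op=SUM)
     - Verifies that every result element matches the expected value
       (value * world_size)

  3. Device memory reuse:
     - The tensor reference is deleted right after each test case is verified
     - Device memory is managed by the custom allocator and can be reused by
       later test cases

Usage:
  1. Launch with torchrun:

     Single node, multiple GPUs:
       torchrun --nproc_per_node=2 nccl_symm_mem.py

     Multiple nodes, multiple GPUs:
       torchrun --nnodes=2 --nproc_per_node=2 --node_rank=0 \\
                --master_addr="192.168.1.1" --master_port=12345 \\
                nccl_symm_mem.py

  2. Environment variables:
     - RANK: rank of the current process (set automatically by torchrun)
     - WORLD_SIZE: total number of processes (set automatically by torchrun)
     - SGLANG_TMP_NCCL_COMM_VALUE: NCCL comm pointer value (set automatically
       by the program)

  3. Output:
     - Rank 0 prints detailed test case results and a summary table
     - Table columns: Size, Value, Sym (symmetric memory or not), Exp (expected value),
                MaxDiff (maximum difference), MeanDiff (mean difference), Result (pass/fail)
     - Exit code: 0 if all test cases pass, 1 if any fail

Notes:
  - Requires matching versions of PyTorch and NCCL
  - Requires compiling a C++ extension (built automatically into the current
    directory on first run)
  - Symmetric memory requires the torch.cuda.use_mem_pool(pool) context manager
  - The NCCL version must support the ncclMemAlloc / ncclMemFree APIs
================================================================================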
"""

# ============================================
# Create the NCCL memory allocator
# ============================================
nccl_allocator_source = """
#include <cuda_runtime.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

extern "C" {

// copy from https://github.com/NVIDIA/nccl/blob/master/src/nccl.h.in
typedef enum { ncclSuccess                 =  0,
               ncclUnhandledCudaError      =  1,
               ncclSystemError             =  2,
               ncclInternalError           =  3,
               ncclInvalidArgument         =  4,
               ncclInvalidUsage            =  5,
               ncclRemoteError             =  6,
               ncclInProgress              =  7,
               ncclNumResults              =  8 } ncclResult_t;
typedef struct ncclComm* ncclComm_t;
typedef struct ncclWindow_vidmem* ncclWindow_t;
ncclResult_t  ncclCommWindowRegister(ncclComm_t comm, void* buff, size_t size, ncclWindow_t* win, int winFlags);
#define NCCL_WIN_COLL_SYMMETRIC 0x01

ncclResult_t  ncclMemAlloc(void** ptr, size_t size);
ncclResult_t  ncclMemFree(void *ptr);
const char*  ncclGetErrorString(ncclResult_t result);

#define NCCLCHECK(cmd) do {                                               \
  ncclResult_t res = cmd;                                                 \
  if (res != ncclSuccess) {                                               \
    fprintf(stderr, "ERROR: NCCL symmetric memory allocation failed. Most likely out of device memory. '%s'\\n", \
           ncclGetErrorString(res));                       \
    return NULL;                                                        \
  }                                                                       \
} while(0)

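void* nccl_alloc_plug(size_t size, int device, void* stream) {
  // The communicator handle is passed in through an environment variable.
  const char *str_val = getenv("SGLANG_TMP_NCCL_COMM_VALUE");
  if (str_val == NULL) {
    fprintf(stderr, "ERROR: SGLANG_TMP_NCCL_COMM_VALUE is not set\\n");
    return NULL;
  }
  char *endptr;
  void* int_val = (void *)strtoull(str_val, &endptr, 0);

  void* ptr;
  NCCLCHECK(ncclMemAlloc(&ptr, size));

  // Register the new buffer as a symmetric window on the communicator.
  ncclComm_t comm = (ncclComm_t)(int_val);
  ncclWindow_t win;
  NCCLCHECK(ncclCommWindowRegister(comm, ptr, size, &win, NCCL_WIN_COLL_SYMMETRIC));
  printf("device %d allocated %zu bytes at %p register to comm %p\\n", device, size, ptr, (void*)comm);

  return ptr;
}

void nccl_free_plug(void* ptr, size_t size, int device, void* stream) {
  printf("device %d free %zu bytes at %p\\n", device, size, ptr);
  ncclResult_t err = ncclMemFree(ptr);
  if (err != ncclSuccess) {
    fprintf(stderr, "ERROR: ncclMemFree failed: '%s'\\n", ncclGetErrorString(err));
  }
}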

}

"""

_allocator = None
_mem_pool = None
_allocation_records = []

@dataclass
class GlobalVars:
    pool:torch.cuda.MemPool | None = None
    rank : int = 0
    world_size : int = 1
    process_group: PyNcclCommunicator| None = None

    def __init__(self):
        if not dist.is_initialized():
            rank = int(os.environ.get("RANK", "0"))
            world_size = int(os.environ.get("WORLD_SIZE", "1"))
            dist.init_process_group(
                backend="nccl",
                rank=rank,
                world_size=world_size,
                init_method="tcp://127.0.0.1:12344" if 'MASTER_ADDR' not in os.environ else None,
            )
            torch.cuda.set_device(rank)
            torch.set_default_device(f'cuda:{rank}')
            self.rank = rank
            self.world_size = world_size
        print(f"Initialized process group: rank={rank}, world_size={world_size}")
        out_dir = os.path.join(tempfile.gettempdir(), "symm_allocator")
        try:
            os.remove(os.path.join(out_dir, "lock"))
        except FileNotFoundError:
            pass
        torch.distributed.barrier()
        nccl_allocator_libname = "nccl_allocator_2"
        torch.utils.cpp_extension.load_inline(
            name=nccl_allocator_libname,
            cpp_sources=nccl_allocator_source,
            with_cuda=True,
            extra_ldflags=["-lnccl"],
            verbose=True,
            is_python_module=False,
            build_directory=out_dir,
        )
        global _allocator
        _allocator = CUDAPluggableAllocator(
            f"{out_dir}/{nccl_allocator_libname}.so",
            "nccl_alloc_plug",
            "nccl_free_plug",
        ).allocator()

        _cur_device = torch.cuda.current_device()
        self.device = _cur_device
        pool = torch.cuda.MemPool(_allocator)
        global _mem_pool
        _mem_pool = pool
        print(f'rank = {rank}, Current device: {_cur_device} {id(pool)=}', flush=True)

        self.pool = pool
        cpu_group = torch.distributed.new_group([rank for rank in range(world_size)], backend='gloo')
        pynccl_comm = PyNcclCommunicator(group=cpu_group, device=rank)
        self.process_group = pynccl_comm
        os.environ["SGLANG_TMP_NCCL_COMM_VALUE"] = str(self.process_group.comm.value)
        print(f"Set SGLANG_TMP_NCCL_COMM_VALUE: {self.process_group.comm.value} {id(self.pool)=}", flush=True)


@dataclass
class TestCase:
    """Test case data structure."""
    size: int | list[int]
    val: float
    is_symmetric: bool
    tensor: torch.Tensor
    expected_sum: float | None  # expected value of every element after the reduce

    def __init__(self, size, val, is_symmetric, pool=None, device=None):
        self.size = size
        self.val = val
        self.is_symmetric = is_symmetric
        if self.is_symmetric and pool is not None:
            with torch.cuda.use_mem_pool(pool):
                self.tensor = torch.zeros(size, dtype=torch.float32, device=device)
            global _allocation_records
            if len(pool.snapshot()) > len(_allocation_records):
                for seg in pool.snapshot():
                    new_addr = seg['address']
                    found = False
                    for item in _allocation_records:
                        if item[1] == new_addr:
                            found = True
                            break
                    if not found:
                        _allocation_records.append( (len(_allocation_records), new_addr ))
        else:
            self.tensor = torch.zeros(size, dtype=torch.float32, device=device)
        self.tensor.fill_(self.val)
        self.expected_sum = None  # computed at run time

    def set_expected_sum(self, world_size):
        """Set the expected all_reduce result."""
        self.expected_sum = self.val * world_size

    def verify_all_reduce(self):
        with _global_vars.process_group.change_state(enable=True):
            print(f'rank {_global_vars.rank}: all_reduce {self.tensor.data_ptr()=}')
            _global_vars.process_group.all_reduce(self.tensor)#, op=dist.ReduceOp.SUM)

        # Check every element
        print(f'rank {_global_vars.rank}: after all_reduce {self.tensor.data_ptr()=}')
        torch.cuda.synchronize()
        tolerance = 1e-5
        expected_tensor = torch.full_like(self.tensor, self.expected_sum)
        diff = torch.abs(self.tensor - expected_tensor)
        max_diff = diff.max().item()
        mean_diff = diff.mean().item()

        # True only if all elements are within tolerance
        is_correct = (diff < tolerance).all().item()

        return is_correct, max_diff, mean_diff

def create_test_cases(pool=None) -> list[TestCase]:
    """Create the test cases."""
    test_cases = []

    for i in range(8):
        tc1 = TestCase(size=1024 * 256, val=1.0, is_symmetric=True, pool=pool, device=_global_vars.device)
        test_cases.append(tc1)

    return test_cases

def run_tests():
    """Run all test cases."""
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    print(f"\n{'='*60}")
    print(f"{'='*20} Starting test cases {'='*20}")
    print(f"Rank: {rank}, World Size: {world_size}")
    print(f"{'='*60}\n")

    # Create the test cases
    test_cases = create_test_cases(_global_vars.pool)

    global _allocation_records
    _allocation_records.sort(key=lambda x: x[1])
    print(f'\nrank {_global_vars.rank} {_allocation_records=}\n', flush=True)

    # Set the expected values
    for tc in test_cases:
        tc.set_expected_sum(world_size)

    # Run the test cases
    all_passed = True
    test_results = []

    for idx in range(len(test_cases)):
        tc = test_cases[idx]
        #print(f"{'='*20} Test case {idx + 1} {'='*20}")
        #print(f"  Size: {tc.size:,}")
        #print(f"  Value: {tc.val}")
        #print(f"  Is Symmetric: {tc.is_symmetric}")
        #print(f"  Expected Sum: {tc.expected_sum}")

        # Run all_reduce and verify every element
        is_correct, max_diff, mean_diff = tc.verify_all_reduce()

        print(f"  Max Diff: {max_diff:.8f}")
        print(f"  Mean Diff: {mean_diff:.8f}")
        print(f"  Result: {'PASS' if is_correct else 'FAIL'}")

        test_results.append({
            'test_case': idx + 1,
            'size': tc.size,
            'val': tc.val,
            'is_symmetric': tc.is_symmetric,
            'expected_sum': tc.expected_sum,
            'max_diff': max_diff,
            'mean_diff': mean_diff,
            'pass': is_correct
        })

        if not is_correct:
            all_passed = False

        # Delete the tensor and test case references to release device memory
        #del tc.tensor
        #test_cases[idx] = None  # drop the reference from the list

        print()

    # Print the test summary
    print(f"\n{'='*60}")
    print(f"{'='*20} Test summary {'='*20}")
    print(f"{'='*60}\n")

    if rank == 0:
        print(f"{'TestCase':<8} {'Size':<12} {'Value':<8} {'Sym':<5} {'Exp':<8} {'MaxDiff':<12} {'MeanDiff':<12} {'Result':<8}")
        print("-" * 80)

        passed_count = 0
        failed_count = 0

        for result in test_results:
            status = 'PASS' if result['pass'] else 'FAIL'
            print(f"{result['test_case']:<8} "
                  f"{result['size']:<12,} "
                  f"{result['val']:<8} "
                  f"{result['is_symmetric']:<5} "
                  f"{result['expected_sum']:<8} "
                  f"{result['max_diff']:<12.8f} "
                  f"{result['mean_diff']:<12.8f} "
                  f"{status:<8}")

            if result['pass']:
                passed_count += 1
            else:
                failed_count += 1

        print("-" * 80)
        print(f"Total: {len(test_results)} | Passed: {passed_count} | Failed: {failed_count}")

        if all_passed:
            print("\n=== All test cases passed ===\n")
        else:
            print("\n=== Some test cases failed ===\n")

    return all_passed

_global_vars : GlobalVars = None
if __name__ == "__main__":
    # Initialize the distributed environment
    _global_vars = GlobalVars()

    print(f'1st round')
    all_passed = run_tests()
    dist.barrier()
    # run for second time to see whether tensor has same data ptr
    print(f'2nd round')
    all_passed = run_tests()

    # Exit gracefully
    if dist.is_initialized():
        dist.barrier()
        if dist.get_rank() == 0:
            print("\n=== Tests finished ===")

    time.sleep(10)
    exit(0)

Description

When trying to use NCCL’s symmetric memory API (ncclCommWindowRegister with NCCL_WIN_COLL_SYMMETRIC) together with PyTorch’s CUDA caching allocator (CUDACachingAllocator), we’ve observed incorrect results from ncclAllReduce under certain patterns of allocation and reuse.

Problem

NCCL symmetric memory requires that, for a given communicator group, the buffers registered on each rank have a consistent mapping relationship:

  • For a particular registered window, the “same” logical buffer across ranks is assumed to have the same offset from the window base on each rank.
  • In practice, this means that when we repeatedly allocate Tensors that we want to use as symmetric buffers, the underlying device addresses (or at least their relative ordering) must be consistent across ranks.

However, PyTorch’s CUDACachingAllocator:

  • Maintains internal blocks and sorts them by stream, size, and address;
  • May allocate a new Tensor from different blocks on different ranks, even if each rank performs the “same” allocation sequence at the Python level;
  • Over time and under reuse/fragmentation, the mapping from “nth allocation” → “which block / address” can diverge between ranks.
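
The mapping requirement can be checked with a small diagnostic, sketched here in plain Python. It is a simplified model rather than NCCL's implementation: the helpers `locate` and `mismatched_allocations` and the sample addresses are hypothetical, and the model assumes that the nth window registered on each rank forms one symmetric window. In a real run, the inputs could be gathered from the allocator's printed addresses and each tensor's data_ptr().

```python
def locate(windows, ptr):
    """Map a device pointer to (registration index, offset) on one rank.

    `windows` is a list of (base_address, size) tuples in registration order.
    """
    for index, (base, size) in enumerate(windows):
        if base <= ptr < base + size:
            return index, ptr - base
    raise ValueError(f"pointer {ptr:#x} is not inside any registered window")


def mismatched_allocations(windows_per_rank, ptrs_per_rank):
    """Return every n whose nth tensor maps to a different (window, offset) on some rank."""
    located = [
        [locate(windows, ptr) for ptr in ptrs]
        for windows, ptrs in zip(windows_per_rank, ptrs_per_rank)
    ]
    return [n for n, views in enumerate(zip(*located)) if len(set(views)) > 1]


# Rank 0 registers its windows in increasing address order, rank 1 does not.
windows_rank0 = [(0x1000, 0x100), (0x2000, 0x100)]
windows_rank1 = [(0x5000, 0x100), (0x4000, 0x100)]

# Each rank hands out its lowest-address block first (address-based ordering).
ptrs_rank0 = [0x1000, 0x2000]
ptrs_rank1 = [0x4000, 0x5000]

mismatches = mismatched_allocations(
    [windows_rank0, windows_rank1], [ptrs_rank0, ptrs_rank1]
)
print("allocations with inconsistent window mapping:", mismatches)
```

An empty result means every tensor sits at the same (window, offset) on all ranks, which is the condition the symmetric all-reduce relies on in this model.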

Here is a simple script to reproduce the problem; you can run it with torchrun --nproc_per_node=8 nccl_symm_mem.py.

import json
import os
import random
import tempfile
import time
from dataclasses import dataclass
from functools import partial

import torch
import torch.distributed as dist
from sglang.srt.distributed.device_communicators.pynccl import \
    PyNcclCommunicator
from torch.cuda.memory import CUDAPluggableAllocator
from torch.distributed.distributed_c10d import _get_default_group
from torch.utils import cpp_extension

"""
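NCCL symmetric memory allocation and all_reduce test tool
================================================================================

Overview:
  This script allocates NCCL symmetric memory and tests all_reduce on it.
  Main features:

  1. Custom NCCL memory allocator:
     - Allocates memory with ncclMemAlloc and registers it through
       ncclCommWindowRegister as an NCCL_WIN_COLL_SYMMETRIC window to obtain
       a symmetric memory layout

  2. Validation over multiple test cases:
     - Creates 8 test cases of different sizes and memory types
       (symmetric / non-symmetric)
     - Each test case runs all_reduce(op=SUM)
     - Verifies that every result element matches the expected value
       (value * world_size)

  3. Device memory reuse:
     - The tensor reference is deleted right after each test case is verified
     - Device memory is managed by the custom allocator and can be reused by
       later test cases

Usage:
  1. Launch with torchrun:

     Single node, multiple GPUs:
       torchrun --nproc_per_node=2 nccl_symm_mem.py

     Multiple nodes, multiple GPUs:
       torchrun --nnodes=2 --nproc_per_node=2 --node_rank=0 \\
                --master_addr="192.168.1.1" --master_port=12345 \\
                nccl_symm_mem.py

  2. Environment variables:
     - RANK: rank of the current process (set automatically by torchrun)
     - WORLD_SIZE: total number of processes (set automatically by torchrun)
     - SGLANG_TMP_NCCL_COMM_VALUE: NCCL comm pointer value (set automatically
       by the program)

  3. Output:
     - Rank 0 prints detailed test case results and a summary table
     - Table columns: Size, Value, Sym (symmetric memory or not), Exp (expected value),
                MaxDiff (maximum difference), MeanDiff (mean difference), Result (pass/fail)
     - Exit code: 0 if all test cases pass, 1 if any fail

Notes:
  - Requires matching versions of PyTorch and NCCL
  - Requires compiling a C++ extension (built automatically into the current
    directory on first run)
  - Symmetric memory requires the torch.cuda.use_mem_pool(pool) context manager
  - The NCCL version must support the ncclMemAlloc / ncclMemFree APIs
================================================================================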
"""

# ============================================
# Create the NCCL memory allocator
# ============================================
nccl_allocator_source = """
#include <cuda_runtime.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

extern "C" {

// copy from https://github.com/NVIDIA/nccl/blob/master/src/nccl.h.in
typedef enum { ncclSuccess                 =  0,
               ncclUnhandledCudaError      =  1,
               ncclSystemError             =  2,
               ncclInternalError           =  3,
               ncclInvalidArgument         =  4,
               ncclInvalidUsage            =  5,
               ncclRemoteError             =  6,
               ncclInProgress              =  7,
               ncclNumResults              =  8 } ncclResult_t;
typedef struct ncclComm* ncclComm_t;
typedef struct ncclWindow_vidmem* ncclWindow_t;
ncclResult_t  ncclCommWindowRegister(ncclComm_t comm, void* buff, size_t size, ncclWindow_t* win, int winFlags);
#define NCCL_WIN_COLL_SYMMETRIC 0x01

ncclResult_t  ncclMemAlloc(void** ptr, size_t size);
ncclResult_t  ncclMemFree(void *ptr);
const char*  ncclGetErrorString(ncclResult_t result);

#define NCCLCHECK(cmd) do {                                               \
  ncclResult_t res = cmd;                                                 \
  if (res != ncclSuccess) {                                               \
    fprintf(stderr, "ERROR: NCCL symmetric memory allocation failed. Most likely out of device memory. '%s'\\n", \
           ncclGetErrorString(res));                       \
    return NULL;                                                        \
  }                                                                       \
} while(0)

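void* nccl_alloc_plug(size_t size, int device, void* stream) {
  // The communicator handle is passed in through an environment variable.
  const char *str_val = getenv("SGLANG_TMP_NCCL_COMM_VALUE");
  if (str_val == NULL) {
    fprintf(stderr, "ERROR: SGLANG_TMP_NCCL_COMM_VALUE is not set\\n");
    return NULL;
  }
  char *endptr;
  void* int_val = (void *)strtoull(str_val, &endptr, 0);

  void* ptr;
  NCCLCHECK(ncclMemAlloc(&ptr, size));

  // Register the new buffer as a symmetric window on the communicator.
  ncclComm_t comm = (ncclComm_t)(int_val);
  ncclWindow_t win;
  NCCLCHECK(ncclCommWindowRegister(comm, ptr, size, &win, NCCL_WIN_COLL_SYMMETRIC));
  printf("device %d allocated %zu bytes at %p register to comm %p\\n", device, size, ptr, (void*)comm);

  return ptr;
}

void nccl_free_plug(void* ptr, size_t size, int device, void* stream) {
  printf("device %d free %zu bytes at %p\\n", device, size, ptr);
  ncclResult_t err = ncclMemFree(ptr);
  if (err != ncclSuccess) {
    fprintf(stderr, "ERROR: ncclMemFree failed: '%s'\\n", ncclGetErrorString(err));
  }
}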

}

"""

_allocator = None
_mem_pool = None
_allocation_records = []

@dataclass
class GlobalVars:
    pool:torch.cuda.MemPool | None = None
    rank : int = 0
    world_size : int = 1
    process_group: PyNcclCommunicator| None = None

    def __init__(self):
        if not dist.is_initialized():
            rank = int(os.environ.get("RANK", "0"))
            world_size = int(os.environ.get("WORLD_SIZE", "1"))
            dist.init_process_group(
                backend="nccl",
                rank=rank,
                world_size=world_size,
                init_method="tcp://127.0.0.1:12344" if 'MASTER_ADDR' not in os.environ else None,
            )
            torch.cuda.set_device(rank)
            torch.set_default_device(f'cuda:{rank}')
            self.rank = rank
            self.world_size = world_size
        print(f"Initialized process group: rank={rank}, world_size={world_size}")
        out_dir = os.path.join(tempfile.gettempdir(), "symm_allocator")
        try:
            os.remove(os.path.join(out_dir, "lock"))
        except FileNotFoundError:
            pass
        torch.distributed.barrier()
        nccl_allocator_libname = "nccl_allocator_2"
        torch.utils.cpp_extension.load_inline(
            name=nccl_allocator_libname,
            cpp_sources=nccl_allocator_source,
            with_cuda=True,
            extra_ldflags=["-lnccl"],
            verbose=True,
            is_python_module=False,
            build_directory=out_dir,
        )
        global _allocator
        _allocator = CUDAPluggableAllocator(
            f"{out_dir}/{nccl_allocator_libname}.so",
            "nccl_alloc_plug",
            "nccl_free_plug",
        ).allocator()

        _cur_device = torch.cuda.current_device()
        self.device = _cur_device
        pool = torch.cuda.MemPool(_allocator)
        global _mem_pool
        _mem_pool = pool
        print(f'rank = {rank}, Current device: {_cur_device} {id(pool)=}', flush=True)

        self.pool = pool
        cpu_group = torch.distributed.new_group([rank for rank in range(world_size)], backend='gloo')
        pynccl_comm = PyNcclCommunicator(group=cpu_group, device=rank)
        self.process_group = pynccl_comm
        os.environ["SGLANG_TMP_NCCL_COMM_VALUE"] = str(self.process_group.comm.value)
        print(f"Set SGLANG_TMP_NCCL_COMM_VALUE: {self.process_group.comm.value} {id(self.pool)=}", flush=True)


@dataclass
class TestCase:
    """Test case data structure."""
    size: int | list[int]
    val: float
    is_symmetric: bool
    tensor: torch.Tensor
    expected_sum: float | None  # expected value of every element after all_reduce

    def __init__(self, size, val, is_symmetric, pool=None, device=None):
        self.size = size
        self.val = val
        self.is_symmetric = is_symmetric
        if self.is_symmetric and pool is not None:
            with torch.cuda.use_mem_pool(pool):
                self.tensor = torch.zeros(size, dtype=torch.float32, device=device)
            global _allocation_records
            if len(pool.snapshot()) > len(_allocation_records):
                for seg in pool.snapshot():
                    new_addr = seg['address']
                    found = False
                    for item in _allocation_records:
                        if item[1] == new_addr:
                            found = True
                            break
                    if not found:
                        _allocation_records.append( (len(_allocation_records), new_addr ))
        else:
            self.tensor = torch.zeros(size, dtype=torch.float32, device=device)
        self.tensor.fill_(self.val)
        self.expected_sum = None  # computed at run time

    def set_expected_sum(self, world_size):
        """Set the expected all_reduce result."""
        self.expected_sum = self.val * world_size

    def verify_all_reduce(self):
        with _global_vars.process_group.change_state(enable=True):
            print(f'rank {_global_vars.rank}: all_reduce {self.tensor.data_ptr()=}')
            _global_vars.process_group.all_reduce(self.tensor)#, op=dist.ReduceOp.SUM)

        # Check every element
        print(f'rank {_global_vars.rank}: after all_reduce {self.tensor.data_ptr()=}')
        torch.cuda.synchronize()
        tolerance = 1e-5
        expected_tensor = torch.full_like(self.tensor, self.expected_sum)
        diff = torch.abs(self.tensor - expected_tensor)
        max_diff = diff.max().item()
        mean_diff = diff.mean().item()

        # All elements must be within tolerance
        is_correct = (diff < tolerance).all().item()

        return is_correct, max_diff, mean_diff

def create_test_cases(pool=None) -> list[TestCase]:
    """Create multiple test cases."""
    test_cases = []

    for i in range(8):
        tc1 = TestCase(size=1024 * 256, val=1.0, is_symmetric=True, pool=pool, device=_global_vars.device)
        test_cases.append(tc1)

    return test_cases

def run_tests():
    """Run all test cases."""
    rank = dist.get_rank()
    world_size = dist.get_world_size()

    print(f"\n{'='*60}")
    print(f"{'='*20} Starting test cases {'='*20}")
    print(f"Rank: {rank}, World Size: {world_size}")
    print(f"{'='*60}\n")

    # Create the test cases
    test_cases = create_test_cases(_global_vars.pool)

    global _allocation_records
    _allocation_records.sort(key=lambda x: x[1])
    print(f'\nrank {_global_vars.rank} {_allocation_records=}\n', flush=True)

    # Set the expected values
    for tc in test_cases:
        tc.set_expected_sum(world_size)

    # Run the test cases
    all_passed = True
    test_results = []

    for idx in range(len(test_cases)):
        tc = test_cases[idx]
        #print(f"{'='*20} Test case {idx + 1} {'='*20}")
        #print(f"  Size: {tc.size:,}")
        #print(f"  Value: {tc.val}")
        #print(f"  Is Symmetric: {tc.is_symmetric}")
        #print(f"  Expected Sum: {tc.expected_sum}")

        # Run all_reduce and verify every element
        is_correct, max_diff, mean_diff = tc.verify_all_reduce()

        print(f"  Max Diff: {max_diff:.8f}")
        print(f"  Mean Diff: {mean_diff:.8f}")
        print(f"  Result: {'PASS' if is_correct else 'FAIL'}")

        test_results.append({
            'test_case': idx + 1,
            'size': tc.size,
            'val': tc.val,
            'is_symmetric': tc.is_symmetric,
            'expected_sum': tc.expected_sum,
            'max_diff': max_diff,
            'mean_diff': mean_diff,
            'pass': is_correct
        })

        if not is_correct:
            all_passed = False

        # Delete the tensor and test case references to release GPU memory
        #del tc.tensor
        #test_cases[idx] = None  # remove the reference from the list

        print()

    # Print the test summary
    print(f"\n{'='*60}")
    print(f"{'='*20} Test summary {'='*20}")
    print(f"{'='*60}\n")

    if rank == 0:
        print(f"{'TestCase':<8} {'Size':<12} {'Value':<8} {'Sym':<5} {'Exp':<8} {'MaxDiff':<12} {'MeanDiff':<12} {'Result':<8}")
        print("-" * 80)

        passed_count = 0
        failed_count = 0

        for result in test_results:
            status = 'PASS' if result['pass'] else 'FAIL'
            print(f"{result['test_case']:<8} "
                  f"{result['size']:<12,} "
                  f"{result['val']:<8} "
                  f"{result['is_symmetric']:<5} "
                  f"{result['expected_sum']:<8} "
                  f"{result['max_diff']:<12.8f} "
                  f"{result['mean_diff']:<12.8f} "
                  f"{status:<8}")

            if result['pass']:
                passed_count += 1
            else:
                failed_count += 1

        print("-" * 80)
        print(f"Total: {len(test_results)} | Passed: {passed_count} | Failed: {failed_count}")

        if all_passed:
            print("\n=== All test cases passed ===\n")
        else:
            print("\n=== Some test cases failed ===\n")

    return all_passed

_global_vars: GlobalVars | None = None
if __name__ == "__main__":
    # Initialize the distributed environment
    _global_vars = GlobalVars()

    print(f'1st round')
    all_passed = run_tests()
    dist.barrier()
    # run for second time to see whether tensor has same data ptr
    print(f'2nd round')
    all_passed = run_tests()

    # Exit gracefully
    if dist.is_initialized():
        dist.barrier()
        if dist.get_rank() == 0:
            print("\n=== Tests finished ===")

    time.sleep(10)
    exit(0)

Question/Suggestion

Is there any supported way in PyTorch to:

  • Disable internal block reordering for certain allocations, or
  • Allocate Tensors from a region where the block layout is guaranteed to be consistent across ranks?

This issue mainly affects advanced users integrating PyTorch with NCCL’s symmetric window API, but it can lead to very subtle correctness bugs (wrong results without obvious crashes), so any guidance or support here would be very helpful.
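
To make the concern about block layout concrete, here is a toy model in plain Python (no GPU needed). It is not PyTorch's actual allocation algorithm: `ToySegment`, `simulate`, and the first-fit policy are hypothetical, chosen only for illustration. It sketches one way the same logical tensor could land at different offsets inside a registered segment when two ranks free blocks in a different order.

```python
class ToySegment:
    """Toy first-fit allocator over one fixed-size segment (illustrative only)."""

    def __init__(self, size):
        self.size = size
        self.live = {}  # name -> (offset, nbytes)

    def alloc(self, name, nbytes):
        # Walk live blocks in address order and take the first gap that fits.
        cursor = 0
        for off, n in sorted(self.live.values()):
            if off - cursor >= nbytes:
                break
            cursor = off + n
        if cursor + nbytes > self.size:
            raise MemoryError(f"no room for {name}")
        self.live[name] = (cursor, nbytes)
        return cursor

    def free(self, name):
        del self.live[name]


def simulate(free_first):
    """Allocate a, b, c, free one of them, then return the offset given to d."""
    seg = ToySegment(size=4096)
    seg.alloc("a", 1024)
    seg.alloc("b", 1024)
    seg.alloc("c", 1024)
    seg.free(free_first)         # the two ranks differ only in this step
    return seg.alloc("d", 1024)  # offset of the buffer later passed to all_reduce


rank0_offset = simulate(free_first="a")
rank1_offset = simulate(free_first="b")
print(rank0_offset, rank1_offset)  # 0 1024
```

In this model both ranks hold a valid tensor `d`, yet it sits at offset 0 on one rank and 1024 on the other, which is the kind of mismatch the question above asks how to rule out.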

cc @awgu @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k @pragupta @msaroufim @dcci @aditvenk @xmfan

extent analysis

Fix Plan

To address inconsistent block layout across ranks when PyTorch's CUDACachingAllocator manages memory registered through NCCL's symmetric memory API, two directions are worth trying:

  • Route symmetric allocations through a dedicated pool: Back a torch.cuda.MemPool with a CUDAPluggableAllocator so that only tensors created under torch.cuda.use_mem_pool(pool) come from NCCL-registered memory, and issue those allocations in the same order and with the same sizes on every rank.
  • Avoid depending on block reuse order: Rather than trying to change how the caching allocator orders blocks, carve symmetric tensors out of one preallocated buffer so that your code, not the allocator, decides each tensor's offset.

Here's a sketch of the first option. The library path is a placeholder for the .so built from the nccl_alloc_plug / nccl_free_plug source in the reproduction:

import torch
from torch.cuda.memory import CUDAPluggableAllocator

# Placeholder path: point this at the library that exports the two symbols.
allocator = CUDAPluggableAllocator(
    "/path/to/nccl_allocator.so",
    "nccl_alloc_plug",
    "nccl_free_plug",
).allocator()

# Only allocations made inside use_mem_pool() go through ncclMemAlloc.
pool = torch.cuda.MemPool(allocator)
with torch.cuda.use_mem_pool(pool):
    symmetric_tensor = torch.zeros(1024 * 256, dtype=torch.float32, device="cuda")

For the second option, one possible approach (an assumption to validate, not a confirmed fix) is to allocate a single buffer from the pool on every rank and hand out fixed slices of it:

NUM_TENSORS = 8
NUMEL = 1024 * 256

# One allocation per rank; every rank runs this with identical sizes.
with torch.cuda.use_mem_pool(pool):
    arena = torch.zeros(NUM_TENSORS * NUMEL, dtype=torch.float32, device="cuda")

# Slice i starts at the same byte offset within the arena on every rank.
tensors = [arena.narrow(0, i * NUMEL, NUMEL) for i in range(NUM_TENSORS)]

Verification

To verify that the change fixes the issue, rerun the test cases and confirm that every all_reduce result equals value * world_size on every rank.

# Run the test cases with the custom allocator
all_passed = run_tests()

# Check if all test cases passed
if all_passed:
    print("All test cases passed with the custom allocator.")
else:
    print("Some test cases failed with the custom allocator.")
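
Beyond a pass/fail result, it may help to check directly whether each tensor sits at the same offset on every rank. The helper below is a minimal sketch in plain Python; `find_offset_mismatches` is a hypothetical name, and it assumes you have already gathered, for each rank, a list of byte offsets (for example `tensor.data_ptr()` minus the address of the containing segment reported by `pool.snapshot()`).

```python
def find_offset_mismatches(per_rank_offsets):
    """Return indices of tensors whose in-segment offset differs between ranks.

    per_rank_offsets[r][i] is the byte offset of tensor i inside its segment
    on rank r. Rank 0 is used as the reference.
    """
    reference = per_rank_offsets[0]
    mismatched = []
    for i, expected in enumerate(reference):
        if any(offsets[i] != expected for offsets in per_rank_offsets[1:]):
            mismatched.append(i)
    return mismatched


# Made-up offsets for two ranks and three tensors; tensor 2 disagrees.
gathered = [
    [0, 1048576, 0],        # rank 0
    [0, 1048576, 1048576],  # rank 1
]
print(find_offset_mismatches(gathered))  # [2]
```

In the reproduction, the per-rank lists could be collected with `torch.distributed.all_gather_object` before calling the helper. An empty result means the offsets agree across ranks.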

Extra Tips

When using a custom allocator or otherwise constraining block layout, run more than one round of allocations (the reproduction calls run_tests() twice for this reason), because a layout mismatch produces wrong results rather than a crash. Also measure the performance impact, since restricting how the caching allocator reuses blocks can increase allocation cost and memory use.
