[RFC] Adding Distributed Support to OpenReg (pytorch/pytorch; 8 comments, 6 participants)

pytorch/pytorch#176877, fetched 2026-04-08 00:24:03
🚀 The feature, motivation and pitch

Summary

OpenReg is PyTorch's reference implementation for out-of-tree hardware backends. This RFC adds distributed training support to OpenReg. It delivers a custom c10d::Backend (the OpenReg Collectives Communication Library, OCCL), infrastructure for out-of-tree backends to reuse PyTorch's distributed test suite, and vendor-facing documentation.

Motivation

OpenReg currently demonstrates how accelerator vendors integrate with PyTorch's device model, operator dispatch, memory management, streams, autograd, etc., all without modifying PyTorch core. However, if a vendor wants to support distributed training, there is no reference demonstrating how an out-of-tree accelerator can integrate with torch.distributed and the c10d backend abstraction. Today, the only references for this integration are ProcessGroupGloo, ProcessGroupNCCL, and FakeProcessGroup. The first two are tightly coupled to their respective libraries and are not designed to be read as integration guides, and FakeProcessGroup is a test stub that returns pre-completed Work objects without moving any data. At present, OpenReg lacks the infrastructure needed to demonstrate distributed execution:

  • No reference implementation for a custom ProcessGroup backend. Distributed support is currently achieved only by registering Gloo as the default backend.
  • No inter-process communication mechanism for OpenReg tensors, preventing collective operations across processes.
  • No distributed test coverage, making it difficult to validate correctness or provide guidance for vendors implementing their own distributed backends.

The proposal aims to address this gap by introducing a reference distributed backend and validation infrastructure, enabling OpenReg to demonstrate how accelerator vendors can implement and verify distributed training support within PyTorch.

Goals:

  1. OCCL: A c10d::Backend that performs real tensor data exchange rather than acting as a stub, with core collectives (allreduce, broadcast, allgather, send/recv, etc.) that produce correct results.
  2. Distributed test reuse: Run PyTorch's existing distributed tests, with declarative configuration for out-of-tree backends.
  3. Documentation: Accelerator vendor guide for implementing distributed backends.

Non-goals: Performance optimization, multi-node support, full collective coverage.

Proposed Implementation

Detailed design document: openreg_distributed_proposal contains the full implementation specification.

Architecture Overview

torch.distributed.all_reduce(tensor)
ProcessGroup (dispatches by device type)
ProcessGroupOpenReg (c10d::Backend subclass)
        ├── OpenRegWork (c10d::Work subclass, worker thread pool)
        ├── tensorToHost / hostToTensor (mprotect memory handling)
        └── reduceLocal (CPU-side reduction via at::from_blob views)
OcclTransport (blocking TCP sockets)
        ├── Store-based address exchange (c10d::PrefixStore)
        ├── Lazy peer connections
        └── Tagged send/recv with sequence counter
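The "tagged send/recv with sequence counter" entry in the diagram can be read as follows: every collective takes the next value from a per-process counter, stamps its messages with that value, and the receiver matches on it so that messages from different collectives are never confused. The RFC does not give the wire format, so the sketch below is only one possible reading. The 16-byte header layout and the names frame, unframe, TaggedMailbox, and SequenceCounter are all assumptions made for illustration.

```python
import struct
from collections import defaultdict, deque

# Assumed header: (tag, payload length), both unsigned 64-bit, network byte order.
HEADER = struct.Struct("!QQ")


def frame(tag, payload):
    """Prefix a payload with its tag and length."""
    return HEADER.pack(tag, len(payload)) + payload


def unframe(buffer):
    """Split one framed message off the front of buffer.

    Returns (tag, payload, rest), or None if no complete message is buffered yet.
    """
    if len(buffer) < HEADER.size:
        return None
    tag, length = HEADER.unpack_from(buffer)
    end = HEADER.size + length
    if len(buffer) < end:
        return None
    return tag, buffer[HEADER.size:end], buffer[end:]


class SequenceCounter:
    """Hands out one tag per collective; all ranks must issue collectives in the same order."""

    def __init__(self):
        self._next = 0

    def next_tag(self):
        tag = self._next
        self._next += 1
        return tag


class TaggedMailbox:
    """Buffers incoming messages so a receiver can ask for a specific tag."""

    def __init__(self):
        self._pending = defaultdict(deque)
        self._buffer = b""

    def feed(self, data):
        # Append raw bytes read from the socket and file every complete message.
        self._buffer += data
        while (parsed := unframe(self._buffer)) is not None:
            tag, payload, self._buffer = parsed
            self._pending[tag].append(payload)

    def take(self, tag):
        # Return the oldest message carrying this tag, or None if none has arrived.
        messages = self._pending.get(tag)
        return messages.popleft() if messages else None
```

Because the mailbox files messages by tag, a receiver waiting on an earlier collective is not disturbed when bytes for a later one arrive first on the same connection.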

The implementation has four components:

1. ProcessGroupOpenReg (c10d::Backend subclass)

Overrides the collective virtual methods in Backend.hpp, dispatching to a minimal TCP transport. Key design choices:

  • Worker thread pool for async execution, following ProcessGroupGloo's runLoop pattern: collectives are enqueued, and Work::wait() blocks until the worker calls finish().
  • tensorToHost/hostToTensor helpers handle OpenReg's mprotect-guarded memory, using the same MemoryGuard pattern (unprotect, memcpy, reprotect) that existing OpenReg operators use.
  • reduceLocal performs CPU-side reductions via at::from_blob views of unprotected device memory. Supports SUM and AVG initially.
  • Flat collective algorithms built from point-to-point send/recv (e.g., allreduce = gather to root → reduce → broadcast). These are intentionally kept simple because OpenReg is only a reference implementation. Hardware vendors will replace them with their own hardware-tuned algorithms.

Not all collectives are implemented initially. Unimplemented collectives fall through to the base class error and are handled by the test skip configuration.
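The flat allreduce mentioned in the design choices (gather to root, reduce, broadcast) can be sketched in a few lines. This is a pure-Python simulation, not the RFC's C++ code: threads stand in for ranks, one queue per rank stands in for the point-to-point transport, and the names WORLD_SIZE, ROOT, send, recv, and flat_all_reduce are hypothetical.

```python
import queue
import threading

WORLD_SIZE = 4
ROOT = 0

# One inbox per rank stands in for the point-to-point transport.
inboxes = [queue.Queue() for _ in range(WORLD_SIZE)]


def send(dst, payload):
    inboxes[dst].put(payload)


def recv(rank):
    return inboxes[rank].get()


def flat_all_reduce(rank, values):
    """SUM-allreduce a list of numbers: gather to root, reduce, broadcast."""
    if rank == ROOT:
        total = list(values)
        # Gather and reduce: fold in one contribution from every other rank.
        for _ in range(WORLD_SIZE - 1):
            contribution = recv(ROOT)
            total = [a + b for a, b in zip(total, contribution)]
        # Broadcast: send the reduced result back to every other rank.
        for dst in range(WORLD_SIZE):
            if dst != ROOT:
                send(dst, total)
        return total
    # Non-root ranks send their contribution, then wait for the result.
    send(ROOT, list(values))
    return recv(rank)


def run_all_ranks():
    results = [None] * WORLD_SIZE

    def worker(rank):
        results[rank] = flat_all_reduce(rank, [rank, 10 * rank])

    threads = [threading.Thread(target=worker, args=(r,)) for r in range(WORLD_SIZE)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```

With four ranks contributing [rank, 10 * rank], every rank ends up holding [6, 60]. The root handles 2 × (WORLD_SIZE − 1) messages, which is why this shape suits a reference implementation but not a performance-oriented one.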

2. TCP Transport Layer

A single OcclTransport class managing a full mesh of TCP connections. This is the component vendors replace with their hardware-specific transport (RDMA, NVLink, proprietary interconnect).

Why not wrap Gloo? The transport layer is kept self-contained to demonstrate the full vendor integration surface. When a vendor integrates their CCL library into PyTorch, the integration points are subclassing c10d::Backend, using c10d::Store for bootstrapping the communicator, managing the c10d::Work async lifecycle, wiring up an execution model (worker threads or device streams), and registering the backend via pybind11.

If OCCL wraps Gloo, the Backend subclass and registration are visible, but the Store bootstrap, Work lifecycle, and execution model are all hidden inside Gloo's abstractions. By keeping the transport self-contained, we make the full plugin surface visible. The transport itself is intentionally minimal and exists to serve the plugin demonstration, not as a communication library in its own right.
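To make the Store bootstrap concrete, here is a minimal sketch of store-based address exchange with lazy peer connections. It assumes a dict-backed InMemoryStore as a stand-in for c10d::Store (only a blocking get and a set are modeled), and the Endpoint class and the "occl/addr/<rank>" key scheme are invented for illustration. In the real backend the keys would presumably be namespaced by c10d::PrefixStore.

```python
import socket
import threading


class InMemoryStore:
    """Hypothetical stand-in for c10d::Store: set() publishes, get() blocks until set."""

    def __init__(self):
        self._data = {}
        self._cond = threading.Condition()

    def set(self, key, value):
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()

    def get(self, key, timeout=5.0):
        with self._cond:
            if not self._cond.wait_for(lambda: key in self._data, timeout):
                raise TimeoutError(f"key {key!r} was never published")
            return self._data[key]


class Endpoint:
    """One rank's view of the mesh: a listener plus lazily created peer sockets."""

    def __init__(self, rank, store):
        self.rank = rank
        self.store = store
        self.peers = {}  # peer rank -> connected socket, filled on first use
        # Bind to an ephemeral localhost port and publish it under this rank's key.
        self.listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener.bind(("127.0.0.1", 0))
        self.listener.listen()
        host, port = self.listener.getsockname()
        store.set(f"occl/addr/{rank}", f"{host}:{port}")

    def connect(self, peer):
        # Lazy connection: the peer's address is looked up only when first needed.
        if peer not in self.peers:
            host, port = self.store.get(f"occl/addr/{peer}").rsplit(":", 1)
            self.peers[peer] = socket.create_connection((host, int(port)))
        return self.peers[peer]

    def accept(self):
        conn, _ = self.listener.accept()
        return conn

    def close(self):
        for sock in self.peers.values():
            sock.close()
        self.listener.close()
```

The store is used only for rendezvous. Once addresses are exchanged, all tensor data flows over the sockets, which is the part a vendor would swap for its own interconnect.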

3. Multi-Process Initialization

No simulator changes needed. OpenReg's existing design is already compatible with multi-process distributed training. Each spawned rank gets an independent simulator instance and the data exchange happens exclusively through OCCL's transport.

4. Distributed Test Reuse

PyTorch has ~288 tests in _DistTestBase and ~25 reusable test* methods in AbstractCommTest. Today, out-of-tree backends cannot reuse these. The tests are gated behind @requires_nccl(), @requires_gloo(), and @skip_if_lt_x_gpu(N).

We build infrastructure so any out-of-tree backend can validate against these tests:

  • Declarative YAML configuration for skip/xfail per test/class, with reasons. Vendors maintain the configuration rather than forking test files.
  • Decorator bypass extending DistTestCases.backend_feature so PrivateUse1 devices count as accelerators for GPU-count checks.

Related RFC: #174469

5. Documentation

A distributed.md page added to docs/source/accelerator/, covering: c10d backend, backend registration, transport integration, Store bootstrapping, multi-process initialization, and test reuse. Structured around code references to OCCL's implementation.

Drawbacks

  • Single-machine only (TCP transport assumes localhost)
  • No fault tolerance: a rank crash causes the other ranks to hang until the test harness times out
  • No computation-communication overlap: collectives block the worker thread

Alternatives

  • Wrap Gloo: Lower maintenance but hides Store bootstrap, Work lifecycle, and execution model.
  • Shared memory transport: Would eliminate TCP overhead for single-machine testing but adds complexity around mprotect coordination.


cc @awgu @wanchaol @fegin @fduwjj @wz337 @wconstab @d4l3k @pragupta @msaroufim @dcci @aditvenk @xmfan @NmomoN @mengpenghui @fwenguang @cdzhan @1274085042 @PHLens @albanD

extent analysis

Problem Summary

The RFC proposes distributed training support for OpenReg, PyTorch's reference implementation for out-of-tree hardware backends.

Root Cause Analysis

The issue is due to the lack of a custom ProcessGroup backend and inter-process communication mechanism for OpenReg tensors, preventing collective operations across processes.

Fix Plan

1. Implement ProcessGroupOpenReg (c10d::Backend subclass)

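# process_group_openreg.py
# Illustrative Python sketch only; the RFC implements this in C++ as a c10d::Backend subclass.
import queue
import threading

import torch


class Work:
    """Handle returned by a collective; wait() blocks until finish() is called."""

    def __init__(self):
        self._done = threading.Event()

    def finish(self):
        self._done.set()

    def wait(self):
        self._done.wait()


class WorkerThreadPool:
    def __init__(self, num_workers=1):
        self._tasks = queue.Queue()
        for _ in range(num_workers):
            threading.Thread(target=self._run_loop, daemon=True).start()

    def _run_loop(self):
        # Each worker pops a task, runs it, then marks its Work as finished.
        while True:
            fn, work = self._tasks.get()
            try:
                fn()
            finally:
                work.finish()

    def enqueue(self, fn):
        # Queue the operation and hand back a Work the caller can wait on.
        work = Work()
        self._tasks.put((fn, work))
        return work


class ProcessGroupOpenReg:
    def __init__(self):
        self.worker_thread_pool = WorkerThreadPool()

    def all_reduce(self, tensors):
        # Dispatch to the worker thread pool; the caller blocks on Work.wait().
        return self.worker_thread_pool.enqueue(lambda: self.reduce_local(tensors))

    def reduce_local(self, tensors):
        # CPU-side SUM reduction, written back in place into every tensor.
        total = torch.stack(tensors).sum(dim=0)
        for t in tensors:
            t.copy_(total)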

2. Implement TCP Transport Layer

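# occl_transport.py
# Illustrative Python sketch only; the RFC's OcclTransport is a C++ class.
import socket
import struct


class OcclTransport:
    def __init__(self):
        self.connections = {}  # peer rank -> connected socket

    def connect(self, rank, address):
        # Establish a TCP connection to the peer at (host, port).
        self.connections[rank] = socket.create_connection(address)

    def send(self, rank, payload):
        # Prefix each message with its length so the receiver can frame it.
        sock = self.connections[rank]
        sock.sendall(struct.pack("!Q", len(payload)) + payload)

    def recv(self, rank):
        # Read the 8-byte length header, then exactly that many payload bytes.
        sock = self.connections[rank]
        (size,) = struct.unpack("!Q", self._recv_exact(sock, 8))
        return self._recv_exact(sock, size)

    @staticmethod
    def _recv_exact(sock, size):
        # TCP is a byte stream, so a single recv() may return fewer bytes.
        chunks = bytearray()
        while len(chunks) < size:
            chunk = sock.recv(size - len(chunks))
            if not chunk:
                raise ConnectionError("peer closed the connection")
            chunks.extend(chunk)
        return bytes(chunks)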

3.
