Skip to content

vllm.distributed.weight_transfer.ipc_engine

IPC-based weight transfer engine using CUDA IPC for communication.

IPCTrainerSendWeightsArgs dataclass

Arguments for IPC trainer_send_weights method.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@dataclass
class IPCTrainerSendWeightsArgs:
    """Options controlling how ``trainer_send_weights`` delivers updates."""

    send_mode: str | Callable[["IPCWeightTransferUpdateInfo"], None]
    """Transport used to reach vLLM: the string 'ray' or 'http' selects a
    built-in transport, while a callable is invoked with each
    IPCWeightTransferUpdateInfo and is responsible for sending it."""
    llm_handle: Any = None
    """Ray actor handle, or a list of handles; needed when send_mode='ray'."""
    url: str | None = None
    """Base URL of the HTTP endpoint; needed when send_mode='http'."""
    packed: bool = False
    """If True, transfer weights in bounded-memory packed chunks."""
    packed_buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES
    """Size in bytes of each packed buffer; only used when packed=True."""

    def __post_init__(self):
        """Check that the chosen send_mode has the arguments it needs.

        Raises:
            ValueError: If a built-in mode is missing its required argument,
                or if send_mode is neither 'ray', 'http', nor a callable.
        """
        mode = self.send_mode
        if callable(mode):
            # A callable transport needs no further validation here.
            return
        if mode == "ray":
            if self.llm_handle is None:
                raise ValueError("llm_handle is required for 'ray' send_mode")
            return
        if mode == "http":
            if self.url is None:
                raise ValueError("url is required for 'http' send_mode")
            return
        raise ValueError(
            f"send_mode must be 'ray', 'http', or a callable, "
            f"got {mode!r}"
        )

llm_handle class-attribute instance-attribute

llm_handle: Any = None

Ray actor handle or list of handles (required for 'ray' send_mode).

packed class-attribute instance-attribute

packed: bool = False

Whether to use packed tensor transfer for bounded-memory chunking.

packed_buffer_size_bytes class-attribute instance-attribute

packed_buffer_size_bytes: int = (
    DEFAULT_PACKED_BUFFER_SIZE_BYTES
)

Size in bytes for each packed tensor buffer when packed=True.

send_mode instance-attribute

send_mode: (
    str | Callable[[IPCWeightTransferUpdateInfo], None]
)

How to send updates to vLLM. Either a string ('ray' or 'http') for built-in transports, or a callable that receives an IPCWeightTransferUpdateInfo and performs the send.

url class-attribute instance-attribute

url: str | None = None

Base URL for HTTP endpoint (required for 'http' send_mode).

__post_init__

__post_init__()

Validate that required arguments are provided for the selected mode.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
def __post_init__(self):
    """Check that the chosen send_mode has the arguments it needs.

    Raises:
        ValueError: If a built-in mode is missing its required argument,
            or if send_mode is neither 'ray', 'http', nor a callable.
    """
    mode = self.send_mode
    if callable(mode):
        # A callable transport needs no further validation here.
        return
    if mode == "ray":
        if self.llm_handle is None:
            raise ValueError("llm_handle is required for 'ray' send_mode")
        return
    if mode == "http":
        if self.url is None:
            raise ValueError("url is required for 'http' send_mode")
        return
    raise ValueError(
        f"send_mode must be 'ray', 'http', or a callable, "
        f"got {mode!r}"
    )

IPCWeightTransferEngine

Bases: WeightTransferEngine[IPCWeightTransferInitInfo, IPCWeightTransferUpdateInfo]

Weight transfer engine using CUDA IPC for communication between trainer and workers.

This implementation uses CUDA IPC to transfer weights from the trainer (rank 0) to all inference workers in a process group. IPC handles are used to share memory between processes on the same node.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
class IPCWeightTransferEngine(
    WeightTransferEngine[IPCWeightTransferInitInfo, IPCWeightTransferUpdateInfo]
):
    """
    Weight transfer engine using CUDA IPC for communication between trainer and workers.

    This implementation uses CUDA IPC to transfer weights from the trainer (rank 0)
    to all inference workers in a process group. IPC handles are used to share
    memory between processes on the same node.
    """

    # Define backend-specific dataclass types
    init_info_cls = IPCWeightTransferInitInfo
    update_info_cls = IPCWeightTransferUpdateInfo

    def __init__(
        self, config: WeightTransferConfig, parallel_config: ParallelConfig
    ) -> None:
        """
        Initialize the IPC weight transfer engine.

        This initializer adds no state of its own; both configs are forwarded
        to the base ``WeightTransferEngine`` initializer.

        Args:
            config: The configuration for the weight transfer engine
            parallel_config: The configuration for the parallel setup
        """
        super().__init__(config, parallel_config)

    def parse_update_info(
        self, update_dict: dict[str, Any]
    ) -> IPCWeightTransferUpdateInfo:
        """Parse update dict, deserializing pickled IPC handles if present.

        HTTP transport sends IPC handles as a base64-encoded pickle under the
        key ``ipc_handles_pickled``. This method deserializes them back into
        ``ipc_handles`` before constructing the typed dataclass, keeping
        serialization concerns out of the dataclass itself. The caller's
        ``update_dict`` is left unmodified.

        Requires ``VLLM_ALLOW_INSECURE_SERIALIZATION=1`` because the
        payload is deserialized via ``pickle.loads``.

        Raises:
            ValueError: If both ``ipc_handles`` and ``ipc_handles_pickled``
                are present, or if ``ipc_handles_pickled`` is present while
                ``VLLM_ALLOW_INSECURE_SERIALIZATION`` is not enabled.
        """
        if "ipc_handles_pickled" in update_dict:
            if "ipc_handles" in update_dict:
                raise ValueError(
                    "Cannot specify both `ipc_handles` and `ipc_handles_pickled`"
                )

            if not envs.VLLM_ALLOW_INSECURE_SERIALIZATION:
                raise ValueError(
                    "Refusing to deserialize `ipc_handles_pickled` without "
                    "VLLM_ALLOW_INSECURE_SERIALIZATION=1"
                )

            # Work on a shallow copy so the caller's dict is not mutated.
            update_dict = dict(update_dict)
            pickled = update_dict.pop("ipc_handles_pickled")
            # SECURITY: pickle.loads can execute arbitrary code embedded in
            # the payload; the env-var gate above is the only protection.
            update_dict["ipc_handles"] = pickle.loads(base64.b64decode(pickled))

        return super().parse_update_info(update_dict)

    def init_transfer_engine(self, init_info: IPCWeightTransferInitInfo) -> None:
        """
        Initialize the weight transfer mechanism.
        This is called once at the beginning of training.
        No initialization needed for IPC backend.

        Args:
            init_info: IPC initialization info (empty)
        """
        pass

    def receive_weights(
        self,
        update_info: IPCWeightTransferUpdateInfo,
        load_weights: Callable[[list[tuple[str, torch.Tensor]]], None],
    ) -> None:
        """
        Receive weights from the trainer via CUDA IPC handles.

        Args:
            update_info: IPC update info containing parameter names, dtypes,
                shapes, and IPC handles. In non-packed mode ``ipc_handles``
                is a list with one mapping per parameter from physical GPU
                UUID to the rebuild_cuda_tensor args tuple; in packed mode
                it is a single such mapping for the packed chunk.
            load_weights: Callable that loads weights into the model. Called
                once per update with the weights rebuilt from
                ``update_info``.

        Raises:
            ValueError: In non-packed mode, if ``names`` and ``ipc_handles``
                differ in length, or if no handle exists for this GPU's UUID.
        """
        device_index = torch.accelerator.current_device_index()

        if update_info.packed:
            assert update_info.tensor_sizes is not None
            assert isinstance(update_info.ipc_handles, dict)
            weights = packed_ipc_consumer(
                ipc_handle=update_info.ipc_handles,
                names=update_info.names,
                shapes=update_info.shapes,
                dtype_names=update_info.dtype_names,
                tensor_sizes=update_info.tensor_sizes,
                device_index=device_index,
            )
            load_weights(weights)
            return

        assert isinstance(update_info.ipc_handles, list)
        # Fail loudly instead of letting zip() silently drop trailing items.
        if len(update_info.names) != len(update_info.ipc_handles):
            raise ValueError(
                f"Got {len(update_info.names)} parameter names but "
                f"{len(update_info.ipc_handles)} IPC handles"
            )

        # The receiver's GPU UUID does not change per parameter; look it up once.
        physical_gpu_id = str(torch.cuda.get_device_properties(device_index).uuid)

        weights: list[tuple[str, torch.Tensor]] = []
        for name, ipc_handle in zip(update_info.names, update_info.ipc_handles):
            if physical_gpu_id not in ipc_handle:
                raise ValueError(
                    f"IPC handle not found for GPU UUID "
                    f"{physical_gpu_id}. "
                    f"Available UUIDs: {list(ipc_handle.keys())}"
                )

            list_args = list(ipc_handle[physical_gpu_id])
            # Index 6 of the args from reduce_tensor is the device_index.
            # We need to overwrite it with the receiver's device index.
            list_args[6] = device_index
            weights.append((name, rebuild_cuda_tensor(*list_args)))

        load_weights(weights)

    def shutdown(self) -> None:
        """Release resources; the IPC backend holds none, so this does nothing."""
        pass

    @staticmethod
    def trainer_send_weights(
        iterator: Iterator[tuple[str, torch.Tensor]],
        trainer_args: dict[str, Any] | IPCTrainerSendWeightsArgs,
    ) -> None:
        """Send weights from trainer to inference workers via CUDA IPC.

        Supports the built-in 'ray' and 'http' transports (or a user-supplied
        callable send_mode) and two transfer strategies:
        - Non-packed (default): all weights in a single API call.
        - Packed (packed=True): chunked transfer with bounded GPU memory.

        For multi-GPU training, all ranks must call this method in
        parallel. IPC handles are all-gathered across ranks and merged
        so that each vLLM worker can find its own GPU UUID. Only rank 0
        sends the payload to vLLM.

        .. note::
            This method calls ``update_weights`` internally. The caller must
            call ``start_weight_update`` before and ``finish_weight_update``
            after this method.

        Args:
            iterator: Iterator of (name, tensor) pairs. For multi-GPU,
                     each rank should yield the full tensor on its own GPU
                     (e.g. via FSDP full_tensor()).
            trainer_args: IPCTrainerSendWeightsArgs or equivalent dict.
        """
        args = (
            IPCTrainerSendWeightsArgs(**trainer_args)
            if isinstance(trainer_args, dict)
            else trainer_args
        )
        device_index = torch.accelerator.current_device_index()
        gpu_uuid = str(torch.cuda.get_device_properties(device_index).uuid)
        if args.packed:
            IPCWeightTransferEngine._send_packed(iterator, args, gpu_uuid)
        else:
            IPCWeightTransferEngine._send_unpacked(iterator, args, gpu_uuid)

    @staticmethod
    def _is_rank_zero() -> bool:
        """Return True if this is rank 0 or no distributed group exists."""
        if not torch.distributed.is_initialized():
            return True
        return torch.distributed.get_rank() == 0

    @staticmethod
    def _all_gather_and_merge_handles(
        handles: list[dict[str, tuple]],
    ) -> list[dict[str, tuple]]:
        """All-gather and merge IPC handle dicts across ranks in one call.

        Each rank contributes a list of {gpu_uuid: ipc_args} dicts (one
        per parameter or one per chunk). A single all_gather_object
        collects every rank's full list, then rank 0 merges per-index so
        each dict maps every GPU UUID to its args.

        Non-rank-0 returns a list of empty dicts.
        No-op (returns handles unchanged) when no distributed group exists
        or the world size is 1.

        Raises:
            ValueError: On every rank, if the ranks contributed lists of
                different lengths (e.g. their iterators yielded different
                numbers of tensors).
        """
        if (
            not torch.distributed.is_initialized()
            or torch.distributed.get_world_size() == 1
        ):
            return handles

        world_size = torch.distributed.get_world_size()
        gathered: list[list[dict[str, tuple]] | None] = [None] * world_size
        torch.distributed.all_gather_object(gathered, handles)
        torch.distributed.barrier()
        torch.cuda.synchronize()

        # Every rank holds the full gathered list, so all ranks reach the same
        # verdict here. Previously only rank 0 could fail (IndexError) while
        # the other ranks went on to the post-send barrier.
        counts = {
            rank: len(rank_handles)
            for rank, rank_handles in enumerate(gathered)
            if rank_handles is not None
        }
        if len(set(counts.values())) > 1:
            raise ValueError(
                "All ranks must contribute the same number of IPC handles; "
                f"got per-rank counts {counts}"
            )

        if torch.distributed.get_rank() != 0:
            return [{} for _ in handles]

        per_rank = [h for h in gathered if h is not None]
        merged: list[dict[str, tuple]] = []
        for per_param in zip(*per_rank):
            combined: dict[str, tuple] = {}
            for rank_dict in per_param:
                combined.update(rank_dict)
            merged.append(combined)
        return merged

    @staticmethod
    def _post_send_sync() -> None:
        """Synchronize ranks after a send, then collect CUDA IPC memory.

        When a multi-rank process group is initialized, all ranks first meet
        at a barrier. ``torch.cuda.ipc_collect()`` is then called in every
        case, including single-process runs.
        """
        if (
            torch.distributed.is_initialized()
            and torch.distributed.get_world_size() > 1
        ):
            torch.distributed.barrier()
        torch.cuda.ipc_collect()

    @staticmethod
    def _send_unpacked(
        iterator: Iterator[tuple[str, torch.Tensor]],
        args: IPCTrainerSendWeightsArgs,
        gpu_uuid: str,
    ) -> None:
        """Send all weights in a single API call (non-packed mode)."""
        names: list[str] = []
        dtype_names: list[str] = []
        shapes: list[list[int]] = []
        ipc_handles: list[dict[str, tuple]] = []
        # Hold strong refs to every contiguous copy until the send + post-send
        # sync completes. reduce_tensor's returned args do NOT keep storage
        # alive, and non-contiguous inputs allocate fresh storage in
        # .contiguous() that would otherwise be GC'd before the consumer opens
        # the IPC handle.
        weight_refs: list[torch.Tensor] = []

        for name, tensor in iterator:
            names.append(name)
            dtype_names.append(str(tensor.dtype).split(".")[-1])
            shapes.append(list(tensor.shape))

            weight = tensor.detach().contiguous()
            weight_refs.append(weight)
            _, ipc_args = reduce_tensor(weight)
            ipc_handles.append({gpu_uuid: ipc_args})

        ipc_handles = IPCWeightTransferEngine._all_gather_and_merge_handles(ipc_handles)

        if IPCWeightTransferEngine._is_rank_zero():
            IPCWeightTransferEngine._do_send(
                args=args,
                names=names,
                dtype_names=dtype_names,
                shapes=shapes,
                ipc_handles=ipc_handles,
            )

        IPCWeightTransferEngine._post_send_sync()

    @staticmethod
    def _send_packed(
        iterator: Iterator[tuple[str, torch.Tensor]],
        args: IPCTrainerSendWeightsArgs,
        gpu_uuid: str,
    ) -> None:
        """Send weights in bounded-memory chunks (packed mode)."""
        post_iter_func: Callable = lambda item: item[1]

        for chunk in packed_ipc_producer(
            iterator=iterator,
            gpu_uuid=gpu_uuid,
            post_iter_func=post_iter_func,
            buffer_size_bytes=args.packed_buffer_size_bytes,
        ):
            ipc_handle = IPCWeightTransferEngine._all_gather_and_merge_handles(
                [chunk.ipc_handle]
            )[0]

            if IPCWeightTransferEngine._is_rank_zero():
                IPCWeightTransferEngine._do_send(
                    args=args,
                    names=chunk.names,
                    dtype_names=chunk.dtype_names,
                    shapes=chunk.shapes,
                    ipc_handles=ipc_handle,
                    tensor_sizes=chunk.tensor_sizes,
                    packed=True,
                )

            IPCWeightTransferEngine._post_send_sync()

    @staticmethod
    def _do_send(
        args: IPCTrainerSendWeightsArgs,
        names: list[str],
        dtype_names: list[str],
        shapes: list[list[int]],
        ipc_handles: list[dict[str, tuple]] | dict[str, tuple],
        tensor_sizes: list[int] | None = None,
        packed: bool = False,
    ) -> None:
        """Send a single update payload via the configured transport.

        'ray' calls ``update_weights.remote`` on every actor handle and
        waits for all of them; 'http' POSTs to ``<url>/update_weights`` with
        the IPC handles pickled and base64-encoded; a callable send_mode
        receives the constructed IPCWeightTransferUpdateInfo.
        """
        update_fields: dict[str, Any] = {
            "names": names,
            "dtype_names": dtype_names,
            "shapes": shapes,
            "packed": packed,
        }
        if tensor_sizes is not None:
            update_fields["tensor_sizes"] = tensor_sizes

        update_fields["ipc_handles"] = ipc_handles
        update_info = IPCWeightTransferUpdateInfo(**update_fields)

        if callable(args.send_mode):
            args.send_mode(update_info)
        elif args.send_mode == "ray":
            # Accept a single actor handle or a list/tuple of handles.
            handles = (
                list(args.llm_handle)
                if isinstance(args.llm_handle, (list, tuple))
                else [args.llm_handle]
            )
            ray.get(
                [
                    h.update_weights.remote(dict(update_info=asdict(update_info)))
                    for h in handles
                ]
            )
        elif args.send_mode == "http":
            pickled_handles = base64.b64encode(pickle.dumps(ipc_handles)).decode(
                "utf-8"
            )
            http_fields = {k: v for k, v in update_fields.items() if k != "ipc_handles"}
            http_fields["ipc_handles_pickled"] = pickled_handles

            # Strip a trailing slash so "http://host:8000/" does not produce
            # the path "//update_weights".
            url = f"{args.url.rstrip('/')}/update_weights"
            payload = {"update_info": http_fields}
            response = requests.post(url, json=payload, timeout=300)
            response.raise_for_status()

__init__

__init__(
    config: WeightTransferConfig,
    parallel_config: ParallelConfig,
) -> None

Initialize the IPC weight transfer engine.

Parameters:

Name Type Description Default
config WeightTransferConfig

The configuration for the weight transfer engine

required
parallel_config ParallelConfig

The configuration for the parallel setup

required
Source code in vllm/distributed/weight_transfer/ipc_engine.py
def __init__(
    self, config: WeightTransferConfig, parallel_config: ParallelConfig
) -> None:
    """
    Initialize the IPC weight transfer engine.

    This initializer adds no state of its own; both configs are forwarded
    unchanged to the base ``WeightTransferEngine`` initializer.

    Args:
        config: The configuration for the weight transfer engine
        parallel_config: The configuration for the parallel setup
    """
    super().__init__(config, parallel_config)

_all_gather_and_merge_handles staticmethod

_all_gather_and_merge_handles(
    handles: list[dict[str, tuple]],
) -> list[dict[str, tuple]]

All-gather and merge IPC handle dicts across ranks in one call.

Each rank contributes a list of {gpu_uuid: ipc_args} dicts (one per parameter or one per chunk). A single all_gather_object collects every rank's full list, then rank 0 merges per-index so each dict maps every GPU UUID to its args.

Non-rank-0 returns a list of empty dicts. No-op (returns handles unchanged) when no distributed group exists or the world size is 1.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _all_gather_and_merge_handles(
    handles: list[dict[str, tuple]],
) -> list[dict[str, tuple]]:
    """All-gather and merge IPC handle dicts across ranks in one call.

    Each rank contributes a list of {gpu_uuid: ipc_args} dicts (one
    per parameter or one per chunk). A single all_gather_object
    collects every rank's full list, then rank 0 merges per-index so
    each dict maps every GPU UUID to its args.

    Non-rank-0 returns a list of empty dicts.
    No-op (returns handles unchanged) when no distributed group exists.
    """
    if (
        not torch.distributed.is_initialized()
        or torch.distributed.get_world_size() == 1
    ):
        return handles

    world_size = torch.distributed.get_world_size()
    gathered: list[list[dict[str, tuple]] | None] = [None] * world_size
    torch.distributed.all_gather_object(gathered, handles)
    torch.distributed.barrier()
    torch.cuda.synchronize()

    if torch.distributed.get_rank() == 0:
        merged: list[dict[str, tuple]] = []
        for param_idx in range(len(handles)):
            m: dict[str, tuple] = {}
            for rank_handles in gathered:
                if rank_handles is not None:
                    m.update(rank_handles[param_idx])
            merged.append(m)
        return merged
    return [{} for _ in handles]

_do_send staticmethod

_do_send(
    args: IPCTrainerSendWeightsArgs,
    names: list[str],
    dtype_names: list[str],
    shapes: list[list[int]],
    ipc_handles: list[dict[str, tuple]] | dict[str, tuple],
    tensor_sizes: list[int] | None = None,
    packed: bool = False,
) -> None

Send a single update payload via the configured transport.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _do_send(
    args: IPCTrainerSendWeightsArgs,
    names: list[str],
    dtype_names: list[str],
    shapes: list[list[int]],
    ipc_handles: list[dict[str, tuple]] | dict[str, tuple],
    tensor_sizes: list[int] | None = None,
    packed: bool = False,
) -> None:
    """Send a single update payload via the configured transport.

    'ray' calls ``update_weights.remote`` on every actor handle and waits
    for all of them; 'http' POSTs to ``<url>/update_weights`` with the IPC
    handles pickled and base64-encoded; a callable send_mode receives the
    constructed IPCWeightTransferUpdateInfo.
    """
    update_fields: dict[str, Any] = {
        "names": names,
        "dtype_names": dtype_names,
        "shapes": shapes,
        "packed": packed,
    }
    if tensor_sizes is not None:
        update_fields["tensor_sizes"] = tensor_sizes

    update_fields["ipc_handles"] = ipc_handles
    update_info = IPCWeightTransferUpdateInfo(**update_fields)

    if callable(args.send_mode):
        args.send_mode(update_info)
    elif args.send_mode == "ray":
        # Accept a single actor handle or a list/tuple of handles.
        handles = (
            list(args.llm_handle)
            if isinstance(args.llm_handle, (list, tuple))
            else [args.llm_handle]
        )
        ray.get(
            [
                h.update_weights.remote(dict(update_info=asdict(update_info)))
                for h in handles
            ]
        )
    elif args.send_mode == "http":
        pickled_handles = base64.b64encode(pickle.dumps(ipc_handles)).decode(
            "utf-8"
        )
        http_fields = {k: v for k, v in update_fields.items() if k != "ipc_handles"}
        http_fields["ipc_handles_pickled"] = pickled_handles

        # Strip a trailing slash so "http://host:8000/" does not produce
        # the path "//update_weights".
        url = f"{args.url.rstrip('/')}/update_weights"
        payload = {"update_info": http_fields}
        response = requests.post(url, json=payload, timeout=300)
        response.raise_for_status()

_is_rank_zero staticmethod

_is_rank_zero() -> bool

Return True if this is rank 0 or no distributed group exists.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _is_rank_zero() -> bool:
    """Return True if this is rank 0 or no distributed group exists."""
    if not torch.distributed.is_initialized():
        return True
    return torch.distributed.get_rank() == 0

_post_send_sync staticmethod

_post_send_sync() -> None

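Barrier across ranks after a send (only when a multi-rank process group is initialized), followed by torch.cuda.ipc_collect(), which runs in every case, including single-GPU.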

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _post_send_sync() -> None:
    """Synchronize ranks after a send, then collect CUDA IPC memory.

    When a multi-rank process group is initialized, all ranks first meet
    at a barrier. ``torch.cuda.ipc_collect()`` is then called in every
    case, including single-process runs.
    """
    if (
        torch.distributed.is_initialized()
        and torch.distributed.get_world_size() > 1
    ):
        torch.distributed.barrier()
    torch.cuda.ipc_collect()

_send_packed staticmethod

_send_packed(
    iterator: Iterator[tuple[str, Tensor]],
    args: IPCTrainerSendWeightsArgs,
    gpu_uuid: str,
) -> None

Send weights in bounded-memory chunks (packed mode).

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _send_packed(
    iterator: Iterator[tuple[str, torch.Tensor]],
    args: IPCTrainerSendWeightsArgs,
    gpu_uuid: str,
) -> None:
    """Stream weights to vLLM one packed chunk at a time.

    Each chunk produced by ``packed_ipc_producer`` has its IPC handle merged
    across ranks. Rank 0 sends the chunk, and all ranks then synchronize
    before the next chunk is produced.
    """
    engine = IPCWeightTransferEngine
    chunks = packed_ipc_producer(
        iterator=iterator,
        gpu_uuid=gpu_uuid,
        # The producer is handed only the tensor half of each (name, tensor) pair.
        post_iter_func=lambda pair: pair[1],
        buffer_size_bytes=args.packed_buffer_size_bytes,
    )
    for chunk in chunks:
        # One handle per chunk: gather it as a one-element list and unwrap.
        (merged_handle,) = engine._all_gather_and_merge_handles([chunk.ipc_handle])

        if engine._is_rank_zero():
            engine._do_send(
                args=args,
                names=chunk.names,
                dtype_names=chunk.dtype_names,
                shapes=chunk.shapes,
                ipc_handles=merged_handle,
                tensor_sizes=chunk.tensor_sizes,
                packed=True,
            )

        engine._post_send_sync()

_send_unpacked staticmethod

_send_unpacked(
    iterator: Iterator[tuple[str, Tensor]],
    args: IPCTrainerSendWeightsArgs,
    gpu_uuid: str,
) -> None

Send all weights in a single API call (non-packed mode).

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def _send_unpacked(
    iterator: Iterator[tuple[str, torch.Tensor]],
    args: IPCTrainerSendWeightsArgs,
    gpu_uuid: str,
) -> None:
    """Collect IPC handles for every weight and send them in one call.

    Metadata (name, dtype name, shape) and a {gpu_uuid: ipc_args} handle
    are recorded per tensor. The handles are merged across ranks, rank 0
    sends the payload, and all ranks then synchronize.
    """
    engine = IPCWeightTransferEngine
    names: list[str] = []
    dtype_names: list[str] = []
    shapes: list[list[int]] = []
    local_handles: list[dict[str, tuple]] = []
    # reduce_tensor's args do not keep the storage alive, and .contiguous()
    # may allocate a fresh copy. Keep every copy referenced until this
    # function returns, i.e. after the send and the post-send sync.
    keepalive: list[torch.Tensor] = []

    for param_name, param in iterator:
        names.append(param_name)
        dtype_names.append(str(param.dtype).split(".")[-1])
        shapes.append(list(param.shape))

        contiguous = param.detach().contiguous()
        keepalive.append(contiguous)
        local_handles.append({gpu_uuid: reduce_tensor(contiguous)[1]})

    merged_handles = engine._all_gather_and_merge_handles(local_handles)

    if engine._is_rank_zero():
        engine._do_send(
            args=args,
            names=names,
            dtype_names=dtype_names,
            shapes=shapes,
            ipc_handles=merged_handles,
        )

    engine._post_send_sync()

init_transfer_engine

init_transfer_engine(
    init_info: IPCWeightTransferInitInfo,
) -> None

Initialize the weight transfer mechanism. This is called once at the beginning of training. No initialization needed for IPC backend.

Parameters:

Name Type Description Default
init_info IPCWeightTransferInitInfo

IPC initialization info (empty)

required
Source code in vllm/distributed/weight_transfer/ipc_engine.py
def init_transfer_engine(self, init_info: IPCWeightTransferInitInfo) -> None:
    """
    Set up the transfer mechanism; a no-op for the IPC backend.

    Called once at the beginning of training. The IPC handles arrive with
    each update (see ``receive_weights``), so nothing is prepared here.

    Args:
        init_info: IPC initialization info (declares no fields of its own)
    """
    return None

parse_update_info

parse_update_info(
    update_dict: dict[str, Any],
) -> IPCWeightTransferUpdateInfo

Parse update dict, deserializing pickled IPC handles if present.

HTTP transport sends IPC handles as a base64-encoded pickle under the key ipc_handles_pickled. This method deserializes them back into ipc_handles before constructing the typed dataclass, keeping serialization concerns out of the dataclass itself.

Requires VLLM_ALLOW_INSECURE_SERIALIZATION=1 because the payload is deserialized via pickle.loads.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
def parse_update_info(
    self, update_dict: dict[str, Any]
) -> IPCWeightTransferUpdateInfo:
    """Parse update dict, deserializing pickled IPC handles if present.

    HTTP transport sends IPC handles as a base64-encoded pickle under the
    key ``ipc_handles_pickled``. This method deserializes them back into
    ``ipc_handles`` before constructing the typed dataclass, keeping
    serialization concerns out of the dataclass itself. The caller's
    ``update_dict`` is left unmodified.

    Requires ``VLLM_ALLOW_INSECURE_SERIALIZATION=1`` because the
    payload is deserialized via ``pickle.loads``.

    Raises:
        ValueError: If both ``ipc_handles`` and ``ipc_handles_pickled``
            are present, or if ``ipc_handles_pickled`` is present while
            ``VLLM_ALLOW_INSECURE_SERIALIZATION`` is not enabled.
    """
    if "ipc_handles_pickled" in update_dict:
        if "ipc_handles" in update_dict:
            raise ValueError(
                "Cannot specify both `ipc_handles` and `ipc_handles_pickled`"
            )

        if not envs.VLLM_ALLOW_INSECURE_SERIALIZATION:
            raise ValueError(
                "Refusing to deserialize `ipc_handles_pickled` without "
                "VLLM_ALLOW_INSECURE_SERIALIZATION=1"
            )

        # Work on a shallow copy so the caller's dict is not mutated.
        update_dict = dict(update_dict)
        pickled = update_dict.pop("ipc_handles_pickled")
        # SECURITY: pickle.loads can execute arbitrary code embedded in the
        # payload; the env-var gate above is the only protection.
        update_dict["ipc_handles"] = pickle.loads(base64.b64decode(pickled))

    return super().parse_update_info(update_dict)

receive_weights

receive_weights(
    update_info: IPCWeightTransferUpdateInfo,
    load_weights: Callable[
        [list[tuple[str, Tensor]]], None
    ],
) -> None

Receive weights from the trainer via CUDA IPC handles.

Parameters:

Name Type Description Default
update_info IPCWeightTransferUpdateInfo

IPC update info containing parameter names, dtypes, shapes, and IPC handles. Each IPC handle is a mapping between physical GPU UUID and the rebuild_cuda_tensor args tuple.

required
load_weights Callable[[list[tuple[str, Tensor]]], None]

Callable that loads weights into the model. Called once per update with the weights rebuilt from update_info.

required
Source code in vllm/distributed/weight_transfer/ipc_engine.py
def receive_weights(
    self,
    update_info: IPCWeightTransferUpdateInfo,
    load_weights: Callable[[list[tuple[str, torch.Tensor]]], None],
) -> None:
    """
    Receive weights from the trainer via CUDA IPC handles.

    Args:
        update_info: IPC update info containing parameter names, dtypes,
            shapes, and IPC handles. In non-packed mode ``ipc_handles`` is
            a list with one mapping per parameter from physical GPU UUID to
            the rebuild_cuda_tensor args tuple; in packed mode it is a
            single such mapping for the packed chunk.
        load_weights: Callable that loads weights into the model. Called
            once per update with the weights rebuilt from ``update_info``.

    Raises:
        ValueError: In non-packed mode, if ``names`` and ``ipc_handles``
            differ in length, or if no handle exists for this GPU's UUID.
    """
    device_index = torch.accelerator.current_device_index()

    if update_info.packed:
        assert update_info.tensor_sizes is not None
        assert isinstance(update_info.ipc_handles, dict)
        weights = packed_ipc_consumer(
            ipc_handle=update_info.ipc_handles,
            names=update_info.names,
            shapes=update_info.shapes,
            dtype_names=update_info.dtype_names,
            tensor_sizes=update_info.tensor_sizes,
            device_index=device_index,
        )
        load_weights(weights)
        return

    assert isinstance(update_info.ipc_handles, list)
    # Fail loudly instead of letting zip() silently drop trailing items.
    if len(update_info.names) != len(update_info.ipc_handles):
        raise ValueError(
            f"Got {len(update_info.names)} parameter names but "
            f"{len(update_info.ipc_handles)} IPC handles"
        )

    # The receiver's GPU UUID does not change per parameter; look it up once.
    physical_gpu_id = str(torch.cuda.get_device_properties(device_index).uuid)

    weights: list[tuple[str, torch.Tensor]] = []
    for name, ipc_handle in zip(update_info.names, update_info.ipc_handles):
        if physical_gpu_id not in ipc_handle:
            raise ValueError(
                f"IPC handle not found for GPU UUID "
                f"{physical_gpu_id}. "
                f"Available UUIDs: {list(ipc_handle.keys())}"
            )

        list_args = list(ipc_handle[physical_gpu_id])
        # Index 6 of the args from reduce_tensor is the device_index.
        # We need to overwrite it with the receiver's device index.
        list_args[6] = device_index
        weights.append((name, rebuild_cuda_tensor(*list_args)))

    load_weights(weights)

trainer_send_weights staticmethod

trainer_send_weights(
    iterator: Iterator[tuple[str, Tensor]],
    trainer_args: dict[str, Any]
    | IPCTrainerSendWeightsArgs,
) -> None

Send weights from trainer to inference workers via CUDA IPC.

Supports the built-in 'ray' and 'http' transports (or a user-supplied callable send_mode) and two transfer strategies:

- Non-packed (default): all weights in a single API call.
- Packed (packed=True): chunked transfer with bounded GPU memory.

For multi-GPU training, all ranks must call this method in parallel. IPC handles are all-gathered across ranks and merged so that each vLLM worker can find its own GPU UUID. Only rank 0 sends the payload to vLLM.

Note: This method calls update_weights internally. The caller must call start_weight_update before and finish_weight_update after this method.

Parameters:

Name Type Description Default
iterator Iterator[tuple[str, Tensor]]

Iterator of (name, tensor) pairs. For multi-GPU, each rank should yield the full tensor on its own GPU (e.g. via FSDP full_tensor()).

required
trainer_args dict[str, Any] | IPCTrainerSendWeightsArgs

IPCTrainerSendWeightsArgs or equivalent dict.

required
Source code in vllm/distributed/weight_transfer/ipc_engine.py
@staticmethod
def trainer_send_weights(
    iterator: Iterator[tuple[str, torch.Tensor]],
    trainer_args: dict[str, Any] | IPCTrainerSendWeightsArgs,
) -> None:
    """Push trainer weights to the vLLM inference workers over CUDA IPC.

    Two transfer strategies are available, selected by ``packed``:

    - Non-packed (default): every weight goes out in a single API call.
    - Packed (``packed=True``): weights are chunked so GPU memory stays
      bounded.

    The payload is delivered with the built-in 'ray' or 'http' transport,
    or with a user-supplied callable given as ``send_mode``.

    With multi-GPU training every rank must invoke this method
    concurrently: IPC handles are all-gathered across ranks and merged so
    each vLLM worker can locate its own GPU UUID, and only rank 0 sends
    the payload to vLLM.

    .. note::
        ``update_weights`` is invoked internally, so callers are
        responsible for calling ``start_weight_update`` beforehand and
        ``finish_weight_update`` afterwards.

    Args:
        iterator: Yields ``(name, tensor)`` pairs. In multi-GPU runs each
            rank should yield the full tensor on its own GPU (e.g. via
            FSDP ``full_tensor()``).
        trainer_args: An ``IPCTrainerSendWeightsArgs`` instance, or a dict
            of its keyword arguments.
    """
    # Normalise a plain dict into the validated dataclass form.
    if isinstance(trainer_args, dict):
        send_args = IPCTrainerSendWeightsArgs(**trainer_args)
    else:
        send_args = trainer_args

    # Receivers look up IPC handles by physical GPU UUID, so tag the
    # payload with the UUID of the device this process is running on.
    local_device = torch.accelerator.current_device_index()
    local_uuid = str(torch.cuda.get_device_properties(local_device).uuid)

    sender = (
        IPCWeightTransferEngine._send_packed
        if send_args.packed
        else IPCWeightTransferEngine._send_unpacked
    )
    sender(iterator, send_args, local_uuid)

IPCWeightTransferInitInfo dataclass

Bases: WeightTransferInitInfo

Initialization info for the IPC weight transfer backend. IPC requires no initialization, so this type adds no fields.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@dataclass
class IPCWeightTransferInitInfo(WeightTransferInitInfo):
    """Initialization info for the IPC weight transfer backend.

    IPC needs no initialization step, so this type declares no fields
    beyond those inherited from ``WeightTransferInitInfo``.
    """

IPCWeightTransferUpdateInfo dataclass

Bases: WeightTransferUpdateInfo

Update info for IPC weight transfer backend.

Source code in vllm/distributed/weight_transfer/ipc_engine.py
@dataclass
class IPCWeightTransferUpdateInfo(WeightTransferUpdateInfo):
    """Update info for IPC weight transfer backend.

    Carries parallel per-parameter metadata (``names``, ``dtype_names``,
    ``shapes``) together with the CUDA IPC handles the receiver passes to
    ``rebuild_cuda_tensor``, keyed by physical GPU UUID.

    Raises:
        ValueError: From ``__post_init__`` when the per-parameter lists
            disagree in length, or when ``tensor_sizes`` is missing or
            mis-sized in packed mode.
    """

    names: list[str]
    dtype_names: list[str]
    shapes: list[list[int]]
    ipc_handles: list[dict[str, tuple]] | dict[str, tuple]
    """IPC handles mapping physical GPU UUID to rebuild_cuda_tensor args.
    For non-packed mode: list of per-parameter handle dicts.
    For packed mode: single handle dict for the packed buffer."""
    tensor_sizes: list[int] | None = None
    """Per-parameter sizes in bytes within the packed buffer.
    Required when packed=True, unused otherwise."""
    packed: bool = False
    """Whether this update uses packed tensor format."""

    def __post_init__(self):
        """Check that all per-parameter metadata lines up with ``names``.

        Raises:
            ValueError: If ``dtype_names``, ``shapes``, a list-valued
                ``ipc_handles`` (non-packed mode), or ``tensor_sizes``
                (packed mode) has a different length than ``names``, or
                if ``tensor_sizes`` is missing when ``packed`` is True.
        """
        num_params = len(self.names)
        if len(self.dtype_names) != num_params:
            raise ValueError(
                f"`dtype_names` should be of the same size as `names`: "
                f"got {len(self.dtype_names)} and {len(self.names)}"
            )
        if len(self.shapes) != num_params:
            raise ValueError(
                f"`shapes` should be of the same size as `names`: "
                f"got {len(self.shapes)} and {len(self.names)}"
            )
        # Non-packed mode ships one handle per parameter; packed mode ships
        # a single handle for the whole buffer, so only check lists here.
        if (
            not self.packed
            and isinstance(self.ipc_handles, list)
            and len(self.ipc_handles) != num_params
        ):
            raise ValueError(
                f"`ipc_handles` should be of the same size as `names`: "
                f"got {len(self.ipc_handles)} and {len(self.names)}"
            )
        if self.packed:
            if self.tensor_sizes is None:
                raise ValueError("`tensor_sizes` is required when packed=True")
            # tensor_sizes is per-parameter; a mismatch would make the
            # receiver slice the packed buffer at the wrong offsets.
            if len(self.tensor_sizes) != num_params:
                raise ValueError(
                    f"`tensor_sizes` should be of the same size as `names`: "
                    f"got {len(self.tensor_sizes)} and {len(self.names)}"
                )

ipc_handles instance-attribute

ipc_handles: list[dict[str, tuple]] | dict[str, tuple]

IPC handles mapping physical GPU UUID to rebuild_cuda_tensor args. For non-packed mode: list of per-parameter handle dicts. For packed mode: single handle dict for the packed buffer.

packed class-attribute instance-attribute

packed: bool = False

Whether this update uses packed tensor format.

tensor_sizes class-attribute instance-attribute

tensor_sizes: list[int] | None = None

Per-parameter sizes in bytes within the packed buffer. Required when packed=True, unused otherwise.