vllm.distributed.weight_transfer.packed_tensor

Packed tensor utilities for efficient weight transfer.

PackedChunk dataclass

Result of packing tensors into a single contiguous uint8 buffer.

Source code in vllm/distributed/weight_transfer/packed_tensor.py
@dataclass
class PackedChunk:
    """Result of packing tensors into a single contiguous uint8 buffer."""

    packed_tensor: torch.Tensor
    names: list[str]
    shapes: list[list[int]]
    dtypes: list[torch.dtype]
    tensor_sizes: list[int]
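
The metadata fields are parallel lists, one entry per packed tensor. The sketch below is a pure-Python check of the consistency this implies, assuming the chunk was built from an empty tensor_list and that post-processing preserves each tensor's shape and dtype. ChunkMeta, ITEMSIZE, and check_chunk are hypothetical names for illustration, not part of vLLM, and a byte count stands in for the packed tensor.

```python
import math
from dataclasses import dataclass

# Hypothetical stand-in for torch dtype item sizes (bytes per element).
ITEMSIZE = {"float32": 4, "float16": 2, "int8": 1}


@dataclass
class ChunkMeta:
    """Illustrative mirror of the metadata fields; not the vLLM class."""

    packed_nbytes: int
    names: list[str]
    shapes: list[list[int]]
    dtype_names: list[str]
    tensor_sizes: list[int]


def check_chunk(meta: ChunkMeta) -> bool:
    """Return True if the parallel lists describe the packed buffer exactly."""
    lengths = {
        len(meta.names),
        len(meta.shapes),
        len(meta.dtype_names),
        len(meta.tensor_sizes),
    }
    if len(lengths) != 1:
        return False
    # Each entry's byte size must equal element count times item size.
    for shape, dtype_name, size in zip(
        meta.shapes, meta.dtype_names, meta.tensor_sizes
    ):
        if math.prod(shape) * ITEMSIZE[dtype_name] != size:
            return False
    # The buffer is the plain concatenation of the entries.
    return sum(meta.tensor_sizes) == meta.packed_nbytes


meta = ChunkMeta(
    packed_nbytes=56,
    names=["w", "b"],
    shapes=[[4, 3], [4]],
    dtype_names=["float32", "float16"],
    tensor_sizes=[48, 8],
)
print(check_chunk(meta))  # True
```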

PackedIpcChunk dataclass

Metadata and IPC handle for a single packed chunk.

Source code in vllm/distributed/weight_transfer/packed_tensor.py
@dataclass
class PackedIpcChunk:
    """Metadata and IPC handle for a single packed chunk."""

    names: list[str]
    shapes: list[list[int]]
    dtype_names: list[str]
    tensor_sizes: list[int]
    ipc_handle: dict[str, tuple]

pack_tensors

pack_tensors(
    iterator: Iterator[tuple[str, Tensor]],
    post_iter_func: Callable[[tuple[str, Tensor]], Tensor],
    buffer_size_bytes: int,
    tensor_list: list[Tensor] | None = None,
    current_size: int = 0,
) -> PackedChunk | None

Pack tensors from an iterator into a single contiguous uint8 buffer.

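Consumes from the iterator until the accumulated size exceeds buffer_size_bytes or the iterator is exhausted, then returns a PackedChunk. The size check runs after each tensor is appended, so the tensor that crosses the threshold is included and the returned chunk can be larger than buffer_size_bytes. A single tensor larger than buffer_size_bytes triggers a warning but is still packed. Returns None if no tensors were consumed.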

Parameters:

iterator (Iterator[tuple[str, Tensor]], required): Iterator of (name, tensor) pairs
post_iter_func (Callable[[tuple[str, Tensor]], Tensor], required): Applied to each item before linearizing to uint8
buffer_size_bytes (int, required): Max bytes before flushing
tensor_list (list[Tensor] | None, default: None): Pre-existing tensor list to append to (for NCCL multi-buffer reuse). If None, a fresh list is created.
current_size (int, default: 0): Byte count already accumulated in tensor_list
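
The stopping rule of pack_tensors can be illustrated on byte sizes alone. The following is a minimal pure-Python model, not the vLLM implementation: take_chunk and all_chunks are hypothetical helpers, and plain integers stand in for tensors. It shows that a chunk ends with the item that pushes the running total past the threshold, so chunks may overshoot.

```python
from collections.abc import Iterator
from typing import Optional


def take_chunk(sizes: Iterator[int], buffer_size_bytes: int) -> Optional[list[int]]:
    """Consume sizes until the running total exceeds the threshold."""
    taken: list[int] = []
    total = 0
    for size in sizes:
        taken.append(size)
        total += size
        # The check happens after appending, so the chunk may overshoot.
        if total > buffer_size_bytes:
            break
    return taken or None


def all_chunks(sizes: list[int], buffer_size_bytes: int) -> list[list[int]]:
    """Call take_chunk repeatedly on one shared iterator until it is empty."""
    it = iter(sizes)
    chunks = []
    while (chunk := take_chunk(it, buffer_size_bytes)) is not None:
        chunks.append(chunk)
    return chunks


chunks = all_chunks([40, 40, 40, 10, 200, 5], buffer_size_bytes=100)
print(chunks)  # [[40, 40, 40], [10, 200], [5]]
```

The first chunk holds 120 bytes and the second 210, both above the 100-byte threshold, which matches the "exceeds, then flush" wording of the description.
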
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def pack_tensors(
    iterator: Iterator[tuple[str, torch.Tensor]],
    post_iter_func: Callable[[tuple[str, torch.Tensor]], torch.Tensor],
    buffer_size_bytes: int,
    tensor_list: list[torch.Tensor] | None = None,
    current_size: int = 0,
) -> PackedChunk | None:
    """Pack tensors from an iterator into a single contiguous uint8 buffer.

    Consumes from the iterator until the accumulated size exceeds
    buffer_size_bytes or the iterator is exhausted, then returns a
    PackedChunk. Returns None if no tensors were consumed.

    Args:
        iterator: Iterator of (name, tensor) pairs
        post_iter_func: Applied to each item before linearizing to uint8
        buffer_size_bytes: Max bytes before flushing
        tensor_list: Pre-existing tensor list to append to (for NCCL
                    multi-buffer reuse). If None, a fresh list is created.
        current_size: Byte count already accumulated in tensor_list
    """
    if tensor_list is None:
        tensor_list = []

    names: list[str] = []
    shapes: list[list[int]] = []
    dtypes: list[torch.dtype] = []
    tensor_sizes: list[int] = []
    total_bytes = current_size

    while True:
        try:
            item = next(iterator)
        except StopIteration:
            break

        name, orig_tensor = item
        # Apply post processing and convert to linearized uint8 tensor
        tensor = post_iter_func(item).contiguous().view(torch.uint8).view(-1)

        if tensor.numel() > buffer_size_bytes:
            import warnings

            warnings.warn(
                f"Tensor '{name}' has size {tensor.numel()} bytes, which "
                f"exceeds buffer_size_bytes={buffer_size_bytes}.",
                stacklevel=2,
            )

        tensor_list.append(tensor)
        names.append(name)
        shapes.append(list(orig_tensor.shape))
        dtypes.append(orig_tensor.dtype)
        tensor_sizes.append(tensor.numel())
        total_bytes += tensor.numel()

        if total_bytes > buffer_size_bytes:
            break

    if not tensor_list:
        return None

    packed = torch.cat(tensor_list, dim=0)
    del tensor_list
    return PackedChunk(
        packed_tensor=packed,
        names=names,
        shapes=shapes,
        dtypes=dtypes,
        tensor_sizes=tensor_sizes,
    )

packed_ipc_consumer

packed_ipc_consumer(
    ipc_handle: dict[str, tuple],
    names: list[str],
    shapes: list[list[int]],
    dtype_names: list[str],
    tensor_sizes: list[int],
    device_index: int,
) -> list[tuple[str, Tensor]]

Unpack a single packed IPC chunk into named tensors.

Reconstructs the packed buffer via rebuild_cuda_tensor, unpacks into individual tensors, and clones each into independent storage before returning.

The clone is intentional: the producer reuses one IPC buffer across chunks, so any tensor view that aliases the buffer would observe the next chunk's bytes as soon as the producer's generator is resumed. Callers that retain references past their own update_weights call (notably vLLM's layerwise reload, which buffers bound_args for replay in _layerwise_process) would otherwise replay against stale data and silently corrupt multi-chunk weight transfers.

Parameters:

ipc_handle (dict[str, tuple], required): Mapping of GPU UUID to rebuild_cuda_tensor args tuple
names (list[str], required): Parameter names in the packed buffer
shapes (list[list[int]], required): Parameter shapes
dtype_names (list[str], required): Parameter dtype name strings (e.g. "float16")
tensor_sizes (list[int], required): Size in bytes of each parameter in the packed buffer
device_index (int, required): Local CUDA device index
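
The reason packed_ipc_consumer clones its results can be reproduced on the CPU. The sketch below is an analogy under stated assumptions, not vLLM code: a bytearray stands in for the reused IPC buffer, memoryview slices stand in for tensor views, and write_chunk, read_views, and read_clones are hypothetical helpers.

```python
# One reusable buffer, standing in for the producer's IPC allocation.
shared = bytearray(8)


def write_chunk(payload: bytes) -> None:
    """Producer side: overwrite the start of the shared buffer in place."""
    memoryview(shared)[: len(payload)] = payload


def read_views(sizes: list[int]) -> list[memoryview]:
    """Consumer side without cloning: slices that alias the shared buffer."""
    view, out, offset = memoryview(shared), [], 0
    for size in sizes:
        out.append(view[offset : offset + size])
        offset += size
    return out


def read_clones(sizes: list[int]) -> list[bytes]:
    """Consumer side with cloning: each slice gets independent storage."""
    return [bytes(v) for v in read_views(sizes)]


write_chunk(b"AAAABBBB")
aliased = read_views([4, 4])
cloned = read_clones([4, 4])

write_chunk(b"CCCCDDDD")  # the producer resumes and reuses the buffer

print(bytes(aliased[0]))  # b'CCCC': the view now shows the next chunk
print(cloned[0])  # b'AAAA': the clone kept the first chunk
```
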
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def packed_ipc_consumer(
    ipc_handle: dict[str, tuple],
    names: list[str],
    shapes: list[list[int]],
    dtype_names: list[str],
    tensor_sizes: list[int],
    device_index: int,
) -> list[tuple[str, torch.Tensor]]:
    """Unpack a single packed IPC chunk into named tensors.

    Reconstructs the packed buffer via rebuild_cuda_tensor, unpacks
    into individual tensors, and clones each into independent storage
    before returning.

    The clone is intentional: the producer reuses one IPC buffer across
    chunks, so any tensor view that aliases the buffer would observe the
    *next* chunk's bytes as soon as the producer's generator is resumed.
    Callers that retain references past their own update_weights call
    (notably vLLM's layerwise reload, which buffers ``bound_args`` for
    replay in ``_layerwise_process``) would otherwise replay against
    stale data and silently corrupt multi-chunk weight transfers.

    Args:
        ipc_handle: Mapping of GPU UUID to rebuild_cuda_tensor args tuple
        names: Parameter names in the packed buffer
        shapes: Parameter shapes
        dtype_names: Parameter dtype name strings (e.g. "float16")
        tensor_sizes: Size in bytes of each parameter in the packed buffer
        device_index: Local CUDA device index
    """
    from torch.multiprocessing.reductions import rebuild_cuda_tensor

    props = torch.cuda.get_device_properties(device_index)
    physical_gpu_id = str(props.uuid)

    if physical_gpu_id not in ipc_handle:
        raise ValueError(
            f"IPC handle not found for GPU UUID {physical_gpu_id}. "
            f"Available UUIDs: {list(ipc_handle.keys())}"
        )

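    args = ipc_handle[physical_gpu_id]
    list_args = list(args)
    # Position 6 of the rebuild_cuda_tensor args is the storage device;
    # point it at the local device index before rebuilding.
    list_args[6] = device_index
    packed = rebuild_cuda_tensor(*list_args)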

    content_size = sum(tensor_sizes)
    packed = packed[:content_size]

    dtypes = [getattr(torch, dn) for dn in dtype_names]
    return [
        (name, t.clone())
        for name, t in unpack_tensor(packed, names, shapes, dtypes, tensor_sizes)
    ]

packed_ipc_producer

packed_ipc_producer(
    iterator: Iterator[tuple[str, Tensor]],
    gpu_uuid: str,
    post_iter_func: Callable[[tuple[str, Tensor]], Tensor],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
) -> Iterator[PackedIpcChunk]

Pack tensors into a reusable IPC buffer and yield handles.

Allocates a single GPU buffer of buffer_size_bytes and registers it for IPC once via reduce_tensor. Each chunk's packed data is copied into this buffer before yielding, so only one IPC-shared allocation exists for the lifetime of the transfer.

Callers must ensure the consumer has finished reading the buffer (e.g. ray.get returned) before resuming the generator for the next chunk.

Parameters:

iterator (Iterator[tuple[str, Tensor]], required): Iterator of (name, tensor) pairs.
gpu_uuid (str, required): Physical GPU UUID string for this rank.
post_iter_func (Callable[[tuple[str, Tensor]], Tensor], required): Applied to each (name, tensor) before packing.
buffer_size_bytes (int, default: DEFAULT_PACKED_BUFFER_SIZE_BYTES): Exact capacity of the reusable IPC buffer. Every chunk is guaranteed to fit within this size. A ValueError is raised if any single tensor exceeds it.
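
Unlike pack_tensors, packed_ipc_producer flushes before an item that would not fit, so no chunk exceeds the buffer. The sketch below is a pure-Python model of that rule and of the copy-before-resume contract. It is an illustration only: strict_chunks is a hypothetical helper, a bytearray replaces the GPU buffer, and bytes payloads replace tensors.

```python
from collections.abc import Iterable, Iterator


def strict_chunks(
    items: Iterable[tuple[str, bytes]], buffer_size_bytes: int
) -> Iterator[tuple[list[str], list[int], memoryview]]:
    """Copy items into one reusable buffer; yield when the next would not fit."""
    buffer = bytearray(buffer_size_bytes)  # allocated once, reused per chunk
    view = memoryview(buffer)
    names: list[str] = []
    sizes: list[int] = []
    total = 0
    for name, payload in items:
        if len(payload) > buffer_size_bytes:
            raise ValueError(f"{name!r} needs {len(payload)} bytes")
        if total and total + len(payload) > buffer_size_bytes:
            yield names, sizes, view[:total]
            # Fresh lists so the yielded metadata is not mutated afterwards.
            names, sizes, total = [], [], 0
        view[total : total + len(payload)] = payload
        names.append(name)
        sizes.append(len(payload))
        total += len(payload)
    if total:
        yield names, sizes, view[:total]


items = [("a", b"x" * 40), ("b", b"y" * 40), ("c", b"z" * 40), ("d", b"w" * 100)]
received = []
for names, sizes, data in strict_chunks(items, buffer_size_bytes=100):
    # Copy out before resuming the generator: the buffer is about to be reused.
    received.append((names, sizes, bytes(data)))

print([(n, s) for n, s, _ in received])
# [(['a', 'b'], [40, 40]), (['c'], [40]), (['d'], [100])]
```

Skipping the bytes(data) copy in the loop would leave every stored entry aliasing the same buffer, which is the hazard the "must finish reading" requirement guards against.
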
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def packed_ipc_producer(
    iterator: Iterator[tuple[str, torch.Tensor]],
    gpu_uuid: str,
    post_iter_func: Callable[[tuple[str, torch.Tensor]], torch.Tensor],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
) -> Iterator[PackedIpcChunk]:
    """Pack tensors into a reusable IPC buffer and yield handles.

    Allocates a single GPU buffer of ``buffer_size_bytes`` and registers
    it for IPC once via ``reduce_tensor``.  Each chunk's packed data is
    copied into this buffer before yielding, so only one IPC-shared
    allocation exists for the lifetime of the transfer.

    Callers **must** ensure the consumer has finished reading the buffer
    (e.g. ``ray.get`` returned) before resuming the generator for the
    next chunk.

    Args:
        iterator: Iterator of (name, tensor) pairs.
        gpu_uuid: Physical GPU UUID string for this rank.
        post_iter_func: Applied to each (name, tensor) before packing.
        buffer_size_bytes: Exact capacity of the reusable IPC buffer.
            Every chunk is guaranteed to fit within this size.  A
            ``ValueError`` is raised if any single tensor exceeds it.
    """
    ipc_buffer = torch.empty(buffer_size_bytes, dtype=torch.uint8, device="cuda")
    _, ipc_args = reduce_tensor(ipc_buffer)

    names: list[str] = []
    shapes: list[list[int]] = []
    dtypes: list[torch.dtype] = []
    tensor_sizes: list[int] = []
    total_bytes = 0

    for name, orig_tensor in iterator:
        flat = (
            post_iter_func((name, orig_tensor)).contiguous().view(torch.uint8).view(-1)
        )

        if flat.numel() > buffer_size_bytes:
            raise ValueError(
                f"Tensor '{name}' has size {flat.numel()} bytes, "
                f"which exceeds buffer_size_bytes={buffer_size_bytes}. "
                f"Increase buffer_size_bytes to at least {flat.numel()}."
            )

        if total_bytes and total_bytes + flat.numel() > buffer_size_bytes:
            # Drain queued copies so the consumer sees a fully-written buffer.
            torch.cuda.current_stream().synchronize()
            yield PackedIpcChunk(
                names=names,
                shapes=shapes,
                dtype_names=[str(d).split(".")[-1] for d in dtypes],
                tensor_sizes=tensor_sizes,
                ipc_handle={gpu_uuid: ipc_args},
            )
            # Rebind to fresh lists so the yielded chunk's metadata is
            # not mutated while the consumer is still reading.
            names, shapes, dtypes, tensor_sizes = [], [], [], []
            total_bytes = 0

        ipc_buffer[total_bytes : total_bytes + flat.numel()].copy_(flat)
        names.append(name)
        shapes.append(list(orig_tensor.shape))
        dtypes.append(orig_tensor.dtype)
        tensor_sizes.append(flat.numel())
        total_bytes += flat.numel()

    if total_bytes:
        torch.cuda.current_stream().synchronize()
        yield PackedIpcChunk(
            names=names,
            shapes=shapes,
            dtype_names=[str(d).split(".")[-1] for d in dtypes],
            tensor_sizes=tensor_sizes,
            ipc_handle={gpu_uuid: ipc_args},
        )

packed_nccl_broadcast_consumer

packed_nccl_broadcast_consumer(
    iterator: Iterator[tuple[str, tuple[list[int], dtype]]],
    group: Any,
    src: int,
    post_unpack_func: Callable[
        [list[tuple[str, Tensor]]], None
    ],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
    num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS,
) -> None

Receive packed tensors via NCCL broadcast, unpack each buffer into a list of (name, tensor) pairs, and pass that list to post_unpack_func.

Parameters:

iterator (Iterator[tuple[str, tuple[list[int], dtype]]], required): Iterator of parameter metadata, yielding (name, (shape, dtype))
group (Any, required): Process group (PyNcclCommunicator)
src (int, required): Source rank (0 in current implementation)
post_unpack_func (Callable[[list[tuple[str, Tensor]]], None], required): Function to apply to each list of (name, tensor) after unpacking
buffer_size_bytes (int, default: DEFAULT_PACKED_BUFFER_SIZE_BYTES): Size in bytes for each packed tensor buffer. Both producer and consumer must use the same value.
num_buffers (int, default: DEFAULT_PACKED_NUM_BUFFERS): Number of buffers for double/triple buffering. Both producer and consumer must use the same value.
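
The consumer never sees the producer's tensors before the broadcast; it sizes each receive buffer from metadata alone, which is why buffer_size_bytes must match on both sides. A minimal pure-Python sketch of that planning step follows. plan_receives and the ITEMSIZE table are hypothetical stand-ins (the real code uses dtype.itemsize), and dtype names are plain strings here.

```python
import math

# Hypothetical bytes-per-element table standing in for torch dtype.itemsize.
ITEMSIZE = {"float32": 4, "bfloat16": 2, "int8": 1}


def plan_receives(
    metadata: list[tuple[str, tuple[list[int], str]]], buffer_size_bytes: int
) -> list[tuple[list[str], int]]:
    """Group metadata into (names, total_bytes) per expected broadcast."""
    plans: list[tuple[list[str], int]] = []
    names: list[str] = []
    total = 0
    for name, (shape, dtype_name) in metadata:
        names.append(name)
        total += math.prod(shape) * ITEMSIZE[dtype_name]
        # Same "exceeds, then flush" rule the producer applies when packing.
        if total > buffer_size_bytes:
            plans.append((names, total))
            names, total = [], 0
    if names:
        plans.append((names, total))
    return plans


metadata = [
    ("embed", ([8, 4], "float32")),  # 128 bytes
    ("proj", ([16, 4], "bfloat16")),  # 128 bytes
    ("bias", ([10], "int8")),  # 10 bytes
]
plans = plan_receives(metadata, buffer_size_bytes=200)
print(plans)  # [(['embed', 'proj'], 256), (['bias'], 10)]
```

With a different buffer_size_bytes on one side, the two ranks would disagree on where chunks end and the broadcast sizes would no longer line up.
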
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def packed_nccl_broadcast_consumer(
    iterator: Iterator[tuple[str, tuple[list[int], torch.dtype]]],
    group: Any,
    src: int,
    post_unpack_func: Callable[[list[tuple[str, torch.Tensor]]], None],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
    num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS,
) -> None:
    """Consume packed tensors and unpack them into a list of tensors.

    Args:
        iterator: Iterator of parameter metadata. Returns (name, (shape, dtype))
        group: Process group (PyNcclCommunicator)
        src: Source rank (0 in current implementation)
        post_unpack_func: Function to apply to each list of (name, tensor) after
                         unpacking
        buffer_size_bytes: Size in bytes for each packed tensor buffer.
                          Both producer and consumer must use the same value.
        num_buffers: Number of buffers for double/triple buffering.
                    Both producer and consumer must use the same value.

    """
    target_packed_tensor_size = buffer_size_bytes

    streams = [torch.cuda.Stream() for _ in range(num_buffers)]
    buffer_idx = 0

    packing_tensor_meta_data: list[list[tuple[str, list[int], torch.dtype, int]]] = [
        [] for _ in range(num_buffers)
    ]
    packing_tensor_sizes: list[int] = [0 for _ in range(num_buffers)]
    packed_tensors: list[torch.Tensor] = [
        torch.empty(0, dtype=torch.uint8, device="cuda") for _ in range(num_buffers)
    ]

    while True:
        # Wait for earlier work queued on this buffer slot's stream
        streams[buffer_idx].synchronize()
        with torch.cuda.stream(streams[buffer_idx]):
            # Initialize the packing tensor meta data
            packing_tensor_meta_data[buffer_idx] = []
            packing_tensor_sizes[buffer_idx] = 0
            try:
                # Form a packed tensor
                while True:
                    name, (shape, dtype) = next(iterator)
                    tensor_size = math.prod(shape) * dtype.itemsize
                    packing_tensor_meta_data[buffer_idx].append(
                        (name, shape, dtype, tensor_size)
                    )
                    packing_tensor_sizes[buffer_idx] += tensor_size
                    if packing_tensor_sizes[buffer_idx] > target_packed_tensor_size:
                        break
                # Create a packed tensor and broadcast it
                packed_tensors[buffer_idx] = torch.empty(
                    packing_tensor_sizes[buffer_idx], dtype=torch.uint8, device="cuda"
                )
                group.broadcast(packed_tensors[buffer_idx], src=src)
                # Load the packed tensor into the model
                names, shapes, dtypes, tensor_sizes = zip(
                    *packing_tensor_meta_data[buffer_idx]
                )
                post_unpack_func(
                    unpack_tensor(
                        packed_tensors[buffer_idx],
                        list(names),
                        list(shapes),
                        list(dtypes),
                        list(tensor_sizes),
                    )
                )
                # Move to the next buffer
                buffer_idx = (buffer_idx + 1) % num_buffers
            except StopIteration:
                # Do the last broadcast if there are remaining tensors
                if len(packing_tensor_meta_data[buffer_idx]) > 0:
                    # Create a packed tensor and broadcast it
                    packed_tensors[buffer_idx] = torch.empty(
                        packing_tensor_sizes[buffer_idx],
                        dtype=torch.uint8,
                        device="cuda",
                    )
                    group.broadcast(packed_tensors[buffer_idx], src=src)
                    # Load the packed tensor into the model
                    names, shapes, dtypes, tensor_sizes = zip(
                        *packing_tensor_meta_data[buffer_idx]
                    )
                    post_unpack_func(
                        unpack_tensor(
                            packed_tensors[buffer_idx],
                            list(names),
                            list(shapes),
                            list(dtypes),
                            list(tensor_sizes),
                        )
                    )
                break

packed_nccl_broadcast_producer

packed_nccl_broadcast_producer(
    iterator: Iterator[tuple[str, Tensor]],
    group: Any,
    src: int,
    post_iter_func: Callable[[tuple[str, Tensor]], Tensor],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
    num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS,
) -> None

Broadcast tensors in a packed manner from trainer to workers.

Parameters:

iterator (Iterator[tuple[str, Tensor]], required): Iterator of model parameters, yielding (name, tensor) tuples
group (Any, required): Process group (PyNcclCommunicator)
src (int, required): Source rank (0 in current implementation)
post_iter_func (Callable[[tuple[str, Tensor]], Tensor], required): Function to apply to each (name, tensor) pair before packing; should return a tensor
buffer_size_bytes (int, default: DEFAULT_PACKED_BUFFER_SIZE_BYTES): Size in bytes for each packed tensor buffer. Both producer and consumer must use the same value.
num_buffers (int, default: DEFAULT_PACKED_NUM_BUFFERS): Number of buffers for double/triple buffering. Both producer and consumer must use the same value.
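
The effect of num_buffers is easiest to see as a round-robin over buffer slots. The sketch below is a pure-Python simulation, assuming each stream wait completes before its slot is reused. simulate_rotation and the chunk labels are hypothetical, and no real streams or broadcasts are involved.

```python
from typing import Optional


def simulate_rotation(
    chunks: list[str], num_buffers: int
) -> list[tuple[str, int, Optional[str]]]:
    """Return (chunk, slot, released) for each step of the round-robin loop."""
    in_flight: list[Optional[str]] = [None] * num_buffers
    buffer_idx = 0
    log = []
    for chunk in chunks:
        # The wait on this slot's stream would happen here; once it returns,
        # the chunk previously held in the slot can be dropped.
        released = in_flight[buffer_idx]
        in_flight[buffer_idx] = chunk
        log.append((chunk, buffer_idx, released))
        buffer_idx = (buffer_idx + 1) % num_buffers
    return log


log = simulate_rotation(["c0", "c1", "c2", "c3", "c4"], num_buffers=2)
for entry in log:
    print(entry)
# ('c0', 0, None)
# ('c1', 1, None)
# ('c2', 0, 'c0')
# ('c3', 1, 'c1')
# ('c4', 0, 'c2')
```

With two slots, a chunk is released only when the chunk two steps later claims its slot, so at most num_buffers packed tensors are alive at once.
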
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def packed_nccl_broadcast_producer(
    iterator: Iterator[tuple[str, torch.Tensor]],
    group: Any,
    src: int,
    post_iter_func: Callable[[tuple[str, torch.Tensor]], torch.Tensor],
    buffer_size_bytes: int = DEFAULT_PACKED_BUFFER_SIZE_BYTES,
    num_buffers: int = DEFAULT_PACKED_NUM_BUFFERS,
) -> None:
    """Broadcast tensors in a packed manner from trainer to workers.

    Args:
        iterator: Iterator of model parameters. Returns a tuple of (name, tensor)
        group: Process group (PyNcclCommunicator)
        src: Source rank (0 in current implementation)
        post_iter_func: Function to apply to each (name, tensor) pair before
                       packing, should return a tensor
        buffer_size_bytes: Size in bytes for each packed tensor buffer.
                          Both producer and consumer must use the same value.
        num_buffers: Number of buffers for double/triple buffering.
                    Both producer and consumer must use the same value.

    """
    streams = [torch.cuda.Stream() for _ in range(num_buffers)]
    # Keep references to in-flight chunks so their packed_tensors
    # aren't freed while an async broadcast is still reading them.
    in_flight: list[PackedChunk | None] = [None] * num_buffers
    buffer_idx = 0

    while True:
        # Wait for earlier work queued on this buffer slot's stream
        streams[buffer_idx].synchronize()
        # Previous chunk on this buffer slot is now safe to free
        in_flight[buffer_idx] = None
        # Start tasks for the new buffer in a new stream
        with torch.cuda.stream(streams[buffer_idx]):
            chunk = pack_tensors(iterator, post_iter_func, buffer_size_bytes)
            if chunk is None:
                break
            # Broadcast the packed chunk from the source rank
            group.broadcast(chunk.packed_tensor, src=src)
            # Hold reference until this stream is synchronized
            in_flight[buffer_idx] = chunk
            # Move to the next buffer
            buffer_idx = (buffer_idx + 1) % num_buffers

unpack_tensor

unpack_tensor(
    packed_tensor: Tensor,
    names: list[str],
    shapes: list[list[int]],
    dtypes: list[dtype],
    tensor_sizes: list[int],
) -> list[tuple[str, Tensor]]

Unpack a packed uint8 tensor into a list of named tensors.

The returned tensors are views of packed_tensor (the .contiguous() call is a no-op on already-contiguous row-slices). If packed_tensor lives in storage that may be reused — e.g. a reused CUDA IPC buffer — callers must clone the results before the underlying storage is overwritten.

Parameters:

packed_tensor (Tensor, required): The packed torch.uint8 tensor to unpack
names (list[str], required): List of tensor names
shapes (list[list[int]], required): List of tensor shapes
dtypes (list[dtype], required): List of tensor dtypes
tensor_sizes (list[int], required): List of tensor sizes in bytes
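
The split-then-reinterpret step of unpack_tensor has a close standard-library analogue. The sketch below is a CPU illustration under stated assumptions, not the vLLM code path: array typecodes stand in for dtypes, memoryview.cast stands in for view(dtype).view(*shape), and pack and unpack are hypothetical helpers.

```python
from array import array


def pack(
    tensors: list[tuple[str, array, list[int]]],
) -> tuple[bytearray, list[str], list[list[int]], list[str], list[int]]:
    """Concatenate raw bytes and record name, shape, typecode, and byte size."""
    packed = bytearray()
    names, shapes, typecodes, sizes = [], [], [], []
    for name, values, shape in tensors:
        raw = values.tobytes()
        packed += raw
        names.append(name)
        shapes.append(shape)
        typecodes.append(values.typecode)
        sizes.append(len(raw))
    return packed, names, shapes, typecodes, sizes


def unpack(packed, names, shapes, typecodes, sizes) -> list[tuple[str, memoryview]]:
    """Split by byte size, then reinterpret each slice with typecode and shape."""
    view, out, offset = memoryview(packed), [], 0
    for name, shape, typecode, size in zip(names, shapes, typecodes, sizes):
        # The slice is a view of the packed buffer, not a copy.
        out.append((name, view[offset : offset + size].cast(typecode, shape)))
        offset += size
    return out


packed, names, shapes, typecodes, sizes = pack(
    [
        ("weights", array("f", [1.0, 2.0, 3.0, 4.0]), [2, 2]),
        ("ids", array("h", [7, 8, 9]), [3]),
    ]
)
restored = unpack(packed, names, shapes, typecodes, sizes)
print([(name, v.tolist()) for name, v in restored])
# [('weights', [[1.0, 2.0], [3.0, 4.0]]), ('ids', [7, 8, 9])]
```

As with the tensors returned by unpack_tensor, the restored memoryviews share storage with the packed buffer, so they must be copied if that buffer will be overwritten.
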
Source code in vllm/distributed/weight_transfer/packed_tensor.py
def unpack_tensor(
    packed_tensor: torch.Tensor,
    names: list[str],
    shapes: list[list[int]],
    dtypes: list[torch.dtype],
    tensor_sizes: list[int],
) -> list[tuple[str, torch.Tensor]]:
    """Unpack a packed uint8 tensor into a list of named tensors.

    The returned tensors are **views** of ``packed_tensor`` (the
    ``.contiguous()`` call is a no-op on already-contiguous row-slices).
    If ``packed_tensor`` lives in storage that may be reused — e.g. a
    reused CUDA IPC buffer — callers must clone the results before the
    underlying storage is overwritten.

    Args:
        packed_tensor: The packed torch.uint8 tensor to unpack
        names: List of tensor names
        shapes: List of tensor shapes
        dtypes: List of tensor dtypes
        tensor_sizes: List of tensor sizes in bytes
    """
    unpacked_tensors = packed_tensor.split(tensor_sizes)

    return [
        (name, tensor.contiguous().view(dtype).view(*shape))
        for name, shape, dtype, tensor in zip(names, shapes, dtypes, unpacked_tensors)
    ]