Skip to content

vllm.v1.outputs

AsyncModelRunnerOutput

Bases: ABC

Source code in vllm/v1/outputs.py
class AsyncModelRunnerOutput(ABC):
    """Abstract handle to a ModelRunnerOutput that is retrieved on demand."""

    @abstractmethod
    def get_output(self) -> ModelRunnerOutput:
        """Return the ModelRunnerOutput backing this async output.

        The call blocks until the results are ready; waiting might involve
        copying device tensors to the host.
        This method should only be called once per AsyncModelRunnerOutput.
        """
        ...

get_output abstractmethod

get_output() -> ModelRunnerOutput

Get the ModelRunnerOutput for this async output.

This is a blocking call that waits until the results are ready, which might involve copying device tensors to the host. This method should only be called once per AsyncModelRunnerOutput.

Source code in vllm/v1/outputs.py
@abstractmethod
def get_output(self) -> ModelRunnerOutput:
    """Return the ModelRunnerOutput backing this async output.

    The call blocks until the results are ready; waiting might involve
    copying device tensors to the host.
    This method should only be called once per AsyncModelRunnerOutput.
    """
    ...

LogprobsTensors

Bases: NamedTuple

Source code in vllm/v1/outputs.py
class LogprobsTensors(NamedTuple):
    # [num_reqs x num_generated_tokens, max_num_logprobs + 1]
    logprob_token_ids: torch.Tensor
    # [num_reqs x num_generated_tokens, max_num_logprobs + 1]
    logprobs: torch.Tensor
    # [num_reqs x num_generated_tokens]
    selected_token_ranks: torch.Tensor
    # [num_reqs]
    cu_num_generated_tokens: list[int] | None = None

    def tolists(self, cu_num_generated_tokens: list[int] | None = None):
        return LogprobsLists(
            self.logprob_token_ids.cpu().numpy(),
            self.logprobs.cpu().numpy(),
            self.selected_token_ranks.cpu().numpy(),
            cu_num_generated_tokens
            if cu_num_generated_tokens is not None
            else self.cu_num_generated_tokens,
        )

    def to_cpu_nonblocking(self) -> "LogprobsTensors":
        if self.logprob_token_ids.device.type == "cpu":
            return self
        return LogprobsTensors(
            self.logprob_token_ids.to("cpu", non_blocking=True),
            self.logprobs.to("cpu", non_blocking=True),
            self.selected_token_ranks.to("cpu", non_blocking=True),
            self.cu_num_generated_tokens,
        )

    def filter(self, mask: torch.Tensor) -> "LogprobsTensors":
        """Filter the logprobs tensors with the given bool mask."""
        assert self.cu_num_generated_tokens is None, (
            "filter can't be used with cu_num_generated_tokens"
        )
        return LogprobsTensors(
            self.logprob_token_ids[mask],
            self.logprobs[mask],
            self.selected_token_ranks[mask],
        )

    @staticmethod
    def empty_cpu(
        num_positions: int, num_tokens_per_position: int
    ) -> "LogprobsTensors":
        """Create empty LogprobsTensors on CPU."""

        logprob_token_ids = torch.empty(
            (num_positions, num_tokens_per_position), dtype=torch.int32, device="cpu"
        )
        logprobs = torch.empty_like(logprob_token_ids, dtype=torch.float32)
        selected_token_ranks = torch.empty(
            num_positions, dtype=torch.int32, device="cpu"
        )
        return LogprobsTensors(
            logprob_token_ids=logprob_token_ids,
            logprobs=logprobs,
            selected_token_ranks=selected_token_ranks,
        )

empty_cpu staticmethod

empty_cpu(
    num_positions: int, num_tokens_per_position: int
) -> LogprobsTensors

Create empty LogprobsTensors on CPU.

Source code in vllm/v1/outputs.py
@staticmethod
def empty_cpu(
    num_positions: int, num_tokens_per_position: int
) -> "LogprobsTensors":
    """Allocate uninitialized (``torch.empty``) LogprobsTensors on CPU.

    Token ids and ranks are int32, logprobs are float32.
    """
    token_ids = torch.empty(
        (num_positions, num_tokens_per_position),
        dtype=torch.int32,
        device="cpu",
    )
    return LogprobsTensors(
        logprob_token_ids=token_ids,
        # Same shape and device as the ids, but float32.
        logprobs=torch.empty_like(token_ids, dtype=torch.float32),
        selected_token_ranks=torch.empty(
            num_positions, dtype=torch.int32, device="cpu"
        ),
    )

filter

filter(mask: Tensor) -> LogprobsTensors

Filter the logprobs tensors with the given bool mask.

Source code in vllm/v1/outputs.py
def filter(self, mask: torch.Tensor) -> "LogprobsTensors":
    """Index every tensor with the given bool mask and return the result.

    Only valid when ``cu_num_generated_tokens`` is unset; the returned
    tuple leaves that field at its default of ``None``.
    """
    assert self.cu_num_generated_tokens is None, (
        "filter can't be used with cu_num_generated_tokens"
    )
    kept_token_ids = self.logprob_token_ids[mask]
    kept_logprobs = self.logprobs[mask]
    kept_ranks = self.selected_token_ranks[mask]
    return LogprobsTensors(kept_token_ids, kept_logprobs, kept_ranks)

RoutedExpertsLists

Bases: NamedTuple

CPU-side routed experts, in the form that `RoutedExpertsManager.store_batch` consumes.

Batched per scheduler step: the leading dim is the number of tokens scheduled across all requests in this step (total_num_scheduled_tokens), not per-request tokens. slot_mapping[i] tells the scheduler which physical KV-cache slot row i of routing_data belongs to.

Source code in vllm/v1/outputs.py
class RoutedExpertsLists(NamedTuple):
    """CPU-side (numpy) routed-experts data for one scheduler step.

    This is the form that :meth:`RoutedExpertsManager.store_batch` consumes.

    Batched per scheduler step: the leading dim is the number of tokens
    scheduled across all requests in this step (``total_num_scheduled_tokens``),
    not per-request tokens. ``slot_mapping[i]`` tells the scheduler which
    physical KV-cache slot row ``i`` of ``routing_data`` belongs to.
    """

    # Shape: (num_scheduled_tokens, num_layers, num_experts_per_tok)
    routing_data: np.ndarray
    # Shape: (num_scheduled_tokens,); row i pairs with routing_data[i]
    slot_mapping: np.ndarray

RoutedExpertsTensors

Bases: NamedTuple

Device-side snapshot of routed experts data, pending async D2H.

Produced by `GPUModelRunner` at the end of each async-scheduled step. The copy stream waits on the default stream, then issues a non-blocking D2H copy via `to_cpu_nonblocking` into a pinned CPU buffer; `AsyncGPUModelRunnerOutput.get_output` synchronizes the copy before the scheduler reads it.

Sliced to total_num_scheduled_tokens (step-level, across all requests — NOT per-request). Both routing_data and slot_mapping must be private clones when sourced from shared capturer / prepare-input buffers, so the next forward pass / _prepare_inputs on the default stream does not race with a D2H still pending on the copy stream.

Source code in vllm/v1/outputs.py
class RoutedExpertsTensors(NamedTuple):
    """Snapshot of routed-experts data on the device, awaiting async D2H.

    :class:`GPUModelRunner` produces one at the end of each
    async-scheduled step. The copy stream first waits on the default
    stream and then issues a non-blocking D2H through
    :meth:`to_cpu_nonblocking` into a pinned CPU buffer;
    :class:`AsyncGPUModelRunnerOutput.get_output` synchronizes that copy
    before the scheduler reads the data.

    The tensors are sliced to ``total_num_scheduled_tokens`` (step-level,
    across all requests — NOT per-request). When ``routing_data`` or
    ``slot_mapping`` originate from shared capturer / prepare-input
    buffers they must be private clones, so that the next forward pass /
    ``_prepare_inputs`` on the default stream cannot race with a D2H
    still pending on the copy stream.
    """

    # Shape: (num_scheduled_tokens, num_layers, num_experts_per_tok)
    routing_data: torch.Tensor
    # Shape: (num_scheduled_tokens,)
    slot_mapping: torch.Tensor

    def to_cpu_nonblocking(self) -> "RoutedExpertsTensors":
        """Start a non-blocking D2H copy on the current stream.

        Returns ``self`` unchanged when ``routing_data`` is already on CPU.

        NOTE: ``non_blocking=True`` only yields real overlap when the CPU
        destination is pinned. This fallback allocates a fresh pageable
        CPU tensor on every call, which silently degrades to a
        synchronous copy; that is acceptable because the sync happens on
        the dedicated copy stream rather than the default stream.
        """
        if self.routing_data.device.type != "cpu":
            host_routing = self.routing_data.to("cpu", non_blocking=True)
            host_slots = self.slot_mapping.to("cpu", non_blocking=True)
            return RoutedExpertsTensors(host_routing, host_slots)
        return self

    def tolists(self) -> "RoutedExpertsLists":
        """Return the numpy-backed form that the scheduler consumes.

        ``.cpu()`` does nothing for tensors that are already on CPU, so
        the post-D2H case is cheap; raw device tensors make this block
        synchronously, a path that is only reached in tests.
        """
        routing_np = self.routing_data.cpu().numpy()
        slots_np = self.slot_mapping.cpu().numpy()
        return RoutedExpertsLists(routing_np, slots_np)

to_cpu_nonblocking

to_cpu_nonblocking() -> RoutedExpertsTensors

Issue non-blocking D2H on the current stream.

NOTE: non_blocking=True only delivers true overlap when the CPU target is pinned. The current fallback here allocates a new pageable CPU tensor per call, which silently degrades to a synchronous copy; acceptable because the sync happens on the dedicated copy stream, not the default stream.

Source code in vllm/v1/outputs.py
def to_cpu_nonblocking(self) -> "RoutedExpertsTensors":
    """Start a non-blocking D2H copy on the current stream.

    Returns ``self`` unchanged when ``routing_data`` is already on CPU.

    NOTE: ``non_blocking=True`` only yields real overlap when the CPU
    destination is pinned. This fallback allocates a fresh pageable
    CPU tensor on every call, which silently degrades to a
    synchronous copy; that is acceptable because the sync happens on
    the dedicated copy stream rather than the default stream.
    """
    if self.routing_data.device.type != "cpu":
        host_routing = self.routing_data.to("cpu", non_blocking=True)
        host_slots = self.slot_mapping.to("cpu", non_blocking=True)
        return RoutedExpertsTensors(host_routing, host_slots)
    return self

tolists

tolists() -> RoutedExpertsLists

Convert to the numpy-backed form consumed by the scheduler.

.cpu() is a no-op when the tensor is already on CPU, so this is cheap for the post-D2H case; for raw device tensors it will synchronously block, which is only reached in tests.

Source code in vllm/v1/outputs.py
def tolists(self) -> "RoutedExpertsLists":
    """Return the numpy-backed form that the scheduler consumes.

    ``.cpu()`` does nothing for tensors that are already on CPU, so
    the post-D2H case is cheap; raw device tensors make this block
    synchronously, a path that is only reached in tests.
    """
    routing_np = self.routing_data.cpu().numpy()
    slots_np = self.slot_mapping.cpu().numpy()
    return RoutedExpertsLists(routing_np, slots_np)

make_empty_encoder_model_runner_output

make_empty_encoder_model_runner_output(
    scheduler_output: SchedulerOutput,
) -> ModelRunnerOutput

Create a ModelRunnerOutput stub that contains the correct per-request bookkeeping but no generated data yet.

Source code in vllm/v1/outputs.py
def make_empty_encoder_model_runner_output(
    scheduler_output: "SchedulerOutput",
) -> ModelRunnerOutput:
    """Build a placeholder ModelRunnerOutput for the scheduled requests.

    The result carries the per-request bookkeeping (request ids and their
    indices) but no real generated data: every request receives the
    single-element token list ``[0]`` and a ``None`` pooler output.
    When nothing is scheduled, ``EMPTY_MODEL_RUNNER_OUTPUT`` is returned.
    """
    scheduled = scheduler_output.num_scheduled_tokens
    if not scheduled:
        return EMPTY_MODEL_RUNNER_OUTPUT

    # Materialize the keys once to get a fixed, indexable ordering.
    req_ids: list[str] = list(scheduled.keys())
    num_reqs = len(req_ids)

    return ModelRunnerOutput(
        req_ids=req_ids,
        # Each request's index is its position in req_ids.
        req_id_to_index=dict(zip(req_ids, range(num_reqs))),
        # One separate [0] list per request (not shared objects).
        # NOTE(review): 0 looks like a placeholder token id — confirm
        # how consumers treat it.
        sampled_token_ids=[[0] for _ in range(num_reqs)],
        # Pooler outputs are not available yet, so use None placeholders.
        pooler_output=[None] * num_reqs,
    )