Skip to content

vllm.v1.kv_offload.tiering.example.manager

ExampleSecondaryTierManager: A simple in-memory secondary tier.

This implementation provides a minimal secondary tier that stores blocks in memory (using a dictionary) with immediate completion. It serves as a reference for writing new tiers and is useful for testing the TieringOffloadingManager without requiring actual storage or network backends.

ExampleSecondaryTierManager

Bases: SecondaryTierManager

A simple in-memory secondary tier.

This implementation:

- Stores blocks in a dictionary (key -> True)
- Completes transfers immediately (synchronous)

Source code in vllm/v1/kv_offload/tiering/example/manager.py
class ExampleSecondaryTierManager(SecondaryTierManager):
    """
    Minimal in-memory secondary tier, meant as a reference implementation.

    Behavior:
    - Block presence is tracked in a dictionary mapping each key to ``True``;
      no block payload is copied or retained.
    - Every store/load job is resolved synchronously inside its ``submit_*``
      call and handed back on the next ``get_finished()`` poll.
    """

    def __init__(
        self,
        offloading_spec: "OffloadingSpec",
        primary_kv_view: memoryview,
        tier_type: str,
        custom_param: int = 0,
    ):
        """
        Set up an empty example tier.

        Args:
            offloading_spec: Offloading spec, forwarded unchanged to the
                base-class constructor.
            primary_kv_view: Memory view of the primary tier, forwarded
                unchanged to the base-class constructor.
            tier_type: Tier type string, forwarded unchanged to the
                base-class constructor.
            custom_param: Placeholder argument showing how a tier can accept
                extra constructor arguments; it is only logged, not stored.
        """
        super().__init__(
            offloading_spec=offloading_spec,
            primary_kv_view=primary_kv_view,
            tier_type=tier_type,
        )

        # Presence map: the value is always True, only membership matters.
        self.blocks: dict[OffloadKey, bool] = {}

        # Results queued by submit_* and drained by get_finished().
        self.completed_jobs: list[JobResult] = []

        logger.info(
            "ExampleSecondaryTierManager initialized with custom_param=%d", custom_param
        )

    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """
        Report whether ``key`` has been stored in this tier.

        Args:
            key: Offload key to check.
            req_context: Per-request context; not used by this tier.

        Returns:
            True when the key is present, otherwise False. This tier always
            answers immediately, so it never returns None.
        """
        return key in self.blocks

    def submit_store(self, job_metadata: JobMetadata) -> None:
        """
        Record the job's keys as present and mark the job as succeeded.

        Args:
            job_metadata: Store job description. Its ``keys`` and
                ``block_ids`` must have the same length.
        """
        keys, block_ids = job_metadata.keys, job_metadata.block_ids
        assert len(keys) == len(block_ids), (
            f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
        )

        # No payload is copied; remembering each key is the entire "store".
        self.blocks.update((key, True) for key in keys)
        self.completed_jobs.append(JobResult(job_id=job_metadata.job_id, success=True))

    def submit_load(self, job_metadata: JobMetadata) -> None:
        """
        Resolve a load job immediately, based on key presence.

        Nothing is written into the primary tier. The job is reported as
        successful only if every requested key was previously stored here.

        Args:
            job_metadata: Load job description. Its ``keys`` and
                ``block_ids`` must have the same length.
        """
        keys, block_ids = job_metadata.keys, job_metadata.block_ids
        assert len(keys) == len(block_ids), (
            f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
        )

        # all() stops at the first missing key, like an explicit early return.
        all_present = all(key in self.blocks for key in keys)
        self.completed_jobs.append(
            JobResult(job_id=job_metadata.job_id, success=all_present)
        )

    def get_finished(self) -> Iterable[JobResult]:
        """
        Drain the jobs that finished since the previous poll.

        Returns:
            The accumulated JobResult list. The internal queue is replaced
            with a fresh empty list.
        """
        drained, self.completed_jobs = self.completed_jobs, []
        return drained

    def get_num_blocks(self) -> int:
        """Return how many keys this tier currently holds."""
        stored_blocks = self.blocks
        return len(stored_blocks)

__init__

__init__(
    offloading_spec: OffloadingSpec,
    primary_kv_view: memoryview,
    tier_type: str,
    custom_param: int = 0,
)

Initialize the example secondary tier.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `custom_param` | `int` | Dummy parameter demonstrating custom args. | `0` |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def __init__(
    self,
    offloading_spec: "OffloadingSpec",
    primary_kv_view: memoryview,
    tier_type: str,
    custom_param: int = 0,
):
    """
    Set up an empty example tier.

    Args:
        offloading_spec: Offloading spec, forwarded unchanged to the
            base-class constructor.
        primary_kv_view: Memory view of the primary tier, forwarded
            unchanged to the base-class constructor.
        tier_type: Tier type string, forwarded unchanged to the
            base-class constructor.
        custom_param: Placeholder argument showing how a tier can accept
            extra constructor arguments; it is only logged, not stored.
    """
    super().__init__(
        offloading_spec=offloading_spec,
        primary_kv_view=primary_kv_view,
        tier_type=tier_type,
    )

    # Presence map: the value is always True, only membership matters.
    self.blocks: dict[OffloadKey, bool] = {}

    # Results queued by submit_* and drained by get_finished().
    self.completed_jobs: list[JobResult] = []

    logger.info(
        "ExampleSecondaryTierManager initialized with custom_param=%d", custom_param
    )

get_finished

get_finished() -> Iterable[JobResult]

Poll for finished jobs.

Returns:

| Type | Description |
| --- | --- |
| `Iterable[JobResult]` | Iterable of JobResult objects for all jobs that have finished since the last call. |

Source code in vllm/v1/kv_offload/tiering/example/manager.py
def get_finished(self) -> Iterable[JobResult]:
    """
    Drain the jobs that finished since the previous poll.

    Returns:
        The accumulated JobResult list. The internal queue is replaced
        with a fresh empty list.
    """
    drained, self.completed_jobs = self.completed_jobs, []
    return drained

get_num_blocks

get_num_blocks() -> int

Get the number of blocks currently stored in this tier.

Source code in vllm/v1/kv_offload/tiering/example/manager.py
def get_num_blocks(self) -> int:
    """Return how many keys this tier currently holds."""
    stored_blocks = self.blocks
    return len(stored_blocks)

lookup

lookup(
    key: OffloadKey, req_context: ReqContext
) -> bool | None

Check whether a block exists in this secondary tier.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `key` | `OffloadKey` | Offload key to look up. | *required* |
| `req_context` | `ReqContext` | Per-request context. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `bool \| None` | True if the block is present, False if not found. |

Source code in vllm/v1/kv_offload/tiering/example/manager.py
def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
    """
    Report whether ``key`` has been stored in this tier.

    Args:
        key: Offload key to check.
        req_context: Per-request context; not used by this tier.

    Returns:
        True when the key is present, otherwise False. This tier always
        answers immediately, so it never returns None.
    """
    return key in self.blocks

submit_load

submit_load(job_metadata: JobMetadata) -> None

Submit a job to load blocks from this tier to primary tier.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_metadata` | `JobMetadata` | Job metadata including job_id, keys, and spec for writing blocks into the primary tier. | *required* |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def submit_load(self, job_metadata: JobMetadata) -> None:
    """
    Resolve a load job immediately, based on key presence.

    Nothing is written into the primary tier. The job is reported as
    successful only if every requested key was previously stored here.

    Args:
        job_metadata: Load job description. Its ``keys`` and
            ``block_ids`` must have the same length.
    """
    keys, block_ids = job_metadata.keys, job_metadata.block_ids
    assert len(keys) == len(block_ids), (
        f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
    )

    # all() stops at the first missing key, like an explicit early return.
    all_present = all(key in self.blocks for key in keys)
    self.completed_jobs.append(
        JobResult(job_id=job_metadata.job_id, success=all_present)
    )

submit_store

submit_store(job_metadata: JobMetadata) -> None

Submit a job to store blocks from primary tier to this tier.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `job_metadata` | `JobMetadata` | Job metadata including job_id, keys, and spec for reading blocks from the primary tier. | *required* |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def submit_store(self, job_metadata: JobMetadata) -> None:
    """
    Record the job's keys as present and mark the job as succeeded.

    Args:
        job_metadata: Store job description. Its ``keys`` and
            ``block_ids`` must have the same length.
    """
    keys, block_ids = job_metadata.keys, job_metadata.block_ids
    assert len(keys) == len(block_ids), (
        f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
    )

    # No payload is copied; remembering each key is the entire "store".
    self.blocks.update((key, True) for key in keys)
    self.completed_jobs.append(JobResult(job_id=job_metadata.job_id, success=True))