Bases: SecondaryTierManager
A simple in-memory secondary tier.
This implementation:
- Stores blocks in a dictionary (key -> True)
- Completes transfers immediately (synchronous)
Source code in vllm/v1/kv_offload/tiering/example/manager.py
class ExampleSecondaryTierManager(SecondaryTierManager):
    """
    A simple in-memory secondary tier.

    This implementation:
    - Stores blocks in a dictionary (key -> True)
    - Completes transfers immediately (synchronous)

    The methods defined here only track which keys are present; none of
    them reads or writes block contents.
    """

    def __init__(
        self,
        offloading_spec: "OffloadingSpec",
        primary_kv_view: memoryview,
        tier_type: str,
        custom_param: int = 0,
    ):
        """
        Initialize the example secondary tier.

        Args:
            offloading_spec: Passed through unchanged to the base-class
                initializer.
            primary_kv_view: Passed through unchanged to the base-class
                initializer.
            tier_type: Passed through unchanged to the base-class
                initializer.
            custom_param: Dummy parameter demonstrating custom args. It is
                only written to the log here.
        """
        super().__init__(
            offloading_spec=offloading_spec,
            primary_kv_view=primary_kv_view,
            tier_type=tier_type,
        )
        logger.info(
            "ExampleSecondaryTierManager initialized with custom_param=%d", custom_param
        )
        # key -> True (only care about presence)
        self.blocks: dict[OffloadKey, bool] = {}
        # Completed jobs waiting to be retrieved by get_finished()
        self.completed_jobs: list[JobResult] = []

    def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
        """
        Check whether a block exists in this secondary tier.

        Args:
            key: Offload key to look up.
            req_context: Per-request context. Not used by this
                implementation.

        Returns:
            True if the block is present, False if not found. This
            implementation never returns None; the wider annotation is
            presumably dictated by the base-class interface (confirm there).
        """
        return key in self.blocks

    def submit_store(self, job_metadata: JobMetadata) -> None:
        """
        Submit a job to store blocks from primary tier to this tier.

        Every key is marked as present and a successful JobResult is queued
        right away, to be handed out by the next get_finished() call.

        Args:
            job_metadata: Job metadata including job_id, keys, and
                spec for reading blocks from the primary tier.

        Raises:
            AssertionError: If keys and block_ids differ in length.
        """
        keys = job_metadata.keys
        block_ids = job_metadata.block_ids
        # Checked before any mutation, so a malformed job leaves the tier
        # untouched.
        assert len(keys) == len(block_ids), (
            f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
        )
        # Only presence is recorded; block_ids are not consulted further.
        self.blocks.update((key, True) for key in keys)
        self.completed_jobs.append(JobResult(job_id=job_metadata.job_id, success=True))

    def submit_load(self, job_metadata: JobMetadata) -> None:
        """
        Submit a job to load blocks from this tier to primary tier.

        The job is all-or-nothing: exactly one JobResult is queued, with
        success=True only when every requested key is present. A job with
        no keys therefore succeeds.

        Args:
            job_metadata: Job metadata including job_id, keys, and
                spec for writing blocks into the primary tier.

        Raises:
            AssertionError: If keys and block_ids differ in length.
        """
        keys = job_metadata.keys
        block_ids = job_metadata.block_ids
        assert len(keys) == len(block_ids), (
            f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
        )
        # all() stops at the first missing key, like an early-exit loop.
        found_all = all(key in self.blocks for key in keys)
        self.completed_jobs.append(
            JobResult(job_id=job_metadata.job_id, success=found_all)
        )

    def get_finished(self) -> Iterable[JobResult]:
        """
        Poll for finished jobs.

        Returns:
            Iterable of JobResult objects for all jobs that have
            finished since the last call.
        """
        # Hand the accumulated list to the caller and start a fresh one, so
        # each result is reported exactly once.
        finished, self.completed_jobs = self.completed_jobs, []
        return finished

    def get_num_blocks(self) -> int:
        """Get the number of blocks currently stored in this tier."""
        return len(self.blocks)
__init__
Initialize the example secondary tier.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| custom_param | int | Dummy parameter demonstrating custom args. | 0 |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def __init__(
    self,
    offloading_spec: "OffloadingSpec",
    primary_kv_view: memoryview,
    tier_type: str,
    custom_param: int = 0,
):
    """
    Initialize the example secondary tier.

    Args:
        offloading_spec: Passed through unchanged to the base-class
            initializer.
        primary_kv_view: Passed through unchanged to the base-class
            initializer.
        tier_type: Passed through unchanged to the base-class initializer.
        custom_param: Dummy parameter demonstrating custom args. It is
            only written to the log here.
    """
    super().__init__(
        offloading_spec=offloading_spec,
        primary_kv_view=primary_kv_view,
        tier_type=tier_type,
    )
    logger.info(
        "ExampleSecondaryTierManager initialized with custom_param=%d", custom_param
    )
    # key -> True (only care about presence)
    self.blocks: dict[OffloadKey, bool] = {}
    # Completed jobs waiting to be retrieved by get_finished()
    self.completed_jobs: list[JobResult] = []
get_finished
Poll for finished jobs.
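Returns:
| Type | Description |
| --- | --- |
| Iterable[JobResult] | JobResult objects for all jobs that have finished since the last call. |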
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def get_finished(self) -> Iterable[JobResult]:
    """
    Poll for finished jobs.

    Returns:
        Iterable of JobResult objects for all jobs that have
        finished since the last call. The caller receives the list that
        was accumulated so far; this object starts over with a new empty
        list, so each result is reported exactly once.
    """
    finished, self.completed_jobs = self.completed_jobs, []
    return finished
get_num_blocks
Get the number of blocks currently stored in this tier.
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def get_num_blocks(self) -> int:
    """
    Get the number of blocks currently stored in this tier.

    Returns:
        The number of entries in ``self.blocks``.
    """
    return len(self.blocks)
lookup
lookup(
key: OffloadKey, req_context: ReqContext
) -> bool | None
Check whether a block exists in this secondary tier.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| key | OffloadKey | Offload key to look up. | required |
| req_context | ReqContext | Per-request context. | required |
Returns:
| Type | Description |
| --- | --- |
| bool \| None | True if the block is present, False if not found. |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def lookup(self, key: OffloadKey, req_context: ReqContext) -> bool | None:
    """
    Check whether a block exists in this secondary tier.

    Args:
        key: Offload key to look up.
        req_context: Per-request context. Not used by this implementation.

    Returns:
        True if the block is present, False if not found. This
        implementation never returns None; the wider annotation is
        presumably dictated by the base-class interface (confirm there).
    """
    return key in self.blocks
submit_load
Submit a job to load blocks from this tier to primary tier.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_metadata | JobMetadata | Job metadata including job_id, keys, and spec for writing blocks into the primary tier. | required |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def submit_load(self, job_metadata: JobMetadata) -> None:
    """
    Submit a job to load blocks from this tier to primary tier.

    The job is all-or-nothing: exactly one JobResult is queued, with
    success=True only when every requested key is present. A job with no
    keys therefore succeeds.

    Args:
        job_metadata: Job metadata including job_id, keys, and
            spec for writing blocks into the primary tier.

    Raises:
        AssertionError: If keys and block_ids differ in length.
    """
    keys = job_metadata.keys
    block_ids = job_metadata.block_ids
    assert len(keys) == len(block_ids), (
        f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
    )
    # all() stops at the first missing key, like an early-exit loop.
    found_all = all(key in self.blocks for key in keys)
    self.completed_jobs.append(
        JobResult(job_id=job_metadata.job_id, success=found_all)
    )
submit_store
Submit a job to store blocks from primary tier to this tier.
Parameters:
| Name | Type | Description | Default |
| --- | --- | --- | --- |
| job_metadata | JobMetadata | Job metadata including job_id, keys, and spec for reading blocks from the primary tier. | required |
Source code in vllm/v1/kv_offload/tiering/example/manager.py
def submit_store(self, job_metadata: JobMetadata) -> None:
    """
    Submit a job to store blocks from primary tier to this tier.

    Every key is marked as present and a successful JobResult is queued
    right away, to be handed out by the next get_finished() call.

    Args:
        job_metadata: Job metadata including job_id, keys, and
            spec for reading blocks from the primary tier.

    Raises:
        AssertionError: If keys and block_ids differ in length.
    """
    keys = job_metadata.keys
    block_ids = job_metadata.block_ids
    # Checked before any mutation, so a malformed job leaves the tier
    # untouched.
    assert len(keys) == len(block_ids), (
        f"Length mismatch: {len(keys)} keys but {len(block_ids)} block_ids"
    )
    # Only presence is recorded; block_ids are not consulted further.
    self.blocks.update((key, True) for key in keys)
    self.completed_jobs.append(JobResult(job_id=job_metadata.job_id, success=True))