Skip to content

vllm.v1.kv_offload.tiering.spec

TieringOffloadingSpec: Spec for multi-tier KV cache offloading.

This spec creates a TieringOffloadingManager with a CPU primary tier and configurable secondary tiers (e.g., Storage, Network).

Configuration via kv_connector_extra_config
  • cpu_bytes_to_use: (required) Bytes to allocate for CPU primary tier
  • block_size: (optional) Block size for offloaded blocks (default: GPU block size)
  • eviction_policy: (optional) Primary tier eviction policy: "lru" or "arc" (default: "lru")
  • secondary_tiers: (optional) List of secondary tier configurations Each secondary tier config is a dict with:
    • type: (required) Type of secondary tier (e.g., "example", "storage", "network")
    • Additional tier-specific parameters are passed directly to the tier constructor. See each tier's documentation for supported parameters.

Example configuration: { "cpu_bytes_to_use": 10737418240, # 10 GB "block_size": 16, "eviction_policy": "lru", "secondary_tiers": [ { "type": "example", "custom_param": 67 } ] }

TieringOffloadingSpec

Bases: CPUOffloadingSpec

Spec for multi-tier KV cache offloading.

Creates a TieringOffloadingManager with: - Primary tier: CPU (LRU or ARC eviction policy) - Secondary tiers: Configurable via extra_config

The CPU primary tier has direct GPU access and serves as the gateway for all GPU↔offload operations. Secondary tiers cannot directly access GPU memory and must transfer data through the primary tier.

Source code in vllm/v1/kv_offload/tiering/spec.py
class TieringOffloadingSpec(CPUOffloadingSpec):
    """
    Spec for multi-tier KV cache offloading.

    Creates a TieringOffloadingManager with:
    - Primary tier: CPU (LRU or ARC eviction policy)
    - Secondary tiers: Configurable via extra_config

    The CPU primary tier has direct GPU access and serves as the gateway for
    all GPU↔offload operations. Secondary tiers cannot directly access GPU
    memory and must transfer data through the primary tier.
    """

    def __init__(self, vllm_config: VllmConfig, kv_cache_config: KVCacheConfig):
        super().__init__(vllm_config, kv_cache_config)
        # Redeclare for mypy: parent sets this but `--follow-imports skip` hides it
        self._manager: OffloadingManager | None = None

        # Parse secondary tier configurations
        self.secondary_tier_configs = self.extra_config.get("secondary_tiers", [])
        if not isinstance(self.secondary_tier_configs, list):
            raise ValueError("secondary_tiers must be a list of tier configurations")

        # Scheduler-side mmap (rank=None); kept for cleanup
        self._scheduler_mmap: SharedOffloadRegion | None = None

    @override
    def get_manager(self) -> OffloadingManager:
        """
        Get the TieringOffloadingManager.

        Creates a TieringOffloadingManager with:
        - Primary tier: CPU (LRU or ARC)
        - Secondary tiers: As configured in extra_config

        Returns:
            TieringOffloadingManager instance
        """
        if not self._manager:
            kv_events_config = self.vllm_config.kv_events_config
            enable_events = (
                kv_events_config is not None and kv_events_config.enable_kv_cache_events
            )

            # Create scheduler-side SharedOffloadRegion (rank=None) so the
            # primary tier can eagerly create a memoryview over _base.
            world_size = self.vllm_config.parallel_config.world_size
            scheduler_mmap = SharedOffloadRegion(
                instance_id=self.vllm_config.instance_id,
                total_size_bytes=self.cpu_page_size_per_worker
                * world_size
                * self.num_blocks,
                num_blocks=self.num_blocks,
                rank=None,
                num_workers=world_size,
                cpu_page_size=self.cpu_page_size_per_worker,
            )
            self._scheduler_mmap = scheduler_mmap

            # Create primary tier (CPU-based)
            assert len(self.gpu_block_size) == 1
            primary_tier = CPUPrimaryTierOffloadingManager(
                num_blocks=self.num_blocks,
                cache_policy=self.eviction_policy,  # type: ignore[arg-type]
                enable_events=enable_events,
                mmap_region=scheduler_mmap,
            )

            # Create secondary tiers
            primary_kv_view = primary_tier.get_kv_memoryview()
            secondary_tiers = []
            for i, tier_config in enumerate(self.secondary_tier_configs):
                try:
                    tier = SecondaryTierFactory.create_secondary_tier(
                        tier_config, primary_kv_view, self
                    )
                    secondary_tiers.append(tier)
                    logger.info(
                        "Created secondary tier #%d (%s)",
                        i,
                        tier.tier_type,
                    )
                except Exception as e:
                    logger.error(
                        "Failed to create secondary tier from config %s: %s",
                        tier_config,
                        e,
                    )
                    raise

            # Create TieringOffloadingManager. GPU↔CPU transfers use the inherited
            # get_handlers(); secondary tier transfers are handled by the
            # secondary tier managers and need no additional handlers here.
            tiering_manager = TieringOffloadingManager(
                primary_tier=primary_tier,
                secondary_tiers=secondary_tiers,
                enable_events=enable_events,
            )
            if int(self.extra_config.get("store_threshold", 0)) >= 2:
                raise ValueError(
                    "store_threshold is not supported for TieringOffloadingSpec"
                )
            self._manager = tiering_manager

            logger.info(
                "Created TieringOffloadingManager with primary tier "
                "(%s, %s blocks) and %s secondary tier(s)",
                self.eviction_policy,
                self.num_blocks,
                len(secondary_tiers),
            )

        return self._manager

    @override
    def create_handlers(self, kv_caches: CanonicalKVCaches) -> CpuGpuOffloadingHandlers:
        world_size = self.vllm_config.parallel_config.world_size
        rank = torch.accelerator.current_device_index()
        worker_mmap = SharedOffloadRegion(
            instance_id=self.vllm_config.instance_id,
            total_size_bytes=self.cpu_page_size_per_worker
            * world_size
            * self.num_blocks,
            num_blocks=self.num_blocks,
            rank=rank,
            num_workers=world_size,
            cpu_page_size=self.cpu_page_size_per_worker,
        )
        return CpuGpuOffloadingHandlers(
            kv_caches=kv_caches,
            block_size_factor=self.block_size_factor,
            num_cpu_blocks=self.num_blocks,
            mmap_region=worker_mmap,
        )

get_manager

get_manager() -> OffloadingManager

Get the TieringOffloadingManager.

Creates a TieringOffloadingManager with: - Primary tier: CPU (LRU or ARC) - Secondary tiers: As configured in extra_config

Returns:

Type Description
OffloadingManager

TieringOffloadingManager instance

Source code in vllm/v1/kv_offload/tiering/spec.py
@override
def get_manager(self) -> OffloadingManager:
    """
    Get the TieringOffloadingManager.

    Creates a TieringOffloadingManager with:
    - Primary tier: CPU (LRU or ARC)
    - Secondary tiers: As configured in extra_config

    Returns:
        TieringOffloadingManager instance
    """
    if not self._manager:
        kv_events_config = self.vllm_config.kv_events_config
        enable_events = (
            kv_events_config is not None and kv_events_config.enable_kv_cache_events
        )

        # Create scheduler-side SharedOffloadRegion (rank=None) so the
        # primary tier can eagerly create a memoryview over _base.
        world_size = self.vllm_config.parallel_config.world_size
        scheduler_mmap = SharedOffloadRegion(
            instance_id=self.vllm_config.instance_id,
            total_size_bytes=self.cpu_page_size_per_worker
            * world_size
            * self.num_blocks,
            num_blocks=self.num_blocks,
            rank=None,
            num_workers=world_size,
            cpu_page_size=self.cpu_page_size_per_worker,
        )
        self._scheduler_mmap = scheduler_mmap

        # Create primary tier (CPU-based)
        assert len(self.gpu_block_size) == 1
        primary_tier = CPUPrimaryTierOffloadingManager(
            num_blocks=self.num_blocks,
            cache_policy=self.eviction_policy,  # type: ignore[arg-type]
            enable_events=enable_events,
            mmap_region=scheduler_mmap,
        )

        # Create secondary tiers
        primary_kv_view = primary_tier.get_kv_memoryview()
        secondary_tiers = []
        for i, tier_config in enumerate(self.secondary_tier_configs):
            try:
                tier = SecondaryTierFactory.create_secondary_tier(
                    tier_config, primary_kv_view, self
                )
                secondary_tiers.append(tier)
                logger.info(
                    "Created secondary tier #%d (%s)",
                    i,
                    tier.tier_type,
                )
            except Exception as e:
                logger.error(
                    "Failed to create secondary tier from config %s: %s",
                    tier_config,
                    e,
                )
                raise

        # Create TieringOffloadingManager. GPU↔CPU transfers use the inherited
        # get_handlers(); secondary tier transfers are handled by the
        # secondary tier managers and need no additional handlers here.
        tiering_manager = TieringOffloadingManager(
            primary_tier=primary_tier,
            secondary_tiers=secondary_tiers,
            enable_events=enable_events,
        )
        if int(self.extra_config.get("store_threshold", 0)) >= 2:
            raise ValueError(
                "store_threshold is not supported for TieringOffloadingSpec"
            )
        self._manager = tiering_manager

        logger.info(
            "Created TieringOffloadingManager with primary tier "
            "(%s, %s blocks) and %s secondary tier(s)",
            self.eviction_policy,
            self.num_blocks,
            len(secondary_tiers),
        )

    return self._manager