diff --git a/tests/distributed/test_expert_placement.py b/tests/distributed/test_expert_placement.py
new file mode 100644
index 000000000000..a3b1b3193deb
--- /dev/null
+++ b/tests/distributed/test_expert_placement.py
@@ -0,0 +1,194 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+
+import pytest
+
+from vllm.model_executor.layers.fused_moe.layer import determine_expert_map
+
+
+def verify_round_robin_pattern(expert_map, ep_rank, ep_size,
+                               global_num_experts):
+    """Verify that the expert map follows the round_robin pattern."""
+    # Calculate expected local experts (supporting non-divisible cases)
+    base_experts = global_num_experts // ep_size
+    remainder = global_num_experts % ep_size
+
+    if ep_rank < remainder:
+        local_num_experts = base_experts + 1
+    else:
+        local_num_experts = base_experts
+
+    # Expected expert IDs for this rank in round_robin pattern
+    # For non-divisible cases, ranks with extra experts start earlier
+    expected_expert_ids = []
+    for expert_idx in range(local_num_experts):
+        global_expert_id = ep_rank + expert_idx * ep_size
+        expected_expert_ids.append(global_expert_id)
+
+    # Check that only expected experts are mapped to this rank
+    for global_expert_id in range(global_num_experts):
+        if global_expert_id in expected_expert_ids:
+            local_expert_id = expert_map[global_expert_id]
+            expected_local_id = expected_expert_ids.index(global_expert_id)
+            assert (
+                local_expert_id == expected_local_id
+            ), f"Global expert {global_expert_id} should map to local expert " \
+                f"{expected_local_id}, got {local_expert_id}"
+        else:
+            assert (
+                expert_map[global_expert_id] == -1
+            ), f"Global expert {global_expert_id} should not be mapped to " \
+                f"this rank"
+
+    # Verify that all local expert IDs are consecutive starting from 0
+    local_expert_ids = [
+        expert_map[global_id] for global_id in expected_expert_ids
+    ]
+    expected_local_ids = list(range(local_num_experts))
+    assert (
+        local_expert_ids == expected_local_ids
+    ), f"Expected local expert IDs {expected_local_ids}, got {local_expert_ids}"
+
+
+@pytest.mark.parametrize("expert_placement_strategy", ["round_robin"])
+@pytest.mark.parametrize("world_size", [2, 4])
+def test_expert_placement_various_sizes(expert_placement_strategy, world_size):
+    """Test round_robin expert placement with various expert counts."""
+
+    # Test with different global_num_experts values
+    # Include both divisible and non-divisible cases
+    if world_size == 2:
+        test_cases = [
+            (4, 2),  # 4 experts (divisible)
+            (8, 2),  # 8 experts (divisible)
+            (9, 2),  # 9 experts (non-divisible)
+            (16, 2),  # 16 experts (divisible)
+            (17, 2),  # 17 experts (non-divisible)
+        ]
+    elif world_size == 4:
+        test_cases = [
+            (8, 4),  # 8 experts (divisible)
+            (16, 4),  # 16 experts (divisible)
+            (18, 4),  # 18 experts (non-divisible)
+            (32, 4),  # 32 experts (divisible)
+            (33, 4),  # 33 experts (non-divisible)
+        ]
+    else:
+        test_cases = []
+
+    for test_global_experts, test_ep_size in test_cases:
+        # Ensure ep_size matches world_size
+        assert (test_ep_size == world_size
+                ), f"ep_size {test_ep_size} must equal world_size {world_size}"
+
+        # Test each rank
+        for ep_rank in range(world_size):
+            # Calculate expected local experts
+            base_experts = test_global_experts // test_ep_size
+            remainder = test_global_experts % test_ep_size
+            if ep_rank < remainder:
+                expected_test_local = base_experts + 1
+            else:
+                expected_test_local = base_experts
+
+            test_local_experts, test_expert_map = determine_expert_map(
+                ep_size=test_ep_size,
+                ep_rank=ep_rank,
+                global_num_experts=test_global_experts,
+                expert_placement_strategy=expert_placement_strategy,
+            )
+
+            assert (
+                test_local_experts == expected_test_local
+            ), f"For {test_global_experts} experts on {test_ep_size} ranks, " \
+                f"rank {ep_rank}: expected {expected_test_local} local " \
+                f"experts, got {test_local_experts}"
+
+            if test_expert_map is not None:
+                assert test_expert_map.shape == (
+                    test_global_experts,
+                ), f"Expected expert map shape ({test_global_experts},), " \
+                    f"got {test_expert_map.shape}"
+
+                # Verify round_robin pattern for this test case
+                verify_round_robin_pattern(test_expert_map, ep_rank,
+                                           test_ep_size, test_global_experts)
+
+
+@pytest.mark.parametrize("expert_placement_strategy", ["round_robin"])
+@pytest.mark.parametrize("world_size", [2, 4])
+def test_expert_placement_edge_cases(expert_placement_strategy, world_size):
+    """Test edge cases for round_robin expert placement."""
+
+    # Test case 1: ep_size = 1 (should return None for expert_map)
+    local_num_experts, expert_map = determine_expert_map(
+        ep_size=1,
+        ep_rank=0,
+        global_num_experts=8,
+        expert_placement_strategy=expert_placement_strategy,
+    )
+    assert local_num_experts == 8, "For ep_size=1, should get all experts"
+    assert expert_map is None, "For ep_size=1, expert_map should be None"
+
+    # Test case 2: ep_size = 0 (should raise assertion)
+    with pytest.raises(AssertionError):
+        determine_expert_map(
+            ep_size=0,
+            ep_rank=0,
+            global_num_experts=8,
+            expert_placement_strategy=expert_placement_strategy,
+        )
+
+
+def test_determine_expert_map_comprehensive():
+    """Test determine_expert_map with various configurations."""
+
+    # Test cases: (ep_size, ep_rank, global_num_experts,
+    #              expert_placement_strategy, expected_local, expected_map_pattern)
+    test_cases = [
+        # Round robin placement tests
+        (2, 0, 8, "round_robin", 4, [0, -1, 1, -1, 2, -1, 3,
+                                     -1]),  # rank 0 gets even experts
+        (2, 1, 8, "round_robin", 4, [-1, 0, -1, 1, -1, 2, -1,
+                                     3]),  # rank 1 gets odd experts
+        (2, 0, 9, "round_robin", 5, [0, -1, 1, -1, 2, -1, 3, -1, 4
+                                     ]),  # rank 0 gets 5 experts (even + last)
+        (2, 1, 9, "round_robin", 4, [-1, 0, -1, 1, -1, 2, -1, 3,
+                                     -1]),  # rank 1 gets 4 experts (odd)
+
+        # 4-rank tests
+        (4, 0, 8, "round_robin", 2, [0, -1, -1, -1, 1, -1, -1,
+                                     -1]),  # rank 0 gets experts 0, 4
+        (4, 1, 8, "round_robin", 2, [-1, 0, -1, -1, -1, 1, -1,
+                                     -1]),  # rank 1 gets experts 1, 5
+        (4, 2, 8, "round_robin", 2, [-1, -1, 0, -1, -1, -1, 1,
+                                     -1]),  # rank 2 gets experts 2, 6
+        (4, 3, 8, "round_robin", 2, [-1, -1, -1, 0, -1, -1, -1,
+                                     1]),  # rank 3 gets experts 3, 7
+    ]
+
+    for ep_size, ep_rank, global_num_experts, expert_placement_strategy, \
+            expected_local, expected_map_pattern in test_cases:
+        local_num_experts, expert_map = determine_expert_map(
+            ep_size=ep_size,
+            ep_rank=ep_rank,
+            global_num_experts=global_num_experts,
+            expert_placement_strategy=expert_placement_strategy,
+        )
+
+        assert local_num_experts == expected_local, \
+            f"ep_size={ep_size}, ep_rank={ep_rank}, " \
+            f"global_num_experts={global_num_experts}, " \
+            f"expert_placement_strategy={expert_placement_strategy}: " \
+            f"expected {expected_local} local experts, got {local_num_experts}"
+
+        if expected_map_pattern is None:
+            assert expert_map is None, "Expected expert_map to be None"
+        else:
+            assert expert_map is not None, "Expected expert_map to not be None"
+            actual_map = expert_map.tolist()
+            assert actual_map == expected_map_pattern, \
+                f"ep_size={ep_size}, ep_rank={ep_rank}, " \
+                f"global_num_experts={global_num_experts}, " \
+                f"expert_placement_strategy={expert_placement_strategy}: " \
+                f"expected map {expected_map_pattern}, got {actual_map}"
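Note: determine_expert_map only builds a CPU index tensor, so these tests call it directly without initializing any distributed state; world_size is just a parametrized integer. Assuming a checkout that includes this patch, they should run on a single machine with something like:

    pytest tests/distributed/test_expert_placement.py -q
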
f"global_num_experts={global_num_experts}, " \ + f"expert_placement_strategy={expert_placement_strategy}: " \ + f"expected map {expected_map_pattern}, got {actual_map}" diff --git a/vllm/config/parallel.py b/vllm/config/parallel.py index 2f8ad5c6b6b0..231406bf6052 100644 --- a/vllm/config/parallel.py +++ b/vllm/config/parallel.py @@ -29,6 +29,7 @@ logger = init_logger(__name__) +ExpertPlacementStrategy = Literal["linear", "round_robin"] DistributedExecutorBackend = Literal["ray", "mp", "uni", "external_launcher"] @@ -102,6 +103,15 @@ class ParallelConfig: """Enable expert parallelism load balancing for MoE layers.""" eplb_config: EPLBConfig = field(default_factory=EPLBConfig) """Expert parallelism configuration.""" + expert_placement_strategy: ExpertPlacementStrategy = "linear" + """The expert placement strategy for MoE layers:\n + - "linear": Experts are placed in a contiguous manner. For example, with 4 + experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have + experts [2, 3].\n + - "round_robin": Experts are placed in a round-robin manner. For example, + with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1 + will have experts [1, 3]. This strategy can help improve load balancing + for grouped expert models with no redundant experts.""" num_redundant_experts: Optional[int] = None """`num_redundant_experts` is deprecated and has been replaced with `eplb_config.num_redundant_experts`. This will be removed in v0.12.0. diff --git a/vllm/engine/arg_utils.py b/vllm/engine/arg_utils.py index 595d318fbaaf..20d998d613d4 100644 --- a/vllm/engine/arg_utils.py +++ b/vllm/engine/arg_utils.py @@ -34,6 +34,7 @@ SpeculativeConfig, TaskOption, TokenizerMode, VllmConfig, get_attr_docs) from vllm.config.multimodal import MMCacheType, MultiModalConfig +from vllm.config.parallel import ExpertPlacementStrategy from vllm.config.utils import get_field from vllm.logger import init_logger from vllm.platforms import CpuArchEnum, current_platform @@ -328,6 +329,8 @@ class EngineArgs: enable_expert_parallel: bool = ParallelConfig.enable_expert_parallel eplb_config: EPLBConfig = get_field(ParallelConfig, "eplb_config") enable_eplb: bool = ParallelConfig.enable_eplb + expert_placement_strategy: ExpertPlacementStrategy = \ + ParallelConfig.expert_placement_strategy num_redundant_experts: int = EPLBConfig.num_redundant_experts eplb_window_size: int = EPLBConfig.window_size eplb_step_interval: int = EPLBConfig.step_interval @@ -696,6 +699,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser: **parallel_kwargs["enable_eplb"]) parallel_group.add_argument("--eplb-config", **parallel_kwargs["eplb_config"]) + parallel_group.add_argument( + "--expert-placement-strategy", + **parallel_kwargs["expert_placement_strategy"]) parallel_group.add_argument( "--num-redundant-experts", type=int, @@ -1335,6 +1341,7 @@ def create_engine_config( enable_expert_parallel=self.enable_expert_parallel, enable_eplb=self.enable_eplb, eplb_config=self.eplb_config, + expert_placement_strategy=self.expert_placement_strategy, max_parallel_loading_workers=self.max_parallel_loading_workers, disable_custom_all_reduce=self.disable_custom_all_reduce, ray_workers_use_nsight=self.ray_workers_use_nsight, diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py index a90a71159f72..c62897c91816 100644 --- a/vllm/model_executor/layers/fused_moe/layer.py +++ b/vllm/model_executor/layers/fused_moe/layer.py @@ -4,7 +4,7 @@ from abc import abstractmethod 
diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py
index a90a71159f72..c62897c91816 100644
--- a/vllm/model_executor/layers/fused_moe/layer.py
+++ b/vllm/model_executor/layers/fused_moe/layer.py
@@ -4,7 +4,7 @@
 from abc import abstractmethod
 from collections.abc import Iterable
 from enum import Enum
-from typing import Callable, Literal, Optional, Union, overload
+from typing import Callable, Literal, Optional, Union, get_args, overload
 
 import torch
 import torch.nn.functional as F
@@ -12,6 +12,7 @@
 
 import vllm.envs as envs
 from vllm.config import get_current_vllm_config
+from vllm.config.parallel import ExpertPlacementStrategy
 from vllm.distributed import (get_dp_group, get_ep_group,
                               get_tensor_model_parallel_world_size,
                               tensor_model_parallel_all_reduce)
@@ -675,8 +676,11 @@ def forward_tpu(
 
 
 def determine_expert_map(
-    ep_size: int, ep_rank: int,
-    global_num_experts: int) -> tuple[int, Optional[torch.Tensor]]:
+    ep_size: int,
+    ep_rank: int,
+    global_num_experts: int,
+    expert_placement_strategy: ExpertPlacementStrategy = "linear",
+) -> tuple[int, Optional[torch.Tensor]]:
     """
     Calculates how many experts should be assigned to each rank for EP and
     creates a mapping from global to local expert index. Experts are
@@ -684,8 +688,11 @@
     last rank.
 
     Args:
-        ep_size (int): The size of the expert parallel group
-        global_num_experts (int): The total number of experts in the model.
+        ep_size: The size of the expert parallel group
+        ep_rank: The rank of the current process in the expert parallel
+            group
+        global_num_experts: The total number of experts in the model.
+        expert_placement_strategy: The expert placement strategy.
 
     Returns:
         tuple[int, Optional[torch.Tensor]]: A tuple containing:
@@ -711,9 +718,23 @@
     # Create a tensor of size num_experts filled with -1
     expert_map = torch.full((global_num_experts, ), -1, dtype=torch.int32)
     # Create an expert map for the local experts
-    start_idx = ep_rank * base_experts + min(ep_rank, remainder)
-    expert_map[start_idx:start_idx + local_num_experts] = torch.arange(
-        0, local_num_experts, dtype=torch.int32)
+    if expert_placement_strategy == "linear":
+        start_idx = ep_rank * base_experts + min(ep_rank, remainder)
+        expert_map[start_idx:start_idx + local_num_experts] = torch.arange(
+            0, local_num_experts, dtype=torch.int32)
+    elif expert_placement_strategy == "round_robin":
+        local_log_experts = torch.arange(ep_rank,
+                                         global_num_experts,
+                                         ep_size,
+                                         dtype=torch.int32)
+
+        expert_map[local_log_experts] = torch.arange(0,
+                                                     local_num_experts,
+                                                     dtype=torch.int32)
+    else:
+        raise ValueError("Unsupported expert placement strategy "
+                         f"'{expert_placement_strategy}', expected one of "
+                         f"{get_args(ExpertPlacementStrategy)}")
 
     return (local_num_experts, expert_map)
 
@@ -846,15 +867,36 @@ def __init__(
         else:
             assert num_redundant_experts == 0, \
                 "Redundant experts are only supported with EPLB."
+
+            expert_placement_strategy = (
+                vllm_config.parallel_config.expert_placement_strategy)
+            if expert_placement_strategy == "round_robin":
+                # TODO(Bruce): will support round robin expert placement with
+                # EPLB enabled in the future.
+                round_robin_supported = ((num_expert_group is not None
+                                          and num_expert_group > 1)
+                                         and num_redundant_experts == 0
+                                         and not self.enable_eplb)
+
+                if not round_robin_supported:
+                    logger.warning(
+                        "Round-robin expert placement is only supported for "
+                        "models with multiple expert groups and no redundant "
+                        "experts. Falling back to linear expert placement.")
+                    expert_placement_strategy = "linear"
+
             self.local_num_experts, self.expert_map = determine_expert_map(
                 ep_size=self.ep_size,
                 ep_rank=self.ep_rank,
-                global_num_experts=self.global_num_experts)
+                global_num_experts=self.global_num_experts,
+                expert_placement_strategy=expert_placement_strategy,
+            )
             logger.info_once(
-                "[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
+                "[EP Rank %s/%s] Expert parallelism is enabled. Expert "
+                "placement strategy: %s. Local/global"
                 " number of experts: %s/%s. Experts local to global index map:"
-                " %s.", self.ep_rank, self.ep_size, self.local_num_experts,
-                self.global_num_experts,
+                " %s.", self.ep_rank, self.ep_size, expert_placement_strategy,
+                self.local_num_experts, self.global_num_experts,
                 get_compressed_expert_map(self.expert_map))
         else:
             self.local_num_experts, self.expert_map = (self.global_num_experts,
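
For reference, a minimal sketch of what the new strategy returns, using values taken from the unit tests above (assumes an environment with this change installed):

    from vllm.model_executor.layers.fused_moe.layer import determine_expert_map

    # 8 experts across 2 EP ranks, round-robin: rank 0 owns the even-numbered
    # global experts and maps them to consecutive local slots.
    local_num_experts, expert_map = determine_expert_map(
        ep_size=2,
        ep_rank=0,
        global_num_experts=8,
        expert_placement_strategy="round_robin",
    )
    assert local_num_experts == 4
    assert expert_map.tolist() == [0, -1, 1, -1, 2, -1, 3, -1]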