Merged
24 commits
cef8d51
Enable zigzag static EPLB strategy for expert placement in MoE layers.
cboss6 Aug 27, 2025
af21812
Format.
cboss6 Aug 28, 2025
08a5d78
Merge branch 'main' into cboss/zigzag-vllm-co
cboss6 Aug 28, 2025
4af6754
Refactor assert statement.
cboss6 Aug 28, 2025
418d4aa
Make zigzag as a default behavior once preconditions are met.
cboss6 Sep 2, 2025
d748884
Update vllm/model_executor/layers/fused_moe/layer.py
cboss6 Sep 2, 2025
e6ec357
Rename the name of zigzag and refactor the unit test.
cboss6 Sep 2, 2025
51fef4b
Refactor round-robin expert placement unit test.
cboss6 Sep 4, 2025
2f1847d
Remove unnecessary cases.
cboss6 Sep 4, 2025
488462b
Fix the precommit issue.
cboss6 Sep 4, 2025
aa1b319
Use round_robin_expert_placement as a optional choice other than usin…
cboss6 Sep 9, 2025
880a356
Merge branch 'main' into cboss/zigzag-vllm
cboss6 Sep 16, 2025
df9688d
Format.
cboss6 Sep 9, 2025
a3cf096
`enable_round_robin_expert_placement` -> `expert_placement_strategy`
hmellor Sep 12, 2025
e9a51a8
Merge branch 'main' into pr/cboss6/23745
hmellor Sep 12, 2025
fde9449
Merge branch 'main' into cboss/zigzag-vllm
cboss6 Sep 15, 2025
4a1e017
Fix precommit.
cboss6 Sep 15, 2025
96ec745
Add options to docstring
hmellor Sep 15, 2025
af8f588
Merge branch 'main' into pr/cboss6/23745
hmellor Sep 15, 2025
224d6aa
Add line saying why you might use it
hmellor Sep 15, 2025
73e13df
Merge branch 'main' into cboss/zigzag-vllm
cboss6 Sep 16, 2025
2e88bcc
Merge remote-tracking branch 'cboss-vllm/main' into cboss/zigzag-vllm…
cboss6 Sep 16, 2025
3529bbe
Fix the precommit issue.
cboss6 Sep 16, 2025
18d37d1
Update vllm/model_executor/layers/fused_moe/layer.py
hmellor Sep 16, 2025
194 changes: 194 additions & 0 deletions tests/distributed/test_expert_placement.py
@@ -0,0 +1,194 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import pytest

from vllm.model_executor.layers.fused_moe.layer import determine_expert_map


def verify_round_robin_pattern(expert_map, ep_rank, ep_size,
global_num_experts):
"""Verify that the expert map follows the round_robin pattern."""
# Calculate expected local experts (supporting non-divisible cases)
base_experts = global_num_experts // ep_size
remainder = global_num_experts % ep_size

if ep_rank < remainder:
local_num_experts = base_experts + 1
else:
local_num_experts = base_experts

# Expected expert IDs for this rank in round_robin pattern
# For non-divisible cases, ranks with extra experts start earlier
expected_expert_ids = []
for expert_idx in range(local_num_experts):
global_expert_id = ep_rank + expert_idx * ep_size
expected_expert_ids.append(global_expert_id)

# Check that only expected experts are mapped to this rank
for global_expert_id in range(global_num_experts):
if global_expert_id in expected_expert_ids:
local_expert_id = expert_map[global_expert_id]
expected_local_id = expected_expert_ids.index(global_expert_id)
assert (
local_expert_id == expected_local_id
), f"Global expert {global_expert_id} should map to local expert " \
f"{expected_local_id}, got {local_expert_id}"
else:
assert (
expert_map[global_expert_id] == -1
), f"Global expert {global_expert_id} should not be mapped to " \
f"this rank"

# Verify that all local expert IDs are consecutive starting from 0
local_expert_ids = [
expert_map[global_id] for global_id in expected_expert_ids
]
expected_local_ids = list(range(local_num_experts))
assert (
local_expert_ids == expected_local_ids
), f"Expected local expert IDs {expected_local_ids}, got {local_expert_ids}"


@pytest.mark.parametrize("expert_placement_strategy", ["round_robin"])
@pytest.mark.parametrize("world_size", [2, 4])
def test_expert_placement_various_sizes(expert_placement_strategy, world_size):
"""Test round_robin expert placement with various expert counts."""

# Test with different global_num_experts values
# Include both divisible and non-divisible cases
if world_size == 2:
test_cases = [
(4, 2), # 4 experts (divisible)
(8, 2), # 8 experts (divisible)
(9, 2), # 9 experts (non-divisible)
(16, 2), # 16 experts (divisible)
(17, 2), # 17 experts (non-divisible)
]
elif world_size == 4:
test_cases = [
(8, 4), # 8 experts (divisible)
(16, 4), # 16 experts (divisible)
(18, 4), # 18 experts (non-divisible)
(32, 4), # 32 experts (divisible)
(33, 4), # 33 experts (non-divisible)
]
else:
test_cases = []

for test_global_experts, test_ep_size in test_cases:
# Ensure ep_size matches world_size
assert (test_ep_size == world_size
), f"ep_size {test_ep_size} must equal world_size {world_size}"

# Test each rank
for ep_rank in range(world_size):
# Calculate expected local experts
base_experts = test_global_experts // test_ep_size
remainder = test_global_experts % test_ep_size
if ep_rank < remainder:
expected_test_local = base_experts + 1
else:
expected_test_local = base_experts

test_local_experts, test_expert_map = determine_expert_map(
ep_size=test_ep_size,
ep_rank=ep_rank,
global_num_experts=test_global_experts,
expert_placement_strategy=expert_placement_strategy,
)

assert (
test_local_experts == expected_test_local
), f"For {test_global_experts} experts on {test_ep_size} ranks, " \
f"rank {ep_rank}: expected {expected_test_local} local" \
f"experts, got {test_local_experts}"

if test_expert_map is not None:
assert test_expert_map.shape == (
test_global_experts,
), f"Expected expert map shape ({test_global_experts},), " \
f"got {test_expert_map.shape}"

# Verify round_robin pattern for this test case
verify_round_robin_pattern(test_expert_map, ep_rank,
test_ep_size, test_global_experts)


@pytest.mark.parametrize("expert_placement_strategy", ["round_robin"])
@pytest.mark.parametrize("world_size", [2, 4])
def test_expert_placement_edge_cases(expert_placement_strategy, world_size):
"""Test edge cases for round_robin expert placement."""

# Test case 1: ep_size = 1 (should return None for expert_map)
local_num_experts, expert_map = determine_expert_map(
ep_size=1,
ep_rank=0,
global_num_experts=8,
expert_placement_strategy=expert_placement_strategy,
)
assert local_num_experts == 8, "For ep_size=1, should get all experts"
assert expert_map is None, "For ep_size=1, expert_map should be None"

# Test case 2: ep_size = 0 (should raise assertion)
with pytest.raises(AssertionError):
determine_expert_map(
ep_size=0,
ep_rank=0,
global_num_experts=8,
expert_placement_strategy=expert_placement_strategy,
)


def test_determine_expert_map_comprehensive():
"""Test of determine_expert_map function with various configurations."""

# Test cases: (ep_size, ep_rank, global_num_experts,
# expert_placement_strategy, expected_local, expected_map_pattern)
test_cases = [
# Round robin placement tests
(2, 0, 8, "round_robin", 4, [0, -1, 1, -1, 2, -1, 3,
-1]), # rank 0 gets even experts
(2, 1, 8, "round_robin", 4, [-1, 0, -1, 1, -1, 2, -1,
3]), # rank 1 gets odd experts
(2, 0, 9, "round_robin", 5, [0, -1, 1, -1, 2, -1, 3, -1, 4
]), # rank 0 gets 5 experts (even + last)
(2, 1, 9, "round_robin", 4, [-1, 0, -1, 1, -1, 2, -1, 3,
-1]), # rank 1 gets 4 experts (odd)

# 4-rank tests
(4, 0, 8, "round_robin", 2, [0, -1, -1, -1, 1, -1, -1,
-1]), # rank 0 gets experts 0, 4
(4, 1, 8, "round_robin", 2, [-1, 0, -1, -1, -1, 1, -1,
-1]), # rank 1 gets experts 1, 5
(4, 2, 8, "round_robin", 2, [-1, -1, 0, -1, -1, -1, 1,
-1]), # rank 2 gets experts 2, 6
(4, 3, 8, "round_robin", 2, [-1, -1, -1, 0, -1, -1, -1,
1]), # rank 3 gets experts 3, 7
]

for ep_size, ep_rank, global_num_experts, expert_placement_strategy, \
expected_local, expected_map_pattern in test_cases:
local_num_experts, expert_map = determine_expert_map(
ep_size=ep_size,
ep_rank=ep_rank,
global_num_experts=global_num_experts,
expert_placement_strategy=expert_placement_strategy,
)

assert local_num_experts == expected_local, \
f"ep_size={ep_size}, ep_rank={ep_rank}, " \
f"global_num_experts={global_num_experts}, " \
f"expert_placement_strategy={expert_placement_strategy}: " \
f"expected {expected_local} local experts, got {local_num_experts}"

if expected_map_pattern is None:
assert expert_map is None, "Expected expert_map to be None"
else:
assert expert_map is not None, "Expected expert_map to not be None"
actual_map = expert_map.tolist()
assert actual_map == expected_map_pattern, \
f"ep_size={ep_size}, ep_rank={ep_rank}, " \
f"global_num_experts={global_num_experts}, " \
f"expert_placement_strategy={expert_placement_strategy}: " \
f"expected map {expected_map_pattern}, got {actual_map}"
10 changes: 10 additions & 0 deletions vllm/config/parallel.py
@@ -29,6 +29,7 @@

logger = init_logger(__name__)

ExpertPlacementStrategy = Literal["linear", "round_robin"]
DistributedExecutorBackend = Literal["ray", "mp", "uni", "external_launcher"]


@@ -102,6 +103,15 @@ class ParallelConfig:
"""Enable expert parallelism load balancing for MoE layers."""
eplb_config: EPLBConfig = field(default_factory=EPLBConfig)
"""Expert parallelism configuration."""
expert_placement_strategy: ExpertPlacementStrategy = "linear"
"""The expert placement strategy for MoE layers:\n
- "linear": Experts are placed in a contiguous manner. For example, with 4
experts and 2 ranks, rank 0 will have experts [0, 1] and rank 1 will have
experts [2, 3].\n
- "round_robin": Experts are placed in a round-robin manner. For example,
with 4 experts and 2 ranks, rank 0 will have experts [0, 2] and rank 1
will have experts [1, 3]. This strategy can help improve load balancing
for grouped expert models with no redundant experts."""
num_redundant_experts: Optional[int] = None
"""`num_redundant_experts` is deprecated and has been replaced with
`eplb_config.num_redundant_experts`. This will be removed in v0.12.0.
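
To illustrate the two placement strategies documented above, a small standalone sketch (plain Python, divisible case only; not part of the diff):

```python
# Per-rank global expert IDs each strategy yields for 4 experts on
# 2 EP ranks (the example used in the docstring above).
global_num_experts, ep_size = 4, 2

linear = {r: list(range(r * global_num_experts // ep_size,
                        (r + 1) * global_num_experts // ep_size))
          for r in range(ep_size)}
round_robin = {r: list(range(r, global_num_experts, ep_size))
               for r in range(ep_size)}

print(linear)       # {0: [0, 1], 1: [2, 3]}
print(round_robin)  # {0: [0, 2], 1: [1, 3]}
```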
7 changes: 7 additions & 0 deletions vllm/engine/arg_utils.py
@@ -34,6 +34,7 @@
SpeculativeConfig, TaskOption, TokenizerMode,
VllmConfig, get_attr_docs)
from vllm.config.multimodal import MMCacheType, MultiModalConfig
from vllm.config.parallel import ExpertPlacementStrategy
from vllm.config.utils import get_field
from vllm.logger import init_logger
from vllm.platforms import CpuArchEnum, current_platform
@@ -328,6 +329,8 @@ class EngineArgs:
enable_expert_parallel: bool = ParallelConfig.enable_expert_parallel
eplb_config: EPLBConfig = get_field(ParallelConfig, "eplb_config")
enable_eplb: bool = ParallelConfig.enable_eplb
expert_placement_strategy: ExpertPlacementStrategy = \
ParallelConfig.expert_placement_strategy
num_redundant_experts: int = EPLBConfig.num_redundant_experts
eplb_window_size: int = EPLBConfig.window_size
eplb_step_interval: int = EPLBConfig.step_interval
@@ -696,6 +699,9 @@ def add_cli_args(parser: FlexibleArgumentParser) -> FlexibleArgumentParser:
**parallel_kwargs["enable_eplb"])
parallel_group.add_argument("--eplb-config",
**parallel_kwargs["eplb_config"])
parallel_group.add_argument(
"--expert-placement-strategy",
**parallel_kwargs["expert_placement_strategy"])
parallel_group.add_argument(
"--num-redundant-experts",
type=int,
@@ -1335,6 +1341,7 @@ def create_engine_config(
enable_expert_parallel=self.enable_expert_parallel,
enable_eplb=self.enable_eplb,
eplb_config=self.eplb_config,
expert_placement_strategy=self.expert_placement_strategy,
max_parallel_loading_workers=self.max_parallel_loading_workers,
disable_custom_all_reduce=self.disable_custom_all_reduce,
ray_workers_use_nsight=self.ray_workers_use_nsight,
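
With the argument wired through EngineArgs, the option can be set via the new --expert-placement-strategy CLI flag or from Python. A hedged usage sketch (the model name is a placeholder, and round_robin only takes effect for grouped-expert MoE models per the fallback logic further below):

```python
# Usage sketch, not part of this PR. The keyword is forwarded through
# EngineArgs into ParallelConfig; the model name is a placeholder and
# should be a grouped-expert MoE for round_robin placement to apply.
from vllm import LLM

llm = LLM(
    model="deepseek-ai/DeepSeek-V2",          # placeholder MoE checkpoint
    trust_remote_code=True,
    tensor_parallel_size=2,
    enable_expert_parallel=True,
    expert_placement_strategy="round_robin",  # new option added here
)
```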
66 changes: 54 additions & 12 deletions vllm/model_executor/layers/fused_moe/layer.py
@@ -4,14 +4,15 @@
from abc import abstractmethod
from collections.abc import Iterable
from enum import Enum
from typing import Callable, Literal, Optional, Union, overload
from typing import Callable, Literal, Optional, Union, get_args, overload

import torch
import torch.nn.functional as F
from torch.nn.parameter import UninitializedParameter

import vllm.envs as envs
from vllm.config import get_current_vllm_config
from vllm.config.parallel import ExpertPlacementStrategy
from vllm.distributed import (get_dp_group, get_ep_group,
get_tensor_model_parallel_world_size,
tensor_model_parallel_all_reduce)
@@ -675,17 +676,23 @@ def forward_tpu(


def determine_expert_map(
ep_size: int, ep_rank: int,
global_num_experts: int) -> tuple[int, Optional[torch.Tensor]]:
ep_size: int,
ep_rank: int,
global_num_experts: int,
expert_placement_strategy: ExpertPlacementStrategy = "linear",
) -> tuple[int, Optional[torch.Tensor]]:
"""
Calculates how many experts should be assigned to each rank for EP and
creates a mapping from global to local expert index. Experts are
distributed evenly across ranks; any remainder is assigned to the
lowest-indexed ranks. Which global experts a rank owns depends on the
expert_placement_strategy.

Args:
ep_size (int): The size of the expert parallel group
global_num_experts (int): The total number of experts in the model.
ep_size: The size of the expert parallel group
ep_rank: The rank of the current process in the expert parallel
group
global_num_experts: The total number of experts in the model.
expert_placement_strategy: The expert placement strategy.

Returns:
tuple[int, Optional[torch.Tensor]]: A tuple containing:
@@ -711,9 +718,23 @@ def determine_expert_map(
# Create a tensor of size num_experts filled with -1
expert_map = torch.full((global_num_experts, ), -1, dtype=torch.int32)
# Create an expert map for the local experts
start_idx = ep_rank * base_experts + min(ep_rank, remainder)
expert_map[start_idx:start_idx + local_num_experts] = torch.arange(
0, local_num_experts, dtype=torch.int32)
if expert_placement_strategy == "linear":
start_idx = ep_rank * base_experts + min(ep_rank, remainder)
expert_map[start_idx:start_idx + local_num_experts] = torch.arange(
0, local_num_experts, dtype=torch.int32)
elif expert_placement_strategy == "round_robin":
local_log_experts = torch.arange(ep_rank,
global_num_experts,
ep_size,
dtype=torch.int32)

expert_map[local_log_experts] = torch.arange(0,
local_num_experts,
dtype=torch.int32)
else:
raise ValueError("Unsupported expert placement strategy "
f"'{expert_placement_strategy}', expected one of "
f"{get_args(ExpertPlacementStrategy)}")
return (local_num_experts, expert_map)
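
The round_robin branch reduces to a strided slice of the global expert IDs. A standalone illustration of the mapping it produces (torch only; not part of the diff):

```python
# Illustration of the round_robin mapping: rank r owns global experts
# r, r + ep_size, r + 2 * ep_size, ..., numbered locally from 0.
import torch

def round_robin_expert_map(ep_size: int, ep_rank: int,
                           global_num_experts: int) -> torch.Tensor:
    expert_map = torch.full((global_num_experts, ), -1, dtype=torch.int32)
    owned = torch.arange(ep_rank, global_num_experts, ep_size)
    expert_map[owned] = torch.arange(len(owned), dtype=torch.int32)
    return expert_map

# 8 experts on 2 ranks, rank 1 owns the odd experts:
print(round_robin_expert_map(2, 1, 8).tolist())  # [-1, 0, -1, 1, -1, 2, -1, 3]
```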


@@ -846,15 +867,36 @@ def __init__(
else:
assert num_redundant_experts == 0, \
"Redundant experts are only supported with EPLB."

expert_placement_strategy = (
vllm_config.parallel_config.expert_placement_strategy)
if expert_placement_strategy == "round_robin":
# TODO(Bruce): will support round robin expert placement with
# EPLB enabled in the future.
round_robin_supported = ((num_expert_group is not None
and num_expert_group > 1)
and num_redundant_experts == 0
and not self.enable_eplb)

if not round_robin_supported:
logger.warning(
"Round-robin expert placement is only supported for "
"models with multiple expert groups and no redundant "
"experts. Falling back to linear expert placement.")
expert_placement_strategy = "linear"

self.local_num_experts, self.expert_map = determine_expert_map(
ep_size=self.ep_size,
ep_rank=self.ep_rank,
global_num_experts=self.global_num_experts)
global_num_experts=self.global_num_experts,
expert_placement_strategy=expert_placement_strategy,
)
logger.info_once(
"[EP Rank %s/%s] Expert parallelism is enabled. Local/global"
"[EP Rank %s/%s] Expert parallelism is enabled. Expert "
"placement strategy: %s. Local/global"
" number of experts: %s/%s. Experts local to global index map:"
" %s.", self.ep_rank, self.ep_size, self.local_num_experts,
self.global_num_experts,
" %s.", self.ep_rank, self.ep_size, expert_placement_strategy,
self.local_num_experts, self.global_num_experts,
get_compressed_expert_map(self.expert_map))
else:
self.local_num_experts, self.expert_map = (self.global_num_experts,
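
The precondition check above can be summarized as a single fallback decision; a minimal sketch (illustrative helper, not part of the PR):

```python
# Illustrative helper mirroring the fallback logic: round_robin currently
# requires multiple expert groups, zero redundant experts, and EPLB off;
# otherwise the layer falls back to linear placement with a warning.
from typing import Optional

def resolve_placement_strategy(strategy: str,
                               num_expert_group: Optional[int],
                               num_redundant_experts: int,
                               enable_eplb: bool) -> str:
    if strategy != "round_robin":
        return strategy
    supported = (num_expert_group is not None and num_expert_group > 1
                 and num_redundant_experts == 0 and not enable_eplb)
    return "round_robin" if supported else "linear"

assert resolve_placement_strategy("round_robin", 8, 0, False) == "round_robin"
assert resolve_placement_strategy("round_robin", None, 0, False) == "linear"
assert resolve_placement_strategy("round_robin", 8, 0, True) == "linear"
```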