[ROCm][FEAT] Fuse DeepSeek shared experts into AITER fused_moe ops #24097
Conversation
Deepseek 085 sharedexperts aiter jun new Signed-off-by: chenjun <[email protected]> Signed-off-by: kliuae <[email protected]>
Signed-off-by: kliuae <[email protected]>
This pull request has merge conflicts that must be resolved before it can be merged.
Code Review
This pull request introduces an optimization for DeepSeek models on ROCm by fusing shared experts into the AITER FusedMoE kernel, controlled by a new environment flag. The changes span environment variable setup, the core FusedMoE layer, quantization layers, and the DeepSeek model implementation to correctly handle the fused logic and weight loading.
The implementation looks solid and the changes are consistent with the goal of the PR. I've found one area for improvement in the initialization logic for the shared-expert metadata, which could be made more memory- and performance-efficient. My detailed feedback is in the comment below.
```python
    if is_EP:
        s_topk_ids_list = [[fake_expertid] *
                           (n_shared_experts + is_EP)] * max_num_tokens
        for i in range(tp_rank, max_num_tokens, tp_size):
            s_topk_ids_list[i] = shared_expert_ids
    else:
        s_topk_ids_list = [range(n_routed_experts, fake_expertid)
                           ] * max_num_tokens
    s_topk_ids[:] = torch.tensor(s_topk_ids_list,
                                 dtype=torch.int32,
                                 device='cuda')
```
The current implementation for initializing `s_topk_ids` can be inefficient. It constructs a large Python list of lists (`s_topk_ids_list`) on the host, which is then converted to a PyTorch tensor on the CPU before being moved to the GPU. For a large `max_num_tokens`, this can lead to significant host memory consumption and slow down the initialization process.
A more efficient approach would be to perform these operations directly on the GPU tensor, avoiding the large intermediate host-side data structures. This can be achieved using tensor broadcasting and slicing.
Suggested change:
```python
    if is_EP:
        s_topk_ids.fill_(fake_expertid)
        shared_expert_ids_tensor = torch.tensor(shared_expert_ids,
                                                dtype=torch.int32,
                                                device='cuda')
        s_topk_ids[tp_rank::tp_size] = shared_expert_ids_tensor
    else:
        s_topk_ids_row = torch.arange(n_routed_experts,
                                      fake_expertid,
                                      dtype=torch.int32,
                                      device='cuda')
        s_topk_ids.copy_(s_topk_ids_row.expand(max_num_tokens, -1))
```
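For reference, a small self-contained illustration of the tensor-based fill in the EP branch (toy sizes, run on CPU for simplicity here, whereas the real buffers live on the GPU; this is only an illustration, not part of the PR):

```python
import torch

n_routed_experts, n_shared_experts = 8, 2
tp_rank, tp_size = 1, 4
max_num_tokens = 16
is_EP = True
fake_expertid = n_routed_experts + n_shared_experts
n_extra = n_shared_experts + is_EP

s_topk_ids = torch.empty((max_num_tokens, n_extra), dtype=torch.int32)
shared_expert_ids = torch.arange(n_routed_experts,
                                 n_routed_experts + n_extra,
                                 dtype=torch.int32)

# GPU-friendly fill: no Python list of lists, just broadcasting and slicing.
s_topk_ids.fill_(fake_expertid)
s_topk_ids[tp_rank::tp_size] = shared_expert_ids

print(s_topk_ids[:6])
# Rows at positions tp_rank, tp_rank + tp_size, ... hold the shared expert ids;
# all other rows hold the padding id `fake_expertid`.
```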
Code Review
This pull request introduces a significant performance optimization for DeepSeek models on ROCm by fusing shared experts into the AITER MoE kernel. The implementation is gated behind environment variables and includes comprehensive benchmark and accuracy tests, which is great. However, I've identified two critical issues that need to be addressed. The first is related to the use of a global variable for model-specific metadata, which can lead to race conditions and incorrect behavior when serving multiple models. The second is a bug in the weight loading logic for the fused shared experts, which fails to correctly track loaded parameters and will likely cause errors. Addressing these issues will ensure the stability and correctness of this new feature.
```python
aiter_topK_meta_data = None


@lru_cache(maxsize=1)
def init_aiter_topK_meta_data(n_routed_experts: int,
                              n_shared_experts: int,
                              top_k: int,
                              tp_rank: int,
                              tp_size: int,
                              shared_experts_score: float = 1.0,
                              max_num_tokens: int = 32768,
                              is_EP: bool = False):
    global aiter_topK_meta_data
    fake_expertid = n_routed_experts + n_shared_experts

    # all layers reuse same buffer
    total_topk_ids = torch.empty(
        (max_num_tokens, top_k + n_shared_experts + is_EP),
        dtype=torch.int32,
        device='cuda')
    ns_topk_ids, s_topk_ids = total_topk_ids.split(
        [top_k, n_shared_experts + is_EP], dim=1)
    shared_expert_ids = [
        n_routed_experts + i for i in range(n_shared_experts + is_EP)
    ]
    if is_EP:
        s_topk_ids_list = [[fake_expertid] *
                           (n_shared_experts + is_EP)] * max_num_tokens
        for i in range(tp_rank, max_num_tokens, tp_size):
            s_topk_ids_list[i] = shared_expert_ids
    else:
        s_topk_ids_list = [range(n_routed_experts, fake_expertid)
                           ] * max_num_tokens
    s_topk_ids[:] = torch.tensor(s_topk_ids_list,
                                 dtype=torch.int32,
                                 device='cuda')

    total_topk_weights = torch.empty(
        (max_num_tokens, top_k + n_shared_experts + is_EP),
        dtype=torch.float32,
        device='cuda')
    ns_topk_weights, s_topk_weights = total_topk_weights.split(
        [top_k, n_shared_experts + is_EP], dim=1)
    s_topk_weights.fill_(shared_experts_score)
    aiter_topK_meta_data = (total_topk_weights, total_topk_ids)
```
The use of a global variable `aiter_topK_meta_data` to store model-specific metadata is problematic. If vLLM serves multiple models with different MoE configurations in the same process, this global variable will be overwritten, leading to incorrect behavior for one of the models. This can cause race conditions and hard-to-debug errors.
The metadata should be managed without using a global variable. A better approach would be:
- Modify `init_aiter_topK_meta_data` to return the metadata tuple instead of modifying a global variable. The `@lru_cache` decorator should then be used on a function that is pure (has no side effects).
- In `FusedMoE.__init__`, store the returned metadata in an instance attribute, e.g., `self.aiter_topK_meta_data`.
- Pass this instance attribute down through the call chain (`forward_cuda` -> `select_experts` -> `rocm_aiter_grouped_topk`). `rocm_aiter_grouped_topk` should then use the passed metadata instead of the global variable.
This change will ensure that each model's metadata is properly encapsulated and avoids race conditions.
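A minimal sketch of the suggested refactor, reusing the fill logic from the diff above; the call-site comments at the end are assumptions about vLLM's internals rather than a definitive implementation:

```python
from functools import lru_cache

import torch


@lru_cache(maxsize=None)  # cache one buffer set per distinct configuration
def init_aiter_topK_meta_data(n_routed_experts: int,
                              n_shared_experts: int,
                              top_k: int,
                              tp_rank: int,
                              tp_size: int,
                              shared_experts_score: float = 1.0,
                              max_num_tokens: int = 32768,
                              is_EP: bool = False):
    """Pure, cached builder: returns (total_topk_weights, total_topk_ids)."""
    n_extra = n_shared_experts + is_EP
    fake_expertid = n_routed_experts + n_shared_experts

    total_topk_ids = torch.empty((max_num_tokens, top_k + n_extra),
                                 dtype=torch.int32, device='cuda')
    _, s_topk_ids = total_topk_ids.split([top_k, n_extra], dim=1)
    shared_expert_ids = torch.arange(n_routed_experts,
                                     n_routed_experts + n_extra,
                                     dtype=torch.int32, device='cuda')
    if is_EP:
        # Only every tp_size-th token (offset by tp_rank) routes to the local
        # shared expert; all other rows point at the padding expert id.
        s_topk_ids.fill_(fake_expertid)
        s_topk_ids[tp_rank::tp_size] = shared_expert_ids
    else:
        s_topk_ids.copy_(shared_expert_ids.expand(max_num_tokens, -1))

    total_topk_weights = torch.empty((max_num_tokens, top_k + n_extra),
                                     dtype=torch.float32, device='cuda')
    _, s_topk_weights = total_topk_weights.split([top_k, n_extra], dim=1)
    s_topk_weights.fill_(shared_experts_score)
    return total_topk_weights, total_topk_ids


# In FusedMoE.__init__ the result would be kept on the instance, e.g.
#   self.aiter_topK_meta_data = init_aiter_topK_meta_data(...)
# and then passed down to rocm_aiter_grouped_topk instead of being read from
# a module-level global.
```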
```python
else:
    is_expert_weight = False
    for mapping in expert_params_mapping:
        param_name, weight_name, expert_id, shard_id = mapping
        if weight_name not in name:
            continue

        # Anyway, this is an expert weight and should not be
        # attempted to load as other weights later
        is_expert_weight = True

        # Do not modify `name` since the loop may continue here
        # Instead, create a new variable
        name_mapped = name.replace(weight_name, param_name)

        if is_pp_missing_parameter(name_mapped, self):
            continue

        param = params_dict[name_mapped]
        # We should ask the weight loader to return success or not
        # here since otherwise we may skip experts with other
        # available replicas.
        weight_loader = typing.cast(Callable[..., bool],
                                    param.weight_loader)
        success = weight_loader(param,
                                loaded_weight,
                                name_mapped,
                                shard_id=shard_id,
                                expert_id=expert_id,
                                return_success=True)
        if success:
            name = name_mapped
            break
    else:
        if is_expert_weight:
            # We've checked that this is an expert weight
            # However it's not mapped locally to this rank
            # So we simply skip it
            continue

        # Skip loading extra bias for GPTQ models.
        if name.endswith(".bias") and name not in params_dict:
            continue

        # Remapping the name of FP8 kv-scale.
        name = maybe_remap_kv_scale_name(name, params_dict)
        if name is None:
            continue

        if is_pp_missing_parameter(name, self):
            continue

        param = params_dict[name]
        weight_loader = getattr(param, "weight_loader",
                                default_weight_loader)
        weight_loader(param, loaded_weight)

    # Special handling: when AITER fusion_shared_experts is enabled,
    # checkpoints may provide a single widened shared_experts tensor
    # without explicit expert indices
    # (e.g. ...mlp.shared_experts.gate_proj.weight).
    # For models with multiple shared experts, split that tensor
    # evenly into per-shared-expert slices and load them into
    # appended expert slots mlp.experts.{n_routed_experts + j}.*
    # accordingly.
    num_chunks = 1
    if is_fuse_shared_experts_layer:
        num_chunks = getattr(self.config, "n_shared_experts",
                             1) or 1
        # Determine split axis based on op type
        # gate/up: ColumnParallel → split along dim 0
        # down: RowParallel → split along dim 1
        split_dim = 1 if "down_proj.weight" in name else 0
        total = loaded_weight.shape[split_dim]
        assert total % num_chunks == 0, (
            f"Shared expert weight dim {total} "
            f"not divisible by num_chunks {num_chunks}")
        chunk_size = total // num_chunks

    for j in range(num_chunks):
        chunk_name = name
        weight_to_load = loaded_weight

        if is_fuse_shared_experts_layer:
            if split_dim == 0:
                weight_to_load = loaded_weight[
                    j * chunk_size:(j + 1) * chunk_size, :]
            else:
                weight_to_load = loaded_weight[
                    :, j * chunk_size:(j + 1) * chunk_size]
            # Synthesize an expert-style name so expert mapping
            # can route it
            chunk_name = name.replace(
                "mlp.shared_experts",
                f"mlp.experts.{self.config.n_routed_experts + j}")

        # Use expert_params_mapping to locate the destination
        # param and delegate to its expert-aware weight_loader
        # with expert_id.
        for mapping in expert_params_mapping:
            param_name, weight_name, expert_id, shard_id = mapping
            if weight_name not in chunk_name:
                continue

            # Anyway, this is an expert weight and should not be
            # attempted to load as other weights later
            is_expert_weight = True

            # Do not modify `name` since the loop may continue here
            # Instead, create a new variable
            name_mapped = chunk_name.replace(weight_name, param_name)

            if is_pp_missing_parameter(name_mapped, self):
                continue

            param = params_dict[name_mapped]
            # We should ask the weight loader to return success or
            # not here since otherwise we may skip experts with
            # other available replicas.
            weight_loader = typing.cast(Callable[..., bool],
                                        param.weight_loader)
            success = weight_loader(param,
                                    weight_to_load,
                                    name_mapped,
                                    shard_id=shard_id,
                                    expert_id=expert_id,
                                    return_success=True)
            if success:
                if not is_fuse_shared_experts_layer:
                    name = name_mapped
                break
        else:
            if is_expert_weight:
                # We've checked that this is an expert weight
                # However it's not mapped locally to this rank
                # So we simply skip it
                continue

            # Skip loading extra bias for GPTQ models.
            if name.endswith(".bias") and name not in params_dict:
                continue

            # Remapping the name of FP8 kv-scale.
            name = maybe_remap_kv_scale_name(name, params_dict)
            if name is None:
                continue

            if is_pp_missing_parameter(name, self):
                continue

            param = params_dict[name]
            weight_loader = getattr(param, "weight_loader",
                                    default_weight_loader)
            weight_loader(param, loaded_weight)
loaded_params.add(name)
```
The logic for loading shared expert weights when `is_fuse_shared_experts_layer` is true does not correctly update the `loaded_params` set. It adds the original shared expert tensor name (e.g., `...mlp.shared_experts.gate_proj.weight`) to `loaded_params`, but it actually loads the weights into multiple, chunked expert parameters (e.g., `mlp.experts.64.*`, `mlp.experts.65.*`, etc.).
As a result, vLLM will not be aware that these chunked expert parameters have been loaded, which will likely lead to "missing keys" errors at the end of the weight loading process or incorrect model behavior if those checks are bypassed.
The fix is to add each `name_mapped` to `loaded_params` as it is successfully loaded within the `for j in range(num_chunks):` loop, and then prevent the original shared expert name from being added to `loaded_params` at the end of the outer loop over weights.
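A toy, self-contained illustration of the bookkeeping this comment asks for (layer and tensor names are made up; the real loader delegates each chunk to the expert-aware `weight_loader` before recording it):

```python
n_routed_experts = 64
n_shared_experts = 2
name = "model.layers.3.mlp.shared_experts.gate_proj.weight"

loaded_params: set[str] = set()
for j in range(n_shared_experts):
    chunk_name = name.replace("mlp.shared_experts",
                              f"mlp.experts.{n_routed_experts + j}")
    # ...the j-th slice of the widened tensor is loaded into chunk_name here...
    loaded_params.add(chunk_name)  # record the parameter that was actually written

# The original widened name is NOT added, so end-of-load accounting sees exactly
# the parameters that received data.
print(sorted(loaded_params))
# ['model.layers.3.mlp.experts.64.gate_proj.weight',
#  'model.layers.3.mlp.experts.65.gate_proj.weight']
```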
@kliuae , is this comment relevant? I'm not familiar enough with the weight loading to know.
Currently it's not checked in vLLM, and yes, down the line it'd be good to align `loaded_params` with the names actually loaded. We'll make updates accordingly to reflect this.
cc @qli88
Signed-off-by: kliuae <[email protected]>
This pull request has merge conflicts that must be resolved before it can be merged.
Signed-off-by: kliuae <[email protected]>
This pull request has merge conflicts that must be resolved before it can be merged.
@bnellnm I have made changes addressing your comments, and the branch has been synced with the upstream. Can you help with the review? Thanks
This pull request has merge conflicts that must be resolved before it can be merged.
```diff
-        if config.n_shared_experts is None:
+        if (
+            config.n_shared_experts is None
+            or is_rocm_aiter_fusion_shared_expert_enabled()
+        ):
```
Note that I refactored this recently so that there's only an instance of `SharedFusedMoE`. It can handle when `self.shared_experts` is `None`, so it should be simple to keep it mostly the same except for passing `n_shared_experts` when `config.n_shared_experts()` is true.
```python
# Use expert_params_mapping to locate the destination
# param and delegate to its expert-aware weight_loader
# with expert_id.
for mapping in expert_params_mapping:
    param_name, weight_name, expert_id, shard_id = mapping
    if weight_name not in chunk_name:
        continue

    # Anyway, this is an expert weight and should not be
    # attempted to load as other weights later
    is_expert_weight = True

    # Do not modify `name` since the loop may continue here
    # Instead, create a new variable
    name_mapped = chunk_name.replace(weight_name, param_name)

    if is_pp_missing_parameter(name_mapped, self):
        continue

    param = params_dict[name_mapped]
    # We should ask the weight loader to return success or
    # not here since otherwise we may skip experts with
    # other available replicas.
    weight_loader = typing.cast(
        Callable[..., bool], param.weight_loader
    )
    success = weight_loader(
        param,
        weight_to_load,
        name_mapped,
        shard_id=shard_id,
        expert_id=expert_id,
        return_success=True,
    )
    if success:
        if not is_fuse_shared_experts_layer:
            name = name_mapped
        break
else:
    if is_expert_weight:
        # We've checked that this is an expert weight
        # However it's not mapped locally to this rank
        # So we simply skip it
        continue

    # Skip loading extra bias for GPTQ models.
    if name.endswith(".bias") and name not in params_dict:
        continue

    # Remapping the name of FP8 kv-scale.
    name = maybe_remap_kv_scale_name(name, params_dict)
    if name is None:
        continue

    if is_pp_missing_parameter(name, self):
        continue

    param = params_dict[name]
    weight_loader = getattr(
        param, "weight_loader", default_weight_loader
    )
    weight_loader(param, loaded_weight)
```
Is this bit basically the same as before? It's a little hard to tell the way the diff shows up.
Yes, this is largely the same as before. The change is that, since DeepSeek-V2-Lite provides the weights of its multiple shared experts as single tensors, load_weights now chunks those tensors by the number of shared experts and wraps their loading in a loop. For the other layers, and when shared experts fusion is not enabled, the number of chunks is set to one, so their loading logic remains the same.
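A toy illustration of that chunking (tensor names and shapes are made up for the example; the real loader hands each slice to the expert-aware `weight_loader` with the synthesized expert id):

```python
import torch

n_routed_experts = 64
n_shared_experts = 2
name = "model.layers.1.mlp.shared_experts.gate_proj.weight"

# A widened gate_proj holding both shared experts stacked along dim 0.
loaded_weight = torch.randn(2 * 1408, 2048)

split_dim = 1 if "down_proj.weight" in name else 0  # gate/up: dim 0, down: dim 1
chunk_size = loaded_weight.shape[split_dim] // n_shared_experts

for j in range(n_shared_experts):
    weight_to_load = loaded_weight.narrow(split_dim, j * chunk_size, chunk_size)
    chunk_name = name.replace("mlp.shared_experts",
                              f"mlp.experts.{n_routed_experts + j}")
    print(chunk_name, tuple(weight_to_load.shape))
# model.layers.1.mlp.experts.64.gate_proj.weight (1408, 2048)
# model.layers.1.mlp.experts.65.gate_proj.weight (1408, 2048)
```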
```python
        [top_k, n_shared_experts + is_EP], dim=1
    )
    s_topk_weights.fill_(shared_experts_score)
    aiter_topK_meta_data = (total_topk_weights, total_topk_ids)
```
Can you assert `aiter_topK_meta_data is None` here, so that if we run into the situation where the parameters to the init function change, we don't silently overwrite the global with a different value? I assume that since the init function is cached, it should only ever do this assignment once as long as the input parameters remain unchanged.
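A sketch of the requested guard, placed just before the assignment inside `init_aiter_topK_meta_data` (assumes the global starts out as `None`):

```python
# Hypothetical guard: the lru_cache'd init should assign the global exactly
# once; fail loudly if a second call arrives with different parameters.
assert aiter_topK_meta_data is None, (
    "aiter_topK_meta_data is already initialized; refusing to overwrite it "
    "with metadata built from different MoE parameters.")
aiter_topK_meta_data = (total_topk_weights, total_topk_ids)
```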
LGTM, nice work! I had a few final questions/comments though.
Signed-off-by: kliuae <[email protected]>
…m/EmbeddedLLM/vllm into upstream-aiter-fmoe-sharedexperts
…llm-project#24097) Signed-off-by: chenjun <[email protected]> Signed-off-by: kliuae <[email protected]> Co-authored-by: valarLip <[email protected]> Co-authored-by: TJian <[email protected]>
…llm-project#24097) Signed-off-by: chenjun <[email protected]> Signed-off-by: kliuae <[email protected]> Co-authored-by: valarLip <[email protected]> Co-authored-by: TJian <[email protected]> Signed-off-by: Alberto Perdomo <[email protected]>
Purpose
This PR targets ROCm AITER, introducing a flag-gated path that fuses DeepSeek models' shared experts into AITER's FusedMoE kernel, removing the overhead of a separate shared-experts MLP pass and the subsequent addition while preserving numerical behavior.
When shared experts fusion is enabled, the shared experts are treated as synthetic routed experts appended after the original routed experts and receive allocated top-k slots through grouped_topk, enabling a single fused MoE dispatch for both shared and routed experts.
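A toy illustration of that combined top-k layout (the numbers are made up; in the PR the buffers are prepared by `init_aiter_topK_meta_data` shown in the review above):

```python
import torch

n_routed_experts, n_shared_experts, top_k = 8, 2, 3
num_tokens = 4

# Routed part: whatever grouped_topk selects per token.
routed_ids = torch.randint(0, n_routed_experts, (num_tokens, top_k))
routed_weights = torch.rand(num_tokens, top_k)

# Shared part: every token also routes to the synthetic experts appended
# after the routed ones, with a fixed score.
shared_ids = torch.arange(n_routed_experts,
                          n_routed_experts + n_shared_experts).expand(num_tokens, -1)
shared_weights = torch.ones(num_tokens, n_shared_experts)

topk_ids = torch.cat([routed_ids, shared_ids], dim=1)              # shape (4, 5)
topk_weights = torch.cat([routed_weights, shared_weights], dim=1)  # shape (4, 5)
print(topk_ids[0])  # e.g. tensor([5, 1, 7, 8, 9]); the last two are shared experts
```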
This feature is controlled by the environment flag `VLLM_ROCM_USE_AITER_FUSION_SHARED_EXPERTS`, which is only effective when `VLLM_ROCM_USE_AITER_MOE` is set.
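A minimal sketch of enabling the feature (illustrative only; treating `VLLM_ROCM_USE_AITER` as a prerequisite is an assumption, and the actual launch commands from the test plan below are not reproduced here):

```python
import os

os.environ["VLLM_ROCM_USE_AITER"] = "1"      # assumed base switch for AITER paths
os.environ["VLLM_ROCM_USE_AITER_MOE"] = "1"  # required for the fusion flag to take effect
os.environ["VLLM_ROCM_USE_AITER_FUSION_SHARED_EXPERTS"] = "1"

# With these set in the server's environment, e.g. before launching
# `vllm serve deepseek-ai/DeepSeek-V2-Lite-Chat ...`, the fused
# shared-experts path is exercised.
```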
Test Plan
The following tests validate DeepSeek models by collecting benchmark metrics and performing correctness tests through lm_eval.
vLLM server launch command:
Benchmark commands:
lm_eval command:
Test Result
Benchmark results
deepseek-ai/DeepSeek-R1 on sharegpt dataset: benchmark results table (columns include Shared Experts and Rate)
deepseek-ai/DeepSeek-R1 on random dataset, input-len/output-len: 1k/1k: benchmark results table (columns include Shared Experts and Rate)
Accuracy test
deepseek-ai/DeepSeek-R1: lm_eval results table (column: Shared Experts)
deepseek-ai/DeepSeek-V3: lm_eval results table (column: Shared Experts)
deepseek-ai/DeepSeek-V2-Lite-Chat: lm_eval results table (column: Shared Experts)
Essential Elements of an Effective PR Description Checklist
`supported_models.md` and `examples` for a new model.