diff --git a/tests/kernels/moe/modular_kernel_tools/common.py b/tests/kernels/moe/modular_kernel_tools/common.py
index b5fcc4cd70bf..57f5befa36f2 100644
--- a/tests/kernels/moe/modular_kernel_tools/common.py
+++ b/tests/kernels/moe/modular_kernel_tools/common.py
@@ -522,11 +522,11 @@ def make_modular_kernel(
     quant_config: FusedMoEQuantConfig,
 ) -> mk.FusedMoEModularKernel:
 
-    def next_power_of_2(x):
+    def next_power_of_2(x) -> int:
         import math
         if x == 0:
             return 1
-        return 2**math.ceil(math.log2(x))
+        return int(2**math.ceil(math.log2(x)))
 
     # make moe config
     moe_parallel_config: FusedMoEParallelConfig = FusedMoEParallelConfig.make(
@@ -542,7 +542,7 @@ def next_power_of_2(x):
         num_local_experts=config.num_local_experts,
         moe_parallel_config=moe_parallel_config,
         in_dtype=config.dtype,
-        max_num_tokens=next_power_of_2(config.M),
+        max_num_tokens=max(128, next_power_of_2(config.M)),
     )
 
     # make modular kernel
diff --git a/tests/kernels/moe/modular_kernel_tools/mk_objects.py b/tests/kernels/moe/modular_kernel_tools/mk_objects.py
index 57a1da7b4b1a..5a3e35540d01 100644
--- a/tests/kernels/moe/modular_kernel_tools/mk_objects.py
+++ b/tests/kernels/moe/modular_kernel_tools/mk_objects.py
@@ -76,6 +76,7 @@ class ExpertInfo:
 common_float_and_int_types = common_float_types + [torch.int8]
 nvfp4_types = ["nvfp4"]
 fp8_types = [torch.float8_e4m3fn]
+fp8_bf16_types = [torch.float8_e4m3fn, torch.bfloat16]
 
 
 def register_prepare_and_finalize(
@@ -146,13 +147,13 @@ def expert_info(kind) -> ExpertInfo:
     return info
 
 
-register_prepare_and_finalize(
-    MoEPrepareAndFinalizeNoEP,
-    standard_format,
-    common_float_types,
-    blocked_quantization_support=True,
-    backend=None,
-)
+# register_prepare_and_finalize(
+#     MoEPrepareAndFinalizeNoEP,
+#     standard_format,
+#     common_float_types,
+#     blocked_quantization_support=True,
+#     backend=None,
+# )
 
 register_experts(
     BatchedTritonExperts,
@@ -184,29 +185,39 @@ def expert_info(kind) -> ExpertInfo:
 )
 
 # Disable on blackwell for now
-if has_deep_ep() and not current_platform.has_device_capability(100):
+if has_deep_ep():  # and not current_platform.has_device_capability(100):
     from vllm.model_executor.layers.fused_moe.deepep_ht_prepare_finalize import (  # noqa: E501
         DeepEPHTPrepareAndFinalize)
     from vllm.model_executor.layers.fused_moe.deepep_ll_prepare_finalize import (  # noqa: E501
         DeepEPLLPrepareAndFinalize)
+    from vllm.model_executor.layers.fused_moe.deepep_hybrid_prepare_finalize import (  # noqa: E501
+        DeepEPHybridPrepareAndFinalize)
+
+    # register_prepare_and_finalize(
+    #     DeepEPHTPrepareAndFinalize,
+    #     standard_format,
+    #     common_float_types,
+    #     blocked_quantization_support=True,
+    #     backend="deepep_high_throughput",
+    # )
+
+    # register_prepare_and_finalize(
+    #     DeepEPLLPrepareAndFinalize,
+    #     batched_format,
+    #     common_float_types,
+    #     blocked_quantization_support=True,
+    #     backend="deepep_low_latency",
+    # )
 
     register_prepare_and_finalize(
-        DeepEPHTPrepareAndFinalize,
+        DeepEPHybridPrepareAndFinalize,
         standard_format,
-        common_float_types,
-        blocked_quantization_support=True,
-        backend="deepep_high_throughput",
-    )
-
-    register_prepare_and_finalize(
-        DeepEPLLPrepareAndFinalize,
-        batched_format,
-        common_float_types,
+        fp8_bf16_types,
         blocked_quantization_support=True,
-        backend="deepep_low_latency",
+        backend="deepep_hybrid",
     )
 
-if has_pplx():
+if False and has_pplx():
     from vllm.model_executor.layers.fused_moe.pplx_prepare_finalize import (
         PplxPrepareAndFinalize)
     register_prepare_and_finalize(
@@ -217,7 +228,7 @@ def expert_info(kind) -> ExpertInfo:
         backend="pplx",
     )
 
-if (has_flashinfer_cutlass_fused_moe()
+if False and (has_flashinfer_cutlass_fused_moe()
         and current_platform.has_device_capability(100)):
     from vllm.model_executor.layers.fused_moe.flashinfer_cutlass_moe import (  # noqa: E501
         FlashInferExperts)
diff --git a/vllm/distributed/device_communicators/all2all.py b/vllm/distributed/device_communicators/all2all.py
index 661ed939608a..4b0e2b676e6b 100644
--- a/vllm/distributed/device_communicators/all2all.py
+++ b/vllm/distributed/device_communicators/all2all.py
@@ -248,6 +248,33 @@ def set_num_sms(self, num_sms: int):
         deep_ep.Buffer.set_num_sms(num_sms)
 
 
+class DeepEPHybridAll2AllManager(DeepEPAll2AllManagerBase):
+    """
+    All2All communication based on DeepEP Hybrid kernels.
+    """
+
+    def __init__(self, cpu_group):
+        super().__init__(cpu_group)
+
+    def _make_all2all_kwargs(self, **kwargs) -> dict[Any, Any]:
+        extra_kwargs = dict(
+            group=self.cpu_group,
+            num_sms_dispatch_api=32,
+            num_sms_combine_api=32,
+            num_sms_preprocessing_api=128,
+            nvlink_domain_size=2,  # hack for now. dp world_size
+        )
+        return {**kwargs, **extra_kwargs}
+
+    def get_handle(self, kwargs):
+        import deep_ep
+        buffer_kwargs = self._make_all2all_kwargs(**kwargs)
+        logger.debug("DeepEP Hybrid all2all args %s", buffer_kwargs)
+        handle: deep_ep.Buffer = self.handle_cache.get_or_create(
+            buffer_kwargs, deep_ep.HybridEpBuffer)
+        logger.debug("DeepEP Hybrid constructed.")
+        return handle
+
+
 class DeepEPLLAll2AllManager(DeepEPAll2AllManagerBase):
     """
     All2All communication based on DeepEP Low-Latency kernels.
@@ -395,4 +422,4 @@ def cleanup(self):
         self.workspace_tensor = None
         self.prepare_workspace_tensor = None
         self.mapping = None
-        self.initialized = False
\ No newline at end of file
+        self.initialized = False
diff --git a/vllm/distributed/device_communicators/cuda_communicator.py b/vllm/distributed/device_communicators/cuda_communicator.py
index bab372b722db..27b296f40ebd 100644
--- a/vllm/distributed/device_communicators/cuda_communicator.py
+++ b/vllm/distributed/device_communicators/cuda_communicator.py
@@ -114,6 +114,10 @@ def __init__(self,
             from .all2all import DeepEPLLAll2AllManager
             self.all2all_manager = DeepEPLLAll2AllManager(self.cpu_group)
             logger.info("Using DeepEP Low-Latency all2all manager.")
+        elif all2all_backend == "deepep_hybrid":
+            from .all2all import DeepEPHybridAll2AllManager
+            self.all2all_manager = DeepEPHybridAll2AllManager(self.cpu_group)
+            logger.info("Using DeepEP Hybrid all2all manager.")
         elif all2all_backend == "flashinfer_all2allv":
             from .all2all import FlashInferAllToAllManager
             self.all2all_manager = FlashInferAllToAllManager(
diff --git a/vllm/envs.py b/vllm/envs.py
index 4797d96bb899..220c97268b46 100755
--- a/vllm/envs.py
+++ b/vllm/envs.py
@@ -156,6 +156,7 @@
     VLLM_ALL2ALL_BACKEND: Literal["naive", "pplx",
                                   "deepep_high_throughput",
                                   "deepep_low_latency",
+                                  "deepep_hybrid",
                                   "allgather_reducescatter",
                                   "flashinfer_all2allv"] = \
         "allgather_reducescatter"
@@ -1214,10 +1215,11 @@ def get_vllm_port() -> Optional[int]:
     "VLLM_ALL2ALL_BACKEND":
     env_with_choices("VLLM_ALL2ALL_BACKEND", "allgather_reducescatter",
                      ["naive", "pplx",
-                      "deepep_high_throughput",
-                      "deepep_low_latency",
-                      "allgather_reducescatter",
-                      "flashinfer_all2allv"]),
+                      "deepep_high_throughput",
+                      "deepep_low_latency",
+                      "deepep_hybrid",
+                      "allgather_reducescatter",
+                      "flashinfer_all2allv"]),
 
     # Flashinfer MoE backend for vLLM's fused Mixture-of-Experts support.
     # Both require compute capability 10.0 or above.
diff --git a/vllm/model_executor/layers/fused_moe/config.py b/vllm/model_executor/layers/fused_moe/config.py
index 34bfe1c16aac..cdb314ead8c3 100644
--- a/vllm/model_executor/layers/fused_moe/config.py
+++ b/vllm/model_executor/layers/fused_moe/config.py
@@ -618,6 +618,11 @@ def use_deepep_ll_kernels(self):
         return (self.use_all2all_kernels
                 and envs.VLLM_ALL2ALL_BACKEND == "deepep_low_latency")
 
+    @property
+    def use_deepep_hybrid_kernels(self):
+        return (self.use_all2all_kernels
+                and envs.VLLM_ALL2ALL_BACKEND == "deepep_hybrid")
+
     @staticmethod
     def make(tp_size_: int, dp_size_: int,
              vllm_parallel_config: ParallelConfig) -> "FusedMoEParallelConfig":
@@ -794,6 +799,10 @@ def use_deepep_ht_kernels(self):
     def use_deepep_ll_kernels(self):
         return self.moe_parallel_config.use_deepep_ll_kernels
 
+    @property
+    def use_deepep_hybrid_kernels(self):
+        return self.moe_parallel_config.use_deepep_hybrid_kernels
+
     @property
     def use_flashinfer_cutlass_kernels(self):
         """
diff --git a/vllm/model_executor/layers/fused_moe/deepep_ht_prepare_finalize.py b/vllm/model_executor/layers/fused_moe/deepep_ht_prepare_finalize.py
index 9e9a9afc18a0..e9e85f304f6a 100644
--- a/vllm/model_executor/layers/fused_moe/deepep_ht_prepare_finalize.py
+++ b/vllm/model_executor/layers/fused_moe/deepep_ht_prepare_finalize.py
@@ -132,6 +132,13 @@ def _do_dispatch(
                 async_finish=self.async_prepare and not dbo_enabled(),
                 allocate_on_comm_stream=False)
 
+        print(f"HT STUFF\n"
+              f"a1 = {tokens.shape} -> {token_data.shape}\n"
+              f"topk_ids={expert_topk_ids.shape}\n"
+              f"probs={expert_topk_weights.shape}\n"
+              f"lem shape={expert_num_tokens_per_expert_list}\n"
+              )
+
         # record the handle for this ubatch
         a2a_idx = dbo_current_ubatch_id()
         self.handles[a2a_idx] = handle
diff --git a/vllm/model_executor/layers/fused_moe/deepep_hybrid_prepare_finalize.py b/vllm/model_executor/layers/fused_moe/deepep_hybrid_prepare_finalize.py
new file mode 100644
index 000000000000..a87ff7c5489b
--- /dev/null
+++ b/vllm/model_executor/layers/fused_moe/deepep_hybrid_prepare_finalize.py
@@ -0,0 +1,218 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+from typing import Callable, Optional, Union
+
+import deep_ep
+import torch
+import torch.nn.functional as F
+
+import vllm.model_executor.layers.fused_moe.modular_kernel as mk
+from vllm.model_executor.layers.fused_moe.config import FusedMoEQuantConfig
+from vllm.model_executor.layers.fused_moe.topk_weight_and_reduce import (
+    TopKWeightAndReduceContiguous, TopKWeightAndReduceDelegate)
+from vllm.model_executor.layers.fused_moe.utils import (
+    moe_kernel_quantize_input)
+from vllm.utils import round_up
+from vllm.v1.worker.ubatching import (
+    dbo_current_ubatch_id, dbo_enabled, dbo_switch_to_comm,
+    dbo_switch_to_compute, dbo_switch_to_compute_sync,
+    dbo_yield_and_switch_from_comm_to_compute,
+    dbo_yield_and_switch_from_compute_to_comm)
+
+
+class DeepEPHybridPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
+    """
+    Prepare/Finalize using DeepEP Hybrid kernels.
+    """
+
+    @staticmethod
+    def maybe_roundup_layer_hidden_size(hidden_size: int,
+                                        dtype: torch.dtype) -> int:
+        # Round up hidden size so it is compatible with DeepEP High Throughput
+        # kernels.
+        # DeepEP intranode kernels make copies in units of 32 (warp-size)
+        # int4 elements. Round up hidden size to respect this.
+        # For example, an input hidden size of 2880 with dtype torch.bfloat16
+        # will be rounded up to 3072.
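+        # (2880 elements * 2 bytes = 5760 bytes; round_up(5760, 512) = 6144
+        # bytes, i.e. 3072 bfloat16 elements.)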
+        hidden_size_bytes = hidden_size * dtype.itemsize
+        xfer_atom_size = 512  # 32 * 16 (size(int4))
+        if hidden_size_bytes % xfer_atom_size == 0:
+            return hidden_size
+
+        hidden_size_bytes = round_up(hidden_size_bytes, xfer_atom_size)
+        return hidden_size_bytes // dtype.itemsize
+
+    def __init__(self, buffer: deep_ep.HybridEpBuffer, num_dispatchers: int,
+                 dp_size: int, rank_expert_offset: int):
+        super().__init__()
+        self.buffer = buffer
+        self.num_dispatchers_ = num_dispatchers
+        self.dp_size = dp_size
+        self.rank_expert_offset = rank_expert_offset  #?
+        self.handle = None
+        self.expert_probs = None
+
+        # From https://github.com/deepseek-ai/DeepEP/blob/9fe9021f29c9083cd1808ab36b740208524d9f63/deep_ep/buffer.py#L164
+        self.available_rank_configs = [2, 4, 8, 16, 24, 32, 64, 128, 144, 160]
+
+    def num_dispatchers(self) -> int:
+        return self.num_dispatchers_
+
+    @property
+    def activation_format(self) -> mk.FusedMoEActivationFormat:
+        return mk.FusedMoEActivationFormat.Standard
+
+    def max_num_tokens_per_rank(self) -> Optional[int]:
+        return None
+
+    def topk_indices_dtype(self) -> Optional[torch.dtype]:
+        return torch.int64
+
+    def _get_dispatch_config(self) -> Optional[deep_ep.Config]:
+        if self.num_dispatchers_ not in self.available_rank_configs:
+            return None
+        return deep_ep.Buffer.get_dispatch_config(self.num_dispatchers_)
+
+    def _get_combine_config(self) -> Optional[deep_ep.Config]:
+        if self.num_dispatchers_ not in self.available_rank_configs:
+            return None
+        return deep_ep.Buffer.get_combine_config(self.num_dispatchers_)
+
+    def supports_async(self) -> bool:
+        return False  # combine async not supported
+
+    def prepare(
+        self,
+        a1: torch.Tensor,
+        topk_weights: torch.Tensor,
+        topk_ids: torch.Tensor,
+        num_experts: int,
+        expert_map: Optional[torch.Tensor],
+        apply_router_weight_on_input: bool,
+        quant_config: FusedMoEQuantConfig,
+    ) -> mk.PrepareResultType:
+
+        if apply_router_weight_on_input:
+            topk = topk_ids.size(1)
+            # TODO: this only works for topK=1, will need to update for topK>1
+            assert topk == 1, (
+                "apply_router_weight_on_input is only implemented for topk=1")
+            a1 = a1 * topk_weights.to(a1.dtype)
+
+        if quant_config.is_block_quantized:
+            # Quant and Dispatch
+            a1q, a1q_scale = moe_kernel_quantize_input(
+                a1,
+                quant_config.a1_scale,
+                quant_dtype=quant_config.quant_dtype,
+                per_act_token_quant=quant_config.per_act_token_quant,
+                block_shape=quant_config.block_shape,
+            )
+            if a1q_scale is not None and a1q_scale.numel() == 1:
+                a1q_scale = a1q_scale.view(1, 1)
+            a1_post_scale = None
+        else:
+            a1q = a1
+            a1q_scale = torch.ones(1, device=a1.device,
+                                   dtype=torch.float32)  # hack
+            a1_post_scale = quant_config.a1_scale
+
+        (
+            expert_x, expert_probs, expert_x_scale, handle
+        ) = self.buffer.dispatch(
+            tensor=a1q,
+            scaling_factor=a1q_scale,
+            topk_idx=topk_ids,
+            topk_weights=topk_weights,
+            routing_map=None,  # None = generated dynamically
+            handle=None,
+            num_of_tokens_for_experts=-1,  #??
+        )
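+        # Cache the dispatch handle and the routed probabilities; the matching
+        # combine() call in finalize() below reuses both.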
+        self.handle = handle
+        self.expert_probs = expert_probs
+
+        (sparse_to_dense_map,
+         rdma_to_attn_map,
+         attn_to_rdma_map,
+         num_of_tokens_for_experts,
+         local_expert_routing_map,
+         num_tokens) = self.handle
+
+        num_of_tokens_for_experts = num_of_tokens_for_experts.cpu()
+
+        print(f"STUFF\n"
+              f"rank_exp_offset = {self.rank_expert_offset}\n"
+              f"a={a1q.shape}/{a1q.dtype} -> {expert_x.shape}/{expert_x.dtype}\n"
+              f"topk_ids={topk_ids.shape}\n"
+              f"tok_for_exp={num_of_tokens_for_experts}\n"
+              f"probs={expert_probs.shape}\n"
+              f"lem shape={local_expert_routing_map.shape}, {local_expert_routing_map[:num_of_tokens_for_experts].shape}\n"
+              f"lem numel={local_expert_routing_map.nonzero().numel()}\n"
+              #f"lem={local_expert_routing_map}\n"
+              f"lem sum={local_expert_routing_map.sum(dim=1).shape}\n"
+              f"sparse_to_dense_map={sparse_to_dense_map.shape} {sparse_to_dense_map.dtype} {sparse_to_dense_map}\n"
+              f"rdma_to_attn_map={rdma_to_attn_map.shape} {rdma_to_attn_map.dtype} {rdma_to_attn_map}\n"
+              f"attn_to_rdma_map={attn_to_rdma_map.shape} {attn_to_rdma_map.dtype}\n"
+              f"num_tokens={num_tokens}\n"
+              )
+
+        local_expert_routing_map = local_expert_routing_map[
+            :num_of_tokens_for_experts.item()]
+
+        # TBD
+        new_topk_ids = None
+
+        # N/A
+        expert_tokens_meta = None
+
+        # Dispatch and Quant
+        # DeepEP kernels only support dispatching block-quantized
+        # activation scales.
+        # Dispatch in bfloat16 and quantize afterwards
+        if not quant_config.is_block_quantized:
+            # Quantize after dispatch.
+            expert_x_scale = None
+            if expert_x.numel() != 0:
+                expert_x, expert_x_scale = moe_kernel_quantize_input(
+                    expert_x,
+                    a1_post_scale,
+                    quant_dtype=quant_config.quant_dtype,
+                    per_act_token_quant=False,
+                    block_shape=quant_config.block_shape)
+
+        return (expert_x, expert_x_scale, expert_tokens_meta, new_topk_ids,
+                expert_probs)
+
+    def finalize(
+        self,
+        output: torch.Tensor,
+        fused_expert_output: torch.Tensor,
+        topk_weights: torch.Tensor,
+        topk_ids: torch.Tensor,
+        apply_router_weight_on_input: bool,
+        weight_and_reduce_impl: mk.TopKWeightAndReduce,
+    ) -> None:
+        # fused_expert_output can have 0 tokens - This happens when none of
+        # the tokens from the all2all reach this EP rank.
+        if False and fused_expert_output.numel() != 0:
+            if isinstance(weight_and_reduce_impl, TopKWeightAndReduceDelegate):
+                weight_and_reduce_impl = TopKWeightAndReduceContiguous()
+            fused_expert_output = weight_and_reduce_impl.apply(
+                output=None,
+                fused_expert_output=fused_expert_output,
+                topk_weights=topk_weights,
+                topk_ids=topk_ids,
+                apply_router_weight_on_input=apply_router_weight_on_input,
+            )
+
+        print(f"\nCOMBINE START({self.rank_expert_offset})\n")
+
+        combined_x, _ = self.buffer.combine(
+            tensor=fused_expert_output,
+            probs=self.expert_probs,  # None?
+            handle=self.handle,
+        )
+
+        print(f"\nCOMBINE END({self.rank_expert_offset}) {combined_x.shape}/{combined_x.dtype}\n")
+
+        # TODO(lucas): support this case with the refactored modular kernel
+        # Respect inplace outputs.
+        # apply weights???
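+        # Copy the combined tokens back into the caller-provided output
+        # buffer so in-place callers see the result.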
+        output.copy_(combined_x, non_blocking=True)
diff --git a/vllm/model_executor/layers/fused_moe/layer.py b/vllm/model_executor/layers/fused_moe/layer.py
index b68190e5d1c1..7627aca4b49e 100644
--- a/vllm/model_executor/layers/fused_moe/layer.py
+++ b/vllm/model_executor/layers/fused_moe/layer.py
@@ -50,6 +50,7 @@
                                         pplx_hidden_dim_scale_bytes)
 if has_deep_ep():
     from .deepep_ht_prepare_finalize import DeepEPHTPrepareAndFinalize
+    from .deepep_hybrid_prepare_finalize import DeepEPHybridPrepareAndFinalize
     from .deepep_ll_prepare_finalize import (DEEPEP_QUANT_BLOCK_SHAPE,
                                              DeepEPLLPrepareAndFinalize)
 else:
@@ -203,6 +204,29 @@ def _maybe_make_prepare_finalize(
             num_dispatchers=all2all_manager.world_size,
             use_fp8_dispatch=use_fp8_dispatch,
         )
+    elif moe.use_deepep_hybrid_kernels:
+        assert moe.dp_size == all2all_manager.dp_world_size
+
+        use_fp8 = quant_config.use_fp8_w8a8 if quant_config is not None else False
+
+        all_to_all_args = dict(
+            hidden_dim=moe.hidden_dim,
+            max_num_of_tokens_per_rank=moe.max_num_tokens,
+            num_local_experts=(moe.num_experts // all2all_manager.world_size),
+            num_of_experts=moe.num_experts,
+            use_fp8=use_fp8,
+        )
+
+        print(f"MAX NUM TOKENS = {moe.max_num_tokens}")
+
+        handle = all2all_manager.get_handle(all_to_all_args)
+        prepare_finalize = DeepEPHybridPrepareAndFinalize(
+            handle,
+            num_dispatchers=all2all_manager.world_size,
+            dp_size=all2all_manager.dp_world_size,
+            rank_expert_offset=all2all_manager.rank *
+            moe.num_local_experts,
+        )
 
     return prepare_finalize
 
@@ -1023,6 +1047,8 @@ def __init__(
             max_num_tokens=envs.VLLM_MOE_DP_CHUNK_SIZE,
             has_bias=has_bias,
         )
+        print(f"VLLM_MOE_DP_CHUNK_SIZE={envs.VLLM_MOE_DP_CHUNK_SIZE}")
+
         self.moe_config = moe
         self.moe_quant_config: Optional[FusedMoEQuantConfig] = None
         self.quant_config = quant_config
@@ -1145,6 +1171,10 @@ def use_deepep_ht_kernels(self):
     def use_deepep_ll_kernels(self):
         return self.moe_parallel_config.use_deepep_ll_kernels
 
+    @property
+    def use_deepep_hybrid_kernels(self):
+        return self.moe_parallel_config.use_deepep_hybrid_kernels
+
     @property
     def use_flashinfer_cutlass_kernels(self):
         return (self.moe_quant_config is not None
@@ -1693,7 +1723,7 @@ def must_reduce_shared_expert_outputs(self) -> bool:
         Therefore it is required that we reduce the shared_experts output
         early.
         """
-        return (self.use_pplx_kernels or self.use_deepep_ht_kernels
+        return (self.use_pplx_kernels or self.use_deepep_ht_kernels or self.use_deepep_hybrid_kernels
                 or self.use_deepep_ll_kernels)
 
     def maybe_all_reduce_tensor_model_parallel(
@@ -1701,8 +1731,7 @@ def maybe_all_reduce_tensor_model_parallel(
         """
         The pplx combine kernel reduces across GPU ranks by default.
""" - if (self.use_pplx_kernels or self.use_deepep_ht_kernels - or self.use_deepep_ll_kernels): + if self.must_reduce_shared_expert_outputs(): return final_hidden_states else: return tensor_model_parallel_all_reduce(final_hidden_states) @@ -1895,6 +1924,7 @@ def forward_impl( do_naive_dispatch_combine: bool = ( self.dp_size > 1 and not self.moe_parallel_config.use_deepep_ht_kernels + and not self.moe_parallel_config.use_deepep_hybrid_kernels and not self.moe_config.use_flashinfer_cutlass_kernels) # If there are shared experts but we are not using a modular kernel, the diff --git a/vllm/model_executor/layers/fused_moe/modular_kernel.py b/vllm/model_executor/layers/fused_moe/modular_kernel.py index 4ba14196682a..35622e595eb9 100644 --- a/vllm/model_executor/layers/fused_moe/modular_kernel.py +++ b/vllm/model_executor/layers/fused_moe/modular_kernel.py @@ -81,8 +81,8 @@ def _moe_problem_size( if a1.dim() == 2: # Make sure we are using the correct a1 (pre-permute). - assert topk_ids.size(0) == a1.size(0), \ - f"{topk_ids.size(0)} != {a1.size(0)}" +# assert topk_ids.size(0) == a1.size(0), \ +# f"{topk_ids.size(0)} != {a1.size(0)}" M = a1.size(0) else: assert a1.dim() == 3 @@ -240,16 +240,16 @@ def prepare_async( - apply_router_weight_on_input: When True, apply the weights to the activations, before quantization + dispatching. - Returns a callback or a hook callback pair that when invoked waits for - results from other workers and has the same return signature as + Returns a callback or a hook callback pair that when invoked waits for + results from other workers and has the same return signature as `prepare`, if a hook is returned this is more lightweight check that - the recv is complete without doing extra work (used by DBO, will be + the recv is complete without doing extra work (used by DBO, will be refactored in the very near future) - + e.g. ret = obj.prepare_async(...) - + if isinstance(ret, tuple): hook, receiver = ret hook() @@ -310,10 +310,10 @@ def finalize_async( - weight_and_reduce_impl: An optional TopKWeightAndReduce implementation. - Returns a callback or a hook callback pair that when invoked waits for - results from other workers and has the same return signature as + Returns a callback or a hook callback pair that when invoked waits for + results from other workers and has the same return signature as `finalize`, if a hook is returned this is more lightweight check that - the recv is complete without doing extra work (used by DBO, will be + the recv is complete without doing extra work (used by DBO, will be refactored in the very near future) ret = obj.finalize_async(output, ...)