diff --git a/tests/distributed/test_comm_ops.py b/tests/distributed/test_comm_ops.py
index 9474cb21599d..9f815b62b5d3 100644
--- a/tests/distributed/test_comm_ops.py
+++ b/tests/distributed/test_comm_ops.py
@@ -8,7 +8,6 @@
 
 from vllm.model_executor.parallel_utils.communication_op import (
     tensor_model_parallel_all_reduce,
-    tensor_model_parallel_all_gather,
     broadcast_tensor_dict,
 )
 from vllm.test_utils import (init_test_distributed_environment,
@@ -31,28 +30,6 @@ def all_reduce_test_worker(tensor_parallel_size: int, rank: int,
     assert torch.allclose(t, expected)
 
 
-@ray.remote(num_gpus=1, max_calls=1)
-def all_gather_test_worker(tensor_parallel_size: int, rank: int,
-                           distributed_init_port: str):
-    init_test_distributed_environment(1, tensor_parallel_size, rank,
-                                      distributed_init_port)
-    num_dimensions = 3
-    tensor_size = list(range(2, num_dimensions + 2))
-    total_size = 1
-    for s in tensor_size:
-        total_size *= s
-    for all_gather_dimension in range(num_dimensions):
-        all_tensors = [
-            torch.arange(total_size, dtype=torch.float32,
-                         device="cuda").reshape(tensor_size) * (r + 1)
-            for r in range(tensor_parallel_size)
-        ]
-        expected = torch.cat(all_tensors, dim=all_gather_dimension)
-        t = all_tensors[rank]
-        t = tensor_model_parallel_all_gather(t, all_gather_dimension)
-        assert torch.allclose(t, expected)
-
-
 @ray.remote(num_gpus=1, max_calls=1)
 def broadcast_tensor_dict_test_worker(tensor_parallel_size: int, rank: int,
                                       distributed_init_port: str):
@@ -84,9 +61,7 @@ def broadcast_tensor_dict_test_worker(tensor_parallel_size: int, rank: int,
 @pytest.mark.skipif(torch.cuda.device_count() < 2,
                     reason="Need at least 2 GPUs to run the test.")
 @pytest.mark.parametrize("tensor_parallel_size", [2])
-@pytest.mark.parametrize("test_target", [
-    all_reduce_test_worker, all_gather_test_worker,
-    broadcast_tensor_dict_test_worker
-])
+@pytest.mark.parametrize(
+    "test_target", [all_reduce_test_worker, broadcast_tensor_dict_test_worker])
 def test_multi_process_tensor_parallel(tensor_parallel_size, test_target):
     multi_process_tensor_parallel(tensor_parallel_size, test_target)
diff --git a/vllm/lora/layers.py b/vllm/lora/layers.py
index e667d70f71e3..c51ee1d30c57 100644
--- a/vllm/lora/layers.py
+++ b/vllm/lora/layers.py
@@ -12,7 +12,6 @@
 from vllm.lora.punica import add_lora, add_lora_slice, bgmv
 from vllm.model_executor.layers.sampler import Sampler
 from vllm.model_executor.parallel_utils.communication_op import (
-    tensor_model_parallel_all_gather,
     tensor_model_parallel_all_reduce,
     tensor_model_parallel_gather,
 )
@@ -369,14 +368,9 @@ def forward(self, input_):
 
         # Matrix multiply.
         output_parallel = self.apply_weights(input_, bias)
-        if self.base_layer.gather_output:
-            # All-gather across the partitions.
-            output = tensor_model_parallel_all_gather(output_parallel)
-        else:
-            output = output_parallel
         output_bias = (self.base_layer.bias
                        if self.base_layer.skip_bias_add else None)
-        return output, output_bias
+        return output_parallel, output_bias
 
     @property
     def linear_weights(self):
diff --git a/vllm/model_executor/layers/linear.py b/vllm/model_executor/layers/linear.py
index b2396a1d6f14..c3c42beff295 100644
--- a/vllm/model_executor/layers/linear.py
+++ b/vllm/model_executor/layers/linear.py
@@ -8,7 +8,7 @@
 from vllm.model_executor.parallel_utils.parallel_state import (
     get_tensor_model_parallel_rank, get_tensor_model_parallel_world_size)
 from vllm.model_executor.parallel_utils.communication_op import (
-    tensor_model_parallel_all_reduce, tensor_model_parallel_all_gather)
+    tensor_model_parallel_all_reduce)
 from vllm.model_executor.parallel_utils.utils import (
     divide, split_tensor_along_last_dim)
 from vllm.model_executor.utils import set_weight_attrs
@@ -214,13 +214,8 @@ def forward(self, input_):
         # Matrix multiply.
         output_parallel = self.linear_method.apply_weights(
             self.linear_weights, input_, bias)
-        if self.gather_output:
-            # All-gather across the partitions.
-            output = tensor_model_parallel_all_gather(output_parallel)
-        else:
-            output = output_parallel
         output_bias = self.bias if self.skip_bias_add else None
-        return output, output_bias
+        return output_parallel, output_bias
 
 
 class MergedColumnParallelLinear(ColumnParallelLinear):
diff --git a/vllm/model_executor/parallel_utils/communication_op.py b/vllm/model_executor/parallel_utils/communication_op.py
index cf805df892fd..a99d3b4a7aae 100644
--- a/vllm/model_executor/parallel_utils/communication_op.py
+++ b/vllm/model_executor/parallel_utils/communication_op.py
@@ -41,34 +41,6 @@ def tensor_model_parallel_all_reduce(input_: torch.Tensor) -> torch.Tensor:
     return input_
 
 
-def tensor_model_parallel_all_gather(input_: torch.Tensor,
-                                     dim: int = -1) -> torch.Tensor:
-    """All-gather the input tensor across model parallel group."""
-    world_size = get_tensor_model_parallel_world_size()
-    # Bypass the function if we are using only 1 GPU.
-    if world_size == 1:
-        return input_
-    assert -input_.dim() <= dim < input_.dim(), (
-        f"Invalid dim ({dim}) for input tensor with shape {input_.size()}")
-    if dim < 0:
-        # Convert negative dim to positive.
-        dim += input_.dim()
-    input_size = input_.size()
-    # Allocate output tensor.
-    output_tensor = torch.empty((world_size, ) + input_size,
-                                dtype=input_.dtype,
-                                device=input_.device)
-    # All-gather.
-    torch.distributed.all_gather_into_tensor(
-        output_tensor, input_, group=get_tensor_model_parallel_group())
-    # Reshape
-    output_tensor = output_tensor.movedim(0, dim)
-    output_tensor = output_tensor.reshape(input_size[:dim] +
-                                          (world_size * input_size[dim], ) +
-                                          input_size[dim + 1:])
-    return output_tensor
-
-
 def tensor_model_parallel_gather(input_: torch.Tensor,
                                  dst: int = 0,
                                  dim: int = -1) -> torch.Tensor:
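With the gather_output handling removed from ColumnParallelLinear and its LoRA wrapper, forward() now returns the still-sharded output_parallel. For reference, a call site that still needed the old gather_output=True behavior could gather the shards itself along the lines of the sketch below, which mirrors the deleted tensor_model_parallel_all_gather for the dim=-1 case; the function name all_gather_last_dim and the explicit group argument are illustrative assumptions, not part of this diff.

import torch
import torch.distributed as dist


def all_gather_last_dim(shard: torch.Tensor, group=None) -> torch.Tensor:
    """Sketch: gather last-dim shards from every rank and concatenate them,
    mirroring the removed tensor_model_parallel_all_gather for dim=-1."""
    world_size = dist.get_world_size(group=group)
    # Nothing to gather on a single GPU.
    if world_size == 1:
        return shard
    # Stack every rank's shard along a new leading (rank) dimension.
    gathered = torch.empty((world_size, ) + shard.shape,
                           dtype=shard.dtype,
                           device=shard.device)
    dist.all_gather_into_tensor(gathered, shard, group=group)
    # Move the rank dimension next to the sharded last dimension and flatten
    # the two, which is equivalent to concatenating the shards along dim=-1.
    return gathered.movedim(0, -2).reshape(*shard.shape[:-1],
                                           world_size * shard.shape[-1])

A caller would pass the tensor-parallel process group explicitly, e.g. all_gather_last_dim(output_parallel, group=get_tensor_model_parallel_group()).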