114 changes: 28 additions & 86 deletions tests/distributed/test_pipeline_parallel.py
@@ -26,23 +26,10 @@
VLLM_MULTI_NODE = os.getenv("VLLM_MULTI_NODE", "0") == "1"


@pytest.fixture(scope="function", autouse=True)
def use_v0_only(monkeypatch):
"""
For PP, we fall back to V0 by default. This means
that the TP baseline runs with V1 while the PP engine
runs with V0. This gives divergent results with dummy
weights. Once we enable V1 by default for PP, we can
remove this.
"""
monkeypatch.setenv('VLLM_USE_V1', '0')


class ParallelSetup(NamedTuple):
tp_size: int
pp_size: int
eager_mode: bool
chunked_prefill: bool


class PPTestOptions(NamedTuple):
@@ -53,23 +40,10 @@ class PPTestOptions(NamedTuple):
@dataclass
class PPTestSettings:
parallel_setups: list[ParallelSetup]
# NOTE: the length of distributed_backends and
# vllm_major_versions should be the same, and they
# are first zipped together to iterate over all
# test settings.
distributed_backends: list[str]
# vllm major version: "0" for V0, "1" for V1
vllm_major_versions: list[str]
runner: RunnerOption
test_options: PPTestOptions

def __post_init__(self):
if len(self.distributed_backends) != len(self.vllm_major_versions):
raise ValueError(
f"Length mismatch: distributed_backends "
f"({len(self.distributed_backends)}) != "
f"vllm_major_versions ({len(self.vllm_major_versions)})")

@staticmethod
def detailed(
*,
@@ -83,27 +57,21 @@ def detailed(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
pp_size=pp_base,
eager_mode=False,
chunked_prefill=False),
eager_mode=False),
ParallelSetup(tp_size=tp_base,
pp_size=2 * pp_base,
eager_mode=False,
chunked_prefill=True),
eager_mode=False),
ParallelSetup(tp_size=tp_base,
pp_size=2 * pp_base,
eager_mode=True,
chunked_prefill=False),
eager_mode=True),
ParallelSetup(tp_size=2 * tp_base,
pp_size=pp_base,
eager_mode=False,
chunked_prefill=True),
eager_mode=False),
ParallelSetup(tp_size=2 * tp_base,
pp_size=pp_base,
eager_mode=True,
chunked_prefill=False),
eager_mode=True),
],
distributed_backends=["mp", "mp", "ray", "ray"],
vllm_major_versions=["0", "1", "0", "1"],
distributed_backends=["mp", "ray"],
runner=runner,
test_options=PPTestOptions(multi_node_only=multi_node_only,
load_format=load_format),
@@ -118,17 +86,14 @@ def fast(
multi_node_only: bool = False,
load_format: Optional[str] = None,
):
vllm_major_versions = ["1"] if runner == "pooling" else ["0"]

return PPTestSettings(
parallel_setups=[
ParallelSetup(tp_size=tp_base,
pp_size=pp_base,
eager_mode=True,
chunked_prefill=False),
eager_mode=True),
],
distributed_backends=["mp"],
vllm_major_versions=vllm_major_versions,
runner=runner,
test_options=PPTestOptions(multi_node_only=multi_node_only,
load_format=load_format),
@@ -138,10 +103,8 @@ def iter_params(self, model_id: str):
opts = self.test_options

for parallel_setup in self.parallel_setups:
for backend, vllm_major_version in zip(self.distributed_backends,
self.vllm_major_versions):
yield (model_id, parallel_setup, backend, vllm_major_version,
self.runner, opts)
for backend in self.distributed_backends:
yield (model_id, parallel_setup, backend, self.runner, opts)


# NOTE: You can adjust tp_base and/or pp_base locally to fit the model in GPU
@@ -269,7 +232,6 @@ def _compare_tp(
model_id: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
vllm_major_version: str,
runner: RunnerOption,
test_options: PPTestOptions,
num_gpus_available: int,
@@ -281,7 +243,6 @@
tp_size,
pp_size,
eager_mode,
chunked_prefill,
) = parallel_setup

multi_node_only, load_format = test_options
@@ -334,8 +295,6 @@ def _compare_tp(
"--max-num-seqs",
"8",
]
if chunked_prefill:
common_args.append("--enable-chunked-prefill")
if eager_mode:
common_args.append("--enforce-eager")
if runner != "auto":
@@ -353,32 +312,26 @@
if max_num_seqs:
common_args.extend(["--max-num-seqs", f"{max_num_seqs}"])

specific_case = tp_size == 2 and pp_size == 2 and chunked_prefill
testing_ray_compiled_graph = False
if distributed_backend == "ray" and (vllm_major_version == "1"
or specific_case):
if distributed_backend == "ray":
# For V1, test Ray Compiled Graph for all the tests
# For V0, test Ray Compiled Graph for a subset of the tests
pp_env = {
"VLLM_USE_V1": vllm_major_version,
"VLLM_USE_V1": "1",
"VLLM_USE_RAY_COMPILED_DAG": "1",
"VLLM_USE_RAY_SPMD_WORKER": "1",
"VLLM_USE_RAY_COMPILED_DAG_NCCL_CHANNEL": "1",
}
# Temporary. Currently when zeromq + SPMD is used, it does not properly
# terminate because of a Ray Compiled Graph issue.
common_args.append("--disable-frontend-multiprocessing")
testing_ray_compiled_graph = True
elif distributed_backend == "mp":
# Both V0/V1 of multiprocessing executor support PP
pp_env = {
"VLLM_USE_V1": vllm_major_version,
"VLLM_USE_V1": "1",
}
else:
pp_env = None

tp_env = {
"VLLM_USE_V1": vllm_major_version,
"VLLM_USE_V1": "1",
}

pp_args = [
@@ -404,25 +357,17 @@
"mp",
]

try:
compare_two_settings(model_id,
pp_args,
tp_args,
pp_env,
tp_env,
method=method)
except Exception:
if testing_ray_compiled_graph and vllm_major_version == "0":
# Ray Compiled Graph tests are flaky for V0,
# so we don't want to fail the test
logger.exception("Ray Compiled Graph tests failed")
else:
raise
compare_two_settings(model_id,
pp_args,
tp_args,
pp_env,
tp_env,
method=method)


@pytest.mark.parametrize(
("model_id", "parallel_setup", "distributed_backend", "vllm_major_version",
"runner", "test_options"),
("model_id", "parallel_setup", "distributed_backend", "runner",
"test_options"),
[
params for model_id, settings in TEXT_GENERATION_MODELS.items()
for params in settings.iter_params(model_id) if model_id in TEST_MODELS
@@ -433,15 +378,14 @@ def test_tp_language_generation(
model_id: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
vllm_major_version: str,
runner: RunnerOption,
test_options: PPTestOptions,
num_gpus_available,
):
pytest.skip("Skipping the test until V1 passes it.")
_compare_tp(model_id,
parallel_setup,
distributed_backend,
vllm_major_version,
runner,
test_options,
num_gpus_available,
@@ -450,8 +394,8 @@


@pytest.mark.parametrize(
("model_id", "parallel_setup", "distributed_backend", "vllm_major_version",
"runner", "test_options"),
("model_id", "parallel_setup", "distributed_backend", "runner",
"test_options"),
[
params for model_id, settings in EMBEDDING_MODELS.items()
for params in settings.iter_params(model_id) if model_id in TEST_MODELS
@@ -462,15 +406,14 @@ def test_tp_language_embedding(
model_id: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
vllm_major_version: str,
runner: RunnerOption,
test_options: PPTestOptions,
num_gpus_available,
):
pytest.skip("Skipping the test until V1 passes it.")
_compare_tp(model_id,
parallel_setup,
distributed_backend,
vllm_major_version,
runner,
test_options,
num_gpus_available,
@@ -479,8 +422,8 @@


@pytest.mark.parametrize(
("model_id", "parallel_setup", "distributed_backend", "vllm_major_version",
"runner", "test_options"),
("model_id", "parallel_setup", "distributed_backend", "runner",
"test_options"),
[
params for model_id, settings in MULTIMODAL_MODELS.items()
for params in settings.iter_params(model_id) if model_id in TEST_MODELS
@@ -491,15 +434,14 @@ def test_tp_multimodal_generation(
model_id: str,
parallel_setup: ParallelSetup,
distributed_backend: str,
vllm_major_version: str,
runner: RunnerOption,
test_options: PPTestOptions,
num_gpus_available,
):
pytest.skip("Skipping the test until V1 passes it.")
_compare_tp(model_id,
parallel_setup,
distributed_backend,
vllm_major_version,
runner,
test_options,
num_gpus_available,
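For reference, a minimal standalone sketch (not part of the PR) of how the simplified parameter iteration behaves after this change: `iter_params` now yields five-element tuples without the `vllm_major_version` field, matching the updated `pytest.mark.parametrize` signatures. The class bodies below are reduced stand-ins copied from the diff so the sketch runs without vLLM installed; the `runner` type and the example model id are placeholders, not values taken from the test suite.

```python
# Standalone sketch, assuming the post-PR shapes of ParallelSetup,
# PPTestOptions, PPTestSettings, and iter_params() shown in the diff.
from dataclasses import dataclass
from typing import NamedTuple, Optional


class ParallelSetup(NamedTuple):
    tp_size: int
    pp_size: int
    eager_mode: bool  # chunked_prefill field removed by this PR


class PPTestOptions(NamedTuple):
    multi_node_only: bool
    load_format: Optional[str]


@dataclass
class PPTestSettings:
    parallel_setups: list[ParallelSetup]
    distributed_backends: list[str]
    runner: str  # stand-in for RunnerOption
    test_options: PPTestOptions

    def iter_params(self, model_id: str):
        # One tuple per (parallel_setup, backend) pair; the
        # vllm_major_version element is gone because V1 is assumed.
        for parallel_setup in self.parallel_setups:
            for backend in self.distributed_backends:
                yield (model_id, parallel_setup, backend, self.runner,
                       self.test_options)


settings = PPTestSettings(
    parallel_setups=[ParallelSetup(tp_size=1, pp_size=2, eager_mode=True)],
    distributed_backends=["mp", "ray"],
    runner="auto",
    test_options=PPTestOptions(multi_node_only=False, load_format=None),
)
# "microsoft/phi-2" is only an illustrative model id.
for params in settings.iter_params("microsoft/phi-2"):
    print(params)
```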