diff --git a/torchx/schedulers/local_scheduler.py b/torchx/schedulers/local_scheduler.py
index 4c609de8e..68a788860 100644
--- a/torchx/schedulers/local_scheduler.py
+++ b/torchx/schedulers/local_scheduler.py
@@ -102,9 +102,9 @@ class ReplicaParam:
     env: Dict[str, str]
 
     # IO stream files
-    stdout: Optional[str]
-    stderr: Optional[str]
-    combined: Optional[str]
+    stdout: Optional[str] = None
+    stderr: Optional[str] = None
+    combined: Optional[str] = None
 
     cwd: Optional[str] = None
 
@@ -592,6 +592,13 @@ def run_opts(self) -> runopts:
             help="if set, prepends CWD to replica's PATH env var"
             " making any binaries in CWD take precedence over those in PATH",
         )
+        opts.add(
+            "auto_set_cuda_visible_devices",
+            type_=bool,
+            default=False,
+            help="sets the `CUDA_VISIBLE_DEVICES` env var for roles that request GPU resources;"
+            " each replica is assigned its requested number of GPUs. Does nothing if the total requested GPUs exceed the number of available devices.",
+        )
         return opts
 
     def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
@@ -770,6 +777,66 @@ def _submit_dryrun(
             request, lambda p: pprint.pformat(asdict(p), indent=2, width=80)
         )
 
+    def _get_gpu_device_count(self) -> int:
+        gpu_cmd = "nvidia-smi -L"
+        try:
+            log.debug(f"Running {gpu_cmd}")
+            result = subprocess.run(
+                gpu_cmd.split(), capture_output=True, text=True, check=True
+            )
+            log.debug(f"Cmd {gpu_cmd} returned: {result}")
+            gpus_info = [gpu_info for gpu_info in result.stdout.split("\n") if gpu_info]
+            return len(gpus_info)
+        except subprocess.CalledProcessError as e:
+            log.exception(f"Got exception while listing GPUs: {e.stderr}")
+            return 0
+
+    def _set_cuda_visible_devices_for_role_replica(
+        self,
+        replica: ReplicaParam,
+        replica_id: int,
+        requested_gpus: int,
+        role_gpu_start_idx: int,
+    ) -> None:
+        if requested_gpus <= 0:
+            return
+        # each replica gets a contiguous block of devices, offset by the role's start index
+        start_device = role_gpu_start_idx + requested_gpus * replica_id
+        end_device = role_gpu_start_idx + requested_gpus * (replica_id + 1)
+        devices = list(range(start_device, end_device))
+        visible_devices = ",".join([str(device) for device in devices])
+        replica.env["CUDA_VISIBLE_DEVICES"] = visible_devices
+
+    def _update_env_cuda_visible_devices(
+        self,
+        role_params: Dict[str, List[ReplicaParam]],
+        app: AppDef,
+        cfg: Mapping[str, CfgVal],
+    ) -> None:
+        autoset = cfg.get("auto_set_cuda_visible_devices")
+        if not autoset:
+            return
+        requested_gpus_total = sum(
+            [role.resource.gpu * role.num_replicas for role in app.roles]
+        )
+        if requested_gpus_total <= 0:
+            return
+        device_count = self._get_gpu_device_count()
+        if requested_gpus_total > device_count:
+            log.warning(
+                "Cannot set `CUDA_VISIBLE_DEVICES`: "
+                f"available GPUs ({device_count}) fewer than requested ({requested_gpus_total})"
+            )
+            return
+        role_gpu_start_idx = 0
+        for role in app.roles:
+            role_replicas = role_params[role.name]
+            for replica_id, replica in enumerate(role_replicas):
+                self._set_cuda_visible_devices_for_role_replica(
+                    replica, replica_id, role.resource.gpu, role_gpu_start_idx
+                )
+            role_gpu_start_idx += role.resource.gpu * role.num_replicas
+
     def _to_popen_request(
         self,
         app: AppDef,
@@ -785,6 +852,7 @@ def _to_popen_request(
 
         role_params: Dict[str, List[ReplicaParam]] = {}
         role_log_dirs: Dict[str, List[str]] = {}
+
        for role in app.roles:
             replica_params = role_params.setdefault(role.name, [])
             replica_log_dirs = role_log_dirs.setdefault(role.name, [])
@@ -798,8 +866,7 @@ def _to_popen_request(
                 replica_id=str(replica_id),
             )
             replica_role = values.apply(role)
-            replica_log_dir = os.path.join(app_log_dir, role.name,
-                str(replica_id))
+            replica_log_dir = os.path.join(app_log_dir, role.name, str(replica_id))
             if "TORCHELASTIC_ERROR_FILE" not in replica_role.env:
                 # this is the top level (agent if using elastic role) error file
                 # a.k.a scheduler reply file
@@ -818,7 +885,7 @@ def _to_popen_request(
                 )
             )
             replica_log_dirs.append(replica_log_dir)
-
+        self._update_env_cuda_visible_devices(role_params, app, cfg)
         return PopenRequest(app_id, app_log_dir, role_params, role_log_dirs)
 
     def describe(self, app_id: str) -> Optional[DescribeAppResponse]:
diff --git a/torchx/schedulers/test/local_scheduler_test.py b/torchx/schedulers/test/local_scheduler_test.py
index 9223fa144..2ae74a761 100644
--- a/torchx/schedulers/test/local_scheduler_test.py
+++ b/torchx/schedulers/test/local_scheduler_test.py
@@ -11,10 +11,12 @@
 import os
 import shutil
 import signal
+import subprocess
 import tempfile
 import time
 import unittest
 from contextlib import contextmanager
+from dataclasses import dataclass
 from datetime import datetime
 from os.path import join
 from typing import Callable, Generator, Optional
@@ -32,7 +34,7 @@
     join_PATH,
     make_unique,
 )
-from torchx.specs.api import AppDef, AppState, Role, is_terminal, macros
+from torchx.specs.api import AppDef, AppState, Resource, Role, is_terminal, macros
 
 from .test_util import write_shell_script
 
@@ -828,6 +830,116 @@ def test_close_twice(self) -> None:
         self.scheduler.close()
         # nothing to validate just make sure no errors are raised
 
+    def test_get_gpu_device_count(self) -> None:
+        @dataclass
+        class ProcResult:
+            stdout: str
+
+        nvidia_smi_out = (
+            "GPU 0: Tesla V100-SXM2-16GB (UUID: GPU-196a22c5-717b-66db-0acc-58cde6f3df85)\n"
+            "GPU 1: Tesla V100-SXM2-16GB (UUID: GPU-45e9165d-4f7e-d954-7ff5-481bc2c0ec7b)\n"
+            "GPU 2: Tesla V100-SXM2-16GB (UUID: GPU-26e22503-5fd5-8f55-d068-e1714fbb6fd6)\n"
+            "GPU 3: Tesla V100-SXM2-16GB (UUID: GPU-ebfc20c7-5f1a-1bc9-0d98-601cbe21fc2d)\n"
+        )
+
+        result = ProcResult(stdout=nvidia_smi_out)
+        with patch("subprocess.run", return_value=result):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(4, gpu_count)
+
+    def test_get_gpu_device_count_error(self) -> None:
+        error = subprocess.CalledProcessError(
+            returncode=2,
+            cmd="",
+            output="",
+            stderr="",
+        )
+        with patch("subprocess.run", side_effect=error):
+            gpu_count = self.scheduler._get_gpu_device_count()
+            self.assertEqual(0, gpu_count)
+
+    def test_set_cuda_visible_devices_for_role_replica(self) -> None:
+        replica_param1 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        replica_param2 = ReplicaParam(
+            args=["a", "b"],
+            env={},
+            cwd="/home/bob",
+        )
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param1, 0, 4, 0
+        )
+        self.assertEqual("0,1,2,3", replica_param1.env["CUDA_VISIBLE_DEVICES"])
+        self.scheduler._set_cuda_visible_devices_for_role_replica(
+            replica_param2, 1, 8, 4
+        )
+        # start device = role_gpu_start_idx (4) + requested_gpus (8) * replica_id (1) = 12
+        self.assertEqual(
+            "12,13,14,15,16,17,18,19", replica_param2.env["CUDA_VISIBLE_DEVICES"]
+        )
+
+    def test_get_cuda_devices_is_set(self) -> None:
+        with patch.object(self.scheduler, "_get_gpu_device_count", return_value=16):
+            appdef = AppDef(
+                name="role1",
+                roles=[
+                    Role(
+                        name="role1",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=2, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                    Role(
+                        name="role2",
+                        image=self.test_dir,
+                        entrypoint="train",
+                        resource=Resource(gpu=3, cpu=0, memMB=0),
+                        num_replicas=2,
+                    ),
+                ],
+            )
+            popen_req = self.scheduler._to_popen_request(
appdef, {"auto_set_cuda_visible_devices": True} + ) + role1_params = popen_req.role_params["role1"] + self.assertEqual(2, len(role1_params)) + self.assertEqual("0,1", role1_params[0].env["CUDA_VISIBLE_DEVICES"]) + self.assertEqual("2,3", role1_params[1].env["CUDA_VISIBLE_DEVICES"]) + role2_params = popen_req.role_params["role2"] + self.assertEqual(2, len(role2_params)) + self.assertEqual("4,5,6", role2_params[0].env["CUDA_VISIBLE_DEVICES"]) + self.assertEqual("7,8,9", role2_params[1].env["CUDA_VISIBLE_DEVICES"]) + + def test_get_cuda_devices_not_set(self) -> None: + with patch.object(self.scheduler, "_get_gpu_device_count", return_value=8): + trainer1 = AppDef( + name="trainer1", + roles=[ + Role( + name="trainer1", + image=self.test_dir, + entrypoint="trainer1.sh", + resource=Resource(gpu=4, cpu=0, memMB=0), + num_replicas=4, + ) + ], + ) + + popen_req = self.scheduler._to_popen_request(trainer1, {}) + role_params = popen_req.role_params["trainer1"] + self.assertEqual(4, len(role_params)) + self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[0].env) + self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[1].env) + self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[2].env) + self.assertFalse("CUDA_VISIBLE_DEVICES" in role_params[3].env) + def test_no_orphan_process_function(self) -> None: self._test_orphan_workflow() @@ -839,6 +949,9 @@ def _test_orphan_workflow(self) -> None: target=start_sleep_processes, args=(self.test_dir, mp_queue, child_nproc) ) proc.start() + # Before querying the queue we need to wait + # Otherwise we will get `FileNotFoundError: [Errno 2] No such file or directory` error + time.sleep(10) total_processes = child_nproc + 1 pids = [] for _ in range(total_processes):