Skip to content

Commit a90d10b

Browse files
mr-cndonyapour
andcommitted
Fix resource allocation for CUDA
Co-authored-by: Nazanin Donyapour <[email protected]>
1 parent 509ffb9 commit a90d10b

File tree

2 files changed

+31
-8
lines changed

2 files changed

+31
-8
lines changed

cwltool/executors.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
from .command_line_tool import CallbackJob, ExpressionJob
2828
from .context import RuntimeContext, getdefault
29+
from .cuda import cuda_version_and_device_count
2930
from .cwlprov.provenance_profile import ProvenanceProfile
3031
from .errors import WorkflowException
3132
from .job import JobBase
@@ -269,16 +270,22 @@ def __init__(self) -> None:
269270

270271
self.max_ram = int(psutil.virtual_memory().available / 2**20)
271272
self.max_cores = float(psutil.cpu_count())
273+
self.max_cuda = cuda_version_and_device_count()[1]
272274
self.allocated_ram = float(0)
273275
self.allocated_cores = float(0)
276+
self.allocated_cuda: int = 0
274277

275278
def select_resources(
276279
self, request: Dict[str, Union[int, float]], runtime_context: RuntimeContext
277280
) -> Dict[str, Union[int, float]]: # pylint: disable=unused-argument
278281
"""Naïve check for available cpu cores and memory."""
279282
result: Dict[str, Union[int, float]] = {}
280283
maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
281-
for rsc in ("cores", "ram"):
284+
resources_types = {"cores", "ram", "cudaDeviceCount"}
285+
if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
286+
maxrsc["cudaDeviceCount"] = self.max_cuda
287+
resources_types.add("cudaDeviceCount")
288+
for rsc in resources_types:
282289
rsc_min = request[rsc + "Min"]
283290
if rsc_min > maxrsc[rsc]:
284291
raise WorkflowException(
@@ -293,9 +300,6 @@ def select_resources(
293300
result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
294301
result["outdirSize"] = math.ceil(request["outdirMin"])
295302

296-
if "cudaDeviceCount" in request:
297-
result["cudaDeviceCount"] = request["cudaDeviceCount"]
298-
299303
return result
300304

301305
def _runner(
@@ -326,6 +330,10 @@ def _runner(
326330
self.allocated_ram -= ram
327331
cores = job.builder.resources["cores"]
328332
self.allocated_cores -= cores
333+
cudaDevices: int = cast(
334+
int, job.builder.resources.get("cudaDeviceCount", 0)
335+
)
336+
self.allocated_cuda -= cudaDevices
329337
runtime_context.workflow_eval_lock.notify_all()
330338

331339
def run_job(
@@ -349,34 +357,46 @@ def run_job(
349357
if isinstance(job, JobBase):
350358
ram = job.builder.resources["ram"]
351359
cores = job.builder.resources["cores"]
352-
if ram > self.max_ram or cores > self.max_cores:
360+
if "cudaDeviceCount" in job.builder.resources:
361+
cudaDevices = job.builder.resources["cudaDeviceCount"]
362+
else:
363+
cudaDevices = 0
364+
if ram > self.max_ram or cores > self.max_cores or cudaDevices > self.max_cuda:
353365
_logger.error(
354366
'Job "%s" cannot be run, requests more resources (%s) '
355-
"than available on this host (max ram %d, max cores %d",
367+
"than available on this host (already allocated ram is %d, "
368+
"allocated cores is %d, allocated CUDA is %d, "
369+
"max ram %d, max cores %d, max CUDA %d).",
356370
job.name,
357371
job.builder.resources,
358372
self.allocated_ram,
359373
self.allocated_cores,
374+
self.allocated_cuda,
360375
self.max_ram,
361376
self.max_cores,
377+
self.max_cuda,
362378
)
363379
self.pending_jobs.remove(job)
364380
return
365381

366382
if (
367383
self.allocated_ram + ram > self.max_ram
368384
or self.allocated_cores + cores > self.max_cores
385+
or self.allocated_cuda + cudaDevices > self.max_cuda
369386
):
370387
_logger.debug(
371388
'Job "%s" cannot run yet, resources (%s) are not '
372389
"available (already allocated ram is %d, allocated cores is %d, "
373-
"max ram %d, max cores %d",
390+
"allocated CUDA devices is %d, "
391+
"max ram %d, max cores %d, max CUDA %d).",
374392
job.name,
375393
job.builder.resources,
376394
self.allocated_ram,
377395
self.allocated_cores,
396+
self.allocated_cuda,
378397
self.max_ram,
379398
self.max_cores,
399+
self.max_cuda,
380400
)
381401
n += 1
382402
continue
@@ -386,6 +406,8 @@ def run_job(
386406
self.allocated_ram += ram
387407
cores = job.builder.resources["cores"]
388408
self.allocated_cores += cores
409+
cuda = cast(int, job.builder.resources.get("cudaDevices", 0))
410+
self.allocated_cuda += cuda
389411
self.taskqueue.add(
390412
functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
391413
runtime_context.workflow_eval_lock,

cwltool/process.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,8 @@ def evalResources(
980980
):
981981
if rsc is None:
982982
continue
983-
mn = mx = None # type: Optional[Union[int, float]]
983+
mn: Optional[Union[int, float]] = None
984+
mx: Optional[Union[int, float]] = None
984985
if rsc.get(a + "Min"):
985986
with SourceLine(rsc, f"{a}Min", WorkflowException, runtimeContext.debug):
986987
mn = cast(

0 commit comments

Comments
 (0)