@@ -26,6 +26,7 @@
 
 from .command_line_tool import CallbackJob, ExpressionJob
 from .context import RuntimeContext, getdefault
+from .cuda import cuda_version_and_device_count
 from .cwlprov.provenance_profile import ProvenanceProfile
 from .errors import WorkflowException
 from .job import JobBase
@@ -269,16 +270,22 @@ def __init__(self) -> None:
 
         self.max_ram = int(psutil.virtual_memory().available / 2**20)
         self.max_cores = float(psutil.cpu_count())
+        self.max_cuda = cuda_version_and_device_count()[1]
         self.allocated_ram = float(0)
         self.allocated_cores = float(0)
+        self.allocated_cuda: int = 0
 
     def select_resources(
         self, request: Dict[str, Union[int, float]], runtime_context: RuntimeContext
     ) -> Dict[str, Union[int, float]]:  # pylint: disable=unused-argument
         """Naïve check for available cpu cores and memory."""
         result: Dict[str, Union[int, float]] = {}
         maxrsc = {"cores": self.max_cores, "ram": self.max_ram}
-        for rsc in ("cores", "ram"):
+        resources_types = {"cores", "ram"}
+        if "cudaDeviceCountMin" in request or "cudaDeviceCountMax" in request:
+            maxrsc["cudaDeviceCount"] = self.max_cuda
+            resources_types.add("cudaDeviceCount")
+        for rsc in resources_types:
             rsc_min = request[rsc + "Min"]
             if rsc_min > maxrsc[rsc]:
                 raise WorkflowException(
@@ -293,9 +300,6 @@ def select_resources(
         result["tmpdirSize"] = math.ceil(request["tmpdirMin"])
         result["outdirSize"] = math.ceil(request["outdirMin"])
 
-        if "cudaDeviceCount" in request:
-            result["cudaDeviceCount"] = request["cudaDeviceCount"]
-
         return result
 
     def _runner(
@@ -326,6 +330,10 @@ def _runner(
                     self.allocated_ram -= ram
                     cores = job.builder.resources["cores"]
                     self.allocated_cores -= cores
+                    cudaDevices: int = cast(
+                        int, job.builder.resources.get("cudaDeviceCount", 0)
+                    )
+                    self.allocated_cuda -= cudaDevices
                 runtime_context.workflow_eval_lock.notify_all()
 
     def run_job(
@@ -349,34 +357,43 @@ def run_job(
                 if isinstance(job, JobBase):
                     ram = job.builder.resources["ram"]
                     cores = job.builder.resources["cores"]
-                    if ram > self.max_ram or cores > self.max_cores:
+                    cudaDevices = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
+                    if ram > self.max_ram or cores > self.max_cores or cudaDevices > self.max_cuda:
                         _logger.error(
                             'Job "%s" cannot be run, requests more resources (%s) '
-                            "than available on this host (max ram %d, max cores %d",
+                            "than available on this host (already allocated ram is %d, "
+                            "allocated cores is %d, allocated CUDA is %d, "
+                            "max ram %d, max cores %d, max CUDA %d).",
                             job.name,
                             job.builder.resources,
                             self.allocated_ram,
                             self.allocated_cores,
+                            self.allocated_cuda,
                             self.max_ram,
                             self.max_cores,
+                            self.max_cuda,
                         )
                         self.pending_jobs.remove(job)
                         return
 
                     if (
                         self.allocated_ram + ram > self.max_ram
                         or self.allocated_cores + cores > self.max_cores
+                        or self.allocated_cuda + cudaDevices > self.max_cuda
                     ):
                         _logger.debug(
                             'Job "%s" cannot run yet, resources (%s) are not '
                             "available (already allocated ram is %d, allocated cores is %d, "
-                            "max ram %d, max cores %d",
+                            "allocated CUDA devices is %d, "
+                            "max ram %d, max cores %d, max CUDA %d).",
                             job.name,
                             job.builder.resources,
                             self.allocated_ram,
                             self.allocated_cores,
+                            self.allocated_cuda,
                             self.max_ram,
                             self.max_cores,
+                            self.max_cuda,
                         )
                         n += 1
                         continue
@@ -386,6 +403,8 @@ def run_job(
                     self.allocated_ram += ram
                     cores = job.builder.resources["cores"]
                     self.allocated_cores += cores
+                    cuda = cast(int, job.builder.resources.get("cudaDeviceCount", 0))
+                    self.allocated_cuda += cuda
                 self.taskqueue.add(
                     functools.partial(self._runner, job, runtime_context, TMPDIR_LOCK),
                     runtime_context.workflow_eval_lock,
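
Taken together, the patch adds a third tracked capacity (CUDA devices) to the executor's existing RAM/cores accounting: max_cuda is read once from cuda_version_and_device_count(), run_job admits a job only while ram, cores, and CUDA devices all fit within the remaining capacity, and _runner returns the devices when the job finishes. The standalone sketch below restates that admit/release pattern in isolation; the Capacity class and its try_allocate/release names are illustrative for this note only and are not part of cwltool's API.

from dataclasses import dataclass


@dataclass
class Capacity:
    """Toy tracker mirroring the executor's RAM/cores/CUDA bookkeeping (sketch only)."""

    max_ram: int          # MiB, cf. self.max_ram
    max_cores: float      # cf. self.max_cores
    max_cuda: int         # device count, cf. cuda_version_and_device_count()[1]
    allocated_ram: int = 0
    allocated_cores: float = 0.0
    allocated_cuda: int = 0

    def try_allocate(self, ram: int, cores: float, cuda: int = 0) -> bool:
        """Admit a job only if RAM, cores, and CUDA devices all still fit."""
        if (
            self.allocated_ram + ram > self.max_ram
            or self.allocated_cores + cores > self.max_cores
            or self.allocated_cuda + cuda > self.max_cuda
        ):
            return False
        self.allocated_ram += ram
        self.allocated_cores += cores
        self.allocated_cuda += cuda
        return True

    def release(self, ram: int, cores: float, cuda: int = 0) -> None:
        """Counterpart of the _runner cleanup: give the resources back on completion."""
        self.allocated_ram -= ram
        self.allocated_cores -= cores
        self.allocated_cuda -= cuda


if __name__ == "__main__":
    cap = Capacity(max_ram=16_000, max_cores=8.0, max_cuda=1)
    print(cap.try_allocate(ram=4_000, cores=2.0, cuda=1))  # True: first GPU job fits
    print(cap.try_allocate(ram=4_000, cores=2.0, cuda=1))  # False: only one device total
    cap.release(ram=4_000, cores=2.0, cuda=1)
    print(cap.try_allocate(ram=4_000, cores=2.0, cuda=1))  # True again after release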