diff --git a/cwltool/docker.py b/cwltool/docker.py index a92d59feb..64097b0ae 100644 --- a/cwltool/docker.py +++ b/cwltool/docker.py @@ -2,9 +2,9 @@ import csv import datetime +import json import math import os -import re import shutil import subprocess # nosec import sys @@ -113,35 +113,18 @@ def get_image( if docker_requirement["dockerImageId"] in _IMAGES: return True - for line in ( - subprocess.check_output([self.docker_exec, "images", "--no-trunc", "--all"]) # nosec - .decode("utf-8") - .splitlines() - ): + docker_image_id = docker_requirement.get("dockerImageId") + if docker_image_id is not None: try: - match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line) - split = docker_requirement["dockerImageId"].split(":") - if len(split) == 1: - split.append("latest") - elif len(split) == 2: - # if split[1] doesn't match valid tag names, it is a part of repository - if not re.match(r"[\w][\w.-]{0,127}", split[1]): - split[0] = split[0] + ":" + split[1] - split[1] = "latest" - elif len(split) == 3: - if re.match(r"[\w][\w.-]{0,127}", split[2]): - split[0] = split[0] + ":" + split[1] - split[1] = split[2] - del split[2] - - # check for repository:tag match or image id match - if match and ( - (split[0] == match.group(1) and split[1] == match.group(2)) - or docker_requirement["dockerImageId"] == match.group(3) - ): - found = True - break - except ValueError: + manifest = json.loads( + subprocess.check_output( + [self.docker_exec, "inspect", docker_image_id] + ).decode( # nosec + "utf-8" + ) + ) + found = manifest is not None + except (OSError, subprocess.CalledProcessError, UnicodeError): pass if (force_pull or not found) and pull_image: diff --git a/cwltool/job.py b/cwltool/job.py index da3d2388e..9498f0af0 100644 --- a/cwltool/job.py +++ b/cwltool/job.py @@ -510,17 +510,26 @@ def process_monitor(self, sproc: "subprocess.Popen[str]") -> None: # Value must be list rather than integer to utilise pass-by-reference in python memory_usage: MutableSequence[Optional[int]] = [None] + mem_tm: "Optional[Timer]" = None + def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None: - children = monitor.children() try: - rss = monitor.memory_info().rss - while len(children): - rss += sum(process.memory_info().rss for process in children) - children = list(itertools.chain(*(process.children() for process in children))) - if memory_usage[0] is None or rss > memory_usage[0]: - memory_usage[0] = rss + with monitor.oneshot(): + children = monitor.children() + rss = monitor.memory_info().rss + while len(children): + rss += sum(process.memory_info().rss for process in children) + children = list( + itertools.chain(*(process.children() for process in children)) + ) + if memory_usage[0] is None or rss > memory_usage[0]: + memory_usage[0] = rss + mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,)) + mem_tm.daemon = True + mem_tm.start() except psutil.NoSuchProcess: - mem_tm.cancel() + if mem_tm is not None: + mem_tm.cancel() mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,)) mem_tm.daemon = True @@ -748,16 +757,6 @@ def run( img_id = str(docker_req["dockerImageId"]) elif "dockerPull" in docker_req: img_id = str(docker_req["dockerPull"]) - cmd = [user_space_docker_cmd, "pull", img_id] - _logger.info(str(cmd)) - try: - subprocess.check_call(cmd, stdout=sys.stderr) # nosec - except OSError as exc: - raise SourceLine(docker_req, None, WorkflowException, debug).makeError( - f"Either Docker container {img_id} is not available with " - f"user space docker implementation {user_space_docker_cmd} " - f" or {user_space_docker_cmd} is missing or broken." - ) from exc else: raise SourceLine(docker_req, None, WorkflowException, debug).makeError( "Docker image must be specified as 'dockerImageId' or " @@ -1039,6 +1038,26 @@ def terminate(): # type: () -> None if sproc.stdin is not None: sproc.stdin.close() + tm = None + if timelimit is not None and timelimit > 0: + + def terminate(): # type: () -> None + try: + _logger.warning( + "[job %s] exceeded time limit of %d seconds and will be terminated", + name, + timelimit, + ) + sproc.terminate() + except OSError: + pass + + tm = Timer(timelimit, terminate) + tm.daemon = True + tm.start() + if monitor_function: + monitor_function(sproc) + rcode = sproc.wait() return rcode diff --git a/tests/test_examples.py b/tests/test_examples.py index 48a792427..9a5b5ada8 100644 --- a/tests/test_examples.py +++ b/tests/test_examples.py @@ -1473,7 +1473,9 @@ def test_bad_userspace_runtime(factor: str) -> None: ) error_code, stdout, stderr = get_main_output(commands) stderr = re.sub(r"\s\s+", " ", stderr) - assert "or quaquioN is missing or broken" in stderr, stderr + assert ("or quaquioN is missing or broken" in stderr) or ( + "No such file or directory: 'quaquioN'" in stderr + ), stderr assert error_code == 1 diff --git a/tests/wf/timelimit.cwl b/tests/wf/timelimit.cwl index e15ebaddf..7af0d1dff 100644 --- a/tests/wf/timelimit.cwl +++ b/tests/wf/timelimit.cwl @@ -11,5 +11,5 @@ inputs: outputs: [] requirements: cwltool:TimeLimit: - timelimit: 15 + timelimit: 20 baseCommand: sleep