Skip to content
41 changes: 12 additions & 29 deletions cwltool/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import csv
import datetime
import json
import math
import os
import re
import shutil
import subprocess # nosec
import sys
Expand Down Expand Up @@ -113,35 +113,18 @@ def get_image(
if docker_requirement["dockerImageId"] in _IMAGES:
return True

for line in (
subprocess.check_output([self.docker_exec, "images", "--no-trunc", "--all"]) # nosec
.decode("utf-8")
.splitlines()
):
docker_image_id = docker_requirement.get("dockerImageId")
if docker_image_id is not None:
try:
match = re.match(r"^([^ ]+)\s+([^ ]+)\s+([^ ]+)", line)
split = docker_requirement["dockerImageId"].split(":")
if len(split) == 1:
split.append("latest")
elif len(split) == 2:
# if split[1] doesn't match valid tag names, it is a part of repository
if not re.match(r"[\w][\w.-]{0,127}", split[1]):
split[0] = split[0] + ":" + split[1]
split[1] = "latest"
elif len(split) == 3:
if re.match(r"[\w][\w.-]{0,127}", split[2]):
split[0] = split[0] + ":" + split[1]
split[1] = split[2]
del split[2]

# check for repository:tag match or image id match
if match and (
(split[0] == match.group(1) and split[1] == match.group(2))
or docker_requirement["dockerImageId"] == match.group(3)
):
found = True
break
except ValueError:
manifest = json.loads(
subprocess.check_output(
[self.docker_exec, "inspect", docker_image_id]
).decode( # nosec
"utf-8"
)
)
found = manifest is not None
except (OSError, subprocess.CalledProcessError, UnicodeError):
pass

if (force_pull or not found) and pull_image:
Expand Down
55 changes: 37 additions & 18 deletions cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,17 +510,26 @@ def process_monitor(self, sproc: "subprocess.Popen[str]") -> None:
# Value must be list rather than integer to utilise pass-by-reference in python
memory_usage: MutableSequence[Optional[int]] = [None]

mem_tm: "Optional[Timer]" = None

def get_tree_mem_usage(memory_usage: MutableSequence[Optional[int]]) -> None:
children = monitor.children()
try:
rss = monitor.memory_info().rss
while len(children):
rss += sum(process.memory_info().rss for process in children)
children = list(itertools.chain(*(process.children() for process in children)))
if memory_usage[0] is None or rss > memory_usage[0]:
memory_usage[0] = rss
with monitor.oneshot():
children = monitor.children()
rss = monitor.memory_info().rss
while len(children):
rss += sum(process.memory_info().rss for process in children)
children = list(
itertools.chain(*(process.children() for process in children))
)
if memory_usage[0] is None or rss > memory_usage[0]:
memory_usage[0] = rss
mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
mem_tm.daemon = True
mem_tm.start()
except psutil.NoSuchProcess:
mem_tm.cancel()
if mem_tm is not None:
mem_tm.cancel()

mem_tm = Timer(interval=1, function=get_tree_mem_usage, args=(memory_usage,))
mem_tm.daemon = True
Expand Down Expand Up @@ -748,16 +757,6 @@ def run(
img_id = str(docker_req["dockerImageId"])
elif "dockerPull" in docker_req:
img_id = str(docker_req["dockerPull"])
cmd = [user_space_docker_cmd, "pull", img_id]
_logger.info(str(cmd))
try:
subprocess.check_call(cmd, stdout=sys.stderr) # nosec
except OSError as exc:
raise SourceLine(docker_req, None, WorkflowException, debug).makeError(
f"Either Docker container {img_id} is not available with "
f"user space docker implementation {user_space_docker_cmd} "
f" or {user_space_docker_cmd} is missing or broken."
) from exc
else:
raise SourceLine(docker_req, None, WorkflowException, debug).makeError(
"Docker image must be specified as 'dockerImageId' or "
Expand Down Expand Up @@ -1039,6 +1038,26 @@ def terminate(): # type: () -> None
if sproc.stdin is not None:
sproc.stdin.close()

tm = None
if timelimit is not None and timelimit > 0:

def terminate(): # type: () -> None
    """Log that the job exceeded its time limit, then terminate ``sproc``.

    Used as the callback of the ``Timer`` created right after this
    definition, which is started with ``timelimit`` as its delay.
    """
    try:
        _logger.warning(
            "[job %s] exceeded time limit of %d seconds and will be terminated",
            name,
            timelimit,
        )
        sproc.terminate()
    except OSError:
        # Best effort: an OSError here is deliberately ignored.
        # NOTE(review): presumably this covers a process that has already
        # exited - confirm.
        pass

tm = Timer(timelimit, terminate)
tm.daemon = True
tm.start()
if monitor_function:
monitor_function(sproc)

rcode = sproc.wait()

return rcode
Expand Down
4 changes: 3 additions & 1 deletion tests/test_examples.py
Original file line number Diff line number Diff line change
Expand Up @@ -1473,7 +1473,9 @@ def test_bad_userspace_runtime(factor: str) -> None:
)
error_code, stdout, stderr = get_main_output(commands)
stderr = re.sub(r"\s\s+", " ", stderr)
assert "or quaquioN is missing or broken" in stderr, stderr
assert ("or quaquioN is missing or broken" in stderr) or (
"No such file or directory: 'quaquioN'" in stderr
), stderr
assert error_code == 1


Expand Down
2 changes: 1 addition & 1 deletion tests/wf/timelimit.cwl
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ inputs:
outputs: []
requirements:
cwltool:TimeLimit:
timelimit: 15
timelimit: 20
baseCommand: sleep