Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions cwltool/argparser.py
Original file line number Diff line number Diff line change
Expand Up @@ -656,8 +656,10 @@ def arg_parser() -> argparse.ArgumentParser:
default=None,
help="Only executes the underlying Process (CommandLineTool, "
"ExpressionTool, or sub-Workflow) for the given step in a workflow. "
"This will not include any step-level processing: scatter, when, no "
"processing of step-level default, or valueFrom input modifiers. "
"This will not include any step-level processing: 'scatter', 'when'; "
"and there will be no processing of step-level 'default', or 'valueFrom' "
"input modifiers. However, requirements/hints from the step or parent "
"workflow(s) will be inherited as usual."
"The input object must match that Process's inputs.",
)

Expand Down
22 changes: 1 addition & 21 deletions cwltool/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
MutableSequence,
Optional,
Set,
Tuple,
Union,
cast,
)
Expand All @@ -36,6 +35,7 @@
CONTENT_LIMIT,
CWLObjectType,
CWLOutputType,
HasReqsHints,
aslist,
get_listing,
normalizeFilesDirs,
Expand Down Expand Up @@ -134,26 +134,6 @@ def check_format(
)


class HasReqsHints:
"""Base class for get_requirement()."""

def __init__(self) -> None:
"""Initialize this reqs decorator."""
self.requirements = [] # type: List[CWLObjectType]
self.hints = [] # type: List[CWLObjectType]

def get_requirement(
self, feature: str
) -> Tuple[Optional[CWLObjectType], Optional[bool]]:
for item in reversed(self.requirements):
if item["class"] == feature:
return (item, True)
for item in reversed(self.hints):
if item["class"] == feature:
return (item, False)
return (None, None)


class Builder(HasReqsHints):
def __init__(
self,
Expand Down
4 changes: 2 additions & 2 deletions cwltool/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@
from schema_salad.utils import FetcherCallableType
from typing_extensions import TYPE_CHECKING

from .builder import Builder, HasReqsHints
from .builder import Builder
from .mpi import MpiConfig
from .mutation import MutationManager
from .pathmapper import PathMapper
from .secrets import SecretStore
from .software_requirements import DependenciesConfiguration
from .stdfsaccess import StdFsAccess
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, ResolverType
from .utils import DEFAULT_TMP_PREFIX, CWLObjectType, HasReqsHints, ResolverType

if TYPE_CHECKING:
from .process import Process
Expand Down
1 change: 0 additions & 1 deletion cwltool/cwlviewer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
"""Visualize a CWL workflow."""
import os
from pathlib import Path
from urllib.parse import urlparse

Expand Down
3 changes: 2 additions & 1 deletion cwltool/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
from typing_extensions import TYPE_CHECKING

from . import env_to_stdout, run_job
from .builder import Builder, HasReqsHints
from .builder import Builder
from .context import RuntimeContext
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
Expand All @@ -49,6 +49,7 @@
from .utils import (
CWLObjectType,
CWLOutputType,
HasReqsHints,
DirectoryType,
OutputCallbackType,
bytes2str_in_dicts,
Expand Down
137 changes: 90 additions & 47 deletions cwltool/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,11 @@
from ruamel.yaml.main import YAML
from schema_salad.exceptions import ValidationException
from schema_salad.ref_resolver import Loader, file_uri, uri_file_path
from schema_salad.sourceline import strip_dup_lineno
from schema_salad.sourceline import cmap, strip_dup_lineno
from schema_salad.utils import ContextType, FetcherCallableType, json_dumps, yaml_no_ts

from . import CWL_CONTENT_TYPES, workflow
from .argparser import arg_parser, generate_parser, get_default_args
from .builder import HasReqsHints
from .context import LoadingContext, RuntimeContext, getdefault
from .cwlrdf import printdot, printrdf
from .errors import ArgumentException, UnsupportedRequirement, WorkflowException
Expand Down Expand Up @@ -89,6 +88,7 @@
CWLObjectType,
CWLOutputAtomType,
CWLOutputType,
HasReqsHints,
adjustDirObjs,
normalizeFilesDirs,
processes_to_kill,
Expand Down Expand Up @@ -754,69 +754,95 @@ def my_represent_none(
)


def inherit_reqshints(tool: Process, parent: Process) -> None:
"""Copy down requirements and hints from ancestors of a given process."""
for parent_req in parent.requirements:
found = False
for tool_req in tool.requirements:
if parent_req["class"] == tool_req["class"]:
found = True
break
if not found:
tool.requirements.append(parent_req)
for parent_hint in parent.hints:
found = False
for tool_req in tool.requirements:
if parent_hint["class"] == tool_req["class"]:
found = True
break
if not found:
for tool_hint in tool.hints:
if parent_hint["class"] == tool_hint["class"]:
found = True
break
if not found:
tool.hints.append(parent_hint)


def choose_target(
args: argparse.Namespace,
tool: Process,
loadingContext: LoadingContext,
loading_context: LoadingContext,
) -> Optional[Process]:
"""Walk the Workflow, extract the subset matches all the args.targets."""
if loadingContext.loader is None:
raise Exception("loadingContext.loader cannot be None")
if loading_context.loader is None:
raise Exception("loading_context.loader cannot be None")

if isinstance(tool, Workflow):
url = urllib.parse.urlparse(tool.tool["id"])
if url.fragment:
extracted = get_subgraph(
[tool.tool["id"] + "/" + r for r in args.target], tool
[tool.tool["id"] + "/" + r for r in args.target], tool, loading_context
)
else:
extracted = get_subgraph(
[
loadingContext.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
loading_context.loader.fetcher.urljoin(tool.tool["id"], "#" + r)
for r in args.target
],
tool,
loading_context,
)
else:
_logger.error("Can only use --target on Workflows")
return None
if isinstance(loadingContext.loader.idx, MutableMapping):
loadingContext.loader.idx[extracted["id"]] = extracted
tool = make_tool(extracted["id"], loadingContext)
if isinstance(loading_context.loader.idx, MutableMapping):
loading_context.loader.idx[extracted["id"]] = extracted
tool = make_tool(extracted["id"], loading_context)
else:
raise Exception("Missing loadingContext.loader.idx!")
raise Exception("Missing loading_context.loader.idx!")

return tool


def choose_step(
args: argparse.Namespace,
tool: Process,
loadingContext: LoadingContext,
loading_context: LoadingContext,
) -> Optional[Process]:
"""Walk the given Workflow and extract just args.single_step."""
if loadingContext.loader is None:
raise Exception("loadingContext.loader cannot be None")
if loading_context.loader is None:
raise Exception("loading_context.loader cannot be None")

if isinstance(tool, Workflow):
url = urllib.parse.urlparse(tool.tool["id"])
if url.fragment:
extracted = get_step(tool, tool.tool["id"] + "/" + args.single_step)
step_id = tool.tool["id"] + "/" + args.single_step
else:
extracted = get_step(
tool,
loadingContext.loader.fetcher.urljoin(
tool.tool["id"], "#" + args.single_step
),
step_id = loading_context.loader.fetcher.urljoin(
tool.tool["id"], "#" + args.single_step
)
extracted = get_step(tool, step_id, loading_context)
else:
_logger.error("Can only use --single-step on Workflows")
return None
if isinstance(loadingContext.loader.idx, MutableMapping):
loadingContext.loader.idx[extracted["id"]] = extracted
tool = make_tool(extracted["id"], loadingContext)
if isinstance(loading_context.loader.idx, MutableMapping):
loading_context.loader.idx[extracted["id"]] = cast(
Union[CommentedMap, CommentedSeq, str, None], cmap(extracted)
)
tool = make_tool(extracted["id"], loading_context)
else:
raise Exception("Missing loadingContext.loader.idx!")
raise Exception("Missing loading_context.loader.idx!")

return tool

Expand All @@ -826,36 +852,33 @@ def choose_process(
tool: Process,
loadingContext: LoadingContext,
) -> Optional[Process]:
"""Walk the given Workflow and extract just args.single_step."""
"""Walk the given Workflow and extract just args.single_process."""
if loadingContext.loader is None:
raise Exception("loadingContext.loader cannot be None")

if isinstance(tool, Workflow):
url = urllib.parse.urlparse(tool.tool["id"])
if url.fragment:
extracted = get_process(
tool,
tool.tool["id"] + "/" + args.single_process,
loadingContext.loader.idx,
)
step_id = tool.tool["id"] + "/" + args.single_process
else:
extracted = get_process(
tool,
loadingContext.loader.fetcher.urljoin(
tool.tool["id"], "#" + args.single_process
),
loadingContext.loader.idx,
step_id = loadingContext.loader.fetcher.urljoin(
tool.tool["id"], "#" + args.single_process
)
extracted, workflow_step = get_process(
tool,
step_id,
loadingContext,
)
else:
_logger.error("Can only use --single-process on Workflows")
return None
if isinstance(loadingContext.loader.idx, MutableMapping):
loadingContext.loader.idx[extracted["id"]] = extracted
tool = make_tool(extracted["id"], loadingContext)
new_tool = make_tool(extracted["id"], loadingContext)
else:
raise Exception("Missing loadingContext.loader.idx!")

return tool
inherit_reqshints(new_tool, workflow_step)
return new_tool


def check_working_directories(
Expand Down Expand Up @@ -887,6 +910,33 @@ def check_working_directories(
return None


def print_targets(
tool: Process,
stdout: Union[TextIO, StreamWriter],
loading_context: LoadingContext,
prefix: str = "",
) -> None:
"""Recursively find targets for --subgraph and friends."""
for f in ("outputs", "inputs"):
if tool.tool[f]:
_logger.info("%s %s%s targets:", prefix[:-1], f[0].upper(), f[1:-1])
stdout.write(
" "
+ "\n ".join([f"{prefix}{shortname(t['id'])}" for t in tool.tool[f]])
+ "\n"
)
if "steps" in tool.tool:
_logger.info("%s steps targets:", prefix[:-1])
for t in tool.tool["steps"]:
stdout.write(f" {prefix}{shortname(t['id'])}\n")
run: Union[str, Process] = t["run"]
if isinstance(run, str):
process = make_tool(run, loading_context)
else:
process = run
print_targets(process, stdout, loading_context, shortname(t["id"]) + "/")


def main(
argsl: Optional[List[str]] = None,
args: Optional[argparse.Namespace] = None,
Expand Down Expand Up @@ -1084,14 +1134,7 @@ def main(
return 0

if args.print_targets:
for f in ("outputs", "steps", "inputs"):
if tool.tool[f]:
_logger.info("%s%s targets:", f[0].upper(), f[1:-1])
stdout.write(
" "
+ "\n ".join([shortname(t["id"]) for t in tool.tool[f]])
+ "\n"
)
print_targets(tool, stdout, loadingContext)
return 0

if args.target:
Expand Down
3 changes: 2 additions & 1 deletion cwltool/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
from typing_extensions import TYPE_CHECKING

from . import expression
from .builder import INPUT_OBJ_VOCAB, Builder, HasReqsHints
from .builder import INPUT_OBJ_VOCAB, Builder
from .context import LoadingContext, RuntimeContext, getdefault
from .errors import UnsupportedRequirement, WorkflowException
from .loghandler import _logger
Expand All @@ -62,6 +62,7 @@
CWLObjectType,
CWLOutputAtomType,
CWLOutputType,
HasReqsHints,
JobsGeneratorType,
OutputCallbackType,
adjustDirObjs,
Expand Down
8 changes: 5 additions & 3 deletions cwltool/software_requirements.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@

from typing_extensions import TYPE_CHECKING

from .utils import HasReqsHints

if TYPE_CHECKING:
from .builder import Builder, HasReqsHints
from .builder import Builder

try:
from galaxy.tool_util import deps
Expand Down Expand Up @@ -98,7 +100,7 @@ def build_job_script(self, builder: "Builder", command: List[str]) -> str:
return job_script


def get_dependencies(builder: "HasReqsHints") -> ToolRequirements:
def get_dependencies(builder: HasReqsHints) -> ToolRequirements:
(software_requirement, _) = builder.get_requirement("SoftwareRequirement")
dependencies = [] # type: List[ToolRequirement]
if software_requirement and software_requirement.get("packages"):
Expand Down Expand Up @@ -129,7 +131,7 @@ def get_dependencies(builder: "HasReqsHints") -> ToolRequirements:


def get_container_from_software_requirements(
use_biocontainers: bool, builder: "HasReqsHints"
use_biocontainers: bool, builder: HasReqsHints
) -> Optional[str]:
if use_biocontainers:
ensure_galaxy_lib_available()
Expand Down
Loading