21 changes: 15 additions & 6 deletions src/taskgraph/actions/util.py
@@ -19,6 +19,7 @@
from taskgraph.util.taskcluster import (
CONCURRENCY,
get_artifact,
get_artifact_with_prefix,
get_session,
list_tasks,
parse_time,
@@ -28,8 +29,8 @@
logger = logging.getLogger(__name__)


def get_parameters(decision_task_id):
return get_artifact(decision_task_id, "public/parameters.yml")
def get_parameters(decision_task_id, artifact_prefix="public"):
return get_artifact(decision_task_id, f"{artifact_prefix}/parameters.yml")


def fetch_graph_and_labels(parameters, graph_config, task_group_id=None):
@@ -43,9 +44,13 @@ def fetch_graph_and_labels(parameters, graph_config, task_group_id=None):
decision_task_id = task_group_id

# First grab the graph and labels generated during the initial decision task
full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
full_task_graph = get_artifact_with_prefix(
decision_task_id, "full-task-graph.json", parameters
)
_, full_task_graph = TaskGraph.from_json(full_task_graph)
label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
label_to_taskid = get_artifact_with_prefix(
decision_task_id, "label-to-taskid.json", parameters
)

# fetch everything in parallel; this avoids serializing any delay in downloading
# each artifact (such as waiting for the artifact to be mirrored locally)
@@ -56,7 +61,9 @@
# for old ones
def fetch_action(task_id):
logger.info(f"fetching label-to-taskid.json for action task {task_id}")
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
run_label_to_id = get_artifact_with_prefix(
task_id, "label-to-taskid.json", parameters
)
if label_to_taskid and run_label_to_id:
label_to_taskid.update(run_label_to_id)
else:
@@ -81,7 +88,9 @@ def fetch_action(task_id):
def fetch_cron(task_id):
logger.info(f"fetching label-to-taskid.json for cron task {task_id}")
try:
run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json")
run_label_to_id = get_artifact_with_prefix(
task_id, "label-to-taskid.json", parameters
)
label_to_taskid.update(run_label_to_id) # type: ignore
except HTTPError as e:
if e.response.status_code != 404:
46 changes: 30 additions & 16 deletions src/taskgraph/decision.py
@@ -113,34 +113,42 @@ def taskgraph_decision(options, parameters=None):
enable_verifications=options.get("verify", True),
)

# Get the artifact prefix from parameters
artifact_prefix = tgg.parameters.get("artifact_prefix", "public")

# write out the parameters used to generate this graph
write_artifact("parameters.yml", dict(**tgg.parameters))
write_artifact("parameters.yml", dict(**tgg.parameters), artifact_prefix)

# write out the public/actions.json file
# write out the actions.json file
write_artifact(
"actions.json",
render_actions_json(tgg.parameters, tgg.graph_config, decision_task_id),
artifact_prefix,
)

# write out the full graph for reference
full_task_json = tgg.full_task_graph.to_json()
write_artifact("full-task-graph.json", full_task_json)
write_artifact("full-task-graph.json", full_task_json, artifact_prefix)

# write out the public/runnable-jobs.json file
# write out the runnable-jobs.json file
write_artifact(
"runnable-jobs.json", full_task_graph_to_runnable_tasks(full_task_json)
"runnable-jobs.json",
full_task_graph_to_runnable_tasks(full_task_json),
artifact_prefix,
)

# this is just a test to check whether the from_json() function is working
_, _ = TaskGraph.from_json(full_task_json)

# write out the target task set to allow reproducing this as input
write_artifact("target-tasks.json", list(tgg.target_task_set.tasks.keys()))
write_artifact(
"target-tasks.json", list(tgg.target_task_set.tasks.keys()), artifact_prefix
)

# write out the optimized task graph to describe what will actually happen,
# and the map of labels to taskids
write_artifact("task-graph.json", tgg.morphed_task_graph.to_json())
write_artifact("label-to-taskid.json", tgg.label_to_taskid)
write_artifact("task-graph.json", tgg.morphed_task_graph.to_json(), artifact_prefix)
write_artifact("label-to-taskid.json", tgg.label_to_taskid, artifact_prefix)

# write out current run-task and fetch-content scripts
RUN_TASK_DIR = pathlib.Path(__file__).parent / "run-task"
@@ -181,6 +189,7 @@ def get_decision_parameters(graph_config, options):
"level",
"target_tasks_method",
"tasks_for",
"artifact_prefix",
]
if n in options
}
@@ -366,11 +375,14 @@ def set_try_config(parameters, task_config_file):
)


def write_artifact(filename, data):
logger.info(f"writing artifact file `{filename}`")
def write_artifact(filename, data, artifact_prefix="public"):
prefixed_filename = f"{artifact_prefix}/{filename}"
logger.info(f"writing artifact file `{prefixed_filename}`")
if not os.path.isdir(ARTIFACTS_DIR):
os.mkdir(ARTIFACTS_DIR)
path = ARTIFACTS_DIR / filename
prefix_dir = ARTIFACTS_DIR / artifact_prefix
os.makedirs(prefix_dir, exist_ok=True)
path = prefix_dir / filename
if filename.endswith(".yml"):
with open(path, "w") as f:
yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False)
@@ -386,10 +398,11 @@ def write_artifact(filename, data):
raise TypeError(f"Don't know how to write to {filename}")


def read_artifact(filename):
path = ARTIFACTS_DIR / filename
def read_artifact(filename, artifact_prefix="public"):
prefix_dir = ARTIFACTS_DIR / artifact_prefix
path = prefix_dir / filename
if filename.endswith(".yml"):
return load_yaml(path, filename)
return load_yaml(prefix_dir, filename)
elif filename.endswith(".json"):
with open(path) as f:
return json.load(f)
@@ -402,5 +415,6 @@ def read_artifact(filename):
raise TypeError(f"Don't know how to read {filename}")


def rename_artifact(src, dest):
os.rename(ARTIFACTS_DIR / src, ARTIFACTS_DIR / dest)
def rename_artifact(src, dest, artifact_prefix="public"):
prefix_dir = ARTIFACTS_DIR / artifact_prefix
os.rename(prefix_dir / src, prefix_dir / dest)
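
For reference, a minimal sketch of how the prefix-aware artifact helpers above are expected to be used. The parameters dict and prefix value are made up for illustration; only write_artifact and read_artifact come from this patch:

    from taskgraph.decision import read_artifact, write_artifact

    # Hypothetical parameters carrying a non-default prefix.
    parameters = {"artifact_prefix": "releng/internal"}
    prefix = parameters.get("artifact_prefix", "public")

    # Writes to <ARTIFACTS_DIR>/releng/internal/label-to-taskid.json
    write_artifact("label-to-taskid.json", {"build-linux64": "abc123"}, prefix)

    # Reads back from the same prefixed location.
    assert read_artifact("label-to-taskid.json", prefix) == {"build-linux64": "abc123"}
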
5 changes: 5 additions & 0 deletions src/taskgraph/main.py
@@ -790,6 +790,11 @@ def load_task(args):
@argument(
"--verbose", "-v", action="store_true", help="include debug-level logging output"
)
@argument(
"--artifact-prefix",
default="public",
help="Prefix for artifact paths (default: public)",
)
def decision(options):
from taskgraph.decision import taskgraph_decision # noqa: PLC0415

1 change: 1 addition & 0 deletions src/taskgraph/parameters.py
@@ -61,6 +61,7 @@ class ParameterMismatch(Exception):
Required("target_tasks_method"): str,
Required("tasks_for"): str,
Required("version"): Any(str, None),
Optional("artifact_prefix"): str,
Optional("code-review"): {
Required("phabricator-build-target"): str,
},
17 changes: 14 additions & 3 deletions src/taskgraph/transforms/docker_image.py
@@ -14,6 +14,10 @@
from taskgraph.util import json
from taskgraph.util.docker import create_context_tar, generate_context_hash
from taskgraph.util.schema import Schema
from taskgraph.util.taskcluster import (
get_artifact_prefix_from_parameters,
is_public_artifact_prefix,
)

from .task import task_description_schema

@@ -143,6 +147,8 @@ def fill_template(config, tasks):
f"Missing package job for {config.kind}-{image_name}: {p}"
)

artifact_prefix = get_artifact_prefix_from_parameters(config.params)

if not taskgraph.fast:
context_path = os.path.join("taskcluster", "docker", definition)
topsrcdir = os.path.dirname(config.graph_config.taskcluster_yml)
@@ -189,7 +195,9 @@ def fill_template(config, tasks):
},
"always-target": True,
"expires-after": expires if config.params.is_try() else "1 year",
"scopes": [],
"scopes": []
if is_public_artifact_prefix(config.params)
else [f"queue:get-artifact:{artifact_prefix}/docker-contexts/*"],
"run-on-projects": [],
"worker-type": "images",
"worker": {
@@ -199,12 +207,12 @@ def fill_template(config, tasks):
{
"type": "file",
"path": "/workspace/out/image.tar.zst",
"name": "public/image.tar.zst",
"name": f"{artifact_prefix}/image.tar.zst",
}
],
"env": {
"CONTEXT_TASK_ID": {"task-reference": "<decision>"},
"CONTEXT_PATH": f"public/docker-contexts/{image_name}.tar.gz",
"CONTEXT_PATH": f"{artifact_prefix}/docker-contexts/{image_name}.tar.gz",
"HASH": context_hash,
"PROJECT": config.params["project"],
"IMAGE_NAME": image_name,
@@ -221,6 +229,9 @@
"max-run-time": 7200,
},
}

if not is_public_artifact_prefix(config.params):
taskdesc["worker"]["taskcluster-proxy"] = True
if "index" in task:
taskdesc["index"] = task["index"]
if job_symbol:
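
A small sketch of the prefix handling above, mirroring the branch added in this patch. The prefix value and image name are placeholders; the helpers are the ones imported from taskgraph.util.taskcluster:

    from taskgraph.util.taskcluster import (
        get_artifact_prefix_from_parameters,
        is_public_artifact_prefix,
    )

    # Hypothetical parameters for a deployment whose artifacts are not public.
    params = {"artifact_prefix": "releng/internal"}
    prefix = get_artifact_prefix_from_parameters(params)

    # The image task uploads its docker context to a prefixed path ...
    context_path = f"{prefix}/docker-contexts/example-image.tar.gz"

    # ... and requests a matching get-artifact scope when the prefix is private.
    scopes = (
        []
        if is_public_artifact_prefix(params)
        else [f"queue:get-artifact:{prefix}/docker-contexts/*"]
    )
    print(context_path, scopes)
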
9 changes: 9 additions & 0 deletions src/taskgraph/transforms/task.py
@@ -489,6 +489,8 @@ def verify_index(config, index):
{"in-tree": str},
# an indexed docker image
{"indexed": str},
# an external task image
{"path": str, "type": str, "task-id": str},
),
# worker features that should be enabled
Required("relengapi-proxy"): bool,
@@ -593,6 +595,13 @@ def build_docker_worker_payload(config, task, task_def):
"namespace": image["indexed"],
"type": "indexed-image",
}
elif "task-id" in image:
# External task image - convert to taskId format for Taskcluster
image = {
"path": image["path"],
"taskId": image["task-id"],
"type": image["type"],
}
else:
raise Exception("unknown docker image type")

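
For reference, a rough sketch of the new docker-image form handled above. The task id, path, and type values are placeholders; the conversion mirrors the elif branch added in this patch:

    # What a task may now specify under worker.docker-image:
    image = {
        "type": "task-image",           # placeholder type string
        "path": "public/image.tar.zst",
        "task-id": "abc123DEFghi",      # placeholder task id
    }

    # What build_docker_worker_payload() emits into the Taskcluster payload:
    payload_image = {
        "path": image["path"],
        "taskId": image["task-id"],
        "type": image["type"],
    }
    print(payload_image)
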
28 changes: 26 additions & 2 deletions src/taskgraph/util/taskcluster.py
@@ -161,8 +161,32 @@ def get_artifact_path(task, path):
return f"{get_artifact_prefix(task)}/{path}"


def get_index_url(index_path, multiple=False):
index_tmpl = liburls.api(get_root_url(), "index", "v1", "task{}/{}")
def get_artifact_prefix_from_parameters(parameters):
return parameters.get("artifact_prefix", "public")


def is_public_artifact_prefix(parameters):
return get_artifact_prefix_from_parameters(parameters) == "public"


def get_artifact_with_prefix(task_id, path, parameters, use_proxy=False):
prefix = get_artifact_prefix_from_parameters(parameters)
prefixed_path = f"{prefix}/{path}"
return get_artifact(task_id, prefixed_path, use_proxy)


def get_index_url(index_path, use_proxy=False, multiple=False):
index_tmpl = liburls.api(get_root_url(use_proxy), "index", "v1", "task{}/{}")
return index_tmpl.format("s" if multiple else "", index_path)


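
A minimal usage sketch for the new helpers. The prefix and task id are placeholders; get_artifact_with_prefix simply prepends the configured prefix before delegating to get_artifact:

    from taskgraph.util.taskcluster import (
        get_artifact_prefix_from_parameters,
        get_artifact_with_prefix,
        is_public_artifact_prefix,
    )

    # The prefix defaults to "public" when the parameter is absent.
    assert get_artifact_prefix_from_parameters({}) == "public"
    assert is_public_artifact_prefix({})

    # Hypothetical non-default prefix.
    parameters = {"artifact_prefix": "releng/internal"}
    assert not is_public_artifact_prefix(parameters)

    # Would fetch "releng/internal/label-to-taskid.json" from the given task
    # (a network call against the configured Taskcluster deployment, so it is
    # left commented out with a placeholder task id):
    # get_artifact_with_prefix("abc123DEFghi", "label-to-taskid.json", parameters)
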
8 changes: 5 additions & 3 deletions src/taskgraph/util/taskgraph.py
@@ -6,7 +6,7 @@
Tools for interacting with existing taskgraphs.
"""

from taskgraph.util.taskcluster import find_task_id, get_artifact
from taskgraph.util.taskcluster import find_task_id, get_artifact_with_prefix


def find_decision_task(parameters, graph_config):
@@ -35,14 +35,16 @@ def find_decision_task(parameters, graph_config):


def find_existing_tasks_from_previous_kinds(
full_task_graph, previous_graph_ids, rebuild_kinds
full_task_graph, previous_graph_ids, rebuild_kinds, parameters=None
):
"""Given a list of previous decision/action taskIds and kinds to ignore
from the previous graphs, return a dictionary of labels-to-taskids to use
as ``existing_tasks`` in the optimization step."""
existing_tasks = {}
for previous_graph_id in previous_graph_ids:
label_to_taskid = get_artifact(previous_graph_id, "public/label-to-taskid.json")
label_to_taskid = get_artifact_with_prefix(
previous_graph_id, "label-to-taskid.json", parameters
)
if label_to_taskid:
kind_labels = {
t.label
28 changes: 28 additions & 0 deletions test/test_actions_util.py
@@ -0,0 +1,28 @@
import unittest
from unittest import mock

from taskgraph.actions.util import get_parameters


class TestActionsUtil(unittest.TestCase):
@mock.patch("taskgraph.actions.util.get_artifact")
def test_get_parameters_default_prefix(self, mock_get_artifact):
mock_get_artifact.return_value = {"some": "parameters"}

result = get_parameters("task-id-123")

mock_get_artifact.assert_called_once_with(
"task-id-123", "public/parameters.yml"
)
self.assertEqual(result, {"some": "parameters"})

@mock.patch("taskgraph.actions.util.get_artifact")
def test_get_parameters_custom_prefix(self, mock_get_artifact):
mock_get_artifact.return_value = {"some": "parameters"}

result = get_parameters("task-id-123", "custom-prefix")

mock_get_artifact.assert_called_once_with(
"task-id-123", "custom-prefix/parameters.yml"
)
self.assertEqual(result, {"some": "parameters"})