diff --git a/src/taskgraph/actions/util.py b/src/taskgraph/actions/util.py index 9d4979129..5646a0c1d 100644 --- a/src/taskgraph/actions/util.py +++ b/src/taskgraph/actions/util.py @@ -19,6 +19,7 @@ from taskgraph.util.taskcluster import ( CONCURRENCY, get_artifact, + get_artifact_with_prefix, get_session, list_tasks, parse_time, @@ -28,8 +29,8 @@ logger = logging.getLogger(__name__) -def get_parameters(decision_task_id): - return get_artifact(decision_task_id, "public/parameters.yml") +def get_parameters(decision_task_id, artifact_prefix="public"): + return get_artifact(decision_task_id, f"{artifact_prefix}/parameters.yml") def fetch_graph_and_labels(parameters, graph_config, task_group_id=None): @@ -43,9 +44,13 @@ def fetch_graph_and_labels(parameters, graph_config, task_group_id=None): decision_task_id = task_group_id # First grab the graph and labels generated during the initial decision task - full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json") + full_task_graph = get_artifact_with_prefix( + decision_task_id, "full-task-graph.json", parameters + ) _, full_task_graph = TaskGraph.from_json(full_task_graph) - label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json") + label_to_taskid = get_artifact_with_prefix( + decision_task_id, "label-to-taskid.json", parameters + ) # fetch everything in parallel; this avoids serializing any delay in downloading # each artifact (such as waiting for the artifact to be mirrored locally) @@ -56,7 +61,9 @@ def fetch_graph_and_labels(parameters, graph_config, task_group_id=None): # for old ones def fetch_action(task_id): logger.info(f"fetching label-to-taskid.json for action task {task_id}") - run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + run_label_to_id = get_artifact_with_prefix( + task_id, "label-to-taskid.json", parameters + ) if label_to_taskid and run_label_to_id: label_to_taskid.update(run_label_to_id) else: @@ -81,7 +88,9 @@ def 
fetch_action(task_id): def fetch_cron(task_id): logger.info(f"fetching label-to-taskid.json for cron task {task_id}") try: - run_label_to_id = get_artifact(task_id, "public/label-to-taskid.json") + run_label_to_id = get_artifact_with_prefix( + task_id, "label-to-taskid.json", parameters + ) label_to_taskid.update(run_label_to_id) # type: ignore except HTTPError as e: if e.response.status_code != 404: diff --git a/src/taskgraph/decision.py b/src/taskgraph/decision.py index e75bc3014..b0deeeac7 100644 --- a/src/taskgraph/decision.py +++ b/src/taskgraph/decision.py @@ -113,34 +113,42 @@ def taskgraph_decision(options, parameters=None): enable_verifications=options.get("verify", True), ) + # Get the artifact prefix from parameters + artifact_prefix = tgg.parameters.get("artifact_prefix", "public") + # write out the parameters used to generate this graph - write_artifact("parameters.yml", dict(**tgg.parameters)) + write_artifact("parameters.yml", dict(**tgg.parameters), artifact_prefix) - # write out the public/actions.json file + # write out the actions.json file write_artifact( "actions.json", render_actions_json(tgg.parameters, tgg.graph_config, decision_task_id), + artifact_prefix, ) # write out the full graph for reference full_task_json = tgg.full_task_graph.to_json() - write_artifact("full-task-graph.json", full_task_json) + write_artifact("full-task-graph.json", full_task_json, artifact_prefix) - # write out the public/runnable-jobs.json file + # write out the runnable-jobs.json file write_artifact( - "runnable-jobs.json", full_task_graph_to_runnable_tasks(full_task_json) + "runnable-jobs.json", + full_task_graph_to_runnable_tasks(full_task_json), + artifact_prefix, ) # this is just a test to check whether the from_json() function is working _, _ = TaskGraph.from_json(full_task_json) # write out the target task set to allow reproducing this as input - write_artifact("target-tasks.json", list(tgg.target_task_set.tasks.keys())) + write_artifact( + 
"target-tasks.json", list(tgg.target_task_set.tasks.keys()), artifact_prefix + ) # write out the optimized task graph to describe what will actually happen, # and the map of labels to taskids - write_artifact("task-graph.json", tgg.morphed_task_graph.to_json()) - write_artifact("label-to-taskid.json", tgg.label_to_taskid) + write_artifact("task-graph.json", tgg.morphed_task_graph.to_json(), artifact_prefix) + write_artifact("label-to-taskid.json", tgg.label_to_taskid, artifact_prefix) # write out current run-task and fetch-content scripts RUN_TASK_DIR = pathlib.Path(__file__).parent / "run-task" @@ -181,6 +189,7 @@ "level", "target_tasks_method", "tasks_for", + "artifact_prefix", ] if n in options } @@ -366,11 +375,14 @@ ) -def write_artifact(filename, data): - logger.info(f"writing artifact file `{filename}`") +def write_artifact(filename, data, artifact_prefix="public"): + prefixed_filename = f"{artifact_prefix}/{filename}" + logger.info(f"writing artifact file `{prefixed_filename}`") if not os.path.isdir(ARTIFACTS_DIR): os.mkdir(ARTIFACTS_DIR) - path = ARTIFACTS_DIR / filename + prefix_dir = ARTIFACTS_DIR / artifact_prefix + os.makedirs(prefix_dir, exist_ok=True) + path = prefix_dir / filename if filename.endswith(".yml"): with open(path, "w") as f: yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False) @@ -386,10 +398,11 @@ raise TypeError(f"Don't know how to write to {filename}") -def read_artifact(filename): - path = ARTIFACTS_DIR / filename +def read_artifact(filename, artifact_prefix="public"): + prefix_dir = ARTIFACTS_DIR / artifact_prefix + path = prefix_dir / filename if filename.endswith(".yml"): - return load_yaml(path, filename) + return load_yaml(prefix_dir, filename) elif filename.endswith(".json"): with open(path) as f: return json.load(f) @@ -402,5 +415,6 @@ raise
TypeError(f"Don't know how to read {filename}") -def rename_artifact(src, dest): - os.rename(ARTIFACTS_DIR / src, ARTIFACTS_DIR / dest) +def rename_artifact(src, dest, artifact_prefix="public"): + prefix_dir = ARTIFACTS_DIR / artifact_prefix + os.rename(prefix_dir / src, prefix_dir / dest) diff --git a/src/taskgraph/main.py b/src/taskgraph/main.py index 6113ac2f5..b9ad40c8c 100644 --- a/src/taskgraph/main.py +++ b/src/taskgraph/main.py @@ -790,6 +790,11 @@ def load_task(args): @argument( "--verbose", "-v", action="store_true", help="include debug-level logging output" ) +@argument( + "--artifact-prefix", + default="public", + help="Prefix for artifact paths (default: public)", +) def decision(options): from taskgraph.decision import taskgraph_decision # noqa: PLC0415 diff --git a/src/taskgraph/parameters.py b/src/taskgraph/parameters.py index d88a155b7..8606fb49c 100644 --- a/src/taskgraph/parameters.py +++ b/src/taskgraph/parameters.py @@ -61,6 +61,7 @@ class ParameterMismatch(Exception): Required("target_tasks_method"): str, Required("tasks_for"): str, Required("version"): Any(str, None), + Optional("artifact_prefix"): str, Optional("code-review"): { Required("phabricator-build-target"): str, }, diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index 95e80d09c..13c105140 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -14,6 +14,10 @@ from taskgraph.util import json from taskgraph.util.docker import create_context_tar, generate_context_hash from taskgraph.util.schema import Schema +from taskgraph.util.taskcluster import ( + get_artifact_prefix_from_parameters, + is_public_artifact_prefix, +) from .task import task_description_schema @@ -143,6 +147,8 @@ def fill_template(config, tasks): f"Missing package job for {config.kind}-{image_name}: {p}" ) + artifact_prefix = get_artifact_prefix_from_parameters(config.params) + if not taskgraph.fast: context_path =
os.path.join("taskcluster", "docker", definition) topsrcdir = os.path.dirname(config.graph_config.taskcluster_yml) @@ -189,7 +195,9 @@ def fill_template(config, tasks): }, "always-target": True, "expires-after": expires if config.params.is_try() else "1 year", - "scopes": [], + "scopes": [] + if is_public_artifact_prefix(config.params) + else [f"queue:get-artifact:{artifact_prefix}/docker-contexts/*"], "run-on-projects": [], "worker-type": "images", "worker": { @@ -199,12 +207,12 @@ def fill_template(config, tasks): { "type": "file", "path": "/workspace/out/image.tar.zst", - "name": "public/image.tar.zst", + "name": f"{artifact_prefix}/image.tar.zst", } ], "env": { "CONTEXT_TASK_ID": {"task-reference": ""}, - "CONTEXT_PATH": f"public/docker-contexts/{image_name}.tar.gz", + "CONTEXT_PATH": f"{artifact_prefix}/docker-contexts/{image_name}.tar.gz", "HASH": context_hash, "PROJECT": config.params["project"], "IMAGE_NAME": image_name, @@ -221,6 +229,9 @@ def fill_template(config, tasks): "max-run-time": 7200, }, } + + if not is_public_artifact_prefix(config.params): + taskdesc["worker"]["taskcluster-proxy"] = True if "index" in task: taskdesc["index"] = task["index"] if job_symbol: diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index bf14f2ab2..a41bef681 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -489,6 +489,8 @@ def verify_index(config, index): {"in-tree": str}, # an indexed docker image {"indexed": str}, + # an external task image + {"path": str, "type": str, "task-id": str}, ), # worker features that should be enabled Required("relengapi-proxy"): bool, @@ -593,6 +595,13 @@ def build_docker_worker_payload(config, task, task_def): "namespace": image["indexed"], "type": "indexed-image", } + elif "task-id" in image: + # External task image - convert to taskId format for Taskcluster + image = { + "path": image["path"], + "taskId": image["task-id"], + "type": image["type"], + } else: raise 
Exception("unknown docker image type") diff --git a/src/taskgraph/util/taskcluster.py b/src/taskgraph/util/taskcluster.py index c0a20d833..6c2273c5f 100644 --- a/src/taskgraph/util/taskcluster.py +++ b/src/taskgraph/util/taskcluster.py @@ -161,8 +161,22 @@ def get_artifact_path(task, path): return f"{get_artifact_prefix(task)}/{path}" -def get_index_url(index_path, multiple=False): - index_tmpl = liburls.api(get_root_url(), "index", "v1", "task{}/{}") +def get_artifact_prefix_from_parameters(parameters): + return (parameters or {}).get("artifact_prefix", "public") + + +def is_public_artifact_prefix(parameters): + return get_artifact_prefix_from_parameters(parameters) == "public" + + +def get_artifact_with_prefix(task_id, path, parameters, use_proxy=False): + prefix = get_artifact_prefix_from_parameters(parameters) + prefixed_path = f"{prefix}/{path}" + return get_artifact(task_id, prefixed_path, use_proxy) + + +def get_index_url(index_path, use_proxy=False, multiple=False): + index_tmpl = liburls.api(get_root_url(use_proxy), "index", "v1", "task{}/{}") return index_tmpl.format("s" if multiple else "", index_path) diff --git a/src/taskgraph/util/taskgraph.py b/src/taskgraph/util/taskgraph.py index a368f4e11..4e15efa25 100644 --- a/src/taskgraph/util/taskgraph.py +++ b/src/taskgraph/util/taskgraph.py @@ -6,7 +6,7 @@ Tools for interacting with existing taskgraphs.
""" -from taskgraph.util.taskcluster import find_task_id, get_artifact +from taskgraph.util.taskcluster import find_task_id, get_artifact_with_prefix def find_decision_task(parameters, graph_config): @@ -35,14 +35,16 @@ def find_decision_task(parameters, graph_config): def find_existing_tasks_from_previous_kinds( - full_task_graph, previous_graph_ids, rebuild_kinds + full_task_graph, previous_graph_ids, rebuild_kinds, parameters=None ): """Given a list of previous decision/action taskIds and kinds to ignore from the previous graphs, return a dictionary of labels-to-taskids to use as ``existing_tasks`` in the optimization step.""" existing_tasks = {} for previous_graph_id in previous_graph_ids: - label_to_taskid = get_artifact(previous_graph_id, "public/label-to-taskid.json") + label_to_taskid = get_artifact_with_prefix( + previous_graph_id, "label-to-taskid.json", parameters + ) if label_to_taskid: kind_labels = { t.label diff --git a/test/test_actions_util.py b/test/test_actions_util.py new file mode 100644 index 000000000..3b4f3f154 --- /dev/null +++ b/test/test_actions_util.py @@ -0,0 +1,28 @@ +import unittest +from unittest import mock + +from taskgraph.actions.util import get_parameters + + +class TestActionsUtil(unittest.TestCase): + @mock.patch("taskgraph.actions.util.get_artifact") + def test_get_parameters_default_prefix(self, mock_get_artifact): + mock_get_artifact.return_value = {"some": "parameters"} + + result = get_parameters("task-id-123") + + mock_get_artifact.assert_called_once_with( + "task-id-123", "public/parameters.yml" + ) + self.assertEqual(result, {"some": "parameters"}) + + @mock.patch("taskgraph.actions.util.get_artifact") + def test_get_parameters_custom_prefix(self, mock_get_artifact): + mock_get_artifact.return_value = {"some": "parameters"} + + result = get_parameters("task-id-123", "custom-prefix") + + mock_get_artifact.assert_called_once_with( + "task-id-123", "custom-prefix/parameters.yml" + ) + self.assertEqual(result, {"some": 
"parameters"}) diff --git a/test/test_decision.py b/test/test_decision.py index 2b2e4f34e..aa0fce351 100644 --- a/test/test_decision.py +++ b/test/test_decision.py @@ -26,7 +26,9 @@ def test_write_artifact_json(self): try: decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" decision.write_artifact("artifact.json", data) - with open(os.path.join(decision.ARTIFACTS_DIR, "artifact.json")) as f: + with open( + os.path.join(decision.ARTIFACTS_DIR, "public", "artifact.json") + ) as f: self.assertEqual(json.load(f), data) finally: if os.path.exists(tmpdir): @@ -39,7 +41,97 @@ def test_write_artifact_yml(self): try: decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" decision.write_artifact("artifact.yml", data) - self.assertEqual(load_yaml(decision.ARTIFACTS_DIR, "artifact.yml"), data) + self.assertEqual( + load_yaml(decision.ARTIFACTS_DIR / "public", "artifact.yml"), data + ) + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_write_artifact_custom_prefix(self): + data = [{"some": "data"}] + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" + decision.write_artifact("artifact.json", data, "custom-prefix") + with open( + os.path.join(decision.ARTIFACTS_DIR, "custom-prefix", "artifact.json") + ) as f: + self.assertEqual(json.load(f), data) + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_read_artifact_json(self): + data = {"test": "data"} + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" + decision.write_artifact("test.json", data) + result = decision.read_artifact("test.json") + self.assertEqual(result, data) + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_read_artifact_yml(self): + data = {"test": "data"} + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = 
Path(tmpdir) / "artifacts" + decision.write_artifact("test.yml", data) + result = decision.read_artifact("test.yml") + self.assertEqual(result, data) + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_read_artifact_custom_prefix(self): + data = {"test": "data"} + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" + decision.write_artifact("test.json", data, "custom") + result = decision.read_artifact("test.json", "custom") + self.assertEqual(result, data) + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_rename_artifact(self): + data = {"test": "data"} + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" + decision.write_artifact("original.json", data) + decision.rename_artifact("original.json", "renamed.json") + result = decision.read_artifact("renamed.json") + self.assertEqual(result, data) + # Verify original is gone + with self.assertRaises(FileNotFoundError): + decision.read_artifact("original.json") + finally: + if os.path.exists(tmpdir): + shutil.rmtree(tmpdir) + decision.ARTIFACTS_DIR = Path("artifacts") + + def test_rename_artifact_custom_prefix(self): + data = {"test": "data"} + tmpdir = tempfile.mkdtemp() + try: + decision.ARTIFACTS_DIR = Path(tmpdir) / "artifacts" + decision.write_artifact("original.json", data, "custom") + decision.rename_artifact("original.json", "renamed.json", "custom") + result = decision.read_artifact("renamed.json", "custom") + self.assertEqual(result, data) + # Verify original is gone + with self.assertRaises(FileNotFoundError): + decision.read_artifact("original.json", "custom") finally: if os.path.exists(tmpdir): shutil.rmtree(tmpdir)