From fe34bba1f765e51da2eb7279e08ca74c5e493c5d Mon Sep 17 00:00:00 2001
From: Tristan Rice
Date: Tue, 4 Jan 2022 13:17:33 -0800
Subject: [PATCH] torchx/runner,cli: add patching support via WorkspaceScheduler (#360)

Summary:
This adds patching support to the cli and runner.

To keep this somewhat isolated from the main APIs, there are now a couple of new concepts:

* WorkspaceScheduler extends Scheduler and adds a build_workspace_image method
* WorkspaceRunner extends Runner and adds support for building a workspace image as part of dryrun

This makes the cli run command use the WorkspaceRunner instead of the normal Runner for experimentation purposes. Patching is enabled if the cwd contains a .torchxconfig file.

Pull Request resolved: https://github.com/pytorch/torchx/pull/360

Test Plan:
$ touch .torchxconfig
$ echo "echo foo" > foo.sh
$ torchx run --scheduler local_docker utils.sh sh foo.sh
torchx 2021-12-10 16:24:19 INFO loaded configs from /home/tristanr/Developer/torchx-proj/.torchxconfig
torchx 2021-12-10 16:24:19 INFO building patch images for workspace: file:///home/tristanr/Developer/torchx-proj...
torchx 2021-12-10 16:24:21 INFO Pulling container image: sha256:0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be (this may take a while)
torchx 2021-12-10 16:24:23 WARNING failed to pull image sha256:0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be, falling back to local: 404 Client Error for http+docker://localhost/v1.41/images/create?tag=0da419b5accdba18412014ffeafcf782acd53d6522a198ee7cbfb48556f355be&fromImage=sha256: Not Found ("pull access denied for sha256, repository does not exist or may require 'docker login': denied: requested access to the resource is denied")
local_docker://torchx/sh-g7frzl4q92g2bd
torchx 2021-12-10 16:24:24 INFO Waiting for the app to finish...
torchx 2021-12-10 16:24:24 INFO Job finished: SUCCEEDED
sh/0 foo

Reviewed By: kiukchung

Differential Revision: D33036468

Pulled By: d4l3k

fbshipit-source-id: 430f1955ce77ff3a3b1d9b1a5ba5deaebf4f25c2
---
 torchx/cli/cmd_run.py                 |  47 +++++++---
 torchx/runner/config.py               |  24 +++--
 torchx/runner/test/workspaces_test.py |  52 +++++++++++
 torchx/runner/workspaces.py           | 122 ++++++++++++++++++++++++++
 torchx/schedulers/api.py              |  16 ++++
 torchx/schedulers/docker_scheduler.py |   8 +-
 6 files changed, 247 insertions(+), 22 deletions(-)
 create mode 100644 torchx/runner/test/workspaces_test.py
 create mode 100644 torchx/runner/workspaces.py

diff --git a/torchx/cli/cmd_run.py b/torchx/cli/cmd_run.py
index cbaeaca47..a6dc49831 100644
--- a/torchx/cli/cmd_run.py
+++ b/torchx/cli/cmd_run.py
@@ -17,7 +17,8 @@
 from pyre_extensions import none_throws
 from torchx.cli.cmd_base import SubCommand
 from torchx.cli.cmd_log import get_logs
-from torchx.runner import Runner, config, get_runner
+from torchx.runner import Runner, config
+from torchx.runner.workspaces import get_workspace_runner, WorkspaceRunner
 from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories
 from torchx.specs import CfgVal
 from torchx.specs.finder import (
@@ -138,10 +139,14 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
                 " (e.g. `local_cwd`)"
             )
 
-        run_opts = get_runner().run_opts()
+        run_opts = runner.run_opts()
         scheduler_opts = run_opts[args.scheduler]
         cfg = _parse_run_config(args.scheduler_args, scheduler_opts)
         config.apply(scheduler=args.scheduler, cfg=cfg)
+        config_files = config.find_configs()
+        workspace = (
+            "file://" + os.path.dirname(config_files[0]) if config_files else None
+        )
 
         if len(args.conf_args) < 1:
             none_throws(self._subparser).error(
@@ -154,9 +159,18 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
         conf_file, conf_args = args.conf_args[0], args.conf_args[1:]
         try:
             if args.dryrun:
-                dryrun_info = runner.dryrun_component(
-                    conf_file, conf_args, args.scheduler, cfg
-                )
+                if isinstance(runner, WorkspaceRunner):
+                    dryrun_info = runner.dryrun_component(
+                        conf_file,
+                        conf_args,
+                        args.scheduler,
+                        workspace=workspace,
+                        cfg=cfg,
+                    )
+                else:
+                    dryrun_info = runner.dryrun_component(
+                        conf_file, conf_args, args.scheduler, cfg=cfg
+                    )
                 logger.info(
                     "\n=== APPLICATION ===\n"
                     f"{pformat(asdict(dryrun_info._app), indent=2, width=80)}"
@@ -164,12 +178,21 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
 
                 logger.info("\n=== SCHEDULER REQUEST ===\n" f"{dryrun_info}")
             else:
-                app_handle = runner.run_component(
-                    conf_file,
-                    conf_args,
-                    args.scheduler,
-                    cfg,
-                )
+                if isinstance(runner, WorkspaceRunner):
+                    app_handle = runner.run_component(
+                        conf_file,
+                        conf_args,
+                        args.scheduler,
+                        workspace=workspace,
+                        cfg=cfg,
+                    )
+                else:
+                    app_handle = runner.run_component(
+                        conf_file,
+                        conf_args,
+                        args.scheduler,
+                        cfg=cfg,
+                    )
 
                 # DO NOT delete this line. It is used by slurm tests to retrieve the app id
                 print(app_handle)
@@ -200,7 +223,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
 
     def run(self, args: argparse.Namespace) -> None:
         os.environ["TORCHX_CONTEXT_NAME"] = os.getenv("TORCHX_CONTEXT_NAME", "cli_run")
-        with get_runner() as runner:
+        with get_workspace_runner() as runner:
             self._run(runner, args)
 
     def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:
diff --git a/torchx/runner/config.py b/torchx/runner/config.py
index a0ac1a8d9..c2b749313 100644
--- a/torchx/runner/config.py
+++ b/torchx/runner/config.py
@@ -96,12 +96,13 @@ def my_component(a: int) -> specs.AppDef:
 import configparser as configparser
 import logging
 from pathlib import Path
-from typing import Dict, List, Optional, TextIO
+from typing import Dict, List, Optional, TextIO, Iterable
 
 from torchx.schedulers import Scheduler, get_schedulers
 from torchx.specs import CfgVal, get_type_name
 from torchx.specs.api import runopt
 
+CONFIG_FILE = ".torchxconfig"
 _NONE = "None"
@@ -235,15 +236,28 @@ def apply(
     Then after the method call, ``cfg={"foo":"bar","hello":"world"}``.
     """
 
+    for configfile in find_configs(dirs):
+        with open(configfile, "r") as f:
+            load(scheduler, f, cfg)
+        log.info(f"loaded configs from {configfile}")
+
+
+def find_configs(dirs: Optional[Iterable[str]] = None) -> List[str]:
+    """
+    find_configs returns all the .torchxconfig files it finds in the specified
+    directories. If directories is empty it checks the local directory.
+    """
     if not dirs:
         dirs = [str(Path.cwd())]
 
+    config_files = []
+
     for d in dirs:
-        configfile = Path(d) / ".torchxconfig"
+        configfile = Path(d) / CONFIG_FILE
         if configfile.exists():
-            with open(str(configfile), "r") as f:
-                load(scheduler, f, cfg)
-                log.info(f"loaded configs from {configfile}")
+            config_files.append(str(configfile))
+
+    return config_files
 
 
 def load(scheduler: str, f: TextIO, cfg: Dict[str, CfgVal]) -> None:
diff --git a/torchx/runner/test/workspaces_test.py b/torchx/runner/test/workspaces_test.py
new file mode 100644
index 000000000..b9bd04521
--- /dev/null
+++ b/torchx/runner/test/workspaces_test.py
@@ -0,0 +1,52 @@
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+import unittest
+from typing import Mapping
+from unittest.mock import MagicMock, call
+
+from torchx.runner.workspaces import (
+    get_workspace_runner,
+    WorkspaceRunner,
+)
+from torchx.schedulers.api import (
+    WorkspaceScheduler,
+)
+from torchx.specs.api import AppDryRunInfo, AppDef, CfgVal
+
+
+class WorkspaceRunnerTest(unittest.TestCase):
+    def test_get_workspace_runner(self) -> None:
+        self.assertIsInstance(get_workspace_runner(), WorkspaceRunner)
+
+    def test_workspace_runner(self) -> None:
+        scheduler = MagicMock(spec=WorkspaceScheduler)
+
+        def submit_dryrun(app: AppDef, cfg: Mapping[str, CfgVal]) -> AppDryRunInfo[str]:
+            self.assertEqual(app.roles[0].image, "$img")
+
+            dryrun_info: AppDryRunInfo[str] = AppDryRunInfo(str("req"), str)
+            dryrun_info._app = app
+            return dryrun_info
+
+        scheduler.submit_dryrun = submit_dryrun
+        scheduler.build_workspace_image.return_value = "$img"
+        runner = WorkspaceRunner(
+            "workspaces_test",
+            schedulers={
+                "mock": scheduler,
+            },
+        )
+        app_args = ["--image", "dummy_image", "--script", "test.py"]
+        workspace = "memory:///foo"
+        ret = runner.run_component("dist.ddp", app_args, "mock", workspace)
+
+        self.assertEqual(
+            scheduler.build_workspace_image.mock_calls,
+            [
+                call("dummy_image", workspace),
+            ],
+        )
diff --git a/torchx/runner/workspaces.py b/torchx/runner/workspaces.py
new file mode 100644
index 000000000..85760062b
--- /dev/null
+++ b/torchx/runner/workspaces.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+# Copyright (c) Meta Platforms, Inc. and affiliates.
+# All rights reserved.
+#
+# This source code is licensed under the BSD-style license found in the
+# LICENSE file in the root directory of this source tree.
+
+"""
+This contains an experimental patching runner that can overlay a workspace on
+top of the provided image. This allows for fast iterations without having to
+rebuild a new image with your application code.
+
+The workspace is a fsspec filesystem that gets walked and overlaid on the image.
+This allows having multiple different interfaces such as from Jupyter Notebooks
+as well as local file systems.
+"""
+
+import logging
+import warnings
+from typing import List, Mapping, Optional
+
+from torchx.runner.api import Runner
+from torchx.schedulers import get_schedulers
+from torchx.schedulers.api import WorkspaceScheduler
+from torchx.specs import (
+    from_function,
+    AppDef,
+    SchedulerBackend,
+    AppHandle,
+    CfgVal,
+    AppDryRunInfo,
+)
+from torchx.specs.finder import get_component
+
+log: logging.Logger = logging.getLogger(__name__)
+
+
+class WorkspaceRunner(Runner):
+    """
+    WorkspaceRunner is a special runner that takes an optional workspace
+    argument for the run and dryrun_component methods. If a workspace is
+    specified a new image will be built with the workspace overlaid on top.
+
+    WARNING: This is in prototype stage and may have backwards incompatible
+    changes made without notice.
+    """
+
+    def run_component(
+        self,
+        component: str,
+        component_args: List[str],
+        scheduler: SchedulerBackend,
+        workspace: Optional[str],
+        cfg: Optional[Mapping[str, CfgVal]] = None,
+    ) -> AppHandle:
+        dryrun_info = self.dryrun_component(
+            component,
+            component_args,
+            scheduler,
+            workspace,
+            cfg,
+        )
+        return self.schedule(dryrun_info)
+
+    def dryrun_component(
+        self,
+        component: str,
+        component_args: List[str],
+        scheduler: SchedulerBackend,
+        workspace: Optional[str],
+        cfg: Optional[Mapping[str, CfgVal]] = None,
+    ) -> AppDryRunInfo:
+        component_def = get_component(component)
+        app = from_function(component_def.fn, component_args)
+        return self.dryrun(app, scheduler, workspace, cfg)
+
+    def dryrun(
+        self,
+        app: AppDef,
+        scheduler: SchedulerBackend,
+        workspace: Optional[str],
+        cfg: Optional[Mapping[str, CfgVal]] = None,
+    ) -> AppDryRunInfo:
+        if workspace:
+            self._patch_app(app, scheduler, workspace)
+
+        return super().dryrun(app, scheduler, cfg)
+
+    def _patch_app(self, app: AppDef, scheduler: str, workspace: str) -> None:
+        sched = self._scheduler(scheduler)
+        if not isinstance(sched, WorkspaceScheduler):
+            warnings.warn(
+                f"can't apply workspace to image since {sched} is not a "
+                "WorkspaceScheduler"
+            )
+            return
+
+        log.info(f"building patch images for workspace: {workspace}...")
+
+        images = {}
+        for role in app.roles:
+            img = images.get(role.image)
+            if not img:
+                img = sched.build_workspace_image(role.image, workspace)
+                images[role.image] = img
+            role.image = img
+
+    def __enter__(self) -> "WorkspaceRunner":
+        return self
+
+
+def get_workspace_runner(
+    name: Optional[str] = None, **scheduler_params: object
+) -> WorkspaceRunner:
+    """
+    Returns a WorkspaceRunner. See torchx.runner.get_runner for more info.
+    """
+    if not name:
+        name = "torchx"
+
+    schedulers = get_schedulers(session_name=name, **scheduler_params)
+    return WorkspaceRunner(name, schedulers)
diff --git a/torchx/schedulers/api.py b/torchx/schedulers/api.py
index 176a4fc24..332171302 100644
--- a/torchx/schedulers/api.py
+++ b/torchx/schedulers/api.py
@@ -280,6 +280,22 @@ def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
             )
 
 
+class WorkspaceScheduler(Scheduler):
+    """
+    WorkspaceScheduler is a Scheduler that has workspace support.
+
+    Experimental: this interface may change without notice.
+    """
+
+    @abc.abstractmethod
+    def build_workspace_image(self, img: str, workspace: str) -> str:
+        """
+        build_workspace_image builds a new image with the workspace filesystem
+        overlaid on it and returns the new image name.
+        """
+        ...
+
+
 def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
     """
     filter_regex takes a string iterator and returns an iterator that only has
diff --git a/torchx/schedulers/docker_scheduler.py b/torchx/schedulers/docker_scheduler.py
index 78e78c43b..d303b0a6d 100644
--- a/torchx/schedulers/docker_scheduler.py
+++ b/torchx/schedulers/docker_scheduler.py
@@ -31,7 +31,7 @@
 from torchx.schedulers.api import (
     AppDryRunInfo,
     DescribeAppResponse,
-    Scheduler,
+    WorkspaceScheduler,
     filter_regex,
     Stream,
 )
@@ -102,7 +102,7 @@ def has_docker() -> bool:
         return False
 
 
-class DockerScheduler(Scheduler):
+class DockerScheduler(WorkspaceScheduler):
     """
     DockerScheduler is a TorchX scheduling interface to Docker.
 
@@ -415,7 +415,6 @@ def build_workspace_image(self, img: str, workspace: str) -> str:
 
         Returns:
             The new Docker image ID.
-
         """
         return _build_container_from_workspace(self._client(), img, workspace)
 
@@ -444,7 +443,6 @@ def _copy_to_tarfile(workspace: str, tf: tarfile.TarFile) -> None:
             assert isinstance(dir, str), "path must be str"
             relpath = posixpath.relpath(dir, path)
             for file, info in files.items():
-                print(relpath, dir, file, info)
                 with fs.open(info["name"], "rb") as f:
                     tinfo = tarfile.TarInfo(posixpath.join(relpath, file))
                     tinfo.size = info["size"]
@@ -486,5 +484,5 @@ def _build_container_from_workspace(
         )
     finally:
         context.close()
-    print(image)
+
     return image.id
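
For reference, a minimal sketch of driving the same workspace-aware flow from Python instead of the CLI. It mirrors what cmd_run.py now does: locate a .torchxconfig via config.find_configs(), turn its directory into a file:// workspace, and pass that workspace to WorkspaceRunner.run_component() so the scheduler's build_workspace_image() overlays the local project onto each role's image. The component name and arguments simply mirror the Test Plan above; treat this as an illustrative sketch, not part of the patch.

import os

from torchx.runner import config
from torchx.runner.workspaces import get_workspace_runner

# Same heuristic as cmd_run.py: patching is only enabled when a .torchxconfig
# is found, and its directory becomes the workspace to overlay.
config_files = config.find_configs()
workspace = (
    "file://" + os.path.dirname(config_files[0]) if config_files else None
)

with get_workspace_runner() as runner:
    # Equivalent of: torchx run --scheduler local_docker utils.sh sh foo.sh
    app_handle = runner.run_component(
        "utils.sh",           # component
        ["sh", "foo.sh"],     # component args
        "local_docker",       # scheduler
        workspace=workspace,  # when set, roles run a freshly built patch image
    )
    print(app_handle)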
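
On the scheduler side, DockerScheduler is the only in-tree implementer of the new interface in this patch. The hypothetical subclass below shows the single method that WorkspaceScheduler adds; the class name and the _overlay_workspace_on_image helper are made up for illustration, and a real implementation would still have to provide the rest of the Scheduler abstract methods, which are omitted here.

from torchx.schedulers.api import WorkspaceScheduler


class MyPatchingScheduler(WorkspaceScheduler):
    """Hypothetical scheduler sketch; only the workspace hook is shown."""

    def build_workspace_image(self, img: str, workspace: str) -> str:
        # `workspace` is an fsspec URL such as "file:///home/me/project" or
        # "memory:///foo". Walk it, overlay its files on top of `img`, and
        # return the new image identifier; WorkspaceRunner._patch_app then
        # rewrites each role.image to the returned value.
        new_img = self._overlay_workspace_on_image(img, workspace)  # hypothetical helper
        return new_img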