Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 35 additions & 12 deletions torchx/cli/cmd_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from pyre_extensions import none_throws
from torchx.cli.cmd_base import SubCommand
from torchx.cli.cmd_log import get_logs
from torchx.runner import Runner, config, get_runner
from torchx.runner import Runner, config
from torchx.runner.workspaces import get_workspace_runner, WorkspaceRunner
from torchx.schedulers import get_default_scheduler_name, get_scheduler_factories
from torchx.specs import CfgVal
from torchx.specs.finder import (
Expand Down Expand Up @@ -138,10 +139,14 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
" (e.g. `local_cwd`)"
)

run_opts = get_runner().run_opts()
run_opts = runner.run_opts()
scheduler_opts = run_opts[args.scheduler]
cfg = _parse_run_config(args.scheduler_args, scheduler_opts)
config.apply(scheduler=args.scheduler, cfg=cfg)
config_files = config.find_configs()
workspace = (
"file://" + os.path.dirname(config_files[0]) if config_files else None
)

if len(args.conf_args) < 1:
none_throws(self._subparser).error(
Expand All @@ -154,22 +159,40 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:
conf_file, conf_args = args.conf_args[0], args.conf_args[1:]
try:
if args.dryrun:
dryrun_info = runner.dryrun_component(
conf_file, conf_args, args.scheduler, cfg
)
if isinstance(runner, WorkspaceRunner):
dryrun_info = runner.dryrun_component(
conf_file,
conf_args,
args.scheduler,
workspace=workspace,
cfg=cfg,
)
else:
dryrun_info = runner.dryrun_component(
conf_file, conf_args, args.scheduler, cfg=cfg
)
logger.info(
"\n=== APPLICATION ===\n"
f"{pformat(asdict(dryrun_info._app), indent=2, width=80)}"
)

logger.info("\n=== SCHEDULER REQUEST ===\n" f"{dryrun_info}")
else:
app_handle = runner.run_component(
conf_file,
conf_args,
args.scheduler,
cfg,
)
if isinstance(runner, WorkspaceRunner):
app_handle = runner.run_component(
conf_file,
conf_args,
args.scheduler,
workspace=workspace,
cfg=cfg,
)
else:
app_handle = runner.run_component(
conf_file,
conf_args,
args.scheduler,
cfg=cfg,
)
# DO NOT delete this line. It is used by slurm tests to retrieve the app id
print(app_handle)

Expand Down Expand Up @@ -200,7 +223,7 @@ def _run(self, runner: Runner, args: argparse.Namespace) -> None:

def run(self, args: argparse.Namespace) -> None:
os.environ["TORCHX_CONTEXT_NAME"] = os.getenv("TORCHX_CONTEXT_NAME", "cli_run")
with get_runner() as runner:
with get_workspace_runner() as runner:
self._run(runner, args)

def _wait_and_exit(self, runner: Runner, app_handle: str, log: bool) -> None:
Expand Down
24 changes: 19 additions & 5 deletions torchx/runner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ def my_component(a: int) -> specs.AppDef:
import configparser as configparser
import logging
from pathlib import Path
from typing import Dict, List, Optional, TextIO
from typing import Dict, List, Optional, TextIO, Iterable

from torchx.schedulers import Scheduler, get_schedulers
from torchx.specs import CfgVal, get_type_name
from torchx.specs.api import runopt

CONFIG_FILE = ".torchxconfig"

_NONE = "None"

Expand Down Expand Up @@ -235,15 +236,28 @@ def apply(
Then after the method call, ``cfg={"foo":"bar","hello":"world"}``.
"""

for configfile in find_configs(dirs):
with open(configfile, "r") as f:
load(scheduler, f, cfg)
log.info(f"loaded configs from {configfile}")


def find_configs(dirs: Optional[Iterable[str]] = None) -> List[str]:
    """
    Returns the paths (as strings) of all ``.torchxconfig`` files found in the
    specified directories, in the order the directories are given.

    Args:
        dirs: directories to search. If ``None`` or empty, only the current
            working directory is checked.

    Returns:
        List of existing config file paths (may be empty).
    """
    if not dirs:
        # default to the caller's current working directory
        dirs = [str(Path.cwd())]

    config_files = []

    for d in dirs:
        configfile = Path(d) / CONFIG_FILE
        if configfile.exists():
            config_files.append(str(configfile))

    return config_files


def load(scheduler: str, f: TextIO, cfg: Dict[str, CfgVal]) -> None:
Expand Down
52 changes: 52 additions & 0 deletions torchx/runner/test/workspaces_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import unittest
from typing import Mapping
from unittest.mock import MagicMock, call

from torchx.runner.workspaces import (
get_workspace_runner,
WorkspaceRunner,
)
from torchx.schedulers.api import (
WorkspaceScheduler,
)
from torchx.specs.api import AppDryRunInfo, AppDef, CfgVal


class WorkspaceRunnerTest(unittest.TestCase):
    """Tests for WorkspaceRunner and its factory function."""

    def test_get_workspace_runner(self) -> None:
        # smoke test: the factory must produce a WorkspaceRunner
        self.assertIsInstance(get_workspace_runner(), WorkspaceRunner)

    def test_workspace_runner(self) -> None:
        scheduler = MagicMock(spec=WorkspaceScheduler)

        def submit_dryrun(app: AppDef, cfg: Mapping[str, CfgVal]) -> AppDryRunInfo[str]:
            # by the time the scheduler sees the app, the role image must
            # have been swapped for the patched workspace image
            self.assertEqual(app.roles[0].image, "$img")

            dryrun_info: AppDryRunInfo[str] = AppDryRunInfo("req", str)
            dryrun_info._app = app
            return dryrun_info

        scheduler.submit_dryrun = submit_dryrun
        scheduler.build_workspace_image.return_value = "$img"
        runner = WorkspaceRunner(
            "workspaces_test",
            schedulers={
                "mock": scheduler,
            },
        )
        app_args = ["--image", "dummy_image", "--script", "test.py"]
        workspace = "memory:///foo"
        runner.run_component("dist.ddp", app_args, "mock", workspace)

        # the workspace image is built exactly once for the role's base image
        self.assertEqual(
            scheduler.build_workspace_image.mock_calls,
            [
                call("dummy_image", workspace),
            ],
        )
122 changes: 122 additions & 0 deletions torchx/runner/workspaces.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

"""
This contains an experimental patching runner that can overlay a workspace on
top of the provided image. This allows for fast iterations without having to
rebuild a new image with your application code.

The workspace is a fsspec filesystem that gets walked and overlaid on the image.
This allows having multiple different interfaces such as from Jupyter Notebooks
as well as local file systems.
"""

import logging
import warnings
from typing import List, Mapping, Optional

from torchx.runner.api import Runner
from torchx.schedulers import get_schedulers
from torchx.schedulers.api import WorkspaceScheduler
from torchx.specs import (
from_function,
AppDef,
SchedulerBackend,
AppHandle,
CfgVal,
AppDryRunInfo,
)
from torchx.specs.finder import get_component

log: logging.Logger = logging.getLogger(__name__)


class WorkspaceRunner(Runner):
    """
    WorkspaceRunner is a special runner that takes an optional ``workspace``
    argument for the ``run_component`` and ``dryrun_component`` methods. If a
    workspace is specified, a new image is built with the workspace overlaid
    on top of each role's base image.

    WARNING: This is in prototype stage and may have backwards incompatible
    changes made without notice.
    """

    def run_component(
        self,
        component: str,
        component_args: List[str],
        scheduler: SchedulerBackend,
        # default of None keeps this usable as a drop-in Runner replacement
        # when called with keyword args and no workspace
        workspace: Optional[str] = None,
        cfg: Optional[Mapping[str, CfgVal]] = None,
    ) -> AppHandle:
        """
        Resolves ``component``, applies ``workspace`` patching (if any) and
        schedules it on ``scheduler``.

        Returns:
            The handle of the scheduled app.
        """
        dryrun_info = self.dryrun_component(
            component,
            component_args,
            scheduler,
            workspace,
            cfg,
        )
        return self.schedule(dryrun_info)

    def dryrun_component(
        self,
        component: str,
        component_args: List[str],
        scheduler: SchedulerBackend,
        workspace: Optional[str] = None,
        cfg: Optional[Mapping[str, CfgVal]] = None,
    ) -> AppDryRunInfo:
        """
        Same as ``run_component`` but stops short of scheduling and returns
        the dryrun info instead.
        """
        component_def = get_component(component)
        app = from_function(component_def.fn, component_args)
        return self.dryrun(app, scheduler, workspace, cfg)

    def dryrun(
        self,
        app: AppDef,
        scheduler: SchedulerBackend,
        workspace: Optional[str] = None,
        cfg: Optional[Mapping[str, CfgVal]] = None,
    ) -> AppDryRunInfo:
        """
        Same as ``Runner.dryrun`` but first patches the app's role images
        with ``workspace`` when one is provided.
        """
        if workspace:
            self._patch_app(app, scheduler, workspace)

        return super().dryrun(app, scheduler, cfg)

    def _patch_app(
        self, app: AppDef, scheduler: SchedulerBackend, workspace: str
    ) -> None:
        """
        Overlays ``workspace`` onto each role's image in-place. Warns and
        does nothing if the scheduler has no workspace support.
        """
        sched = self._scheduler(scheduler)
        if not isinstance(sched, WorkspaceScheduler):
            warnings.warn(
                f"can't apply workspace to image since {sched} is not a "
                "WorkspaceScheduler"
            )
            return

        log.info(f"building patch images for workspace: {workspace}...")

        # build each distinct base image at most once and reuse the result
        images = {}
        for role in app.roles:
            img = images.get(role.image)
            if not img:
                img = sched.build_workspace_image(role.image, workspace)
                images[role.image] = img
            role.image = img

    def __enter__(self) -> "WorkspaceRunner":
        return self


def get_workspace_runner(
    name: Optional[str] = None, **scheduler_params: object
) -> WorkspaceRunner:
    """
    Returns a WorkspaceRunner. See torchx.runner.get_runner for more info.
    """
    # fall back to the default session name when none is given
    session_name = name or "torchx"
    return WorkspaceRunner(
        session_name,
        get_schedulers(session_name=session_name, **scheduler_params),
    )
16 changes: 16 additions & 0 deletions torchx/schedulers/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,22 @@ def _validate(self, app: AppDef, scheduler: SchedulerBackend) -> None:
)


class WorkspaceScheduler(Scheduler):
    """
    A :py:class:`Scheduler` that additionally knows how to build a patched
    image from a workspace.

    Experimental: this interface may change without notice.
    """

    @abc.abstractmethod
    def build_workspace_image(self, img: str, workspace: str) -> str:
        """
        Builds a new image by overlaying the ``workspace`` filesystem on top
        of ``img``.

        Returns:
            The name of the newly built image.
        """
        ...


def filter_regex(regex: str, data: Iterable[str]) -> Iterable[str]:
"""
filter_regex takes a string iterator and returns an iterator that only has
Expand Down
8 changes: 3 additions & 5 deletions torchx/schedulers/docker_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
from torchx.schedulers.api import (
AppDryRunInfo,
DescribeAppResponse,
Scheduler,
WorkspaceScheduler,
filter_regex,
Stream,
)
Expand Down Expand Up @@ -102,7 +102,7 @@ def has_docker() -> bool:
return False


class DockerScheduler(Scheduler):
class DockerScheduler(WorkspaceScheduler):
"""
DockerScheduler is a TorchX scheduling interface to Docker.

Expand Down Expand Up @@ -415,7 +415,6 @@ def build_workspace_image(self, img: str, workspace: str) -> str:

Returns:
The new Docker image ID.

"""
return _build_container_from_workspace(self._client(), img, workspace)

Expand Down Expand Up @@ -444,7 +443,6 @@ def _copy_to_tarfile(workspace: str, tf: tarfile.TarFile) -> None:
assert isinstance(dir, str), "path must be str"
relpath = posixpath.relpath(dir, path)
for file, info in files.items():
print(relpath, dir, file, info)
with fs.open(info["name"], "rb") as f:
tinfo = tarfile.TarInfo(posixpath.join(relpath, file))
tinfo.size = info["size"]
Expand Down Expand Up @@ -486,5 +484,5 @@ def _build_container_from_workspace(
)
finally:
context.close()
print(image)

return image.id