Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ __pycache__/
*.py[cod]
*$py.class
*.metaflow
*.metaflow_spin
metaflow_card_cache/

build/
dist/
Expand Down
1 change: 1 addition & 0 deletions metaflow/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ class and related decorators.
metadata,
get_metadata,
default_metadata,
inspect_spin,
Metaflow,
Flow,
Run,
Expand Down
86 changes: 73 additions & 13 deletions metaflow/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import functools
import inspect
import os
Expand All @@ -7,7 +8,6 @@

import metaflow.tracing as tracing
from metaflow._vendor import click
from metaflow.system import _system_logger, _system_monitor

from . import decorators, lint, metaflow_version, parameters, plugins
from .cli_args import cli_args
Expand All @@ -27,6 +27,8 @@
DEFAULT_PACKAGE_SUFFIXES,
)
from .metaflow_current import current
from .metaflow_profile import from_start
from metaflow.system import _system_monitor, _system_logger
from .metaflow_environment import MetaflowEnvironment
from .packaging_sys import MetaflowCodeContent
from .plugins import (
Expand All @@ -38,9 +40,9 @@
)
from .pylint_wrapper import PyLint
from .R import metaflow_r_version, use_r
from .util import get_latest_run_id, resolve_identity, decompress_list
from .user_configs.config_options import LocalFileInput, config_options
from .user_configs.config_parameters import ConfigValue
from .util import get_latest_run_id, resolve_identity

ERASE_TO_EOL = "\033[K"
HIGHLIGHT = "red"
Expand Down Expand Up @@ -125,6 +127,8 @@ def logger(body="", system_msg=False, head="", bad=False, timestamp=True, nl=Tru
"step": "metaflow.cli_components.step_cmd.step",
"run": "metaflow.cli_components.run_cmds.run",
"resume": "metaflow.cli_components.run_cmds.resume",
"spin": "metaflow.cli_components.run_cmds.spin",
"spin-step": "metaflow.cli_components.step_cmd.spin_step",
},
)
def cli(ctx):
Expand Down Expand Up @@ -318,6 +322,13 @@ def version(obj):
hidden=True,
is_eager=True,
)
@click.option(
"--spin-mode",
is_flag=True,
default=False,
help="Enable spin mode for metaflow cli commands. Setting this flag will result "
"in using spin metadata and spin datastore for executions"
)
@click.pass_context
def start(
ctx,
Expand All @@ -335,6 +346,7 @@ def start(
local_config_file=None,
config=None,
config_value=None,
spin_mode=False,
**deco_options
):
if quiet:
Expand All @@ -347,6 +359,7 @@ def start(
if use_r():
version = metaflow_r_version()

from_start("MetaflowCLI: Starting")
echo("Metaflow %s" % version, fg="magenta", bold=True, nl=False)
echo(" executing *%s*" % ctx.obj.flow.name, fg="magenta", nl=False)
echo(" for *%s*" % resolve_identity(), fg="magenta")
Expand All @@ -366,6 +379,7 @@ def start(
ctx.obj.check = functools.partial(_check, echo)
ctx.obj.top_cli = cli
ctx.obj.package_suffixes = package_suffixes.split(",")
ctx.obj.spin_mode = spin_mode

ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == datastore][0]

Expand Down Expand Up @@ -472,19 +486,12 @@ def start(
# set force rebuild flag for environments that support it.
ctx.obj.environment._force_rebuild = force_rebuild_environments
ctx.obj.environment.validate_environment(ctx.obj.logger, datastore)

ctx.obj.event_logger = LOGGING_SIDECARS[event_logger](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor = MONITOR_SIDECARS[monitor](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == metadata][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
Expand All @@ -498,6 +505,52 @@ def start(
)

ctx.obj.config_options = config_options
ctx.obj.is_spin = False
ctx.obj.skip_decorators = False

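# Override values for spin steps (first saved arg contains "spin"), or if --spin-mode is set.
# NOTE(review): `and` binds tighter than `or`, so with --spin-mode this branch is taken even
# when ctx has no saved_args; the `ctx.saved_args` lookup further down is then unguarded.
# The `"spin" in ...` check is also a substring match, not an exact command match. Confirm both.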
if hasattr(ctx, "saved_args") and ctx.saved_args and "spin" in ctx.saved_args[0] or ctx.obj.spin_mode:
# To minimize side effects for spin, we will only use the following:
# - local metadata provider,
# - local datastore,
# - local environment,
# - null event logger,
# - null monitor
ctx.obj.is_spin = True
if "--skip-decorators" in ctx.saved_args:
ctx.obj.skip_decorators = True

ctx.obj.event_logger = LOGGING_SIDECARS["nullSidecarLogger"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
ctx.obj.monitor = MONITOR_SIDECARS["nullSidecarMonitor"](
flow=ctx.obj.flow, env=ctx.obj.environment
)
# Use spin metadata, spin datastore, and spin datastore root
ctx.obj.metadata = [m for m in METADATA_PROVIDERS if m.TYPE == "spin"][0](
ctx.obj.environment, ctx.obj.flow, ctx.obj.event_logger, ctx.obj.monitor
)
ctx.obj.datastore_impl = [d for d in DATASTORES if d.TYPE == "spin"][0]
datastore_root = ctx.obj.datastore_impl.get_datastore_root_from_config(
ctx.obj.echo, create_on_absent=True
)
ctx.obj.datastore_impl.datastore_root = datastore_root

ctx.obj.flow_datastore = FlowDataStore(
ctx.obj.flow.name,
ctx.obj.environment, # Same environment as run/resume
ctx.obj.metadata, # local metadata
ctx.obj.event_logger, # null event logger
ctx.obj.monitor, # null monitor
storage_impl=ctx.obj.datastore_impl,
)

# Start event logger and monitor
ctx.obj.event_logger.start()
_system_logger.init_system_logger(ctx.obj.flow.name, ctx.obj.event_logger)

ctx.obj.monitor.start()
_system_monitor.init_system_monitor(ctx.obj.flow.name, ctx.obj.monitor)

decorators._init(ctx.obj.flow)

Expand All @@ -512,9 +565,11 @@ def start(
ctx.obj.logger,
echo,
deco_options,
ctx.obj.is_spin,
ctx.obj.skip_decorators,
)

# In the case of run/resume, we will want to apply the TL decospecs
# In the case of run/resume/spin, we will want to apply the TL decospecs
# *after* the run decospecs so that they don't take precedence. In other
# words, for the same decorator, we want `myflow.py run --with foo` to
# take precedence over any other `foo` decospec
Expand Down Expand Up @@ -542,11 +597,10 @@ def start(
if (
hasattr(ctx, "saved_args")
and ctx.saved_args
and ctx.saved_args[0] not in ("run", "resume")
and ctx.saved_args[0] not in ("run", "resume", "spin")
):
# run/resume are special cases because they can add more decorators with --with,
# run/resume/spin are special cases because they can add more decorators with --with,
# so they have to take care of themselves.

all_decospecs = ctx.obj.tl_decospecs + list(
ctx.obj.environment.decospecs() or []
)
Expand All @@ -556,6 +610,9 @@ def start(
# or a scheduler setting them up in their own way.
if ctx.saved_args[0] not in ("step", "init"):
all_decospecs += DEFAULT_DECOSPECS.split()
elif ctx.saved_args[0] == "spin-step":
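# For spin-step, we do not attach any decorators.
# NOTE(review): "spin-step" already satisfies the `not in ("step", "init")` test above, so
# this branch looks unreachable as written -- confirm the intended ordering of the checks.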
all_decospecs = []
if all_decospecs:
decorators._attach_decorators(ctx.obj.flow, all_decospecs)
decorators._init(ctx.obj.flow)
Expand All @@ -569,6 +626,9 @@ def start(
ctx.obj.environment,
ctx.obj.flow_datastore,
ctx.obj.logger,
# The last two arguments are only used for spin steps
ctx.obj.is_spin,
ctx.obj.skip_decorators,
)

# Check the graph again (mutators may have changed it)
Expand Down
Loading
Loading