Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions machine/jobs/async_scheduler.py

This file was deleted.

76 changes: 39 additions & 37 deletions machine/jobs/build_clearml_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@
from datetime import datetime
from typing import Callable, Optional, Union, cast

import aiohttp
from clearml import Task
from dynaconf.base import Settings

from ..utils.canceled_error import CanceledError
from ..utils.phased_progress_reporter import PhaseProgressStatus
from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler


class ProgressInfo:
Expand All @@ -37,7 +35,7 @@ def clearml_check_canceled() -> None:


def get_clearml_progress_caller(
progress_info: ProgressInfo, task: Task, scheduler: AsyncScheduler, logger: logging.Logger
progress_info: ProgressInfo, task: Task, logger: logging.Logger
) -> Callable[[ProgressStatus], None]:
def clearml_progress(progress_status: ProgressStatus) -> None:
percent_completed: Optional[int] = None
Expand All @@ -51,11 +49,8 @@ def clearml_progress(progress_status: ProgressStatus) -> None:
progress_info.last_progress_time is None
or (current_time - progress_info.last_progress_time).seconds > 1
):
scheduler.schedule(
update_runtime_properties(
task,
create_runtime_properties(task, percent_completed, message, progress_status),
)
report_clearml_progress(
task=task, percent_completed=percent_completed, message=message, progress_status=progress_status
)
progress_info.last_progress_time = current_time
progress_info.last_percent_completed = percent_completed
Expand All @@ -64,6 +59,42 @@ def clearml_progress(progress_status: ProgressStatus) -> None:
return clearml_progress


def report_clearml_progress(
task: Task,
percent_completed: Optional[int] = None,
message: Optional[str] = None,
progress_status: Optional[ProgressStatus] = None,
) -> None:
if percent_completed is not None:
task.set_progress(percent_completed)
props = []
if message is not None:
props.append({"type": str, "name": "message", "description": "Build Message", "value": message})
# Report the step within the phase
if progress_status is not None and isinstance(progress_status, PhaseProgressStatus):
if progress_status.phase_stage is not None:
if progress_status.phase_step is not None:
props.append(
{
"type": int,
"name": f"{progress_status.phase_stage}_step",
"description": "Phase Step",
"value": progress_status.phase_step,
}
)
if progress_status.step_count is not None:
props.append(
{
"type": int,
"name": f"{progress_status.phase_stage}_step_count",
"description": "Maximum Phase Step",
"value": progress_status.step_count,
}
)
if len(props) > 0:
task.set_user_properties(*props)


def get_local_progress_caller(progress_info: ProgressInfo, logger: logging.Logger) -> Callable[[ProgressStatus], None]:

def local_progress(progress_status: ProgressStatus) -> None:
Expand Down Expand Up @@ -91,32 +122,3 @@ def update_settings(settings: Settings, args: dict):
raise TypeError(f"Build options could not be parsed: {e}") from e
settings.update({settings.model_type: build_options})
settings.data_dir = os.path.expanduser(cast(str, settings.data_dir))


async def update_runtime_properties(task, runtime_props: dict) -> None:
current_runtime_properties = task.data.runtime or {}
current_runtime_properties.update(runtime_props)
async with aiohttp.ClientSession(
base_url=task.session.host, headers={"Authorization": f"Bearer {task.session.token}"}
) as session:
json = {"task": task.id, "runtime": runtime_props, "force": True}
async with session.post("/tasks.edit", json=json) as response:
response.raise_for_status()


def create_runtime_properties(
task, percent_completed: Optional[int], message: Optional[str], status: Optional[ProgressStatus]
) -> dict:
runtime_props = task.data.runtime.copy() or {}
if percent_completed is not None:
runtime_props["progress"] = str(percent_completed)
if message is not None:
runtime_props["message"] = message
# Report the step within the phase
if status is not None and isinstance(status, PhaseProgressStatus):
if status.phase_stage is not None:
if status.phase_step is not None:
runtime_props[f"{status.phase_stage}_step"] = str(status.phase_step)
if status.step_count is not None:
runtime_props[f"{status.phase_stage}_step_count"] = str(status.step_count)
return runtime_props
11 changes: 3 additions & 8 deletions machine/jobs/build_nmt_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@

from ..utils.canceled_error import CanceledError
from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler
from .build_clearml_helper import create_runtime_properties, update_runtime_properties
from .build_clearml_helper import report_clearml_progress
from .config import SETTINGS
from .nmt_engine_build_job import NmtEngineBuildJob
from .nmt_model_factory import NmtModelFactory
Expand All @@ -31,7 +30,6 @@ def run(args: dict) -> None:
task = None
if args["clearml"]:
task = Task.init()
scheduler = AsyncScheduler()

def clearml_check_canceled() -> None:
if task.get_status() == "stopped":
Expand All @@ -41,11 +39,8 @@ def clearml_check_canceled() -> None:

def clearml_progress(status: ProgressStatus) -> None:
if status.percent_completed is not None:
scheduler.schedule(
update_runtime_properties(
task,
create_runtime_properties(task, round(status.percent_completed * 100), None, status),
)
report_clearml_progress(
task=task, percent_completed=round(status.percent_completed * 100), progress_status=status
)

progress = clearml_progress
Expand Down
20 changes: 5 additions & 15 deletions machine/jobs/build_smt_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
from clearml import Task

from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler
from .build_clearml_helper import (
ProgressInfo,
create_runtime_properties,
get_clearml_check_canceled,
get_clearml_progress_caller,
get_local_progress_caller,
update_runtime_properties,
update_settings,
)
from .config import SETTINGS
Expand All @@ -34,17 +31,15 @@ def run(args: dict) -> None:
progress: Callable[[ProgressStatus], None]
check_canceled: Optional[Callable[[], None]] = None
task = None
scheduler: Optional[AsyncScheduler] = None
progress_info = ProgressInfo()
if args["clearml"]:
task = Task.init()
scheduler = AsyncScheduler()

check_canceled = get_clearml_check_canceled(progress_info, task)

task.reload()

progress = get_clearml_progress_caller(progress_info, task, scheduler, logger)
progress = get_clearml_progress_caller(progress_info, task, logger)

else:
progress = get_local_progress_caller(ProgressInfo(), logger)
Expand All @@ -66,12 +61,10 @@ def run(args: dict) -> None:

smt_engine_build_job = SmtEngineBuildJob(SETTINGS, smt_model_factory, shared_file_service)
train_corpus_size, confidence = smt_engine_build_job.run(progress, check_canceled)
if scheduler is not None and task is not None:
scheduler.schedule(
update_runtime_properties(
task,
create_runtime_properties(task, 100, "Completed", None),
)
if task is not None:
task.set_progress(100)
task.set_user_properties(
{"type": str, "name": "message", "description": "Build Message", "value": "Completed"}
)
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
task.get_logger().report_single_value(name="confidence", value=round(confidence, 4))
Expand All @@ -83,9 +76,6 @@ def run(args: dict) -> None:
else:
task.mark_failed(status_reason=type(e).__name__, status_message=str(e))
raise e
finally:
if scheduler is not None:
scheduler.stop()


def main() -> None:
Expand Down
20 changes: 5 additions & 15 deletions machine/jobs/build_word_alignment_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,11 @@
from clearml import Task

from ..utils.progress_status import ProgressStatus
from .async_scheduler import AsyncScheduler
from .build_clearml_helper import (
ProgressInfo,
create_runtime_properties,
get_clearml_check_canceled,
get_clearml_progress_caller,
get_local_progress_caller,
update_runtime_properties,
update_settings,
)
from .config import SETTINGS
Expand All @@ -35,17 +32,15 @@ def run(args: dict):
progress: Callable[[ProgressStatus], None]
check_canceled: Optional[Callable[[], None]] = None
task = None
scheduler: Optional[AsyncScheduler] = None
progress_info = ProgressInfo()
if args["clearml"]:
task = Task.init()
scheduler = AsyncScheduler()

check_canceled = get_clearml_check_canceled(progress_info, task)

task.reload()

progress = get_clearml_progress_caller(progress_info, task, scheduler, logger)
progress = get_clearml_progress_caller(progress_info, task, logger)

else:
progress = get_local_progress_caller(ProgressInfo(), logger)
Expand All @@ -67,12 +62,10 @@ def run(args: dict):
SETTINGS, word_alignment_model_factory, word_alignment_file_service
)
train_corpus_size = word_alignment_build_job.run(progress, check_canceled)
if scheduler is not None and task is not None:
scheduler.schedule(
update_runtime_properties(
task,
create_runtime_properties(task, 100, "Completed", None),
)
if task is not None:
task.set_progress(100)
task.set_user_properties(
{"type": str, "name": "message", "description": "Build Message", "value": "Completed"}
)
task.get_logger().report_single_value(name="train_corpus_size", value=train_corpus_size)
logger.info("Finished")
Expand All @@ -83,9 +76,6 @@ def run(args: dict):
else:
task.mark_failed(status_reason=type(e).__name__, status_message=str(e))
raise e
finally:
if scheduler is not None:
scheduler.stop()
return


Expand Down