1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- Monitor now checks each run of a study on every loop
- Garbage collection functionality for the database. Called with: `merlin database gc`, `merlin database garbage-collect`, or `merlin database cleanup`
- Built-in database garbage collection to the `merlin monitor`
- Can be disabled with `--disable-gc`
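A minimal sketch of exercising the features these entries describe, assuming merlin is installed and on PATH. The command names come straight from the changelog entry; the spec filename and the exact `merlin monitor` invocation are assumptions, not verified against the CLI:

```python
# Hedged sketch: command names are from the changelog entry above; "study.yaml" is a hypothetical spec file.
import subprocess

# Garbage-collect stale entries from the Merlin database ("garbage-collect" and "cleanup" are aliases).
subprocess.run(["merlin", "database", "gc"], check=True)

# Run the monitor with its built-in garbage collection turned off (flag per the entry above).
subprocess.run(["merlin", "monitor", "study.yaml", "--disable-gc"], check=True)
```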
264 changes: 156 additions & 108 deletions merlin/monitor/monitor.py
@@ -47,17 +47,25 @@ class Monitor:
however long the monitor lives. The class interacts with the Merlin database to retrieve study
and run information and uses a task server monitor to help manage workflow health.

The monitor supports tracking multiple concurrent runs of the same study, making it suitable for
iterative workflows and scenarios where multiple runs share the same worker pool. It continuously
polls the database for active runs and performs health checks on each one during every monitoring
cycle.

Attributes:
spec (MerlinSpec): The Merlin specification that defines the workflow.
sleep (int): The interval (in seconds) between monitoring checks.
- no_restart (bool): If True, the monitor will not try to restart the workflow.
+ no_restart (bool): If True, the monitor will not try to restart workflows automatically.
task_server_monitor (TaskServerMonitor): A monitor for interacting with whichever task server
- that the user is utilizing.
+ that the user is utilizing (e.g., Celery).
merlin_db (MerlinDatabase): Interface for accessing and querying the Merlin database.

Methods:
monitor_all_runs: Monitors all runs of the current study until they are complete.
monitor_single_run: Monitors a single run of a study until it completes.
wait_for_workers: Wait for workers to start before proceeding with health checks.
check_task_activity: Determine if there is active task activity for a run.
check_run_health: Perform health checks and restart stalled workflows.
restart_workflow: Restart a run of a workflow.
"""

@@ -116,56 +124,31 @@ def _validate_run_workspace(self, run: RunEntity) -> bool:
workspace = run.get_workspace()
if not os.path.exists(workspace):
LOG.error(
f"Run {run.get_id()} has an invalid workspace '{workspace}'. "
f"Run {run.get_id()} has an invalid or inaccessible workspace '{workspace}'. "
"This run will be skipped. Consider running garbage collection "
"to clean up stale entries."
)
return False
return True

- def monitor_all_runs(self):
- """
- Monitors all runs of the current study until they are complete. For each run, it checks
- if the run is already complete. If not, it monitors the run until it finishes. This
- method ensures that all runs in the study are processed. This is necessary to be able to
- monitor iterative workflows.
-
- The method retrieves all runs from the database and iterates through them sequentially.
- If a run is incomplete, it calls [`monitor_single_run`][monitor.monitor.Monitor.monitor_single_run]
- to monitor it until completion.
- """
- study_entity = self.merlin_db.get("study", self.spec.name)
-
- index = 0
- while True:
- # Always refresh the list at the start of the loop; there could be new runs (think iterative studies)
- all_runs = [self.merlin_db.get("run", run_id) for run_id in study_entity.get_runs()]
- if index >= len(all_runs): # Break if there are no more runs to process
- break
-
- run = all_runs[index]
- run_workspace = run.get_workspace()
- LOG.info(f"Monitor: Checking if run with workspace '{run_workspace}' has completed...")
-
- if run.run_complete:
- LOG.info(
- f"Monitor: Determined that run with workspace '{run_workspace}' has already completed. "
- "Moving on to the next run."
- )
- index += 1
- continue
-
- LOG.info(f"Monitor: Run with workspace '{run_workspace}' has not yet completed.")
-
- try:
- # Monitor the run until it completes
- self.monitor_single_run(run)
- except RunNotFoundError as e:
- LOG.error(f"Run with workspace '{run_workspace}' no longer exists in database: {e}. " "Skipping to next run.")
-
- index += 1
+ def wait_for_workers(self, run: RunEntity):
+ """
+ Wait for all workers to be ready before proceeding with the health check of the run.
+
+ This method retrieves the logical worker names for the given run from the database and
+ delegates to the task server monitor to wait for them to become available. This ensures
+ that health checks and task activity monitoring don't begin until the workers are fully
+ initialized.
+
+ Args:
+ run: A RunEntity instance representing the run to monitor.
+ """
+ worker_names = [self.merlin_db.get("logical_worker", worker_id=wid).get_name() for wid in run.get_workers()]
+ LOG.info(f"Monitor: Waiting for the following workers to start: {worker_names}...")
+ self.task_server_monitor.wait_for_workers(worker_names, self.sleep)
+ LOG.info("Monitor: Workers have started.")
- def _check_task_activity(self, run: RunEntity) -> bool:
+ def check_task_activity(self, run: RunEntity) -> bool:
"""
Checks whether there is active task activity for the given run.

@@ -191,94 +174,159 @@ def _check_task_activity(self, run: RunEntity) -> bool:

return False

- def _handle_transient_exception(self, exc: Exception):
- """
- Handles transient exceptions that may occur during monitoring.
-
- This method logs the exception type, message, and full traceback, then
- sleeps for the configured interval before retrying. It is designed to
- gracefully handle recoverable errors such as Redis timeouts or broker issues.
-
- Args:
- exc (Exception): The exception instance that was caught during execution.
- """
- LOG.warning(f"{exc.__class__.__name__} occurred:\n{exc}")
- LOG.warning(f"Full traceback:\n{traceback.format_exc()}")
- time.sleep(self.sleep)
+ def restart_workflow(self, run: RunEntity):
+ """
+ Restart a run of a workflow.
+
+ Args:
+ run: A [`RunEntity`][db_scripts.entities.run_entity.RunEntity] instance representing
+ the run that's going to be restarted.
+
+ Raises:
+ RestartException: If the workflow restart process fails.
+ """
+ try:
+ run_workspace = verify_dirpath(run.get_workspace())
+ LOG.info(f"Monitor: Restarting workflow for run with workspace '{run_workspace}'...")
+ restart_proc = subprocess.run(f"merlin restart {run_workspace}", shell=True, capture_output=True, text=True)
+ if restart_proc.returncode != 0:
+ LOG.error(f"Monitor: Failed to restart workflow: {restart_proc.stderr}")
+ raise RestartException(f"Restart process failed with error: {restart_proc.stderr}")
+ LOG.info(f"Monitor: Workflow restarted successfully: {restart_proc.stdout}")
+ except ValueError:
+ LOG.warning(
+ f"Monitor: Run with workspace '{run.get_workspace()}' was not found. Ignoring the restart of this workspace."
+ )
- def monitor_single_run(self, run: RunEntity):
- """
- Monitors a single run of a study until it completes to ensure that the allocation stays alive
- and workflows are restarted if necessary.
-
- Args:
- run: A [`RunEntity`][db_scripts.entities.run_entity.RunEntity] instance representing
- the run that's going to be monitored.
- """
- run_workspace = run.get_workspace()
-
- # Validate workspace exists before monitoring
- if not self._validate_run_workspace(run):
- raise RunNotFoundError(f"Cannot monitor run with invalid workspace '{run_workspace}'")
-
- run_complete = run.run_complete # Saving this to a variable as it queries the db each time it's called
-
- LOG.info(f"Monitor: Monitoring run with workspace '{run_workspace}'...")
-
- # Wait for workers to spin up before checking on tasks
- worker_names = [self.merlin_db.get("logical_worker", worker_id=wid).get_name() for wid in run.get_workers()]
- LOG.info(f"Monitor: Waiting for the following workers to start: {worker_names}...")
- self.task_server_monitor.wait_for_workers(worker_names, self.sleep)
- LOG.info("Monitor: Workers have started.")
-
- while not run_complete:
- try:
- # Run worker health check (checks for dead workers and restarts them if necessary)
- self.task_server_monitor.run_worker_health_check(run.get_workers())
-
- # Check if any tasks are currently in the queues or if workers are processing tasks
- active_tasks = self._check_task_activity(run)
-
- run_complete = run.run_complete # Re-query db for this value
-
- # If no tasks are in the queues or being processed by workers and the run is not complete, we have a hanging
- # workflow so restart it
- if not active_tasks and not run_complete:
- if self.no_restart:
- LOG.warning(
- f"Monitor: Determined restart was required for '{run_workspace}' but auto-restart is disabled."
- )
- else:
- self.restart_workflow(run)
-
- if not run_complete:
- time.sleep(self.sleep)
- # The below exceptions do not modify the `run_complete` value so the loop should retry
- except (RedisTimeoutError, OperationalError, TimeoutError) as exc:
- self._handle_transient_exception(exc)
-
- LOG.info(f"Monitor: Run with workspace '{run_workspace}' has completed.")
+ def check_run_health(self, run: RunEntity):
+ """
+ Check the health of a single run and restart if necessary.
+
+ This method performs worker health checks to detect and restart dead workers,
+ monitors task activity to determine if the workflow is making progress, and
+ automatically restarts stalled workflows (unless auto-restart is disabled).
+
+ The health check considers a workflow stalled if there are no tasks in the queues,
+ no workers processing tasks, and the run is not marked as complete. This typically
+ indicates a workflow that has hung and needs to be restarted.
+
+ Transient exceptions such as Redis timeouts are caught and handled gracefully
+ to avoid terminating the monitoring process.
+
+ Args:
+ run: A RunEntity instance representing the run to monitor.
+ """
+ try:
+ # Run worker health check (checks for dead workers and restarts them if necessary)
+ self.task_server_monitor.run_worker_health_check(run.get_workers())
+
+ # Check if any tasks are currently in the queues or if workers are processing tasks
+ active_tasks = self.check_task_activity(run)
+
+ # If no tasks are in the queues or being processed by workers and the run is not complete, we have a hanging
+ # workflow so restart it
+ if not active_tasks and not run.run_complete:
+ if self.no_restart:
+ LOG.warning(
+ f"Monitor: Determined restart was required for '{run.get_workspace()}' but auto-restart is disabled."
+ )
+ else:
+ self.restart_workflow(run)
+ except (RedisTimeoutError, OperationalError, TimeoutError) as exc:
+ LOG.warning(f"{exc.__class__.__name__} occurred:\n{exc}")
+ LOG.warning(f"Full traceback:\n{traceback.format_exc()}")
+ time.sleep(self.sleep)
+ def monitor_all_runs(self):
+ """
+ Monitors all runs of the current study until they are complete.
+
+ This method continuously polls the database for all runs associated with the study,
+ filters out completed runs, and performs health checks on all active runs during each
+ monitoring cycle. This approach allows the monitor to track multiple concurrent runs
+ of the same study, which is essential for iterative workflows and scenarios where
+ multiple runs share a common worker pool.
+
+ The method will continue monitoring until all runs have completed. It logs the status
+ of completed and active runs during each cycle for visibility into the monitoring process.
+ """
+ study_entity = self.merlin_db.get("study", self.spec.name)
+
+ while True:
+ all_runs = []
+ for run_id in study_entity.get_runs():
+ try:
+ all_runs.append(self.merlin_db.get("run", run_id))
+ except RunNotFoundError:
+ LOG.warning(f"Monitor: Run with ID '{run_id}' no longer exists in database. Skipping this run")
+ continue
+
+ # Filter to complete and incomplete runs
+ active_runs = []
+ completed_runs = []
+ for run in all_runs:
+ if not self._validate_run_workspace(run):
+ LOG.warning(f"Monitor: Skipping run '{run.get_workspace()}' with invalid or inaccessible workspace.")
+ continue
+
+ if run.run_complete:
+ completed_runs.append(run)
+ else:
+ active_runs.append(run)
+
+ # Log completed runs
+ if completed_runs:
+ completed_workspaces = [run.get_workspace() for run in completed_runs]
+ LOG.info(f"Monitor: The following runs have completed: {completed_workspaces}")
+
+ # Log active runs
+ if active_runs:
+ active_workspaces = [run.get_workspace() for run in active_runs]
+ LOG.info(f"Monitor: Currently monitoring {len(active_runs)} active run(s): {active_workspaces}")
+ else:
+ LOG.info("Monitor: No active runs remaining.")
+ break
+
+ # Check each active run
+ for run in active_runs:
+ self.wait_for_workers(run)
+ self.check_run_health(run)
+
+ time.sleep(self.sleep)
- def restart_workflow(self, run: RunEntity):
- """
- Restart a run of a workflow.
-
- Args:
- run: A [`RunEntity`][db_scripts.entities.run_entity.RunEntity] instance representing
- the run that's going to be restarted.
-
- Raises:
- RestartException: If the workflow restart process fails.
- """
- try:
- run_workspace = verify_dirpath(run.get_workspace())
- LOG.info(f"Monitor: Restarting workflow for run with workspace '{run_workspace}'...")
- restart_proc = subprocess.run(f"merlin restart {run_workspace}", shell=True, capture_output=True, text=True)
- if restart_proc.returncode != 0:
- LOG.error(f"Monitor: Failed to restart workflow: {restart_proc.stderr}")
- raise RestartException(f"Restart process failed with error: {restart_proc.stderr}")
- LOG.info(f"Monitor: Workflow restarted successfully: {restart_proc.stdout}")
- except ValueError:
- LOG.warning(
- f"Monitor: Run with workspace '{run.get_workspace()}' was not found. Ignoring the restart of this workspace."
- )
+ def monitor_single_run(self, run: RunEntity):
+ """
+ Monitors a single run of a study until it completes.
+
+ This method focuses on monitoring a specific run, continuously performing health
+ checks until the run is marked as complete. It ensures that the allocation stays
+ alive, workers remain healthy, and stalled workflows are restarted if necessary.
+
+ Unlike [`monitor_all_runs`][monitor.monitor.Monitor.monitor_all_runs], this method
+ is designed for scenarios where only a single run needs to be monitored. It follows
+ a simpler execution model that doesn't poll for additional runs.
+
+ Args:
+ run: A [`RunEntity`][db_scripts.entities.run_entity.RunEntity] instance representing
+ the run that's going to be monitored.
+
+ Raises:
+ RunNotFoundError: If the run's workspace is invalid or inaccessible.
+ """
+ run_workspace = run.get_workspace()
+
+ # Validate workspace exists before monitoring
+ if not self._validate_run_workspace(run):
+ raise RunNotFoundError(f"Cannot monitor run with invalid or inaccessible workspace '{run_workspace}'")
+
+ LOG.info(f"Monitor: Monitoring run with workspace '{run_workspace}'...")
+
+ # Wait for workers to spin up before checking on tasks
+ self.wait_for_workers(run)
+
+ while not run.run_complete:
+ self.check_run_health(run)
+ if not run.run_complete:
+ time.sleep(self.sleep)
+
+ LOG.info(f"Monitor: Run with workspace '{run_workspace}' has completed.")