Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
99 commits
Select commit Hold shift + click to select a range
ae891b2
add files that we'll need for this refactor
bgunnar5 Jul 15, 2025
3254380
add MerlinBaseFactory class
bgunnar5 Jul 15, 2025
1e48dea
refactor MerlinBackendFactory to use MerlinBaseFactory
bgunnar5 Jul 15, 2025
1edf5ba
add tests for MerlinBaseFactory and fix backend tests
bgunnar5 Jul 15, 2025
0134cd3
convert monitor factory to use new MerlinBaseFactory
bgunnar5 Jul 15, 2025
1c0088f
remove comment and update MonitorFactory docstring
bgunnar5 Jul 15, 2025
b8c9e19
update MerlinStatusRendererFactory to use MerlinBaseFactory and fix T…
bgunnar5 Jul 15, 2025
88cda38
add tests for the status renderer factory
bgunnar5 Jul 15, 2025
1a9cff2
update CHANGELOG
bgunnar5 Jul 15, 2025
f2efe91
run fix-style
bgunnar5 Jul 15, 2025
bfbfe03
fix issue with typehint that breaks in python 3.8
bgunnar5 Jul 15, 2025
9a5e257
mocked more items to try to fix broken tests on github
bgunnar5 Jul 16, 2025
4128bf8
create MerlinWorker and CeleryWorker classes to handle the launching …
bgunnar5 Jul 21, 2025
5d554ec
implement worker-handler related classes
bgunnar5 Jul 23, 2025
45afd64
add worker factory class and small cleanup to the rest of the worker …
bgunnar5 Jul 23, 2025
edead13
remove functions that are now in the new worker files
bgunnar5 Jul 23, 2025
7a50198
link the new worker classes to the actual launching of workers
bgunnar5 Jul 23, 2025
e51018d
resolve merge conflicts
bgunnar5 Jul 23, 2025
60ee2c2
fix regex in test
bgunnar5 Jul 23, 2025
c5ba679
remove watchdog files and run fix-style
bgunnar5 Jul 23, 2025
17ae613
fix tests that broke after refactor
bgunnar5 Jul 23, 2025
889541e
run fix-style
bgunnar5 Jul 23, 2025
4e1e1fb
update CHANGELOG
bgunnar5 Jul 23, 2025
b7caa85
refactored how the database command is initially processed
bgunnar5 Jul 30, 2025
b08f358
add filtering logic to the entity managers
bgunnar5 Jul 30, 2025
2e12115
add support for database-level filtering
bgunnar5 Jul 30, 2025
595df60
move get_plural_of_entity and get_singular_of_entity to utils.py
bgunnar5 Jul 30, 2025
b4bab5e
fix database command tests
bgunnar5 Aug 5, 2025
37feee9
fix broken tests for entity manager
bgunnar5 Aug 5, 2025
64f23a2
run fix-style and update CHANGELOG
bgunnar5 Aug 5, 2025
14b6ce0
add tests for new backend filtering
bgunnar5 Aug 5, 2025
7eb7529
add tests for new cli utils functions
bgunnar5 Aug 5, 2025
29151e6
fix typo in cli/utils.py
bgunnar5 Aug 5, 2025
0e747e9
run fix-style
bgunnar5 Aug 5, 2025
24042da
add filter options to the command line page
bgunnar5 Aug 6, 2025
c7949f3
resolve conflicts
bgunnar5 Jul 15, 2025
50743c5
resolve conflicts
bgunnar5 Jul 15, 2025
6daae0f
mocked more items to try to fix broken tests on github
bgunnar5 Jul 16, 2025
dd5d19d
create MerlinWorker and CeleryWorker classes to handle the launching …
bgunnar5 Jul 21, 2025
032405c
implement worker-handler related classes
bgunnar5 Jul 23, 2025
cc327aa
add worker factory class and small cleanup to the rest of the worker …
bgunnar5 Jul 23, 2025
15a3386
remove functions that are now in the new worker files
bgunnar5 Jul 23, 2025
c4cbe13
link the new worker classes to the actual launching of workers
bgunnar5 Jul 23, 2025
d2d324b
add files that we'll need for this refactor
bgunnar5 Jul 15, 2025
9a81f88
fix regex in test
bgunnar5 Jul 23, 2025
caddb71
remove watchdog files and run fix-style
bgunnar5 Jul 23, 2025
547e3c5
fix tests that broke after refactor
bgunnar5 Jul 23, 2025
8b212eb
run fix-style
bgunnar5 Jul 23, 2025
fcc6b8e
update CHANGELOG
bgunnar5 Jul 23, 2025
4d84bd8
run fix-style
bgunnar5 Aug 7, 2025
be71c82
fix worker-related factory classes
bgunnar5 Aug 7, 2025
534769e
add tests for new workers files
bgunnar5 Aug 7, 2025
272a0a7
change imports for Celery worker and handler in tests
bgunnar5 Aug 7, 2025
4d5efd4
run fix-style
bgunnar5 Aug 7, 2025
799d667
attempt to fix broken unit tests
bgunnar5 Aug 7, 2025
bfe02a2
fix style
bgunnar5 Aug 7, 2025
a214e56
pull latest changes from develop
bgunnar5 Aug 7, 2025
51db25c
add mocked merlin db to broken test
bgunnar5 Aug 7, 2025
45e7e64
run fix-style
bgunnar5 Aug 7, 2025
0a4de9c
fix susbcriptable error in test
bgunnar5 Aug 7, 2025
fa2d6f5
merge latest changes from develop
bgunnar5 Aug 7, 2025
dd3d390
Merge branch 'feature/filter-db-queries' into refactor/query-workers
bgunnar5 Aug 7, 2025
f6865c7
first pass at adding new worker formatter classes
bgunnar5 Aug 18, 2025
4d9d31e
link new formatters to new query-workers refactor
bgunnar5 Aug 18, 2025
413c39d
remove _discover_builtins method that's not used and annoying
bgunnar5 Aug 26, 2025
c420d5e
change database to only store base name of queue
bgunnar5 Aug 27, 2025
0d47eaa
get filters working for query-workers command
bgunnar5 Aug 27, 2025
64ac2c8
finalize worker formatters
bgunnar5 Aug 27, 2025
7a1b3a8
add console as class attribute for formmatters and fix pylint issues
bgunnar5 Aug 27, 2025
dfdf7d9
more pylint fixes
bgunnar5 Aug 27, 2025
add1927
add attributes section to json formatter
bgunnar5 Aug 27, 2025
74fcd1d
fix broken tests
bgunnar5 Aug 27, 2025
3defbaa
update CHANGELOG
bgunnar5 Aug 27, 2025
e1033d7
updated the query-workers docs page
bgunnar5 Aug 27, 2025
cac4d2f
update the query-workers CLI docs
bgunnar5 Aug 27, 2025
65e5c09
pull latest changes from develop-2.0
bgunnar5 Sep 22, 2025
2fb872d
fix one more merge problem
bgunnar5 Sep 22, 2025
0320158
add query-workers tests to celery worker handler test file
bgunnar5 Sep 23, 2025
82772b4
add tests for rich formatter
bgunnar5 Sep 23, 2025
d4c7e46
add tests for all formatter modules
bgunnar5 Sep 23, 2025
f070fc9
run fix-style
bgunnar5 Sep 23, 2025
a873e58
fix broken test
bgunnar5 Sep 23, 2025
4c90c69
remove query-workers integration tests
bgunnar5 Sep 25, 2025
f645267
fix issue with tests
bgunnar5 Sep 29, 2025
16d4e42
run fix style
bgunnar5 Sep 29, 2025
68b4486
pull latest changes from develop-2.0
bgunnar5 Oct 8, 2025
c22a9e7
fix issue with pid and use WorkerStatus enum properly
bgunnar5 Oct 9, 2025
dd00638
add functionality to compare db data against live celery data
bgunnar5 Oct 9, 2025
94b7539
pull in garbage collection update
bgunnar5 Oct 23, 2025
c8f3c7b
run fix style and fix tests
bgunnar5 Oct 23, 2025
be5be8c
move function to CeleryWorkerHandler and add/fix tests
bgunnar5 Oct 23, 2025
3a42d0a
run fix-style
bgunnar5 Oct 23, 2025
407e882
pull latest changes from develop-2.0
bgunnar5 Nov 24, 2025
0461e1e
integrate new queue-info code with latest changes
bgunnar5 Nov 24, 2025
0c4c9be
add ability to query workers from local db
bgunnar5 Nov 24, 2025
eede467
update docs to add new local-db option
bgunnar5 Nov 24, 2025
4c6cac7
remove accidental double reload_data call
bgunnar5 Nov 24, 2025
02f6276
fix bug when loading status of a worker
bgunnar5 Nov 25, 2025
695da66
fix two unit tests related to latest changes
bgunnar5 Nov 25, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Can be disabled with `--disable-gc`
- Alias for `merlin database` command so it can be called with `merlin db`
- Status of run entities in the database (this will differ from task statuses)
- New classes for formatting `query-workers` output:
- `WorkerFormatter`: base class for defining formatted output for workers
- `RichWorkerFormatter`: implementation of `WorkerFormatter` for output using the rich library
- `JSONWorkerFormatter`: implementation of `WorkerFormatter` for JSON output
- `WorkerFormatterFactory`: factory class for selecting the desired worker formatter

### Changed
- Changes to the `query-workers` command:
- Output now displays a lot more information, including logical and physical worker specific info
- Output now formatted using rich tables or json
- Now handled through worker classes rather than functions in `celeryadapter.py` file
- Behind the scenes this is now querying the new Merlin Database

## [2.0.0b2]

Expand Down
Binary file not shown.
Binary file not shown.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
18 changes: 6 additions & 12 deletions docs/user_guide/command_line.md
Original file line number Diff line number Diff line change
Expand Up @@ -1308,7 +1308,7 @@ The Merlin library comes equipped with several commands to help monitor your wor

- *[detailed-status](#detailed-status-merlin-detailed-status)*: Display task-by-task status information for a study
- *[monitor](#monitor-merlin-monitor)*: Keep your allocation alive while tasks are being processed
- *[query-workers](#query-workers-merlin-query-workers)*: Communicate with Celery to view information on active workers
- *[query-workers](#query-workers-merlin-query-workers)*: View information on [worker entities](./database/entities.md#worker-entities)
- *[queue-info](#queue-info-merlin-queue-info)*: Communicate with Celery to view the status of queues in your workflow(s)
- *[status](#status-merlin-status)*: Display a summary of the status of a study

Expand Down Expand Up @@ -1477,9 +1477,9 @@ merlin monitor [OPTIONS] SPECIFICATION

### Query Workers (`merlin query-workers`)

Check which workers are currently connected to the task server.
View information on [worker entities](./database/entities.md#worker-entities) in your database.

This will broadcast a command to all connected workers and print the names of any that respond and the queues they're attached to. This is useful for interacting with workers, such as via `merlin stop-workers --workers`.
This will query the [Merlin Database](./database/index.md) for information on [logical worker entities](./database/entities.md#logical-worker-entity) and [physical worker entities](./database/entities.md#physical-worker-entity). This can be useful for seeing which workers are running and where.

For more information, see the [Query Workers documentation](./monitoring/queues_and_workers.md#query-workers).

Expand All @@ -1497,7 +1497,9 @@ merlin query-workers [OPTIONS]
| `--task_server` | string | Task server type for which to query workers. Currently only "celery" is implemented. | "celery" |
| `--spec` | filename | Query for the workers named in the `merlin` block of the spec file given here | None |
| `--queues` | List[string] | Takes a space-delimited list of queues as input. This will query for workers associated with the names of the queues you provide here. | None |
| `--workers` | List[regex] | A space-delimited list of regular expressions representing workers to query | None |
| `--workers` | List[string] | A space-delimited list of logical worker names to query | None |
| `-f`, `--format` | choice(`rich` \| `json`) | Output format | rich |
| `-l`, `--local-db` | boolean | Use the local Merlin database for querying workers | `False` |

**Examples:**

Expand Down Expand Up @@ -1527,14 +1529,6 @@ merlin query-workers [OPTIONS]
merlin query-workers --workers step_1_worker
```

!!! example "Query Workers Using Regex"

This will query only workers whose names start with `step`:

```bash
merlin query-workers --workers ^step
```

### Queue Info (`merlin queue-info`)

!!! note
Expand Down
51 changes: 20 additions & 31 deletions docs/user_guide/monitoring/queues_and_workers.md
Original file line number Diff line number Diff line change
Expand Up @@ -280,28 +280,30 @@ merlin queue-info --spec <spec file> --vars <VAR_TO_MODIFY>=<VAL_TO_SET>

## Query Workers

Merlin provides users with the [`merlin query-workers`](../command_line.md#query-workers-merlin-query-workers) command to help users see which workers are running and what task queues they're watching.
Merlin provides the [`merlin query-workers`](../command_line.md#query-workers-merlin-query-workers) command to help users see information about the [worker entities](../database/entities.md#worker-entities) that currently exist in their database.

This command will output content to a table format with two columns: workers and queues. The workers column will contain one connected worker per row. The queues column will contain a comma-delimited list of queues that the connected worker is watching.
This command will output a summary of both the [logical worker entities](../database/entities.md#logical-worker-entity) and the [physical worker entities](../database/entities.md#physical-worker-entity), detailed information on the physical workers that exist, and information on logical workers that don't have any physical instances yet.

**Usage:**
There are two different ways that the output of the `query-workers` command can be formatted: `rich` or `json`. By default, this is set to `rich`.

**Basic Usage:**

```bash
merlin query-workers
```

??? example "Example Query-Workers Output With No Connected Workers"
??? example "Example Query-Workers Output With No Worker Entities in the Database"

<figure markdown>
![Output of Query-Workers When No Workers Are Connected](../../assets/images/monitoring/queues_and_workers/no-connected-workers.png)
<figcaption>Output of Query-Workers When No Workers Are Connected</figcaption>
![Output of Query-Workers When Worker Entities Do Not Exist](../../assets/images/monitoring/queues_and_workers/query-workers-worker-entities-do-not-exist.png)
<figcaption>Output of Query-Workers When Worker Entities Do Not Exist</figcaption>
</figure>

??? example "Example Query-Workers Output With Connected Workers"
??? example "Example Query-Workers Output With Worker Entities in the Database"

<figure markdown>
![Output of Query-Workers When There Are Workers Connected](../../assets/images/monitoring/queues_and_workers/connected-workers.png)
<figcaption>Output of Query-Workers When There Are Workers Connected</figcaption>
![Output of Query-Workers When Worker Entities Exist](../../assets/images/monitoring/queues_and_workers/query-workers-worker-entities-exist.png)
<figcaption>Output of Query-Workers When Worker Entities Exist</figcaption>
</figure>

### Query Workers by Spec File
Expand Down Expand Up @@ -456,7 +458,7 @@ merlin query-workers --queues <space-delimited list of task queues>

Say we have the below spec file with four workers `creator`, `trainer`, `predictor`, and `verifier` that are each attached to their respective steps/task queues. In other words, `creator` will be connected to the `create_data` task queue, `trainer` will be connected to the `train` task queue, etc.:

```yaml title="demo_workflow.yaml" hl_lines="33-44"
```yaml title="demo_workflow_queues_option.yaml" hl_lines="33-44"
description:
name: demo_workflow
description: a very simple merlin workflow
Expand Down Expand Up @@ -506,14 +508,14 @@ merlin query-workers --queues <space-delimited list of task queues>
We can start these workers with:

```bash
merlin run-workers demo_workflow.yaml
merlin run-workers demo_workflow_queues_option.yaml
```

Now if we query the workers *without* the `--queues` option, we'll see all four workers alive and connected to their respective queues:

<figure markdown>
![All Four Workers From 'demo_workflow.yaml' Being Queried](../../assets/images/monitoring/queues_and_workers/queues-example-all-workers.png)
<figcaption>All Four Workers From 'demo_workflow.yaml' Being Queried</figcaption>
![All Four Workers From 'demo_workflow_queues_option.yaml' Being Queried](../../assets/images/monitoring/queues_and_workers/query-workers-queues-all-workers.png)
<figcaption>All Four Workers From 'demo_workflow_queues_option.yaml' Being Queried</figcaption>
</figure>

Let's refine this query to just view the workers connected to the `train` and `predict` queues:
Expand All @@ -525,25 +527,25 @@ merlin query-workers --queues <space-delimited list of task queues>
As we can see in the output below, only the `trainer` and `predictor` workers are now displayed:

<figure markdown>
![Output of Query-Workers Using the Queues Option](../../assets/images/monitoring/queues_and_workers/queues-example-filtered-workers.png)
![Output of Query-Workers Using the Queues Option](../../assets/images/monitoring/queues_and_workers/query-workers-queues-option.png)
<figcaption>Output of Query-Workers Using the Queues Option</figcaption>
</figure>

### Query Workers by Worker Name

There will be instances when you know precisely which workers you want to query. In such cases, the `--workers` option in the `query-workers` command proves useful. This option facilitates querying workers using [regular expressions](https://docs.python.org/3/library/re.html). As full strings are accepted as regular expressions, you can also query workers by worker name.
There will be instances when you know precisely which workers you want to query. In such cases, the `--workers` option in the `query-workers` command proves useful. This option facilitates querying workers by their logical names.

**Usage:**

```bash
merlin query-workers --workers <space-delimited list of regular expressions>
merlin query-workers --workers <space-delimited list of worker names>
```

??? example "Example of Using the `--workers` Option With Query-Workers"

Say we have the following spec file with 3 workers `step_1_worker`, `step_2_worker`, and `other_worker`:

```yaml title="demo_workflow.yaml" hl_lines="27-35"
```yaml title="demo_workflow_workers_option.yaml" hl_lines="27-35"
description:
name: demo_workflow
description: a very simple merlin workflow
Expand Down Expand Up @@ -590,19 +592,6 @@ merlin query-workers --workers <space-delimited list of regular expressions>
In our output we see that both workers that we asked for were queried but `other_worker` was ignored:

<figure markdown>
![Output of Query-Workers Using the Workers Option With Worker Names](../../assets/images/monitoring/queues_and_workers/workers-option-with-worker-names.png)
![Output of Query-Workers Using the Workers Option With Worker Names](../../assets/images/monitoring/queues_and_workers/query-workers-workers-option.png)
<figcaption>Output of Query-Workers Using the Workers Option With Worker Names</figcaption>
</figure>

Alternatively, we can do the exact same query using a regular expression:

```bash
merlin query-workers --workers ^step
```

The `^` operator for regular expressions will match the beginning of a string. In this example when we say `^step` we're asking Merlin to match any worker starting with the word `step`, which in this case is `step_1_worker` and `step_2_worker`. We can see this in the output below:

<figure markdown>
![Output of Query-Workers Using the Workers Option With RegEx](../../assets/images/monitoring/queues_and_workers/workers-option-with-regex.png)
<figcaption>Output of Query-Workers Using the Workers Option With RegEx</figcaption>
</figure>
15 changes: 1 addition & 14 deletions merlin/abstracts/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,27 +124,13 @@ def _discover_plugins_via_entry_points(self):
except ImportError:
LOG.debug("pkg_resources not available for plugin discovery")

def _discover_builtin_modules(self):
"""
Optional hook to discover built-in components by scanning local modules.

Default implementation does nothing.

Subclasses can override this method to implement package/module scanning.
"""
LOG.warning(
f"Class {self.__class__.__name__} did not override _discover_builtin_modules(). "
"Built-in module discovery will be skipped."
)

def _discover_plugins(self):
"""
Discover and register plugin components via entry points.

Subclasses can override this to support more discovery mechanisms.
"""
self._discover_plugins_via_entry_points()
self._discover_builtin_modules()

def _raise_component_error_class(self, msg: str) -> Type[Exception]:
"""
Expand Down Expand Up @@ -227,6 +213,7 @@ def _get_component_class(self, canonical_name: str, component_type: str) -> Any:

return component_class

# TODO should we change 'config' to 'kwargs'?
def create(self, component_type: str, config: Dict = None) -> Any:
"""
Instantiate and return a component of the specified type.
Expand Down
2 changes: 1 addition & 1 deletion merlin/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ def handle_worker_startup(sender: str = None, **kwargs):
"physical_worker",
name=str(sender),
host=host,
status=WorkerStatus.RUNNING,
worker_status=WorkerStatus.RUNNING.value,
logical_worker_id=logical_worker.get_id(),
pid=os.getpid(),
)
Expand Down
35 changes: 30 additions & 5 deletions merlin/cli/commands/query_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

from merlin.ascii_art import banner_small
from merlin.cli.commands.command_entry_point import CommandEntryPoint
from merlin.router import query_workers
from merlin.config.configfile import initialize_config
from merlin.spec.specification import MerlinSpec
from merlin.utils import verify_filepath
from merlin.workers.formatters.formatter_factory import worker_formatter_factory
from merlin.workers.handlers.handler_factory import worker_handler_factory


LOG = logging.getLogger("merlin")
Expand Down Expand Up @@ -66,7 +68,21 @@ def add_parser(self, subparsers: ArgumentParser):
action="store",
nargs="+",
default=None,
help="Regex match for specific workers to query.",
help="Specific logical worker names to query.",
)
format_default = "rich"
query.add_argument(
"-f",
"--format",
choices=worker_formatter_factory.list_available(),
default=format_default,
help=f"Output format. Default: {format_default}",
)
query.add_argument(
"-l",
"--local-db",
action="store_true",
help="Use the local Merlin database for querying workers.",
)

def process_command(self, args: Namespace):
Expand All @@ -85,15 +101,24 @@ def process_command(self, args: Namespace):
"""
print(banner_small)

# Get the workers from the spec file if --spec provided
if args.local_db:
initialize_config(local_mode=True)

worker_names = []
if args.workers:
worker_names.extend(args.workers)

# Get the workers from the spec file if --spec provided
spec = None
if args.spec:
spec_path = verify_filepath(args.spec)
spec = MerlinSpec.load_specification(spec_path)
worker_names = spec.get_worker_names()
worker_names.extend(spec.get_worker_names())
for worker_name in worker_names:
if "$" in worker_name:
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")
LOG.debug(f"Searching for the following workers to stop based on the spec {args.spec}: {worker_names}")

query_workers(args.task_server, worker_names, args.queues, args.workers)
task_server = spec.merlin["resources"]["task_server"] if spec else args.task_server
worker_handler = worker_handler_factory.create(task_server)
worker_handler.query_workers(args.format, queues=args.queues, workers=worker_names, local_db=args.local_db)
8 changes: 4 additions & 4 deletions merlin/common/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ class WorkerStatus(Enum):
REBOOTING (str): Indicates the worker is actively restarting itself. String value: "rebooting".
"""

RUNNING = "running"
STALLED = "stalled"
STOPPED = "stopped"
REBOOTING = "rebooting"
RUNNING = "RUNNING"
STALLED = "STALLED"
STOPPED = "STOPPED"
REBOOTING = "REBOOTING"


class RunStatus(Enum):
Expand Down
4 changes: 2 additions & 2 deletions merlin/db_scripts/data_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ class PhysicalWorkerModel(BaseDataModel): # pylint: disable=too-many-instance-a
name (str): The name of the physical worker.
pid (str): The process ID (PID) of the worker process.
restart_count (int): The number of times this worker has been restarted.
worker_status (WorkerStatus): The current status of the worker (e.g., running, stopped).
worker_status (str): The current status of the worker (e.g., running, stopped).
"""

id: str = field(default_factory=lambda: str(uuid.uuid4())) # pylint: disable=invalid-name
Expand All @@ -426,7 +426,7 @@ class PhysicalWorkerModel(BaseDataModel): # pylint: disable=too-many-instance-a
launch_cmd: str = None
args: Dict = field(default_factory=dict)
pid: str = None
worker_status: WorkerStatus = WorkerStatus.STOPPED
worker_status: str = field(default=WorkerStatus.STOPPED.value)
heartbeat_timestamp: datetime = field(default_factory=datetime.now)
latest_start_time: datetime = field(default_factory=datetime.now)
host: str = None
Expand Down
18 changes: 13 additions & 5 deletions merlin/db_scripts/entities/physical_worker_entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,17 @@ def get_pid(self) -> Optional[int]:
The process ID for this worker or None if not set.
"""
self.reload_data()
return int(self.entity_info.pid) if self.entity_info.pid else None
if not self.entity_info.pid:
return None

def set_pid(self, pid: str):
# Handle both int strings and float strings
try:
# Convert to float first, then to int
return int(float(self.entity_info.pid))
except (ValueError, TypeError):
return None

def set_pid(self, pid: int):
"""
Set the PID of this worker.

Expand All @@ -223,7 +231,7 @@ def get_worker_status(self) -> WorkerStatus:
the status of this worker.
"""
self.reload_data()
return self.entity_info.worker_status
return WorkerStatus(self.entity_info.worker_status)

def set_worker_status(self, status: WorkerStatus):
"""
Expand All @@ -233,7 +241,7 @@ def set_worker_status(self, status: WorkerStatus):
status: A [`WorkerStatus`][common.enums.WorkerStatus] enum representing
the new status of the worker.
"""
self.entity_info.worker_status = status
self.entity_info.worker_status = status.value
self.save()

def get_heartbeat_timestamp(self) -> str:
Expand Down Expand Up @@ -294,7 +302,7 @@ def get_restart_count(self) -> int:
The number of times that this worker has been restarted.
"""
self.reload_data()
return self.entity_info.restart_count
return int(float(self.entity_info.restart_count))

def increment_restart_count(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion merlin/db_scripts/entity_managers/entity_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def _matches_filters(self, entity: T, filters: Dict) -> bool:
entity: The entity instance to check against the filters.
filters: A dictionary of filter keys and values used to narrow down the query results.
Filter keys must correspond to entries in the `_filter_accessor_map` defined
by the subclass. Values are compared against the entitys corresponding attributes
by the subclass. Values are compared against the entity's corresponding attributes
or methods (e.g., {"name": "foo"}, {"queues": ["queue1", "queue2"]}).

Returns:
Expand Down
6 changes: 6 additions & 0 deletions merlin/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ class MerlinWorkerHandlerNotSupportedError(Exception):
"""


class MerlinWorkerFormatterNotSupportedError(Exception):
"""
Exception to signal that the provided worker formatter is not supported by Merlin.
"""


class MerlinWorkerNotSupportedError(Exception):
"""
Exception to signal that the provided worker is not supported by Merlin.
Expand Down
Loading