Conversation

@bveeramani
Member

When multiple Ray Data or Ray Train jobs run concurrently on a cluster, they each independently request resources from the Ray Autoscaler without awareness of each other. This can lead to inefficient resource utilization and conflicts.

This PR introduces an AutoscalingCoordinator that acts as a central point for coordinating resource requests across components.
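
At a high level, the coordinator can be pictured as a single actor that every component registers its demand with, and which then issues one combined request to the autoscaler. The sketch below is purely illustrative: the class, method, and field names are assumptions, not the API added by this PR.

import ray


@ray.remote(num_cpus=0)
class CoordinatorSketch:
    """Hypothetical sketch, not the actual AutoscalingCoordinator API."""

    def __init__(self):
        # requester_id -> list of resource bundles, e.g. [{"CPU": 1}] * 8.
        self._requests = {}

    def request_resources(self, requester_id, bundles):
        # Each Ray Data / Ray Train component registers its demand here
        # instead of asking the autoscaler directly.
        self._requests[requester_id] = bundles

    def aggregated_request(self):
        # Combine all outstanding demands into one request, which could then
        # be forwarded to the autoscaler (e.g. ray.autoscaler.sdk.request_resources).
        return [b for bundles in self._requests.values() for b in bundles]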

Signed-off-by: Balaji Veeramani <[email protected]>
Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani requested a review from a team as a code owner December 10, 2025 19:09
Signed-off-by: Balaji Veeramani <[email protected]>
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces an AutoscalingCoordinator to centralize resource requests, which is a significant improvement for managing resources in concurrent job scenarios. The implementation is robust, featuring a resilient actor-based design with timeout handling, caching, and lazy initialization. The tests are comprehensive, covering unit, integration, and failure cases effectively.

My feedback highlights a few areas for improvement:

  • A potential high-severity issue regarding the over-allocation of remaining resources.
  • A medium-severity issue concerning an in-place modification of an argument list, which could lead to unexpected side effects.
  • Minor suggestions for improving code style and test hygiene.

Overall, this is a well-executed feature addition. Addressing these points will further enhance the reliability and maintainability of the new coordinator.

Comment on lines +407 to +409
for ongoing_req in ongoing_reqs:
    if ongoing_req.request_remaining:
        ongoing_req.allocated_resources.extend(cluster_node_resources)

high

The current implementation allocates all remaining cluster resources to every requester with request_remaining=True. As noted in the code comment, this can lead to double-allocation and overallocation if multiple such requesters exist, potentially causing resource contention and task scheduling failures. While this is for parity with previous behavior, it's a significant correctness risk. A more robust approach would be to divide the remaining resources among the requesters, for example, proportionally to their initial requests, or on a first-come, first-served basis.
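
For illustration, the proportional split could look roughly like this (a sketch only: request_remaining and allocated_resources follow the snippet above, while requested_resources is an assumed field holding each requester's original bundle list):

def _split_remaining_proportionally(ongoing_reqs, cluster_node_resources):
    # Divide the remaining nodes among requesters that asked for "the rest",
    # proportionally to the size of their original requests, so no node is
    # handed to more than one requester.
    claimants = [r for r in ongoing_reqs if r.request_remaining]
    if not claimants:
        return
    weights = [max(len(r.requested_resources), 1) for r in claimants]
    total_weight = sum(weights)
    start = 0
    for i, (req, weight) in enumerate(zip(claimants, weights)):
        if i == len(claimants) - 1:
            share = len(cluster_node_resources) - start  # last one takes the rest
        else:
            share = int(len(cluster_node_resources) * weight / total_weight)
        req.allocated_resources.extend(cluster_node_resources[start:start + share])
        start += share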

Contributor

@bveeramani is this valid?

requester_id = kwargs.get(requester_id_param)
if requester_id is None:
    # Try to get from args by checking function signature
    import inspect

medium

For better code style, to avoid potential overhead from repeated imports, and to make dependencies explicit, it's recommended to move the import inspect statement to the top of the file with other imports.
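
For context, the module-level arrangement being suggested might look like this (a sketch; the helper name and lookup logic are assumptions, only the use of inspect mirrors the snippet above):

import inspect  # at module level, alongside the other imports


def _resolve_requester_id(func, requester_id_param, args, kwargs):
    # Hypothetical helper: resolve a parameter value by name, whether it was
    # passed positionally or as a keyword argument.
    if requester_id_param in kwargs:
        return kwargs[requester_id_param]
    bound = inspect.signature(func).bind_partial(*args, **kwargs)
    return bound.arguments.get(requester_id_param)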

Comment on lines +283 to +285
for r in resources:
    for k in r:
        r[k] = math.ceil(r[k])

medium

This loop modifies the resources list in-place, which can be an unexpected side effect for the caller. To avoid this, consider creating a new list of resource dictionaries with the rounded-up values using a list comprehension. This makes the function's behavior clearer and safer as it avoids mutating its inputs.

        resources = [{k: math.ceil(v) for k, v in r.items()} for r in resources]

Comment on lines +40 to +46
MOCKED_TIME = 0


def mock_time():
    global MOCKED_TIME

    return MOCKED_TIME

medium

Using a global variable MOCKED_TIME for mocking time can lead to test flakiness, especially if tests are run in parallel. It would be more robust to encapsulate the mock time state, for example, by using a class instance or a fixture that doesn't rely on global state.

For example, you could define a callable class:

class MockTime:
    def __init__(self, initial_time=0):
        self.current_time = initial_time

    def __call__(self):
        return self.current_time

    def set_time(self, new_time):
        self.current_time = new_time

And then in your test:

mock_time = MockTime()
with patch("time.time", mock_time):
    # ... test logic ...
    mock_time.set_time(10)
    # ... more test logic ...

@bveeramani bveeramani changed the title [Data][Autoscaling][1/N] Add AutoscalingCoordinator [Data][Autoscaler][1/N] Add AutoscalingCoordinator Dec 10, 2025
# Call tick() as an actor task,
# so we don't need to handle multi-threading.
time.sleep(self.TICK_INTERVAL_S)
ray.get(self._self_handle.tick.remote())

Bug: Tick thread silently crashes on unhandled exceptions

The tick_thread_run function has no exception handling around the ray.get(self._self_handle.tick.remote()) call. If any exception occurs (network issues, actor errors, or other runtime errors), the thread will terminate silently without any notification since it's a daemon thread. This causes the periodic maintenance operations (purging expired requests, updating cluster resources, reallocating resources) to stop functioning permanently until the actor restarts, potentially leading to stale allocation information and memory growth from accumulated expired requests.
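
One possible shape of a fix (a sketch only, assuming a module-level logger; the attribute names follow the snippet above):

def tick_thread_run(self):
    while True:
        time.sleep(self.TICK_INTERVAL_S)
        try:
            # Call tick() as an actor task,
            # so we don't need to handle multi-threading.
            ray.get(self._self_handle.tick.remote())
        except Exception:
            # Log and keep ticking so periodic maintenance (purging expired
            # requests, refreshing cluster resources) never silently stops.
            logger.exception("AutoscalingCoordinator tick failed; retrying.")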

    return False
for key in res2:
    res1[key] -= res2[key]
return True

Bug: KeyError when subtracting zero-valued resources not present in node

The _maybe_subtract_resources function raises a KeyError if a requested resource bundle contains a key with value 0 that doesn't exist in the node's resources. The check on line 352 uses res1.get(key, 0) < res2[key], which evaluates to 0 < 0 (False) when res2[key] is 0, so the function proceeds to line 355 which attempts res1[key] -= res2[key]. Since key doesn't exist in res1, this raises a KeyError. This could occur if a resource request includes resource types with zero values that aren't present on cluster nodes.
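
A sketch of one way to avoid the KeyError (the function name follows the snippet; the body is reconstructed here for illustration and may differ from the actual code):

def _maybe_subtract_resources(res1, res2):
    # Subtract res2 from res1 in place if res1 can cover it; otherwise return
    # False and leave res1 unchanged.
    if any(res1.get(key, 0) < amount for key, amount in res2.items()):
        return False
    for key, amount in res2.items():
        # Default missing keys to 0 so a zero-valued request for a resource
        # the node doesn't have cannot raise a KeyError.
        res1[key] = res1.get(key, 0) - amount
    return True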

Signed-off-by: Balaji Veeramani <[email protected]>
@bveeramani bveeramani enabled auto-merge (squash) December 10, 2025 21:41
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Dec 10, 2025
@github-actions github-actions bot disabled auto-merge December 10, 2025 21:41
@bveeramani bveeramani enabled auto-merge (squash) December 10, 2025 22:23
Signed-off-by: Balaji Veeramani <[email protected]>
@github-actions github-actions bot disabled auto-merge December 10, 2025 23:57
@bveeramani bveeramani merged commit fcba103 into master Dec 11, 2025
6 checks passed
@bveeramani bveeramani deleted the autoscaling-coordinator branch December 11, 2025 00:46