Description
What happened + What you expected to happen
Ray autoscaling behaves unexpectedly when executing Dataset.map(f, concurrency=N):
- It does not immediately provision the number of nodes needed for the requested concurrency of N; it provisions only a single node.
- Only once that first node is up and running does it provision another one.
- This repeats indefinitely, provisioning more and more nodes, seemingly bounded only by the cluster configuration rather than by the application's needs: the application only uses concurrency up to N, and the additional nodes remain unused.
The same happens when using ray.data.DataContext.get_current().execution_options.resource_limits.cpu instead of the concurrency option of Dataset.map.
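A minimal sketch of that variant, assuming the same workload as the reproduction script below (the cap of 2 CPUs is illustrative, chosen to match the concurrency used there):
import time
import ray
import ray.data

ray.init()

# Cap Ray Data's total CPU usage via the execution options
# instead of passing concurrency=2 to Dataset.map().
ctx = ray.data.DataContext.get_current()
ctx.execution_options.resource_limits.cpu = 2

def f(x):
    time.sleep(5)
    return {'item': 1}

ds = ray.data.range(1000)
ds.map(f).materialize()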
The same also happens when using ray.data.DataContext.get_current().scheduling_strategy = PlacementGroupSchedulingStrategy(...). The ray.data tasks consume placement group resources, but more nodes keep getting added outside the placement group.
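And a minimal sketch of the placement-group variant; the bundle shape {'CPU': 2} is an illustrative choice matching the 2-CPU nodes used here:
import ray
import ray.data
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

ray.init()

# Reserve a 2-CPU placement group and direct Ray Data to schedule into it.
pg = placement_group([{"CPU": 2}])
ray.get(pg.ready())
ray.data.DataContext.get_current().scheduling_strategy = PlacementGroupSchedulingStrategy(
    placement_group=pg
)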
Example
For example, in the reproduction script below
- we work with nodes with 2 CPUs each
- we request a Dataset.map concurrency of 2
- each execution of f requires only 1 CPU, so we expect only 1 node to be provisioned.
Instead, the dashboard shows the following sequence of updates:
---
Active: 1 ray.head.default
Usage: 2.0/2.0 CPU
Demands: {'CPU': 1}: 3+ from request_resources()
---
Active: 1 ray.worker, 1 ray.head.default
Usage: 2.0/4.0 CPU
Demands: {'CPU': 1}: 5+ from request_resources()
---
Active: 2 ray.worker, 1 ray.head.default
Usage: 2.0/6.0 CPU
Demands: {'CPU': 1}: 8+ from request_resources()
---
... and so on, adding nodes indefinitely. I stopped it at 9 worker nodes.
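For context, the "{'CPU': 1}: N+ from request_resources()" demands above refer to the autoscaler SDK call sketched below (the bundle count 3 mirrors the first snapshot and is purely illustrative; this is not a claim about how Ray Data invokes it internally):
from ray.autoscaler.sdk import request_resources

# request_resources() sets an absolute scaling target for the autoscaler.
# The dashboard demands growing from 3+ to 5+ to 8+ suggest this target
# keeps being raised while the map() runs.
request_resources(bundles=[{"CPU": 1}] * 3)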
Maybe related: I noticed that the log messages reporting the progress of the map remain at 0 rows throughout the execution, even as some tasks are completing, for example:
Running Dataset. Active & requested resources: 2/6 CPU, 512.0MB/3.4GB object store: : 0.00 row [06:31, ? row/s]
- Map(f): Tasks: 2 [backpressured]; Queued blocks: 2; Resources: 2.0 CPU, 512.0MB object store: : 0.00 row [06:31, ? row/s]
All this makes Ray Data pretty much unusable for me.
Versions / Dependencies
Ray: 2.43.0
Reproduction script
Entrypoint:
import ray
import ray.data
import time
ray.init()
def f(x):
    print(f"{x} Hello!")
    time.sleep(5)
    return {'item': 1}
ds = ray.data.range(1000)
ds.map(f, concurrency=2).materialize()
Cluster config:
cluster_name: 'debug'
max_workers: 10
upscaling_speed: 1.0
idle_timeout_minutes: 1
docker:
    image: 'rayproject/ray:latest'
    container_name: "ray_container"
    pull_before_run: True
    run_options:
        - "--ulimit nofile=65536:65536"
provider:
    type: aws
    region: us-east-1
    availability_zone: us-east-1c
    cache_stopped_nodes: False
    use_internal_ips: True
auth:
    ssh_user: ec2-user
    ssh_private_key: '~/.ssh/jakobleben-ec2.pem'
available_node_types:
    ray.head.default:
        node_config: &node_config
            InstanceType: m5.large
            # ImageId: ami-0dd6adfad4ad37eec
            ImageId: ami-0514b880c5dba39fa
            SecurityGroupIds: ['sg-0c15118bc2b72e433']
            SubnetId: subnet-08ca755d62dc7f0db
            IamInstanceProfile:
                Name: 'jakobhawkeyeHawkeyeRayNodeProfile'
            KeyName: 'jakobleben'
            BlockDeviceMappings:
                - DeviceName: /dev/xvda
                  Ebs:
                      VolumeSize: 80
                      VolumeType: gp3
    ray.worker:
        min_workers: 0
        max_workers: 10
        resources: {}
        node_config:
            <<: *node_config
head_node_type: ray.head.default
file_mounts: {}
cluster_synced_files: []
file_mounts_sync_continuously: False
rsync_exclude:
    - "__pycache__"
    - "**/.git"
    - "**/.git/**"
rsync_filter:
    - ".gitignore"
initialization_commands: []
setup_commands: []
head_setup_commands: []
worker_setup_commands: []
head_start_ray_commands:
    - ray stop
    - ray start --head --port=6379 --object-manager-port=8076 --autoscaling-config=~/ray_bootstrap_config.yaml --dashboard-host=0.0.0.0
worker_start_ray_commands:
    - ray stop
    - ray start --address=$RAY_HEAD_IP:6379 --object-manager-port=8076
Issue Severity
High: It blocks me from completing my task.