Skip to content

Commit 0c615dd

Browse files
author
yicheng
committed
add python pipe test
Signed-off-by: yicheng <[email protected]>
1 parent 79722fb commit 0c615dd

File tree

1 file changed

+142
-0
lines changed

1 file changed

+142
-0
lines changed

python/ray/tests/test_pipe.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import subprocess
2+
import sys
3+
4+
import pytest
5+
6+
from ray._private.pipe import Pipe
7+
8+
9+
def test_parent_read_child_write():
10+
"""Parent process reads data written by child process."""
11+
pipe = Pipe()
12+
writer_handle = pipe.make_writer_handle()
13+
14+
code = f"""
15+
from ray._private.pipe import Pipe
16+
pipe = Pipe.from_writer_handle({writer_handle})
17+
pipe.write("hello from child")
18+
pipe.close()
19+
"""
20+
# Use close_fds=False instead of pass_fds for cross-platform compatibility.
21+
# On Windows, pass_fds is not supported; close_fds=False allows inheritable
22+
# handles to be passed to child processes.
23+
proc = subprocess.Popen(
24+
[sys.executable, "-c", code],
25+
close_fds=False,
26+
)
27+
pipe.close_writer_handle()
28+
29+
data = pipe.read(timeout_s=5)
30+
proc.wait()
31+
32+
assert data == "hello from child"
33+
assert proc.returncode == 0
34+
pipe.close()
35+
36+
37+
def test_sibling_processes_communicate():
38+
"""Two sibling child processes communicate through a pipe.
39+
40+
Parent creates pipe, spawns writer then reader.
41+
Data flow: writer child -> reader child
42+
"""
43+
pipe = Pipe()
44+
45+
# Create writer handle and spawn writer first
46+
writer_handle = pipe.make_writer_handle()
47+
writer_code = f"""
48+
from ray._private.pipe import Pipe
49+
pipe = Pipe.from_writer_handle({writer_handle})
50+
pipe.write("hello from sibling")
51+
pipe.close()
52+
"""
53+
writer_proc = subprocess.Popen(
54+
[sys.executable, "-c", writer_code],
55+
close_fds=False,
56+
)
57+
pipe.close_writer_handle()
58+
59+
# Now create reader handle and spawn reader
60+
reader_handle = pipe.make_reader_handle()
61+
reader_code = f"""
62+
from ray._private.pipe import Pipe
63+
pipe = Pipe.from_reader_handle({reader_handle})
64+
data = pipe.read(timeout_s=5)
65+
print(data, end='')
66+
pipe.close()
67+
"""
68+
reader_proc = subprocess.Popen(
69+
[sys.executable, "-c", reader_code],
70+
close_fds=False,
71+
stdout=subprocess.PIPE,
72+
)
73+
pipe.close_reader_handle()
74+
75+
writer_proc.wait()
76+
stdout, _ = reader_proc.communicate(timeout=5)
77+
78+
assert stdout.decode() == "hello from sibling"
79+
assert writer_proc.returncode == 0
80+
assert reader_proc.returncode == 0
81+
pipe.close()
82+
83+
84+
def test_multiple_pipes_to_one_child():
85+
"""One child writes to multiple pipes created by parent."""
86+
pipe1 = Pipe()
87+
pipe2 = Pipe()
88+
writer_handle1 = pipe1.make_writer_handle()
89+
writer_handle2 = pipe2.make_writer_handle()
90+
91+
code = f"""
92+
from ray._private.pipe import Pipe
93+
p1 = Pipe.from_writer_handle({writer_handle1})
94+
p2 = Pipe.from_writer_handle({writer_handle2})
95+
p1.write("data_for_pipe1")
96+
p2.write("data_for_pipe2")
97+
p1.close()
98+
p2.close()
99+
"""
100+
proc = subprocess.Popen(
101+
[sys.executable, "-c", code],
102+
close_fds=False,
103+
)
104+
105+
pipe1.close_writer_handle()
106+
pipe2.close_writer_handle()
107+
108+
data1 = pipe1.read(timeout_s=5)
109+
data2 = pipe2.read(timeout_s=5)
110+
proc.wait()
111+
112+
assert data1 == "data_for_pipe1"
113+
assert data2 == "data_for_pipe2"
114+
assert proc.returncode == 0
115+
116+
pipe1.close()
117+
pipe2.close()
118+
119+
120+
def test_read_timeout():
121+
"""Read raises RuntimeError on timeout when no data available."""
122+
pipe = Pipe()
123+
with pytest.raises(RuntimeError, match="Timed out"):
124+
pipe.read(timeout_s=0.1)
125+
pipe.close()
126+
127+
128+
def test_operations_after_close():
129+
"""Read/write on closed pipe raises RuntimeError."""
130+
pipe = Pipe()
131+
pipe.close()
132+
with pytest.raises(RuntimeError, match="already taken or closed"):
133+
pipe.read()
134+
135+
pipe2 = Pipe()
136+
pipe2.close()
137+
with pytest.raises(RuntimeError, match="already taken or closed"):
138+
pipe2.write("data")
139+
140+
141+
if __name__ == "__main__":
142+
sys.exit(pytest.main(["-sv", __file__]))

0 commit comments

Comments
 (0)