Skip to content

Commit 7338b27

Browse files
author
yicheng
committed
better wrong input check
Signed-off-by: yicheng <[email protected]>
1 parent 0c615dd commit 7338b27

File tree

4 files changed

+128
-36
lines changed

4 files changed

+128
-36
lines changed

python/ray/tests/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ py_test_module_list(
467467
"test_label_scheduling.py",
468468
"test_minimal_install.py",
469469
"test_path_utils.py",
470+
"test_pipe.py",
470471
"test_runtime_env_ray_minimal.py",
471472
],
472473
tags = [

python/ray/tests/test_pipe.py

Lines changed: 0 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -81,42 +81,6 @@ def test_sibling_processes_communicate():
8181
pipe.close()
8282

8383

84-
def test_multiple_pipes_to_one_child():
85-
"""One child writes to multiple pipes created by parent."""
86-
pipe1 = Pipe()
87-
pipe2 = Pipe()
88-
writer_handle1 = pipe1.make_writer_handle()
89-
writer_handle2 = pipe2.make_writer_handle()
90-
91-
code = f"""
92-
from ray._private.pipe import Pipe
93-
p1 = Pipe.from_writer_handle({writer_handle1})
94-
p2 = Pipe.from_writer_handle({writer_handle2})
95-
p1.write("data_for_pipe1")
96-
p2.write("data_for_pipe2")
97-
p1.close()
98-
p2.close()
99-
"""
100-
proc = subprocess.Popen(
101-
[sys.executable, "-c", code],
102-
close_fds=False,
103-
)
104-
105-
pipe1.close_writer_handle()
106-
pipe2.close_writer_handle()
107-
108-
data1 = pipe1.read(timeout_s=5)
109-
data2 = pipe2.read(timeout_s=5)
110-
proc.wait()
111-
112-
assert data1 == "data_for_pipe1"
113-
assert data2 == "data_for_pipe2"
114-
assert proc.returncode == 0
115-
116-
pipe1.close()
117-
pipe2.close()
118-
119-
12084
def test_read_timeout():
12185
"""Read raises RuntimeError on timeout when no data available."""
12286
pipe = Pipe()

src/ray/util/tests/BUILD.bazel

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,19 @@ ray_cc_test(
220220
],
221221
)
222222

223+
ray_cc_test(
224+
name = "pipe_test",
225+
size = "small",
226+
srcs = ["pipe_test.cc"],
227+
tags = ["team:core"],
228+
deps = [
229+
"//src/ray/util:pipe",
230+
"//src/ray/util:process",
231+
"@com_google_absl//absl/strings",
232+
"@com_google_googletest//:gtest_main",
233+
],
234+
)
235+
223236
ray_cc_test(
224237
name = "pipe_logger_test",
225238
size = "small",

src/ray/util/tests/pipe_test.cc

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
// Copyright 2017 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#include "ray/util/pipe.h"
16+
17+
#include <gtest/gtest.h>
18+
19+
#include <string>
20+
21+
#include "absl/strings/str_cat.h"
22+
#include "ray/util/process.h"
23+
24+
namespace ray {
25+
26+
TEST(PipeTest, ParentReadChildWrite) {
27+
Pipe pipe;
28+
intptr_t writer_handle = pipe.MakeWriterHandle();
29+
30+
std::string python_code = absl::StrCat(
31+
"import os; "
32+
"os.write(",
33+
writer_handle,
34+
", b'hello from child'); "
35+
"os.close(",
36+
writer_handle,
37+
")");
38+
39+
auto [proc, ec] = Process::Spawn({"python", "-c", python_code}, /*decouple=*/false);
40+
ASSERT_FALSE(ec) << ec.message();
41+
42+
pipe.CloseWriterHandle();
43+
44+
auto result = pipe.Read(/*timeout_s=*/5);
45+
ASSERT_TRUE(result.ok()) << result.status().message();
46+
EXPECT_EQ(*result, "hello from child");
47+
48+
int exit_code = proc.Wait();
49+
EXPECT_EQ(exit_code, 0);
50+
}
51+
52+
TEST(PipeTest, SiblingProcessesCommunicate) {
53+
Pipe pipe;
54+
55+
// Spawn writer first
56+
intptr_t writer_handle = pipe.MakeWriterHandle();
57+
std::string writer_code = absl::StrCat(
58+
"import os; "
59+
"os.write(",
60+
writer_handle,
61+
", b'hello from sibling'); "
62+
"os.close(",
63+
writer_handle,
64+
")");
65+
66+
auto [writer_proc, writer_ec] =
67+
Process::Spawn({"python", "-c", writer_code}, /*decouple=*/false);
68+
ASSERT_FALSE(writer_ec) << writer_ec.message();
69+
pipe.CloseWriterHandle();
70+
71+
// Then spawn reader
72+
intptr_t reader_handle = pipe.MakeReaderHandle();
73+
std::string reader_code = absl::StrCat(
74+
"import os, sys; "
75+
"data = os.read(",
76+
reader_handle,
77+
", 100); "
78+
"os.close(",
79+
reader_handle,
80+
"); "
81+
"sys.exit(0 if data == b'hello from sibling' else 1)");
82+
83+
auto [reader_proc, reader_ec] =
84+
Process::Spawn({"python", "-c", reader_code}, /*decouple=*/false);
85+
ASSERT_FALSE(reader_ec) << reader_ec.message();
86+
pipe.CloseReaderHandle();
87+
88+
int writer_exit = writer_proc.Wait();
89+
EXPECT_EQ(writer_exit, 0);
90+
91+
int reader_exit = reader_proc.Wait();
92+
EXPECT_EQ(reader_exit, 0);
93+
}
94+
95+
TEST(PipeTest, ReadTimeout) {
96+
Pipe pipe;
97+
auto result = pipe.Read(/*timeout_s=*/0.1);
98+
EXPECT_FALSE(result.ok());
99+
}
100+
101+
TEST(PipeTest, OperationsAfterClose) {
102+
Pipe pipe;
103+
pipe.Close();
104+
105+
auto read_result = pipe.Read();
106+
EXPECT_FALSE(read_result.ok());
107+
108+
Pipe pipe2;
109+
pipe2.Close();
110+
auto write_result = pipe2.Write("data");
111+
EXPECT_FALSE(write_result.ok());
112+
}
113+
114+
} // namespace ray

0 commit comments

Comments
 (0)