Description
I found that `Arrow.Stream` does not work with non-seekable I/O, which the streaming format should support. Here are some minimal working examples (MWEs).
Named pipes
```sh
mkfifo /tmp/arrow_pipe

# Producer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "w") do io
    Arrow.write(io, (i = collect(1:10),); file=false) # streaming
end
' &

# Consumer
julia -e '
using Arrow
open("/tmp/arrow_pipe", "r") do io
    for batch in Arrow.Stream(io)
        println(batch)
    end
end
'
# Result: no output, no error

rm /tmp/arrow_pipe
```

Sockets
```julia
using Arrow, Sockets

server = listen(9999)

# Writer task: stream a single table over TCP, then close the connection
t = @async begin
    conn = accept(server)
    Arrow.write(conn, (i = collect(1:10),); file=false)
    close(conn)
end

sock = connect(9999)
# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
    println(batch)
end

wait(t)
# ERROR: TaskFailedException
#     nested task error: MethodError: no method matching position(::TCPSocket)
```

Unix domain sockets
```julia
using Arrow, Sockets

server = listen("/tmp/arrow.sock")

# Writer task: stream a single table over a Unix domain socket
t = @async begin
    conn = accept(server)
    Arrow.write(conn, (i = collect(1:10),); file=false)
    close(conn)
end

sock = connect("/tmp/arrow.sock")
# This block hangs indefinitely. Press Ctrl-C to proceed.
for batch in Arrow.Stream(sock)
    println(batch)
end

wait(t)
# ERROR: TaskFailedException
#     nested task error: MethodError: no method matching position(::Base.PipeEndpoint)
```

The above demos should be reproducible with Arrow v2.8.0.
Diagnosis
The first issue (named pipes) is caused by `tobytes(io::IOStream)` unconditionally calling `Mmap.mmap(io)`. For a named pipe, `filesize(io)` returns 0, so `Mmap.mmap(io)` silently returns an empty `UInt8[]`, which is why the consumer prints nothing and raises no error.
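A minimal sketch of one possible fallback, assuming the fix belongs in the `IOStream` method of `tobytes`: memory-map only when the stream reports a nonzero size, and otherwise read until EOF. `tobytes_sketch` is a hypothetical name used for illustration, not Arrow.jl's actual code.

```julia
using Mmap

# Hypothetical stand-in for the IOStream method of `tobytes`.
# Regular files report their size, so mmap works as before; a named pipe
# reports filesize 0, so we fall back to reading everything until EOF
# instead of silently returning an empty array.
function tobytes_sketch(io::IOStream)::Vector{UInt8}
    return filesize(io) > 0 ? Mmap.mmap(io) : read(io)
end
```

Note that `read(io)` buffers the whole stream before any batch is returned: it removes the silent empty result for pipes, but it does not make consumption incremental.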
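The socket failures are caused by `Base.write(io, msg, ...)` calling `position(io)` to record block positions for the file-format footer, even when writing the streaming format (`file=false`). Neither `TCPSocket` nor `Base.PipeEndpoint` has a `position` method, so the writer task fails with a `MethodError`.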
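One way to sidestep the missing `position` method, sketched here as an assumption rather than Arrow.jl's design, is to wrap the destination in an `IO` that counts bytes as they are written. `CountingIO` is hypothetical; simply skipping the block-position bookkeeping when `file=false` may be the cleaner fix, since the Arrow streaming format has no footer.

```julia
# Hypothetical wrapper (not part of Arrow.jl): forwards every write to the
# underlying IO and counts the bytes, so `position` is answerable even for
# destinations such as TCPSocket or PipeEndpoint that cannot report one.
mutable struct CountingIO{T<:IO} <: IO
    io::T
    nbytes::Int
end
CountingIO(io::IO) = CountingIO(io, 0)

# Bulk writes (arrays, strings) end up here.
function Base.unsafe_write(c::CountingIO, p::Ptr{UInt8}, n::UInt)
    written = unsafe_write(c.io, p, n)
    c.nbytes += written
    return written
end

# Single-byte writes.
function Base.write(c::CountingIO, b::UInt8)
    written = write(c.io, b)
    c.nbytes += written
    return written
end

# Bytes written so far through this wrapper.
Base.position(c::CountingIO) = c.nbytes
Base.flush(c::CountingIO) = flush(c.io)
Base.isopen(c::CountingIO) = isopen(c.io)
Base.close(c::CountingIO) = close(c.io)
```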
I will submit a PR soon.