diff --git a/benchmarks/compress-bench/src/vortex.rs b/benchmarks/compress-bench/src/vortex.rs
index 049305336af..b2eb1a5d490 100644
--- a/benchmarks/compress-bench/src/vortex.rs
+++ b/benchmarks/compress-bench/src/vortex.rs
@@ -17,7 +17,7 @@ use vortex::file::WriteOptionsSessionExt;
 use vortex_bench::Format;
 use vortex_bench::SESSION;
 use vortex_bench::compress::Compressor;
-use vortex_bench::conversions::parquet_to_vortex;
+use vortex_bench::conversions::parquet_to_vortex_chunks;

 /// Compressor implementation for Vortex format.
 pub struct VortexCompressor;
@@ -30,7 +30,7 @@ impl Compressor for VortexCompressor {

     async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> {
         // Read the parquet file as an array stream
-        let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
+        let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
         let mut buf = Vec::new();

         let start = Instant::now();
@@ -46,7 +46,7 @@ impl Compressor for VortexCompressor {

     async fn decompress(&self, parquet_path: &Path) -> Result<Duration> {
         // First compress to get the bytes we'll decompress
-        let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
+        let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
         let mut buf = Vec::new();
         let mut cursor = Cursor::new(&mut buf);
         SESSION
diff --git a/benchmarks/datafusion-bench/src/main.rs b/benchmarks/datafusion-bench/src/main.rs
index be0ef3cd340..6be64e5bbad 100644
--- a/benchmarks/datafusion-bench/src/main.rs
+++ b/benchmarks/datafusion-bench/src/main.rs
@@ -28,7 +28,7 @@ use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::create_output_writer;
 use vortex_bench::display::DisplayFormat;
@@ -125,10 +125,12 @@ async fn main() -> anyhow::Result<()> {
     for format in args.formats.iter() {
         match format {
             Format::OnDiskVortex => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+                convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default)
+                    .await?;
             }
             Format::VortexCompact => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+                convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact)
+                    .await?;
             }
             _ => {}
         }
diff --git a/benchmarks/duckdb-bench/src/main.rs b/benchmarks/duckdb-bench/src/main.rs
index 800d6625507..12a5e45ca7b 100644
--- a/benchmarks/duckdb-bench/src/main.rs
+++ b/benchmarks/duckdb-bench/src/main.rs
@@ -15,7 +15,7 @@ use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::create_output_writer;
 use vortex_bench::display::DisplayFormat;
@@ -99,10 +99,18 @@ fn main() -> anyhow::Result<()> {
         for format in args.formats.iter().copied() {
             match format {
                 Format::OnDiskVortex => {
-                    convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+                    convert_parquet_directory_to_vortex(
+                        &base_path,
+                        CompactionStrategy::Default,
+                    )
+                    .await?;
                 }
                 Format::VortexCompact => {
-                    convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+                    convert_parquet_directory_to_vortex(
+                        &base_path,
+                        CompactionStrategy::Compact,
+                    )
+                    .await?;
                 }
                 // OnDiskDuckDB tables are created during register_tables by loading from Parquet
                 _ => {}
diff --git a/vortex-bench/src/bin/data-gen.rs b/vortex-bench/src/bin/data-gen.rs
index 9a757676247..d38c977a6c6 100644
--- a/vortex-bench/src/bin/data-gen.rs
+++ b/vortex-bench/src/bin/data-gen.rs
@@ -19,7 +19,7 @@ use vortex_bench::CompactionStrategy;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::generate_duckdb_registration_sql;
 use vortex_bench::setup_logging_and_tracing;
@@ -68,7 +68,7 @@ async fn main() -> anyhow::Result<()> {
         .iter()
         .any(|f| matches!(f, Format::OnDiskVortex))
     {
-        convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+        convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default).await?;
     }

     if args
@@ -76,7 +76,7 @@
         .iter()
         .any(|f| matches!(f, Format::VortexCompact))
     {
-        convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+        convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?;
     }

     if args
diff --git a/vortex-bench/src/clickbench/data.rs b/vortex-bench/src/clickbench/data.rs
index c4f36672d30..c0ac621a6b4 100644
--- a/vortex-bench/src/clickbench/data.rs
+++ b/vortex-bench/src/clickbench/data.rs
@@ -29,7 +29,7 @@ use vortex::error::VortexExpect;

 use crate::Format;
 // Re-export for use by clickbench_benchmark
-pub use crate::conversions::convert_parquet_to_vortex;
+pub use crate::conversions::convert_parquet_directory_to_vortex;
 use crate::idempotent_async;

 pub static HITS_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
diff --git a/vortex-bench/src/conversions.rs b/vortex-bench/src/conversions.rs
index 06373bf2dcc..761b720865d 100644
--- a/vortex-bench/src/conversions.rs
+++ b/vortex-bench/src/conversions.rs
@@ -8,6 +8,8 @@ use std::path::PathBuf;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::async_reader::ParquetRecordBatchStream;
+use sysinfo::System;
 use tokio::fs::File;
 use tokio::fs::OpenOptions;
 use tokio::fs::create_dir_all;
@@ -20,6 +22,12 @@ use vortex::array::VortexSessionExecute;
 use vortex::array::arrays::ChunkedArray;
 use vortex::array::arrow::FromArrowArray;
 use vortex::array::builders::builder_with_capacity;
+use vortex::array::stream::ArrayStreamAdapter;
+use vortex::array::stream::ArrayStreamExt;
+use vortex::dtype::DType;
+use vortex::dtype::arrow::FromArrowType;
+use vortex::error::VortexError;
+use vortex::error::VortexResult;
 use vortex::file::WriteOptionsSessionExt;
 use vortex::session::VortexSession;

@@ -28,37 +36,115 @@ use crate::Format;
 use crate::SESSION;
 use crate::utils::file::idempotent_async;

-/// Read a Parquet file and return it as a Vortex ArrayStream.
-pub async fn parquet_to_vortex(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
+/// Memory budget per concurrent conversion stream in GB. This is somewhat arbitrary.
+const MEMORY_PER_STREAM_GB: u64 = 4;
+
+/// Minimum number of concurrent conversion streams.
+const MIN_CONCURRENCY: u64 = 1;
+
+/// Maximum number of concurrent conversion streams. This is somewhat arbitrary.
+const MAX_CONCURRENCY: u64 = 16;
+
+/// Returns the available system memory in bytes.
+fn available_memory_bytes() -> u64 {
+    System::new_all().available_memory()
+}
+
+/// Calculate appropriate concurrency based on available memory.
+fn calculate_concurrency() -> usize {
+    let available_gb = available_memory_bytes() / (1024 * 1024 * 1024);
+    let concurrency = (available_gb / MEMORY_PER_STREAM_GB).clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
+
+    info!(
+        "Available memory: {}GB, maximum concurrency: {}",
+        available_gb, concurrency
+    );
+
+    concurrency as usize
+}
+
+/// Read a Parquet file and return it as a Vortex [`ChunkedArray`].
+///
+/// Note: This loads the entire file into memory. For large files, use a streaming conversion
+/// such as [`parquet_to_vortex_stream`] instead.
+pub async fn parquet_to_vortex_chunks(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
     let file = File::open(parquet_path).await?;
-    let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
-    let mut chunks = vec![];
-
-    while let Some(rb) = reader.next().await {
-        let rb = rb?;
-        let chunk = ArrayRef::from_arrow(rb, false)?;
-
-        // Make sure data is uncompressed and canonicalized
-        let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
-        chunk.append_to_builder(
-            builder.as_mut(),
-            &mut VortexSession::default().create_execution_ctx(),
-        )?;
-        let chunk = builder.finish();
-        chunks.push(chunk);
-    }
+    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+    let reader = builder.build()?;
+
+    let chunks: Vec<ArrayRef> = parquet_to_vortex_stream(reader)
+        .map(|r| r.map_err(anyhow::Error::from))
+        .try_collect()
+        .await?;

     Ok(ChunkedArray::from_iter(chunks))
 }

+/// Create a streaming Vortex array from a Parquet reader.
+///
+/// Streams record batches and converts them to Vortex arrays on the fly, avoiding loading the
+/// entire file into memory.
+pub fn parquet_to_vortex_stream(
+    reader: ParquetRecordBatchStream<File>,
+) -> impl futures::Stream<Item = VortexResult<ArrayRef>> {
+    reader.map(move |result| {
+        result
+            .map_err(|e| VortexError::generic(e.into()))
+            .and_then(|rb| {
+                let chunk = ArrayRef::from_arrow(rb, false)?;
+                let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
+
+                // Canonicalize the chunk.
+                chunk.append_to_builder(
+                    builder.as_mut(),
+                    &mut VortexSession::default().create_execution_ctx(),
+                )?;
+
+                Ok(builder.finish())
+            })
+    })
+}
+
+/// Convert a single Parquet file to Vortex format using streaming.
+///
+/// Streams data directly from Parquet to Vortex without loading the entire file into memory.
+pub async fn convert_parquet_file_to_vortex(
+    parquet_path: &Path,
+    output_path: &Path,
+    compaction: CompactionStrategy,
+) -> anyhow::Result<()> {
+    let file = File::open(parquet_path).await?;
+    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+    let dtype = DType::from_arrow(builder.schema().as_ref());
+
+    let stream = parquet_to_vortex_stream(builder.build()?);
+
+    let mut output_file = OpenOptions::new()
+        .write(true)
+        .truncate(true)
+        .create(true)
+        .open(output_path)
+        .await?;
+
+    compaction
+        .apply_options(SESSION.write_options())
+        .write(
+            &mut output_file,
+            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
+        )
+        .await?;
+
+    Ok(())
+}
+
 /// Convert all Parquet files in a directory to Vortex format.
 ///
-/// This function reads Parquet files from `{input_path}/parquet/` and writes
-/// Vortex files to `{input_path}/vortex-file-compressed/` (for Default compaction)
-/// or `{input_path}/vortex-compact/` (for Compact compaction).
+/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
+/// `{input_path}/vortex-file-compressed/` (for Default compaction) or
+/// `{input_path}/vortex-compact/` (for Compact compaction).
 ///
-/// The conversion is idempotent - existing Vortex files will not be regenerated.
-pub async fn convert_parquet_to_vortex(
+/// The conversion is idempotent: existing Vortex files will not be regenerated.
+pub async fn convert_parquet_directory_to_vortex(
     input_path: &Path,
     compaction: CompactionStrategy,
 ) -> anyhow::Result<()> {
@@ -72,7 +158,6 @@ pub async fn convert_parquet_to_vortex(
     create_dir_all(&vortex_dir).await?;

     let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
-
     trace!(
         "Found {} parquet files in {}",
         parquet_inputs.len(),
@@ -83,6 +168,7 @@
         .iter()
         .filter(|entry| entry.path().extension().is_some_and(|e| e == "parquet"));

+    let concurrency = calculate_concurrency();
     futures::stream::iter(iter)
         .map(|dir_entry| {
             let filename = {
@@ -100,21 +186,8 @@
                         "Processing file '{filename}' with {:?} strategy",
                         compaction
                     );
-                    let chunked_array = parquet_to_vortex(parquet_file_path).await?;
-                    let mut f = OpenOptions::new()
-                        .write(true)
-                        .truncate(true)
-                        .create(true)
-                        .open(&vtx_file)
-                        .await?;
-
-                    let write_options = compaction.apply_options(SESSION.write_options());
-
-                    write_options
-                        .write(&mut f, chunked_array.to_array_stream())
-                        .await?;
-
-                    anyhow::Ok(())
+                    convert_parquet_file_to_vortex(&parquet_file_path, &vtx_file, compaction)
+                        .await
                 })
                 .await
                 .expect("Failed to write Vortex file")
@@ -122,8 +195,9 @@
                 .in_current_span(),
             )
         })
-        .buffer_unordered(16)
+        .buffer_unordered(concurrency)
         .try_collect::<Vec<_>>()
         .await?;
+
     Ok(())
 }
diff --git a/vortex-bench/src/datasets/taxi_data.rs b/vortex-bench/src/datasets/taxi_data.rs
index 74db55437fa..9367d9746aa 100644
--- a/vortex-bench/src/datasets/taxi_data.rs
+++ b/vortex-bench/src/datasets/taxi_data.rs
@@ -15,7 +15,7 @@ use vortex::file::WriteOptionsSessionExt;
 use crate::CompactionStrategy;
 use crate::IdempotentPath;
 use crate::SESSION;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::download_data;
 use crate::idempotent_async;
@@ -61,7 +61,7 @@ pub async fn taxi_data_vortex() -> Result<PathBuf> {
         let buf = output_fname.to_path_buf();
         let mut output_file = TokioFile::create(output_fname).await?;

-        let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
+        let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;

         SESSION
             .write_options()
@@ -81,7 +81,7 @@ pub async fn taxi_data_vortex_compact() -> Result<PathBuf> {
         // This is the only difference to `taxi_data_vortex`.
         let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());

-        let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
+        let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;

         write_options
             .write(&mut output_file, data.to_array_stream())
diff --git a/vortex-bench/src/downloadable_dataset.rs b/vortex-bench/src/downloadable_dataset.rs
index 712dcec6049..7aa140ec1b2 100644
--- a/vortex-bench/src/downloadable_dataset.rs
+++ b/vortex-bench/src/downloadable_dataset.rs
@@ -10,7 +10,7 @@ use vortex::file::WriteOptionsSessionExt;

 use crate::IdempotentPath;
 use crate::SESSION;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::download_data;
 use crate::idempotent_async;
@@ -61,7 +61,7 @@ impl Dataset for DownloadableDataset {
         let dir = format!("{}/", self.name()).to_data_path();
         let vortex = dir.join(format!("{}.vortex", self.name()));

-        let data = parquet_to_vortex(parquet).await?;
+        let data = parquet_to_vortex_chunks(parquet).await?;
         idempotent_async(&vortex, async |path| -> anyhow::Result<()> {
             SESSION
                 .write_options()
diff --git a/vortex-bench/src/public_bi.rs b/vortex-bench/src/public_bi.rs
index ca1f8df1471..0539266ed0d 100644
--- a/vortex-bench/src/public_bi.rs
+++ b/vortex-bench/src/public_bi.rs
@@ -39,7 +39,7 @@ use crate::Format;
 use crate::IdempotentPath;
 use crate::SESSION;
 use crate::TableSpec;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::decompress_bz2;
 use crate::datasets::data_downloads::download_data;
@@ -362,7 +362,7 @@ impl PBIData {
             let vortex = self.get_file_path(&table.name, FileType::Vortex);

             async move {
-                let data = parquet_to_vortex(parquet).await?;
+                let data = parquet_to_vortex_chunks(parquet).await?;
                 let vortex_file =
                     idempotent_async(&vortex, async |output_path| -> anyhow::Result<()> {
                         SESSION