benchmarks/compress-bench/src/vortex.rs (3 additions, 3 deletions)

@@ -17,7 +17,7 @@ use vortex::file::WriteOptionsSessionExt;
 use vortex_bench::Format;
 use vortex_bench::SESSION;
 use vortex_bench::compress::Compressor;
-use vortex_bench::conversions::parquet_to_vortex;
+use vortex_bench::conversions::parquet_to_vortex_chunks;
 
 /// Compressor implementation for Vortex format.
 pub struct VortexCompressor;
@@ -30,7 +30,7 @@ impl Compressor for VortexCompressor {
 
     async fn compress(&self, parquet_path: &Path) -> Result<(u64, Duration)> {
         // Read the parquet file as an array stream
-        let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
+        let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
 
         let mut buf = Vec::new();
         let start = Instant::now();
@@ -46,7 +46,7 @@ impl Compressor for VortexCompressor {
 
     async fn decompress(&self, parquet_path: &Path) -> Result<Duration> {
         // First compress to get the bytes we'll decompress
-        let uncompressed = parquet_to_vortex(parquet_path.to_path_buf()).await?;
+        let uncompressed = parquet_to_vortex_chunks(parquet_path.to_path_buf()).await?;
         let mut buf = Vec::new();
         let mut cursor = Cursor::new(&mut buf);
         SESSION
benchmarks/datafusion-bench/src/main.rs (5 additions, 3 deletions)

@@ -28,7 +28,7 @@ use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::create_output_writer;
 use vortex_bench::display::DisplayFormat;
@@ -125,10 +125,12 @@ async fn main() -> anyhow::Result<()> {
     for format in args.formats.iter() {
         match format {
             Format::OnDiskVortex => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+                convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default)
+                    .await?;
             }
             Format::VortexCompact => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+                convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact)
+                    .await?;
             }
             _ => {}
         }
benchmarks/duckdb-bench/src/main.rs (11 additions, 3 deletions)

@@ -15,7 +15,7 @@ use vortex_bench::Engine;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::create_output_writer;
 use vortex_bench::display::DisplayFormat;
@@ -99,10 +99,18 @@ fn main() -> anyhow::Result<()> {
     for format in args.formats.iter().copied() {
         match format {
             Format::OnDiskVortex => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+                convert_parquet_directory_to_vortex(
+                    &base_path,
+                    CompactionStrategy::Default,
+                )
+                .await?;
             }
             Format::VortexCompact => {
-                convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+                convert_parquet_directory_to_vortex(
+                    &base_path,
+                    CompactionStrategy::Compact,
+                )
+                .await?;
             }
             // OnDiskDuckDB tables are created during register_tables by loading from Parquet
             _ => {}
vortex-bench/src/bin/data-gen.rs (3 additions, 3 deletions)

@@ -19,7 +19,7 @@ use vortex_bench::CompactionStrategy;
 use vortex_bench::Format;
 use vortex_bench::Opt;
 use vortex_bench::Opts;
-use vortex_bench::conversions::convert_parquet_to_vortex;
+use vortex_bench::conversions::convert_parquet_directory_to_vortex;
 use vortex_bench::create_benchmark;
 use vortex_bench::generate_duckdb_registration_sql;
 use vortex_bench::setup_logging_and_tracing;
@@ -68,15 +68,15 @@ async fn main() -> anyhow::Result<()> {
         .iter()
         .any(|f| matches!(f, Format::OnDiskVortex))
     {
-        convert_parquet_to_vortex(&base_path, CompactionStrategy::Default).await?;
+        convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Default).await?;
     }
 
     if args
         .formats
         .iter()
         .any(|f| matches!(f, Format::VortexCompact))
     {
-        convert_parquet_to_vortex(&base_path, CompactionStrategy::Compact).await?;
+        convert_parquet_directory_to_vortex(&base_path, CompactionStrategy::Compact).await?;
     }
 
     if args
vortex-bench/src/clickbench/data.rs (1 addition, 1 deletion)

@@ -29,7 +29,7 @@ use vortex::error::VortexExpect;
 
 use crate::Format;
 // Re-export for use by clickbench_benchmark
-pub use crate::conversions::convert_parquet_to_vortex;
+pub use crate::conversions::convert_parquet_directory_to_vortex;
 use crate::idempotent_async;
 
 pub static HITS_SCHEMA: LazyLock<Schema> = LazyLock::new(|| {
vortex-bench/src/conversions.rs (114 additions, 40 deletions)

@@ -8,6 +8,8 @@ use std::path::PathBuf;
 use futures::StreamExt;
 use futures::TryStreamExt;
 use parquet::arrow::ParquetRecordBatchStreamBuilder;
+use parquet::arrow::async_reader::ParquetRecordBatchStream;
+use sysinfo::System;
 use tokio::fs::File;
 use tokio::fs::OpenOptions;
 use tokio::fs::create_dir_all;
@@ -20,6 +22,12 @@ use vortex::array::VortexSessionExecute;
 use vortex::array::arrays::ChunkedArray;
 use vortex::array::arrow::FromArrowArray;
 use vortex::array::builders::builder_with_capacity;
+use vortex::array::stream::ArrayStreamAdapter;
+use vortex::array::stream::ArrayStreamExt;
+use vortex::dtype::DType;
+use vortex::dtype::arrow::FromArrowType;
+use vortex::error::VortexError;
+use vortex::error::VortexResult;
 use vortex::file::WriteOptionsSessionExt;
 use vortex::session::VortexSession;
 
@@ -28,37 +36,115 @@ use crate::Format;
 use crate::SESSION;
 use crate::utils::file::idempotent_async;
 
-/// Read a Parquet file and return it as a Vortex ArrayStream.
-pub async fn parquet_to_vortex(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
+/// Memory budget per concurrent conversion stream in GB. This is somewhat arbitrary.
+const MEMORY_PER_STREAM_GB: u64 = 4;
+
+/// Minimum number of concurrent conversion streams.
+const MIN_CONCURRENCY: u64 = 1;
+
+/// Maximum number of concurrent conversion streams. This is somewhat arbitrary.
+const MAX_CONCURRENCY: u64 = 16;
+
+/// Returns the available system memory in bytes.
+fn available_memory_bytes() -> u64 {
+    System::new_all().available_memory()
+}
+
+/// Calculate appropriate concurrency based on available memory.
+fn calculate_concurrency() -> usize {
+    let available_gb = available_memory_bytes() / (1024 * 1024 * 1024);
+    let concurrency = (available_gb / MEMORY_PER_STREAM_GB).clamp(MIN_CONCURRENCY, MAX_CONCURRENCY);
+
+    info!(
+        "Available memory: {}GB, maximum concurrency is: {}",
+        available_gb, concurrency
+    );
+
+    concurrency as usize
+}
+
+/// Read a Parquet file and return it as a Vortex [`ChunkedArray`].
+///
+/// Note: This loads the entire file into memory. For large files, use the streaming conversion
+/// in [`parquet_to_vortex_stream`] instead.
+pub async fn parquet_to_vortex_chunks(parquet_path: PathBuf) -> anyhow::Result<ChunkedArray> {
     let file = File::open(parquet_path).await?;
-    let mut reader = ParquetRecordBatchStreamBuilder::new(file).await?.build()?;
-    let mut chunks = vec![];
-
-    while let Some(rb) = reader.next().await {
-        let rb = rb?;
-        let chunk = ArrayRef::from_arrow(rb, false)?;
-
-        // Make sure data is uncompressed and canonicalized
-        let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
-        chunk.append_to_builder(
-            builder.as_mut(),
-            &mut VortexSession::default().create_execution_ctx(),
-        )?;
-        let chunk = builder.finish();
-        chunks.push(chunk);
-    }
+    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+    let reader = builder.build()?;
+
+    let chunks: Vec<ArrayRef> = parquet_to_vortex_stream(reader)
+        .map(|r| r.map_err(anyhow::Error::from))
+        .try_collect()
+        .await?;
 
     Ok(ChunkedArray::from_iter(chunks))
 }
 
+/// Create a streaming Vortex array from a Parquet reader.
+///
+/// Streams record batches and converts them to Vortex arrays on the fly, avoiding loading the
+/// entire file into memory.
+pub fn parquet_to_vortex_stream(
+    reader: ParquetRecordBatchStream<File>,
+) -> impl futures::Stream<Item = VortexResult<ArrayRef>> {
+    reader.map(move |result| {
+        result
+            .map_err(|e| VortexError::generic(e.into()))
+            .and_then(|rb| {
+                let chunk = ArrayRef::from_arrow(rb, false)?;
+                let mut builder = builder_with_capacity(chunk.dtype(), chunk.len());
+
+                // Canonicalize the chunk.
+                chunk.append_to_builder(
+                    builder.as_mut(),
+                    &mut VortexSession::default().create_execution_ctx(),
+                )?;
+
+                Ok(builder.finish())
+            })
+    })
+}
+
+/// Convert a single Parquet file to Vortex format using streaming.
+///
+/// Streams data directly from Parquet to Vortex without loading the entire file into memory.
+pub async fn convert_parquet_file_to_vortex(
+    parquet_path: &Path,
+    output_path: &Path,
+    compaction: CompactionStrategy,
+) -> anyhow::Result<()> {
+    let file = File::open(parquet_path).await?;
+    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
+    let dtype = DType::from_arrow(builder.schema().as_ref());
+
+    let stream = parquet_to_vortex_stream(builder.build()?);
+
+    let mut output_file = OpenOptions::new()
+        .write(true)
+        .truncate(true)
+        .create(true)
+        .open(output_path)
+        .await?;
+
+    compaction
+        .apply_options(SESSION.write_options())
+        .write(
+            &mut output_file,
+            ArrayStreamExt::boxed(ArrayStreamAdapter::new(dtype, stream)),
+        )
+        .await?;
+
+    Ok(())
+}
+
 /// Convert all Parquet files in a directory to Vortex format.
 ///
-/// This function reads Parquet files from `{input_path}/parquet/` and writes
-/// Vortex files to `{input_path}/vortex-file-compressed/` (for Default compaction)
-/// or `{input_path}/vortex-compact/` (for Compact compaction).
+/// This function reads Parquet files from `{input_path}/parquet/` and writes Vortex files to
+/// `{input_path}/vortex-file-compressed/` (for Default compaction) or
+/// `{input_path}/vortex-compact/` (for Compact compaction).
 ///
-/// The conversion is idempotent - existing Vortex files will not be regenerated.
-pub async fn convert_parquet_to_vortex(
+/// The conversion is idempotent: existing Vortex files will not be regenerated.
+pub async fn convert_parquet_directory_to_vortex(
     input_path: &Path,
     compaction: CompactionStrategy,
 ) -> anyhow::Result<()> {
@@ -72,7 +158,6 @@ pub async fn convert_parquet_to_vortex(
     create_dir_all(&vortex_dir).await?;
 
     let parquet_inputs = fs::read_dir(&parquet_path)?.collect::<std::io::Result<Vec<_>>>()?;
-
     trace!(
         "Found {} parquet files in {}",
         parquet_inputs.len(),
@@ -83,6 +168,7 @@ pub async fn convert_parquet_to_vortex(
         .iter()
         .filter(|entry| entry.path().extension().is_some_and(|e| e == "parquet"));
 
+    let concurrency = calculate_concurrency();
    futures::stream::iter(iter)
        .map(|dir_entry| {
            let filename = {
@@ -100,30 +186,18 @@ pub async fn convert_parquet_to_vortex(
                         "Processing file '{filename}' with {:?} strategy",
                         compaction
                     );
-                    let chunked_array = parquet_to_vortex(parquet_file_path).await?;
-                    let mut f = OpenOptions::new()
-                        .write(true)
-                        .truncate(true)
-                        .create(true)
-                        .open(&vtx_file)
-                        .await?;
-
-                    let write_options = compaction.apply_options(SESSION.write_options());
-
-                    write_options
-                        .write(&mut f, chunked_array.to_array_stream())
-                        .await?;
-
-                    anyhow::Ok(())
+                    convert_parquet_file_to_vortex(&parquet_file_path, &vtx_file, compaction)
+                        .await
                 })
                 .await
                 .expect("Failed to write Vortex file")
             }
             .in_current_span(),
         )
     })
-    .buffer_unordered(16)
+    .buffer_unordered(concurrency)
     .try_collect::<Vec<_>>()
     .await?;
 
     Ok(())
 }
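A note on what the conversions.rs changes add up to: directory conversion now derives its fan-out from free RAM instead of the previous hard-coded `buffer_unordered(16)`. With the 4 GB-per-stream budget above, 26 GB of available memory yields 26 / 4 = 6 concurrent file conversions; below 8 GB a single stream runs, and 64 GB or more is capped at 16. The sketch below shows how a benchmark binary might drive the three public entry points; the `#[tokio::main]` wrapper and the `data/` paths are illustrative assumptions, while the function names, signatures, and directory layout are taken from the diff above.

use std::path::Path;

use vortex_bench::CompactionStrategy;
use vortex_bench::conversions::convert_parquet_directory_to_vortex;
use vortex_bench::conversions::convert_parquet_file_to_vortex;
use vortex_bench::conversions::parquet_to_vortex_chunks;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Single file, streaming: record batches flow from Parquet into the Vortex
    // writer without materializing the whole table. Paths are hypothetical.
    convert_parquet_file_to_vortex(
        Path::new("data/hits.parquet"),
        Path::new("data/hits.vortex"),
        CompactionStrategy::Default,
    )
    .await?;

    // Whole directory: reads `data/parquet/*.parquet`, writes
    // `data/vortex-file-compressed/`, and skips outputs that already exist.
    // Parallelism is sized from available memory (1..=16 streams).
    convert_parquet_directory_to_vortex(Path::new("data"), CompactionStrategy::Default).await?;

    // Small inputs where an in-memory ChunkedArray is acceptable, as in
    // compress-bench and the dataset helpers below.
    let _chunks = parquet_to_vortex_chunks("data/small.parquet".into()).await?;
    Ok(())
}

Sizing `buffer_unordered` from `calculate_concurrency()` keeps the number of in-flight files proportional to available memory, at roughly 4 GB each, instead of pinning the fan-out at 16 regardless of the machine.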
vortex-bench/src/datasets/taxi_data.rs (3 additions, 3 deletions)

@@ -15,7 +15,7 @@ use vortex::file::WriteOptionsSessionExt;
 use crate::CompactionStrategy;
 use crate::IdempotentPath;
 use crate::SESSION;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::download_data;
 use crate::idempotent_async;
@@ -61,7 +61,7 @@ pub async fn taxi_data_vortex() -> Result<PathBuf> {
     let buf = output_fname.to_path_buf();
     let mut output_file = TokioFile::create(output_fname).await?;
 
-    let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
+    let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;
 
     SESSION
         .write_options()
@@ -81,7 +81,7 @@ pub async fn taxi_data_vortex_compact() -> Result<PathBuf> {
     // This is the only difference to `taxi_data_vortex`.
     let write_options = CompactionStrategy::Compact.apply_options(SESSION.write_options());
 
-    let data = parquet_to_vortex(taxi_data_parquet().await?).await?;
+    let data = parquet_to_vortex_chunks(taxi_data_parquet().await?).await?;
 
     write_options
         .write(&mut output_file, data.to_array_stream())
vortex-bench/src/downloadable_dataset.rs (2 additions, 2 deletions)

@@ -10,7 +10,7 @@ use vortex::file::WriteOptionsSessionExt;
 
 use crate::IdempotentPath;
 use crate::SESSION;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::download_data;
 use crate::idempotent_async;
@@ -61,7 +61,7 @@ impl Dataset for DownloadableDataset {
         let dir = format!("{}/", self.name()).to_data_path();
         let vortex = dir.join(format!("{}.vortex", self.name()));
 
-        let data = parquet_to_vortex(parquet).await?;
+        let data = parquet_to_vortex_chunks(parquet).await?;
         idempotent_async(&vortex, async |path| -> anyhow::Result<()> {
             SESSION
                 .write_options()
vortex-bench/src/public_bi.rs (2 additions, 2 deletions)

@@ -39,7 +39,7 @@ use crate::Format;
 use crate::IdempotentPath;
 use crate::SESSION;
 use crate::TableSpec;
-use crate::conversions::parquet_to_vortex;
+use crate::conversions::parquet_to_vortex_chunks;
 use crate::datasets::Dataset;
 use crate::datasets::data_downloads::decompress_bz2;
 use crate::datasets::data_downloads::download_data;
@@ -362,7 +362,7 @@ impl PBIData {
         let vortex = self.get_file_path(&table.name, FileType::Vortex);
 
         async move {
-            let data = parquet_to_vortex(parquet).await?;
+            let data = parquet_to_vortex_chunks(parquet).await?;
             let vortex_file =
                 idempotent_async(&vortex, async |output_path| -> anyhow::Result<()> {
                     SESSION