diff --git a/Cargo.lock b/Cargo.lock index eec44390371..d67fd625d87 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10339,6 +10339,7 @@ dependencies = [ "libloading 0.8.9", "paste", "vortex-cuda-macros", + "vortex-dtype", ] [[package]] @@ -10365,6 +10366,7 @@ dependencies = [ "vortex-dtype", "vortex-error", "vortex-fastlanes", + "vortex-io", "vortex-mask", "vortex-nvcomp", "vortex-scalar", @@ -10612,6 +10614,7 @@ dependencies = [ "vortex-array", "vortex-buffer", "vortex-bytebool", + "vortex-cuda", "vortex-datetime-parts", "vortex-decimal-byte-parts", "vortex-dtype", @@ -10712,6 +10715,7 @@ dependencies = [ "tempfile", "tokio", "tracing", + "vortex-array", "vortex-buffer", "vortex-error", "vortex-metrics", diff --git a/encodings/sparse/src/canonical.rs b/encodings/sparse/src/canonical.rs index 2c716d7714a..665d7b94872 100644 --- a/encodings/sparse/src/canonical.rs +++ b/encodings/sparse/src/canonical.rs @@ -17,6 +17,7 @@ use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::StructArray; use vortex_array::arrays::VarBinViewArray; use vortex_array::arrays::build_views::BinaryView; +use vortex_array::buffer::BufferHandle; use vortex_array::builders::ArrayBuilder; use vortex_array::builders::DecimalBuilder; use vortex_array::builders::ListViewBuilder; @@ -480,7 +481,7 @@ fn canonicalize_varbin_inner( let mut buffers = values.buffers().to_vec(); let fill = if let Some(buffer) = &fill_value { - buffers.push(buffer.clone()); + buffers.push(BufferHandle::new_host(buffer.clone())); BinaryView::make_view( buffer.as_ref(), u32::try_from(n_patch_buffers).vortex_expect("too many buffers"), @@ -498,9 +499,11 @@ fn canonicalize_varbin_inner( views[patch_index_usize] = patch; } + let views = BufferHandle::new_host(views.freeze().into_byte_buffer()); + // SAFETY: views are constructed to maintain the invariants let array = unsafe { - VarBinViewArray::new_unchecked(views.freeze(), Arc::from(buffers), dtype, validity) + VarBinViewArray::new_handle_unchecked(views, Arc::from(buffers), dtype, validity) }; Canonical::VarBinView(array) diff --git a/fuzz/src/array/fill_null.rs b/fuzz/src/array/fill_null.rs index 8be717336a2..29c90c61c32 100644 --- a/fuzz/src/array/fill_null.rs +++ b/fuzz/src/array/fill_null.rs @@ -196,8 +196,8 @@ fn fill_varbinview_array( let string_refs: Vec<&str> = strings.iter().map(|s| s.as_str()).collect(); let result = VarBinViewArray::from_iter_str(string_refs).into_array(); if result_nullability == Nullability::Nullable { - VarBinViewArray::new( - result.to_varbinview().views().clone(), + VarBinViewArray::new_handle( + result.to_varbinview().views_handle().clone(), result.to_varbinview().buffers().clone(), result.dtype().as_nullable(), result_nullability.into(), @@ -230,8 +230,8 @@ fn fill_varbinview_array( let binary_refs: Vec<&[u8]> = binaries.iter().map(|b| b.as_slice()).collect(); let result = VarBinViewArray::from_iter_bin(binary_refs).into_array(); if result_nullability == Nullability::Nullable { - VarBinViewArray::new( - result.to_varbinview().views().clone(), + VarBinViewArray::new_handle( + result.to_varbinview().views_handle().clone(), result.to_varbinview().buffers().clone(), result.dtype().as_nullable(), result_nullability.into(), diff --git a/fuzz/src/array/mask.rs b/fuzz/src/array/mask.rs index 805a9b6f1c6..3ccb94dbb46 100644 --- a/fuzz/src/array/mask.rs +++ b/fuzz/src/array/mask.rs @@ -52,8 +52,8 @@ pub fn mask_canonical_array(canonical: Canonical, mask: &Mask) -> VortexResult { let new_validity = array.validity().mask(mask); - VarBinViewArray::new( - array.views().clone(), + VarBinViewArray::new_handle( + array.views_handle().clone(), array.buffers().clone(), array.dtype().with_nullability(new_validity.nullability()), new_validity, diff --git a/vortex-array/src/arrays/decimal/vtable/mod.rs b/vortex-array/src/arrays/decimal/vtable/mod.rs index f253902813f..7ae29978423 100644 --- a/vortex-array/src/arrays/decimal/vtable/mod.rs +++ b/vortex-array/src/arrays/decimal/vtable/mod.rs @@ -107,13 +107,11 @@ impl VTable for DecimalVTable { match_each_decimal_value_type!(metadata.values_type(), |D| { // Check and reinterpret-cast the buffer - if let Some(buffer) = values.as_host_opt() { - vortex_ensure!( - buffer.is_aligned(Alignment::of::()), - "DecimalArray buffer not aligned for values type {:?}", - D::DECIMAL_TYPE - ); - } + vortex_ensure!( + values.alignment().is_aligned_to(Alignment::of::()), + "DecimalArray buffer not aligned for values type {:?}", + D::DECIMAL_TYPE + ); DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity) }) } diff --git a/vortex-array/src/arrays/filter/array.rs b/vortex-array/src/arrays/filter/array.rs index df3754dc58c..40857487a65 100644 --- a/vortex-array/src/arrays/filter/array.rs +++ b/vortex-array/src/arrays/filter/array.rs @@ -9,6 +9,14 @@ use vortex_mask::Mask; use crate::ArrayRef; use crate::stats::ArrayStats; +/// Decomposed parts of the filter array. +pub struct FilterArrayParts { + /// Child array that is filtered by the mask + pub child: ArrayRef, + /// Mask to apply at filter time. Child elements with set indices are kept, the rest discarded. + pub mask: Mask, +} + // TODO(connor): Write docs on why we have this, and what we had in the old world so that the future // does not repeat the mistakes of the past. /// A lazy array that represents filtering a child array by a boolean [`Mask`]. @@ -56,4 +64,12 @@ impl FilterArray { pub fn filter_mask(&self) -> &Mask { &self.mask } + + /// Consume the array and return its individual components. + pub fn into_parts(self) -> FilterArrayParts { + FilterArrayParts { + child: self.child, + mask: self.mask, + } + } } diff --git a/vortex-array/src/arrays/filter/execute/mod.rs b/vortex-array/src/arrays/filter/execute/mod.rs index 733a6f9a2f4..3eaec409c16 100644 --- a/vortex-array/src/arrays/filter/execute/mod.rs +++ b/vortex-array/src/arrays/filter/execute/mod.rs @@ -31,22 +31,15 @@ mod struct_; mod varbinview; /// Reconstruct a [`Mask`] from an [`Arc`]. -#[inline] fn values_to_mask(values: &Arc) -> Mask { Mask::Values(values.clone()) } -/// A helper function that lazily filters a [`Validity`] with a selection mask. -/// -/// If the validity is a [`Validity::Array`], then this wraps it in a `FilterArray` instead of -/// eagerly filtering the values immediately. +/// A helper function that lazily filters a [`Validity`] with selection mask values. fn filter_validity(validity: Validity, mask: &Arc) -> Validity { - match validity { - v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => v, - Validity::Array(arr) => { - Validity::Array(FilterArray::new(arr.clone(), values_to_mask(mask)).into_array()) - } - } + validity + .filter(&values_to_mask(mask)) + .vortex_expect("Somehow unable to wrap filter around a validity array") } /// Check for some fast-path execution conditions before calling [`execute_filter`]. diff --git a/vortex-array/src/arrays/filter/mod.rs b/vortex-array/src/arrays/filter/mod.rs index 1c72e20773b..3b28df57b44 100644 --- a/vortex-array/src/arrays/filter/mod.rs +++ b/vortex-array/src/arrays/filter/mod.rs @@ -3,6 +3,7 @@ mod array; pub use array::FilterArray; +pub use array::FilterArrayParts; mod execute; diff --git a/vortex-array/src/arrays/masked/execute.rs b/vortex-array/src/arrays/masked/execute.rs index 1ab1fe184ad..1dd9523ed4e 100644 --- a/vortex-array/src/arrays/masked/execute.rs +++ b/vortex-array/src/arrays/masked/execute.rs @@ -103,8 +103,8 @@ fn mask_validity_varbinview(array: VarBinViewArray, mask: &Mask) -> VarBinViewAr let new_validity = combine_validity(array.validity(), mask, len); // SAFETY: We're only changing validity, not the data structure unsafe { - VarBinViewArray::new_unchecked( - array.views().clone(), + VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), dtype, new_validity, diff --git a/vortex-array/src/arrays/primitive/vtable/mod.rs b/vortex-array/src/arrays/primitive/vtable/mod.rs index cf702ca3dad..459f24b433e 100644 --- a/vortex-array/src/arrays/primitive/vtable/mod.rs +++ b/vortex-array/src/arrays/primitive/vtable/mod.rs @@ -101,16 +101,13 @@ impl VTable for PrimitiveVTable { ); } - // For host buffers, we eagerly check alignment on construction. - // TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned, - // but not sure about other devices. - if let Some(host_buf) = buffer.as_host_opt() { - vortex_ensure!( - host_buf.is_aligned(Alignment::new(ptype.byte_width())), - "PrimitiveArray::build: Buffer must be aligned to {}", - ptype.byte_width() - ); - } + vortex_ensure!( + buffer + .alignment() + .is_aligned_to(Alignment::new(ptype.byte_width())), + "PrimitiveArray::build: Buffer must be aligned to {}", + ptype.byte_width() + ); // SAFETY: checked ahead of time unsafe { diff --git a/vortex-array/src/arrays/varbinview/array.rs b/vortex-array/src/arrays/varbinview/array.rs index 63c8636a167..f30bf0ced19 100644 --- a/vortex-array/src/arrays/varbinview/array.rs +++ b/vortex-array/src/arrays/varbinview/array.rs @@ -3,6 +3,7 @@ use std::sync::Arc; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_buffer::ByteBuffer; use vortex_dtype::DType; @@ -15,6 +16,7 @@ use vortex_error::vortex_err; use vortex_error::vortex_panic; use vortex_vector::binaryview::BinaryView; +use crate::buffer::BufferHandle; use crate::builders::ArrayBuilder; use crate::builders::VarBinViewBuilder; use crate::stats::ArrayStats; @@ -82,16 +84,16 @@ use crate::validity::Validity; #[derive(Clone, Debug)] pub struct VarBinViewArray { pub(super) dtype: DType, - pub(super) buffers: Arc<[ByteBuffer]>, - pub(super) views: Buffer, + pub(super) buffers: Arc<[BufferHandle]>, + pub(super) views: BufferHandle, pub(super) validity: Validity, pub(super) stats_set: ArrayStats, } pub struct VarBinViewArrayParts { pub dtype: DType, - pub buffers: Arc<[ByteBuffer]>, - pub views: Buffer, + pub buffers: Arc<[BufferHandle]>, + pub views: BufferHandle, pub validity: Validity, } @@ -112,6 +114,22 @@ impl VarBinViewArray { .vortex_expect("VarBinViewArray construction failed") } + /// Creates a new [`VarBinViewArray`] with device or host memory. + /// + /// # Panics + /// + /// Panics if the provided components do not satisfy the invariants documented + /// in [`VarBinViewArray::new_unchecked`]. + pub fn new_handle( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> Self { + Self::try_new_handle(views, buffers, dtype, validity) + .vortex_expect("VarbinViewArray construction failed") + } + /// Constructs a new `VarBinViewArray`. /// /// See [`VarBinViewArray::new_unchecked`] for more information. @@ -132,6 +150,32 @@ impl VarBinViewArray { Ok(unsafe { Self::new_unchecked(views, buffers, dtype, validity) }) } + /// Constructs a new `VarBinViewArray`. + /// + /// See [`VarBinViewArray::new_unchecked`] for more information. + /// + /// # Errors + /// + /// Returns an error if the provided components do not satisfy the invariants documented in + /// [`VarBinViewArray::new_unchecked`]. + pub fn try_new_handle( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> VortexResult { + // TODO(aduffy): device validation. + if let Some(host) = views.as_host_opt() { + vortex_ensure!( + host.is_aligned(Alignment::of::()), + "Views on host must be 16 byte aligned" + ); + } + + // SAFETY: validate ensures all invariants are met. + Ok(unsafe { Self::new_handle_unchecked(views, buffers, dtype, validity) }) + } + /// Creates a new [`VarBinViewArray`] without validation from these components: /// /// * `views` is a buffer of 16-byte view entries (one per logical element). @@ -171,10 +215,38 @@ impl VarBinViewArray { Self::validate(&views, &buffers, &dtype, &validity) .vortex_expect("[Debug Assertion]: Invalid `VarBinViewArray` parameters"); + let handles: Vec = buffers + .iter() + .cloned() + .map(BufferHandle::new_host) + .collect(); + + let handles = Arc::from(handles); + Self { dtype, - buffers, + buffers: handles, + views: BufferHandle::new_host(views.into_byte_buffer()), + validity, + stats_set: Default::default(), + } + } + + /// Construct a new array from `BufferHandle`s without validation. + /// + /// # Safety + /// + /// See documentation in `new_unchecked`. + pub unsafe fn new_handle_unchecked( + views: BufferHandle, + buffers: Arc<[BufferHandle]>, + dtype: DType, + validity: Validity, + ) -> Self { + Self { views, + buffers, + dtype, validity, stats_set: Default::default(), } @@ -290,7 +362,16 @@ impl VarBinViewArray { /// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of /// the string (if the string has 12 bytes or fewer). #[inline] - pub fn views(&self) -> &Buffer { + pub fn views(&self) -> &[BinaryView] { + let host_views = self.views.as_host(); + let len = host_views.len() / size_of::(); + + // SAFETY: data alignment is checked for host buffers on construction + unsafe { std::slice::from_raw_parts(host_views.as_ptr().cast(), len) } + } + + /// Return the buffer handle backing the views. + pub fn views_handle(&self) -> &BufferHandle { &self.views } @@ -308,7 +389,8 @@ impl VarBinViewArray { .slice(view_ref.as_range()) } else { // Return access to the range of bytes around it. - views + self.views_handle() + .as_host() .clone() .into_byte_buffer() .slice_ref(view.as_inlined().value()) @@ -329,12 +411,12 @@ impl VarBinViewArray { self.nbuffers() ); } - &self.buffers[idx] + self.buffers[idx].as_host() } /// Iterate over the underlying raw data buffers, not including the views buffer. #[inline] - pub fn buffers(&self) -> &Arc<[ByteBuffer]> { + pub fn buffers(&self) -> &Arc<[BufferHandle]> { &self.buffers } diff --git a/vortex-array/src/arrays/varbinview/compute/cast.rs b/vortex-array/src/arrays/varbinview/compute/cast.rs index 19908e190bb..f04ad29851b 100644 --- a/vortex-array/src/arrays/varbinview/compute/cast.rs +++ b/vortex-array/src/arrays/varbinview/compute/cast.rs @@ -29,8 +29,8 @@ impl CastKernel for VarBinViewVTable { // SAFETY: casting just changes the DType, does not affect invariants on views/buffers. unsafe { Ok(Some( - VarBinViewArray::new_unchecked( - array.views().clone(), + VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), new_dtype, new_validity, diff --git a/vortex-array/src/arrays/varbinview/compute/mask.rs b/vortex-array/src/arrays/varbinview/compute/mask.rs index 884419c7578..dd32dac5c8b 100644 --- a/vortex-array/src/arrays/varbinview/compute/mask.rs +++ b/vortex-array/src/arrays/varbinview/compute/mask.rs @@ -17,8 +17,8 @@ impl MaskKernel for VarBinViewVTable { fn mask(&self, array: &VarBinViewArray, mask: &Mask) -> VortexResult { // SAFETY: masking the validity does not affect the invariants unsafe { - Ok(VarBinViewArray::new_unchecked( - array.views().clone(), + Ok(VarBinViewArray::new_handle_unchecked( + array.views_handle().clone(), array.buffers().clone(), array.dtype().as_nullable(), array.validity().mask(mask), diff --git a/vortex-array/src/arrays/varbinview/compute/take.rs b/vortex-array/src/arrays/varbinview/compute/take.rs index 90475e6698b..b937c904438 100644 --- a/vortex-array/src/arrays/varbinview/compute/take.rs +++ b/vortex-array/src/arrays/varbinview/compute/take.rs @@ -2,7 +2,6 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::iter; -use std::ops::Deref; use num_traits::AsPrimitive; use vortex_buffer::Buffer; @@ -18,6 +17,7 @@ use crate::IntoArray; use crate::ToCanonical; use crate::arrays::VarBinViewArray; use crate::arrays::VarBinViewVTable; +use crate::buffer::BufferHandle; use crate::compute::TakeKernel; use crate::compute::TakeKernelAdapter; use crate::register_kernel; @@ -37,8 +37,8 @@ impl TakeKernel for VarBinViewVTable { // SAFETY: taking all components at same indices maintains invariants unsafe { - Ok(VarBinViewArray::new_unchecked( - views_buffer, + Ok(VarBinViewArray::new_handle_unchecked( + BufferHandle::new_host(views_buffer.into_byte_buffer()), array.buffers().clone(), array .dtype() @@ -53,12 +53,11 @@ impl TakeKernel for VarBinViewVTable { register_kernel!(TakeKernelAdapter(VarBinViewVTable).lift()); fn take_views>( - views: &Buffer, + views_ref: &[BinaryView], indices: &[I], mask: &Mask, ) -> Buffer { // NOTE(ngates): this deref is not actually trivial, so we run it once. - let views_ref = views.deref(); // We do not use iter_bools directly, since the resulting dyn iterator cannot // implement TrustedLen. match mask.bit_buffer() { diff --git a/vortex-array/src/arrays/varbinview/compute/zip.rs b/vortex-array/src/arrays/varbinview/compute/zip.rs index e5674632715..9718c996c38 100644 --- a/vortex-array/src/arrays/varbinview/compute/zip.rs +++ b/vortex-array/src/arrays/varbinview/compute/zip.rs @@ -46,8 +46,10 @@ impl ZipKernel for VarBinViewVTable { // build buffer lookup tables for both arrays, these map from the original buffer idx // to the new buffer index in the result array let mut buffers = DeduplicatedBuffers::default(); - let true_lookup = buffers.extend_from_slice(if_true.buffers()); - let false_lookup = buffers.extend_from_slice(if_false.buffers()); + let true_lookup = + buffers.extend_from_iter(if_true.buffers().iter().map(|b| b.as_host().clone())); + let false_lookup = + buffers.extend_from_iter(if_false.buffers().iter().map(|b| b.as_host().clone())); let mut views_builder = BufferMut::::with_capacity(len); let mut validity_builder = LazyBitBufferBuilder::new(len); diff --git a/vortex-array/src/arrays/varbinview/vtable/array.rs b/vortex-array/src/arrays/varbinview/vtable/array.rs index 63fbf7b4d0c..ee12435fbd7 100644 --- a/vortex-array/src/arrays/varbinview/vtable/array.rs +++ b/vortex-array/src/arrays/varbinview/vtable/array.rs @@ -15,7 +15,7 @@ use crate::vtable::BaseArrayVTable; impl BaseArrayVTable for VarBinViewVTable { fn len(array: &VarBinViewArray) -> usize { - array.views.len() + array.views().len() } fn dtype(array: &VarBinViewArray) -> &DType { diff --git a/vortex-array/src/arrays/varbinview/vtable/mod.rs b/vortex-array/src/arrays/varbinview/vtable/mod.rs index e987fbeeed5..daf0746eda2 100644 --- a/vortex-array/src/arrays/varbinview/vtable/mod.rs +++ b/vortex-array/src/arrays/varbinview/vtable/mod.rs @@ -123,9 +123,11 @@ impl VTable for VarBinViewVTable { fn slice(array: &Self::Array, range: Range) -> VortexResult> { Ok(Some( - VarBinViewArray::new( - array.views().slice(range.clone()), - array.buffers().clone(), + VarBinViewArray::new_handle( + array + .views_handle() + .slice_typed::(range.clone()), + Arc::clone(array.buffers()), array.dtype().clone(), array.validity().slice(range)?, ) diff --git a/vortex-array/src/arrays/varbinview/vtable/visitor.rs b/vortex-array/src/arrays/varbinview/vtable/visitor.rs index 85306484217..a5321d4cc6f 100644 --- a/vortex-array/src/arrays/varbinview/vtable/visitor.rs +++ b/vortex-array/src/arrays/varbinview/vtable/visitor.rs @@ -5,22 +5,15 @@ use super::VarBinViewVTable; use crate::ArrayBufferVisitor; use crate::ArrayChildVisitor; use crate::arrays::VarBinViewArray; -use crate::buffer::BufferHandle; use crate::vtable::ValidityHelper; use crate::vtable::VisitorVTable; impl VisitorVTable for VarBinViewVTable { fn visit_buffers(array: &VarBinViewArray, visitor: &mut dyn ArrayBufferVisitor) { for (i, buffer) in array.buffers().iter().enumerate() { - visitor.visit_buffer_handle( - &format!("buffer_{i}"), - &BufferHandle::new_host(buffer.clone()), - ); + visitor.visit_buffer_handle(&format!("buffer_{i}"), buffer); } - visitor.visit_buffer_handle( - "views", - &BufferHandle::new_host(array.views().clone().into_byte_buffer()), - ); + visitor.visit_buffer_handle("views", array.views_handle()); } fn visit_children(array: &VarBinViewArray, visitor: &mut dyn ArrayChildVisitor) { diff --git a/vortex-array/src/arrow/executor/byte_view.rs b/vortex-array/src/arrow/executor/byte_view.rs index 2171a558c30..798b8bbd430 100644 --- a/vortex-array/src/arrow/executor/byte_view.rs +++ b/vortex-array/src/arrow/executor/byte_view.rs @@ -25,11 +25,11 @@ pub fn canonical_varbinview_to_arrow( array: &VarBinViewArray, ) -> VortexResult { let views = - ScalarBuffer::::from(array.views().clone().into_byte_buffer().into_arrow_buffer()); + ScalarBuffer::::from(array.views_handle().as_host().clone().into_arrow_buffer()); let buffers: Vec<_> = array .buffers() .iter() - .map(|buffer| buffer.clone().into_arrow_buffer()) + .map(|buffer| buffer.as_host().clone().into_arrow_buffer()) .collect(); let nulls = to_null_buffer(array.validity_mask()?); @@ -44,11 +44,11 @@ pub fn execute_varbinview_to_arrow( ctx: &mut ExecutionCtx, ) -> VortexResult { let views = - ScalarBuffer::::from(array.views().clone().into_byte_buffer().into_arrow_buffer()); + ScalarBuffer::::from(array.views_handle().as_host().clone().into_arrow_buffer()); let buffers: Vec<_> = array .buffers() .iter() - .map(|buffer| buffer.clone().into_arrow_buffer()) + .map(|buffer| buffer.as_host().clone().into_arrow_buffer()) .collect(); let nulls = to_arrow_null_buffer(array.validity().clone(), array.len(), ctx)?; diff --git a/vortex-array/src/buffer.rs b/vortex-array/src/buffer.rs index cf209b75f80..35cbf5a233b 100644 --- a/vortex-array/src/buffer.rs +++ b/vortex-array/src/buffer.rs @@ -14,6 +14,7 @@ use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_error::vortex_bail; use vortex_utils::dyn_traits::DynEq; use vortex_utils::dyn_traits::DynHash; @@ -50,6 +51,9 @@ pub trait DeviceBuffer: 'static + Send + Sync + Debug + DynEq + DynHash { /// Returns the length of the buffer in bytes. fn len(&self) -> usize; + /// Returns the alignment of the buffer. + fn alignment(&self) -> Alignment; + /// Returns true if the buffer is empty. fn is_empty(&self) -> bool { self.len() == 0 @@ -130,6 +134,40 @@ impl BufferHandle { } } + /// Returns the alignment of the buffer. + pub fn alignment(&self) -> Alignment { + match &self.0 { + Inner::Host(bytes) => bytes.alignment(), + Inner::Device(device) => device.alignment(), + } + } + + /// Returns true if the buffer is aligned to the given alignment. + pub fn is_aligned(&self, alignment: Alignment) -> bool { + self.alignment().is_aligned_to(alignment) + } + + /// Ensure the buffer satisfies the requested alignment. + /// + /// Host buffers will be copied if necessary. Device buffers will error if the + /// alignment requirement is not met. + pub fn ensure_aligned(&self, alignment: Alignment) -> VortexResult { + match &self.0 { + Inner::Host(buffer) => Ok(BufferHandle::new_host(buffer.clone().aligned(alignment))), + Inner::Device(device) => { + if device.alignment().is_aligned_to(alignment) { + Ok(self.clone()) + } else { + vortex_bail!( + "Device buffer alignment {} does not satisfy required alignment {}", + device.alignment(), + alignment + ); + } + } + } + } + /// Check if the buffer is empty. pub fn is_empty(&self) -> bool { self.len() == 0 diff --git a/vortex-array/src/builders/varbinview.rs b/vortex-array/src/builders/varbinview.rs index e89fc824c19..79d6297f3a5 100644 --- a/vortex-array/src/builders/varbinview.rs +++ b/vortex-array/src/builders/varbinview.rs @@ -5,6 +5,7 @@ use std::any::Any; use std::ops::Range; use std::sync::Arc; +use itertools::Itertools; use vortex_buffer::Buffer; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; @@ -437,7 +438,7 @@ impl CompletedBuffers { Self::Deduplicated(completed_buffers), BuffersWithOffsets::AllKept { buffers, offsets }, ) => { - let buffer_lookup = completed_buffers.extend_from_slice(&buffers); + let buffer_lookup = completed_buffers.extend_from_iter(buffers.iter().cloned()); ViewAdjustment::lookup(buffer_lookup, offsets) } ( @@ -498,11 +499,11 @@ impl DeduplicatedBuffers { .collect() } - pub(crate) fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec { - buffers - .iter() - .map(|buffer| self.push(buffer.clone())) - .collect() + pub(crate) fn extend_from_iter( + &mut self, + buffers: impl Iterator, + ) -> Vec { + buffers.map(|buffer| self.push(buffer)).collect() } pub(crate) fn finish(self) -> Arc<[ByteBuffer]> { @@ -589,7 +590,14 @@ impl BuffersWithOffsets { pub fn from_array(array: &VarBinViewArray, compaction_threshold: f64) -> Self { if compaction_threshold == 0.0 { return Self::AllKept { - buffers: array.buffers().clone(), + buffers: Arc::from( + array + .buffers() + .to_vec() + .into_iter() + .map(|b| b.unwrap_host()) + .collect_vec(), + ), offsets: None, }; } @@ -613,10 +621,11 @@ impl BuffersWithOffsets { .zip(array.buffers().iter()) .map(|(utilization, buffer)| { match compaction_strategy(utilization, compaction_threshold) { - CompactionStrategy::KeepFull => (Some(buffer.clone()), 0), - CompactionStrategy::Slice { start, end } => { - (Some(buffer.slice(start as usize..end as usize)), start) - } + CompactionStrategy::KeepFull => (Some(buffer.as_host().clone()), 0), + CompactionStrategy::Slice { start, end } => ( + Some(buffer.as_host().slice(start as usize..end as usize)), + start, + ), CompactionStrategy::Rewrite => (None, 0), } }); diff --git a/vortex-array/src/canonical_to_vector.rs b/vortex-array/src/canonical_to_vector.rs index 687bf93d023..0593fbb909d 100644 --- a/vortex-array/src/canonical_to_vector.rs +++ b/vortex-array/src/canonical_to_vector.rs @@ -16,6 +16,7 @@ use vortex_error::VortexResult; use vortex_mask::Mask; use vortex_vector::Vector; use vortex_vector::binaryview::BinaryVector; +use vortex_vector::binaryview::BinaryView; use vortex_vector::binaryview::StringVector; use vortex_vector::bool::BoolVector; use vortex_vector::decimal::DVector; @@ -98,19 +99,21 @@ impl Canonical { } Canonical::VarBinView(a) => { let validity = a.validity_mask()?; + let views = + Buffer::::from_byte_buffer(a.views_handle().as_host().clone()); match a.dtype() { DType::Utf8(_) => { - let views = a.views().clone(); // Convert Arc<[ByteBuffer]> to Arc> - let buffers: Box<[_]> = a.buffers().iter().cloned().collect(); + let buffers: Box<[_]> = + a.buffers().iter().map(|b| b.as_host()).cloned().collect(); Vector::String(unsafe { StringVector::new_unchecked(views, Arc::new(buffers), validity) }) } DType::Binary(_) => { - let views = a.views().clone(); // Convert Arc<[ByteBuffer]> to Arc> - let buffers: Box<[_]> = a.buffers().iter().cloned().collect(); + let buffers: Box<[_]> = + a.buffers().iter().map(|b| b.as_host()).cloned().collect(); Vector::Binary(unsafe { BinaryVector::new_unchecked(views, Arc::new(buffers), validity) }) diff --git a/vortex-array/src/compute/filter.rs b/vortex-array/src/compute/filter.rs index fe64704c767..f61c5f6fffe 100644 --- a/vortex-array/src/compute/filter.rs +++ b/vortex-array/src/compute/filter.rs @@ -76,7 +76,7 @@ pub(crate) fn warm_up_vtable() -> usize { /// not the case. pub fn filter(array: &dyn Array, mask: &Mask) -> VortexResult { // TODO(connor): Remove this function completely!!! - array.filter(mask.clone()) + Ok(array.filter(mask.clone())?.to_canonical()?.into_array()) } struct Filter; diff --git a/vortex-array/src/patches.rs b/vortex-array/src/patches.rs index e23e93931a2..39e320ba219 100644 --- a/vortex-array/src/patches.rs +++ b/vortex-array/src/patches.rs @@ -40,6 +40,7 @@ use crate::IntoArray; use crate::ToCanonical; use crate::arrays::PrimitiveArray; use crate::compute::cast; +use crate::compute::filter; use crate::compute::is_sorted; use crate::compute::take; use crate::search_sorted::SearchResult; @@ -626,8 +627,8 @@ impl Patches { } // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship - let filtered_indices = self.indices.filter(filter_mask.clone())?; - let filtered_values = self.values.filter(filter_mask)?; + let filtered_indices = filter(&self.indices, &filter_mask)?; + let filtered_values = filter(&self.values, &filter_mask)?; Ok(Some(Self { array_len: self.array_len, @@ -1148,8 +1149,10 @@ fn filter_patches_with_mask( } let new_patch_indices = new_patch_indices.into_array(); - let new_patch_values = - patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?; + let new_patch_values = filter( + patch_values, + &Mask::from_indices(patch_values.len(), new_mask_indices), + )?; Ok(Some(Patches::new( true_count, diff --git a/vortex-array/src/serde.rs b/vortex-array/src/serde.rs index a3aa423b177..cba1f2fd0df 100644 --- a/vortex-array/src/serde.rs +++ b/vortex-array/src/serde.rs @@ -490,10 +490,9 @@ impl ArrayParts { array_tree: ByteBuffer, segment: BufferHandle, ) -> VortexResult { - // TODO: this can also work with device buffers. - let segment = segment.try_to_host_sync()?; - // We align each buffer individually, so we remove alignment requirements on the buffer. - let segment = segment.aligned(Alignment::none()); + // We align each buffer individually, so we remove alignment requirements on the segment + // for host-resident buffers. Device buffers are sliced directly. + let segment = segment.ensure_aligned(Alignment::none())?; let fb_buffer = FlatBuffer::align_from(array_tree); @@ -504,7 +503,7 @@ impl ArrayParts { let flatbuffer_loc = fb_root._tab.loc(); let mut offset = 0; - let buffers: Arc<[_]> = fb_array + let buffers = fb_array .buffers() .unwrap_or_default() .iter() @@ -515,15 +514,13 @@ impl ArrayParts { let buffer_len = fb_buf.length() as usize; // Extract a buffer and ensure it's aligned, copying if necessary - let buffer = segment - .slice(offset..(offset + buffer_len)) - .aligned(Alignment::from_exponent(fb_buf.alignment_exponent())); - + let buffer = segment.slice(offset..(offset + buffer_len)); + let buffer = buffer + .ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?; offset += buffer_len; - BufferHandle::new_host(buffer) + Ok(buffer) }) - .collect(); - + .collect::>>()?; (flatbuffer_loc, buffers) }; diff --git a/vortex-array/src/validity.rs b/vortex-array/src/validity.rs index 1234a7bede1..838fb0e3a2a 100644 --- a/vortex-array/src/validity.rs +++ b/vortex-array/src/validity.rs @@ -181,9 +181,13 @@ impl Validity { } } - /// Keep only the entries for which the mask is true. + /// Lazily filters a [`Validity`] with a selection mask, which keeps only the entries for which + /// the mask is true. /// /// The result has length equal to the number of true values in mask. + /// + /// If the validity is a [`Validity::Array`], then this lazily wraps it in a `FilterArray` + /// instead of eagerly filtering the values immediately. pub fn filter(&self, mask: &Mask) -> VortexResult { // NOTE(ngates): we take the mask as a reference to avoid the caller cloning unnecessarily // if we happen to be NonNullable, AllValid, or AllInvalid. @@ -191,7 +195,13 @@ impl Validity { v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => { Ok(v.clone()) } - Validity::Array(arr) => Ok(Validity::Array(arr.filter(mask.clone())?)), + Validity::Array(arr) => Ok(Validity::Array( + arr.filter(mask.clone())? + // TODO(connor): This is wrong!!! We should not be eagerly decompressing the + // validity array. + .to_canonical()? + .into_array(), + )), } } diff --git a/vortex-btrblocks/src/float.rs b/vortex-btrblocks/src/float.rs index 802439f0738..795d2fb0fcb 100644 --- a/vortex-btrblocks/src/float.rs +++ b/vortex-btrblocks/src/float.rs @@ -571,7 +571,7 @@ mod tests { .display_as(DisplayOptions::MetadataOnly) .to_string() .to_lowercase(); - assert_eq!(display, "vortex.primitive(f32?, len=96)"); + assert_eq!(display, "vortex.sparse(f32?, len=96)"); Ok(()) } diff --git a/vortex-btrblocks/src/string.rs b/vortex-btrblocks/src/string.rs index b7ec69769f5..2ee8d036ea9 100644 --- a/vortex-btrblocks/src/string.rs +++ b/vortex-btrblocks/src/string.rs @@ -502,7 +502,7 @@ mod tests { .display_as(DisplayOptions::MetadataOnly) .to_string() .to_lowercase(); - assert_eq!(display, "vortex.varbinview(utf8?, len=100)"); + assert_eq!(display, "vortex.sparse(utf8?, len=100)"); Ok(()) } diff --git a/vortex-buffer/src/buffer.rs b/vortex-buffer/src/buffer.rs index ebc435ecca8..f941db6dbec 100644 --- a/vortex-buffer/src/buffer.rs +++ b/vortex-buffer/src/buffer.rs @@ -319,7 +319,10 @@ impl Buffer { let end_byte = end * size_of::(); if !begin_byte.is_multiple_of(*alignment) { - vortex_panic!("range start must be aligned to {alignment:?}"); + vortex_panic!( + "range start must be aligned to {alignment:?}, byte {}", + begin_byte + ); } if !alignment.is_aligned_to(Alignment::of::()) { vortex_panic!("Slice alignment must at least align to type T") diff --git a/vortex-cuda/Cargo.toml b/vortex-cuda/Cargo.toml index a5dcb380c81..ae66521c2d3 100644 --- a/vortex-cuda/Cargo.toml +++ b/vortex-cuda/Cargo.toml @@ -39,6 +39,7 @@ vortex-error = { workspace = true } vortex-fastlanes = { workspace = true } vortex-mask = { workspace = true } vortex-nvcomp = { path = "nvcomp" } +vortex-io = { workspace = true } vortex-session = { workspace = true } vortex-utils = { workspace = true } vortex-zigzag = { workspace = true } diff --git a/vortex-cuda/benches/dict_cuda.rs b/vortex-cuda/benches/dict_cuda.rs index 0142cb8842d..33dd123eb2f 100644 --- a/vortex-cuda/benches/dict_cuda.rs +++ b/vortex-cuda/benches/dict_cuda.rs @@ -17,6 +17,7 @@ use vortex_array::IntoArray; use vortex_array::arrays::DictArray; use vortex_array::arrays::PrimitiveArray; use vortex_array::validity::Validity::NonNullable; +use vortex_buffer::Alignment; use vortex_buffer::Buffer; use vortex_cuda::CudaBufferExt; use vortex_cuda::CudaDeviceBuffer; diff --git a/vortex-cuda/cub/Cargo.toml b/vortex-cuda/cub/Cargo.toml index 8068e49fd6b..1c92776ddb7 100644 --- a/vortex-cuda/cub/Cargo.toml +++ b/vortex-cuda/cub/Cargo.toml @@ -23,6 +23,7 @@ workspace = true libloading = { workspace = true } paste = { workspace = true } vortex-cuda-macros = { workspace = true } +vortex-dtype = { workspace = true } [build-dependencies] bindgen = { workspace = true } diff --git a/vortex-cuda/cub/build.rs b/vortex-cuda/cub/build.rs index 3d3605e8406..d5e6ae3a786 100644 --- a/vortex-cuda/cub/build.rs +++ b/vortex-cuda/cub/build.rs @@ -105,6 +105,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) { .allowlist_type("cudaError_t") // Blocklist cudaStream_t and define it manually as an opaque pointer .blocklist_type("cudaStream_t") + .blocklist_type("__int256_t") // Generate dynamic library loading wrapper .dynamic_library_name("CubLibrary") .dynamic_link_require_all(true) @@ -113,6 +114,7 @@ fn generate_rust_bindings(kernels_dir: &Path, out_dir: &Path) { .raw_line("// Functions are loaded at runtime via libloading.") .raw_line("") .raw_line("pub type cudaStream_t = *mut std::ffi::c_void;") + .raw_line("pub type __int256_t = vortex_dtype::i256;") .generate() .expect("Failed to generate CUB bindings"); diff --git a/vortex-cuda/cub/kernels/filter.cu b/vortex-cuda/cub/kernels/filter.cu index 336acfbc005..18419799fc3 100644 --- a/vortex-cuda/cub/kernels/filter.cu +++ b/vortex-cuda/cub/kernels/filter.cu @@ -9,6 +9,12 @@ #include #include +// i256 type +typedef struct { + __int128_t high; + __int128_t low; +} __int256_t; + // Bit extraction functor for TransformInputIterator struct BitExtractor { const uint8_t* packed; @@ -60,6 +66,8 @@ DEFINE_TEMP_SIZE(u64, uint64_t) DEFINE_TEMP_SIZE(i64, int64_t) DEFINE_TEMP_SIZE(f32, float) DEFINE_TEMP_SIZE(f64, double) +DEFINE_TEMP_SIZE(i128, __int128_t) +DEFINE_TEMP_SIZE(i256, __int256_t) // CUB DeviceSelect::Flagged - Execute filter with byte mask (one byte per element) template @@ -100,6 +108,8 @@ DEFINE_FILTER_BYTEMASK(u64, uint64_t) DEFINE_FILTER_BYTEMASK(i64, int64_t) DEFINE_FILTER_BYTEMASK(f32, float) DEFINE_FILTER_BYTEMASK(f64, double) +DEFINE_FILTER_BYTEMASK(i128, __int128_t) +DEFINE_FILTER_BYTEMASK(i256, __int256_t) // CUB DeviceSelect::Flagged - Execute filter with bit mask (one bit per element) // @@ -161,3 +171,5 @@ DEFINE_FILTER_BITMASK(u64, uint64_t) DEFINE_FILTER_BITMASK(i64, int64_t) DEFINE_FILTER_BITMASK(f32, float) DEFINE_FILTER_BITMASK(f64, double) +DEFINE_FILTER_BITMASK(i128, __int128_t) +DEFINE_FILTER_BITMASK(i256, __int256_t) diff --git a/vortex-cuda/cub/kernels/filter.h b/vortex-cuda/cub/kernels/filter.h index a5e0e5cc304..45458e02985 100644 --- a/vortex-cuda/cub/kernels/filter.h +++ b/vortex-cuda/cub/kernels/filter.h @@ -9,6 +9,12 @@ #include #include +// i256 type +typedef struct { + __int128_t high; + __int128_t low; +} __int256_t; + // CUDA types - defined as opaque for bindgen typedef int cudaError_t; typedef void* cudaStream_t; @@ -28,7 +34,9 @@ extern "C" { X(u64, uint64_t) \ X(i64, int64_t) \ X(f32, float) \ - X(f64, double) + X(f64, double) \ + X(i128, __int128_t) \ + X(i256, __int256_t) // Filter temp size query functions #define DECLARE_FILTER_TEMP_SIZE(suffix, c_type) \ diff --git a/vortex-cuda/cub/src/filter.rs b/vortex-cuda/cub/src/filter.rs index 9df339a8a19..4159285481a 100644 --- a/vortex-cuda/cub/src/filter.rs +++ b/vortex-cuda/cub/src/filter.rs @@ -116,7 +116,7 @@ macro_rules! impl_filter { } } - #[doc = "Get the temporary storage size required for filtering `" $ty "` elements."] + #[doc = "Get the temporary storage size required for filtering elements."] pub fn [](num_items: i64) -> Result { let lib = cub_library()?; let mut temp_bytes: usize = 0; @@ -125,7 +125,7 @@ macro_rules! impl_filter { Ok(temp_bytes) } - #[doc = "Filter `" $ty "` elements using a byte mask (one byte per element)."] + #[doc = "Filter elements using a byte mask (one byte per element)."] /// /// # Safety /// @@ -162,7 +162,7 @@ macro_rules! impl_filter { check_cuda_error(err, concat!("filter_bytemask_", stringify!($suffix))) } - #[doc = "Filter `" $ty "` elements using a bit mask (one bit per element)."] + #[doc = "Filter elements using a bit mask (one bit per element)."] /// /// This version accepts packed bits directly, avoiding the need to expand /// bits to bytes in a separate kernel. @@ -219,4 +219,6 @@ impl_filter! { i64 => i64, f32 => f32, f64 => f64, + i128 => i128, + i256 => vortex_dtype::i256, } diff --git a/vortex-cuda/nvcomp/.gitignore b/vortex-cuda/nvcomp/.gitignore new file mode 100644 index 00000000000..db872fac1e0 --- /dev/null +++ b/vortex-cuda/nvcomp/.gitignore @@ -0,0 +1 @@ +sdk/ diff --git a/vortex-cuda/src/canonical.rs b/vortex-cuda/src/canonical.rs index 9270536dca4..dfc2184e078 100644 --- a/vortex-cuda/src/canonical.rs +++ b/vortex-cuda/src/canonical.rs @@ -1,15 +1,27 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::sync::Arc; + use async_trait::async_trait; +use futures::future::try_join_all; +use vortex_array::Array; use vortex_array::Canonical; +use vortex_array::IntoArray; +use vortex_array::arrays::BinaryView; use vortex_array::arrays::BoolArray; use vortex_array::arrays::BoolArrayParts; use vortex_array::arrays::DecimalArray; use vortex_array::arrays::DecimalArrayParts; use vortex_array::arrays::PrimitiveArray; use vortex_array::arrays::PrimitiveArrayParts; +use vortex_array::arrays::StructArray; +use vortex_array::arrays::StructArrayParts; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::VarBinViewArrayParts; use vortex_array::buffer::BufferHandle; +use vortex_buffer::Buffer; +use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; /// Move all canonical data from to_host from device. @@ -24,6 +36,29 @@ pub trait CanonicalCudaExt { impl CanonicalCudaExt for Canonical { async fn into_host(self) -> VortexResult { match self { + Canonical::Struct(struct_array) => { + // Children should all be canonical now + let len = struct_array.len(); + let StructArrayParts { + fields, + struct_fields, + validity, + .. + } = struct_array.into_parts(); + + // TODO(aduffy): try_join_all + let mut host_fields = vec![]; + for field in fields.iter().cloned() { + host_fields.push(field.to_canonical()?.into_host().await?.into_array()); + } + + Ok(Canonical::Struct(StructArray::new( + struct_fields.names().clone(), + host_fields, + len, + validity, + ))) + } n @ Canonical::Null(_) => Ok(n), Canonical::Bool(bool) => { // NOTE: update to copy to host when adding buffer handle. @@ -69,7 +104,32 @@ impl CanonicalCudaExt for Canonical { ) })) } - _ => todo!(), + Canonical::VarBinView(varbinview) => { + let VarBinViewArrayParts { + views, + buffers, + validity, + dtype, + } = varbinview.into_parts(); + + // Copy all device views to host + let host_views = views.try_into_host()?.await?; + let host_views = Buffer::::from_byte_buffer(host_views); + + // Copy any string data buffers back over to the host + let host_buffers = buffers + .iter() + .cloned() + .map(|b| b.try_into_host()) + .collect::>>()?; + let host_buffers = try_join_all(host_buffers).await?; + let host_buffers: Arc<[ByteBuffer]> = Arc::from(host_buffers); + + Ok(Canonical::VarBinView(unsafe { + VarBinViewArray::new_unchecked(host_views, host_buffers, dtype, validity) + })) + } + c => todo!("{} not implemented", c.dtype()), } } } diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs index 2cc6517324d..1bc327c712c 100644 --- a/vortex-cuda/src/device_buffer.rs +++ b/vortex-cuda/src/device_buffer.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright the Vortex contributors +use std::cmp::min; use std::fmt::Debug; use std::ops::Range; use std::sync::Arc; @@ -16,6 +17,7 @@ use vortex_array::buffer::DeviceBuffer; use vortex_buffer::Alignment; use vortex_buffer::BufferMut; use vortex_buffer::ByteBuffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; @@ -27,11 +29,17 @@ pub struct CudaDeviceBuffer { offset: usize, len: usize, device_ptr: u64, + alignment: Alignment, } impl CudaDeviceBuffer { /// Creates a new CUDA device buffer from a [`CudaSlice`]. pub fn new(cuda_slice: CudaSlice) -> Self { + Self::new_aligned(cuda_slice, Alignment::of::()) + } + + pub fn new_aligned(cuda_slice: CudaSlice, alignment: Alignment) -> Self { + assert!(alignment.is_aligned_to(Alignment::of::())); let len = cuda_slice.len(); let device_ptr = cuda_slice.device_ptr(cuda_slice.stream()).0; @@ -40,6 +48,7 @@ impl CudaDeviceBuffer { offset: 0, len, device_ptr, + alignment, } } @@ -109,6 +118,10 @@ impl DeviceBuffer for CudaDeviceBuffer self.len * size_of::() } + fn alignment(&self) -> Alignment { + self.alignment + } + /// Synchronous copy of CUDA device to host memory. /// /// The copy is not started before other operations on the streams are completed. @@ -185,6 +198,19 @@ impl DeviceBuffer for CudaDeviceBuffer fn slice(&self, range: Range) -> Arc { let new_offset = self.offset + range.start; let new_len = range.end - range.start; + let byte_offset = new_offset * size_of::(); + let alignment = if byte_offset == 0 { + self.alignment + } else { + // TODO(joe): self.alignment is an under approx + min( + self.alignment, + Alignment::from_exponent( + u8::try_from((self.device_ptr + byte_offset as u64).trailing_zeros()) + .vortex_expect("impossible"), + ), + ) + }; assert!( range.end <= self.len, @@ -198,6 +224,7 @@ impl DeviceBuffer for CudaDeviceBuffer offset: new_offset, len: new_len, device_ptr: self.device_ptr, + alignment, }) } diff --git a/vortex-cuda/src/executor.rs b/vortex-cuda/src/executor.rs index 63b2af86675..6f2f6b5e921 100644 --- a/vortex-cuda/src/executor.rs +++ b/vortex-cuda/src/executor.rs @@ -10,25 +10,21 @@ use cudarc::driver::CudaEvent; use cudarc::driver::CudaFunction; use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; -use cudarc::driver::DevicePtrMut; use cudarc::driver::DeviceRepr; use cudarc::driver::LaunchArgs; -use cudarc::driver::result::memcpy_htod_async; use futures::future::BoxFuture; use vortex_array::Array; use vortex_array::ArrayRef; use vortex_array::Canonical; use vortex_array::ExecutionCtx; use vortex_array::buffer::BufferHandle; -use vortex_buffer::Buffer; use vortex_dtype::PType; use vortex_error::VortexResult; use vortex_error::vortex_err; -use crate::CudaDeviceBuffer; use crate::CudaSession; use crate::session::CudaSessionExt; -use crate::stream::await_stream_callback; +use crate::stream::VortexCudaStream; /// CUDA kernel events recorded before and after kernel launch. #[derive(Debug)] @@ -53,14 +49,14 @@ impl CudaKernelEvents { /// Provides access to the CUDA context and stream for kernel execution. /// Handles memory allocation and data transfers between host and device. pub struct CudaExecutionCtx { - stream: Arc, + stream: VortexCudaStream, ctx: ExecutionCtx, cuda_session: CudaSession, } impl CudaExecutionCtx { /// Creates a new CUDA execution context. - pub(crate) fn new(stream: Arc, ctx: ExecutionCtx) -> Self { + pub(crate) fn new(stream: VortexCudaStream, ctx: ExecutionCtx) -> Self { let cuda_session = ctx.session().cuda_session().clone(); Self { stream, @@ -69,24 +65,6 @@ impl CudaExecutionCtx { } } - /// Allocates a typed buffer on the GPU. - /// - /// Note: Allocation is async in case the CUDA driver supports this. - /// - /// The condition for alloc to be async is support for memory pools: - /// `CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED`. - /// - /// Any kernel submitted to the stream after alloc can safely use the - /// memory, as operations on the stream are ordered sequentially. - pub fn device_alloc(&self, len: usize) -> VortexResult> { - // SAFETY: No safety guarantees for allocations on the GPU. - unsafe { - self.stream - .alloc::(len) - .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e)) - } - } - /// Loads a CUDA kernel function by module name and ptype(s). /// /// # Arguments @@ -143,22 +121,15 @@ impl CudaExecutionCtx { /// /// * `func` - CUDA kernel function to launch pub fn launch_builder<'a>(&'a self, func: &'a CudaFunction) -> LaunchArgs<'a> { - self.stream.launch_builder(func) + self.stream.0.launch_builder(func) } - /// Copies host data to the device asynchronously. - /// - /// Allocates device memory, schedules an async copy, and returns a future - /// that completes when the copy is finished. The source data is moved into - /// the future to ensure it remains valid until the copy completes. - /// - /// # Arguments - /// - /// * `data` - The host data to copy. - /// - /// # Returns - /// - /// A future that resolves to the device buffer handle when the copy completes. + /// See `VortexCudaStream::device_alloc`. + pub fn device_alloc(&self, len: usize) -> VortexResult> { + self.stream.device_alloc(len) + } + + /// See `VortexCudaStream::copy_to_device`. pub fn copy_to_device( &self, data: D, @@ -167,52 +138,20 @@ impl CudaExecutionCtx { T: DeviceRepr + Send + Sync + 'static, D: AsRef<[T]> + Send + 'static, { - let host_slice: &[T] = data.as_ref(); - let mut cuda_slice: CudaSlice = self.device_alloc(host_slice.len())?; - let device_ptr = cuda_slice.device_ptr_mut(&self.stream).0; - - unsafe { - memcpy_htod_async(device_ptr, host_slice, self.stream.cu_stream()) - .map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?; - } - - let cuda_buf = CudaDeviceBuffer::new(cuda_slice); - let stream = Arc::clone(&self.stream); - - Ok(Box::pin(async move { - await_stream_callback(&stream).await?; - - // Keep source memory alive until copy completes. - let _keep_alive = data; - - Ok(BufferHandle::new_device(Arc::new(cuda_buf))) - })) + self.stream.copy_to_device(data) } - /// Moves a host buffer handle to the device asynchronously. - /// - /// # Arguments - /// - /// * `handle` - The host buffer to move. Must be a host buffer. - /// - /// # Returns - /// - /// A future that resolves to the device buffer handle when the copy completes. + /// See `VortexCudaStream::move_to_device`. pub fn move_to_device( &self, handle: BufferHandle, ) -> VortexResult>> { - let host_buffer = handle - .as_host_opt() - .ok_or_else(|| vortex_err!("Buffer is not on host"))?; - - let buffer: Buffer = Buffer::from_byte_buffer(host_buffer.clone()); - self.copy_to_device(buffer) + self.stream.move_to_device::(handle) } /// Returns a reference to the underlying CUDA stream. pub fn stream(&self) -> &Arc { - &self.stream + &self.stream.0 } } @@ -266,6 +205,7 @@ impl CudaArrayExt for ArrayRef { impl CudaExecutionCtx { pub fn synchronize_stream(&self) -> VortexResult<()> { self.stream + .0 .synchronize() .map_err(|e| vortex_err!("cuda error: {e}")) } diff --git a/vortex-cuda/src/host_to_device_allocator.rs b/vortex-cuda/src/host_to_device_allocator.rs new file mode 100644 index 00000000000..4c15c5a5b87 --- /dev/null +++ b/vortex-cuda/src/host_to_device_allocator.rs @@ -0,0 +1,67 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use std::sync::Arc; + +use futures::FutureExt; +use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_error::VortexResult; +use vortex_io::CoalesceConfig; +use vortex_io::VortexReadAt; + +use crate::stream::VortexCudaStream; + +/// A wrapper that uses an allocator to produce the returned buffer handle. +#[derive(Clone)] +pub struct CopyDeviceReadAt { + read: T, + stream: VortexCudaStream, +} + +impl CopyDeviceReadAt { + pub fn new(read: T, stream: VortexCudaStream) -> Self { + Self { read, stream } + } +} + +impl VortexReadAt for CopyDeviceReadAt { + fn uri(&self) -> Option<&Arc> { + self.read.uri() + } + + fn coalesce_config(&self) -> Option { + self.read.coalesce_config() + } + + fn concurrency(&self) -> usize { + self.read.concurrency() + } + + fn size(&self) -> BoxFuture<'static, VortexResult> { + self.read.size() + } + + fn read_at( + &self, + offset: u64, + length: usize, + alignment: Alignment, + ) -> BoxFuture<'static, VortexResult> { + println!("read at cuda"); + let read = self.read.clone(); + let stream = self.stream.clone(); + async move { + let handle = read.read_at(offset, length, alignment).await?; + if handle.is_on_device() { + return Ok(handle); + } + + let host_buffer = handle.as_host().clone(); + + stream.copy_to_device(host_buffer)?.await + } + .boxed() + } +} diff --git a/vortex-cuda/src/kernel/filter.rs b/vortex-cuda/src/kernel/filter.rs deleted file mode 100644 index a43aa9c3714..00000000000 --- a/vortex-cuda/src/kernel/filter.rs +++ /dev/null @@ -1,263 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright the Vortex contributors - -//! GPU filter implementation using CUB DeviceSelect::Flagged. - -use std::ffi::c_void; -use std::sync::Arc; - -use async_trait::async_trait; -use cudarc::driver::CudaSlice; -use cudarc::driver::DevicePtr; -use cudarc::driver::DevicePtrMut; -use cudarc::driver::DeviceRepr; -use vortex_array::ArrayRef; -use vortex_array::Canonical; -use vortex_array::arrays::FilterVTable; -use vortex_array::arrays::PrimitiveArray; -use vortex_array::buffer::BufferHandle; -use vortex_cub::filter::CubFilterable; -use vortex_cub::filter::cudaStream_t; -use vortex_cuda_macros::cuda_tests; -use vortex_dtype::NativePType; -use vortex_dtype::match_each_native_simd_ptype; -use vortex_error::VortexResult; -use vortex_error::vortex_err; -use vortex_mask::Mask; -use vortex_mask::MaskValues; - -use crate::CudaDeviceBuffer; -use crate::executor::CudaArrayExt; -use crate::executor::CudaExecute; -use crate::executor::CudaExecutionCtx; -use crate::stream::await_stream_callback; - -/// CUDA executor for FilterArray using CUB DeviceSelect::Flagged. -#[derive(Debug)] -pub struct FilterExecutor; - -#[async_trait] -impl CudaExecute for FilterExecutor { - async fn execute( - &self, - array: ArrayRef, - ctx: &mut CudaExecutionCtx, - ) -> VortexResult { - let filter_array = array - .as_opt::() - .ok_or_else(|| vortex_err!("Expected FilterArray"))?; - - let mask = filter_array.filter_mask(); - - // Early return for trivial cases. - match mask { - Mask::AllTrue(_) => { - return filter_array.child().clone().execute_cuda(ctx).await; - } - Mask::AllFalse(_) => { - return Ok(Canonical::empty(filter_array.dtype())); - } - _ => {} - } - - let mask_values = mask - .values() - .ok_or_else(|| vortex_err!("Expected Mask::Values but got different variant"))?; - - let canonical = filter_array.child().clone().execute_cuda(ctx).await?; - - match canonical { - Canonical::Primitive(ref prim) => { - match_each_native_simd_ptype!(prim.ptype(), |T| { - filter_primitive::(prim, mask_values, mask, ctx).await - }) - } - _ => unimplemented!(), - } - } -} - -async fn filter_primitive( - array: &PrimitiveArray, - mask_values: &MaskValues, - mask: &Mask, - ctx: &mut CudaExecutionCtx, -) -> VortexResult -where - T: NativePType + DeviceRepr + CubFilterable + Send + Sync + 'static, -{ - let ptype = array.ptype(); - let num_items = array.len() as i64; - let output_len = mask_values.true_count(); - - if output_len == 0 { - return Ok(Canonical::empty(array.dtype())); - } - - let input_handle = array.buffer_handle(); - let d_input: BufferHandle = if input_handle.is_on_device() { - input_handle.clone() - } else { - ctx.move_to_device::(input_handle.clone())?.await? - }; - - // Upload packed bits to device. They are unpacked to bytes in the filter kernel. - let bit_buffer = mask_values.bit_buffer(); - let packed = bit_buffer.inner().as_ref(); - let bit_offset = bit_buffer.offset() as u64; - let d_packed_flags = ctx.copy_to_device(packed.to_vec())?.await?; - - let temp_bytes = T::get_temp_size(num_items) - .map_err(|e| vortex_err!("CUB filter_temp_size failed: {}", e))?; - - // Allocate device buffers. - let d_temp: CudaSlice = ctx.device_alloc(temp_bytes.max(1))?; - let mut d_output: CudaSlice = ctx.device_alloc(output_len)?; - let mut d_num_selected: CudaSlice = ctx.device_alloc(1)?; - - // Get raw pointers for FFI. - let stream = ctx.stream(); - let stream_ptr = stream.cu_stream() as cudaStream_t; - - // Downcast input buffer to get device pointer. - let d_input_cuda = d_input - .as_device() - .as_any() - .downcast_ref::>() - .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for input"))?; - let d_input_ptr = d_input_cuda.as_view().device_ptr(stream).0 as *const T; - - // Downcast to get device pointer. - let d_packed_cuda = d_packed_flags - .as_device() - .as_any() - .downcast_ref::>() - .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for packed flags"))?; - let d_packed_ptr = d_packed_cuda.as_view().device_ptr(stream).0 as *const u8; - - let d_temp_ptr = d_temp.device_ptr(stream).0 as *mut c_void; - let d_output_ptr = d_output.device_ptr_mut(stream).0 as *mut T; - let d_num_selected_ptr = d_num_selected.device_ptr_mut(stream).0 as *mut i64; - - // CUB uses TransformInputIterator internally to read bits on-the-fly. - unsafe { - T::filter_bitmask( - d_temp_ptr, - temp_bytes, - d_input_ptr, - d_packed_ptr, - bit_offset, - d_output_ptr, - d_num_selected_ptr, - num_items, - stream_ptr, - ) - .map_err(|e| vortex_err!("CUB filter_bitmask failed: {}", e))?; - } - - // Wait for completion - await_stream_callback(stream).await?; - - let filtered_validity = array.validity()?.filter(mask)?; - let output_handle = BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new(d_output))); - - Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( - output_handle, - ptype, - filtered_validity, - ))) -} - -#[cuda_tests] -mod tests { - use rstest::rstest; - use vortex_array::IntoArray; - use vortex_array::arrays::FilterArray; - use vortex_array::assert_arrays_eq; - use vortex_error::VortexExpect; - use vortex_session::VortexSession; - - use super::*; - use crate::CanonicalCudaExt; - use crate::session::CudaSession; - - #[rstest] - #[case::i32_sparse( - PrimitiveArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8]), - Mask::from_iter([true, false, true, false, true, false, true, false]) - )] - #[case::i32_dense( - PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), - Mask::from_iter([true, true, true, false, true]) - )] - #[case::i64_large( - PrimitiveArray::from_iter((0..1000i64).collect::>()), - Mask::from_iter((0..1000).map(|i| i % 3 == 0)) - )] - #[case::f64_values( - PrimitiveArray::from_iter([1.1f64, 2.2, 3.3, 4.4, 5.5]), - Mask::from_iter([false, true, false, true, false]) - )] - #[case::u8_all_true( - PrimitiveArray::from_iter([1u8, 2, 3, 4, 5]), - Mask::from_iter([true, true, true, true, true]) - )] - #[case::u32_all_false( - PrimitiveArray::from_iter([1u32, 2, 3, 4, 5]), - Mask::from_iter([false, false, false, false, false]) - )] - #[tokio::test] - async fn test_gpu_filter( - #[case] input: PrimitiveArray, - #[case] mask: Mask, - ) -> VortexResult<()> { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create CUDA execution context"); - - let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; - - let cpu_result = filter_array.to_canonical()?.into_array(); - - let gpu_result = FilterExecutor - .execute(filter_array.into_array(), &mut cuda_ctx) - .await - .vortex_expect("GPU filter failed") - .into_host() - .await? - .into_array(); - - assert_arrays_eq!(cpu_result, gpu_result); - - Ok(()) - } - - #[tokio::test] - async fn test_gpu_filter_large_array() -> VortexResult<()> { - let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) - .vortex_expect("failed to create CUDA execution context"); - - // Create a large array to test multi-block execution - let data: Vec = (0..100_000).collect(); - let input = PrimitiveArray::from_iter(data); - - // Select every 7th element - let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); - - let filter_array = FilterArray::try_new(input.into_array(), mask)?; - - let cpu_result = filter_array.to_canonical()?.into_array(); - - let gpu_result = FilterExecutor - .execute(filter_array.into_array(), &mut cuda_ctx) - .await - .vortex_expect("GPU filter failed") - .into_host() - .await? - .into_array(); - - assert_eq!(cpu_result.len(), gpu_result.len()); - assert_arrays_eq!(cpu_result, gpu_result); - - Ok(()) - } -} diff --git a/vortex-cuda/src/kernel/filter/decimal.rs b/vortex-cuda/src/kernel/filter/decimal.rs new file mode 100644 index 00000000000..4cc6ab3340c --- /dev/null +++ b/vortex-cuda/src/kernel/filter/decimal.rs @@ -0,0 +1,142 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use cudarc::driver::DeviceRepr; +use vortex_array::Canonical; +use vortex_array::arrays::DecimalArray; +use vortex_array::arrays::DecimalArrayParts; +use vortex_cub::filter::CubFilterable; +use vortex_cuda_macros::cuda_tests; +use vortex_dtype::NativeDecimalType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +pub(super) async fn filter_decimal( + array: DecimalArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let DecimalArrayParts { + values, + validity, + decimal_dtype, + .. + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_values = filter_sized::(values, mask, ctx).await?; + + Ok(Canonical::Decimal(DecimalArray::new_handle( + filtered_values, + D::DECIMAL_TYPE, + decimal_dtype, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::DecimalArray; + use vortex_array::arrays::FilterArray; + use vortex_array::assert_arrays_eq; + use vortex_dtype::DecimalDType; + use vortex_dtype::i256; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::i32_sparse( + DecimalArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8], DecimalDType::new(19, 5)), + Mask::from_iter([true, false, true, false, true, false, true, false]) + )] + #[case::i32_dense( + DecimalArray::from_iter([10i32, 20, 30, 40, 50], DecimalDType::new(19, 5)), + Mask::from_iter([true, true, true, false, true]) + )] + #[case::i64_large( + DecimalArray::from_iter(0..1000i64, DecimalDType::new(19, 5)), + Mask::from_iter((0..1000).map(|i| i % 3 == 0)) + )] + #[case::i8_all_true( + DecimalArray::from_iter([1i8, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([true, true, true, true, true]) + )] + #[case::i32_all_false( + DecimalArray::from_iter([1i32, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([false, false, false, false, false]) + )] + #[case::i128_values( + DecimalArray::from_iter([1i128, 2, 3, 4, 5], DecimalDType::new(19, 5)), + Mask::from_iter([false, true, false, true, false]) + )] + #[case::i256_values( + DecimalArray::from_iter([i256::from_i128(1), i256::from_i128(2), i256::from_i128(3), i256::from_i128(4), i256::from_i128(5)], DecimalDType::new(19, 5)), + Mask::from_iter([false, true, false, true, false]) + )] + #[tokio::test] + async fn test_gpu_filter_decimal( + #[case] input: DecimalArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[tokio::test] + async fn test_gpu_filter_decimal_large_array() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + // Create a large array to test multi-block execution + let data: Vec = (0..100_000).collect(); + let input = DecimalArray::from_iter(data, DecimalDType::new(19, 5)); + + // Select every 7th element + let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); + + let filter_array = FilterArray::try_new(input.into_array(), mask)?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_eq!(cpu_result.len(), gpu_result.len()); + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/kernel/filter/mod.rs b/vortex-cuda/src/kernel/filter/mod.rs new file mode 100644 index 00000000000..3a38111546a --- /dev/null +++ b/vortex-cuda/src/kernel/filter/mod.rs @@ -0,0 +1,164 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +//! GPU filter implementation using CUB DeviceSelect::Flagged. + +mod decimal; +mod primitive; +mod varbinview; + +use std::ffi::c_void; +use std::sync::Arc; + +use async_trait::async_trait; +use cudarc::driver::DevicePtr; +use cudarc::driver::DevicePtrMut; +use cudarc::driver::DeviceRepr; +use vortex_array::ArrayRef; +use vortex_array::Canonical; +use vortex_array::arrays::FilterArrayParts; +use vortex_array::arrays::FilterVTable; +use vortex_array::buffer::BufferHandle; +use vortex_cub::filter::CubFilterable; +use vortex_cub::filter::cudaStream_t; +use vortex_dtype::match_each_decimal_value_type; +use vortex_dtype::match_each_native_simd_ptype; +use vortex_error::VortexResult; +use vortex_error::vortex_err; +use vortex_mask::Mask; + +use crate::CudaDeviceBuffer; +use crate::executor::CudaArrayExt; +use crate::executor::CudaExecute; +use crate::executor::CudaExecutionCtx; +use crate::kernel::filter::decimal::filter_decimal; +use crate::kernel::filter::primitive::filter_primitive; +use crate::kernel::filter::varbinview::filter_varbinview; +use crate::stream::await_stream_callback; + +/// CUDA executor for FilterArray using CUB DeviceSelect::Flagged. +#[derive(Debug)] +pub struct FilterExecutor; + +#[async_trait] +impl CudaExecute for FilterExecutor { + async fn execute( + &self, + array: ArrayRef, + ctx: &mut CudaExecutionCtx, + ) -> VortexResult { + let filter_array = array + .try_into::() + .map_err(|_| vortex_err!("Expected FilterArray"))?; + + let FilterArrayParts { child, mask } = filter_array.into_parts(); + + // Early return for trivial cases. + match mask { + Mask::AllTrue(_) => { + // No data filtered => execute child without any post-processing + child.execute_cuda(ctx).await + } + Mask::AllFalse(_) => { + // All data filtered => empty canonical + Ok(Canonical::empty(child.dtype())) + } + m @ Mask::Values(_) => { + let canonical = child.execute_cuda(ctx).await?; + match canonical { + Canonical::Primitive(prim) => { + match_each_native_simd_ptype!(prim.ptype(), |T| { + filter_primitive::(prim, m, ctx).await + }) + } + Canonical::Decimal(decimal) => { + match_each_decimal_value_type!(decimal.values_type(), |D| { + filter_decimal::(decimal, m, ctx).await + }) + } + Canonical::VarBinView(varbinview) => { + filter_varbinview(varbinview, m, ctx).await + } + _ => unimplemented!(), + } + } + } + } +} + +async fn filter_sized( + input: BufferHandle, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + // Return a buffer handle back once this has completed. + let d_input = if input.is_on_device() { + input + } else { + ctx.move_to_device::(input)?.await? + }; + + // Construct the inputs for the cub::DeviceSelect::Flagged call. + let output_len = mask.true_count(); + let (offset, len, flags) = mask.into_bit_buffer().into_inner(); + + let d_flags = ctx.copy_to_device(flags.to_vec())?.await?; + + let offset = offset as u64; + let len = len as i64; + + let temp_bytes = + T::get_temp_size(len).map_err(|e| vortex_err!("CUB filter_temp_size failed: {}", e))?; + + // Allocate device buffers for input, output, mask, and temp space + let d_temp = ctx.device_alloc::(temp_bytes.max(1))?; + let mut d_output = ctx.device_alloc::(output_len)?; + let mut d_num_selected = ctx.device_alloc::(1)?; + // Get raw pointers for FFI. + let stream = ctx.stream(); + let stream_ptr = stream.cu_stream() as cudaStream_t; + + // Downcast input buffer to get device pointer. + let d_input_cuda = d_input + .as_device() + .as_any() + .downcast_ref::>() + .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for input"))?; + let d_input_ptr = d_input_cuda.as_view().device_ptr(stream).0 as *const T; + + // Downcast to get device pointer. + let d_packed_cuda = d_flags + .as_device() + .as_any() + .downcast_ref::>() + .ok_or_else(|| vortex_err!("Expected CudaDeviceBuffer for packed flags"))?; + let d_packed_ptr = d_packed_cuda.as_view().device_ptr(stream).0 as *const u8; + + let d_temp_ptr = d_temp.device_ptr(stream).0 as *mut c_void; + let d_output_ptr = d_output.device_ptr_mut(stream).0 as *mut T; + let d_num_selected_ptr = d_num_selected.device_ptr_mut(stream).0 as *mut i64; + + // CUB uses TransformInputIterator internally to read bits on-the-fly. + unsafe { + T::filter_bitmask( + d_temp_ptr, + temp_bytes, + d_input_ptr, + d_packed_ptr, + offset, + d_output_ptr, + d_num_selected_ptr, + len, + stream_ptr, + ) + .map_err(|e| vortex_err!("CUB filter_bitmask failed: {}", e))?; + } + + // Wait for completion + await_stream_callback(stream).await?; + + // Wrap the device buffer of outputs back up into a BufferHandle. + Ok(BufferHandle::new_device(Arc::new(CudaDeviceBuffer::new( + d_output, + )))) +} diff --git a/vortex-cuda/src/kernel/filter/primitive.rs b/vortex-cuda/src/kernel/filter/primitive.rs new file mode 100644 index 00000000000..88270b7df9f --- /dev/null +++ b/vortex-cuda/src/kernel/filter/primitive.rs @@ -0,0 +1,136 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use cudarc::driver::DeviceRepr; +use vortex_array::Canonical; +use vortex_array::arrays::PrimitiveArray; +use vortex_array::arrays::PrimitiveArrayParts; +use vortex_cub::filter::CubFilterable; +use vortex_cuda_macros::cuda_tests; +use vortex_dtype::NativePType; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +/// Execute a filter operation over the primitive array on a GPU. +pub(super) async fn filter_primitive( + array: PrimitiveArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult +where + T: NativePType + DeviceRepr + CubFilterable + Send + Sync + 'static, +{ + let PrimitiveArrayParts { + buffer, validity, .. + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_values = filter_sized::(buffer, mask, ctx).await?; + + Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle( + filtered_values, + T::PTYPE, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::FilterArray; + use vortex_array::arrays::PrimitiveArray; + use vortex_array::assert_arrays_eq; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::i32_sparse( + PrimitiveArray::from_iter([1i32, 2, 3, 4, 5, 6, 7, 8]), + Mask::from_iter([true, false, true, false, true, false, true, false]) + )] + #[case::i32_dense( + PrimitiveArray::from_iter([10i32, 20, 30, 40, 50]), + Mask::from_iter([true, true, true, false, true]) + )] + #[case::i64_large( + PrimitiveArray::from_iter((0..1000i64).collect::>()), + Mask::from_iter((0..1000).map(|i| i % 3 == 0)) + )] + #[case::f64_values( + PrimitiveArray::from_iter([1.1f64, 2.2, 3.3, 4.4, 5.5]), + Mask::from_iter([false, true, false, true, false]) + )] + #[case::u8_all_true( + PrimitiveArray::from_iter([1u8, 2, 3, 4, 5]), + Mask::from_iter([true, true, true, true, true]) + )] + #[case::u32_all_false( + PrimitiveArray::from_iter([1u32, 2, 3, 4, 5]), + Mask::from_iter([false, false, false, false, false]) + )] + #[tokio::test] + async fn test_gpu_filter( + #[case] input: PrimitiveArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.clone().into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } + + #[tokio::test] + async fn test_gpu_filter_large_array() -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + // Create a large array to test multi-block execution + let data: Vec = (0..100_000).collect(); + let input = PrimitiveArray::from_iter(data); + + // Select every 7th element + let mask = Mask::from_iter((0..100_000).map(|i| i % 7 == 0)); + + let filter_array = FilterArray::try_new(input.into_array(), mask)?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_eq!(cpu_result.len(), gpu_result.len()); + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/kernel/filter/varbinview.rs b/vortex-cuda/src/kernel/filter/varbinview.rs new file mode 100644 index 00000000000..8a9b164c9fd --- /dev/null +++ b/vortex-cuda/src/kernel/filter/varbinview.rs @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +use vortex_array::Canonical; +use vortex_array::arrays::VarBinViewArray; +use vortex_array::arrays::VarBinViewArrayParts; +use vortex_cuda_macros::cuda_tests; +use vortex_error::VortexResult; +use vortex_mask::Mask; + +use crate::CudaExecutionCtx; +use crate::kernel::filter::filter_sized; + +pub(super) async fn filter_varbinview( + array: VarBinViewArray, + mask: Mask, + ctx: &mut CudaExecutionCtx, +) -> VortexResult { + let VarBinViewArrayParts { + views, + buffers, + validity, + dtype, + } = array.into_parts(); + + let filtered_validity = validity.filter(&mask)?; + let filtered_views = filter_sized::(views, mask, ctx).await?; + + Ok(Canonical::VarBinView(VarBinViewArray::new_handle( + filtered_views, + buffers, + dtype, + filtered_validity, + ))) +} + +#[cuda_tests] +mod tests { + use rstest::rstest; + use vortex_array::IntoArray; + use vortex_array::arrays::FilterArray; + use vortex_array::arrays::VarBinViewArray; + use vortex_array::assert_arrays_eq; + use vortex_error::VortexExpect; + use vortex_error::VortexResult; + use vortex_mask::Mask; + use vortex_session::VortexSession; + + use crate::CanonicalCudaExt; + use crate::FilterExecutor; + use crate::executor::CudaExecute; + use crate::session::CudaSession; + + #[rstest] + #[case::nato( + VarBinViewArray::from_iter_str(["alpha", "bravo", "charlie", "delta"]), + Mask::from_iter([true, false, true, false]) + )] + #[case::planets( + VarBinViewArray::from_iter_str( + ["mercury", "venus", "earth", "mars", "jupiter", "saturn", "uranus", "neptune", "pluto"] + ), + Mask::from_iter([true, true, true, true, true, true, true, true, false]) + )] + #[tokio::test] + async fn test_gpu_filter_strings( + #[case] input: VarBinViewArray, + #[case] mask: Mask, + ) -> VortexResult<()> { + let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty()) + .vortex_expect("failed to create CUDA execution context"); + + let filter_array = FilterArray::try_new(input.into_array(), mask.clone())?; + + let cpu_result = filter_array.to_canonical()?.into_array(); + + let gpu_result = FilterExecutor + .execute(filter_array.into_array(), &mut cuda_ctx) + .await + .vortex_expect("GPU filter failed") + .into_host() + .await? + .into_array(); + + assert_arrays_eq!(cpu_result, gpu_result); + + Ok(()) + } +} diff --git a/vortex-cuda/src/lib.rs b/vortex-cuda/src/lib.rs index dca0b1aa335..30df645908a 100644 --- a/vortex-cuda/src/lib.rs +++ b/vortex-cuda/src/lib.rs @@ -8,6 +8,7 @@ use std::process::Command; mod canonical; mod device_buffer; pub mod executor; +mod host_to_device_allocator; mod kernel; mod session; mod stream; @@ -17,6 +18,7 @@ pub use device_buffer::CudaBufferExt; pub use device_buffer::CudaDeviceBuffer; pub use executor::CudaExecutionCtx; pub use executor::CudaKernelEvents; +pub use host_to_device_allocator::CopyDeviceReadAt; use kernel::ALPExecutor; use kernel::BitPackedExecutor; use kernel::DecimalBytePartsExecutor; diff --git a/vortex-cuda/src/session.rs b/vortex-cuda/src/session.rs index c83128def3e..8fc0f1e1292 100644 --- a/vortex-cuda/src/session.rs +++ b/vortex-cuda/src/session.rs @@ -16,6 +16,7 @@ use vortex_utils::aliases::dash_map::DashMap; use crate::executor::CudaExecute; pub use crate::executor::CudaExecutionCtx; use crate::kernel::KernelLoader; +use crate::stream::VortexCudaStream; /// CUDA session for GPU accelerated execution. /// @@ -42,17 +43,20 @@ impl CudaSession { pub fn create_execution_ctx( vortex_session: &vortex_session::VortexSession, ) -> VortexResult { - let stream = vortex_session - .cuda_session() - .context - .new_stream() - .map_err(|e| vortex_err!("Failed to create CUDA stream: {}", e))?; + let stream = vortex_session.cuda_session().new_stream()?; Ok(CudaExecutionCtx::new( stream, vortex_session.create_execution_ctx(), )) } + /// Create a new CUDA stream. + pub fn new_stream(&self) -> VortexResult { + Ok(VortexCudaStream(self.context.new_stream().map_err( + |e| vortex_err!("Failed to create CUDA stream: {}", e), + )?)) + } + /// Registers CUDA support for an array encoding. /// /// # Arguments diff --git a/vortex-cuda/src/stream.rs b/vortex-cuda/src/stream.rs index ba1f264ee60..e3080fe6271 100644 --- a/vortex-cuda/src/stream.rs +++ b/vortex-cuda/src/stream.rs @@ -3,12 +3,116 @@ //! CUDA stream utility functions. +use std::sync::Arc; + +use cudarc::driver::CudaSlice; use cudarc::driver::CudaStream; +use cudarc::driver::DevicePtrMut; +use cudarc::driver::DeviceRepr; +use cudarc::driver::result::memcpy_htod_async; use cudarc::driver::result::stream; +use futures::future::BoxFuture; use kanal::Sender; +use vortex_array::buffer::BufferHandle; +use vortex_buffer::Alignment; +use vortex_buffer::Buffer; +use vortex_error::VortexExpect; use vortex_error::VortexResult; use vortex_error::vortex_err; +use crate::CudaDeviceBuffer; + +#[derive(Clone)] +pub struct VortexCudaStream(pub Arc); + +impl VortexCudaStream { + /// Allocates a typed buffer on the GPU. + /// + /// Note: Allocation is async in case the CUDA driver supports this. + /// + /// The condition for alloc to be async is support for memory pools: + /// `CU_DEVICE_ATTRIBUTE_MEMORY_POOLS_SUPPORTED`. + /// + /// Any kernel submitted to the stream after alloc can safely use the + /// memory, as operations on the stream are ordered sequentially. + pub fn device_alloc(&self, len: usize) -> VortexResult> { + // SAFETY: No safety guarantees for allocations on the GPU. + unsafe { + self.0 + .alloc::(len) + .map_err(|e| vortex_err!("Failed to allocate device memory: {}", e)) + } + } + + /// Copies host data to the device asynchronously. + /// + /// Allocates device memory, schedules an async copy, and returns a future + /// that completes when the copy is finished. The source data is moved into + /// the future to ensure it remains valid until the copy completes. + /// + /// # Arguments + /// + /// * `data` - The host data to copy. + /// + /// # Returns + /// + /// A future that resolves to the device buffer handle when the copy completes. + pub fn copy_to_device( + &self, + data: D, + ) -> VortexResult>> + where + T: DeviceRepr + Send + Sync + 'static, + D: AsRef<[T]> + Send + 'static, + { + let host_slice: &[T] = data.as_ref(); + let mut cuda_slice: CudaSlice = self.device_alloc(host_slice.len())?; + let device_ptr = cuda_slice.device_ptr_mut(&self.0).0; + + unsafe { + memcpy_htod_async(device_ptr, host_slice, self.0.cu_stream()) + .map_err(|e| vortex_err!("Failed to schedule async copy to device: {}", e))?; + } + + println!( + "dev {}", + u8::try_from(device_ptr.trailing_zeros()).vortex_expect("aligment over 2^2^8??") + ); + let cuda_buf = CudaDeviceBuffer::new_aligned(cuda_slice, Alignment::new(256)); + let stream = Arc::clone(&self.0); + + Ok(Box::pin(async move { + await_stream_callback(&stream).await?; + + // Keep source memory alive until copy completes. + let _keep_alive = data; + + Ok(BufferHandle::new_device(Arc::new(cuda_buf))) + })) + } + + /// Moves a host buffer handle to the device asynchronously. + /// + /// # Arguments + /// + /// * `handle` - The host buffer to move. Must be a host buffer. + /// + /// # Returns + /// + /// A future that resolves to the device buffer handle when the copy completes. + pub fn move_to_device( + &self, + handle: BufferHandle, + ) -> VortexResult>> { + let host_buffer = handle + .as_host_opt() + .ok_or_else(|| vortex_err!("Buffer is not on host"))?; + + let buffer: Buffer = Buffer::from_byte_buffer(host_buffer.clone()); + self.copy_to_device(buffer) + } +} + /// Registers a callback and asynchronously waits for its completion. /// /// This function can be used to asynchronously wait for events previously diff --git a/vortex-duckdb/src/exporter/varbinview.rs b/vortex-duckdb/src/exporter/varbinview.rs index 23066f691d5..16ffe0232fe 100644 --- a/vortex-duckdb/src/exporter/varbinview.rs +++ b/vortex-duckdb/src/exporter/varbinview.rs @@ -46,12 +46,21 @@ pub(crate) fn new_exporter( &LogicalType::try_from(dtype)?, )); } + + let buffers = buffers + .iter() + .cloned() + .map(|b| b.unwrap_host()) + .collect_vec(); + + let buffers: Arc<[ByteBuffer]> = Arc::from(buffers); + Ok(validity::new_exporter( validity, Box::new(VarBinViewExporter { - views, - buffers: buffers.clone(), + views: Buffer::::from_byte_buffer(views.unwrap_host()), vector_buffers: buffers.iter().cloned().map(VectorBuffer::new).collect_vec(), + buffers, }), )) } diff --git a/vortex-file/Cargo.toml b/vortex-file/Cargo.toml index 011cbac39bb..7f9fefd2bd9 100644 --- a/vortex-file/Cargo.toml +++ b/vortex-file/Cargo.toml @@ -35,6 +35,7 @@ vortex-alp = { workspace = true } vortex-array = { workspace = true } vortex-buffer = { workspace = true } vortex-bytebool = { workspace = true } +vortex-cuda = { workspace = true } vortex-datetime-parts = { workspace = true } vortex-decimal-byte-parts = { workspace = true } vortex-dtype = { workspace = true } diff --git a/vortex-file/src/open.rs b/vortex-file/src/open.rs index af507f86278..315295cafef 100644 --- a/vortex-file/src/open.rs +++ b/vortex-file/src/open.rs @@ -12,6 +12,7 @@ use vortex_dtype::DType; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; +use vortex_io::InstrumentedReadAt; use vortex_io::VortexReadAt; use vortex_io::session::RuntimeSessionExt; use vortex_layout::segments::NoOpSegmentCache; @@ -157,6 +158,7 @@ impl VortexOpenOptions { /// An API for opening a [`VortexFile`] using any [`VortexReadAt`] implementation. pub async fn open_read(self, reader: R) -> VortexResult { let metrics = self.metrics.clone().unwrap_or_default(); + let reader = InstrumentedReadAt::new(reader, &metrics); let footer = if let Some(footer) = self.footer { footer } else { @@ -209,7 +211,8 @@ impl VortexOpenOptions { let initial_offset = file_size - initial_read_size as u64; let initial_read: ByteBuffer = read .read_at(initial_offset, initial_read_size, Alignment::none()) - .await?; + .await? + .try_into_host_sync()?; let mut deserializer = Footer::deserializer(initial_read, self.session.clone()) .with_size(file_size) @@ -218,7 +221,10 @@ impl VortexOpenOptions { let footer = loop { match deserializer.deserialize()? { DeserializeStep::NeedMoreData { offset, len } => { - let more_data = read.read_at(offset, len, Alignment::none()).await?; + let more_data = read + .read_at(offset, len, Alignment::none()) + .await? + .try_into_host_sync()?; deserializer.prefix_data(more_data); } DeserializeStep::NeedFileSize => unreachable!("We passed file_size above"), @@ -287,6 +293,7 @@ mod tests { use futures::future::BoxFuture; use vortex_array::IntoArray; + use vortex_array::buffer::BufferHandle; use vortex_array::expr::session::ExprSession; use vortex_array::session::ArraySession; use vortex_buffer::Buffer; @@ -315,7 +322,7 @@ mod tests { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.total_read.fetch_add(length, Ordering::Relaxed); let _ = self.first_read_len.compare_exchange( 0, diff --git a/vortex-file/src/read/driver.rs b/vortex-file/src/read/driver.rs index f097385445e..e0aa59666fa 100644 --- a/vortex-file/src/read/driver.rs +++ b/vortex-file/src/read/driver.rs @@ -325,8 +325,8 @@ impl State { mod tests { use futures::StreamExt; use futures::stream; + use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; - use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use super::*; @@ -336,7 +336,7 @@ mod tests { id: usize, offset: u64, length: usize, - ) -> (ReadRequest, oneshot::Receiver>) { + ) -> (ReadRequest, oneshot::Receiver>) { let (tx, rx) = oneshot::channel(); ( ReadRequest { diff --git a/vortex-file/src/read/request.rs b/vortex-file/src/read/request.rs index 256cb95851d..cdd71670070 100644 --- a/vortex-file/src/read/request.rs +++ b/vortex-file/src/read/request.rs @@ -7,8 +7,8 @@ use std::fmt::Formatter; use std::ops::Range; use std::sync::Arc; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_error::VortexError; use vortex_error::VortexExpect; use vortex_error::VortexResult; @@ -51,7 +51,7 @@ impl IoRequest { } /// Resolves the request with the given result. - pub fn resolve(self, result: VortexResult) { + pub fn resolve(self, result: VortexResult) { match self.0 { IoRequestInner::Single(req) => req.resolve(result), IoRequestInner::Coalesced(req) => req.resolve(result), @@ -90,7 +90,7 @@ pub struct ReadRequest { pub(crate) offset: u64, pub(crate) length: usize, pub(crate) alignment: Alignment, - pub(crate) callback: oneshot::Sender>, + pub(crate) callback: oneshot::Sender>, } impl Debug for ReadRequest { @@ -106,7 +106,7 @@ impl Debug for ReadRequest { } impl ReadRequest { - pub(crate) fn resolve(self, result: VortexResult) { + pub(crate) fn resolve(self, result: VortexResult) { if let Err(e) = self.callback.send(result) { tracing::debug!("ReadRequest {} dropped before resolving: {e}", self.id); } @@ -132,15 +132,31 @@ impl Debug for CoalescedRequest { } impl CoalescedRequest { - pub fn resolve(self, result: VortexResult) { + pub fn resolve(self, result: VortexResult) { match result { Ok(buffer) => { - let buffer = buffer.aligned(Alignment::none()); + let base = match buffer.ensure_aligned(Alignment::none()) { + Ok(base) => base, + Err(e) => { + let e = Arc::new(e); + for req in self.requests.into_iter() { + req.resolve(Err(VortexError::from(e.clone()))); + } + return; + } + }; + for req in self.requests.into_iter() { let start = usize::try_from(req.offset - self.range.start) .vortex_expect("invalid offset"); let end = start + req.length; - let slice = buffer.slice(start..end).aligned(req.alignment); + let slice = match base.slice(start..end).ensure_aligned(req.alignment) { + Ok(slice) => slice, + Err(e) => { + req.resolve(Err(e)); + continue; + } + }; req.resolve(Ok(slice)); } } diff --git a/vortex-file/src/segments/source.rs b/vortex-file/src/segments/source.rs index a1072af9998..344f805f516 100644 --- a/vortex-file/src/segments/source.rs +++ b/vortex-file/src/segments/source.rs @@ -14,7 +14,6 @@ use futures::StreamExt; use futures::channel::mpsc; use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_error::VortexResult; use vortex_error::vortex_err; use vortex_io::VortexReadAt; @@ -99,9 +98,9 @@ impl FileSegmentSource { stream .map(move |req| { - let source = reader.clone(); + let reader = reader.clone(); async move { - let result = source + let result = reader .read_at(req.offset(), req.len(), req.alignment()) .await; req.resolve(result); @@ -162,7 +161,6 @@ impl SegmentSource for FileSegmentSource { maybe_fut .ok_or_else(|| vortex_err!("Missing segment: {}", id))? .await - .map(BufferHandle::new_host) } .boxed() } @@ -174,13 +172,13 @@ impl SegmentSource for FileSegmentSource { /// If dropped, the read request will be canceled where possible. struct ReadFuture { id: usize, - recv: oneshot::Receiver>, + recv: oneshot::Receiver>, polled: bool, events: mpsc::UnboundedSender, } impl Future for ReadFuture { - type Output = VortexResult; + type Output = VortexResult; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { if !self.polled { diff --git a/vortex-file/src/tests.rs b/vortex-file/src/tests.rs index a43102b7d3c..fc46d2dc0ad 100644 --- a/vortex-file/src/tests.rs +++ b/vortex-file/src/tests.rs @@ -50,9 +50,15 @@ use vortex_array::validity::Validity; use vortex_buffer::Buffer; use vortex_buffer::ByteBufferMut; use vortex_buffer::buffer; +use vortex_cuda::CanonicalCudaExt; +use vortex_cuda::CopyDeviceReadAt; +use vortex_cuda::CudaSession; +use vortex_cuda::CudaSessionExt; +use vortex_cuda::executor::CudaArrayExt; use vortex_dtype::DType; use vortex_dtype::DecimalDType; use vortex_dtype::ExtDType; +use vortex_dtype::FieldNames; use vortex_dtype::Nullability; use vortex_dtype::PType; use vortex_dtype::PType::I32; @@ -61,7 +67,9 @@ use vortex_dtype::datetime::TIMESTAMP_ID; use vortex_dtype::datetime::TemporalMetadata; use vortex_dtype::datetime::TimeUnit; use vortex_error::VortexResult; +use vortex_io::file::std_file::FileReadAdapter; use vortex_io::session::RuntimeSession; +use vortex_io::session::RuntimeSessionExt; use vortex_layout::session::LayoutSession; use vortex_metrics::VortexMetrics; use vortex_scalar::Scalar; @@ -1635,8 +1643,82 @@ async fn main_test() -> Result<(), Box> { let results = stream.try_collect::>().await; - let err = results.err().unwrap(); - println!("Expected error: {}", err); + assert!(results.is_err()); + + Ok(()) +} + +#[tokio::test] +async fn gpu_scan() -> VortexResult<()> { + use vortex_alp::alp_encode; + + // Create an ALP-encoded array from primitive f64 values + let primitive = PrimitiveArray::from_iter((0..100).map(|i| i as f64 * 1.1)); + let alp_array = alp_encode(&primitive, None)?; + let str_array = VarBinViewArray::from_iter_str((0..100).map(|i| format!("number {i}"))); + + println!("alp {}", alp_array.display_tree()); + + let array = StructArray::new( + FieldNames::from(vec!["float_col", "int_col", "str_col"]), + vec![ + primitive.into_array(), + alp_array.into_array(), + str_array.into_array(), + ], + 100, + Validity::NonNullable, + ); + + // Write to a buffer, then to a temp file + let temp_path = std::env::temp_dir().join("gpu_scan_test.vortex"); + let mut buf = Vec::new(); + SESSION + .write_options() + .write(&mut buf, array.to_array_stream()) + .await?; + std::fs::write(&temp_path, &buf)?; + + // Read back via GPU + let handle = SESSION.handle(); + let source = Arc::new(FileReadAdapter::open(&temp_path, handle)?); + let gpu_reader = CopyDeviceReadAt::new(source.clone(), SESSION.cuda_session().new_stream()?); + let cpu_reader = source; + + let cpu_file = SESSION.open_options().open_read(cpu_reader).await?; + + let gpu_file = SESSION + .open_options() + .with_footer(cpu_file.footer) + .open_read(gpu_reader) + .await?; + + let mut cuda_ctx = CudaSession::create_execution_ctx(&SESSION)?; + + let mut res = Vec::new(); + let mut stream = gpu_file + .scan()? + // filter with a predefined mask + .with_row_indices(buffer![0, 10, 20, 30, 40, 50, 60, 70, 80, 90]) + .into_array_stream()?; + while let Some(a) = stream.next().await { + let a = a?; + // println!("arr {}", a.display_tree()); + let array = a + .execute_cuda(&mut cuda_ctx) + .await? + .into_host() + .await? + .into_array(); + res.push(array); + } + + for a in res { + println!("a {} ", a.display_tree()) + } + + // Cleanup + std::fs::remove_file(&temp_path)?; Ok(()) } diff --git a/vortex-io/Cargo.toml b/vortex-io/Cargo.toml index cef1c69e351..5a98bd28528 100644 --- a/vortex-io/Cargo.toml +++ b/vortex-io/Cargo.toml @@ -35,6 +35,7 @@ handle = "1.0.2" tokio = { workspace = true, features = ["io-util", "rt", "sync"] } tracing = { workspace = true } vortex-buffer = { workspace = true } +vortex-array = { workspace = true } vortex-error = { workspace = true } vortex-metrics = { workspace = true } vortex-session = { workspace = true } diff --git a/vortex-io/src/file/object_store.rs b/vortex-io/src/file/object_store.rs index 0d09cbdcd2b..80c8a9343fe 100644 --- a/vortex-io/src/file/object_store.rs +++ b/vortex-io/src/file/object_store.rs @@ -13,8 +13,8 @@ use object_store::GetRange; use object_store::GetResultPayload; use object_store::ObjectStore; use object_store::path::Path as ObjectPath; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexError; use vortex_error::VortexResult; @@ -108,7 +108,7 @@ impl VortexReadAt for ObjectStoreSource { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let store = self.store.clone(); let path = self.path.clone(); let handle = self.handle.clone(); @@ -161,7 +161,7 @@ impl VortexReadAt for ObjectStoreSource { } }; - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) }) .boxed() } diff --git a/vortex-io/src/file/std_file.rs b/vortex-io/src/file/std_file.rs index 56abd56eb60..77417aea659 100644 --- a/vortex-io/src/file/std_file.rs +++ b/vortex-io/src/file/std_file.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors use std::fs::File; +use std::io; #[cfg(all(not(unix), not(windows)))] use std::io::Read; #[cfg(all(not(unix), not(windows)))] @@ -15,8 +16,8 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; -use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; use vortex_error::VortexResult; @@ -27,7 +28,7 @@ use crate::runtime::Handle; /// Read exactly `buffer.len()` bytes from `file` starting at `offset`. /// This is a platform-specific helper that uses the most efficient method available. #[cfg(not(target_arch = "wasm32"))] -pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> std::io::Result<()> { +pub(crate) fn read_exact_at(file: &File, buffer: &mut [u8], offset: u64) -> io::Result<()> { #[cfg(unix)] { file.read_exact_at(buffer, offset) @@ -107,7 +108,7 @@ impl VortexReadAt for FileReadAdapter { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let file = self.file.clone(); let handle = self.handle.clone(); async move { @@ -116,7 +117,7 @@ impl VortexReadAt for FileReadAdapter { let mut buffer = ByteBufferMut::with_capacity_aligned(length, alignment); unsafe { buffer.set_len(length) }; read_exact_at(&file, &mut buffer, offset)?; - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) }) .await } diff --git a/vortex-io/src/read.rs b/vortex-io/src/read.rs index fbcbd697d45..2cbdf017f9e 100644 --- a/vortex-io/src/read.rs +++ b/vortex-io/src/read.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use futures::FutureExt; use futures::future::BoxFuture; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_error::VortexExpect; @@ -71,7 +72,7 @@ pub trait VortexReadAt: Send + Sync + 'static { /// Asynchronously get the number of bytes of the underlying source. fn size(&self) -> BoxFuture<'static, VortexResult>; - /// Request an asynchronous positional read. Results will be returned as a [`ByteBuffer`]. + /// Request an asynchronous positional read. Results will be returned as a [`BufferHandle`]. /// /// If the reader does not have the requested number of bytes, the returned Future will complete /// with an [`UnexpectedEof`][std::io::ErrorKind::UnexpectedEof] error. @@ -80,7 +81,7 @@ pub trait VortexReadAt: Send + Sync + 'static { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult>; + ) -> BoxFuture<'static, VortexResult>; } impl VortexReadAt for Arc { @@ -105,7 +106,7 @@ impl VortexReadAt for Arc { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.as_ref().read_at(offset, length, alignment) } } @@ -132,7 +133,7 @@ impl VortexReadAt for Arc { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.as_ref().read_at(offset, length, alignment) } @@ -158,7 +159,7 @@ impl VortexReadAt for ByteBuffer { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let buffer = self.clone(); async move { let start = usize::try_from(offset).vortex_expect("start too big for usize"); @@ -172,7 +173,9 @@ impl VortexReadAt for ByteBuffer { buffer.len() ); } - Ok(buffer.slice_unaligned(start..end).aligned(alignment)) + Ok(BufferHandle::new_host( + buffer.slice_unaligned(start..end).aligned(alignment), + )) } .boxed() } @@ -247,7 +250,7 @@ impl VortexReadAt for InstrumentedReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { let durations = self.durations.clone(); let sizes = self.sizes.clone(); let total_size = self.total_size.clone(); @@ -291,7 +294,7 @@ mod tests { let data = ByteBuffer::from(vec![1, 2, 3, 4, 5]); let result = data.read_at(1, 3, Alignment::none()).await.unwrap(); - assert_eq!(result.as_ref(), &[2, 3, 4]); + assert_eq!(result.to_host_sync().as_ref(), &[2, 3, 4]); } #[tokio::test] @@ -307,7 +310,7 @@ mod tests { let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5])); let result = data.read_at(2, 3, Alignment::none()).await.unwrap(); - assert_eq!(result.as_ref(), &[3, 4, 5]); + assert_eq!(result.to_host_sync().as_ref(), &[3, 4, 5]); let size = data.size().await.unwrap(); assert_eq!(size, 5); diff --git a/vortex-io/src/runtime/tests.rs b/vortex-io/src/runtime/tests.rs index 10832633983..928fb476406 100644 --- a/vortex-io/src/runtime/tests.rs +++ b/vortex-io/src/runtime/tests.rs @@ -11,6 +11,7 @@ use std::sync::atomic::Ordering; use futures::FutureExt; use futures::future::BoxFuture; use tempfile::NamedTempFile; +use vortex_array::buffer::BufferHandle; use vortex_buffer::Alignment; use vortex_buffer::ByteBuffer; use vortex_buffer::ByteBufferMut; @@ -42,7 +43,7 @@ fn test_file_read_with_single_thread_runtime() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -51,7 +52,7 @@ fn test_file_read_with_single_thread_runtime() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); "success" } @@ -70,7 +71,7 @@ async fn test_file_read_with_tokio_runtime() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -79,7 +80,7 @@ async fn test_file_read_with_tokio_runtime() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); } // ============================================================================ @@ -107,7 +108,7 @@ fn test_file_read_with_real_file_single_thread() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -116,7 +117,7 @@ fn test_file_read_with_real_file_single_thread() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); "success" } @@ -144,7 +145,7 @@ async fn test_file_read_with_real_file_tokio() { .await .unwrap(); assert_eq!( - result.as_slice(), + result.to_host_sync().as_slice(), &TEST_DATA[TEST_OFFSET as usize..][..TEST_LEN] ); @@ -153,7 +154,7 @@ async fn test_file_read_with_real_file_tokio() { .read_at(0, TEST_DATA.len(), Alignment::new(1)) .await .unwrap(); - assert_eq!(full.as_slice(), TEST_DATA); + assert_eq!(full.to_host_sync().as_slice(), TEST_DATA); } // ============================================================================ @@ -174,10 +175,22 @@ async fn test_concurrent_reads() { let results = futures::future::join_all(futures).await; - assert_eq!(results[0].as_ref().unwrap().as_slice(), &TEST_DATA[0..5]); - assert_eq!(results[1].as_ref().unwrap().as_slice(), &TEST_DATA[5..10]); - assert_eq!(results[2].as_ref().unwrap().as_slice(), &TEST_DATA[10..15]); - assert_eq!(results[3].as_ref().unwrap().as_slice(), &TEST_DATA[15..20]); + assert_eq!( + results[0].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[0..5] + ); + assert_eq!( + results[1].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[5..10] + ); + assert_eq!( + results[2].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[10..15] + ); + assert_eq!( + results[3].as_ref().unwrap().to_host_sync().as_slice(), + &TEST_DATA[15..20] + ); } // ============================================================================ @@ -240,7 +253,7 @@ impl VortexReadAt for CountingReadAt { offset: u64, length: usize, alignment: Alignment, - ) -> BoxFuture<'static, VortexResult> { + ) -> BoxFuture<'static, VortexResult> { self.read_count.fetch_add(1, Ordering::SeqCst); let data = self.data.clone(); async move { @@ -253,7 +266,7 @@ impl VortexReadAt for CountingReadAt { buffer .as_mut_slice() .copy_from_slice(&data.as_slice()[start..start + length]); - Ok(buffer.freeze()) + Ok(BufferHandle::new_host(buffer.freeze())) } .boxed() } diff --git a/vortex-layout/src/layouts/chunked/reader.rs b/vortex-layout/src/layouts/chunked/reader.rs index e4d26c7e808..6ef61a5504d 100644 --- a/vortex-layout/src/layouts/chunked/reader.rs +++ b/vortex-layout/src/layouts/chunked/reader.rs @@ -301,7 +301,10 @@ impl LayoutReader for ChunkedReader { } // Combine the arrays. - Ok(ChunkedArray::try_new(chunks, dtype)?.to_array()) + let x = ChunkedArray::try_new(chunks, dtype)?.to_array(); + println!("{}", x.display_tree()); + + Ok(x) } .boxed()) } diff --git a/vortex-layout/src/layouts/flat/reader.rs b/vortex-layout/src/layouts/flat/reader.rs index 273dfa74d51..05ddbe75754 100644 --- a/vortex-layout/src/layouts/flat/reader.rs +++ b/vortex-layout/src/layouts/flat/reader.rs @@ -72,6 +72,11 @@ impl FlatReader { let array_tree = self.layout.array_tree().cloned(); async move { let segment = segment_fut.await?; + println!( + "segment host {}, device {}", + segment.is_on_host(), + segment.is_on_device() + ); let parts = if let Some(array_tree) = array_tree { // Use the pre-stored flatbuffer from layout metadata combined with segment buffers. ArrayParts::from_flatbuffer_and_segment(array_tree, segment)? @@ -214,6 +219,8 @@ impl LayoutReader for FlatReader { // Evaluate the projection expression. array = array.apply(&expr)?; + println!("array {}", array.display_tree()); + Ok(array) } .boxed()) diff --git a/vortex-layout/src/layouts/struct_/reader.rs b/vortex-layout/src/layouts/struct_/reader.rs index 66c06505ff5..6b23d8c5b19 100644 --- a/vortex-layout/src/layouts/struct_/reader.rs +++ b/vortex-layout/src/layouts/struct_/reader.rs @@ -346,7 +346,7 @@ impl LayoutReader for StructReader { let mask = Mask::from_buffer(validity.to_bool().to_bit_buffer().not()); // If root expression was a pack, then we apply the validity to each child field - if is_pack_merge { + let res = if is_pack_merge { let struct_array = array.to_struct(); let masked_fields: Vec = struct_array .unmasked_fields() @@ -365,9 +365,12 @@ impl LayoutReader for StructReader { // If the root expression was not a pack or merge, e.g. if it's something like // a get_item, then we apply the validity directly to the result vortex_array::compute::mask(array.as_ref(), &mask) - } + }; + res } else { - projected.await + projected + .await + .inspect(|a| println!("ret array {}", a.display_tree())) } })) }