Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions encodings/sparse/src/canonical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use vortex_array::arrays::PrimitiveArray;
use vortex_array::arrays::StructArray;
use vortex_array::arrays::VarBinViewArray;
use vortex_array::arrays::build_views::BinaryView;
use vortex_array::buffer::BufferHandle;
use vortex_array::builders::ArrayBuilder;
use vortex_array::builders::DecimalBuilder;
use vortex_array::builders::ListViewBuilder;
Expand Down Expand Up @@ -480,7 +481,7 @@ fn canonicalize_varbin_inner<I: IntegerPType>(
let mut buffers = values.buffers().to_vec();

let fill = if let Some(buffer) = &fill_value {
buffers.push(buffer.clone());
buffers.push(BufferHandle::new_host(buffer.clone()));
BinaryView::make_view(
buffer.as_ref(),
u32::try_from(n_patch_buffers).vortex_expect("too many buffers"),
Expand All @@ -498,9 +499,11 @@ fn canonicalize_varbin_inner<I: IntegerPType>(
views[patch_index_usize] = patch;
}

let views = BufferHandle::new_host(views.freeze().into_byte_buffer());

// SAFETY: views are constructed to maintain the invariants
let array = unsafe {
VarBinViewArray::new_unchecked(views.freeze(), Arc::from(buffers), dtype, validity)
VarBinViewArray::new_handle_unchecked(views, Arc::from(buffers), dtype, validity)
};

Canonical::VarBinView(array)
Expand Down
8 changes: 4 additions & 4 deletions fuzz/src/array/fill_null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,8 @@ fn fill_varbinview_array(
let string_refs: Vec<&str> = strings.iter().map(|s| s.as_str()).collect();
let result = VarBinViewArray::from_iter_str(string_refs).into_array();
if result_nullability == Nullability::Nullable {
VarBinViewArray::new(
result.to_varbinview().views().clone(),
VarBinViewArray::new_handle(
result.to_varbinview().views_handle().clone(),
result.to_varbinview().buffers().clone(),
result.dtype().as_nullable(),
result_nullability.into(),
Expand Down Expand Up @@ -230,8 +230,8 @@ fn fill_varbinview_array(
let binary_refs: Vec<&[u8]> = binaries.iter().map(|b| b.as_slice()).collect();
let result = VarBinViewArray::from_iter_bin(binary_refs).into_array();
if result_nullability == Nullability::Nullable {
VarBinViewArray::new(
result.to_varbinview().views().clone(),
VarBinViewArray::new_handle(
result.to_varbinview().views_handle().clone(),
result.to_varbinview().buffers().clone(),
result.dtype().as_nullable(),
result_nullability.into(),
Expand Down
4 changes: 2 additions & 2 deletions fuzz/src/array/mask.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ pub fn mask_canonical_array(canonical: Canonical, mask: &Mask) -> VortexResult<A
}
Canonical::VarBinView(array) => {
let new_validity = array.validity().mask(mask);
VarBinViewArray::new(
array.views().clone(),
VarBinViewArray::new_handle(
array.views_handle().clone(),
array.buffers().clone(),
array.dtype().with_nullability(new_validity.nullability()),
new_validity,
Expand Down
12 changes: 5 additions & 7 deletions vortex-array/src/arrays/decimal/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ impl VTable for DecimalVTable {

match_each_decimal_value_type!(metadata.values_type(), |D| {
// Check and reinterpret-cast the buffer
if let Some(buffer) = values.as_host_opt() {
vortex_ensure!(
buffer.is_aligned(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
}
vortex_ensure!(
values.alignment().is_aligned_to(Alignment::of::<D>()),
"DecimalArray buffer not aligned for values type {:?}",
D::DECIMAL_TYPE
);
DecimalArray::try_new_handle(values, metadata.values_type(), *decimal_dtype, validity)
})
}
Expand Down
16 changes: 16 additions & 0 deletions vortex-array/src/arrays/filter/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@ use vortex_mask::Mask;
use crate::ArrayRef;
use crate::stats::ArrayStats;

/// Decomposed parts of the filter array.
pub struct FilterArrayParts {
/// Child array that is filtered by the mask
pub child: ArrayRef,
/// Mask to apply at filter time. Child elements with set indices are kept, the rest discarded.
pub mask: Mask,
}

// TODO(connor): Write docs on why we have this, and what we had in the old world so that the future
// does not repeat the mistakes of the past.
/// A lazy array that represents filtering a child array by a boolean [`Mask`].
Expand Down Expand Up @@ -56,4 +64,12 @@ impl FilterArray {
pub fn filter_mask(&self) -> &Mask {
&self.mask
}

/// Consume the array and return its individual components.
pub fn into_parts(self) -> FilterArrayParts {
FilterArrayParts {
child: self.child,
mask: self.mask,
}
}
}
15 changes: 4 additions & 11 deletions vortex-array/src/arrays/filter/execute/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,22 +31,15 @@ mod struct_;
mod varbinview;

/// Reconstruct a [`Mask`] from an [`Arc<MaskValues>`].
#[inline]
fn values_to_mask(values: &Arc<MaskValues>) -> Mask {
Mask::Values(values.clone())
}

/// A helper function that lazily filters a [`Validity`] with a selection mask.
///
/// If the validity is a [`Validity::Array`], then this wraps it in a `FilterArray` instead of
/// eagerly filtering the values immediately.
/// A helper function that lazily filters a [`Validity`] with selection mask values.
fn filter_validity(validity: Validity, mask: &Arc<MaskValues>) -> Validity {
match validity {
v @ (Validity::NonNullable | Validity::AllValid | Validity::AllInvalid) => v,
Validity::Array(arr) => {
Validity::Array(FilterArray::new(arr.clone(), values_to_mask(mask)).into_array())
}
}
validity
.filter(&values_to_mask(mask))
.vortex_expect("Somehow unable to wrap filter around a validity array")
}

/// Check for some fast-path execution conditions before calling [`execute_filter`].
Expand Down
1 change: 1 addition & 0 deletions vortex-array/src/arrays/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

mod array;
pub use array::FilterArray;
pub use array::FilterArrayParts;

mod execute;

Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/arrays/masked/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ fn mask_validity_varbinview(array: VarBinViewArray, mask: &Mask) -> VarBinViewAr
let new_validity = combine_validity(array.validity(), mask, len);
// SAFETY: We're only changing validity, not the data structure
unsafe {
VarBinViewArray::new_unchecked(
array.views().clone(),
VarBinViewArray::new_handle_unchecked(
array.views_handle().clone(),
array.buffers().clone(),
dtype,
new_validity,
Expand Down
17 changes: 7 additions & 10 deletions vortex-array/src/arrays/primitive/vtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,13 @@ impl VTable for PrimitiveVTable {
);
}

// For host buffers, we eagerly check alignment on construction.
// TODO(aduffy): check for device buffers. CUDA buffers are generally 256-byte aligned,
// but not sure about other devices.
if let Some(host_buf) = buffer.as_host_opt() {
vortex_ensure!(
host_buf.is_aligned(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer must be aligned to {}",
ptype.byte_width()
);
}
vortex_ensure!(
buffer
.alignment()
.is_aligned_to(Alignment::new(ptype.byte_width())),
"PrimitiveArray::build: Buffer must be aligned to {}",
ptype.byte_width()
);

// SAFETY: checked ahead of time
unsafe {
Expand Down
100 changes: 91 additions & 9 deletions vortex-array/src/arrays/varbinview/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use std::sync::Arc;

use vortex_buffer::Alignment;
use vortex_buffer::Buffer;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
Expand All @@ -15,6 +16,7 @@ use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_vector::binaryview::BinaryView;

use crate::buffer::BufferHandle;
use crate::builders::ArrayBuilder;
use crate::builders::VarBinViewBuilder;
use crate::stats::ArrayStats;
Expand Down Expand Up @@ -82,16 +84,16 @@ use crate::validity::Validity;
#[derive(Clone, Debug)]
pub struct VarBinViewArray {
pub(super) dtype: DType,
pub(super) buffers: Arc<[ByteBuffer]>,
pub(super) views: Buffer<BinaryView>,
pub(super) buffers: Arc<[BufferHandle]>,
pub(super) views: BufferHandle,
pub(super) validity: Validity,
pub(super) stats_set: ArrayStats,
}

pub struct VarBinViewArrayParts {
pub dtype: DType,
pub buffers: Arc<[ByteBuffer]>,
pub views: Buffer<BinaryView>,
pub buffers: Arc<[BufferHandle]>,
pub views: BufferHandle,
pub validity: Validity,
}

Expand All @@ -112,6 +114,22 @@ impl VarBinViewArray {
.vortex_expect("VarBinViewArray construction failed")
}

/// Creates a new [`VarBinViewArray`] with device or host memory.
///
/// # Panics
///
/// Panics if the provided components do not satisfy the invariants documented
/// in [`VarBinViewArray::new_unchecked`].
pub fn new_handle(
views: BufferHandle,
buffers: Arc<[BufferHandle]>,
dtype: DType,
validity: Validity,
) -> Self {
Self::try_new_handle(views, buffers, dtype, validity)
.vortex_expect("VarbinViewArray construction failed")
}

/// Constructs a new `VarBinViewArray`.
///
/// See [`VarBinViewArray::new_unchecked`] for more information.
Expand All @@ -132,6 +150,32 @@ impl VarBinViewArray {
Ok(unsafe { Self::new_unchecked(views, buffers, dtype, validity) })
}

/// Constructs a new `VarBinViewArray`.
///
/// See [`VarBinViewArray::new_unchecked`] for more information.
///
/// # Errors
///
/// Returns an error if the provided components do not satisfy the invariants documented in
/// [`VarBinViewArray::new_unchecked`].
pub fn try_new_handle(
views: BufferHandle,
buffers: Arc<[BufferHandle]>,
dtype: DType,
validity: Validity,
) -> VortexResult<Self> {
// TODO(aduffy): device validation.
if let Some(host) = views.as_host_opt() {
vortex_ensure!(
host.is_aligned(Alignment::of::<BinaryView>()),
"Views on host must be 16 byte aligned"
);
}

// SAFETY: validate ensures all invariants are met.
Ok(unsafe { Self::new_handle_unchecked(views, buffers, dtype, validity) })
}

/// Creates a new [`VarBinViewArray`] without validation from these components:
///
/// * `views` is a buffer of 16-byte view entries (one per logical element).
Expand Down Expand Up @@ -171,10 +215,38 @@ impl VarBinViewArray {
Self::validate(&views, &buffers, &dtype, &validity)
.vortex_expect("[Debug Assertion]: Invalid `VarBinViewArray` parameters");

let handles: Vec<BufferHandle> = buffers
.iter()
.cloned()
.map(BufferHandle::new_host)
.collect();

let handles = Arc::from(handles);

Self {
dtype,
buffers,
buffers: handles,
views: BufferHandle::new_host(views.into_byte_buffer()),
validity,
stats_set: Default::default(),
}
}

/// Construct a new array from `BufferHandle`s without validation.
///
/// # Safety
///
/// See documentation in `new_unchecked`.
pub unsafe fn new_handle_unchecked(
views: BufferHandle,
buffers: Arc<[BufferHandle]>,
dtype: DType,
validity: Validity,
) -> Self {
Self {
views,
buffers,
dtype,
validity,
stats_set: Default::default(),
}
Expand Down Expand Up @@ -290,7 +362,16 @@ impl VarBinViewArray {
/// contain either a pointer into one of the array's owned `buffer`s OR an inlined copy of
/// the string (if the string has 12 bytes or fewer).
#[inline]
pub fn views(&self) -> &Buffer<BinaryView> {
pub fn views(&self) -> &[BinaryView] {
let host_views = self.views.as_host();
let len = host_views.len() / size_of::<BinaryView>();

// SAFETY: data alignment is checked for host buffers on construction
unsafe { std::slice::from_raw_parts(host_views.as_ptr().cast(), len) }
}

/// Return the buffer handle backing the views.
pub fn views_handle(&self) -> &BufferHandle {
&self.views
}

Expand All @@ -308,7 +389,8 @@ impl VarBinViewArray {
.slice(view_ref.as_range())
} else {
// Return access to the range of bytes around it.
views
self.views_handle()
.as_host()
.clone()
.into_byte_buffer()
.slice_ref(view.as_inlined().value())
Expand All @@ -329,12 +411,12 @@ impl VarBinViewArray {
self.nbuffers()
);
}
&self.buffers[idx]
self.buffers[idx].as_host()
}

/// Iterate over the underlying raw data buffers, not including the views buffer.
#[inline]
pub fn buffers(&self) -> &Arc<[ByteBuffer]> {
pub fn buffers(&self) -> &Arc<[BufferHandle]> {
&self.buffers
}

Expand Down
4 changes: 2 additions & 2 deletions vortex-array/src/arrays/varbinview/compute/cast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ impl CastKernel for VarBinViewVTable {
// SAFETY: casting just changes the DType, does not affect invariants on views/buffers.
unsafe {
Ok(Some(
VarBinViewArray::new_unchecked(
array.views().clone(),
VarBinViewArray::new_handle_unchecked(
array.views_handle().clone(),
array.buffers().clone(),
new_dtype,
new_validity,
Expand Down
Loading
Loading