diff --git a/vortex-cuda/benches/dict_cuda.rs b/vortex-cuda/benches/dict_cuda.rs
index 0142cb8842d..74b7f30c0e6 100644
--- a/vortex-cuda/benches/dict_cuda.rs
+++ b/vortex-cuda/benches/dict_cuda.rs
@@ -101,7 +101,7 @@ fn launch_dict_kernel_timed
+
+// Apply patches to a source array
+template <typename ValueT, typename IndexT>
+__device__ void patches(
+    ValueT *const values,
+    const IndexT *const patchIndices,
+    const ValueT *const patchValues,
+    uint64_t patchesLen
+) {
+    const uint32_t idx = blockIdx.x * blockDim.x + threadIdx.x;
+
+    if (idx >= patchesLen) {
+        return;
+    }
+
+    const IndexT patchIdx = patchIndices[idx];
+    const ValueT patchVal = patchValues[idx];
+
+    const size_t valueIdx = static_cast<size_t>(patchIdx);
+    values[valueIdx] = patchVal;
+}
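+
+// One thread applies at most one patch: thread idx scatters patchValues[idx]
+// into values[patchIndices[idx]], so launches need ceil(patchesLen / blockDim.x)
+// blocks. The macros below stamp out one extern "C" entry point per
+// (value type, index type) pair, e.g. patches_u32_u16, which the host side
+// resolves by name.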
+
+#define GENERATE_PATCHES_KERNEL(ValueT, value_suffix, IndexT, index_suffix) \
+extern "C" __global__ void patches_##value_suffix##_##index_suffix( \
+    ValueT *const values, \
+    const IndexT *const patchIndices, \
+    const ValueT *const patchValues, \
+    uint64_t patchesLen \
+) { \
+    patches<ValueT, IndexT>(values, patchIndices, patchValues, patchesLen); \
+}
+
+#define GENERATE_PATCHES_KERNEL_FOR_VALUE(ValueT, value_suffix) \
+    GENERATE_PATCHES_KERNEL(ValueT, value_suffix, uint8_t, u8) \
+    GENERATE_PATCHES_KERNEL(ValueT, value_suffix, uint16_t, u16) \
+    GENERATE_PATCHES_KERNEL(ValueT, value_suffix, uint32_t, u32) \
+    GENERATE_PATCHES_KERNEL(ValueT, value_suffix, uint64_t, u64)
+
+
+GENERATE_PATCHES_KERNEL_FOR_VALUE(uint8_t, u8)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(uint16_t, u16)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(uint32_t, u32)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(uint64_t, u64)
+
+GENERATE_PATCHES_KERNEL_FOR_VALUE(int8_t, i8)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(int16_t, i16)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(int32_t, i32)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(int64_t, i64)
+
+GENERATE_PATCHES_KERNEL_FOR_VALUE(float, f32)
+GENERATE_PATCHES_KERNEL_FOR_VALUE(double, f64)
diff --git a/vortex-cuda/src/device_buffer.rs b/vortex-cuda/src/device_buffer.rs
index 92b8fe08233..520f4551e96 100644
--- a/vortex-cuda/src/device_buffer.rs
+++ b/vortex-cuda/src/device_buffer.rs
@@ -27,6 +27,7 @@ use crate::stream::await_stream_callback;
 /// A [`DeviceBuffer`] wrapping a CUDA GPU allocation.
 ///
 /// Like the host `BufferHandle` variant, all slicing/referencing works in terms of byte units.
+#[derive(Clone)]
 pub struct CudaDeviceBuffer {
     allocation: Arc,
     /// Offset in bytes from the start of the allocation
@@ -39,8 +40,6 @@ pub struct CudaDeviceBuffer {
     alignment: Alignment,
 }
 
-// We can call the sys methods, it's just a lot of extra code...fuck that lol
-
 mod private {
     use std::fmt::Debug;
     use std::sync::Arc;
diff --git a/vortex-cuda/src/kernel/arrays/dict.rs b/vortex-cuda/src/kernel/arrays/dict.rs
index c9cb27f2cd3..02b02dca2c9 100644
--- a/vortex-cuda/src/kernel/arrays/dict.rs
+++ b/vortex-cuda/src/kernel/arrays/dict.rs
@@ -129,7 +129,7 @@ async fn execute_dict_prim_typed
+            (patches.clone(), output_buf, ctx).await?
+        })
+    } else {
+        output_buf
+    };
 
-    // Launch kernel
-    let _cuda_events = launch_cuda_kernel_impl(&mut launch_builder, CU_EVENT_DISABLE_TIMING, array_len)?;
+    // TODO(aduffy): scatter patch values validity. There are several places we'll need to start
+    // handling validity.
 
-    // Build result with newly allocated buffer
     let output_handle = BufferHandle::new_device(Arc::new(output_buf));
     Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle(
         output_handle,
@@ -117,8 +132,10 @@ mod tests {
     use vortex_array::IntoArray;
     use vortex_array::arrays::PrimitiveArray;
     use vortex_array::assert_arrays_eq;
-    use vortex_array::validity::Validity::NonNullable;
+    use vortex_array::patches::Patches;
+    use vortex_array::validity::Validity;
     use vortex_buffer::Buffer;
+    use vortex_buffer::buffer;
     use vortex_error::VortexExpect;
     use vortex_session::VortexSession;
@@ -138,13 +155,24 @@
         let encoded_data: Vec<i32> = vec![100, 200, 300, 400, 500];
         let exponents = Exponents { e: 0, f: 2 }; // multiply by 100
 
+        // Patches
+        let patches = Patches::new(
+            5,
+            0,
+            PrimitiveArray::new(buffer![0u32, 4u32], Validity::NonNullable).into_array(),
+            PrimitiveArray::new(buffer![0.0f32, 999f32], Validity::NonNullable).into_array(),
+            None,
+        )
+        .unwrap();
+
         let alp_array = ALPArray::try_new(
-            PrimitiveArray::new(Buffer::from(encoded_data.clone()), NonNullable).into_array(),
+            PrimitiveArray::new(Buffer::from(encoded_data.clone()), Validity::NonNullable)
+                .into_array(),
             exponents,
-            None,
+            Some(patches),
         )?;
 
-        let cpu_result = alp_array.to_canonical()?;
+        let cpu_result = alp_array.to_canonical()?.into_array();
 
         let gpu_result = ALPExecutor
             .execute(alp_array.to_array(), &mut cuda_ctx)
@@ -154,7 +182,7 @@
             .await?
             .into_array();
 
-        assert_arrays_eq!(cpu_result.into_array(), gpu_result);
+        assert_arrays_eq!(cpu_result, gpu_result);
 
         Ok(())
     }
diff --git a/vortex-cuda/src/kernel/encodings/bitpacked.rs b/vortex-cuda/src/kernel/encodings/bitpacked.rs
index 635ee11a57b..c1e905aff77 100644
--- a/vortex-cuda/src/kernel/encodings/bitpacked.rs
+++ b/vortex-cuda/src/kernel/encodings/bitpacked.rs
@@ -2,6 +2,7 @@
 // SPDX-FileCopyrightText: Copyright the Vortex contributors
 
 use std::fmt::Debug;
+use std::sync::Arc;
 
 use async_trait::async_trait;
 use cudarc::driver::DeviceRepr;
@@ -16,6 +17,8 @@ use vortex_array::buffer::DeviceBufferExt;
 use vortex_cuda_macros::cuda_tests;
 use vortex_dtype::NativePType;
 use vortex_dtype::match_each_integer_ptype;
+use vortex_dtype::match_each_unsigned_integer_ptype;
+use vortex_error::VortexExpect;
 use vortex_error::VortexResult;
 use vortex_error::vortex_ensure;
 use vortex_error::vortex_err;
@@ -29,6 +32,7 @@ use crate::CudaDeviceBuffer;
 use crate::executor::CudaExecute;
 use crate::executor::CudaExecutionCtx;
 use crate::kernel::launch_cuda_kernel_with_config;
+use crate::kernel::patches::execute_patches;
 
 /// CUDA decoder for ALP (Adaptive Lossless floating-Point) decompression.
 #[derive(Debug)]
@@ -74,7 +78,6 @@ where
     } = array.into_parts();
 
     vortex_ensure!(len > 0, "Non empty array");
-    vortex_ensure!(patches.is_none(), "Patches not supported");
 
     let offset = offset as usize;
     let device_input: BufferHandle = if packed.is_on_device() {
@@ -97,27 +100,46 @@
     let thread_count = if bits == 64 { 16 } else { 32 };
     let suffixes: [&str; _] = [&format!("{bit_width}bw"), &format!("{thread_count}t")];
     let cuda_function = ctx.load_function(&format!("bit_unpack_{}", bits), &suffixes)?;
-    let mut launch_builder = ctx.launch_builder(&cuda_function);
 
-    // Build launch args: input, output, f, e, length
-    launch_builder.arg(&input_view);
-    launch_builder.arg(&output_view);
+    {
+        let mut launch_builder = ctx.launch_builder(&cuda_function);
 
-    let num_blocks = u32::try_from(len.div_ceil(1024))?;
+        // Build launch args: input, output
+        launch_builder.arg(&input_view);
+        launch_builder.arg(&output_view);
 
-    let config = LaunchConfig {
-        grid_dim: (num_blocks, 1, 1),
-        block_dim: (thread_count, 1, 1),
-        shared_mem_bytes: 0,
-    };
+        let num_blocks = u32::try_from(len.div_ceil(1024))?;
+
+        let config = LaunchConfig {
+            grid_dim: (num_blocks, 1, 1),
+            block_dim: (thread_count, 1, 1),
+            shared_mem_bytes: 0,
+        };
 
-    // Launch kernel
-    let _cuda_events =
-        launch_cuda_kernel_with_config(&mut launch_builder, config, CU_EVENT_DISABLE_TIMING)?;
+        // Launch kernel
+        let _cuda_events =
+            launch_cuda_kernel_with_config(&mut launch_builder, config, CU_EVENT_DISABLE_TIMING)?;
+    }
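+
+    // The unpack kernel above fills every output slot; values that could not be
+    // represented in `bit_width` bits are then scattered over the unpacked buffer
+    // as patches.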
+    let output_handle = match patches {
+        None => BufferHandle::new_device(output_buf.slice_typed::<A>(offset..(offset + len))),
+        Some(p) => {
+            let output_buf = output_buf.slice_typed::<A>(offset..(offset + len));
+            let buf = output_buf
+                .as_any()
+                .downcast_ref::<CudaDeviceBuffer>()
+                .vortex_expect("we created this as CudaDeviceBuffer")
+                .clone();
+
+            let patched_buf = match_each_unsigned_integer_ptype!(p.indices_ptype()?, |I| {
+                execute_patches::<A, I>(p, buf, ctx).await?
+            });
+
+            BufferHandle::new_device(Arc::new(patched_buf))
+        }
+    };
 
     // Build result with newly allocated buffer
-    let output_handle =
-        BufferHandle::new_device(output_buf.slice_typed::<A>(offset..(offset + len)));
     Ok(Canonical::Primitive(PrimitiveArray::from_buffer_handle(
         output_handle,
         A::PTYPE,
@@ -141,6 +163,34 @@ mod tests {
     use crate::CanonicalCudaExt;
     use crate::session::CudaSession;
 
+    #[test]
+    fn test_patches() -> VortexResult<()> {
+        let mut cuda_ctx = CudaSession::create_execution_ctx(&VortexSession::empty())
+            .vortex_expect("failed to create execution context");
+
+        let array = PrimitiveArray::new((0u16..=513).collect::<Buffer<u16>>(), NonNullable);
+
+        // Last two items should be patched
+        let bp_with_patches = BitPackedArray::encode(array.as_ref(), 9)?;
+        assert!(bp_with_patches.patches().is_some());
+
+        let cpu_result = bp_with_patches.to_canonical()?.into_array();
+
+        let gpu_result = block_on(async {
+            BitPackedExecutor
+                .execute(bp_with_patches.to_array(), &mut cuda_ctx)
+                .await
+                .vortex_expect("GPU decompression failed")
+                .into_host()
+                .await
+                .map(|a| a.into_array())
+        })?;
+
+        assert_arrays_eq!(cpu_result, gpu_result);
+
+        Ok(())
+    }
+
     #[rstest]
     #[case::bw_1(1)]
     #[case::bw_2(2)]
diff --git a/vortex-cuda/src/kernel/mod.rs b/vortex-cuda/src/kernel/mod.rs
index 51ba8cc55bd..afba4453db8 100644
--- a/vortex-cuda/src/kernel/mod.rs
+++ b/vortex-cuda/src/kernel/mod.rs
@@ -23,6 +23,7 @@ use vortex_utils::aliases::dash_map::DashMap;
 mod arrays;
 mod encodings;
 mod filter;
+mod patches;
 mod slice;
 
 pub use arrays::DictExecutor;
@@ -65,7 +66,7 @@ macro_rules! launch_cuda_kernel {
         array_len: $len:expr
     ) => {{
         use ::cudarc::driver::PushKernelArg as _;
-        let cuda_function = $ctx.load_function($module, $ptypes)?;
+        let cuda_function = $ctx.load_function_ptype($module, $ptypes)?;
         let mut launch_builder = $ctx.launch_builder(&cuda_function);
 
         $(
diff --git a/vortex-cuda/src/kernel/patches/mod.rs b/vortex-cuda/src/kernel/patches/mod.rs
new file mode 100644
index 00000000000..9922c7bf56d
--- /dev/null
+++ b/vortex-cuda/src/kernel/patches/mod.rs
@@ -0,0 +1,217 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright the Vortex contributors
+
+use cudarc::driver::DeviceRepr;
+use cudarc::driver::sys::CUevent_flags::CU_EVENT_DISABLE_TIMING;
+use vortex_array::arrays::PrimitiveArrayParts;
+use vortex_array::patches::Patches;
+use vortex_array::validity::Validity;
+use vortex_array::vtable::ValidityHelper;
+use vortex_cuda_macros::cuda_tests;
+use vortex_dtype::NativePType;
+use vortex_error::VortexResult;
+use vortex_error::vortex_ensure;
+
+use crate::CudaBufferExt;
+use crate::CudaDeviceBuffer;
+use crate::CudaExecutionCtx;
+use crate::executor::CudaArrayExt;
+use crate::launch_cuda_kernel;
+
+/// Apply a set of patches in-place onto a [`CudaDeviceBuffer`] holding `ValuesT`.
+pub(crate) async fn execute_patches<
+    ValuesT: NativePType + DeviceRepr,
+    IndicesT: NativePType + DeviceRepr,
+>(
+    patches: Patches,
+    target: CudaDeviceBuffer,
+    ctx: &mut CudaExecutionCtx,
+) -> VortexResult<CudaDeviceBuffer> {
+    let indices = patches.indices().clone();
+    let values = patches.values().clone();
+    drop(patches);
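+
+    // The index/value children of the patches may themselves be encoded arrays;
+    // canonicalize them on the GPU so the scatter always reads primitive buffers.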
+    let indices = indices.execute_cuda(ctx).await?.into_primitive();
+    let values = values.execute_cuda(ctx).await?.into_primitive();
+
+    let supported = matches!(
+        values.validity(),
+        Validity::NonNullable | Validity::AllValid
+    );
+    vortex_ensure!(
+        supported,
+        "Applying patches with null values not currently supported on the GPU"
+    );
+
+    vortex_ensure!(
+        indices.ptype() == IndicesT::PTYPE,
+        "expected PType {} for patch indices, was {}",
+        IndicesT::PTYPE,
+        indices.ptype()
+    );
+
+    vortex_ensure!(
+        values.ptype() == ValuesT::PTYPE,
+        "expected PType {} for patch values, was {}",
+        ValuesT::PTYPE,
+        values.ptype()
+    );
+
+    let patches_len = indices.len();
+    let patches_len_u64 = patches_len as u64;
+
+    let PrimitiveArrayParts {
+        buffer: indices_buffer,
+        ..
+    } = indices.into_parts();
+
+    let PrimitiveArrayParts {
+        buffer: values_buffer,
+        ..
+    } = values.into_parts();
+
+    let d_patch_indices = if indices_buffer.is_on_device() {
+        indices_buffer
+    } else {
+        ctx.move_to_device::<IndicesT>(indices_buffer)?.await?
+    };
+
+    let d_patch_values = if values_buffer.is_on_device() {
+        values_buffer
+    } else {
+        ctx.move_to_device::<ValuesT>(values_buffer)?.await?
+    };
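+
+    // Both patch buffers are now device-resident; take typed views to pass to the
+    // kernel launch below.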
+    let d_target_view = target.as_view::<ValuesT>();
+    let d_patch_indices_view = d_patch_indices.cuda_view::<IndicesT>()?;
+    let d_patch_values_view = d_patch_values.cuda_view::<ValuesT>()?;
+
+    // kernel arg order for patches is values, patchIndices, patchValues, patchesLen
+    let _events = launch_cuda_kernel!(
+        execution_ctx: ctx,
+        module: "patches",
+        ptypes: &[ValuesT::PTYPE, IndicesT::PTYPE],
+        launch_args: [
+            d_target_view,
+            d_patch_indices_view,
+            d_patch_values_view,
+            patches_len_u64,
+        ],
+        event_recording: CU_EVENT_DISABLE_TIMING,
+        array_len: patches_len
+    );
+
+    Ok(target)
+}
+
+#[cuda_tests]
+mod tests {
+    use std::sync::Arc;
+
+    use cudarc::driver::DeviceRepr;
+    use vortex_array::IntoArray;
+    use vortex_array::ToCanonical;
+    use vortex_array::arrays::PrimitiveArray;
+    use vortex_array::arrays::PrimitiveArrayParts;
+    use vortex_array::assert_arrays_eq;
+    use vortex_array::buffer::BufferHandle;
+    use vortex_array::compute::cast;
+    use vortex_array::patches::Patches;
+    use vortex_array::validity::Validity;
+    use vortex_buffer::buffer;
+    use vortex_dtype::DType;
+    use vortex_dtype::NativePType;
+    use vortex_dtype::Nullability;
+    use vortex_session::VortexSession;
+
+    use crate::CanonicalCudaExt;
+    use crate::CudaDeviceBuffer;
+    use crate::CudaSession;
+    use crate::kernel::patches::execute_patches;
+
+    #[tokio::test]
+    async fn test_patches() {
+        test_case::<u8>().await;
+        test_case::<u16>().await;
+        test_case::<u32>().await;
+        test_case::<u64>().await;
+
+        test_case::<i8>().await;
+        test_case::<i16>().await;
+        test_case::<i32>().await;
+        test_case::<i64>().await;
+
+        test_case::<f32>().await;
+        test_case::<f64>().await;
+    }
+
+    async fn test_case<Values: NativePType + DeviceRepr>() {
+        full_test_case::<Values, u8>().await;
+        full_test_case::<Values, u16>().await;
+        full_test_case::<Values, u32>().await;
+        full_test_case::<Values, u64>().await;
+    }
+
+    async fn full_test_case<Values: NativePType + DeviceRepr, Indices: NativePType + DeviceRepr>() {
+        let mut ctx = CudaSession::create_execution_ctx(&VortexSession::empty()).unwrap();
+
+        let values = PrimitiveArray::from_iter(0..128);
+        let values = force_cast::<Values>(values);
+
+        let patch_idx = PrimitiveArray::new(buffer![0, 8, 16, 32], Validity::NonNullable);
+        let patch_idx = force_cast::<Indices>(patch_idx);
+
+        let patch_val = PrimitiveArray::new(buffer![99, 99, 99, 99], Validity::NonNullable);
+        let patch_val = force_cast::<Values>(patch_val);
+
+        // Copy all to GPU
+        let patches =
+            Patches::new(128, 0, patch_idx.into_array(), patch_val.into_array(), None).unwrap();
+
+        let cpu_result = values.clone().patch(&patches).unwrap();
+
+        let PrimitiveArrayParts {
+            buffer: cuda_buffer,
+            ..
+        } = values.into_parts();
+
+        let handle = ctx
+            .move_to_device::<Values>(cuda_buffer)
+            .unwrap()
+            .await
+            .unwrap();
+        let device_buf = handle
+            .as_device()
+            .as_any()
+            .downcast_ref::<CudaDeviceBuffer>()
+            .unwrap()
+            .clone();
+
+        let patched_buf = execute_patches::<Values, Indices>(patches, device_buf, &mut ctx)
+            .await
+            .unwrap();
+
+        let gpu_result = PrimitiveArray::from_buffer_handle(
+            BufferHandle::new_device(Arc::new(patched_buf)),
+            Values::PTYPE,
+            Validity::NonNullable,
+        )
+        .to_canonical()
+        .unwrap()
+        .into_host()
+        .await
+        .unwrap()
+        .into_primitive();
+
+        assert_arrays_eq!(cpu_result, gpu_result);
+    }
+
+    fn force_cast<T: NativePType>(array: PrimitiveArray) -> PrimitiveArray {
+        cast(
+            array.as_ref(),
+            &DType::Primitive(T::PTYPE, Nullability::NonNullable),
+        )
+        .unwrap()
+        .to_primitive()
+    }
+}