diff --git a/Cargo.lock b/Cargo.lock index 098a7440a..e7d0dfc8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,6 +28,7 @@ checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" name = "admin-api" version = "0.1.0" dependencies = [ + "amp-data-store", "async-trait", "axum 0.8.7", "common", @@ -921,6 +922,26 @@ dependencies = [ "uuid", ] +[[package]] +name = "amp-data-store" +version = "0.1.0" +dependencies = [ + "amp-object-store", + "bytes", + "datafusion", + "datasets-common", + "foyer", + "futures", + "metadata-db", + "object_store", + "rand 0.9.2", + "serde", + "serde_json", + "thiserror 2.0.17", + "url", + "uuid", +] + [[package]] name = "amp-object-store" version = "0.1.0" @@ -992,6 +1013,7 @@ dependencies = [ name = "ampd" version = "0.1.0" dependencies = [ + "amp-data-store", "amp-object-store", "clap", "common", @@ -2629,7 +2651,7 @@ name = "common" version = "0.1.0" dependencies = [ "alloy", - "amp-object-store", + "amp-data-store", "async-stream", "async-trait", "axum 0.8.7", @@ -2640,9 +2662,6 @@ dependencies = [ "datafusion-datasource", "datafusion-tracing", "datasets-common", - "figment", - "foyer", - "fs-err", "futures", "governor 0.10.4", "indoc", @@ -2845,6 +2864,7 @@ name = "controller" version = "0.1.0" dependencies = [ "admin-api", + "amp-data-store", "async-trait", "axum 0.8.7", "common", @@ -4028,6 +4048,7 @@ dependencies = [ name = "datasets-derived" version = "0.1.0" dependencies = [ + "amp-data-store", "common", "datafusion", "datasets-common", @@ -4387,6 +4408,7 @@ name = "dump" version = "0.1.0" dependencies = [ "alloy", + "amp-data-store", "async-stream", "common", "datafusion", @@ -9111,6 +9133,7 @@ dependencies = [ name = "server" version = "0.1.0" dependencies = [ + "amp-data-store", "arrow-flight", "async-stream", "async-trait", @@ -11882,6 +11905,7 @@ version = "0.1.0" dependencies = [ "alloy", "amp-client", + "amp-data-store", "amp-object-store", "ampctl", "arrow-flight", @@ -13591,6 +13615,7 @@ checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" name = "worker" version = "0.1.0" dependencies = [ + "amp-data-store", "backon", "chrono", "common", diff --git a/Cargo.toml b/Cargo.toml index d50acf5cd..35a0b0758 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "crates/bin/ampup", "crates/clients/flight", "crates/core/common", + "crates/core/data-store", "crates/core/dataset-store", "crates/core/datasets-common", "crates/core/datasets-derived", diff --git a/crates/bin/ampd/Cargo.toml b/crates/bin/ampd/Cargo.toml index 5f4357427..f04b833cd 100644 --- a/crates/bin/ampd/Cargo.toml +++ b/crates/bin/ampd/Cargo.toml @@ -14,6 +14,7 @@ console-subscriber = ["dep:console-subscriber"] snmalloc = ["dep:snmalloc-rs"] [dependencies] +amp-data-store = { path = "../../core/data-store" } amp-object-store = { path = "../../core/object-store" } clap.workspace = true common = { path = "../../core/common" } diff --git a/crates/bin/ampd/src/controller_cmd.rs b/crates/bin/ampd/src/controller_cmd.rs index d39c431f1..aecf8b888 100644 --- a/crates/bin/ampd/src/controller_cmd.rs +++ b/crates/bin/ampd/src/controller_cmd.rs @@ -1,7 +1,8 @@ use std::{net::SocketAddr, sync::Arc}; +use amp_data_store::DataStore; use amp_object_store::ObjectStoreCreationError; -use common::{BoxError, store::Store}; +use common::BoxError; use config::Config as CommonConfig; use controller::config::Config; use dataset_store::{ @@ -16,8 +17,12 @@ pub async fn run(config: CommonConfig, meter: Option, at: SocketAddr) -> .await .map_err(|err| 
Error::MetadataDbConnection(Box::new(err)))?; - let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone()) - .map_err(Error::DataStoreCreation)?; + let data_store = DataStore::new( + metadata_db.clone(), + config.data_store_url.clone(), + config.parquet.cache_size_mb, + ) + .map_err(Error::DataStoreCreation)?; let dataset_store = { let provider_configs_store = ProviderConfigsStore::new( diff --git a/crates/bin/ampd/src/server_cmd.rs b/crates/bin/ampd/src/server_cmd.rs index 8c074aca3..51742fbf3 100644 --- a/crates/bin/ampd/src/server_cmd.rs +++ b/crates/bin/ampd/src/server_cmd.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use amp_data_store::DataStore; use amp_object_store::ObjectStoreCreationError; -use common::{BoxError, store::Store}; +use common::BoxError; use config::{Addrs, Config as CommonConfig}; use dataset_store::{ DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore, @@ -21,8 +22,12 @@ pub async fn run( .await .map_err(|err| Error::MetadataDbConnection(Box::new(err)))?; - let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone()) - .map_err(Error::DataStoreCreation)?; + let data_store = DataStore::new( + metadata_db.clone(), + config.data_store_url.clone(), + config.parquet.cache_size_mb, + ) + .map_err(Error::DataStoreCreation)?; let dataset_store = { let provider_configs_store = ProviderConfigsStore::new( @@ -146,6 +151,5 @@ pub fn config_from_common(config: &CommonConfig) -> ServerConfig { max_mem_mb: config.max_mem_mb, query_max_mem_mb: config.query_max_mem_mb, spill_location: config.spill_location.clone(), - parquet_cache_size_mb: config.parquet.cache_size_mb, } } diff --git a/crates/bin/ampd/src/solo_cmd.rs b/crates/bin/ampd/src/solo_cmd.rs index 43828fc3d..390a51f90 100644 --- a/crates/bin/ampd/src/solo_cmd.rs +++ b/crates/bin/ampd/src/solo_cmd.rs @@ -1,7 +1,8 @@ use std::{future::Future, pin::Pin, sync::Arc}; +use amp_data_store::DataStore; use amp_object_store::ObjectStoreCreationError; -use common::{BoxError, store::Store}; +use common::BoxError; use config::Config as CommonConfig; use dataset_store::{ DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore, @@ -24,8 +25,12 @@ pub async fn run( .await .map_err(|err| Error::MetadataDbConnection(Box::new(err)))?; - let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone()) - .map_err(Error::DataStoreCreation)?; + let data_store = DataStore::new( + metadata_db.clone(), + config.data_store_url.clone(), + config.parquet.cache_size_mb, + ) + .map_err(Error::DataStoreCreation)?; let dataset_store = { let provider_configs_store = ProviderConfigsStore::new( diff --git a/crates/bin/ampd/src/worker_cmd.rs b/crates/bin/ampd/src/worker_cmd.rs index 540777bb4..c2fa0cfd1 100644 --- a/crates/bin/ampd/src/worker_cmd.rs +++ b/crates/bin/ampd/src/worker_cmd.rs @@ -1,5 +1,5 @@ +use amp_data_store::DataStore; use amp_object_store::ObjectStoreCreationError; -use common::store::Store; use config::Config; use dataset_store::{ DatasetStore, manifests::DatasetManifestsStore, providers::ProviderConfigsStore, @@ -13,8 +13,12 @@ pub async fn run(config: Config, meter: Option, node_id: NodeId) -> Resul .await .map_err(|err| Error::MetadataDbConnection(Box::new(err)))?; - let data_store = Store::new(metadata_db.clone(), config.data_store_url.clone()) - .map_err(Error::DataStoreCreation)?; + let data_store = DataStore::new( + metadata_db.clone(), + config.data_store_url.clone(), + config.parquet.cache_size_mb, + ) + 
.map_err(Error::DataStoreCreation)?; let dataset_store = { let provider_configs_store = ProviderConfigsStore::new( diff --git a/crates/config/src/lib.rs b/crates/config/src/lib.rs index f40f89acc..9b29d03a7 100644 --- a/crates/config/src/lib.rs +++ b/crates/config/src/lib.rs @@ -314,7 +314,6 @@ impl Config { self.max_mem_mb, self.query_max_mem_mb, &self.spill_location, - self.parquet.cache_size_mb, ) } diff --git a/crates/core/common/Cargo.toml b/crates/core/common/Cargo.toml index 0f1dcd65e..428ee3f90 100644 --- a/crates/core/common/Cargo.toml +++ b/crates/core/common/Cargo.toml @@ -6,7 +6,7 @@ license-file.workspace = true [dependencies] alloy.workspace = true -amp-object-store = { path = "../object-store" } +amp-data-store = { path = "../data-store" } async-stream.workspace = true async-trait.workspace = true axum.workspace = true @@ -17,9 +17,6 @@ datafusion.workspace = true datafusion-datasource.workspace = true datafusion-tracing.workspace = true datasets-common = { path = "../datasets-common" } -figment.workspace = true -foyer = "0.21" -fs-err.workspace = true futures.workspace = true governor.workspace = true indoc.workspace = true diff --git a/crates/core/common/src/catalog/physical.rs b/crates/core/common/src/catalog/physical.rs index 1d94192e2..ad423a195 100644 --- a/crates/core/common/src/catalog/physical.rs +++ b/crates/core/common/src/catalog/physical.rs @@ -1,5 +1,9 @@ use std::{ops::RangeInclusive, sync::Arc}; +use amp_data_store::{ + DataStore, + physical_table::{PhyTablePath, PhyTableRevisionPath, PhyTableUrl}, +}; use datafusion::{ arrow::datatypes::SchemaRef, catalog::{Session, memory::DataSourceExec}, @@ -17,7 +21,7 @@ use datafusion::{ prelude::Expr, }; use datafusion_datasource::compute_all_files_statistics; -use datasets_common::{hash_reference::HashReference, name::Name, table_name::TableName}; +use datasets_common::{hash_reference::HashReference, table_name::TableName}; use futures::{Stream, StreamExt, TryStreamExt, stream, stream::BoxStream}; use metadata_db::LocationId; use object_store::{ObjectMeta, ObjectStore, path::Path}; @@ -33,12 +37,8 @@ use crate::{ segments::{BlockRange, Chain, Segment, canonical_chain, missing_ranges}, }, sql::TableReference, - store::{CachedStore, Store}, }; -/// Path delimiter used in object store paths. -const PATH_DELIMITER: char = '/'; - #[derive(Debug, Clone)] pub struct Catalog { tables: Vec>, @@ -74,9 +74,7 @@ impl Catalog { let mut earliest = None; for table in &self.tables { // Create a snapshot to get synced range - // Use empty cache (0 bytes) since we're only checking metadata - let dummy_cached_store = CachedStore::new(table.store.clone(), 0); - let snapshot = table.snapshot(false, dummy_cached_store).await?; + let snapshot = table.snapshot(false, table.store.clone()).await?; let synced_range = snapshot.synced_range(); match (earliest, &synced_range) { (None, Some(range)) => earliest = Some(range.start()), @@ -97,7 +95,7 @@ impl CatalogSnapshot { pub async fn from_catalog( catalog: Catalog, ignore_canonical_segments: bool, - store: CachedStore, + store: DataStore, ) -> Result { let mut table_snapshots = Vec::new(); for physical_table in &catalog.tables { @@ -148,7 +146,7 @@ impl CatalogSnapshot { /// table. Only active revisions are used for query execution. 
#[tracing::instrument(skip_all, fields(table = %table), err)] pub async fn register_new_table_revision( - store: Store, + store: DataStore, dataset: HashReference, table: ResolvedTable, ) -> Result { @@ -180,7 +178,7 @@ pub async fn register_new_table_revision( /// This error type is used by `register_new_table_revision()`. #[derive(Debug, thiserror::Error)] #[error("Failed to register and activate new table revision")] -pub struct RegisterNewTableRevisionError(#[source] pub crate::store::RegisterTableRevisionError); +pub struct RegisterNewTableRevisionError(#[source] pub amp_data_store::RegisterTableRevisionError); #[derive(Debug, Clone)] pub struct PhysicalTable { @@ -197,7 +195,7 @@ pub struct PhysicalTable { /// Location ID in the metadata database. location_id: LocationId, /// Data store for accessing metadata database and object storage. - store: Store, + store: DataStore, } // Methods for creating and managing PhysicalTable instances @@ -247,7 +245,7 @@ impl PhysicalTable { /// partway through, the revision will be marked as active but with incomplete file metadata. /// Only the physical table registration (steps 3-4) is transactional. pub async fn restore_latest_revision( - store: Store, + store: DataStore, dataset: &HashReference, table: &ResolvedTable, ) -> Result, RestoreLatestRevisionError> { @@ -286,7 +284,9 @@ impl PhysicalTable { let (file_name, amp_meta, footer) = amp_metadata_from_parquet_file(&store, &object_meta) .await - .map_err(RestoreLatestRevisionError::ReadParquetMetadata)?; + .map_err(|e: BoxError| { + RestoreLatestRevisionError::ReadParquetMetadata(e) + })?; let parquet_meta_json = serde_json::to_value(amp_meta) .map_err(RestoreLatestRevisionError::SerializeMetadata)?; @@ -342,7 +342,10 @@ impl PhysicalTable { /// /// Returns the PhysicalTable if an active revision exists, or None if no active /// revision is found in the metadata database. - pub async fn get_active(store: Store, table: ResolvedTable) -> Result, BoxError> { + pub async fn get_active( + store: DataStore, + table: ResolvedTable, + ) -> Result, BoxError> { let manifest_hash = table.dataset().manifest_hash(); let table_name = table.name(); @@ -468,7 +471,7 @@ impl PhysicalTable { /// Returns a stream of object metadata for each file in the table's directory. pub fn list_files( &self, - ) -> BoxStream<'_, Result> + ) -> BoxStream<'_, Result> { self.store.stream_revision_files_in_object_store(&self.path) } @@ -525,7 +528,7 @@ impl PhysicalTable { pub async fn snapshot( &self, ignore_canonical_segments: bool, - store: CachedStore, + store: DataStore, ) -> Result { let canonical_segments = if ignore_canonical_segments { self.segments().await? @@ -782,14 +785,14 @@ pub enum RestoreLatestRevisionError { /// This occurs when the object store cannot be queried for existing revisions, /// typically due to network issues, permission errors, or storage unavailability. #[error("Failed to find latest revision from object store")] - FindLatestRevision(#[source] crate::store::FindLatestTableRevisionInObjectStoreError), + FindLatestRevision(#[source] amp_data_store::FindLatestTableRevisionInObjectStoreError), /// Failed to register revision in metadata database /// /// This occurs when the metadata database transaction for registering the /// physical table and marking it as active fails. 
#[error("Failed to register revision in metadata database")] - RegisterRevision(#[source] crate::store::RegisterTableRevisionError), + RegisterRevision(#[source] amp_data_store::RegisterTableRevisionError), /// Failed to begin transaction for marking table active #[error("Failed to begin transaction")] @@ -815,7 +818,7 @@ pub enum RestoreLatestRevisionError { /// Failed to list files in the restored revision #[error("Failed to list files in restored revision")] - ListFiles(#[source] crate::store::ListRevisionFilesInObjectStoreError), + ListFiles(#[source] amp_data_store::ListRevisionFilesInObjectStoreError), /// Failed to read Amp metadata from parquet file /// @@ -834,261 +837,3 @@ pub enum RestoreLatestRevisionError { #[error("Failed to register file in metadata database")] RegisterFile(#[source] metadata_db::Error), } - -/// Physical table URL _new-type_ wrapper -/// -/// Represents a base directory URL in the object store containing all parquet files for a table. -/// Individual file URLs are constructed by appending the filename to this base URL. -/// -/// ## URL Format -/// -/// `////` -/// -/// Where: -/// - `store_base_url`: Object store base URL, may include path prefix after bucket -/// (e.g., `s3://bucket/prefix`, `file:///data/subdir`) -/// - `dataset_name`: Dataset name (without namespace) -/// - `table_name`: Table name -/// - `revision_id`: Unique identifier for this table revision (typically UUIDv7) -/// -/// ## Example -/// -/// ```text -/// s3://my-bucket/prefix/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/ -/// ``` -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct PhyTableUrl(Url); - -impl PhyTableUrl { - /// Constructs a table URL from a base URL and revision path. - /// - /// The URL follows the structure: `//` - /// - /// Where: - /// - `base_url`: Object store root URL (e.g., `s3://bucket/prefix/`, `file:///data/`) - /// - `revision_path`: Complete path to the table revision (dataset_name/table_name/revision_uuid) - pub fn new(base_url: &Url, revision_path: &PhyTableRevisionPath) -> Self { - // SAFETY: Path components (Name, TableName, Uuid) contain only URL-safe characters - let raw_url = base_url - .join(&format!("{}/", revision_path)) - .expect("path is URL-safe"); - PhyTableUrl(raw_url) - } - - /// Get the URL as a string slice - pub fn as_str(&self) -> &str { - self.0.as_str() - } - - /// Get a reference to the inner [`Url`] - pub fn inner(&self) -> &Url { - &self.0 - } -} - -impl std::str::FromStr for PhyTableUrl { - type Err = PhyTableUrlParseError; - - fn from_str(s: &str) -> Result { - let url = s.parse().map_err(|err| PhyTableUrlParseError { - url: s.to_string(), - source: err, - })?; - Ok(PhyTableUrl(url)) - } -} - -impl std::fmt::Display for PhyTableUrl { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0.as_str()) - } -} - -/// Path to a table directory in object storage (without revision). -/// -/// Represents the parent directory containing all revisions of a table. -/// Format: `/` -/// -/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and -/// trailing slashes, so the string representation will not contain a trailing slash. -/// -/// ## Example -/// -/// ```text -/// ethereum_mainnet/logs -/// ``` -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PhyTablePath(Path); - -impl PhyTablePath { - /// Constructs the path to a table directory (without revision). 
- pub fn new(dataset_name: impl AsRef, table_name: impl AsRef) -> Self { - Self(format!("{}/{}", dataset_name.as_ref(), table_name.as_ref()).into()) - } - - /// Create a revision path by appending the given revision ID to this table path. - pub fn with_revision(&self, revision_id: impl AsRef) -> PhyTableRevisionPath { - let path = self.0.child(revision_id.as_ref().to_string()); - PhyTableRevisionPath(path) - } - - /// Get a reference to the underlying [`object_store::path::Path`] - pub fn as_object_store_path(&self) -> &Path { - &self.0 - } - - /// Get the path as a string slice - pub fn as_str(&self) -> &str { - self.0.as_ref() - } -} - -impl std::fmt::Display for PhyTablePath { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl std::ops::Deref for PhyTablePath { - type Target = Path; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Path to a table revision directory in object storage. -/// -/// Represents a specific revision of a table, identified by a UUID. -/// Format: `//` -/// -/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and -/// trailing slashes, so the string representation will not contain a trailing slash. -/// -/// ## Example -/// -/// ```text -/// ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef -/// ``` -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct PhyTableRevisionPath(Path); - -impl PhyTableRevisionPath { - /// Constructs the path to a table revision directory. - pub fn new( - dataset_name: impl AsRef, - table_name: impl AsRef, - revision_id: impl AsRef, - ) -> Self { - Self( - format!( - "{}/{}/{}", - dataset_name.as_ref(), - table_name.as_ref(), - revision_id.as_ref() - ) - .into(), - ) - } - - /// Get a reference to the underlying [`Path`] - pub fn as_object_store_path(&self) -> &Path { - &self.0 - } - - /// Get the path as a string slice - pub fn as_str(&self) -> &str { - self.0.as_ref() - } -} - -impl std::str::FromStr for PhyTableRevisionPath { - type Err = PhyTableRevisionPathError; - - fn from_str(s: &str) -> Result { - let mut parts = s.trim_end_matches(PATH_DELIMITER).split(PATH_DELIMITER); - - let revision_uuid: Uuid = parts - .next_back() - .filter(|s| !s.is_empty()) - .ok_or(PhyTableRevisionPathError::NotEnoughComponents(0))? - .parse() - .map_err(PhyTableRevisionPathError::InvalidRevisionUuid)?; - - let table_name: TableName = parts - .next_back() - .filter(|s| !s.is_empty()) - .ok_or(PhyTableRevisionPathError::NotEnoughComponents(1))? - .parse() - .map_err(PhyTableRevisionPathError::InvalidTableName)?; - - let dataset_name: Name = parts - .next_back() - .filter(|s| !s.is_empty()) - .ok_or(PhyTableRevisionPathError::NotEnoughComponents(2))? 
- .parse() - .map_err(PhyTableRevisionPathError::InvalidDatasetName)?; - - Ok(Self::new(dataset_name, table_name, revision_uuid)) - } -} - -impl std::fmt::Display for PhyTableRevisionPath { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.0.fmt(f) - } -} - -impl std::ops::Deref for PhyTableRevisionPath { - type Target = Path; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -/// Error when parsing a path into a [`PhyTableRevisionPath`] -#[derive(Debug, thiserror::Error)] -pub enum PhyTableRevisionPathError { - #[error("path must have at least 3 components, got {0}")] - NotEnoughComponents(usize), - - #[error("invalid dataset name")] - InvalidDatasetName(#[source] datasets_common::name::NameError), - - #[error("invalid table name")] - InvalidTableName(#[source] datasets_common::table_name::TableNameError), - - #[error("invalid revision UUID")] - InvalidRevisionUuid(#[source] uuid::Error), -} - -/// Error type for PhyTableUrl parsing -#[derive(Debug, thiserror::Error)] -#[error("invalid object store URL '{url}'")] -pub struct PhyTableUrlParseError { - url: String, - #[source] - source: url::ParseError, -} - -impl From for metadata_db::physical_table::TablePathOwned { - fn from(value: PhyTableRevisionPath) -> Self { - metadata_db::physical_table::TablePath::from_owned_unchecked(value.as_str().to_owned()) - } -} - -impl<'a> From<&'a PhyTableRevisionPath> for metadata_db::physical_table::TablePath<'a> { - fn from(value: &'a PhyTableRevisionPath) -> Self { - metadata_db::physical_table::TablePath::from_ref_unchecked(value.as_str()) - } -} - -impl From for PhyTableRevisionPath { - fn from(value: metadata_db::physical_table::TablePathOwned) -> Self { - value - .as_str() - .parse() - .expect("database path should be a valid revision path") - } -} diff --git a/crates/core/common/src/catalog/reader.rs b/crates/core/common/src/catalog/reader.rs index b666d984d..346f69165 100644 --- a/crates/core/common/src/catalog/reader.rs +++ b/crates/core/common/src/catalog/reader.rs @@ -1,5 +1,6 @@ use std::{ops::Range, sync::Arc}; +use amp_data_store::{CachedParquetData, DataStore}; use bytes::Bytes; use datafusion::{ arrow::datatypes::SchemaRef, @@ -18,15 +19,12 @@ use datafusion::{ use futures::future::BoxFuture; use metadata_db::{LocationId, files::FileId}; -use crate::{ - BoxError, - store::{CachedParquetData, CachedStore}, -}; +use crate::BoxError; #[derive(Debug, Clone)] pub struct AmpReaderFactory { pub location_id: LocationId, - pub store: CachedStore, + pub store: DataStore, pub schema: SchemaRef, } @@ -35,7 +33,7 @@ impl AmpReaderFactory { self.store .get_cached_parquet_metadata(file, self.schema.clone()) .await - .map_err(|e| e.into()) + .map_err(|err| -> BoxError { err.into() }) } } @@ -81,7 +79,7 @@ pub struct AmpReader { pub file_id: FileId, pub file_metrics: ParquetFileMetrics, pub inner: ParquetObjectReader, - pub store: CachedStore, + pub store: DataStore, pub schema: SchemaRef, } @@ -114,7 +112,9 @@ impl AsyncFileReader for AmpReader { .get_cached_parquet_metadata(file_id, schema) .await .map(|cached| cached.metadata) - .map_err(|err| ParquetError::External(err.into())) + .map_err(|err: amp_data_store::GetCachedMetadataError| { + ParquetError::External(err.into()) + }) }) } } diff --git a/crates/core/common/src/catalog/sql.rs b/crates/core/common/src/catalog/sql.rs index cf362629a..31df84798 100644 --- a/crates/core/common/src/catalog/sql.rs +++ b/crates/core/common/src/catalog/sql.rs @@ -69,6 +69,7 @@ use std::{ sync::Arc, }; +use amp_data_store::DataStore; 
use datafusion::{logical_expr::ScalarUDF, sql::parser::Statement}; use datasets_common::{ func_name::ETH_CALL_FUNCTION_NAME, hash::Hash, partial_reference::PartialReference, @@ -83,7 +84,7 @@ use super::{ physical::{Catalog, PhysicalTable}, }; use crate::{ - PlanningContext, ResolvedTable, Store, + PlanningContext, ResolvedTable, query_context::QueryEnv, sql::{ FunctionReference, TableReference, resolve_function_references, resolve_table_references, @@ -113,7 +114,7 @@ use crate::{ /// 4. Constructs physical catalog for query execution pub async fn catalog_for_sql( dataset_store: &impl DatasetAccess, - data_store: &Store, + data_store: &DataStore, query: &Statement, env: QueryEnv, ) -> Result { diff --git a/crates/core/common/src/lib.rs b/crates/core/common/src/lib.rs index e12f439ed..ed0543871 100644 --- a/crates/core/common/src/lib.rs +++ b/crates/core/common/src/lib.rs @@ -10,7 +10,6 @@ pub mod planning_context; pub mod query_context; pub mod sql; pub mod sql_str; -pub mod store; pub mod stream_helpers; pub mod utils; @@ -29,14 +28,11 @@ use datafusion::arrow::{ error::ArrowError, }; pub use datafusion::{arrow, parquet}; -pub use foyer::Cache; use futures::{Stream, StreamExt}; use metadata::segments::BlockRange; -use metadata_db::files::FileId; pub use planning_context::{DetachedLogicalPlan, PlanningContext}; pub use query_context::{Error as QueryError, QueryContext}; use serde::{Deserialize, Serialize}; -pub use store::{CachedStore, Store}; pub type BoxError = Box; pub type BoxResult = Result; @@ -59,9 +55,6 @@ pub type EvmAddressArrayType = FixedSizeBinaryArray; /// Payment amount in the EVM. Used for gas or value transfers. pub const EVM_CURRENCY_TYPE: DataType = DataType::Decimal128(DECIMAL128_MAX_PRECISION, 0); -pub use store::CachedParquetData; -pub type ParquetFooterCache = Cache; - #[derive(Clone, Copy, Debug, Default, Deserialize, Serialize)] pub struct Timestamp(pub Duration); diff --git a/crates/core/common/src/metadata.rs b/crates/core/common/src/metadata.rs index 6d5ab0d1b..e43eec80e 100644 --- a/crates/core/common/src/metadata.rs +++ b/crates/core/common/src/metadata.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use amp_data_store::{DataStore, file_name::FileName}; use datafusion::parquet::{ arrow::{arrow_reader::ArrowReaderOptions, async_reader::AsyncFileReader}, errors::ParquetError, @@ -12,17 +13,15 @@ use metadata_db::{ use object_store::{ObjectMeta, path::Path}; use tracing::instrument; -mod file_name; pub mod parquet; pub mod segments; mod size; use self::parquet::{PARQUET_METADATA_KEY, ParquetMeta}; -pub use self::{ - file_name::FileName, - size::{Generation, Overflow, SegmentSize, get_block_count, le_bytes_to_nonzero_i64_opt}, +pub use self::size::{ + Generation, Overflow, SegmentSize, get_block_count, le_bytes_to_nonzero_i64_opt, }; -use crate::{BoxError, Store}; +use crate::BoxError; #[derive(Debug, Clone)] pub struct FileMetadata { @@ -83,7 +82,7 @@ impl FileMetadata { #[instrument(skip(object_meta, store), err)] pub async fn extract_footer_bytes_from_file( - store: &Store, + store: &DataStore, object_meta: &ObjectMeta, ) -> Result { let parquet_metadata = extract_parquet_metadata_from_file(store, object_meta).await?; @@ -95,7 +94,7 @@ pub async fn extract_footer_bytes_from_file( #[instrument(skip(object_meta, store), err)] pub async fn amp_metadata_from_parquet_file( - store: &Store, + store: &DataStore, object_meta: &ObjectMeta, ) -> Result<(FileName, ParquetMeta, FooterBytes), BoxError> { let parquet_metadata = extract_parquet_metadata_from_file(store, 
object_meta).await?; @@ -146,7 +145,7 @@ pub async fn amp_metadata_from_parquet_file( } async fn extract_parquet_metadata_from_file( - store: &Store, + store: &DataStore, object_meta: &ObjectMeta, ) -> Result, ParquetError> { let mut reader = store diff --git a/crates/core/common/src/metadata/parquet.rs b/crates/core/common/src/metadata/parquet.rs index 88344c40b..15ecc84fe 100644 --- a/crates/core/common/src/metadata/parquet.rs +++ b/crates/core/common/src/metadata/parquet.rs @@ -18,12 +18,10 @@ //! //! See also: metadata-consistency +use amp_data_store::file_name::FileName; use serde::{Deserialize, Serialize}; -use crate::{ - Timestamp, - metadata::{file_name::FileName, segments::BlockRange}, -}; +use crate::{Timestamp, metadata::segments::BlockRange}; pub const PARQUET_METADATA_KEY: &str = "nozzle_metadata"; pub const PARENT_FILE_ID_METADATA_KEY: &str = "parent_file_ids"; diff --git a/crates/core/common/src/metadata/size.rs b/crates/core/common/src/metadata/size.rs index 5c236fb6e..cec7f6786 100644 --- a/crates/core/common/src/metadata/size.rs +++ b/crates/core/common/src/metadata/size.rs @@ -835,12 +835,11 @@ pub fn le_bytes_to_nonzero_i64_opt(bytes: &[u8]) -> Result, T pub mod test { use std::sync::Arc; + use amp_data_store::file_name::FileName; + use crate::{ Timestamp, - metadata::{ - file_name::FileName, - parquet::{GENERATION_METADATA_KEY, PARQUET_METADATA_KEY, ParquetMeta}, - }, + metadata::parquet::{GENERATION_METADATA_KEY, PARQUET_METADATA_KEY, ParquetMeta}, parquet::{ basic::{Repetition, Type as PhysicalType}, file::{ diff --git a/crates/core/common/src/query_context.rs b/crates/core/common/src/query_context.rs index 0e01be2d9..c61769e96 100644 --- a/crates/core/common/src/query_context.rs +++ b/crates/core/common/src/query_context.rs @@ -3,6 +3,7 @@ use std::{ sync::{Arc, LazyLock}, }; +use amp_data_store::DataStore; use arrow::{array::ArrayRef, compute::concat_batches}; use axum::response::IntoResponse; use datafusion::{ @@ -32,10 +33,10 @@ use futures::{TryStreamExt, stream}; use js_runtime::isolate_pool::IsolatePool; use regex::Regex; use thiserror::Error; -use tracing::{debug, field, instrument}; +use tracing::{field, instrument}; use crate::{ - BlockNum, BoxError, Store, arrow, + BlockNum, BoxError, arrow, catalog::physical::{Catalog, CatalogSnapshot, TableSnapshot}, evm::udfs::{ EvmDecodeLog, EvmDecodeParams, EvmDecodeType, EvmEncodeParams, EvmEncodeType, EvmTopic, @@ -47,7 +48,6 @@ use crate::{ forbid_underscore_prefixed_aliases, }, sql::TableReference, - store::CachedStore, utils::error_with_causes, }; @@ -128,7 +128,6 @@ pub struct QueryEnv { // Per-query memory limit configuration pub query_max_mem_mb: usize, - pub parquet_cache_size_mb: u64, } /// Creates a QueryEnv with specified memory and cache configuration @@ -139,7 +138,6 @@ pub fn create_query_env( max_mem_mb: usize, query_max_mem_mb: usize, spill_location: &[PathBuf], - parquet_cache_size_mb: u64, ) -> Result { let spill_allowed = !spill_location.is_empty(); let disk_manager_mode = if spill_allowed { @@ -171,7 +169,6 @@ pub fn create_query_env( object_store_registry: runtime_env.object_store_registry, isolate_pool, query_max_mem_mb, - parquet_cache_size_mb, }) } @@ -182,15 +179,15 @@ pub struct QueryContext { session_config: SessionConfig, catalog: CatalogSnapshot, /// Per-query memory pool (if per-query limits are enabled) - tiered_memory_pool: Arc, - store: CachedStore, + tiered_memory_pool: Arc, + store: DataStore, } impl QueryContext { pub async fn for_catalog( catalog: Catalog, env: QueryEnv, - store: 
Store, + store: DataStore, ignore_canonical_segments: bool, ) -> Result { // This contains various tuning options for the query engine. @@ -222,9 +219,6 @@ impl QueryContext { opts.execution.parquet.pushdown_filters = true; } - // Create cached store with parquet metadata caching - let store = CachedStore::new(store, env.parquet_cache_size_mb); - // Create a catalog snapshot with canonical chain locked in let catalog_snapshot = CatalogSnapshot::from_catalog(catalog, ignore_canonical_segments, store.clone()) @@ -413,7 +407,7 @@ fn read_only_check(plan: &LogicalPlan) -> Result<(), Error> { fn register_table( ctx: &SessionContext, table: Arc, - store: &Store, + store: &DataStore, ) -> Result<(), DataFusionError> { // The catalog schema needs to be explicitly created or table creation will fail. create_catalog_schema(ctx, table.physical_table().catalog_schema().to_string()); @@ -467,7 +461,7 @@ async fn execute_plan( use datafusion::physical_plan::execute_stream; read_only_check(&plan)?; - debug!("logical plan: {}", plan.to_string().replace('\n', "\\n")); + tracing::debug!("logical plan: {}", plan.to_string().replace('\n', "\\n")); if logical_optimize { plan = ctx.state().optimize(&plan).map_err(Error::PlanningError)?; @@ -483,7 +477,7 @@ async fn execute_plan( forbid_duplicate_field_names(&physical_plan, &plan).map_err(Error::PlanningError)?; - debug!("physical plan: {}", print_physical_plan(&*physical_plan)); + tracing::debug!("physical plan: {}", print_physical_plan(&*physical_plan)); match is_explain { false => execute_stream(physical_plan, ctx.task_ctx()).map_err(Error::PlanningError), diff --git a/crates/core/data-store/Cargo.toml b/crates/core/data-store/Cargo.toml new file mode 100644 index 000000000..ff1ddd84f --- /dev/null +++ b/crates/core/data-store/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "amp-data-store" +edition.workspace = true +version.workspace = true +license-file.workspace = true + +[dependencies] +amp-object-store = { path = "../object-store" } +bytes.workspace = true +datafusion.workspace = true +datasets-common = { path = "../datasets-common" } +foyer = "0.21" +futures.workspace = true +metadata-db = { path = "../metadata-db" } +object_store.workspace = true +rand.workspace = true +serde.workspace = true +serde_json.workspace = true +thiserror.workspace = true +url.workspace = true +uuid.workspace = true diff --git a/crates/core/common/src/metadata/file_name.rs b/crates/core/data-store/src/file_name.rs similarity index 99% rename from crates/core/common/src/metadata/file_name.rs rename to crates/core/data-store/src/file_name.rs index 86bae446d..d67103958 100644 --- a/crates/core/common/src/metadata/file_name.rs +++ b/crates/core/data-store/src/file_name.rs @@ -1,4 +1,5 @@ -use crate::BlockNum; +/// Block number type alias. +pub type BlockNum = u64; /// A validated file name for parquet files. /// diff --git a/crates/core/common/src/store.rs b/crates/core/data-store/src/lib.rs similarity index 93% rename from crates/core/common/src/store.rs rename to crates/core/data-store/src/lib.rs index b091ee514..092a01fac 100644 --- a/crates/core/common/src/store.rs +++ b/crates/core/data-store/src/lib.rs @@ -1,6 +1,6 @@ //! Object store abstraction layer. //! -//! This module provides the [`Store`] wrapper. +//! This module provides the [`DataStore`] wrapper. 
use std::sync::Arc; @@ -19,14 +19,19 @@ use datafusion::{ }; use datasets_common::{hash::Hash, hash_reference::HashReference, table_name::TableName}; use foyer::Cache; +// Re-export foyer::Cache for use by downstream crates +pub use foyer::Cache as FoyerCache; use futures::{Stream, StreamExt as _, TryStreamExt as _, stream::BoxStream}; use metadata_db::{LocationId, MetadataDb, PhysicalTable, files::FileId}; use object_store::{ObjectMeta, ObjectStore, buffered::BufWriter, path::Path}; use url::Url; -use crate::{ - catalog::physical::{PhyTablePath, PhyTableRevisionPath}, - metadata::FileName, +pub mod file_name; +pub mod physical_table; + +use self::{ + file_name::FileName, + physical_table::{PhyTablePath, PhyTableRevisionPath}, }; /// Data store. @@ -39,13 +44,14 @@ use crate::{ /// - Can be extended with helper functions. /// - Provides integrated metadata database operations. #[derive(Debug, Clone)] -pub struct Store { +pub struct DataStore { metadata_db: MetadataDb, object_store: Arc, url: Arc, + parquet_footer_cache: Cache, } -impl Store { +impl DataStore { /// Creates a store for an object store URL (or filesystem directory). /// /// Examples of valid formats for `data_location`: @@ -55,16 +61,27 @@ impl Store { /// - Prefixed: `s3://bucket-name/my_prefix/` /// /// If `data_location` is a relative filesystem path, then `base` will be used as the prefix. + /// + /// The `cache_size_mb` parameter controls the maximum memory footprint of the parquet + /// metadata cache. The cache uses a memory-weighted eviction policy. pub fn new( metadata_db: MetadataDb, url: ObjectStoreUrl, + cache_size_mb: u64, ) -> Result { let object_store: Arc = amp_object_store::new_with_prefix(&url, url.path())?; + + let cache_size_bytes = cache_size_mb * 1024 * 1024; + let parquet_footer_cache = foyer::CacheBuilder::new(cache_size_bytes as usize) + .with_weighter(|_k, v: &CachedParquetData| v.metadata.memory_size()) + .build(); + Ok(Self { metadata_db, object_store, url: Arc::new(url), + parquet_footer_cache, }) } @@ -92,7 +109,7 @@ impl Store { } /// Physical table revision management -impl Store { +impl DataStore { /// Registers a new physical table revision and marks it as active. /// /// This atomically registers the revision location in the metadata database @@ -184,7 +201,7 @@ impl Store { } /// Physical table files management -impl Store { +impl DataStore { /// Registers a file in the metadata database. /// /// Associates the file with a specific location ID and stores its Parquet metadata. @@ -328,10 +345,56 @@ impl Store { .map(|r| r.map_err(DeleteFilesStreamError)), ) } + + /// Gets cached parquet metadata for a file, fetching from database on cache miss. + /// + /// This method encapsulates the cache lookup and database fetch logic. On cache miss, + /// it retrieves the footer bytes from the metadata database, parses the parquet metadata, + /// computes statistics, and stores the result in the cache. + /// + /// The `schema` parameter is required to compute DataFusion statistics from the parquet + /// metadata. 
+ pub async fn get_cached_parquet_metadata( + &self, + file_id: FileId, + schema: SchemaRef, + ) -> Result { + let cache = self.parquet_footer_cache.clone(); + let metadata_db = self.metadata_db.clone(); + let file_id1 = file_id; + + cache + .get_or_fetch(&file_id1, || async move { + // Cache miss, fetch from database + let footer = metadata_db::files::get_footer_bytes(&metadata_db, file_id1) + .await + .map_err(GetCachedMetadataError::FetchFooter)?; + + let metadata = Arc::new( + ParquetMetaDataReader::new() + .with_page_index_policy(PageIndexPolicy::Required) + .parse_and_finish(&Bytes::from_owner(footer)) + .map_err(GetCachedMetadataError::ParseMetadata)?, + ); + + let statistics = Arc::new( + DFParquetMetadata::statistics_from_parquet_metadata(&metadata, &schema) + .map_err(GetCachedMetadataError::ComputeStatistics)?, + ); + + Ok::(CachedParquetData { + metadata, + statistics, + }) + }) + .await + .map(|entry| entry.value().clone()) + .map_err(GetCachedMetadataError::CacheError) + } } /// Object store file readers and writers -impl Store { +impl DataStore { /// Creates a buffered writer for writing a file to a table revision. pub fn create_revision_file_writer( &self, @@ -610,118 +673,6 @@ pub struct DeleteFileInObjectStoreError(#[source] pub object_store::Error); #[error("Failed to delete file during streaming deletion")] pub struct DeleteFilesStreamError(#[source] pub object_store::Error); -/// Cached data store. -/// -/// A wrapper around [`Store`] that adds parquet metadata caching for query execution. -/// -/// This type provides: -/// - All [`Store`] functionality via [`Deref`] -/// - Cached access to parquet file metadata via `get_cached_parquet_metadata()` -/// - Automatic cache management with configurable size -/// -/// Use this type in query contexts where repeated metadata access benefits from caching. -/// For non-query contexts (dump, GC, etc.), use [`Store`] directly. -#[derive(Debug, Clone)] -pub struct CachedStore { - store: Store, - parquet_footer_cache: Cache, -} - -impl CachedStore { - /// Creates a cached store with the specified cache size. - /// - /// The cache uses a memory-weighted eviction policy based on the size of cached - /// parquet metadata. The `cache_size_mb` parameter controls the maximum memory - /// footprint of the cache. - pub fn new(store: Store, cache_size_mb: u64) -> Self { - let cache_size_bytes = cache_size_mb * 1024 * 1024; - let parquet_footer_cache = foyer::CacheBuilder::new(cache_size_bytes as usize) - .with_weighter(|_k, v: &CachedParquetData| v.metadata.memory_size()) - .build(); - - Self { - store, - parquet_footer_cache, - } - } - - /// Creates a cached store from existing store and cache instances. - /// - /// Use this when you already have a cache instance that should be shared - /// across multiple components. - pub fn from_parts( - store: Store, - parquet_footer_cache: Cache, - ) -> Self { - Self { - store, - parquet_footer_cache, - } - } - - /// Returns a clone of the inner `Store`. - /// - /// Use this when you need to pass a `Store` to code that doesn't support `CachedStore`. - /// Since `Store` is cheap to clone (uses `Arc` internally), this operation is efficient. - pub fn as_store(&self) -> Store { - self.store.clone() - } - - /// Gets cached parquet metadata for a file, fetching from database on cache miss. - /// - /// This method encapsulates the cache lookup and database fetch logic. 
On cache miss, - /// it retrieves the footer bytes from the metadata database, parses the parquet metadata, - /// computes statistics, and stores the result in the cache. - /// - /// The `schema` parameter is required to compute DataFusion statistics from the parquet - /// metadata. - pub async fn get_cached_parquet_metadata( - &self, - file_id: FileId, - schema: SchemaRef, - ) -> Result { - let cache = self.parquet_footer_cache.clone(); - let metadata_db = self.store.metadata_db.clone(); - let file_id1 = file_id; - - cache - .get_or_fetch(&file_id1, || async move { - // Cache miss, fetch from database - let footer = metadata_db::files::get_footer_bytes(&metadata_db, file_id1) - .await - .map_err(GetCachedMetadataError::FetchFooter)?; - - let metadata = Arc::new( - ParquetMetaDataReader::new() - .with_page_index_policy(PageIndexPolicy::Required) - .parse_and_finish(&Bytes::from_owner(footer)) - .map_err(GetCachedMetadataError::ParseMetadata)?, - ); - - let statistics = Arc::new( - DFParquetMetadata::statistics_from_parquet_metadata(&metadata, &schema) - .map_err(GetCachedMetadataError::ComputeStatistics)?, - ); - - Ok::(CachedParquetData { - metadata, - statistics, - }) - }) - .await - .map(|entry| entry.value().clone()) - .map_err(GetCachedMetadataError::CacheError) - } -} - -impl std::ops::Deref for CachedStore { - type Target = Store; - - fn deref(&self) -> &Self::Target { - &self.store - } -} - /// Cached parquet data including metadata and computed statistics. #[derive(Clone)] pub struct CachedParquetData { diff --git a/crates/core/data-store/src/physical_table.rs b/crates/core/data-store/src/physical_table.rs new file mode 100644 index 000000000..9ae95bd41 --- /dev/null +++ b/crates/core/data-store/src/physical_table.rs @@ -0,0 +1,265 @@ +use datasets_common::{name::Name, table_name::TableName}; +use object_store::path::Path; +use url::Url; +use uuid::Uuid; + +/// Path delimiter used in object store paths. +const PATH_DELIMITER: char = '/'; + +/// Physical table URL _new-type_ wrapper +/// +/// Represents a base directory URL in the object store containing all parquet files for a table. +/// Individual file URLs are constructed by appending the filename to this base URL. +/// +/// ## URL Format +/// +/// `////` +/// +/// Where: +/// - `store_base_url`: Object store base URL, may include path prefix after bucket +/// (e.g., `s3://bucket/prefix`, `file:///data/subdir`) +/// - `dataset_name`: Dataset name (without namespace) +/// - `table_name`: Table name +/// - `revision_id`: Unique identifier for this table revision (typically UUIDv7) +/// +/// ## Example +/// +/// ```text +/// s3://my-bucket/prefix/ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef/ +/// ``` +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct PhyTableUrl(Url); + +impl PhyTableUrl { + /// Constructs a table URL from a base URL and revision path. 
+ /// + /// The URL follows the structure: `//` + /// + /// Where: + /// - `base_url`: Object store root URL (e.g., `s3://bucket/prefix/`, `file:///data/`) + /// - `revision_path`: Complete path to the table revision (dataset_name/table_name/revision_uuid) + pub fn new(base_url: &Url, revision_path: &PhyTableRevisionPath) -> Self { + // SAFETY: Path components (Name, TableName, Uuid) contain only URL-safe characters + let raw_url = base_url + .join(&format!("{}/", revision_path)) + .expect("path is URL-safe"); + PhyTableUrl(raw_url) + } + + /// Get the URL as a string slice + pub fn as_str(&self) -> &str { + self.0.as_str() + } + + /// Get a reference to the inner [`Url`] + pub fn inner(&self) -> &Url { + &self.0 + } +} + +impl std::str::FromStr for PhyTableUrl { + type Err = PhyTableUrlParseError; + + fn from_str(s: &str) -> Result { + let url = s.parse().map_err(|err| PhyTableUrlParseError { + url: s.to_string(), + source: err, + })?; + Ok(PhyTableUrl(url)) + } +} + +impl std::fmt::Display for PhyTableUrl { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0.as_str()) + } +} + +/// Path to a table directory in object storage (without revision). +/// +/// Represents the parent directory containing all revisions of a table. +/// Format: `/` +/// +/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and +/// trailing slashes, so the string representation will not contain a trailing slash. +/// +/// ## Example +/// +/// ```text +/// ethereum_mainnet/logs +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PhyTablePath(Path); + +impl PhyTablePath { + /// Constructs the path to a table directory (without revision). + pub fn new(dataset_name: impl AsRef, table_name: impl AsRef) -> Self { + Self(format!("{}/{}", dataset_name.as_ref(), table_name.as_ref()).into()) + } + + /// Create a revision path by appending the given revision ID to this table path. + pub fn with_revision(&self, revision_id: impl AsRef) -> PhyTableRevisionPath { + let path = self.0.child(revision_id.as_ref().to_string()); + PhyTableRevisionPath(path) + } + + /// Get a reference to the underlying [`object_store::path::Path`] + pub fn as_object_store_path(&self) -> &Path { + &self.0 + } + + /// Get the path as a string slice + pub fn as_str(&self) -> &str { + self.0.as_ref() + } +} + +impl std::fmt::Display for PhyTablePath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::ops::Deref for PhyTablePath { + type Target = Path; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Path to a table revision directory in object storage. +/// +/// Represents a specific revision of a table, identified by a UUID. +/// Format: `//` +/// +/// **NOTE**: The underlying [`object_store::Path`] type automatically strips leading and +/// trailing slashes, so the string representation will not contain a trailing slash. +/// +/// ## Example +/// +/// ```text +/// ethereum_mainnet/logs/01234567-89ab-cdef-0123-456789abcdef +/// ``` +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct PhyTableRevisionPath(Path); + +impl PhyTableRevisionPath { + /// Constructs the path to a table revision directory. 
+ pub fn new( + dataset_name: impl AsRef, + table_name: impl AsRef, + revision_id: impl AsRef, + ) -> Self { + Self( + format!( + "{}/{}/{}", + dataset_name.as_ref(), + table_name.as_ref(), + revision_id.as_ref() + ) + .into(), + ) + } + + /// Get a reference to the underlying [`Path`] + pub fn as_object_store_path(&self) -> &Path { + &self.0 + } + + /// Get the path as a string slice + pub fn as_str(&self) -> &str { + self.0.as_ref() + } +} + +impl std::str::FromStr for PhyTableRevisionPath { + type Err = PhyTableRevisionPathError; + + fn from_str(s: &str) -> Result { + let mut parts = s.trim_end_matches(PATH_DELIMITER).split(PATH_DELIMITER); + + let revision_uuid: Uuid = parts + .next_back() + .filter(|s| !s.is_empty()) + .ok_or(PhyTableRevisionPathError::NotEnoughComponents(0))? + .parse() + .map_err(PhyTableRevisionPathError::InvalidRevisionUuid)?; + + let table_name: TableName = parts + .next_back() + .filter(|s| !s.is_empty()) + .ok_or(PhyTableRevisionPathError::NotEnoughComponents(1))? + .parse() + .map_err(PhyTableRevisionPathError::InvalidTableName)?; + + let dataset_name: Name = parts + .next_back() + .filter(|s| !s.is_empty()) + .ok_or(PhyTableRevisionPathError::NotEnoughComponents(2))? + .parse() + .map_err(PhyTableRevisionPathError::InvalidDatasetName)?; + + Ok(Self::new(dataset_name, table_name, revision_uuid)) + } +} + +impl std::fmt::Display for PhyTableRevisionPath { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +impl std::ops::Deref for PhyTableRevisionPath { + type Target = Path; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Error when parsing a path into a [`PhyTableRevisionPath`] +#[derive(Debug, thiserror::Error)] +pub enum PhyTableRevisionPathError { + #[error("path must have at least 3 components, got {0}")] + NotEnoughComponents(usize), + + #[error("invalid dataset name")] + InvalidDatasetName(#[source] datasets_common::name::NameError), + + #[error("invalid table name")] + InvalidTableName(#[source] datasets_common::table_name::TableNameError), + + #[error("invalid revision UUID")] + InvalidRevisionUuid(#[source] uuid::Error), +} + +/// Error type for PhyTableUrl parsing +#[derive(Debug, thiserror::Error)] +#[error("invalid object store URL '{url}'")] +pub struct PhyTableUrlParseError { + url: String, + #[source] + source: url::ParseError, +} + +impl From for metadata_db::physical_table::TablePathOwned { + fn from(value: PhyTableRevisionPath) -> Self { + metadata_db::physical_table::TablePath::from_owned_unchecked(value.as_str().to_owned()) + } +} + +impl<'a> From<&'a PhyTableRevisionPath> for metadata_db::physical_table::TablePath<'a> { + fn from(value: &'a PhyTableRevisionPath) -> Self { + metadata_db::physical_table::TablePath::from_ref_unchecked(value.as_str()) + } +} + +impl From for PhyTableRevisionPath { + fn from(value: metadata_db::physical_table::TablePathOwned) -> Self { + value + .as_str() + .parse() + .expect("database path should be a valid revision path") + } +} diff --git a/crates/core/datasets-derived/Cargo.toml b/crates/core/datasets-derived/Cargo.toml index 5c7593fc4..7b74bac6d 100644 --- a/crates/core/datasets-derived/Cargo.toml +++ b/crates/core/datasets-derived/Cargo.toml @@ -8,6 +8,7 @@ license-file.workspace = true schemars = ["dep:schemars", "datasets-common/schemars", "common/schemars", "dep:serde_json"] [dependencies] +amp-data-store = { path = "../data-store" } common = { version = "0.1.0", path = "../common" } datafusion.workspace = true datasets-common = { path = 
"../datasets-common" } diff --git a/crates/core/datasets-derived/src/catalog.rs b/crates/core/datasets-derived/src/catalog.rs index f62619fb8..d883ccaea 100644 --- a/crates/core/datasets-derived/src/catalog.rs +++ b/crates/core/datasets-derived/src/catalog.rs @@ -15,8 +15,9 @@ use std::{ sync::Arc, }; +use amp_data_store::DataStore; use common::{ - BoxError, PlanningContext, ResolvedTable, Store, + BoxError, PlanningContext, ResolvedTable, catalog::{ dataset_access::DatasetAccess, logical::LogicalCatalog, @@ -46,7 +47,7 @@ use crate::manifest::Function; pub async fn catalog_for_sql_with_deps( store: &impl DatasetAccess, - data_store: &Store, + data_store: &DataStore, query: &Statement, env: &QueryEnv, dependencies: &BTreeMap, @@ -72,7 +73,7 @@ pub async fn catalog_for_sql_with_deps( async fn get_physical_catalog_with_deps( dataset_store: &impl DatasetAccess, - data_store: &Store, + data_store: &DataStore, table_refs: impl IntoIterator>, function_refs: impl IntoIterator>, env: &QueryEnv, diff --git a/crates/core/dump/Cargo.toml b/crates/core/dump/Cargo.toml index abd9e1068..19c53d6cb 100644 --- a/crates/core/dump/Cargo.toml +++ b/crates/core/dump/Cargo.toml @@ -5,6 +5,7 @@ version.workspace = true license-file.workspace = true [dependencies] +amp-data-store = { path = "../data-store" } async-stream.workspace = true metadata-db = { path = "../metadata-db" } common = { path = "../common" } diff --git a/crates/core/dump/src/check.rs b/crates/core/dump/src/check.rs index 7a5b9fba5..cee7c35b0 100644 --- a/crates/core/dump/src/check.rs +++ b/crates/core/dump/src/check.rs @@ -1,8 +1,7 @@ use std::collections::{BTreeMap, BTreeSet}; -use common::{ - Store, catalog::physical::PhysicalTable, metadata::FileName, query_context::Error as QueryError, -}; +use amp_data_store::{DataStore, file_name::FileName}; +use common::{catalog::physical::PhysicalTable, query_context::Error as QueryError}; use futures::TryStreamExt as _; use metadata_db::LocationId; use object_store::ObjectMeta; @@ -31,7 +30,7 @@ use object_store::ObjectMeta; /// These deletions are logged at `WARN` level before execution. 
pub async fn consistency_check( table: &PhysicalTable, - store: &Store, + store: &DataStore, ) -> Result<(), ConsistencyError> { // See also: metadata-consistency @@ -135,7 +134,7 @@ pub enum ConsistencyError { ListObjectStore { location_id: LocationId, #[source] - source: common::store::StreamRevisionFilesInObjectStoreError, + source: amp_data_store::StreamRevisionFilesInObjectStoreError, }, /// Failed to delete orphaned file from object store diff --git a/crates/core/dump/src/compaction.rs b/crates/core/dump/src/compaction.rs index 18b021531..52beb15d3 100644 --- a/crates/core/dump/src/compaction.rs +++ b/crates/core/dump/src/compaction.rs @@ -11,8 +11,9 @@ use std::{ }; pub use algorithm::{CompactionAlgorithm, SegmentSizeLimit}; +use amp_data_store::DataStore; pub use collector::{Collector, CollectorProperties}; -use common::{CachedStore, Timestamp, catalog::physical::PhysicalTable}; +use common::{Timestamp, catalog::physical::PhysicalTable}; pub use compactor::{Compactor, CompactorProperties}; use error::{CollectionResult, CollectorError, CompactionResult, CompactorError}; use futures::FutureExt; @@ -89,7 +90,7 @@ pub struct AmpCollectorInnerTask { impl AmpCollectorInnerTask { pub fn new( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, @@ -103,7 +104,7 @@ impl AmpCollectorInnerTask { ); let collector = Collector::new( metadata_db, - store.as_store(), + store, props.clone(), table.clone(), metrics.clone(), @@ -124,7 +125,7 @@ impl AmpCollectorInnerTask { fn start( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, @@ -262,7 +263,7 @@ impl AmpCompactorTask { pub fn start( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, @@ -289,7 +290,7 @@ impl AmpCompactorTask { impl AmpCompactor { pub fn start( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, diff --git a/crates/core/dump/src/compaction/collector.rs b/crates/core/dump/src/compaction/collector.rs index f924b40d8..468395202 100644 --- a/crates/core/dump/src/compaction/collector.rs +++ b/crates/core/dump/src/compaction/collector.rs @@ -5,8 +5,9 @@ use std::{ time::Duration, }; -use common::{Store, catalog::physical::PhysicalTable, store::DeleteFilesStreamError}; -use futures::{StreamExt, TryStreamExt, stream}; +use amp_data_store::{DataStore, DeleteFilesStreamError}; +use common::catalog::physical::PhysicalTable; +use futures::{StreamExt as _, TryStreamExt as _, stream}; use metadata_db::{MetadataDb, files::FileId, gc::GcManifestRow}; use object_store::{Error as ObjectStoreError, path::Path}; @@ -36,7 +37,7 @@ impl<'a> From<&'a ParquetConfig> for CollectorProperties { #[derive(Clone)] pub struct Collector { metadata_db: MetadataDb, - store: Store, + store: DataStore, table: Arc, props: Arc, metrics: Option>, @@ -55,7 +56,7 @@ impl Debug for Collector { impl Collector { pub fn new( metadata_db: MetadataDb, - store: Store, + store: DataStore, props: Arc, table: Arc, metrics: Option>, diff --git a/crates/core/dump/src/compaction/compactor.rs b/crates/core/dump/src/compaction/compactor.rs index 5cb9e675e..5833be5fa 100644 --- a/crates/core/dump/src/compaction/compactor.rs +++ b/crates/core/dump/src/compaction/compactor.rs @@ -8,10 +8,11 @@ use std::{ time::Duration, }; +use amp_data_store::{DataStore, file_name::FileName}; use common::{ - BlockNum, CachedStore, + BlockNum, catalog::physical::PhysicalTable, - 
metadata::{FileName, SegmentSize, segments::BlockRange}, + metadata::{SegmentSize, segments::BlockRange}, }; use futures::{StreamExt, TryStreamExt, stream}; use metadata_db::MetadataDb; @@ -52,7 +53,7 @@ impl<'a> From<&'a ParquetConfig> for CompactorProperties { #[derive(Clone)] pub struct Compactor { metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, table: Arc, props: Arc, metrics: Option>, @@ -83,7 +84,7 @@ impl Display for Compactor { impl Compactor { pub fn new( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, @@ -165,7 +166,7 @@ impl Compactor { pub struct CompactionGroup { metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, metrics: Option>, pub(super) size: SegmentSize, @@ -187,7 +188,7 @@ impl Debug for CompactionGroup { impl CompactionGroup { pub fn new_empty( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, props: Arc, table: Arc, metrics: Option>, @@ -240,10 +241,9 @@ impl CompactionGroup { let filename = FileName::new_with_random_suffix(range.start()); let buf_writer = self .store - .as_store() .create_revision_file_writer(self.table.path(), &filename); let mut writer = ParquetFileWriter::new( - self.store.as_store(), + self.store, buf_writer, filename, Arc::clone(&self.table), @@ -346,7 +346,7 @@ impl ParquetFileWriterOutput { async fn commit_metadata(&self, metadata_db: &MetadataDb) -> Result<(), metadata_db::Error> { let location_id = self.location_id; let file_name = self.object_meta.location.filename().unwrap().to_string(); - let file_name = common::metadata::FileName::new_unchecked(file_name); + let file_name = FileName::new_unchecked(file_name); let object_size = self.object_meta.size; let object_e_tag = self.object_meta.e_tag.clone(); let object_version = self.object_meta.version.clone(); diff --git a/crates/core/dump/src/compaction/plan.rs b/crates/core/dump/src/compaction/plan.rs index 19a303a0a..bdbba844a 100644 --- a/crates/core/dump/src/compaction/plan.rs +++ b/crates/core/dump/src/compaction/plan.rs @@ -6,8 +6,8 @@ use std::{ task::{Context, Poll}, }; +use amp_data_store::DataStore; use common::{ - CachedStore, catalog::{ physical::{PhysicalTable, TableSnapshot}, reader::AmpReaderFactory, @@ -111,7 +111,7 @@ pub struct CompactionPlan<'a> { /// The metadata database for committing compaction results. metadata_db: MetadataDb, /// The data store for object storage operations. - store: CachedStore, + store: DataStore, /// Stream of files to be considered for compaction. 
files: BoxStream<'a, CompactionResult>, /// Compaction properties configuring the compaction algorithm @@ -138,7 +138,7 @@ impl<'a> CompactionPlan<'a> { #[tracing::instrument(skip_all)] pub fn from_snapshot( metadata_db: MetadataDb, - store: CachedStore, + store: DataStore, opts: Arc, table: &'a TableSnapshot, metrics: &Option>, diff --git a/crates/core/dump/src/config.rs b/crates/core/dump/src/config.rs index 1768bf413..0c95ccf27 100644 --- a/crates/core/dump/src/config.rs +++ b/crates/core/dump/src/config.rs @@ -39,7 +39,6 @@ impl Config { self.max_mem_mb, self.query_max_mem_mb, &self.spill_location, - self.parquet.cache_size_mb, ) } } diff --git a/crates/core/dump/src/derived_dataset.rs b/crates/core/dump/src/derived_dataset.rs index b6a6b7686..6a3dcc80b 100644 --- a/crates/core/dump/src/derived_dataset.rs +++ b/crates/core/dump/src/derived_dataset.rs @@ -96,10 +96,11 @@ use std::{collections::BTreeMap, sync::Arc, time::Instant}; +use amp_data_store::file_name::FileName; use common::{ BlockNum, BoxError, DetachedLogicalPlan, PlanningContext, QueryContext, catalog::physical::{Catalog, PhysicalTable}, - metadata::{FileName, Generation, segments::ResumeWatermark}, + metadata::{Generation, segments::ResumeWatermark}, query_context::QueryEnv, }; use datasets_common::{deps::alias::DepAlias, hash_reference::HashReference}; @@ -502,7 +503,7 @@ async fn dump_sql_query( keep_alive_interval, ) .await? - .as_stream() + .into_stream() }; let mut microbatch_start = start; diff --git a/crates/core/dump/src/lib.rs b/crates/core/dump/src/lib.rs index 3f7eb4a3c..60a9866fc 100644 --- a/crates/core/dump/src/lib.rs +++ b/crates/core/dump/src/lib.rs @@ -2,10 +2,10 @@ use std::sync::Arc; +use amp_data_store::DataStore; use common::{ catalog::physical::PhysicalTable, parquet::file::properties::WriterProperties as ParquetWriterProperties, - store::Store as DataStore, }; use dataset_store::{DatasetKind, DatasetStore}; use datasets_common::hash_reference::HashReference; diff --git a/crates/core/dump/src/parquet_writer.rs b/crates/core/dump/src/parquet_writer.rs index 7523b0659..be1ce7dcd 100644 --- a/crates/core/dump/src/parquet_writer.rs +++ b/crates/core/dump/src/parquet_writer.rs @@ -1,11 +1,12 @@ use std::sync::Arc; +use amp_data_store::{DataStore, file_name::FileName}; use common::{ - BoxError, Store, Timestamp, + BoxError, Timestamp, arrow::array::RecordBatch, catalog::physical::PhysicalTable, metadata::{ - FileName, Generation, extract_footer_bytes_from_file, + Generation, extract_footer_bytes_from_file, parquet::{ GENERATION_METADATA_KEY, PARENT_FILE_ID_METADATA_KEY, PARQUET_METADATA_KEY, ParquetMeta, }, @@ -60,7 +61,7 @@ pub async fn commit_metadata( } pub struct ParquetFileWriter { - store: Store, + store: DataStore, writer: AsyncArrowWriter, filename: FileName, table: Arc, @@ -69,7 +70,7 @@ pub struct ParquetFileWriter { impl ParquetFileWriter { pub fn new( - store: Store, + store: DataStore, writer: BufWriter, filename: FileName, table: Arc, diff --git a/crates/core/dump/src/raw_dataset.rs b/crates/core/dump/src/raw_dataset.rs index 685784dd7..152407955 100644 --- a/crates/core/dump/src/raw_dataset.rs +++ b/crates/core/dump/src/raw_dataset.rs @@ -89,8 +89,9 @@ use std::{ time::{Duration, Instant}, }; +use amp_data_store::DataStore; use common::{ - BlockNum, BlockStreamer, BoxError, LogicalCatalog, Store, + BlockNum, BlockStreamer, BoxError, LogicalCatalog, catalog::physical::{Catalog, PhysicalTable}, metadata::segments::merge_ranges, }; @@ -559,7 +560,7 @@ struct DumpPartition { /// The metadata 
database metadata_db: MetadataDb, /// The data store for object storage operations - data_store: Store, + data_store: DataStore, /// The tables to write to catalog: Catalog, /// The block ranges to scan diff --git a/crates/core/dump/src/raw_dataset_writer.rs b/crates/core/dump/src/raw_dataset_writer.rs index 59c7b9882..fdd45e388 100644 --- a/crates/core/dump/src/raw_dataset_writer.rs +++ b/crates/core/dump/src/raw_dataset_writer.rs @@ -1,10 +1,11 @@ use std::{collections::BTreeMap, ops::RangeInclusive, sync::Arc}; +use amp_data_store::{DataStore, file_name::FileName}; use common::{ - BlockNum, BoxError, RawTableRows, Store, + BlockNum, BoxError, RawTableRows, arrow::array::RecordBatch, catalog::physical::{Catalog, PhysicalTable}, - metadata::{FileName, Generation, segments::BlockRange}, + metadata::{Generation, segments::BlockRange}, }; use datasets_common::table_name::TableName; use metadata_db::MetadataDb; @@ -30,7 +31,7 @@ impl RawDatasetWriter { pub fn new( catalog: Catalog, metadata_db: MetadataDb, - store: Store, + store: DataStore, opts: Arc, missing_ranges_by_table: BTreeMap>>, compactors_by_table: BTreeMap>, @@ -114,7 +115,7 @@ impl RawDatasetWriter { struct RawTableWriter { table: Arc, - store: Store, + store: DataStore, opts: Arc, /// The ranges of block numbers that this writer is responsible for. @@ -132,7 +133,7 @@ struct RawTableWriter { impl RawTableWriter { pub fn new( table: Arc, - store: Store, + store: DataStore, compactor: Arc, opts: Arc, missing_ranges: Vec>, diff --git a/crates/core/dump/src/streaming_query.rs b/crates/core/dump/src/streaming_query.rs index 5ed50aeb2..ac0b88daf 100644 --- a/crates/core/dump/src/streaming_query.rs +++ b/crates/core/dump/src/streaming_query.rs @@ -7,9 +7,10 @@ use std::{ }; use alloy::{hex::ToHexExt as _, primitives::BlockHash}; +use amp_data_store::DataStore; use common::{ BlockNum, BoxError, Dataset, DetachedLogicalPlan, LogicalCatalog, PlanningContext, - QueryContext, SPECIAL_BLOCK_NUM, Store, + QueryContext, SPECIAL_BLOCK_NUM, arrow::{array::RecordBatch, datatypes::SchemaRef}, catalog::physical::{Catalog, PhysicalTable}, incrementalizer::incrementalize_plan, @@ -33,7 +34,7 @@ use tokio::{ }; use tokio_stream::wrappers::ReceiverStream; use tokio_util::task::AbortOnDropHandle; -use tracing::{Instrument, debug, instrument}; +use tracing::{Instrument, instrument}; /// Awaits any update for tables in a query context catalog. struct TableUpdates { @@ -143,7 +144,7 @@ pub struct StreamingQueryHandle { } impl StreamingQueryHandle { - pub fn as_stream(self) -> BoxStream<'static, Result> { + pub fn into_stream(self) -> BoxStream<'static, Result> { let data_stream = MessageStreamWithBlockComplete::new(ReceiverStream::new(self.rx).map(Ok)); let join = self.join_handle; @@ -178,7 +179,7 @@ impl StreamingQueryHandle { /// stream. pub struct StreamingQuery { query_env: QueryEnv, - data_store: Store, + data_store: DataStore, catalog: Catalog, plan: DetachedLogicalPlan, start_block: BlockNum, @@ -208,7 +209,7 @@ impl StreamingQuery { query_env: QueryEnv, catalog: Catalog, dataset_store: &DatasetStore, - data_store: Store, + data_store: DataStore, plan: DetachedLogicalPlan, start_block: BlockNum, end_block: Option, @@ -394,7 +395,7 @@ impl StreamingQuery { // The latest common watermark across the source tables. let Some(common_watermark) = self.latest_src_watermark(&blocks_ctx, chains).await? else { // No common watermark across source tables. 
- debug!("no common watermark found"); + tracing::debug!("no common watermark found"); return Ok(None); }; @@ -404,7 +405,7 @@ impl StreamingQuery { } let Some(direction) = self.next_microbatch_start(&blocks_ctx).await? else { - debug!("no next microbatch start found"); + tracing::debug!("no next microbatch start found"); return Ok(None); }; let start = direction.segment_start(); @@ -412,7 +413,7 @@ impl StreamingQuery { .next_microbatch_end(&blocks_ctx, start, common_watermark) .await? else { - debug!("no next microbatch end found"); + tracing::debug!("no next microbatch end found"); return Ok(None); }; Ok(Some(MicrobatchRange { @@ -624,7 +625,7 @@ impl StreamingQuery { let plan = ctx.plan_sql(query).await?; let results = ctx.execute_and_concat(plan).await?; if results.num_rows() == 0 { - debug!("blocks table missing block {} {:?}", number, hash); + tracing::debug!("blocks table missing block {} {:?}", number, hash); return Ok(None); } let get_hash_value = |column_name: &str| -> Option { @@ -666,7 +667,7 @@ pub fn keep_alive_stream<'a>( schema: SchemaRef, keep_alive_interval: u64, ) -> BoxStream<'a, Result> { - let period = tokio::time::Duration::from_secs(keep_alive_interval); + let period = Duration::from_secs(keep_alive_interval); let mut keep_alive_interval = tokio::time::interval(period); let missed_tick_behavior = MissedTickBehavior::Delay; @@ -703,7 +704,7 @@ pub fn keep_alive_stream<'a>( #[tracing::instrument(skip(dataset_store, data_store), err)] async fn resolve_blocks_table( dataset_store: &DatasetStore, - data_store: Store, + data_store: DataStore, root_datasets: BTreeMap>, network: &str, ) -> Result { diff --git a/crates/core/monitoring/src/logging.rs b/crates/core/monitoring/src/logging.rs index 8be5faa25..155c3c303 100644 --- a/crates/core/monitoring/src/logging.rs +++ b/crates/core/monitoring/src/logging.rs @@ -66,6 +66,7 @@ const AMP_CRATES: &[&str] = &[ "admin_api", "admin_client", "amp_client", + "amp_data_store", "amp_object_store", "ampcc", "ampctl", diff --git a/crates/services/admin-api/Cargo.toml b/crates/services/admin-api/Cargo.toml index 7f13dfa7d..1d7203922 100644 --- a/crates/services/admin-api/Cargo.toml +++ b/crates/services/admin-api/Cargo.toml @@ -5,6 +5,7 @@ license-file.workspace = true edition.workspace = true [dependencies] +amp-data-store = { path = "../../core/data-store" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/admin-api/src/ctx.rs b/crates/services/admin-api/src/ctx.rs index 7d2370589..ba51c8063 100644 --- a/crates/services/admin-api/src/ctx.rs +++ b/crates/services/admin-api/src/ctx.rs @@ -1,7 +1,7 @@ //! Service context use std::sync::Arc; -use common::store::Store; +use amp_data_store::DataStore; use config::BuildInfo; use dataset_store::DatasetStore; use metadata_db::MetadataDb; @@ -15,7 +15,7 @@ pub struct Ctx { pub dataset_store: DatasetStore, pub scheduler: Arc, /// Object store for output data (used by dataset restore handler) - pub data_store: Store, + pub data_store: DataStore, /// Build information (version, git SHA, etc.) 
pub build_info: BuildInfo, } diff --git a/crates/services/controller/Cargo.toml b/crates/services/controller/Cargo.toml index 0de814c5e..9db0e8045 100644 --- a/crates/services/controller/Cargo.toml +++ b/crates/services/controller/Cargo.toml @@ -6,6 +6,7 @@ license-file.workspace = true [dependencies] admin-api = { path = "../admin-api" } +amp-data-store = { path = "../../core/data-store" } async-trait.workspace = true axum.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/controller/src/service.rs b/crates/services/controller/src/service.rs index e1f398ab2..80d349bb2 100644 --- a/crates/services/controller/src/service.rs +++ b/crates/services/controller/src/service.rs @@ -1,13 +1,14 @@ use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; use admin_api::ctx::Ctx; +use amp_data_store::DataStore; use axum::{ Router, http::StatusCode, routing::get, serve::{Listener as _, ListenerExt as _}, }; -use common::{BoxError, store::Store}; +use common::BoxError; use dataset_store::DatasetStore; use metadata_db::MetadataDb; use monitoring::telemetry::metrics::Meter; @@ -32,7 +33,7 @@ const RECONCILIATION_INTERVAL: Duration = Duration::from_secs(60); pub async fn new( config: Arc, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, meter: Option, at: SocketAddr, diff --git a/crates/services/server/Cargo.toml b/crates/services/server/Cargo.toml index 87bfc301d..5616f241b 100644 --- a/crates/services/server/Cargo.toml +++ b/crates/services/server/Cargo.toml @@ -5,6 +5,7 @@ version.workspace = true license-file.workspace = true [dependencies] +amp-data-store = { path = "../../core/data-store" } arrow-flight.workspace = true async-stream.workspace = true async-trait.workspace = true diff --git a/crates/services/server/src/config.rs b/crates/services/server/src/config.rs index ddb29bf94..bd79ccf0c 100644 --- a/crates/services/server/src/config.rs +++ b/crates/services/server/src/config.rs @@ -16,8 +16,6 @@ pub struct Config { pub query_max_mem_mb: usize, /// Paths for DataFusion temporary files for spill-to-disk pub spill_location: Vec, - /// Parquet metadata cache size (in MB) - pub parquet_cache_size_mb: u64, } impl Config { @@ -32,7 +30,6 @@ impl Config { self.max_mem_mb, self.query_max_mem_mb, &self.spill_location, - self.parquet_cache_size_mb, ) } } diff --git a/crates/services/server/src/flight.rs b/crates/services/server/src/flight.rs index 2658ae26c..d1613dab6 100644 --- a/crates/services/server/src/flight.rs +++ b/crates/services/server/src/flight.rs @@ -6,6 +6,7 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc}; +use amp_data_store::DataStore; use arrow_flight::{ ActionType, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc, Ticket, @@ -34,7 +35,6 @@ use common::{ metadata::segments::{BlockRange, ResumeWatermark}, query_context::{Error as CoreError, QueryEnv}, sql_str::SqlStr, - store::Store, utils::error_with_causes, }; use datafusion::{ @@ -52,7 +52,7 @@ use prost::Message as _; use serde_json::json; use thiserror::Error; use tonic::{Request, Response, Status, service::Routes}; -use tracing::{debug, instrument}; +use tracing::instrument; use crate::{ config::Config, metrics::MetricsRegistry, @@ -65,7 +65,7 @@ type TonicStream = Pin> + Send + 'sta pub struct Service { config: Arc, env: QueryEnv, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, notification_multiplexer: Arc, metrics: Option>, @@ 
-74,7 +74,7 @@ pub struct Service { impl Service { pub async fn create( config: Arc, - data_store: Store, + data_store: DataStore, metadata_db: MetadataDb, dataset_store: DatasetStore, meter: Option, @@ -236,7 +236,7 @@ impl Service { let stream = QueryResultStream::Incremental { stream: query - .as_stream() + .into_stream() .map_err(|err| Error::StreamingExecutionError(err.to_string())) .boxed(), schema, @@ -367,7 +367,7 @@ impl Service { .parse::() .map_err(|err| Error::InvalidQuery(err.to_string()))?; - debug!("SQL query: {}", sql_str); + tracing::debug!("SQL query: {}", sql_str); let stream = self .execute_query( diff --git a/crates/services/server/src/service.rs b/crates/services/server/src/service.rs index 0fe205f5a..974e36ee5 100644 --- a/crates/services/server/src/service.rs +++ b/crates/services/server/src/service.rs @@ -5,13 +5,14 @@ use std::{future::Future, net::SocketAddr, sync::Arc}; +use amp_data_store::DataStore; use axum::{ Router, http::StatusCode, routing::get, serve::{Listener as _, ListenerExt as _}, }; -use common::{BoxError, store::Store}; +use common::BoxError; use datafusion::error::DataFusionError; use dataset_store::DatasetStore; use futures::FutureExt; @@ -30,7 +31,7 @@ use crate::{config::Config, flight, jsonl}; pub async fn new( config: Arc, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, meter: Option, flight_at: impl Into>, diff --git a/crates/services/worker/Cargo.toml b/crates/services/worker/Cargo.toml index 7ad435eaa..6ab2c0f48 100644 --- a/crates/services/worker/Cargo.toml +++ b/crates/services/worker/Cargo.toml @@ -5,6 +5,7 @@ version.workspace = true license-file.workspace = true [dependencies] +amp-data-store = { path = "../../core/data-store" } backon.workspace = true chrono.workspace = true common = { path = "../../core/common" } diff --git a/crates/services/worker/src/service.rs b/crates/services/worker/src/service.rs index 34cd8596b..bc839dea7 100644 --- a/crates/services/worker/src/service.rs +++ b/crates/services/worker/src/service.rs @@ -1,7 +1,7 @@ use std::{future::Future, sync::Arc, time::Duration}; +use amp_data_store::DataStore; use backon::{ExponentialBuilder, Retryable}; -use common::{CachedParquetData, ParquetFooterCache, store::Store as DataStore}; use dataset_store::DatasetStore; use futures::TryStreamExt as _; use metadata_db::{ @@ -77,12 +77,6 @@ pub async fn new( // Create notification multiplexer let notification_multiplexer = Arc::new(notification_multiplexer::spawn(metadata_db.clone())); - // Create shared parquet footer cache - let parquet_opts = dump::parquet_opts(&config.parquet); - let parquet_footer_cache = ParquetFooterCache::builder(parquet_opts.cache_size_mb) - .with_weighter(|_k, v: &CachedParquetData| v.metadata.memory_size()) - .build(); - // Worker bootstrap: If the worker is restarted, it needs to be able to resume its state // from the last known state. 
// @@ -105,7 +99,6 @@ pub async fn new( data_store, notification_multiplexer, meter, - parquet_footer_cache, }, ); @@ -222,7 +215,6 @@ pub(crate) struct WorkerJobCtx { pub data_store: DataStore, pub notification_multiplexer: Arc, pub meter: Option, - pub parquet_footer_cache: ParquetFooterCache, } pub struct Worker { diff --git a/crates/services/worker/src/service/job_impl.rs b/crates/services/worker/src/service/job_impl.rs index cd56918d6..139f69e7d 100644 --- a/crates/services/worker/src/service/job_impl.rs +++ b/crates/services/worker/src/service/job_impl.rs @@ -87,13 +87,9 @@ pub(super) async fn new( } .into(); - let cached_store = common::CachedStore::from_parts( - ctx.data_store.clone(), - job_ctx.parquet_footer_cache.clone(), - ); let compactor = AmpCompactor::start( ctx.metadata_db.clone(), - cached_store, + ctx.data_store.clone(), opts.clone(), physical_table.clone(), metrics.clone(), diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b218d1196..8e54e70f5 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -13,6 +13,7 @@ name = "tests" [dependencies] alloy.workspace = true amp-client = { path = "../crates/clients/flight" } +amp-data-store = { path = "../crates/core/data-store" } amp-object-store = { path = "../crates/core/object-store" } ampctl = { path = "../crates/bin/ampctl" } arrow-flight.workspace = true diff --git a/tests/src/main.rs b/tests/src/main.rs index b53c1fc78..0f7ab2ee0 100644 --- a/tests/src/main.rs +++ b/tests/src/main.rs @@ -47,8 +47,9 @@ use std::{path::PathBuf, sync::Arc}; +use amp_data_store::DataStore; use clap::Parser; -use common::{BoxError, Store}; +use common::BoxError; use config::Config; use dataset_store::{ DatasetStore, dataset_and_dependencies, manifests::DatasetManifestsStore, @@ -154,8 +155,12 @@ async fn main() { .await .expect("Failed to load config"), ); - let data_store = Store::new(sysdb.conn_pool().clone(), config.data_store_url.clone()) - .expect("Failed to create data store"); + let data_store = DataStore::new( + sysdb.conn_pool().clone(), + config.data_store_url.clone(), + config.parquet.cache_size_mb, + ) + .expect("Failed to create data store"); let dataset_store = { let provider_configs_store = ProviderConfigsStore::new( @@ -272,7 +277,7 @@ async fn bless( ampctl: &Ampctl, dataset_store: DatasetStore, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset: Reference, end: u64, ) -> Result<(), BoxError> { @@ -326,7 +331,7 @@ async fn bless( .delete_stream(path_stream) .try_collect::>() .await - .map_err(|err| { + .map_err(|err: object_store::Error| { format!( "Failed to clear existing data for dataset '{}': {}", dataset, err diff --git a/tests/src/testlib/ctx.rs b/tests/src/testlib/ctx.rs index 2eeec8bcc..930e838c8 100644 --- a/tests/src/testlib/ctx.rs +++ b/tests/src/testlib/ctx.rs @@ -29,7 +29,8 @@ use std::{collections::BTreeSet, path::Path, sync::Arc}; -use common::{BoxError, store::Store}; +use amp_data_store::DataStore; +use common::BoxError; use config::Config; use datasets_common::reference::Reference; use worker::node_id::NodeId; @@ -342,9 +343,10 @@ impl TestCtxBuilder { let config = Arc::new(Config::load(daemon_state_dir.config_file(), false, None, true, None).await?); - let data_store = Store::new( + let data_store = DataStore::new( metadata_db.conn_pool().clone(), config.data_store_url.clone(), + config.parquet.cache_size_mb, )?; // Create shared DatasetStore instance (used by both server and worker) diff --git a/tests/src/testlib/fixtures/daemon_controller.rs 
b/tests/src/testlib/fixtures/daemon_controller.rs index 421d02a69..14dacaddd 100644 --- a/tests/src/testlib/fixtures/daemon_controller.rs +++ b/tests/src/testlib/fixtures/daemon_controller.rs @@ -6,8 +6,11 @@ use std::{net::SocketAddr, sync::Arc}; -use common::{BoxError, BoxResult, store::Store}; +use amp_data_store::DataStore; +use common::{BoxError, BoxResult}; use controller::config::Config; +use dataset_store::DatasetStore; +use metadata_db::MetadataDb; use opentelemetry::metrics::Meter; use tokio::task::JoinHandle; @@ -32,9 +35,9 @@ impl DaemonController { /// The controller will be automatically shut down when the fixture is dropped. pub async fn new( config: Arc, - metadata_db: metadata_db::MetadataDb, - data_store: Store, - dataset_store: dataset_store::DatasetStore, + metadata_db: MetadataDb, + data_store: DataStore, + dataset_store: DatasetStore, meter: Option, ) -> Result { // Convert common config to controller config @@ -71,12 +74,12 @@ impl DaemonController { } /// Get a reference to the metadata database. - pub fn metadata_db(&self) -> &metadata_db::MetadataDb { + pub fn metadata_db(&self) -> &MetadataDb { &self.metadata_db } /// Get a reference to the dataset store. - pub fn dataset_store(&self) -> &dataset_store::DatasetStore { + pub fn dataset_store(&self) -> &DatasetStore { &self.dataset_store } diff --git a/tests/src/testlib/fixtures/daemon_server.rs b/tests/src/testlib/fixtures/daemon_server.rs index 571da8ec3..10b9cf237 100644 --- a/tests/src/testlib/fixtures/daemon_server.rs +++ b/tests/src/testlib/fixtures/daemon_server.rs @@ -6,7 +6,8 @@ use std::{net::SocketAddr, sync::Arc}; -use common::{BoxError, store::Store}; +use amp_data_store::DataStore; +use common::BoxError; use dataset_store::DatasetStore; use metadata_db::MetadataDb; use opentelemetry::metrics::Meter; @@ -21,7 +22,7 @@ use tokio::task::JoinHandle; pub struct DaemonServer { config: Arc, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, server_addrs: BoundAddrs, @@ -38,7 +39,7 @@ impl DaemonServer { pub async fn new( config: Arc, metadb: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, meter: Option, enable_flight: bool, @@ -93,7 +94,7 @@ impl DaemonServer { } /// Get a reference to the data store. 
- pub fn data_store(&self) -> &Store { + pub fn data_store(&self) -> &DataStore { &self.data_store } @@ -151,6 +152,5 @@ fn server_config_from_common(config: &config::Config) -> Config { max_mem_mb: config.max_mem_mb, query_max_mem_mb: config.query_max_mem_mb, spill_location: config.spill_location.clone(), - parquet_cache_size_mb: config.parquet.cache_size_mb, } } diff --git a/tests/src/testlib/fixtures/daemon_worker.rs b/tests/src/testlib/fixtures/daemon_worker.rs index f48a7e3de..00af37318 100644 --- a/tests/src/testlib/fixtures/daemon_worker.rs +++ b/tests/src/testlib/fixtures/daemon_worker.rs @@ -6,7 +6,8 @@ use std::sync::Arc; -use common::{BoxError, store::Store}; +use amp_data_store::DataStore; +use common::BoxError; use dataset_store::DatasetStore; use metadata_db::MetadataDb; use opentelemetry::metrics::Meter; @@ -21,7 +22,7 @@ use worker::{config::Config, node_id::NodeId, service::RuntimeError as WorkerRun pub struct DaemonWorker { config: Config, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, node_id: NodeId, @@ -36,7 +37,7 @@ impl DaemonWorker { pub async fn new( config: Arc, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, meter: Option, node_id: NodeId, @@ -84,7 +85,7 @@ impl DaemonWorker { } /// Get a reference to the data store. - pub fn data_store(&self) -> &Store { + pub fn data_store(&self) -> &DataStore { &self.data_store } diff --git a/tests/src/testlib/fixtures/snapshot_ctx.rs b/tests/src/testlib/fixtures/snapshot_ctx.rs index 7f4581c89..87657d63a 100644 --- a/tests/src/testlib/fixtures/snapshot_ctx.rs +++ b/tests/src/testlib/fixtures/snapshot_ctx.rs @@ -11,8 +11,9 @@ use std::sync::Arc; +use amp_data_store::DataStore; use common::{ - BoxError, LogicalCatalog, QueryContext, Store, + BoxError, LogicalCatalog, QueryContext, catalog::physical::{Catalog, PhysicalTable}, }; use server::config::Config; @@ -33,7 +34,7 @@ impl SnapshotContext { /// from dataset snapshot reference data or fresh ETL extraction pipeline dumps. 
pub async fn from_tables( config: &Config, - store: Store, + store: DataStore, tables: Vec>, ) -> Result { let logical = LogicalCatalog::from_tables(tables.iter().map(|t| t.table())); diff --git a/tests/src/testlib/helpers.rs b/tests/src/testlib/helpers.rs index 77376396a..dfcdeb9d0 100644 --- a/tests/src/testlib/helpers.rs +++ b/tests/src/testlib/helpers.rs @@ -9,8 +9,9 @@ pub mod git; use std::{collections::BTreeMap, sync::Arc}; +use amp_data_store::DataStore; use common::{ - BoxError, CachedParquetData, LogicalCatalog, ParquetFooterCache, Store, + BoxError, LogicalCatalog, arrow::array::RecordBatch, catalog::physical::{Catalog, PhysicalTable}, metadata::segments::BlockRange, @@ -40,7 +41,7 @@ use super::fixtures::SnapshotContext; pub async fn dump_internal( config: WorkerConfig, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, dataset_ref: Reference, end_block: EndBlock, @@ -61,9 +62,6 @@ pub async fn dump_internal( let mut tables = Vec::with_capacity(dataset.tables.len()); let opts = dump::parquet_opts(&config.parquet); - let cache = ParquetFooterCache::builder(opts.cache_size_mb) - .with_weighter(|_k, v: &CachedParquetData| v.metadata.memory_size()) - .build(); for table in dataset.resolved_tables(dataset_ref.clone().into()) { // Always reuse existing physical tables in test scenarios (fresh = false) let physical_table: Arc = @@ -79,10 +77,9 @@ pub async fn dump_internal( } } .into(); - let cached_store = common::CachedStore::from_parts(data_store.clone(), cache.clone()); let compactor = AmpCompactor::start( metadata_db.clone(), - cached_store, + data_store.clone(), opts.clone(), physical_table.clone(), None, @@ -131,7 +128,7 @@ pub async fn dump_internal( pub async fn dump_dataset( config: WorkerConfig, metadata_db: MetadataDb, - data_store: Store, + data_store: DataStore, dataset_store: DatasetStore, dataset: Reference, end_block: u64, @@ -157,7 +154,7 @@ pub async fn dump_dataset( /// or synchronization issues between metadata and storage. pub async fn check_table_consistency( table: &Arc, - store: &Store, + store: &DataStore, ) -> Result<(), BoxError> { consistency_check(table, store).await.map_err(Into::into) } @@ -176,7 +173,7 @@ pub async fn check_table_consistency( pub async fn restore_dataset_snapshot( ampctl: &super::fixtures::Ampctl, dataset_store: &DatasetStore, - data_store: &Store, + data_store: &DataStore, dataset_ref: &Reference, ) -> Result>, BoxError> { // 1. Restore via Admin API (indexes files into metadata DB) @@ -358,7 +355,7 @@ pub async fn assert_snapshots_eq(left: &SnapshotContext, right: &SnapshotContext pub async fn catalog_for_dataset( dataset_name: &str, dataset_store: &DatasetStore, - data_store: &Store, + data_store: &DataStore, ) -> Result { let dataset_ref: Reference = format!("_/{dataset_name}@latest") .parse() diff --git a/tests/src/tests/it_dump.rs b/tests/src/tests/it_dump.rs index a46477a6a..9c5825d83 100644 --- a/tests/src/tests/it_dump.rs +++ b/tests/src/tests/it_dump.rs @@ -1,4 +1,4 @@ -use common::Store; +use amp_data_store::DataStore; use datasets_common::reference::Reference; use monitoring::logging; @@ -208,7 +208,7 @@ struct TestCtx { impl TestCtx { /// Get the data store from the daemon server. 
- fn data_store(&self) -> &Store { + fn data_store(&self) -> &DataStore { self.ctx.daemon_server().data_store() } diff --git a/tests/src/tests/it_sql_dataset_batch_size.rs b/tests/src/tests/it_sql_dataset_batch_size.rs index 903df8f74..10ebc6744 100644 --- a/tests/src/tests/it_sql_dataset_batch_size.rs +++ b/tests/src/tests/it_sql_dataset_batch_size.rs @@ -4,7 +4,7 @@ use std::{ time::Duration, }; -use common::{BoxError, ParquetFooterCache, metadata::Generation}; +use common::{BoxError, metadata::Generation}; use dataset_store::DatasetStore; use datasets_common::reference::Reference; use dump::{ @@ -84,7 +84,6 @@ async fn sql_dataset_input_batch_size() { /// and collection workflows with specific batch size configurations. struct TestCtx { ctx: testlib::ctx::TestCtx, - cache: ParquetFooterCache, } impl TestCtx { @@ -99,11 +98,6 @@ impl TestCtx { .await .expect("Failed to create test context"); - let cache = ParquetFooterCache::builder( - (ctx.daemon_worker().config().parquet.cache_size_mb * 1024 * 1024) as usize, - ) - .build(); - // Deploy the TypeScript dataset let sql_stream_ds = DatasetPackage::new("sql_stream_ds", Some("amp.config.ts")); let cli = ctx.new_amp_cli(); @@ -112,7 +106,7 @@ impl TestCtx { .await .expect("Failed to register sql_stream_ds dataset"); - Self { ctx, cache } + Self { ctx } } /// Get reference to the dataset store. @@ -180,12 +174,10 @@ impl TestCtx { opts_mut.compactor.interval = Duration::ZERO; opts_mut.compactor.algorithm.cooldown_duration = Duration::ZERO; opts_mut.partition = SegmentSizeLimit::new(100, 0, 0, 0, Generation::default(), 10); - let cache = self.cache.clone(); let metadata_db = self.ctx.daemon_worker().metadata_db().clone(); let data_store = self.ctx.daemon_server().data_store().clone(); - let cached_store = common::CachedStore::from_parts(data_store, cache); let mut task = - AmpCompactor::start(metadata_db, cached_store, opts.clone(), table.clone(), None); + AmpCompactor::start(metadata_db, data_store, opts.clone(), table.clone(), None); task.join_current_then_spawn_new().await.unwrap(); while !task.is_finished() { tokio::task::yield_now().await; @@ -211,12 +203,10 @@ impl TestCtx { opts_mut.collector.interval = Duration::ZERO; opts_mut.compactor.interval = Duration::ZERO; opts_mut.partition = SegmentSizeLimit::new(1, 1, 1, 0, Generation::default(), 1.5); - let cache = self.cache.clone(); let metadata_db = self.ctx.daemon_worker().metadata_db().clone(); let data_store = self.ctx.daemon_server().data_store().clone(); - let cached_store = common::CachedStore::from_parts(data_store, cache); let mut task = - AmpCompactor::start(metadata_db, cached_store, opts.clone(), table.clone(), None); + AmpCompactor::start(metadata_db, data_store, opts.clone(), table.clone(), None); task.join_current_then_spawn_new().await.unwrap(); while !task.is_finished() { tokio::task::yield_now().await;
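Taken together, these hunks replace the old two-step wiring (a common::store::Store plus an externally built ParquetFooterCache combined through CachedStore::from_parts) with a single amp_data_store::DataStore that owns its parquet-footer cache, sized by config.parquet.cache_size_mb. Below is a minimal sketch of the new construction, modelled on the *_cmd.rs and tests/src/main.rs call sites in this diff; build_data_store is a hypothetical helper, and the expect-based error handling is illustrative only:

    use amp_data_store::DataStore;
    use config::Config;
    use metadata_db::MetadataDb;

    /// Hypothetical helper mirroring the call sites above: the parquet cache size
    /// is now handed to the store itself instead of being threaded through
    /// server::Config as parquet_cache_size_mb.
    fn build_data_store(config: &Config, metadata_db: MetadataDb) -> DataStore {
        DataStore::new(
            metadata_db,
            config.data_store_url.clone(),
            config.parquet.cache_size_mb,
        )
        .expect("Failed to create data store")
    }

The resulting DataStore is cloned straight into the compaction tasks (AmpCompactor::start no longer needs a CachedStore built from a shared cache), and streaming consumers call the renamed StreamingQueryHandle::into_stream() where they previously called as_stream().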