feat: Multi-tenancy for Parseable server #1518
Conversation
Walkthrough
This pull request introduces comprehensive multi-tenant support across the entire Parseable codebase. The changes thread an optional `tenant_id` parameter through nearly every affected API.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
This PR represents a fundamental architectural shift to multi-tenant support affecting nearly every module in the codebase. While individual changes follow consistent patterns (parameter threading and HashMap restructuring), the scope is vast, with 60+ files modified across heterogeneous subsystems (alerts, RBAC, storage, query, streams, handlers). Each subsystem requires independent verification of correctness, proper tenant isolation, and parameter propagation, necessitating careful, interconnected review across multiple domains.
🚥 Pre-merge checks: ✅ 1 passed | ❌ 2 failed (warnings)
Actionable comments posted: 20
Note
Due to the large number of review comments, Critical and Major severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (22)
src/hottier.rs (2)
208-220: Critical: `delete_hot_tier` does not use `tenant_id` when constructing the delete path.
The function accepts `tenant_id` and uses it for the existence check at line 213, but line 216 constructs the path without considering it: `let path = self.hot_tier_path.join(stream);`. For multi-tenant deployments, this will delete the wrong directory (the non-tenant path) instead of the tenant-scoped path.
🐛 Proposed fix
```diff
 pub async fn delete_hot_tier(
     &self,
     stream: &str,
     tenant_id: &Option<String>,
 ) -> Result<(), HotTierError> {
     if !self.check_stream_hot_tier_exists(stream, tenant_id) {
         return Err(HotTierValidationError::NotFound(stream.to_owned()).into());
     }
-    let path = self.hot_tier_path.join(stream);
+    let path = if let Some(tid) = tenant_id.as_ref() {
+        self.hot_tier_path.join(tid).join(stream)
+    } else {
+        self.hot_tier_path.join(stream)
+    };
     fs::remove_dir_all(path).await?;
     Ok(())
 }
```
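The tenant-scoped join in the proposed fix can be sketched in isolation; the helper name and base directory below are hypothetical:

```rust
use std::path::{Path, PathBuf};

// Scope a stream's directory under the tenant when one is present,
// falling back to the non-tenant layout otherwise.
fn tenant_scoped_path(base: &Path, stream: &str, tenant_id: &Option<String>) -> PathBuf {
    match tenant_id.as_ref() {
        Some(tid) => base.join(tid).join(stream),
        None => base.join(stream),
    }
}

fn main() {
    let base = Path::new("/hot-tier");
    assert_eq!(
        tenant_scoped_path(base, "logs", &Some("acme".to_string())),
        PathBuf::from("/hot-tier/acme/logs")
    );
    assert_eq!(
        tenant_scoped_path(base, "logs", &None),
        PathBuf::from("/hot-tier/logs")
    );
}
```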
471-497: `fetch_hot_tier_dates` and `get_stream_path_for_date` must accept and use a `tenant_id` parameter.
These functions construct paths without tenant awareness, while `hot_tier_file_path()` is already tenant-scoped. This causes a mismatch: `cleanup_hot_tier_old_data()` has access to `tenant_id` but cannot pass it to `fetch_hot_tier_dates()`, and `process_parquet_file()` cannot pass `tenant_id` to `get_stream_path_for_date()`. In multi-tenant deployments, this will cause incorrect path resolution for hot-tier data. Update both function signatures to accept `tenant_id` and construct paths as `self.hot_tier_path.join(tenant_id).join(stream)` when present, consistent with `hot_tier_file_path()`.
src/handlers/http/alerts.rs (1)
209-244: Missing `tenant_id` in list endpoint - potential cross-tenant alert visibility.
The `list` handler does not extract `tenant_id` from the request, unlike all other handlers in this file. The `list_alerts_for_user` call may return alerts across all tenants instead of filtering by the requesting tenant's context.
🐛 Proposed fix to add tenant context
```diff
 pub async fn list(req: HttpRequest) -> Result<impl Responder, AlertError> {
     let session_key = extract_session_key_from_req(&req)?;
+    let tenant_id = get_tenant_id_from_request(&req);
     let query_map = web::Query::<HashMap<String, String>>::from_query(req.query_string())
         .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?;
     // ... existing code ...
     // Fetch alerts for the user
     let alerts = alerts
-        .list_alerts_for_user(session_key, params.tags_list)
+        .list_alerts_for_user(session_key, params.tags_list, &tenant_id)
         .await?;
```
src/storage/store_metadata.rs (1)
301-323: Missing directory creation for tenant-specific staging path.
When `tenant_id` is provided, the code constructs a path under a tenant subdirectory (line 309), but doesn't ensure this directory exists. The `OpenOptions::open()` call will fail with `NotFound` if the tenant directory hasn't been created yet.
🐛 Proposed fix to ensure tenant directory exists
```diff
 pub fn put_staging_metadata(meta: &StorageMetadata, tenant_id: &Option<String>) -> io::Result<()> {
     let mut staging_metadata = meta.clone();
     staging_metadata.server_mode = PARSEABLE.options.mode;
     staging_metadata.staging = PARSEABLE.options.staging_dir().to_path_buf();
     let path = if let Some(tenant_id) = tenant_id.as_ref() {
-        PARSEABLE
+        let tenant_dir = PARSEABLE
             .options
             .staging_dir()
-            .join(tenant_id)
-            .join(PARSEABLE_METADATA_FILE_NAME)
+            .join(tenant_id);
+        fs::create_dir_all(&tenant_dir)?;
+        tenant_dir.join(PARSEABLE_METADATA_FILE_NAME)
     } else {
         PARSEABLE
             .options
             .staging_dir()
             .join(PARSEABLE_METADATA_FILE_NAME)
     };
```
35-45: Missing `tenant_id` in post endpoint - targets created without tenant context.
The `post` handler doesn't extract `tenant_id` from the request, unlike `list`, `get`, `update`, and `delete`. This could result in targets being created without proper tenant association, breaking tenant isolation.
🐛 Proposed fix to add tenant context
```diff
 // POST /targets
 pub async fn post(
-    _req: HttpRequest,
+    req: HttpRequest,
     Json(target): Json<Target>,
 ) -> Result<impl Responder, AlertError> {
+    let tenant_id = get_tenant_id_from_request(&req);
     // should check for duplicacy and liveness (??)
     // add to the map
-    TARGETS.update(target.clone()).await?;
+    TARGETS.update(target.clone(), &tenant_id).await?;
     // Ok(web::Json(target.mask()))
     Ok(web::Json(target))
 }
```
72-98: `update` handler missing `tenant_id` in `TARGETS.update` call.
While `tenant_id` is correctly extracted and used to fetch `old_target`, the subsequent `TARGETS.update(target.clone())` call on line 94 doesn't pass the tenant context. This may cause the updated target to lose tenant association.
🐛 Proposed fix
```diff
 // should check for duplicacy and liveness (??)
 // add to the map
-TARGETS.update(target.clone()).await?;
+TARGETS.update(target.clone(), &tenant_id).await?;
```
src/alerts/alerts_utils.rs (1)
77-90: Tenant isolation gap: `execute_remote_query` does not receive a `tenant_id` parameter.
The `execute_local_query` path explicitly receives and uses `tenant_id` for stream creation and query execution (lines 101, 112), but `execute_remote_query` (line 84) is called without this parameter and does not propagate any tenant context to `send_query_request`. The `Query` struct serialized to the remote querier contains no tenant information. If Prism mode requires tenant isolation, either:
- Add a `tenant_id` parameter to `execute_remote_query` and include it in the `Query` struct or HTTP request, or
- Verify that tenant context is derived from the Authorization header on the remote side and document this assumption.
src/handlers/http/ingest.rs (1)
426-445: Pass tenant context through the unchecked event path.
`push_logs_unchecked` and `append_temporary_events` hardcode `tenant_id: None`, but the calling context in `airplane.rs` has access to tenant information via the `key` (`SessionKey`) parameter. Extract `tenant_id` using `get_tenant_id_from_key(&key)` and thread it through both functions to maintain consistency with the normal ingest flow.
src/handlers/http/modal/ingest/ingestor_rbac.rs (2)
189-213: Metadata persisted before password hash is updated.
Line 198 calls `put_staging_metadata` before the password hash is actually updated in the metadata (lines 199-211). This means the old password hash is persisted instead of the new one.
🐛 Proposed fix: Move persistence after the mutation
```diff
 pub async fn post_gen_password(
     req: HttpRequest,
     username: web::Path<String>,
 ) -> Result<HttpResponse, RBACError> {
     let username = username.into_inner();
     let tenant_id = get_tenant_id_from_request(&req);
     let mut new_hash = String::default();
     let mut metadata = get_metadata(&tenant_id).await?;
-    let _ = storage::put_staging_metadata(&metadata, &tenant_id);
     if let Some(user) = metadata
         .users
         .iter_mut()
         .filter_map(|user| match user.ty {
             user::UserType::Native(ref mut user) => Some(user),
             _ => None,
         })
         .find(|user| user.username == username)
     {
         new_hash.clone_from(&user.password_hash);
     } else {
         return Err(RBACError::UserDoesNotExist);
     }
+    let _ = storage::put_staging_metadata(&metadata, &tenant_id);
     Users.change_password_hash(&username, &new_hash, &tenant_id);
     Ok(HttpResponse::Ok().status(StatusCode::OK).finish())
 }
```
98-107: Use `tenant_id` to access the nested roles HashMap.
The `roles().get(r)` calls at lines 101 and 145 (in `remove_roles_from_user`) incorrectly attempt to look up role names directly. The `roles()` function returns `HashMap<tenant_id, HashMap<role_name, privileges>>`, so the lookup must first access by `tenant_id`. Both functions have `tenant_id` available from the request but don't use it.
Change:
`if roles().get(r).is_none()`
To:
`if roles().get(&tenant_id).and_then(|r_map| r_map.get(r)).is_none()`
This mirrors the pattern used throughout the codebase (e.g., `src/rbac/utils.rs`, `src/rbac/mod.rs`).
src/users/dashboards.rs (1)
244-268: Critical: Dashboard creation silently fails for new tenants.
If `dashboards.get_mut(tenant)` returns `None` (tenant doesn't exist in the map), the function returns `Ok(())` without creating the dashboard. This is a logic error — new tenants would never be able to create dashboards.
```diff
 pub async fn create(
     &self,
     user_id: &str,
     dashboard: &mut Dashboard,
     tenant_id: &Option<String>,
 ) -> Result<(), DashboardError> {
     dashboard.created = Some(Utc::now());
     dashboard.set_metadata(user_id, None);
     let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
     let mut dashboards = self.0.write().await;
-    if let Some(dbs) = dashboards.get_mut(tenant) {
-        let has_duplicate = dbs
-            .iter()
-            .any(|d| d.title == dashboard.title && d.dashboard_id != dashboard.dashboard_id);
-        if has_duplicate {
-            return Err(DashboardError::Metadata("Dashboard title must be unique"));
-        }
-        self.save_dashboard(dashboard, tenant_id).await?;
-
-        dbs.push(dashboard.clone());
+    let dbs = dashboards.entry(tenant.to_owned()).or_default();
+    let has_duplicate = dbs
+        .iter()
+        .any(|d| d.title == dashboard.title && d.dashboard_id != dashboard.dashboard_id);
+    if has_duplicate {
+        return Err(DashboardError::Metadata("Dashboard title must be unique"));
     }
+    self.save_dashboard(dashboard, tenant_id).await?;
+    dbs.push(dashboard.clone());
     Ok(())
 }
```
src/handlers/http/rbac.rs (1)
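The entry-or-default pattern from the proposed fix, reduced to a minimal sketch (the map and error types here are simplified stand-ins):

```rust
use std::collections::HashMap;

// A missing tenant bucket is created on first use instead of the
// create call silently returning Ok(()).
fn add_dashboard(
    map: &mut HashMap<String, Vec<String>>,
    tenant: &str,
    title: &str,
) -> Result<(), &'static str> {
    let bucket = map.entry(tenant.to_owned()).or_default();
    if bucket.iter().any(|t| t == title) {
        return Err("Dashboard title must be unique");
    }
    bucket.push(title.to_owned());
    Ok(())
}

fn main() {
    let mut map = HashMap::new();
    // Works even though the "acme" tenant has no bucket yet.
    assert!(add_dashboard(&mut map, "acme", "latency").is_ok());
    // Duplicate titles within the same tenant are rejected.
    assert_eq!(
        add_dashboard(&mut map, "acme", "latency"),
        Err("Dashboard title must be unique")
    );
}
```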
128-136: Role existence checks are not tenant-aware.
The `roles().contains_key(role)` checks query the global roles map without tenant scoping. In a multi-tenant system, this could allow:
- Validating against roles from other tenants
- Assigning roles that exist in another tenant but not in the user's tenant
Consider using tenant-scoped role lookups:
```diff
-    for role in &user_roles {
-        if !roles().contains_key(role) {
-            non_existent_roles.push(role.clone());
-        }
-    }
+    let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
+    for role in &user_roles {
+        if !roles().get(tenant).map_or(true, |r| r.contains_key(role)) {
+            non_existent_roles.push(role.clone());
+        }
+    }
```
Also applies to: 322-333, 378-389
src/rbac/user.rs (1)
153-164: Use standard `SaltString::generate(&mut OsRng)` instead of custom salt generation.
RFC 9106 (Argon2 specification) recommends 16 bytes of salt; this implementation uses 32 bytes. While the custom approach with `SaltString::encode_b64` is technically compatible with Argon2, it's unnecessarily complex and deviates from the specification without clear justification. The commented-out standard approach (`SaltString::generate(&mut OsRng)`) handles salt generation correctly and should be used instead for consistency with best practices.
src/catalog/mod.rs (1)
397-490: Avoid failing snapshot/retention flows if the stream isn't in memory.
Both `create_manifest()` and `remove_manifest_from_snapshot()` can error out on `PARSEABLE.get_stream(...)?`, which can break cleanup on nodes that haven't loaded that stream. Prefer best-effort in-memory updates, and keep storage updates authoritative.
Proposed fix (best-effort in-memory updates)
```diff
-    let mut first_event_at = PARSEABLE
-        .get_stream(stream_name, tenant_id)?
-        .get_first_event();
+    let mut first_event_at = PARSEABLE
+        .get_stream(stream_name, tenant_id)
+        .ok()
+        .and_then(|s| s.get_first_event());
 ...
-    match PARSEABLE.get_stream(stream_name, tenant_id) {
-        Ok(stream) => stream.set_first_event_at(first_event_at.as_ref().unwrap()),
-        Err(err) => error!(...),
-    }
+    if let Some(first_event_at) = first_event_at.as_deref()
+        && let Ok(stream) = PARSEABLE.get_stream(stream_name, tenant_id)
+    {
+        stream.set_first_event_at(first_event_at);
+    }
 // remove_manifest_from_snapshot():
-    PARSEABLE.get_stream(stream_name, tenant_id)?.reset_first_event_at();
+    if let Ok(stream) = PARSEABLE.get_stream(stream_name, tenant_id) {
+        stream.reset_first_event_at();
+    }
```
Also applies to: 492-527
src/parseable/streams.rs (1)
1188-1725: Tests need updates for new `Stream::new(..., tenant_id)` + `local_stream_data_path(..., tenant_id)` signatures.
As written, the test module still uses the old function arity and will fail to compile.src/rbac/map.rs (1)
201-306: Sessions.user_sessions indexing is inconsistent (will reduce to “always not found”).
track_new()writesuser_sessions[user][tenant], butis_session_expired()/remove_session()/remove_user()/remove_expired_session()read it asuser_sessions[tenant][user]. Also,remove_expired_session()keeps expired sessions (expiry < now).Proposed fix (align to user → tenant → sessions, and correct expiry retention)
pub fn is_session_expired(&self, key: &SessionKey) -> bool { let (userid, tenant_id) = if let Some((user, tenant_id, _)) = self.active_sessions.get(key) { (user, tenant_id) } else { return false; }; - let session = if let Some(tenant_sessions) = self.user_sessions.get(tenant_id) - && let Some(session) = tenant_sessions.get(userid) - { - session - } else { - return false; - }; + let session = self + .user_sessions + .get(userid) + .and_then(|m| m.get(tenant_id)); + let Some(session) = session else { return false }; session .par_iter() .find_first(|(sessionid, expiry)| sessionid.eq(key) && expiry < &Utc::now()) .is_some() } pub fn track_new( &mut self, user: String, key: SessionKey, expiry: DateTime<Utc>, permissions: Vec<Permission>, tenant_id: &Option<String>, ) { let tenant_id = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v); self.remove_expired_session(&user, tenant_id); - let sessions = self.user_sessions.entry(user.clone()).or_default(); - sessions.insert(tenant_id.to_owned(), vec![(key.clone(), expiry)]); + self.user_sessions + .entry(user.clone()) + .or_default() + .entry(tenant_id.to_owned()) + .or_default() + .push((key.clone(), expiry)); self.active_sessions .insert(key, (user, tenant_id.to_string(), permissions)); } pub fn remove_session(&mut self, key: &SessionKey) -> Option<String> { let (user, tenant_id, _) = self.active_sessions.remove(key)?; - if let Some(tenant_sessions) = self.user_sessions.get_mut(&tenant_id) - && let Some(sessions) = tenant_sessions.get_mut(&user) + if let Some(user_sessions) = self.user_sessions.get_mut(&user) + && let Some(sessions) = user_sessions.get_mut(&tenant_id) { sessions.retain(|(session, _)| session != key); Some(user) } else { None } } pub fn remove_user(&mut self, username: &str, tenant_id: &str) { - tracing::warn!("removing user- {username}, tenant_id- {tenant_id}"); - tracing::warn!("active sessions- {:?}", self.active_sessions); - tracing::warn!("user sessions- {:?}", self.user_sessions); - let sessions = if let 
Some(tenant_sessions) = self.user_sessions.get_mut(tenant_id) { - tenant_sessions.remove(username) - } else { - None - }; + let sessions = self + .user_sessions + .get_mut(username) + .and_then(|m| m.remove(tenant_id)); if let Some(sessions) = sessions { sessions.into_iter().for_each(|(key, _)| { self.active_sessions.remove(&key); }) } } fn remove_expired_session(&mut self, user: &str, tenant_id: &str) { let now = Utc::now(); - let sessions = if let Some(tenant_sessions) = self.user_sessions.get_mut(tenant_id) - && let Some(sessions) = tenant_sessions.get_mut(user) - { - sessions - } else { - return; - }; - sessions.retain(|(_, expiry)| expiry < &now); + let Some(sessions) = self + .user_sessions + .get_mut(user) + .and_then(|m| m.get_mut(tenant_id)) + else { + return; + }; + // keep only non-expired + sessions.retain(|(_, expiry)| expiry >= &now); }src/storage/object_storage.rs (1)
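A minimal sketch of the user → tenant → sessions shape the comment aligns on, with the corrected retention (keep `expiry >= now`); plain `u64` timestamps stand in for `DateTime<Utc>` here:

```rust
use std::collections::HashMap;

// user -> tenant -> [(session_id, expiry)]
type Sessions = HashMap<String, HashMap<String, Vec<(String, u64)>>>;

// Drop expired entries for one (user, tenant) pair, keeping live sessions.
fn remove_expired(sessions: &mut Sessions, user: &str, tenant: &str, now: u64) {
    if let Some(list) = sessions.get_mut(user).and_then(|m| m.get_mut(tenant)) {
        list.retain(|(_, expiry)| *expiry >= now);
    }
}

fn main() {
    let mut s: Sessions = HashMap::new();
    s.entry("alice".to_string())
        .or_default()
        .entry("acme".to_string())
        .or_default()
        .extend([("old".to_string(), 10), ("live".to_string(), 100)]);
    remove_expired(&mut s, "alice", "acme", 50);
    // Only the session that is still valid at t=50 survives.
    assert_eq!(s["alice"]["acme"], vec![("live".to_string(), 100)]);
}
```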
1149-1182: Inconsistent `tenant_id` handling across path builder functions.
`schema_path()`, `stream_json_path()`, and `manifest_path()` include empty string segments when `tenant_id` is `None`, whereas `alert_json_path()` and `mttr_json_path()` in the same file use conditional logic to omit the tenant segment entirely. Standardize all path builders to conditionally include the tenant only when present, matching the established pattern.
Proposed fix (conditional segments)
```diff
 pub fn schema_path(stream_name: &str, tenant_id: &Option<String>) -> RelativePathBuf {
-    let tenant = tenant_id.as_ref().map_or("", |v| v);
+    let tenant = tenant_id.as_deref();
     if PARSEABLE.options.mode == Mode::Ingest {
         ...
-        RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        if let Some(tenant) = tenant {
+            RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        } else {
+            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        }
     } else {
-        RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
+        if let Some(tenant) = tenant {
+            RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
+        } else {
+            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, SCHEMA_FILE_NAME])
+        }
     }
 }

 pub fn stream_json_path(stream_name: &str, tenant_id: &Option<String>) -> RelativePathBuf {
-    let tenant = tenant_id.as_ref().map_or("", |v| v);
+    let tenant = tenant_id.as_deref();
     if PARSEABLE.options.mode == Mode::Ingest {
         ...
-        RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        if let Some(tenant) = tenant {
+            RelativePathBuf::from_iter([tenant, stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        } else {
+            RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY, &file_name])
+        }
     } else {
-        RelativePathBuf::from_iter([
-            tenant,
-            stream_name,
-            STREAM_ROOT_DIRECTORY,
-            STREAM_METADATA_FILE_NAME,
-        ])
+        if let Some(tenant) = tenant {
+            RelativePathBuf::from_iter([
+                tenant,
+                stream_name,
+                STREAM_ROOT_DIRECTORY,
+                STREAM_METADATA_FILE_NAME,
+            ])
+        } else {
+            RelativePathBuf::from_iter([
+                stream_name,
+                STREAM_ROOT_DIRECTORY,
+                STREAM_METADATA_FILE_NAME,
+            ])
+        }
     }
 }

 pub fn manifest_path(prefix: &str, tenant_id: &Option<String>) -> RelativePathBuf {
-    let tenant = tenant_id.as_ref().map_or("", |v| v);
+    let tenant = tenant_id.as_deref();
     ...
-    RelativePathBuf::from_iter([tenant, prefix, &manifest_file_name])
+    if let Some(tenant) = tenant {
+        RelativePathBuf::from_iter([tenant, prefix, &manifest_file_name])
+    } else {
+        RelativePathBuf::from_iter([prefix, &manifest_file_name])
+    }
 }
```
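The "conditional segment" idea above can be factored into one helper; plain string joining stands in for `RelativePathBuf` in this sketch:

```rust
// Build a relative path, omitting the tenant segment entirely when absent
// (no empty leading "" segment).
fn relative_path(tenant_id: &Option<String>, segments: &[&str]) -> String {
    let mut parts: Vec<&str> = Vec::new();
    if let Some(tenant) = tenant_id.as_deref() {
        parts.push(tenant);
    }
    parts.extend_from_slice(segments);
    parts.join("/")
}

fn main() {
    assert_eq!(
        relative_path(&Some("acme".to_string()), &["logs", ".stream", "schema"]),
        "acme/logs/.stream/schema"
    );
    // No spurious leading slash or empty segment when tenant_id is None.
    assert_eq!(
        relative_path(&None, &["logs", ".stream", "schema"]),
        "logs/.stream/schema"
    );
}
```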
342-390: `put_alert_state` does not use `tenant_id` in path construction.
Similar to `get_alert_state_entry`, the `tenant_id` parameter is accepted but not used when calling `alert_state_json_path` at line 352.
Proposed fix
```diff
-    let path = alert_state_json_path(id);
+    let path = alert_state_json_path(id, tenant_id);
```
1028-1049: `get_all_schemas` does not use `tenant_id` in path construction.
The path is constructed as `{stream_name}/{STREAM_ROOT_DIRECTORY}` without a tenant prefix, which would fetch schemas from the wrong location for tenant-scoped streams.
Proposed fix
```diff
 async fn get_all_schemas(
     &self,
     stream_name: &str,
     tenant_id: &Option<String>,
 ) -> Result<Vec<Schema>, MetastoreError> {
-    let path_prefix =
-        relative_path::RelativePathBuf::from(format!("{stream_name}/{STREAM_ROOT_DIRECTORY}"));
+    let path_prefix = if let Some(tenant) = tenant_id {
+        relative_path::RelativePathBuf::from(format!("{tenant}/{stream_name}/{STREAM_ROOT_DIRECTORY}"))
+    } else {
+        relative_path::RelativePathBuf::from(format!("{stream_name}/{STREAM_ROOT_DIRECTORY}"))
+    };
```
864-866: `date_path` in `get_all_manifest_files` doesn't include the tenant prefix.
While `root` is correctly constructed with the tenant prefix, the `date_path` on line 865 only uses `stream_name` without the tenant, which may cause incorrect path resolution.
Proposed fix
```diff
 for date in dates {
-    let date_path = object_store::path::Path::from(format!("{}/{}", stream_name, &date));
+    let date_path = object_store::path::Path::from(format!("{}/{}", root, &date));
     let resp = self.storage.list_with_delimiter(Some(date_path)).await?;
```
323-340: `alert_state_json_path` must be updated to accept and use `tenant_id`.
The `get_alert_state_entry`, `put_alert_state`, and `delete_alert_state` methods accept `tenant_id` but don't use it when constructing paths. This breaks tenant isolation—different tenants can access and modify each other's alert states.
The root cause is that `alert_state_json_path(alert_id: Ulid)` doesn't accept `tenant_id`, unlike related functions such as `alert_json_path` and `mttr_json_path`, which properly scope paths by tenant. The `get_alert_states` method correctly demonstrates the pattern by constructing tenant-scoped paths: `{tenant}/.alerts/`.
Update `alert_state_json_path` to accept `tenant_id` and include it in the path construction, similar to how `alert_json_path` handles tenants. Then update all callers to pass `tenant_id`.
392-403: Unused `tenant_id` parameter creates inconsistent behavior in delete/put methods.
Methods like `delete_alert_state`, `delete_alert`, `delete_target`, and others accept `tenant_id` but ignore it when constructing paths. However, the corresponding `get_*` methods use `tenant_id` to retrieve the same data (e.g., `get_alert_states` retrieves from `[&tenant, ALERTS_ROOT_DIRECTORY]` but `delete_alert_state` uses the tenant-independent `alert_state_json_path(id)`). This inconsistency creates cross-tenant data isolation risks.
For example:
- The `Target` struct has a `tenant` field, but `get_object_path()` doesn't include it, yet `get_targets()` retrieves from tenant-specific paths
- `AlertStateEntry` is retrieved with tenant context in `get_alert_states()` but deleted without it in `delete_alert_state()`
The pattern affects: `delete_alert`, `delete_alert_state`, `put_llmconfig`, `delete_llmconfig`, `put_dashboard`, `delete_dashboard`, `put_chat`, `delete_chat`, `put_filter`, `delete_filter`, `put_correlation`, `delete_correlation`, `put_target`, `delete_target`.
Either remove the unused parameter from the method signature, or ensure the path construction includes tenant context consistently with how data is retrieved.
🤖 Fix all issues with AI agents
In @src/alerts/mod.rs:
- Around line 1244-1254: The update() method (and similarly update_state() and
update_notification_state()) currently ignores writes when
self.alerts.write().await.get_mut(tenant) returns None; change the logic to
ensure a tenant bucket is created when missing before inserting: acquire the
write lock on self.alerts and use an entry-or-insert pattern (or explicitly
insert a default bucket for tenant/DEFAULT_TENANT) so that
alerts.insert(*alert.get_id(), alert.clone_box()) always runs for first-time
tenants or racing initializations; apply the same fix to the other referenced
functions (update_state, update_notification_state) that use get_mut(tenant).
In @src/correlation.rs:
- Around line 203-206: The memory delete is removing from the outer map using
correlation.id (self.write().await.remove(&correlation.id)) which deletes a
tenant entry; instead, acquire the write lock, find the tenant's CorrelationMap
by tenant_id, and remove the correlation.id from that inner map (and optionally
remove the tenant key if the inner map becomes empty). Update the code that
follows PARSEABLE.metastore.delete_correlation to lookup
self.write().await.get_mut(&tenant_id) or equivalent, call
inner_map.remove(&correlation.id), and only fall back to removing the outer
entry if you explicitly intend to delete the tenant when its map is empty.
- Around line 140-144: create() persists correlations but only inserts into the
in-memory map if the tenant bucket already exists, so new tenant buckets are
never created and the in-memory cache is inconsistent; update the write lock
usage to use the HashMap entry API (e.g., on self.write().await) for tenant
(using tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v)) so you call
entry(tenant).or_insert_with(|| /* new map */) and then insert the correlation
(correlation.id / correlation.clone()) into that returned bucket; ensure this
change is applied where the current code checks if let Some(corrs) =
self.write().await.get_mut(tenant) and also verify get_correlation reads from
the same map.
- Around line 176-183: The current update() silently does nothing when the
tenant bucket is missing; change the in-memory update to use the map's entry API
instead of get_mut so a missing bucket is created and the correlation is
inserted. Specifically, replace the get_mut(tenant) branch with an entry for the
tenant (using DEFAULT_TENANT when tenant_id is None) that either returns the
existing corrs map or inserts a new empty map, then put updated_correlation.id
-> updated_correlation.clone() into that map (referencing tenant,
DEFAULT_TENANT, updated_correlation, and corrs.insert).
In @src/handlers/http/cluster/mod.rs:
- Around line 319-322: The sync_streams_with_ingestors call is dropping tenant
context: uncomment and restore the tenant_id parameter in
sync_streams_with_ingestors's signature and update its implementation to attach
tenant_id to the outgoing sync payload/headers, then propagate tenant_id from
put_stream (where it is extracted) into the call; also update
create_update_stream so it preserves and returns headers that include tenant_id
instead of discarding them. Mirror the pattern used by
sync_role_update_with_ingestors and fetch_stats_from_ingestors: add tenant_id
parameter to signatures, forward it through all callers, and ensure ingestor
requests include the tenant_id header/field so stream syncs honor tenant
isolation.
In @src/handlers/http/modal/ingest/ingestor_role.rs:
- Around line 46-52: The tenant validation is inverted: currently it rejects
when a non-default request tenant matches the payload tenant. Update the
condition in the block that uses get_tenant_id_from_request, req_tenant_id,
req_tenant, DEFAULT_TENANT and sync_req.tenant_id to reject only when the
request tenant is not the default AND does not equal the payload tenant (i.e.,
change the second check to a mismatch), and keep returning RoleError::Anyhow
with the same message in that case.
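The corrected condition can be reduced to a small predicate; the constant and argument names below follow the instruction's wording, not the actual handler code:

```rust
const DEFAULT_TENANT: &str = "default";

// Reject only when the request tenant is not the default AND does not
// match the payload tenant (the inverted check rejected on a match).
fn should_reject(req_tenant: &str, payload_tenant: &str) -> bool {
    req_tenant != DEFAULT_TENANT && req_tenant != payload_tenant
}

fn main() {
    // Default tenant may sync any payload.
    assert!(!should_reject(DEFAULT_TENANT, "acme"));
    // Matching tenants are accepted.
    assert!(!should_reject("acme", "acme"));
    // Mismatched non-default tenant is rejected.
    assert!(should_reject("acme", "other"));
}
```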
In @src/handlers/http/modal/ingest/mod.rs:
- Around line 27-36: The SyncRole struct currently has private fields which
break external access in ingestor_role.rs; either make the fields public (change
privileges: Vec<DefaultPrivilege> and tenant_id: String to pub privileges:
Vec<DefaultPrivilege> and pub tenant_id: String in SyncRole) or add accessor
methods (e.g., impl SyncRole { pub fn tenant_id(&self) -> &String {
&self.tenant_id } and pub fn privileges(&self) -> &Vec<DefaultPrivilege> {
&self.privileges } }) and update ingestor_role.rs to use those getters if
chosen.
In @src/handlers/http/modal/query/querier_logstream.rs:
- Around line 73-75: The delete_stream call on PARSEABLE.storage currently omits
tenant scoping and may delete across tenants; modify the deletion to use tenant
context by passing tenant_id to delete_stream (or build the tenant-scoped path
before calling it). Locate PARSEABLE.storage.delete_stream and change the
invocation to include tenant_id and stream_name (or compute path = [tenant_id,
stream_name, ...] and call delete_stream(path)) so the removal is scoped to the
requesting tenant.
In @src/handlers/http/oidc.rs:
- Around line 286-290: Existing OIDC user creation is omitting tenant context:
in the match over (existing_user, final_roles) replace the None tenant passed to
put_user with the extracted tenant_id so new users are created within the tenant
scope (i.e., call put_user(&user_id, roles, user_info, bearer, tenant_id)); if
leaving it intentionally unset, instead add a clear TODO with a tracking issue
reference next to the (None, roles) arm to avoid silent multi-tenant gaps;
update related comments to reflect the chosen approach and ensure
existing_user/lookups use the same tenant_id variable.
In @src/handlers/http/rbac.rs:
- Around line 147-148: The user is created without tenant context by calling
user::User::new_basic(username.clone(), None) which breaks tenant scoping;
change that call to pass the extracted tenant_id (e.g. Some(tenant_id.clone())
or wrap/convert tenant_id to the expected tenant type) so the new user is
affiliated with the correct tenant, and ensure the surrounding code imports/uses
tenant_id and matches the function signature of user::User::new_basic.
In @src/hottier.rs:
- Around line 100-116: The loop filtering logic incorrectly uses separate
inequality checks for stream and tenant; replace the dual checks so we skip only
when both the stream and tenant match the current ones. Concretely, in the
for-loop condition that currently uses stream != current_stream && tenant_id !=
*current_tenant_id, change it to exclude entries only when (stream ==
current_stream && tenant_id == *current_tenant_id) — e.g., use && !(stream ==
current_stream && tenant_id == *current_tenant_id) or equivalent — so
check_stream_hot_tier_exists(&stream, &tenant_id) && !(stream == current_stream
&& tenant_id == *current_tenant_id) before calling get_hot_tier and accumulating
into total_hot_tier_size/total_hot_tier_used_size.
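The corrected skip condition can be checked in isolation; the function below is a sketch of the predicate, not the actual loop:

```rust
// Include every entry except the exact (stream, tenant) pair currently
// being resized — skipping requires BOTH to match, not either.
fn include_entry(
    stream: &str,
    tenant_id: &Option<String>,
    current_stream: &str,
    current_tenant_id: &Option<String>,
) -> bool {
    !(stream == current_stream && tenant_id == current_tenant_id)
}

fn main() {
    let acme = Some("acme".to_string());
    let other = Some("other".to_string());
    // Same stream name under a different tenant must still be counted.
    assert!(include_entry("logs", &other, "logs", &acme));
    // Only the exact (stream, tenant) pair is skipped.
    assert!(!include_entry("logs", &acme, "logs", &acme));
}
```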
In @src/metastore/metastores/object_store_metastore.rs:
- Around line 949-954: The put_manifest (and delete_manifest) implementation
uses manifest_path("", tenant_id) which creates a different hierarchy than
get_manifest_path that calls manifest_path(path.as_str(), tenant_id); update
put_manifest (and delete_manifest) to pass the full partition path into
manifest_path (i.e., build the partition_path string first via
partition_path(stream_name, lower_bound, upper_bound) and use that string as the
first argument to manifest_path) so manifest_path, get_manifest_path,
put_manifest, and delete_manifest all construct the same final object key
consistently.
In @src/parseable/mod.rs:
- Around line 1073-1109: The function load_tenants has an empty branch for
single-tenant mode and silently swallows a poisoned write lock; change the
tenant-meta handling so that when get_parseable_metadata returns Some(...) and
is_multi_tenant is false you return an error (e.g., "Found tenant directory
while not in multi-tenant mode") instead of doing nothing, and replace the last
block that currently does if let Ok(mut t) = self.tenants.write() { ... } else {
Ok(None) } with a match that on Ok(mut t) extends and returns Ok(Some(())) and
on Err(poison) converts the poisoned lock into an anyhow::Error and returns
Err(...) so callers always get an Err on real failures rather than Ok(None); use
the existing symbols load_tenants, PARSEABLE.metastore.get_parseable_metadata,
TENANT_METADATA.insert, and self.tenants.write() to locate and update the logic.
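The poisoned-lock handling the instruction asks for can be sketched with a simplified, hypothetical signature:

```rust
use std::sync::RwLock;

// Surface a poisoned write lock as an Err instead of silently returning
// Ok(None), so callers see real failures.
fn extend_tenants(lock: &RwLock<Vec<String>>, new: Vec<String>) -> Result<Option<()>, String> {
    match lock.write() {
        Ok(mut t) => {
            t.extend(new);
            Ok(Some(()))
        }
        Err(poison) => Err(format!("tenants lock poisoned: {poison}")),
    }
}

fn main() {
    let lock = RwLock::new(vec!["default".to_string()]);
    assert_eq!(extend_tenants(&lock, vec!["acme".to_string()]), Ok(Some(())));
    assert_eq!(lock.read().unwrap().len(), 2);
}
```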
In @src/prism/logstream/mod.rs:
- Around line 71-73: The code has a hardcoded fallback—QueriedStats::default()
with a tracing::warn—that replaces the real query result; revert to using the
actual result by restoring the original assignment (replace the fake "let stats
= QueriedStats::default();" and warning with the real "let stats = stats?;" or
equivalent error-propagating handling), remove the debugging warn, and ensure
the enclosing function signature allows propagation of the error (or map the
error to the appropriate error type) so the real stats are returned instead of
defaults.
In @src/query/mod.rs:
- Around line 136-167: The registered per-tenant schemas (via
catalog.register_schema in create_session_context and GlobalSchemaProvider)
don’t give unqualified table names a way to resolve to the current tenant at
query time; fix by applying tenant-specific schema routing per request rather
than only at transform time: either (A) ensure SQL is rewritten to qualify table
names with the tenant (e.g., "tenant"."table") before parsing/execution, or (B)
set the session’s default schema/search path per request using the tenant_id
(update the SessionContext/SessionState before parsing/execution in the request
path that calls execute()) so unqualified names resolve to the tenant’s
registered schema; update code references create_session_context,
GlobalSchemaProvider, catalog.register_schema and the request/execution
entrypoint that passes tenant_id to apply the per-request default schema.
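Option (A) above, qualifying unqualified table names with the tenant schema before execution, can be illustrated with a deliberately naive string rewrite (a production version would rewrite table references in the parsed logical plan, not the SQL text; the table and tenant names are assumptions):

```rust
// Naive sketch only: qualify one known table name with the tenant schema.
// Real code should operate on the parsed plan to avoid false matches
// inside string literals or column names.
fn qualify_table(sql: &str, table: &str, tenant: &str) -> String {
    let qualified = format!("\"{tenant}\".\"{table}\"");
    sql.replace(table, &qualified)
}

fn main() {
    let sql = "SELECT count(*) FROM app_logs";
    let rewritten = qualify_table(sql, "app_logs", "acme");
    assert_eq!(rewritten, "SELECT count(*) FROM \"acme\".\"app_logs\"");
    println!("{rewritten}");
}
```

Option (B), setting the session's default schema per request, avoids rewriting entirely and is usually the cleaner choice when the engine supports a per-session search path.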
In @src/query/stream_schema_provider.rs:
- Around line 284-291: The borrow-of-temporary and unwrap are present here as in
get_hottier_execution_plan: stop passing a reference to a temporary format!
result and remove unwrap; construct an owned String for object_store_url (e.g.
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
format!("file:///{tenant_id}/") } else { "file:///".to_string() }) and then call
ObjectStoreUrl::parse(&object_store_url) handling the Result (propagate with ?
or map_err to a descriptive error) before passing the parsed ObjectStoreUrl into
create_parquet_physical_plan; update the surrounding function signature to
return Result if needed.
- Around line 224-231: The code currently takes a reference to a temporary
String with &format!(...) and then calls ObjectStoreUrl::parse(...).unwrap(),
which risks a borrow-of-temporary and panics on invalid input; change to build
an owned String (e.g., let object_store_url_string = if let Some(tenant_id) =
self.tenant_id.as_ref() { format!("file:///{tenant_id}/") } else {
"file:///".to_string() }) and then call
ObjectStoreUrl::parse(&object_store_url_string) but handle the Result instead of
unwrap (propagate the error, return a Result, or map_err with a descriptive
error) before passing the parsed ObjectStoreUrl into
create_parquet_physical_plan so no temporary borrow or panic occurs; refer to
tenant_id, object_store_url_string, ObjectStoreUrl::parse, and
create_parquet_physical_plan.
- Around line 631-638: Replace the unwraps with proper error propagation in the
scan() flow: when building object_store_url use
glob_storage.store_url().join(tenant_id).map_err(|e|
DataFusionError::Execution(format!("joining tenant id into store URL failed:
{}", e)))? (or propagate with ? after mapping to DataFusionError), and when
converting to ObjectStoreUrl call
ObjectStoreUrl::parse(object_store_url).map_err(|e|
DataFusionError::Execution(format!("parsing ObjectStoreUrl failed: {}", e)))? so
the errors bubble up from the join and parse calls instead of panicking; keep
the call to self.create_parquet_physical_plan(...) but pass the parsed
ObjectStoreUrl result.
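The owned-`String` pattern all three prompts call for, with the parse error propagated instead of unwrapped, looks like this in isolation (std-only; `parse_store_url` is a stand-in for `ObjectStoreUrl::parse`, which is not available here):

```rust
// Stand-in validator for ObjectStoreUrl::parse: accepts only file:/// URLs.
fn parse_store_url(url: &str) -> Result<String, String> {
    if url.starts_with("file:///") {
        Ok(url.to_string())
    } else {
        Err(format!("invalid object store URL: {url}"))
    }
}

fn tenant_store_url(tenant_id: Option<&str>) -> Result<String, String> {
    // Build an owned String first: no reference to a temporary format! value.
    let object_store_url = match tenant_id {
        Some(t) => format!("file:///{t}/"),
        None => "file:///".to_string(),
    };
    // Return the Result instead of calling unwrap(); callers decide how to
    // map the error (e.g., into DataFusionError::Execution).
    parse_store_url(&object_store_url)
}

fn main() {
    assert_eq!(tenant_store_url(Some("acme")).unwrap(), "file:///acme/");
    assert_eq!(tenant_store_url(None).unwrap(), "file:///");
    println!("ok");
}
```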
In @src/rbac/map.rs:
- Around line 62-104: Remove the sensitive debug logging that can leak BasicAuth
passwords: delete the tracing::warn! calls in users(), mut_users(), roles(), and
mut_roles() (remove the warn lines that print caller info and {by}). Also update
remove_user() (the code that logs active_sessions and user_sessions around lines
276-306) to stop printing SessionKey::BasicAuth passwords — either remove those
session logs or map/redact sessions before logging (e.g., log only session IDs,
types, or a redacted flag, not the password). Ensure no SessionKey or
Display/Debug impl reveals the password in any log emitted by remove_user(),
users(), mut_users(), roles(), or mut_roles().
In @src/users/filters.rs:
- Around line 128-133: The update method currently uses get_mut on self.0 and
silently drops filters when the tenant bucket doesn't exist; change update to
use the map's entry API so a bucket is created if missing (use
entry(tenant_key_or_DEFAULT_TENANT).or_default()), then remove any existing
filter with matching filter_id (retain) and push the cloned filter into that
bucket; reference the update function, DEFAULT_TENANT, self.0, retain, and push
when locating where to replace the get_mut logic.
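The `entry(...).or_default()` pattern that several of these prompts call for can be shown on a bare `HashMap` (filter values reduced to string ids for the sketch; `DEFAULT_TENANT` mirrors the constant named in the review):

```rust
use std::collections::HashMap;

const DEFAULT_TENANT: &str = "default";

// Upsert into the tenant bucket, creating the bucket when it is missing,
// so the first filter for a new tenant is never silently dropped.
fn upsert_filter(map: &mut HashMap<String, Vec<String>>, tenant: Option<&str>, filter_id: &str) {
    let bucket = map
        .entry(tenant.unwrap_or(DEFAULT_TENANT).to_owned())
        .or_default();
    bucket.retain(|f| f.as_str() != filter_id);
    bucket.push(filter_id.to_string());
}

fn main() {
    let mut map: HashMap<String, Vec<String>> = HashMap::new();
    upsert_filter(&mut map, Some("acme"), "f1"); // bucket did not exist yet
    upsert_filter(&mut map, Some("acme"), "f1"); // upsert stays idempotent
    assert_eq!(map["acme"], vec!["f1".to_string()]);
    upsert_filter(&mut map, None, "f2");
    assert_eq!(map[DEFAULT_TENANT], vec!["f2".to_string()]);
    println!("ok");
}
```

Unlike `get_mut`, `entry` makes the "bucket missing" case impossible to forget, which is why it fixes the silent-drop bugs in `update()`, `create()`, and the alert write-backs alike.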
🟡 Minor comments (7)
src/hottier.rs-596-603 (1)
596-603: Avoid `unwrap()` on `hot_tier_file_path` result - could panic on path errors.
`hot_tier_file_path` returns a `Result` and can fail (e.g., on invalid path conversion). Using `unwrap()` here could cause a panic and crash the service. Since this is an existence check, it should gracefully return `false` on path errors. Also, remove the commented-out dead code (lines 597-600).
🐛 Proposed fix
pub fn check_stream_hot_tier_exists(&self, stream: &str, tenant_id: &Option<String>) -> bool {
-    // let path = self
-    //     .hot_tier_path
-    //     .join(stream)
-    //     .join(STREAM_HOT_TIER_FILENAME);
-    let path = self.hot_tier_file_path(stream, tenant_id).unwrap();
-    PathBuf::from(path.to_string()).exists()
+    match self.hot_tier_file_path(stream, tenant_id) {
+        Ok(path) => PathBuf::from(path.to_string()).exists(),
+        Err(_) => false,
+    }
}
src/utils/mod.rs-79-85 (1)
79-85: Potential panic on invalid UTF-8 header value.
`tenant_value.to_str().unwrap()` will panic if the `tenant` header contains non-UTF8 bytes. Consider handling the error gracefully.
🔧 Proposed fix
pub fn get_tenant_id_from_request(req: &HttpRequest) -> Option<String> {
    if let Some(tenant_value) = req.headers().get("tenant") {
-        Some(tenant_value.to_str().unwrap().to_owned())
+        tenant_value.to_str().ok().map(|s| s.to_owned())
    } else {
        None
    }
}
src/handlers/http/middleware.rs-167-177 (1)
167-177: Potential panic on invalid tenant_id header value.
`HeaderValue::from_str(&tid).unwrap()` will panic if `tid` contains characters that are invalid in HTTP headers (e.g., non-visible ASCII). Consider handling the error gracefully.
🔧 Proposed fix
let user_and_tenant_id = match get_user_and_tenant_from_request(req.request()) {
    Ok((uid, tid)) => {
-        req.headers_mut().insert(
-            HeaderName::from_static("tenant"),
-            HeaderValue::from_str(&tid).unwrap(),
-        );
+        if let Ok(header_value) = HeaderValue::from_str(&tid) {
+            req.headers_mut().insert(
+                HeaderName::from_static("tenant"),
+                header_value,
+            );
+        }
        Ok((uid, tid))
    }
    Err(e) => Err(e),
};
src/rbac/map.rs-95-104 (1)
95-104: Minor: `mut_roles()` log message says `mut_users`.
Copy/paste typo makes debugging harder.
src/catalog/mod.rs-455-461 (1)
455-461: `warn!` for `manifest path_url` is too loud (and tenant-revealing).
This should likely be `debug!`/`trace!` (or removed).
src/storage/object_storage.rs-109-117 (1)
109-117: Remove or downgrade `warn!` debug spam from the hot path.
These look like temporary diagnostics and will flood logs (and may expose tenant IDs and local paths). Prefer `debug!`/`trace!` behind a feature flag if needed.
Also applies to: 460-461, 964-973, 1078-1084, 1086-1116
src/metastore/metastore_traits.rs-172-182 (1)
172-182: Remove unused `get_chats()` method or align return type with other similar methods.
`get_chats()` returns `DashMap<String, Vec<Bytes>>` while all comparable methods (`get_dashboards()`, `get_filters()`, `get_correlations()`, `get_keystones()`, etc.) return `HashMap`. The DashMap choice is inconsistent with the pattern used throughout the trait. Additionally, this method is not called anywhere in the codebase, making it dead code. Either remove the method if no longer needed, or change the return type to `HashMap` to match other similar getters.
/// Update the in-mem vector of alerts
async fn update(&self, alert: &dyn AlertTrait) {
-    self.alerts
-        .write()
-        .await
-        .insert(*alert.get_id(), alert.clone_box());
+    let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
+    if let Some(alerts) = self.alerts.write().await.get_mut(tenant) {
+        alerts.insert(*alert.get_id(), alert.clone_box());
+    }
+    // self.alerts
+    //     .write()
+    //     .await
+    //     .insert(*alert.get_id(), alert.clone_box());
}
Don’t silently drop alert updates when the tenant bucket doesn’t exist.
update(), the write-back in update_state(), and update_notification_state() only insert if get_mut(tenant) returns Some(_). For first-time tenants (or races with initialization), this can lose writes.
Proposed fix (always create tenant bucket)
async fn update(&self, alert: &dyn AlertTrait) {
let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
- if let Some(alerts) = self.alerts.write().await.get_mut(tenant) {
- alerts.insert(*alert.get_id(), alert.clone_box());
- }
+ let mut guard = self.alerts.write().await;
+ guard
+ .entry(tenant.to_owned())
+ .or_default()
+ .insert(*alert.get_id(), alert.clone_box());
}
// In update_state() write-back block:
{
let mut write_access = self.alerts.write().await;
let tenant = alert.get_tenant_id().as_ref().map_or(DEFAULT_TENANT, |v| v);
- if let Some(alerts) = write_access.get_mut(tenant) {
- alerts.insert(*alert.get_id(), alert.clone_box());
- }
+ write_access
+ .entry(tenant.to_owned())
+ .or_default()
+ .insert(*alert.get_id(), alert.clone_box());
}
// In update_notification_state():
- if let Some(alerts) = write_access.get_mut(tenant) {
- alerts.insert(*alert.get_id(), alert.clone_box());
- }
+ write_access
+ .entry(tenant.to_owned())
+ .or_default()
+ .insert(*alert.get_id(), alert.clone_box());Also applies to: 1333-1342, 1347-1386, 1388-1404
🤖 Prompt for AI Agents
In @src/alerts/mod.rs around lines 1244 - 1254, The update() method (and
similarly update_state() and update_notification_state()) currently ignores
writes when self.alerts.write().await.get_mut(tenant) returns None; change the
logic to ensure a tenant bucket is created when missing before inserting:
acquire the write lock on self.alerts and use an entry-or-insert pattern (or
explicitly insert a default bucket for tenant/DEFAULT_TENANT) so that
alerts.insert(*alert.get_id(), alert.clone_box()) always runs for first-time
tenants or racing initializations; apply the same fix to the other referenced
functions (update_state, update_notification_state) that use get_mut(tenant).
    let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
    // Update in memory
-    self.write()
-        .await
-        .insert(correlation.id.to_owned(), correlation.clone());
+    if let Some(corrs) = self.write().await.get_mut(tenant) {
+        corrs.insert(correlation.id.to_owned(), correlation.clone());
+    }
Correlation silently not added to memory when tenant bucket doesn't exist.
In create(), if the tenant bucket doesn't exist in the in-memory map (e.g., first correlation for a tenant after restart), the correlation is persisted to metastore but not added to the in-memory cache. This could cause inconsistencies where get_correlation fails even after successful creation.
🐛 Proposed fix using entry API
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
// Update in memory
- if let Some(corrs) = self.write().await.get_mut(tenant) {
- corrs.insert(correlation.id.to_owned(), correlation.clone());
- }
+ self.write()
+ .await
+ .entry(tenant.to_owned())
+ .or_default()
+ .insert(correlation.id.to_owned(), correlation.clone());🤖 Prompt for AI Agents
In @src/correlation.rs around lines 140 - 144, create() persists correlations
but only inserts into the in-memory map if the tenant bucket already exists, so
new tenant buckets are never created and the in-memory cache is inconsistent;
update the write lock usage to use the HashMap entry API (e.g., on
self.write().await) for tenant (using tenant_id.as_ref().map_or(DEFAULT_TENANT,
|v| v)) so you call entry(tenant).or_insert_with(|| /* new map */) and then
insert the correlation (correlation.id / correlation.clone()) into that returned
bucket; ensure this change is applied where the current code checks if let
Some(corrs) = self.write().await.get_mut(tenant) and also verify get_correlation
reads from the same map.
    let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
    // Update in memory
-    self.write().await.insert(
-        updated_correlation.id.to_owned(),
-        updated_correlation.clone(),
-    );
+    if let Some(corrs) = self.write().await.get_mut(tenant) {
+        corrs.insert(
+            updated_correlation.id.to_owned(),
+            updated_correlation.clone(),
+        );
+    }
Same issue: update() silently fails when tenant bucket doesn't exist.
Apply the same fix using the entry API to ensure the correlation is always inserted.
🐛 Proposed fix
let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v);
// Update in memory
- if let Some(corrs) = self.write().await.get_mut(tenant) {
- corrs.insert(
- updated_correlation.id.to_owned(),
- updated_correlation.clone(),
- );
- }
+ self.write()
+ .await
+ .entry(tenant.to_owned())
+ .or_default()
+ .insert(updated_correlation.id.to_owned(), updated_correlation.clone());📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v); | |
| // Update in memory | |
| self.write().await.insert( | |
| updated_correlation.id.to_owned(), | |
| updated_correlation.clone(), | |
| ); | |
| if let Some(corrs) = self.write().await.get_mut(tenant) { | |
| corrs.insert( | |
| updated_correlation.id.to_owned(), | |
| updated_correlation.clone(), | |
| ); | |
| } | |
| let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v); | |
| // Update in memory | |
| self.write() | |
| .await | |
| .entry(tenant.to_owned()) | |
| .or_default() | |
| .insert(updated_correlation.id.to_owned(), updated_correlation.clone()); |
🤖 Prompt for AI Agents
In @src/correlation.rs around lines 176 - 183, The current update() silently
does nothing when the tenant bucket is missing; change the in-memory update to
use the map's entry API instead of get_mut so a missing bucket is created and
the correlation is inserted. Specifically, replace the get_mut(tenant) branch
with an entry for the tenant (using DEFAULT_TENANT when tenant_id is None) that
either returns the existing corrs map or inserts a new empty map, then put
updated_correlation.id -> updated_correlation.clone() into that map (referencing
tenant, DEFAULT_TENANT, updated_correlation, and corrs.insert).
    PARSEABLE.metastore.delete_correlation(&correlation, tenant_id).await?;

    // Delete from memory
    self.write().await.remove(&correlation.id);
Critical bug: remove operates on wrong map level, deletes entire tenant's correlations.
Line 206 calls self.write().await.remove(&correlation.id) which removes an entry from the outer HashMap<String, CorrelationMap> using correlation.id as the key. This is incorrect—it should remove the correlation from the inner CorrelationMap for the specific tenant. As written, this could corrupt the correlation store by deleting an unrelated tenant's data (if a tenant_id happens to match a correlation_id) or silently fail.
🐛 Proposed fix
// Delete from storage
PARSEABLE.metastore.delete_correlation(&correlation, tenant_id).await?;
// Delete from memory
- self.write().await.remove(&correlation.id);
+ let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v.as_str());
+ if let Some(corrs) = self.write().await.get_mut(tenant) {
+ corrs.remove(&correlation.id);
+ }
Ok(())🤖 Prompt for AI Agents
In @src/correlation.rs around lines 203 - 206, The memory delete is removing
from the outer map using correlation.id
(self.write().await.remove(&correlation.id)) which deletes a tenant entry;
instead, acquire the write lock, find the tenant's CorrelationMap by tenant_id,
and remove the correlation.id from that inner map (and optionally remove the
tenant key if the inner map becomes empty). Update the code that follows
PARSEABLE.metastore.delete_correlation to lookup
self.write().await.get_mut(&tenant_id) or equivalent, call
inner_map.remove(&correlation.id), and only fall back to removing the outer
entry if you explicitly intend to delete the tenant when its map is empty.
    body: Bytes,
    stream_name: &str,
    // tenant_id: &Option<String>
) -> Result<(), StreamError> {
🧩 Analysis chain
🏁 Script executed:
cat -n src/handlers/http/cluster/mod.rs | sed -n '310,330p'Repository: parseablehq/parseable
Length of output: 754
🏁 Script executed:
rg -n "sync_streams_with_ingestors" --type rust -B 3 -A 8Repository: parseablehq/parseable
Length of output: 4914
🏁 Script executed:
rg -n "SyncStream" --type rust -B 2 -A 5Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Check how tenant_id is handled in other sync functions
rg -n "fn sync_.*_with_ingestors" --type rust -B 1 -A 3Repository: parseablehq/parseable
Length of output: 2398
🏁 Script executed:
cat -n src/handlers/http/cluster/mod.rs | sed -n '317,380p'Repository: parseablehq/parseable
Length of output: 2787
🏁 Script executed:
# Check if tenant_id is used elsewhere in cluster sync functions
rg -n "tenant_id" src/handlers/http/cluster/mod.rs -B 2 -A 2Repository: parseablehq/parseable
Length of output: 1159
🏁 Script executed:
# Look for tenant context in headers or request extraction
rg -n "extract_tenant_id\|get_tenant\|HeaderMap" src/handlers/http/modal/query/querier_logstream.rs -B 2 -A 2Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Check the querier_logstream context around the sync_streams call
cat -n src/handlers/http/modal/query/querier_logstream.rs | sed -n '100,150p'Repository: parseablehq/parseable
Length of output: 1963
🏁 Script executed:
# Check the parseable/mod.rs context around the sync_streams calls
cat -n src/parseable/mod.rs | sed -n '500,530p'Repository: parseablehq/parseable
Length of output: 1483
🏁 Script executed:
# Look for how tenant_id is extracted from headers in other handlers
rg -n "extract_tenant_id\|parse.*tenant\|X-TENANT\|tenant.*header" --type rust -B 1 -A 1Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Check what get_tenant_id_from_request does
rg -n "fn get_tenant_id_from_request\|extract.*tenant" --type rust -B 2 -A 8Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Look for tenant context in the returned headers from create_update_stream
rg -n "create_update_stream" src/parseable/mod.rs -B 2 -A 15Repository: parseablehq/parseable
Length of output: 1710
🏁 Script executed:
# Check if there's any tenant-related header being set
rg -n "TENANT\|tenant" src/handlers/http/modal/query/querier_logstream.rs -iRepository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Check the full create_update_stream method to see what it returns in headers
cat -n src/parseable/mod.rs | sed -n '636,750p'Repository: parseablehq/parseable
Length of output: 5004
🏁 Script executed:
# Look for how tenant_id is supposed to be communicated to ingestors
rg -n "SyncRole\|sync_role_update_with_ingestors" src/handlers/http/cluster/mod.rs -B 5 -A 15 | head -60Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
# Check if tenant context is embedded in the body or headers when syncing
rg -n "tenant" src/handlers/http/cluster/mod.rs -i -B 2 -A 2 | head -40Repository: parseablehq/parseable
Length of output: 1420
🏁 Script executed:
# Find ALL callers of sync_streams_with_ingestors to understand the impact
rg -n "sync_streams_with_ingestors\(" --type rust -B 3 -A 1Repository: parseablehq/parseable
Length of output: 1483
🏁 Script executed:
# Check if querier_logstream has access to tenant_id at the point of sync call
cat -n src/handlers/http/modal/query/querier_logstream.rs | sed -n '120,145p'Repository: parseablehq/parseable
Length of output: 1019
Tenant context must be propagated to ingestors during stream synchronization.
The tenant_id is available in put_stream (extracted at line 126 of querier_logstream.rs) but is not passed to sync_streams_with_ingestors. The function signature has the parameter commented out (line 321), and create_update_stream discards the tenant context when returning headers. This causes ingestors to receive stream sync requests without tenant association, breaking tenant isolation in multi-tenant deployments.
Other sync functions in the same module (sync_role_update_with_ingestors, fetch_stats_from_ingestors) correctly propagate tenant_id; the same pattern should be applied here.
🤖 Prompt for AI Agents
In @src/handlers/http/cluster/mod.rs around lines 319 - 322, The
sync_streams_with_ingestors call is dropping tenant context: uncomment and
restore the tenant_id parameter in sync_streams_with_ingestors's signature and
update its implementation to attach tenant_id to the outgoing sync
payload/headers, then propagate tenant_id from put_stream (where it is
extracted) into the call; also update create_update_stream so it preserves and
returns headers that include tenant_id instead of discarding them. Mirror the
pattern used by sync_role_update_with_ingestors and fetch_stats_from_ingestors:
add tenant_id parameter to signatures, forward it through all callers, and
ensure ingestor requests include the tenant_id header/field so stream syncs
honor tenant isolation.
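The propagation pattern this prompt describes, attaching the tenant id to every outgoing ingestor sync request, can be sketched with a plain header map (std-only; the header name `tenant` follows the middleware discussed in this review, the rest is an assumption):

```rust
use std::collections::HashMap;

// Sketch: build headers for an ingestor sync request, mirroring how
// sync_role_update_with_ingestors carries tenant context.
fn build_sync_headers(tenant_id: Option<&str>) -> HashMap<String, String> {
    let mut headers = HashMap::new();
    headers.insert("content-type".to_string(), "application/json".to_string());
    if let Some(t) = tenant_id {
        // Tenant isolation hinges on this header reaching every ingestor.
        headers.insert("tenant".to_string(), t.to_string());
    }
    headers
}

fn main() {
    let headers = build_sync_headers(Some("acme"));
    assert_eq!(headers.get("tenant").map(String::as_str), Some("acme"));
    assert!(build_sync_headers(None).get("tenant").is_none());
    println!("ok");
}
```

Threading `tenant_id` through `sync_streams_with_ingestors` and into a helper like this keeps the single-tenant path (no header) and multi-tenant path (header set) in one place.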
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
    &format!("file:///{tenant_id}/")
} else {
    "file:///"
};
self.create_parquet_physical_plan(
    execution_plans,
-    ObjectStoreUrl::parse("file:///").unwrap(),
+    ObjectStoreUrl::parse(object_store_url).unwrap(),
Potential borrow-of-temporary issue and unhandled parse error.
- The pattern `&format!("file:///{tenant_id}/")` creates a reference to a temporary `String` that may be dropped before use. This typically causes a compile error ("temporary value dropped while borrowed") or leads to fragile code.
- The `unwrap()` on `ObjectStoreUrl::parse()` can panic if `tenant_id` contains characters that produce an invalid URL.
Suggested fix using owned String
- let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
- &format!("file:///{tenant_id}/")
- } else {
- "file:///"
- };
+ let object_store_url = match self.tenant_id.as_ref() {
+ Some(tenant_id) => format!("file:///{tenant_id}/"),
+ None => "file:///".to_string(),
+ };
self.create_parquet_physical_plan(
execution_plans,
- ObjectStoreUrl::parse(object_store_url).unwrap(),
+ ObjectStoreUrl::parse(&object_store_url)
+ .map_err(|e| DataFusionError::Plan(format!("Invalid object store URL: {e}")))?,🤖 Prompt for AI Agents
In @src/query/stream_schema_provider.rs around lines 224 - 231, The code
currently takes a reference to a temporary String with &format!(...) and then
calls ObjectStoreUrl::parse(...).unwrap(), which risks a borrow-of-temporary and
panics on invalid input; change to build an owned String (e.g., let
object_store_url_string = if let Some(tenant_id) = self.tenant_id.as_ref() {
format!("file:///{tenant_id}/") } else { "file:///".to_string() }) and then call
ObjectStoreUrl::parse(&object_store_url_string) but handle the Result instead of
unwrap (propagate the error, return a Result, or map_err with a descriptive
error) before passing the parsed ObjectStoreUrl into
create_parquet_physical_plan so no temporary borrow or panic occurs; refer to
tenant_id, object_store_url_string, ObjectStoreUrl::parse, and
create_parquet_physical_plan.
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
    &format!("file:///{tenant_id}/")
} else {
    "file:///"
};
self.create_parquet_physical_plan(
    execution_plans,
-    ObjectStoreUrl::parse("file:///").unwrap(),
+    ObjectStoreUrl::parse(object_store_url).unwrap(),
Same borrow-of-temporary and unwrap issues as hot tier path.
This duplicates the problematic pattern from get_hottier_execution_plan. Apply the same fix using an owned String and proper error handling for ObjectStoreUrl::parse().
🤖 Prompt for AI Agents
In @src/query/stream_schema_provider.rs around lines 284 - 291, The
borrow-of-temporary and unwrap are present here as in
get_hottier_execution_plan: stop passing a reference to a temporary format!
result and remove unwrap; construct an owned String for object_store_url (e.g.
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
format!("file:///{tenant_id}/") } else { "file:///".to_string() }) and then call
ObjectStoreUrl::parse(&object_store_url) handling the Result (propagate with ?
or map_err to a descriptive error) before passing the parsed ObjectStoreUrl into
create_parquet_physical_plan; update the surrounding function signature to
return Result if needed.
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
    glob_storage.store_url().join(tenant_id).unwrap()
} else {
    glob_storage.store_url()
};
self.create_parquet_physical_plan(
    &mut execution_plans,
-    ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
+    ObjectStoreUrl::parse(object_store_url).unwrap(),
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, let's look at the actual file and the specific lines
cat -n src/query/stream_schema_provider.rs | sed -n '625,645p'Repository: parseablehq/parseable
Length of output: 1004
🏁 Script executed:
#!/bin/bash
# Find the store_url() method definition
rg -n "fn store_url" --type rust -B 2 -A 5Repository: parseablehq/parseable
Length of output: 2397
🏁 Script executed:
#!/bin/bash
# Check ObjectStoreUrl type and its methods, particularly parse() and join()
rg -n "ObjectStoreUrl" --type rust | head -30Repository: parseablehq/parseable
Length of output: 1227
🏁 Script executed:
#!/bin/bash
# Look for the actual implementation of join() on the store_url return type
rg -n "pub fn join" --type rust -A 3Repository: parseablehq/parseable
Length of output: 47
🏁 Script executed:
#!/bin/bash
# Check the url crate's join() method behavior - look for examples in the codebase
rg -n "\.join\(" --type rust src/query/stream_schema_provider.rs -B 2 -A 2Repository: parseablehq/parseable
Length of output: 585
🏁 Script executed:
#!/bin/bash
# Check how other code in this file handles ObjectStoreUrl::parse errors
rg -n "ObjectStoreUrl::parse" --type rust -B 2 -A 2 src/query/stream_schema_provider.rsRepository: parseablehq/parseable
Length of output: 735
🏁 Script executed:
#!/bin/bash
# Look at the create_parquet_physical_plan signature to understand what type it expects
rg -n "fn create_parquet_physical_plan" --type rust -A 8 src/query/stream_schema_provider.rsRepository: parseablehq/parseable
Length of output: 435
🏁 Script executed:
#!/bin/bash
# Check if there are any error handling patterns for url::Url operations elsewhere
rg -n "store_url\(\)\.join" --type rust -B 3 -A 3Repository: parseablehq/parseable
Length of output: 640
🏁 Script executed:
#!/bin/bash
# Search for where tenant_id is assigned/validated in the scan method
rg -n "tenant_id" --type rust src/query/stream_schema_provider.rs | head -20Repository: parseablehq/parseable
Length of output: 1225
🏁 Script executed:
#!/bin/bash
# Check the scan method signature and how tenant_id is provided
rg -n "fn scan" --type rust -A 15 src/query/stream_schema_provider.rs | head -50Repository: parseablehq/parseable
Length of output: 745
Handle error cases in URL construction instead of unwrapping.
The join() call on line 632 and ObjectStoreUrl::parse() on line 638 should propagate errors rather than panic, since the scan() method returns Result<Arc<dyn ExecutionPlan>, DataFusionError>. The parse() call is necessary here for type conversion from url::Url to ObjectStoreUrl, not redundant.
Suggested fix with error handling
let object_store_url = if let Some(tenant_id) = self.tenant_id.as_ref() {
- glob_storage.store_url().join(tenant_id).unwrap()
+ glob_storage.store_url().join(tenant_id)
+ .map_err(|e| DataFusionError::Plan(format!("Invalid tenant path: {e}")))?
} else {
glob_storage.store_url()
};
self.create_parquet_physical_plan(
&mut execution_plans,
- ObjectStoreUrl::parse(object_store_url).unwrap(),
+ ObjectStoreUrl::parse(&object_store_url)
+ .map_err(|e| DataFusionError::Plan(format!("Invalid object store URL: {e}")))?,🤖 Prompt for AI Agents
In @src/query/stream_schema_provider.rs around lines 631 - 638, Replace the
unwraps with proper error propagation in the scan() flow: when building
object_store_url use glob_storage.store_url().join(tenant_id).map_err(|e|
DataFusionError::Execution(format!("joining tenant id into store URL failed:
{}", e)))? (or propagate with ? after mapping to DataFusionError), and when
converting to ObjectStoreUrl call
ObjectStoreUrl::parse(object_store_url).map_err(|e|
DataFusionError::Execution(format!("parsing ObjectStoreUrl failed: {}", e)))? so
the errors bubble up from the join and parse calls instead of panicking; keep
the call to self.create_parquet_physical_plan(...) but pass the parsed
ObjectStoreUrl result.
pub fn users() -> RwLockReadGuard<'static, Users> {
-    USERS
-        .get()
-        .expect("map is set")
-        .read()
-        .expect("not poisoned")
+    {
+        tracing::warn!("users called by !!!!!");
+        USERS
+            .get()
+            .expect("map is set")
+            .read()
+            .expect("not poisoned")
+    }
}

-pub fn mut_users() -> RwLockWriteGuard<'static, Users> {
-    USERS
-        .get()
-        .expect("map is set")
-        .write()
-        .expect("not poisoned")
+pub fn mut_users(by: &str) -> RwLockWriteGuard<'static, Users> {
+    {
+        tracing::warn!("mut_users called by {by}!!!!!");
+        USERS
+            .get()
+            .expect("map is set")
+            .write()
+            .expect("not poisoned")
+    }
}

pub fn roles() -> RwLockReadGuard<'static, Roles> {
-    ROLES
-        .get()
-        .expect("map is set")
-        .read()
-        .expect("not poisoned")
+    {
+        tracing::warn!("roles called by !!!!!");
+        ROLES
+            .get()
+            .expect("map is set")
+            .read()
+            .expect("not poisoned")
+    }
}

-pub fn mut_roles() -> RwLockWriteGuard<'static, Roles> {
-    ROLES
-        .get()
-        .expect("map is set")
-        .write()
-        .expect("not poisoned")
+pub fn mut_roles(by: &str) -> RwLockWriteGuard<'static, Roles> {
+    {
+        tracing::warn!("mut_users called by {by}!!!!!");
+        ROLES
+            .get()
+            .expect("map is set")
+            .write()
+            .expect("not poisoned")
+    }
}
Remove debug logging that can leak credentials (BasicAuth password).
remove_user() logs active_sessions and user_sessions; active_sessions includes SessionKey::BasicAuth { password }. This is a hard blocker.
Also applies to: 276-306
🤖 Prompt for AI Agents
In @src/rbac/map.rs around lines 62 - 104, Remove the sensitive debug logging
that can leak BasicAuth passwords: delete the tracing::warn! calls in users(),
mut_users(), roles(), and mut_roles() (remove the warn lines that print caller
info and {by}). Also update remove_user() (the code that logs active_sessions
and user_sessions around lines 276-306) to stop printing SessionKey::BasicAuth
passwords — either remove those session logs or map/redact sessions before
logging (e.g., log only session IDs, types, or a redacted flag, not the
password). Ensure no SessionKey or Display/Debug impl reveals the password in
any log emitted by remove_user(), users(), mut_users(), roles(), or mut_roles().
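The redaction approach suggested above can be sketched with a hypothetical `SessionKey` stand-in (the enum shape and names are illustrative, not Parseable's real types): a manual `Debug` impl guarantees that logging a session can never print the password.

```rust
use std::fmt;

// Hypothetical stand-in for the real SessionKey; only the shape matters here.
enum SessionKey {
    BasicAuth { username: String, password: String },
    SessionId(String),
}

// Manual Debug impl that always redacts the password, so any log line that
// formats a SessionKey (e.g. in remove_user) cannot leak credentials.
impl fmt::Debug for SessionKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SessionKey::BasicAuth { username, .. } => f
                .debug_struct("BasicAuth")
                .field("username", username)
                .field("password", &"<redacted>")
                .finish(),
            SessionKey::SessionId(id) => f.debug_tuple("SessionId").field(id).finish(),
        }
    }
}

fn main() {
    let key = SessionKey::BasicAuth {
        username: "alice".into(),
        password: "hunter2".into(),
    };
    let rendered = format!("{key:?}");
    assert!(rendered.contains("alice"));
    assert!(!rendered.contains("hunter2"));
    let id = SessionKey::SessionId("abc123".into());
    println!("{rendered} / {id:?}");
}
```

Deriving `Debug` on such a type would print the password verbatim; the explicit impl makes redaction the default for every log site rather than relying on each caller to remember it.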
```diff
 pub async fn update(&self, filter: &Filter, tenant_id: &Option<String>) {
     let mut s = self.0.write().await;
-    s.retain(|f| f.filter_id != filter.filter_id);
-    s.push(filter.clone());
+    if let Some(filters) = s.get_mut(tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v)) {
+        filters.retain(|f| f.filter_id != filter.filter_id);
+        filters.push(filter.clone());
+    }
 }
```
Filter silently dropped when tenant bucket doesn't exist.
The update method only modifies existing tenant buckets via get_mut(). If the tenant entry doesn't exist in the map (e.g., when adding the first filter for a new tenant after a server restart), the filter will be silently dropped without insertion.
🐛 Proposed fix using entry API
```diff
 pub async fn update(&self, filter: &Filter, tenant_id: &Option<String>) {
     let mut s = self.0.write().await;
-    if let Some(filters) = s.get_mut(tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v)) {
-        filters.retain(|f| f.filter_id != filter.filter_id);
-        filters.push(filter.clone());
-    }
+    let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v.as_str());
+    let filters = s.entry(tenant.to_owned()).or_default();
+    filters.retain(|f| f.filter_id != filter.filter_id);
+    filters.push(filter.clone());
 }
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```rust
pub async fn update(&self, filter: &Filter, tenant_id: &Option<String>) {
    let mut s = self.0.write().await;
    let tenant = tenant_id.as_ref().map_or(DEFAULT_TENANT, |v| v.as_str());
    let filters = s.entry(tenant.to_owned()).or_default();
    filters.retain(|f| f.filter_id != filter.filter_id);
    filters.push(filter.clone());
}
```
🤖 Prompt for AI Agents
In @src/users/filters.rs around lines 128 - 133, The update method currently
uses get_mut on self.0 and silently drops filters when the tenant bucket doesn't
exist; change update to use the map's entry API so a bucket is created if
missing (use entry(tenant_key_or_DEFAULT_TENANT).or_default()), then remove any
existing filter with matching filter_id (retain) and push the cloned filter into
that bucket; reference the update function, DEFAULT_TENANT, self.0, retain, and
push when locating where to replace the get_mut logic.
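The `entry(...).or_default()` upsert semantics behind this fix can be exercised in isolation with a stand-in map (the `DEFAULT_TENANT` constant, tuple value type, and `upsert` helper here are illustrative only):

```rust
use std::collections::HashMap;

const DEFAULT_TENANT: &str = "default";

// Upsert into a per-tenant bucket: entry().or_default() creates the Vec when
// the tenant is new (so the first write is never dropped), then any existing
// item with the same id is replaced.
fn upsert(
    map: &mut HashMap<String, Vec<(u32, String)>>,
    tenant: Option<&str>,
    id: u32,
    name: &str,
) {
    let bucket = map
        .entry(tenant.unwrap_or(DEFAULT_TENANT).to_owned())
        .or_default();
    bucket.retain(|(existing_id, _)| *existing_id != id);
    bucket.push((id, name.to_owned()));
}

fn main() {
    let mut map = HashMap::new();
    // First write for a brand-new tenant is inserted, not silently dropped:
    upsert(&mut map, Some("acme"), 1, "errors-only");
    upsert(&mut map, Some("acme"), 1, "errors-and-warnings"); // replaces id 1
    upsert(&mut map, None, 2, "all-logs"); // lands in the default bucket
    assert_eq!(map["acme"], vec![(1, "errors-and-warnings".to_owned())]);
    assert_eq!(map[DEFAULT_TENANT].len(), 1);
}
```

Compared with `get_mut`, which returns `None` for an absent key, the entry API makes "create bucket if missing, then mutate" a single atomic-looking expression, which is exactly the failure mode the review flags.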
This is a WIP PR to introduce multi-tenancy to Parseable server.
Description
This PR has:
Summary by CodeRabbit
Release Notes
New Features
Improvements