feat(ducklake): copy data_imports result to Ducklake in local dev #42862
Size Change: -9 B (0%), Total Size: 3.51 MB
10 files reviewed, no comments
fuziontech left a comment
PR Review: DuckLake Copy for Data Imports
Summary
This PR adds a Temporal workflow to copy data imports (Stripe, Hubspot, etc.) into DuckLake after successful syncs. It follows the existing pattern from the data modeling workflow and includes feature flag gating, verification, and metrics.
Positive Aspects
- Clean separation of concerns - Each activity has a single responsibility
- Feature flag gating - Safe rollout with `ducklake-data-imports-copy-workflow`
- Fire-and-forget pattern - Uses `ParentClosePolicy.ABANDON` so the parent workflow succeeds even if the copy fails (see the sketch after this list)
- Comprehensive verification - Row counts, schema hashes, and partition count checks
- Good test coverage - Tests covering serialization, feature flags, metadata prep, copy execution, and workflow integration
- Observability - Metrics for workflow completion and verification results
- Documentation - Thorough README with local testing instructions
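For context, a fire-and-forget child-workflow start of the kind described above typically looks something like this in the Temporal Python SDK. This is a minimal sketch with placeholder workflow and argument names, not the PR's actual code; only the `ducklake-copy.data-imports` workflow type comes from the PR.

```python
from temporalio import workflow
from temporalio.workflow import ParentClosePolicy


@workflow.defn
class DataImportWorkflow:  # placeholder parent workflow, not the PR's class
    @workflow.run
    async def run(self, team_id: int) -> None:
        # ... run the import itself ...

        # Fire-and-forget: ABANDON lets the parent complete without waiting
        # for - or being affected by - the DuckLake copy child workflow.
        await workflow.start_child_workflow(
            "ducklake-copy.data-imports",
            args=[team_id],
            id=f"ducklake-copy-data-imports-{team_id}",
            parent_close_policy=ParentClosePolicy.ABANDON,
        )
```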
Suggestions (Non-blocking)
1. Potential SQL Injection Defense in Depth (Medium)
ducklake_copy_data_imports_workflow.py:233-236:
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {qualified_schema}")
conn.execute(
f"CREATE OR REPLACE TABLE {qualified_table} AS SELECT * FROM delta_scan(?)",
[inputs.model.source_table_uri],
)While _sanitize_ducklake_identifier sanitizes the names, consider using DuckDB's identifier quoting for the schema/table names for defense in depth.
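A minimal sketch of what that quoting could look like. The helper below is hypothetical (standard SQL double-quote identifier quoting), not the PR's code; the schema/table names are placeholders.

```python
import duckdb


def quote_identifier(name: str) -> str:
    # Hypothetical helper: wrap in double quotes and double any embedded
    # quotes, layered on top of the existing sanitization.
    return '"' + name.replace('"', '""') + '"'


qualified_schema = quote_identifier("stripe")
qualified_table = f"{qualified_schema}.{quote_identifier('charges')}"

conn = duckdb.connect()
conn.execute(f"CREATE SCHEMA IF NOT EXISTS {qualified_schema}")
conn.execute(f"CREATE OR REPLACE TABLE {qualified_table} AS SELECT 1 AS id")
conn.close()
```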
2. Missing Error Handling for Schema Not Found
ducklake_copy_data_imports_workflow.py:176-178 - If a schema is deleted between the parent workflow starting and this activity running, ExternalDataSchema.DoesNotExist will be raised. Could catch and log gracefully rather than failing the entire workflow.
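A hedged sketch of that graceful handling; the import path, helper name, and logger usage are assumptions for illustration, not the PR's actual code.

```python
import logging

from posthog.warehouse.models import ExternalDataSchema  # assumed import path


def load_schema_or_skip(schema_id: str, logger: logging.Logger) -> ExternalDataSchema | None:
    """Return the schema, or None if it was deleted after the parent workflow started."""
    try:
        return ExternalDataSchema.objects.get(id=schema_id)
    except ExternalDataSchema.DoesNotExist:
        # Deleted between the parent workflow starting and this activity running:
        # log and skip the copy instead of failing the whole workflow.
        logger.warning("ExternalDataSchema %s no longer exists; skipping DuckLake copy", schema_id)
        return None
```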
3. Hardcoded Identifier Length Limit
ducklake_copy_data_imports_workflow.py:267 - The 63-character limit appears PostgreSQL-inspired but DuckDB identifiers can be longer. Consider documenting why 63.
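If the limit stays, a documented constant would make the intent explicit. A sketch only; the constant name and comment are hypothetical, and only the value 63 comes from the PR (PostgreSQL truncates identifiers at NAMEDATALEN - 1 = 63 bytes, while DuckDB allows longer names).

```python
# Hypothetical: document why the limit exists rather than leaving 63 inline.
# Mirrors PostgreSQL's NAMEDATALEN - 1 = 63 bytes; DuckDB allows longer
# identifiers, so this is a deliberately conservative choice.
MAX_DUCKLAKE_IDENTIFIER_LENGTH = 63


def truncate_identifier(name: str) -> str:
    return name[:MAX_DUCKLAKE_IDENTIFIER_LENGTH]
```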
4. Connection Could Use Context Manager
ducklake_copy_data_imports_workflow.py:217-240 - Using with duckdb.connect() as conn: would be slightly cleaner than try/finally.
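The suggested shape, assuming a duckdb version whose Python connection object supports the context-manager protocol (which closes the connection on exit); the statements below are placeholders.

```python
import duckdb

# Sketch of the context-manager form, replacing try/finally + close().
with duckdb.connect() as conn:
    conn.execute("CREATE SCHEMA IF NOT EXISTS staging")
    conn.execute("CREATE OR REPLACE TABLE staging.example AS SELECT 1 AS id")
```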
5. Duplicate Schema Fetch
_run_data_imports_partition_verification and _run_data_imports_schema_verification both call _fetch_delta_schema. These could share the fetched schema to avoid redundant network calls.
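One possible shape for that refactor, shown with pyarrow as a stand-in for the Delta schema object; the function names echo the review comment but the signatures and schema contents are illustrative, not the PR's.

```python
import pyarrow as pa


def fetch_delta_schema() -> pa.Schema:
    # Stand-in for the PR's _fetch_delta_schema, which reads the Delta table.
    return pa.schema([("id", pa.string()), ("created_at", pa.timestamp("us"))])


def run_partition_verification(schema: pa.Schema) -> None:
    print("columns checked:", schema.names)


def run_schema_verification(schema: pa.Schema) -> None:
    print("fields hashed:", len(schema))


delta_schema = fetch_delta_schema()      # fetched once
run_partition_verification(delta_schema)  # shared
run_schema_verification(delta_schema)     # shared
```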
6. Missing Type Annotation
ducklake_copy_data_imports_workflow.py:270 - The logger parameter is missing a type annotation.
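For completeness, the kind of annotation meant; the function name and logger type below are assumptions (the PR may use a structlog-style bound logger instead of `logging.Logger`).

```python
import logging


def log_copy_metrics(row_count: int, logger: logging.Logger) -> None:
    # Hypothetical example only - the real function lives in the PR.
    logger.info("Copied %d rows into DuckLake", row_count)
```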
Questions
- Should the DuckLake copy workflow run on a dedicated task queue (mentioned as TODO)?
- What's the expected table size for data imports? The 30-minute timeout seems generous.
- Is there a plan to handle partial failures (e.g., 3 of 5 schemas succeed)?
Verdict: Approve ✅
The implementation is solid, follows existing patterns, and has good test coverage. The concerns raised are mostly defensive improvements rather than blocking issues. The fire-and-forget pattern correctly ensures data import success doesn't depend on DuckLake availability.
On review point 1 (SQL injection): the sanitization keeps only alphanumeric characters, underscores (_), and a manually added dot (.), so this should cover all bases.
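For illustration, a sanitizer along those lines; this is a sketch, not the PR's actual `_sanitize_ducklake_identifier`, and the example names are placeholders.

```python
import re


def sanitize_identifier(part: str) -> str:
    # Keep only alphanumerics and underscores per identifier part; the dot is
    # added manually when joining schema and table, so it never comes from input.
    return re.sub(r"[^A-Za-z0-9_]", "", part)


qualified_table = f"{sanitize_identifier('stripe-prod')}.{sanitize_identifier('charges;drop')}"
print(qualified_table)  # stripeprod.chargesdrop
```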
Problem
Part of PostHog Data Warehouse work stream.
Changes
Follows the same pattern as the data modeling prototype: #42005, #42541, #42231
What's included:
What's yet to be done:
How did you test this code?
Unit tests
pytest posthog/temporal/tests/data_imports/test_ducklake_copy_data_imports_workflow.py
Manual test
1. Create and enable the feature flag `ducklake-data-imports-copy-workflow`.
2. Trigger a data import (e.g. Stripe); if you already have one set up, you can simply go to the Temporal workflow UI and trigger an import workflow.
3. Upon completion of the import workflow, it should trigger a copy sub-workflow of type `ducklake-copy.data-imports`.
4. This should create a new table in DuckLake, which can be found and queried like this:
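For example, something along these lines from a Python session. The attach string, catalog alias, and schema/table names below are placeholders, and the exact local DuckLake setup is described in posthog/ducklake/README.md.

```python
import duckdb

# Placeholder names throughout - follow posthog/ducklake/README.md for the
# real local DuckLake catalog location.
conn = duckdb.connect()
conn.execute("INSTALL ducklake; LOAD ducklake;")
conn.execute("ATTACH 'ducklake:metadata.ducklake' AS lake")
print(conn.execute("SHOW ALL TABLES").fetchall())
print(conn.execute("SELECT count(*) FROM lake.stripe.charge").fetchone())
```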
Documentation
posthog/ducklake/README.md
Changelog: (features only) Is this feature complete?
No