116 changes: 90 additions & 26 deletions posthog/ducklake/README.md
@@ -1,6 +1,11 @@
# DuckLake copy workflow configuration

The DuckLake copy workflow copies materialized data modeling outputs into a DuckLake-managed S3 bucket. Workers running this workflow must be configured explicitly; otherwise copies will fail before they even reach the first activity.
The DuckLake copy workflows copy data into a DuckLake-managed S3 bucket. There are two workflows:

1. **Data Modeling** (`ducklake-copy.data-modeling`) - copies materialized saved query outputs
2. **Data Imports** (`ducklake-copy.data-imports`) - copies external data source imports (Stripe, HubSpot, etc.)

Both workflows share the same infrastructure and configuration. Workers running these workflows must be configured explicitly; otherwise copies will fail before they even reach the first activity.

## Environment variables

@@ -22,7 +27,7 @@ For local dev the defaults are:

- `DUCKLAKE_RDS_HOST=localhost`
- `DUCKLAKE_RDS_PORT=5432`
- `DUCKLAKE_RDS_DATABASE=ducklake`
- `DUCKLAKE_RDS_DATABASE=ducklake_catalog`
- `DUCKLAKE_RDS_USERNAME=posthog`
- `DUCKLAKE_RDS_PASSWORD=posthog`
- `DUCKLAKE_BUCKET=ducklake-dev`
@@ -32,24 +37,30 @@ For local dev the defaults are:

## Feature flag gating

The modeling workflow launches the DuckLake copy child only when the
`ducklake-data-modeling-copy-workflow` feature flag is enabled for the team (as evaluated
via `feature_enabled`). Create or update that flag locally to target the team you are testing
with—otherwise the copy workflow will be skipped even if the rest of the configuration is correct.
Each workflow is gated by its own feature flag (evaluated via `feature_enabled`). Create or update the appropriate flag locally to target the team you are testing with—otherwise the copy workflow will be skipped even if the rest of the configuration is correct.

| Workflow | Feature Flag |
|----------|--------------|
| Data Modeling | `ducklake-data-modeling-copy-workflow` |
| Data Imports | `ducklake-data-imports-copy-workflow` |
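
For reference, the gating decision reduces to a per-team flag lookup before the copy child is scheduled. A minimal sketch, assuming the PostHog Python client's `feature_enabled` helper (the real call site in the workflow code may pass additional context):

```python
# Sketch only: assumes the `posthoganalytics` client exposes `feature_enabled(key, distinct_id)`.
import posthoganalytics


def ducklake_copy_enabled(flag_key: str, team_uuid: str) -> bool:
    # Skip the copy workflow entirely unless the flag is enabled for this team.
    return bool(posthoganalytics.feature_enabled(flag_key, team_uuid))


# e.g. ducklake_copy_enabled("ducklake-data-modeling-copy-workflow", str(team.uuid))
```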

## Target bucket layout

Every model copy is written to a deterministic prefix inside the DuckLake data bucket. Each workflow
namespaces its data under a workflow identifier (e.g., `data_modeling` for the Temporal pipeline captured
in this doc):
Every copy is written to a deterministic schema inside DuckLake. Each workflow namespaces its data under a workflow-specific schema:

### Data Modeling

```text
s3://<DUCKLAKE_BUCKET>/<workflow_identifier>/team_<team_id>/job_<job_id>/model_<model_label>/<normalized_name>.parquet
```
- **Schema**: `data_modeling_team_<team_id>`
- **Table**: `<model_label>` (derived from saved query name)
- **Example**: `ducklake.data_modeling_team_123.my_saved_query`

For the Temporal data modeling copy workflow, `<workflow_identifier>` is `data_modeling`.
### Data Imports

Re-running a copy simply overwrites the same Parquet object. Choose the bucket so its lifecycle/replication policies fit that structure.
- **Schema**: `data_imports_team_<team_id>`
- **Table**: `<source_type>_<normalized_name>_<schema_id_hex[:8]>`
- **Example**: `ducklake.data_imports_team_123.stripe_invoices_a1b2c3d4`

Re-running a copy simply overwrites the same table. Choose the bucket so its lifecycle/replication policies fit that structure.
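
As a rough illustration of the naming scheme above, the fully-qualified locations can be derived like this (helper names are hypothetical; the real workflows build these identifiers inside their Temporal activities):

```python
# Illustrative helpers only; not the actual workflow code.


def data_modeling_table(team_id: int, model_label: str) -> str:
    # e.g. "ducklake.data_modeling_team_123.my_saved_query"
    return f"ducklake.data_modeling_team_{team_id}.{model_label}"


def data_imports_table(team_id: int, source_type: str, normalized_name: str, schema_id_hex: str) -> str:
    # e.g. "ducklake.data_imports_team_123.stripe_invoices_a1b2c3d4"
    return f"ducklake.data_imports_team_{team_id}.{source_type}_{normalized_name}_{schema_id_hex[:8]}"
```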

## Required permissions

@@ -62,33 +73,86 @@ For AWS S3, grant the worker role at least `s3:ListBucket`, `s3:GetObject`, `s3:

## Local testing (dev)

Follow this checklist to exercise the DuckLake copy workflow on a local checkout without needing extra tribal knowledge:
Follow these checklists to exercise the DuckLake copy workflows on a local checkout.

### Testing Data Modeling workflow

1. **Start the dev stack**
Run `hogli start` (or `bin/start`) so Postgres, MinIO, Temporal, and all DuckLake defaults are up. Make sure the `ducklake-data-modeling-copy-workflow` feature flag is enabled for the team you plan to use.

2. **Trigger a model materialization from the app**
In the PostHog UI, open Data Warehouse → Views, pick (or create) a view, open the Materialization section, enable it if needed, and click **Sync now**. This schedules the `data-modeling-run` workflow for that team/view.

3. **Observe the data-modeling workflow**
Visit the Temporal UI at `http://localhost:8081/namespaces/default/workflows` and confirm a `data-modeling-run` execution appears. Wait for it to finish successfully.

4. **Verify the DuckLake copy workflow runs**
Once the modeling workflow completes it automatically starts `ducklake-copy.data-modeling` as a child run. You should see it listed in the same Temporal UI; wait for the run to complete.

5. **Query the new DuckLake table**
The copy activity registers a view named `ducklake_dev.data_modeling_team_<team_id>.model_<model_label>`. From any DuckDB shell you can inspect it, for example:
5. **Query the new DuckLake table**
The copy activity creates a table at `ducklake.data_modeling_team_<team_id>.<model_label>`. From any DuckDB shell you can inspect it, for example:

```sql
duckdb -c "
INSTALL ducklake;
LOAD ducklake;
SET s3_region='us-east-1';
SET s3_endpoint='localhost:19000';
SET s3_use_ssl=false;
SET s3_access_key_id='object_storage_root_user';
SET s3_secret_access_key='object_storage_root_password';
SET s3_url_style='path';

ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost user=posthog password=posthog'
AS ducklake (DATA_PATH 's3://ducklake-dev/');

ATTACH 'postgres:dbname=ducklake host=localhost port=5432 user=posthog password=posthog'
AS ducklake (TYPE ducklake, DATA_PATH 's3://ducklake-dev/');
SELECT * FROM ducklake.data_modeling_team_${TEAM_ID}.model_${MODEL_LABEL} LIMIT 10;
-- Discover available schemas
SELECT * FROM information_schema.schemata WHERE catalog_name = 'ducklake';

-- List tables in the ducklake catalog
SELECT table_schema, table_name FROM information_schema.tables WHERE table_catalog = 'ducklake';

-- Query a specific table
SELECT * FROM ducklake.data_modeling_team_${TEAM_ID}.${MODEL_LABEL} LIMIT 10;
"
```

Replace `${TEAM_ID}` and `${MODEL_LABEL}` with the team/model that was materialized (the model label is logged by the workflow and matches the saved query’s UUID hex).
### Testing Data Imports workflow

1. **Start the dev stack**
Run `hogli start` (or `bin/start`) so Postgres, MinIO, Temporal, and all DuckLake defaults are up. Make sure the `ducklake-data-imports-copy-workflow` feature flag is enabled for the team you plan to use.

2. **Trigger a data import sync from the app**
In the PostHog UI, open Data Warehouse → Sources, connect a source (e.g., Stripe, HubSpot), select the schemas to sync, and click **Sync**. This schedules the `external-data-job` workflow.

3. **Observe the external-data-job workflow**
Visit the Temporal UI at `http://localhost:8081/namespaces/default/workflows` and confirm an `external-data-job` execution appears. Wait for it to finish successfully.

4. **Verify the DuckLake copy workflow runs**
Once the import workflow completes it automatically starts `ducklake-copy.data-imports` as a child run. You should see it listed in the same Temporal UI; wait for the run to complete.

5. **Query the new DuckLake table**
The copy activity creates a table at `ducklake.data_imports_team_<team_id>.<source_type>_<table_name>_<schema_id_hex>`. From any DuckDB shell you can inspect it:

```sql
duckdb -c "
INSTALL ducklake;
LOAD ducklake;
SET s3_endpoint='localhost:19000';
SET s3_use_ssl=false;
SET s3_access_key_id='object_storage_root_user';
SET s3_secret_access_key='object_storage_root_password';
SET s3_url_style='path';

ATTACH 'ducklake:postgres:dbname=ducklake_catalog host=localhost user=posthog password=posthog'
AS ducklake (DATA_PATH 's3://ducklake-dev/');

-- Discover available schemas
SELECT * FROM information_schema.schemata WHERE catalog_name = 'ducklake';

-- List tables in the ducklake catalog
SELECT table_schema, table_name FROM information_schema.tables WHERE table_catalog = 'ducklake';

-- Query a specific table
SELECT * FROM ducklake.data_imports_team_${TEAM_ID}.${SOURCE_TYPE}_${TABLE_NAME}_${SCHEMA_ID_HEX} LIMIT 10;
"
```
55 changes: 40 additions & 15 deletions posthog/ducklake/verification/README.md
@@ -1,30 +1,55 @@
# DuckLake Data Modeling Verification
# DuckLake Copy Verification

This document summarizes the automated checks executed after every DuckLake data modeling copy. Each copy finishes by running `verify_ducklake_copy_activity` in `posthog/temporal/data_modeling/ducklake_copy_workflow.py`, which issues direct DuckDB comparisons between the Parquet source and the freshly created DuckLake table. The YAML in `data_modeling.yaml` adds configurable SQL checks (for example, a row-count delta), while the workflow code enforces structural comparisons (schema, partitions, key cardinality, null ratios) that are derived from each saved query’s metadata. Together they catch schema drift or data loss before the workflow completes.
This document summarizes the automated checks executed after every DuckLake copy workflow. Both workflows (data modeling and data imports) run verification activities that issue direct DuckDB comparisons between the Delta source and the freshly created DuckLake table. YAML config files add configurable SQL checks (for example, a row-count delta), while the workflow code enforces structural comparisons (schema, partitions, key cardinality, null ratios). Together they catch schema drift or data loss before the workflow completes.

| Workflow | Verification Activity | Config File |
|----------|----------------------|-------------|
| Data Modeling | `verify_ducklake_copy_activity` in `posthog/temporal/data_modeling/ducklake_copy_workflow.py` | `data_modeling.yaml` |
| Data Imports | `verify_data_imports_ducklake_copy_activity` in `posthog/temporal/data_imports/ducklake_copy_data_imports_workflow.py` | `data_imports.yaml` |

## How verification works

1. `prepare_data_modeling_ducklake_metadata_activity` enriches each model with metadata derived from the `DataWarehouseSavedQuery.columns` definition so we know **what** to compare:
- `partition_column`: primary partition column reported by the Delta table metadata
- `key_columns`: inferred IDs (`person_id`, `distinct_id`, etc.) for distinct-count checks
Both workflows follow the same pattern:

1. **Metadata preparation** enriches each model with metadata so we know **what** to compare:
- `partition_column`: primary partition column (from Delta metadata or schema config)
- `key_columns`: inferred IDs (`person_id`, `distinct_id`, `*_id` columns) for distinct-count checks
- `non_nullable_columns`: any column that is not declared as `Nullable(...)`
2. `verify_ducklake_copy_activity` (see `posthog/temporal/data_modeling/ducklake_copy_workflow.py`) materializes the DuckLake table, executes the SQL queries from `data_modeling.yaml`, and then issues the built-in comparisons below directly in DuckDB. Any failure stops the workflow.

2. **Verification activity** executes the SQL queries from the YAML config, then issues the built-in comparisons directly in DuckDB. Any failure stops the workflow.
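
The metadata handed to verification can be pictured as a small record like the sketch below (field names mirror the bullets above; the actual structure in the workflow code may differ):

```python
# Illustrative shape of the verification metadata; an assumption, not the real dataclass.
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class CopyVerificationMetadata:
    partition_column: str | None = None                             # primary partition column, if any
    key_columns: list[str] = field(default_factory=list)            # id-like columns for COUNT(DISTINCT ...) checks
    non_nullable_columns: list[str] = field(default_factory=list)   # columns not declared Nullable(...)
```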

### Data Modeling specifics

- Metadata derived from `DataWarehouseSavedQuery.columns`
- Partition column detected from Delta table metadata

### Data Imports specifics

- Metadata derived from `ExternalDataSchema` and its associated `DataWarehouseTable.columns`
- Partition column detected from schema's `partitioning_keys` config or the standard `_ph_partition_key` column
- Key columns also consider `incremental_field` and `partitioning_keys` from the schema config

## Built-in checks

| Check | Description |
| -------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `model.schema_hash` | Reads the Delta source schema via DuckDB, reads the DuckLake table schema, hashes both, and fails if hashes differ. Prevents silent schema drift. |
| `model.partition_counts` | When a partition column is available, compares daily row counts between the source Delta table and DuckLake. Any partition mismatch fails verification. |
| `model.key_cardinality.<column>` | For each inferred key column, compares `COUNT(DISTINCT column)` between Delta and DuckLake to catch dropped/duplicate identifiers. |
| `model.null_ratio.<column>` | Ensures DuckLake null counts for columns marked non-nullable match the Delta source so we only fail when DuckLake drifts. |
| `row_count_delta_vs_ducklake` | Defined in `data_modeling.yaml`: compares total row counts using parameterized SQL. |
Both workflows run the same types of checks, but with different prefixes:

| Check Type | Data Modeling | Data Imports | Description |
| ---------- | ------------- | ------------ | ----------- |
| Schema hash | `model.schema_hash` | `data_imports.schema_hash` | Compares Delta source schema with DuckLake table schema. Fails if they differ. Prevents silent schema drift. |
| Partition counts | `model.partition_counts` | `data_imports.partition_counts` | When a partition column is available, compares row counts per partition between source and DuckLake. Any mismatch fails verification. |
| Key cardinality | `model.key_cardinality.<column>` | `data_imports.key_cardinality.<column>` | For each inferred key column, compares `COUNT(DISTINCT column)` between Delta and DuckLake to catch dropped/duplicate identifiers. |
| Null ratio | `model.null_ratio.<column>` | `data_imports.null_ratio.<column>` | Ensures DuckLake null counts for columns marked non-nullable match the Delta source. |

Additionally, YAML-defined checks (like `row_count_delta_vs_ducklake`) can be configured per workflow in the respective YAML files.
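
As a rough picture of how one built-in comparison works, the schema-hash check can be sketched as below. This assumes a DuckDB connection with the `delta` extension loaded and the DuckLake catalog already attached; the function names are illustrative, not the actual activity code.

```python
# Hedged sketch of the schema-hash comparison; assumes `delta_scan` is available
# (INSTALL delta; LOAD delta) and the DuckLake catalog is attached on `conn`.
import hashlib

import duckdb


def _schema_hash(conn: duckdb.DuckDBPyConnection, relation_sql: str) -> str:
    # DESCRIBE yields (column_name, column_type, ...) rows; hash the stable parts.
    rows = conn.execute(f"DESCRIBE {relation_sql}").fetchall()
    canonical = "|".join(f"{name}:{dtype}" for name, dtype, *_ in rows)
    return hashlib.sha256(canonical.encode()).hexdigest()


def check_schema_drift(conn: duckdb.DuckDBPyConnection, source_uri: str, ducklake_table: str) -> None:
    source = _schema_hash(conn, f"SELECT * FROM delta_scan('{source_uri}')")
    target = _schema_hash(conn, f"SELECT * FROM {ducklake_table}")
    if source != target:
        raise ValueError(f"schema drift: {source_uri} vs {ducklake_table}")
```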

## Customizing checks

- Add or update parameterized verifications by editing `posthog/ducklake/verification/data_modeling.yaml`. The YAML file feeds into `DuckLakeCopyVerificationQuery` objects (see `posthog/ducklake/verification/config.py`), which are passed unchanged into `verify_ducklake_copy_activity`. The workflow renders the SQL, binds any listed parameters, and records the single numeric value returned by the query.
- Add or update parameterized verifications by editing the appropriate YAML file:
- `posthog/ducklake/verification/data_modeling.yaml` for data modeling workflow
- `posthog/ducklake/verification/data_imports.yaml` for data imports workflow
- Each YAML file feeds into `DuckLakeCopyVerificationQuery` objects (see `posthog/ducklake/verification/config.py`), which are passed to the verification activity. The workflow renders the SQL, binds any listed parameters, and records the single numeric value returned by the query.
- Each query may declare both an `expected` value and a `tolerance`. At runtime the workflow compares the observed value to `expected` and considers the query passing when `abs(observed - expected) <= tolerance` (see the sketch after this list). If you omit either field, the runtime defaults to `0.0`, so set a tolerance whenever you expect minor drift.
- Built-in checks (schema hash, partition counts, key-cardinality, null ratios) are intentionally hardcoded in `posthog/temporal/data_modeling/ducklake_copy_workflow.py` and always run after the YAML queries. They rely on metadata detected from each saved query, so changing their behavior still requires Python changes today.
- Built-in checks (schema hash, partition counts, key-cardinality, null ratios) are intentionally hardcoded in the workflow files and always run after the YAML queries. They rely on metadata detected from each model, so changing their behavior still requires Python changes today.
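
A minimal sketch of that pass condition, with the same `0.0` defaults:

```python
def query_passes(observed: float, expected: float = 0.0, tolerance: float = 0.0) -> bool:
    # With both defaults at 0.0, any drift fails unless a tolerance is set explicitly.
    return abs(observed - expected) <= tolerance
```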

### Per-model configuration

2 changes: 2 additions & 0 deletions posthog/ducklake/verification/__init__.py
@@ -2,6 +2,7 @@
DuckLakeCopyVerificationParameter,
DuckLakeCopyVerificationQuery,
DuckLakeVerificationConfig,
get_data_imports_verification_queries,
get_data_modeling_verification_queries,
)

@@ -10,4 +11,5 @@
"DuckLakeCopyVerificationQuery",
"DuckLakeVerificationConfig",
"get_data_modeling_verification_queries",
"get_data_imports_verification_queries",
]
27 changes: 24 additions & 3 deletions posthog/ducklake/verification/config.py
@@ -63,9 +63,29 @@ def get_data_modeling_verification_queries(model_label: str) -> list[DuckLakeCop
return config.queries_for_model(model_label)


def get_data_imports_verification_queries(schema_name: str) -> list[DuckLakeCopyVerificationQuery]:
"""Return the configured verification queries for the given data imports schema."""
config = _get_data_imports_verification_config()
return config.queries_for_model(schema_name)


@functools.lru_cache
def _get_data_modeling_verification_config() -> DuckLakeVerificationConfig:
raw = _load_verification_yaml()
raw = _load_verification_yaml("data_modeling.yaml")
defaults = tuple(_parse_queries(raw.get("defaults", {}).get("queries")))
model_overrides = {
label: _ModelVerificationConfig(
queries=tuple(_parse_queries(cfg.get("queries"))),
inherit_defaults=cfg.get("inherit_defaults", True),
)
for label, cfg in (raw.get("models") or {}).items()
}
return DuckLakeVerificationConfig(default_queries=defaults, model_overrides=model_overrides)


@functools.lru_cache
def _get_data_imports_verification_config() -> DuckLakeVerificationConfig:
raw = _load_verification_yaml("data_imports.yaml")
defaults = tuple(_parse_queries(raw.get("defaults", {}).get("queries")))
model_overrides = {
label: _ModelVerificationConfig(
@@ -77,8 +97,8 @@ def _get_data_modeling_verification_config() -> DuckLakeVerificationConfig:
return DuckLakeVerificationConfig(default_queries=defaults, model_overrides=model_overrides)


def _load_verification_yaml() -> dict[str, Any]:
path = Path(__file__).with_name("data_modeling.yaml")
def _load_verification_yaml(filename: str) -> dict[str, Any]:
path = Path(__file__).with_name(filename)
with path.open("r", encoding="utf-8") as handle:
return yaml.safe_load(handle) or {}

@@ -118,4 +138,5 @@ def _parse_queries(raw_queries: Sequence[dict[str, Any]] | None) -> list[DuckLak
"DuckLakeCopyVerificationQuery",
"DuckLakeVerificationConfig",
"get_data_modeling_verification_queries",
"get_data_imports_verification_queries",
]
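
For context, the new helper is expected to be called with the derived data-imports schema/table identifier; a hypothetical usage (the argument and query field access are inferred from the YAML schema, so treat this as a sketch):

```python
# Hypothetical call pattern; the identifier and field names are assumptions.
from posthog.ducklake.verification import get_data_imports_verification_queries

queries = get_data_imports_verification_queries("stripe_invoices_a1b2c3d4")
for query in queries:
    print(query.name)  # e.g. "row_count_delta_vs_ducklake"
```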
14 changes: 14 additions & 0 deletions posthog/ducklake/verification/data_imports.yaml
@@ -0,0 +1,14 @@
defaults:
queries:
- name: row_count_delta_vs_ducklake
description: Compare row counts between the source Delta table and the DuckLake copy.
sql: |
SELECT ABS(
(SELECT COUNT(*) FROM delta_scan(?))
-
(SELECT COUNT(*) FROM {ducklake_table})
) AS row_difference
parameters:
- source_table_uri
tolerance: 0
models: {}
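
A hedged sketch of how this default check might be rendered and executed: `{ducklake_table}` is interpolated into the SQL and `source_table_uri` is bound as the positional parameter. The actual rendering lives in the verification activity; this is only an illustration.

```python
# Illustration only; mirrors the YAML above but is not the activity implementation.
import duckdb

SQL_TEMPLATE = """
SELECT ABS(
  (SELECT COUNT(*) FROM delta_scan(?))
  -
  (SELECT COUNT(*) FROM {ducklake_table})
) AS row_difference
"""


def run_row_count_check(conn: duckdb.DuckDBPyConnection, ducklake_table: str, source_table_uri: str) -> float:
    rendered = SQL_TEMPLATE.format(ducklake_table=ducklake_table)
    (observed,) = conn.execute(rendered, [source_table_uri]).fetchone()
    return float(observed)
```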