Skip to content

Conversation

@kaxil
Copy link
Member

@kaxil kaxil commented Jan 19, 2026

When DAG files are loaded from paths that change between deployments (e.g., versioned directories), the python_callable_name in serialized DAGs changes even when the actual DAG code is unchanged.

This happens because qualname() includes the module name, which contains a hash derived from the file path via get_unique_dag_module_name(). When the path changes, the module name changes, causing:

  1. Different python_callable_name values
  2. Different dag_hash
  3. New SerializedDagModel entries on every deployment

Solution

Add exclude_module parameter to qualname() that returns only the qualified name without the module prefix:

# Before: "unusual_prefix_abc123_dag_file.my_function"
qualname(func)

# After: "my_function"  
qualname(func, exclude_module=True)

Use this in operator serialization to ensure python_callable_name is stable across deployments.

Add exclude_module parameter to qualname() that returns only the
qualified name without module prefix. This prevents unnecessary DAG
version churn when bundle paths change (e.g., version or timestamp-based folders).

The module name contains a path-derived hash that changes with each
deployment, causing new `SerializedDagModel` entries even when DAG code
is unchanged.
@kaxil kaxil force-pushed the fix-python-callable-name-stability branch from 01f2900 to d7c6c22 Compare January 19, 2026 22:56
@kaxil kaxil requested a review from vatsrahul1001 January 20, 2026 00:55
Comment on lines 1038 to 1069
# Store python_callable_name instead of python_callable.
# exclude_module=True ensures stable names across bundle version changes.
python_callable = op.partial_kwargs.get("python_callable", None)
if python_callable:
callable_name = qualname(python_callable)
serialized_op["partial_kwargs"]["python_callable_name"] = callable_name
serialized_op["partial_kwargs"]["python_callable_name"] = qualname(
python_callable, exclude_module=True
)
del serialized_op["partial_kwargs"]["python_callable"]

serialized_op["_is_mapped"] = True
return serialized_op

@classmethod
def serialize_operator(cls, op: SdkOperator) -> dict[str, Any]:
return cls._serialize_node(op)

@classmethod
def _serialize_node(cls, op: SdkOperator) -> dict[str, Any]:
"""Serialize operator into a JSON object."""
serialize_op = cls.serialize_to_json(op, cls._decorated_fields)

if not op.email:
# If "email" is empty, we do not need to include other email attrs
for attr in ["email_on_failure", "email_on_retry"]:
if attr in serialize_op:
del serialize_op[attr]

# Detect if there's a change in python callable name
# Store python_callable_name for change detection.
# exclude_module=True ensures stable names across bundle version changes.
python_callable = getattr(op, "python_callable", None)
if python_callable:
callable_name = qualname(python_callable)
serialize_op["python_callable_name"] = callable_name
serialize_op["python_callable_name"] = qualname(python_callable, exclude_module=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was just reviewing in consideration to collision between tasks but since its scoped deep in the nested structure, we are good

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants