Fix rendering of template fields with start from trigger #55068
Conversation
https://github.com/apache/airflow/blob/3.0.0rc3/airflow-core/newsfragments/aip-66.significant.rst
(Emphasis mine)
This would also be very useful for async callbacks (currently used for Deadline Alerts) running in the triggerer! Once this is merged, I could create a follow-up PR to replace the implementation in #55241.
@ashb I've introduced a SerializedDagBag class, which acts as a simple cache in the same way as the DagBag, but only uses serialized DAGs from the DB instead of from the filesystem. The SerializedDagBag needs dag_id and dag_version_id to be able to return the corresponding DAG. The reason I still retrieve the serialized DAG from within update_triggers (which has DB access), instead of rendering the templates in the scheduler as you proposed, is so that I can construct the RuntimeTaskInstance from within update_triggers (which needs a task in order to be constructed) and assign that instance to the trigger instead of the serialized TI from the workloads module. This will also be needed to run the multiple yielded trigger events for AIP-88, as those will need the template context to do the pagination from within the triggerer while yielding the events. I know this is a very complicated explanation, let me know if you want some clarification. I still need to write a unit test for SerializedDagBag though.
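For readers following along, a rough sketch of the kind of cache described here. This is not the actual implementation (the class is dropped later in this thread in favour of the existing DBDagBag); all names, signatures, and the DB lookup are assumptions.

```python
from __future__ import annotations

from typing import Any


class SerializedDagBag:
    """Sketch: in-memory cache of serialized DAGs loaded from the DB,
    keyed by (dag_id, dag_version_id) as described above."""

    def __init__(self) -> None:
        self._dags: dict[tuple[str, str], Any] = {}

    def get_dag(self, dag_id: str, dag_version_id: str, session) -> Any:
        key = (dag_id, dag_version_id)
        if key not in self._dags:
            # Placeholder for the actual query against the serialized-DAG
            # table (e.g. SerializedDagModel) filtered on dag_id and
            # dag_version_id; DB access happens here, not in the subprocess.
            self._dags[key] = self._load_from_db(dag_id, dag_version_id, session)
        return self._dags[key]

    def _load_from_db(self, dag_id: str, dag_version_id: str, session) -> Any:
        raise NotImplementedError("DB lookup elided in this sketch")
```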
SerializedDagBag already exists in the form of SchedulerDagBag I think -- rather than a new one, it might be better to rename that one if it otherwise fits your need.
You're right, it's called DBDagBag and it serves the same purpose, so I'll replace it with that one. Thanks @ashb for pointing this out 👍
I ditched it and won't be using it in this PR, as I need the SerializedDagModel, not the SerializedDAG, but I'm already using it for AIP-88.
So, a small explanation of what I did in this PR.
Re-opened this PR as I realised I need it to be able to finish AIP-88.
self.task_instance.run_id,
self.task_instance.map_index,

if not self.task_instance:
    raise AirflowException(f"TaskInstance not set on {self.__class__.__name__}!")
Wei is correct. We don't want to use AirflowException for new use cases.
if not self.task_instance:
    raise AirflowException(f"TaskInstance not set on {self.__class__.__name__}!")

if not isinstance(self.task_instance, RuntimeTaskInstance):
This is for 3.0..3.1, right? Might be worth a comment saying when we could hit this case.
:param session: Sqlalchemy session
"""
if not self.task_instance:
Isn't this going to fail on Airflow 2.x? Or maybe on Airflow 3.1? It feels like it's going to fail sometime when it should still be supported.
self._BaseOperator__init_kwargs.update(kwargs)  # type: ignore

# Validate trigger kwargs
if hasattr(self, "_validate_start_from_trigger_kwargs"):
Given we define _validate_start_from_trigger_kwargs on L1415, how can this ever be False?
| """ | ||
| return self | ||
|
|
||
| def expand_start_from_trigger(self, *, context: Context) -> bool: |
Do we need this function? Given the doc string I would have expected to see a different implementation of this for mapped operators, but it seems to be working without it, so maybe this fn isn't needed?
This PR tries to fix the rendering of templated fields with start_from_trigger args without breaking architectural assumptions, which the initial PR #53071 violated and which led to its revert in PR #55037.
This PR could be simplified, without the need to parse the DAG, if render_template_fields were reusable in the triggerer; as of now it is part of BaseOperator and thus not reusable in BaseTrigger. As discussed with @ashb, apparently @amoghrajesh will be working on this (e.g. making template renderers reusable).
As opposed to the previous attempt, the rendering of the templates isn't done while creating the trigger in the database within the TriggerRunnerSupervisor. Instead, the TriggerRunnerSupervisor only retrieves the serialized DAG from the DagModel and passes it to the RunTrigger workload as an extra parameter (e.g. named dag_data), which in turn is used by the TriggerRunner in the subprocess. There it has access to the XComs and thus shouldn't raise: ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'.
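Roughly, the shape of that flow, sketched under the assumption that the supervisor builds the workload and the subprocess consumes it. Only the dag_data name comes from the description above; every other name here is an illustrative stand-in, not the actual Airflow workloads API.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class RunTriggerSketch:
    """Stand-in for the RunTrigger workload, extended with dag_data."""

    trigger_id: int
    classpath: str
    trigger_kwargs: dict
    dag_data: dict | None  # serialized DAG attached by the supervisor


def build_workload(trigger_id: int, classpath: str, trigger_kwargs: dict, dag_data: dict) -> RunTriggerSketch:
    # Supervisor side: the serialized DAG has already been fetched from the DB
    # (e.g. via the DagModel / SerializedDagModel), so the trigger subprocess
    # never needs DB access and can render templates using XComs instead.
    return RunTriggerSketch(trigger_id, classpath, trigger_kwargs, dag_data)
```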
So, if template rendering were reusable outside BaseOperator, then we wouldn't need to parse the DAG from the DagBag to retrieve the task (i.e. the BaseOperator) to do the template rendering, which would make the solution even simpler and less heavy (i.e. more performant) for the triggerer subprocess.

After further reflection, also regarding the work I'm doing for AIP-88, the current solution will still need to get the task (i.e. the operator) for the trigger when yielding multiple events. The same goes for the rendering of templates: you need the context to render them, and to get the context you need at least a RuntimeTaskInstance, which in turn requires, guess what, a task (i.e. a BaseOperator). So even if template rendering were reusable across operator and triggerer, you would still need the context; in that case it wouldn't change much compared to the current solution.
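As a sketch of the rendering path this implies: deserialize the DAG, look up the operator, obtain the context from a RuntimeTaskInstance, then render. How the RuntimeTaskInstance is constructed is elided here, and import paths or signatures may differ between versions; treat this as an assumption-laden outline rather than the PR's actual code.

```python
from airflow.serialization.serialized_objects import SerializedDAG


def render_trigger_template_fields(dag_data: dict, task_id: str, runtime_ti) -> None:
    """Sketch: render an operator's templated fields inside the triggerer.

    `runtime_ti` is assumed to already be a RuntimeTaskInstance bound to the
    deserialized task; its construction is not shown.
    """
    dag = SerializedDAG.from_dict(dag_data)       # serialized DAG, no DAG-file parsing
    task = dag.get_task(task_id)                  # a BaseOperator is still required
    context = runtime_ti.get_template_context()   # context needs a RuntimeTaskInstance
    task.render_template_fields(context)          # rendering currently lives on the operator
```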
As loading DAG code is prohibited in the triggerer, we load the serialized DAG version from the database. As a consequence, callables aren't supported as kwargs when start_from_trigger is enabled. Thus, when enabling start_from_trigger on an operator, we automatically validate the kwargs and check that they don't contain callables; otherwise an AirflowException is raised stating which kwargs contain callables.
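A minimal sketch of what that validation could look like, assuming the trigger kwargs are available as a plain dict at operator definition time. The function name mirrors the one discussed in the review above, but the exception type (reviewers asked to avoid AirflowException) and message format are assumptions.

```python
def _validate_start_from_trigger_kwargs(task_id: str, trigger_kwargs: dict) -> None:
    """Reject callable trigger kwargs when start_from_trigger is enabled,
    since callables cannot be reconstructed from the serialized DAG."""
    callable_kwargs = [name for name, value in trigger_kwargs.items() if callable(value)]
    if callable_kwargs:
        raise ValueError(
            f"start_from_trigger on task {task_id!r} does not support callables "
            f"in trigger kwargs: {', '.join(sorted(callable_kwargs))}"
        )
```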
Also, the test DAG from @kaxil is working.
This PR only fixes start_from_trigger with rendered templates for non-expanded tasks; allowing start_from_trigger with expanded tasks will need another PR. But at least we can already support start_from_trigger on non-expanded tasks.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named
{pr_number}.significant.rst or {issue_number}.significant.rst, in airflow-core/newsfragments.