Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
a82ca1c
refactor: Allow MSGraphAsyncOperator and MSGraphSensor to directly st…
davidblain-infrabel Jul 8, 2025
cb3d718
refactor: Reformatted MSGraphSensor
davidblain-infrabel Jul 8, 2025
9ae30b2
refactor: Updated TODO in ImportError clause
davidblain-infrabel Jul 8, 2025
57a1194
refactor: Refactored MSGraphAsyncOperator and MSGraphSensor to always…
davidblain-infrabel Jul 8, 2025
1954fe7
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Jul 8, 2025
be8aaf1
refactor: Added test for api version in KiotaRequestAdapterHook test
davidblain-infrabel Jul 8, 2025
8442e5d
refactor: Log RequestInformation URL in KiotaRequestAdapterHook
davidblain-infrabel Jul 8, 2025
8279e58
refactor: Do render templated fields before initialising StartTrigger…
davidblain-infrabel Jul 8, 2025
3db6ada
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Jul 8, 2025
666b026
refactor: Initialise StartTriggerArgs in constructor
davidblain-infrabel Jul 8, 2025
2aaa6ba
refactor: Only allow start_from_trigger in MSGraphOperator if Airflow…
davidblain-infrabel Jul 9, 2025
996aaf5
refactor: Check on start_from_trigger instead of Airflow version in e…
davidblain-infrabel Jul 9, 2025
1d81786
refactor: Make sure base_url ends with slash
davidblain-infrabel Jul 9, 2025
3d517e8
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Jul 9, 2025
6787f0e
refactor: Disable start_trigger_args if import of StartTriggerArgs fails
davidblain-infrabel Jul 9, 2025
8375eba
refactor: Import BaseSensorOperator from compat_version
davidblain-infrabel Jul 9, 2025
980e337
refactor: Allow task to be None in deferrable_operator
davidblain-infrabel Jul 9, 2025
58dd886
refactor: Reformatted files
davidblain-infrabel Jul 9, 2025
6c1a0a5
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Jul 9, 2025
9e2c7ec
refactor: Refactored deferrable_operator to check on start_from_trigg…
davidblain-infrabel Jul 9, 2025
f51cd7b
refactor: Refactored deferrable_operator to use start_trigger_args in…
davidblain-infrabel Jul 9, 2025
a2677fb
Revert "refactor: Refactored deferrable_operator to use start_trigger…
davidblain-infrabel Jul 9, 2025
53ac7c0
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 4, 2025
9eae491
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 4, 2025
ebe7cd8
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 18, 2025
179a4c2
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 19, 2025
5016feb
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 20, 2025
a6bfd20
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 21, 2025
d02e656
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 22, 2025
c95a15b
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 22, 2025
05a1895
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Aug 22, 2025
3a5617d
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Sep 2, 2025
0ea5e5a
Merge branch 'main' into feature/msgraph-start-from-trigger
dabla Oct 11, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
from typing import TYPE_CHECKING, Any

from airflow.exceptions import TaskDeferred
from airflow.utils.module_loading import import_string
from airflow.utils.session import NEW_SESSION

from tests_common.test_utils.mock_context import mock_context

if TYPE_CHECKING:
from sqlalchemy.orm import Session

from airflow.models import Operator
from airflow.triggers.base import BaseTrigger, TriggerEvent

Expand All @@ -45,14 +49,25 @@ def execute_operator(operator: Operator) -> tuple[Any, Any]:
return asyncio.run(deferrable_operator(context, operator))


async def deferrable_operator(context, operator):
async def deferrable_operator(context, operator, session: Session = NEW_SESSION):
result = None
triggered_events = []
try:
if operator.start_from_trigger:
trigger_cls = import_string(operator.start_trigger_args.trigger_cls)
trigger = trigger_cls(
**operator.expand_start_trigger_args(context=context, session=session).trigger_kwargs
)
raise TaskDeferred(
trigger=trigger,
method_name=operator.start_trigger_args.next_method,
kwargs=operator.start_trigger_args.next_kwargs,
timeout=operator.start_trigger_args.timeout,
)
operator.render_template_fields(context=context)
result = operator.execute(context=context)
except TaskDeferred as deferred:
task = deferred
task: TaskDeferred | None = deferred

while task:
events = await run_tigger(task.trigger)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
MSGraphTrigger,
ResponseSerializer,
)
from airflow.providers.microsoft.azure.version_compat import XCOM_RETURN_KEY, BaseOperator
from airflow.providers.microsoft.azure.version_compat import AIRFLOW_V_3_1_PLUS, XCOM_RETURN_KEY, BaseOperator

if TYPE_CHECKING:
from io import BytesIO
Expand Down Expand Up @@ -109,6 +109,7 @@ class MSGraphAsyncOperator(BaseOperator):
Bytes will be base64 encoded into a string, so it can be stored as an XCom.
"""

start_from_trigger = AIRFLOW_V_3_1_PLUS
template_fields: Sequence[str] = (
"url",
"response_type",
Expand Down Expand Up @@ -162,27 +163,55 @@ def __init__(
self.result_processor = result_processor
self.event_handler = event_handler or default_event_handler
self.serializer: ResponseSerializer = serializer()
if self.start_from_trigger:
try:
from airflow.triggers.base import StartTriggerArgs

self.start_trigger_args = StartTriggerArgs(
trigger_cls=f"{MSGraphTrigger.__module__}.{MSGraphTrigger.__name__}",
trigger_kwargs=dict(
url=self.url,
response_type=self.response_type,
path_parameters=self.path_parameters,
url_template=self.url_template,
method=self.method,
query_parameters=self.query_parameters,
headers=self.headers,
data=self.data,
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
scopes=self.scopes,
api_version=self.api_version,
serializer=f"{type(self.serializer).__module__}.{type(self.serializer).__name__}",
),
next_method=self.execute_complete.__name__,
)
except ImportError:
self.start_from_trigger = False

def execute(self, context: Context) -> None:
self.defer(
trigger=MSGraphTrigger(
url=self.url,
response_type=self.response_type,
path_parameters=self.path_parameters,
url_template=self.url_template,
method=self.method,
query_parameters=self.query_parameters,
headers=self.headers,
data=self.data,
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
scopes=self.scopes,
api_version=self.api_version,
serializer=type(self.serializer),
),
method_name=self.execute_complete.__name__,
)
if not self.start_from_trigger:
self.defer(
trigger=MSGraphTrigger(
url=self.url,
response_type=self.response_type,
path_parameters=self.path_parameters,
url_template=self.url_template,
method=self.method,
query_parameters=self.query_parameters,
headers=self.headers,
data=self.data,
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
scopes=self.scopes,
api_version=self.api_version,
serializer=type(self.serializer),
),
method_name=self.execute_complete.__name__,
)
return

def execute_complete(
self,
Expand Down Expand Up @@ -228,14 +257,14 @@ def execute_complete(
self.trigger_next_link(
response=response, method_name=self.execute_complete.__name__, context=context
)
except TaskDeferred as exception:
except TaskDeferred as task_deferred:
self.append_result(
results=results,
result=result,
append_result_as_list_if_absent=True,
)
self.push_xcom(context=context, value=results)
raise exception
raise task_deferred

if not results:
return result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
from airflow.providers.microsoft.azure.hooks.msgraph import KiotaRequestAdapterHook
from airflow.providers.microsoft.azure.operators.msgraph import execute_callable
from airflow.providers.microsoft.azure.triggers.msgraph import MSGraphTrigger, ResponseSerializer
from airflow.providers.microsoft.azure.version_compat import BaseSensorOperator
from airflow.providers.microsoft.azure.version_compat import (
AIRFLOW_V_3_1_PLUS,
BaseSensorOperator,
)

if TYPE_CHECKING:
from datetime import timedelta
Expand Down Expand Up @@ -60,6 +63,7 @@ class MSGraphSensor(BaseSensorOperator):
Bytes will be base64 encoded into a string, so it can be stored as an XCom.
"""

start_from_trigger = AIRFLOW_V_3_1_PLUS
template_fields: Sequence[str] = (
"url",
"response_type",
Expand Down Expand Up @@ -107,8 +111,61 @@ def __init__(
self.event_processor = event_processor
self.result_processor = result_processor
self.serializer = serializer()
if self.start_from_trigger:
try:
from airflow.triggers.base import StartTriggerArgs

self.start_trigger_args = StartTriggerArgs(
trigger_cls=f"{MSGraphTrigger.__module__}.{MSGraphTrigger.__name__}",
trigger_kwargs=dict(
url=self.url,
response_type=self.response_type,
path_parameters=self.path_parameters,
url_template=self.url_template,
method=self.method,
query_parameters=self.query_parameters,
headers=self.headers,
data=self.data,
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
scopes=self.scopes,
api_version=self.api_version,
serializer=f"{type(self.serializer).__module__}.{type(self.serializer).__name__}",
),
next_method=self.execute_complete.__name__,
)
except ImportError:
self.start_from_trigger = False

def execute(self, context: Context) -> None:
if not self.start_from_trigger:
self.defer(
trigger=MSGraphTrigger(
url=self.url,
response_type=self.response_type,
path_parameters=self.path_parameters,
url_template=self.url_template,
method=self.method,
query_parameters=self.query_parameters,
headers=self.headers,
data=self.data,
conn_id=self.conn_id,
timeout=self.timeout,
proxies=self.proxies,
scopes=self.scopes,
api_version=self.api_version,
serializer=type(self.serializer),
),
method_name=self.execute_complete.__name__,
)
return

def execute(self, context: Context):
def retry_execute(
self,
context: Context,
**kwargs,
) -> Any:
self.defer(
trigger=MSGraphTrigger(
url=self.url,
Expand All @@ -129,13 +186,6 @@ def execute(self, context: Context):
method_name=self.execute_complete.__name__,
)

def retry_execute(
self,
context: Context,
**kwargs,
) -> Any:
self.execute(context=context)

def execute_complete(
self,
context: Context,
Expand Down
Loading