Skip to content

Commit b2bd2b2

Browse files
authored
Merge branch 'v3-2-test' into backport-1b3bf55-v3-2-test
2 parents 04598ab + 817ffa9 commit b2bd2b2

17 files changed

Lines changed: 260 additions & 40 deletions

File tree

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix scheduler crash on asset-triggered DagRuns by eager-loading ``AssetEvent.source_aliases`` in ``SchedulerJobRunner.process_executor_events``.

airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4640,6 +4640,14 @@ paths:
46404640
type: integer
46414641
default: -1
46424642
title: Map Index
4643+
- name: try_number
4644+
in: query
4645+
required: false
4646+
schema:
4647+
anyOf:
4648+
- type: integer
4649+
- type: 'null'
4650+
title: Try Number
46434651
responses:
46444652
'200':
46454653
description: Successful Response

airflow-core/src/airflow/api_fastapi/core_api/routes/public/extra_links.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,19 +47,14 @@ def get_extra_links(
4747
session: SessionDep,
4848
dag_bag: DagBagDep,
4949
map_index: int = -1,
50+
try_number: int | None = None,
5051
) -> ExtraLinkCollectionResponse:
5152
"""Get extra links for task instance."""
5253
from airflow.models.taskinstance import TaskInstance
54+
from airflow.models.taskinstancehistory import TaskInstanceHistory
5355

5456
dag_run = session.scalar(select(DagRun).where(DagRun.dag_id == dag_id, DagRun.run_id == dag_run_id))
5557

56-
dag = get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
57-
58-
try:
59-
task = dag.get_task(task_id)
60-
except TaskNotFound:
61-
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with ID = {task_id} not found")
62-
6358
ti = session.scalar(
6459
select(TaskInstance).where(
6560
TaskInstance.dag_id == dag_id,
@@ -75,8 +70,38 @@ def get_extra_links(
7570
"TaskInstance not found",
7671
)
7772

73+
dag = get_dag_for_run_or_latest_version(dag_bag, dag_run, dag_id, session)
74+
75+
try:
76+
task = dag.get_task(task_id)
77+
except TaskNotFound:
78+
raise HTTPException(status.HTTP_404_NOT_FOUND, f"Task with ID = {task_id} not found")
79+
80+
# Resolve which object to use for link generation. For the current try we use
81+
# the live TI; for past tries we fetch the immutable TaskInstanceHistory record,
82+
# which also validates that the requested try_number actually exists.
83+
if try_number is not None and try_number != ti.try_number:
84+
tih = session.scalar(
85+
select(TaskInstanceHistory).where(
86+
TaskInstanceHistory.dag_id == dag_id,
87+
TaskInstanceHistory.task_id == task_id,
88+
TaskInstanceHistory.run_id == dag_run_id,
89+
TaskInstanceHistory.map_index == map_index,
90+
TaskInstanceHistory.try_number == try_number,
91+
)
92+
)
93+
if not tih:
94+
raise HTTPException(
95+
status.HTTP_404_NOT_FOUND,
96+
f"TaskInstanceHistory not found for try_number={try_number}",
97+
)
98+
ti_for_links = tih
99+
else:
100+
ti_for_links = ti
101+
78102
all_extra_link_pairs = (
79-
(link_name, task.get_extra_links(ti, link_name)) for link_name in task.extra_links
103+
(link_name, task.get_extra_links(ti_for_links, link_name))
104+
for link_name in task.extra_links # type: ignore[arg-type]
80105
)
81106
all_extra_links = {link_name: link_url or None for link_name, link_url in sorted(all_extra_link_pairs)}
82107

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1278,12 +1278,13 @@ def process_executor_events(
12781278
filter_for_tis = TI.filter_for_tis(tis_with_right_state)
12791279
if filter_for_tis is None:
12801280
return len(event_buffer)
1281-
asset_loader, _ = _eager_load_dag_run_for_validation()
1281+
asset_loader, alias_loader = _eager_load_dag_run_for_validation()
12821282
query = (
12831283
select(TI)
12841284
.where(filter_for_tis)
12851285
.options(selectinload(TI.dag_model))
12861286
.options(asset_loader)
1287+
.options(alias_loader)
12871288
.options(joinedload(TI.dag_run).selectinload(DagRun.created_dag_version))
12881289
.options(joinedload(TI.dag_version))
12891290
)

airflow-core/src/airflow/models/taskinstancehistory.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858

5959
from airflow.models import DagRun
6060
from airflow.models.taskinstance import TaskInstance
61+
from airflow.models.taskinstancekey import TaskInstanceKey
6162

6263

6364
class TaskInstanceHistory(Base):
@@ -179,6 +180,13 @@ def id(self) -> UUID:
179180
"""Alias for primary key field to support TaskInstance."""
180181
return self.task_instance_id
181182

183+
@property
184+
def key(self) -> TaskInstanceKey:
185+
"""Returns a key that identifies this history record, mirroring TaskInstance.key."""
186+
from airflow.models.taskinstancekey import TaskInstanceKey
187+
188+
return TaskInstanceKey(self.dag_id, self.task_id, self.run_id, self.try_number, self.map_index)
189+
182190
@staticmethod
183191
@provide_session
184192
def record_ti(ti: TaskInstance, session: Session = NEW_SESSION) -> None:

airflow-core/src/airflow/ui/openapi-gen/queries/common.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -381,21 +381,23 @@ export const UseEventLogServiceGetEventLogsKeyFn = ({ after, before, dagId, dagI
381381
export type ExtraLinksServiceGetExtraLinksDefaultResponse = Awaited<ReturnType<typeof ExtraLinksService.getExtraLinks>>;
382382
export type ExtraLinksServiceGetExtraLinksQueryResult<TData = ExtraLinksServiceGetExtraLinksDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
383383
export const useExtraLinksServiceGetExtraLinksKey = "ExtraLinksServiceGetExtraLinks";
384-
export const UseExtraLinksServiceGetExtraLinksKeyFn = ({ dagId, dagRunId, mapIndex, taskId }: {
384+
export const UseExtraLinksServiceGetExtraLinksKeyFn = ({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
385385
dagId: string;
386386
dagRunId: string;
387387
mapIndex?: number;
388388
taskId: string;
389-
}, queryKey?: Array<unknown>) => [useExtraLinksServiceGetExtraLinksKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }])];
389+
tryNumber?: number;
390+
}, queryKey?: Array<unknown>) => [useExtraLinksServiceGetExtraLinksKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, tryNumber }])];
390391
export type TaskInstanceServiceGetExtraLinksDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getExtraLinks>>;
391392
export type TaskInstanceServiceGetExtraLinksQueryResult<TData = TaskInstanceServiceGetExtraLinksDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
392393
export const useTaskInstanceServiceGetExtraLinksKey = "TaskInstanceServiceGetExtraLinks";
393-
export const UseTaskInstanceServiceGetExtraLinksKeyFn = ({ dagId, dagRunId, mapIndex, taskId }: {
394+
export const UseTaskInstanceServiceGetExtraLinksKeyFn = ({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
394395
dagId: string;
395396
dagRunId: string;
396397
mapIndex?: number;
397398
taskId: string;
398-
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetExtraLinksKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId }])];
399+
tryNumber?: number;
400+
}, queryKey?: Array<unknown>) => [useTaskInstanceServiceGetExtraLinksKey, ...(queryKey ?? [{ dagId, dagRunId, mapIndex, taskId, tryNumber }])];
399401
export type TaskInstanceServiceGetTaskInstanceDefaultResponse = Awaited<ReturnType<typeof TaskInstanceService.getTaskInstance>>;
400402
export type TaskInstanceServiceGetTaskInstanceQueryResult<TData = TaskInstanceServiceGetTaskInstanceDefaultResponse, TError = unknown> = UseQueryResult<TData, TError>;
401403
export const useTaskInstanceServiceGetTaskInstanceKey = "TaskInstanceServiceGetTaskInstance";

airflow-core/src/airflow/ui/openapi-gen/queries/ensureQueryData.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -769,15 +769,17 @@ export const ensureUseEventLogServiceGetEventLogsData = (queryClient: QueryClien
769769
* @param data.dagRunId
770770
* @param data.taskId
771771
* @param data.mapIndex
772+
* @param data.tryNumber
772773
* @returns ExtraLinkCollectionResponse Successful Response
773774
* @throws ApiError
774775
*/
775-
export const ensureUseExtraLinksServiceGetExtraLinksData = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId }: {
776+
export const ensureUseExtraLinksServiceGetExtraLinksData = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
776777
dagId: string;
777778
dagRunId: string;
778779
mapIndex?: number;
779780
taskId: string;
780-
}) => queryClient.ensureQueryData({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) });
781+
tryNumber?: number;
782+
}) => queryClient.ensureQueryData({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
781783
/**
782784
* Get Extra Links
783785
* Get extra links for task instance.
@@ -786,15 +788,17 @@ export const ensureUseExtraLinksServiceGetExtraLinksData = (queryClient: QueryCl
786788
* @param data.dagRunId
787789
* @param data.taskId
788790
* @param data.mapIndex
791+
* @param data.tryNumber
789792
* @returns ExtraLinkCollectionResponse Successful Response
790793
* @throws ApiError
791794
*/
792-
export const ensureUseTaskInstanceServiceGetExtraLinksData = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId }: {
795+
export const ensureUseTaskInstanceServiceGetExtraLinksData = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
793796
dagId: string;
794797
dagRunId: string;
795798
mapIndex?: number;
796799
taskId: string;
797-
}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) });
800+
tryNumber?: number;
801+
}) => queryClient.ensureQueryData({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
798802
/**
799803
* Get Task Instance
800804
* Get task instance.

airflow-core/src/airflow/ui/openapi-gen/queries/prefetch.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -769,15 +769,17 @@ export const prefetchUseEventLogServiceGetEventLogs = (queryClient: QueryClient,
769769
* @param data.dagRunId
770770
* @param data.taskId
771771
* @param data.mapIndex
772+
* @param data.tryNumber
772773
* @returns ExtraLinkCollectionResponse Successful Response
773774
* @throws ApiError
774775
*/
775-
export const prefetchUseExtraLinksServiceGetExtraLinks = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId }: {
776+
export const prefetchUseExtraLinksServiceGetExtraLinks = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
776777
dagId: string;
777778
dagRunId: string;
778779
mapIndex?: number;
779780
taskId: string;
780-
}) => queryClient.prefetchQuery({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) });
781+
tryNumber?: number;
782+
}) => queryClient.prefetchQuery({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
781783
/**
782784
* Get Extra Links
783785
* Get extra links for task instance.
@@ -786,15 +788,17 @@ export const prefetchUseExtraLinksServiceGetExtraLinks = (queryClient: QueryClie
786788
* @param data.dagRunId
787789
* @param data.taskId
788790
* @param data.mapIndex
791+
* @param data.tryNumber
789792
* @returns ExtraLinkCollectionResponse Successful Response
790793
* @throws ApiError
791794
*/
792-
export const prefetchUseTaskInstanceServiceGetExtraLinks = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId }: {
795+
export const prefetchUseTaskInstanceServiceGetExtraLinks = (queryClient: QueryClient, { dagId, dagRunId, mapIndex, taskId, tryNumber }: {
793796
dagId: string;
794797
dagRunId: string;
795798
mapIndex?: number;
796799
taskId: string;
797-
}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) });
800+
tryNumber?: number;
801+
}) => queryClient.prefetchQuery({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) });
798802
/**
799803
* Get Task Instance
800804
* Get task instance.

airflow-core/src/airflow/ui/openapi-gen/queries/queries.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -769,15 +769,17 @@ export const useEventLogServiceGetEventLogs = <TData = Common.EventLogServiceGet
769769
* @param data.dagRunId
770770
* @param data.taskId
771771
* @param data.mapIndex
772+
* @param data.tryNumber
772773
* @returns ExtraLinkCollectionResponse Successful Response
773774
* @throws ApiError
774775
*/
775-
export const useExtraLinksServiceGetExtraLinks = <TData = Common.ExtraLinksServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId }: {
776+
export const useExtraLinksServiceGetExtraLinks = <TData = Common.ExtraLinksServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
776777
dagId: string;
777778
dagRunId: string;
778779
mapIndex?: number;
779780
taskId: string;
780-
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }, queryKey), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) as TData, ...options });
781+
tryNumber?: number;
782+
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }, queryKey), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) as TData, ...options });
781783
/**
782784
* Get Extra Links
783785
* Get extra links for task instance.
@@ -786,15 +788,17 @@ export const useExtraLinksServiceGetExtraLinks = <TData = Common.ExtraLinksServi
786788
* @param data.dagRunId
787789
* @param data.taskId
788790
* @param data.mapIndex
791+
* @param data.tryNumber
789792
* @returns ExtraLinkCollectionResponse Successful Response
790793
* @throws ApiError
791794
*/
792-
export const useTaskInstanceServiceGetExtraLinks = <TData = Common.TaskInstanceServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId }: {
795+
export const useTaskInstanceServiceGetExtraLinks = <TData = Common.TaskInstanceServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
793796
dagId: string;
794797
dagRunId: string;
795798
mapIndex?: number;
796799
taskId: string;
797-
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }, queryKey), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) as TData, ...options });
800+
tryNumber?: number;
801+
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }, queryKey), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) as TData, ...options });
798802
/**
799803
* Get Task Instance
800804
* Get task instance.

airflow-core/src/airflow/ui/openapi-gen/queries/suspense.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -769,15 +769,17 @@ export const useEventLogServiceGetEventLogsSuspense = <TData = Common.EventLogSe
769769
* @param data.dagRunId
770770
* @param data.taskId
771771
* @param data.mapIndex
772+
* @param data.tryNumber
772773
* @returns ExtraLinkCollectionResponse Successful Response
773774
* @throws ApiError
774775
*/
775-
export const useExtraLinksServiceGetExtraLinksSuspense = <TData = Common.ExtraLinksServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId }: {
776+
export const useExtraLinksServiceGetExtraLinksSuspense = <TData = Common.ExtraLinksServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
776777
dagId: string;
777778
dagRunId: string;
778779
mapIndex?: number;
779780
taskId: string;
780-
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }, queryKey), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) as TData, ...options });
781+
tryNumber?: number;
782+
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseExtraLinksServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }, queryKey), queryFn: () => ExtraLinksService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) as TData, ...options });
781783
/**
782784
* Get Extra Links
783785
* Get extra links for task instance.
@@ -786,15 +788,17 @@ export const useExtraLinksServiceGetExtraLinksSuspense = <TData = Common.ExtraLi
786788
* @param data.dagRunId
787789
* @param data.taskId
788790
* @param data.mapIndex
791+
* @param data.tryNumber
789792
* @returns ExtraLinkCollectionResponse Successful Response
790793
* @throws ApiError
791794
*/
792-
export const useTaskInstanceServiceGetExtraLinksSuspense = <TData = Common.TaskInstanceServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId }: {
795+
export const useTaskInstanceServiceGetExtraLinksSuspense = <TData = Common.TaskInstanceServiceGetExtraLinksDefaultResponse, TError = unknown, TQueryKey extends Array<unknown> = unknown[]>({ dagId, dagRunId, mapIndex, taskId, tryNumber }: {
793796
dagId: string;
794797
dagRunId: string;
795798
mapIndex?: number;
796799
taskId: string;
797-
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId }, queryKey), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId }) as TData, ...options });
800+
tryNumber?: number;
801+
}, queryKey?: TQueryKey, options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">) => useSuspenseQuery<TData, TError>({ queryKey: Common.UseTaskInstanceServiceGetExtraLinksKeyFn({ dagId, dagRunId, mapIndex, taskId, tryNumber }, queryKey), queryFn: () => TaskInstanceService.getExtraLinks({ dagId, dagRunId, mapIndex, taskId, tryNumber }) as TData, ...options });
798802
/**
799803
* Get Task Instance
800804
* Get task instance.

0 commit comments

Comments
 (0)