Skip to content

Commit 06edc79

Browse files
committed
fix(scheduler): set end_date on tasks skipped by dagrun timeout
When a DAG run times out via dagrun_timeout, unfinished tasks are marked as SKIPPED but end_date was not set. This caused task duration to keep increasing in the UI even though the task was already skipped. Set end_date to the current time when marking tasks as SKIPPED during DAG run timeout handling. Closes: #58536
1 parent 0f68191 commit 06edc79

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2309,8 +2309,10 @@ def _schedule_dag_run(
23092309
key=lambda ti: ti.start_date or timezone.make_aware(datetime.min),
23102310
default=None,
23112311
)
2312+
now = timezone.utcnow()
23122313
for task_instance in unfinished_task_instances:
23132314
task_instance.state = TaskInstanceState.SKIPPED
2315+
task_instance.end_date = now
23142316
session.merge(task_instance)
23152317
session.flush()
23162318
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3386,6 +3386,46 @@ def test_dagrun_timeout_fails_run(self, dag_maker):
33863386
session.rollback()
33873387
session.close()
33883388

3389+
def test_dagrun_timeout_sets_end_date_on_skipped_tasks(self, dag_maker):
    """
    Check that tasks skipped by a DAG run timeout receive an ``end_date``.

    A two-task DAG with a 60 second ``dagrun_timeout`` gets a run whose
    start_date lies one day in the past. Both task instances are put into
    RUNNING with a start_date and no end_date, and then the run is passed to
    ``_schedule_dag_run``. Afterwards the run must be FAILED and every task
    instance must be SKIPPED with a non-null ``end_date``.
    """
    session = settings.Session()
    dag_kwargs = {
        "dag_id": "test_dagrun_timeout_sets_end_date",
        "dagrun_timeout": datetime.timedelta(seconds=60),
        "session": session,
    }
    with dag_maker(**dag_kwargs):
        EmptyOperator(task_id="task1")
        EmptyOperator(task_id="task2")

    # The run started a day ago, far beyond the 60 second timeout.
    dag_run = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1))

    # Put every task instance into RUNNING with a start_date and no end_date.
    for running_ti in dag_run.get_task_instances(session=session):
        running_ti.state = State.RUNNING
        running_ti.start_date = timezone.utcnow() - datetime.timedelta(hours=1)
        running_ti.end_date = None
        session.merge(running_ti)
    session.flush()

    self.job_runner = SchedulerJobRunner(job=Job())
    self.job_runner._schedule_dag_run(dag_run, session)
    session.flush()

    session.refresh(dag_run)
    assert dag_run.state == State.FAILED

    # Re-read each task instance from the database before checking it.
    for ti in dag_run.get_task_instances(session=session):
        session.refresh(ti)
        assert ti.state == TaskInstanceState.SKIPPED
        assert ti.end_date is not None, f"end_date should be set for skipped task {ti.task_id}"

    session.rollback()
    session.close()
3428+
33893429
def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
33903430
"""
33913431
Test that dagrun timeout fails run and update the next dagrun

0 commit comments

Comments
 (0)