Skip to content

Commit c497cda

Browse files
committed
fix(scheduler): set end_date on tasks skipped by dagrun timeout
When a DAG run times out via dagrun_timeout, unfinished tasks are marked as SKIPPED but end_date was not set. This caused task duration to keep increasing in the UI even though the task was already skipped. Set end_date to the current time when marking tasks as SKIPPED during DAG run timeout handling. Closes: #58536
1 parent 26a700f commit c497cda

File tree

2 files changed

+42
-0
lines changed

2 files changed

+42
-0
lines changed

airflow-core/src/airflow/jobs/scheduler_job_runner.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2328,8 +2328,10 @@ def _schedule_dag_run(
23282328
key=lambda ti: ti.start_date or timezone.make_aware(datetime.min),
23292329
default=None,
23302330
)
2331+
now = timezone.utcnow()
23312332
for task_instance in unfinished_task_instances:
23322333
task_instance.state = TaskInstanceState.SKIPPED
2334+
task_instance.end_date = now
23332335
session.merge(task_instance)
23342336
session.flush()
23352337
self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id)

airflow-core/tests/unit/jobs/test_scheduler_job.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3387,6 +3387,46 @@ def test_dagrun_timeout_fails_run(self, dag_maker):
33873387
session.rollback()
33883388
session.close()
33893389

3390+
def test_dagrun_timeout_sets_end_date_on_skipped_tasks(self, dag_maker):
3391+
"""Test that when a DAG run times out, skipped tasks get end_date set."""
3392+
session = settings.Session()
3393+
with dag_maker(
3394+
dag_id="test_dagrun_timeout_sets_end_date",
3395+
dagrun_timeout=datetime.timedelta(seconds=60),
3396+
session=session,
3397+
):
3398+
EmptyOperator(task_id="task1")
3399+
EmptyOperator(task_id="task2")
3400+
3401+
dr = dag_maker.create_dagrun(start_date=timezone.utcnow() - datetime.timedelta(days=1))
3402+
3403+
# Set tasks to running state with a start_date but no end_date
3404+
tis = dr.get_task_instances(session=session)
3405+
for ti in tis:
3406+
ti.state = State.RUNNING
3407+
ti.start_date = timezone.utcnow() - datetime.timedelta(hours=1)
3408+
ti.end_date = None
3409+
session.merge(ti)
3410+
session.flush()
3411+
3412+
scheduler_job = Job()
3413+
self.job_runner = SchedulerJobRunner(job=scheduler_job)
3414+
3415+
self.job_runner._schedule_dag_run(dr, session)
3416+
session.flush()
3417+
3418+
session.refresh(dr)
3419+
assert dr.state == State.FAILED
3420+
3421+
tis = dr.get_task_instances(session=session)
3422+
for ti in tis:
3423+
session.refresh(ti)
3424+
assert ti.state == TaskInstanceState.SKIPPED
3425+
assert ti.end_date is not None, f"end_date should be set for skipped task {ti.task_id}"
3426+
3427+
session.rollback()
3428+
session.close()
3429+
33903430
def test_dagrun_timeout_fails_run_and_update_next_dagrun(self, dag_maker):
33913431
"""
33923432
Test that dagrun timeout fails run and update the next dagrun

0 commit comments

Comments
 (0)