Skip to content

Commit d81b297

Browse files
uranusjrjedcunningham
authored andcommitted
Use COALESCE when ordering runs to handle NULL (#26626)
Data interval columns are NULL for runs created before 2.3, but SQL's NULL-sorting logic would make those old runs always appear first. In a perfect world we'd want to sort by get_run_data_interval(), but that's not efficient, so instead the columns are coalesced into logical date, which is good enough in most cases. (cherry picked from commit 22d52c0)
1 parent 0b9a540 commit d81b297

File tree

2 files changed

+39
-10
lines changed

2 files changed

+39
-10
lines changed

airflow/www/utils.py

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,9 @@
2020
import json
2121
import textwrap
2222
import time
23-
from typing import Any
23+
from typing import TYPE_CHECKING, Any, Sequence
2424
from urllib.parse import urlencode
2525

26-
import sqlalchemy as sqla
2726
from flask import request, url_for
2827
from flask.helpers import flash
2928
from flask_appbuilder.forms import FieldConverter
@@ -37,11 +36,12 @@
3736
from pendulum.datetime import DateTime
3837
from pygments import highlight, lexers
3938
from pygments.formatters import HtmlFormatter
39+
from sqlalchemy import func, types
4040
from sqlalchemy.ext.associationproxy import AssociationProxy
4141

42-
from airflow import models
4342
from airflow.exceptions import RemovedInAirflow3Warning
4443
from airflow.models import errors
44+
from airflow.models.dagrun import DagRun
4545
from airflow.models.dagwarning import DagWarning
4646
from airflow.models.taskinstance import TaskInstance
4747
from airflow.utils import timezone
@@ -51,6 +51,10 @@
5151
from airflow.www.forms import DateTimeWithTimezoneField
5252
from airflow.www.widgets import AirflowDateTimePickerWidget
5353

54+
if TYPE_CHECKING:
55+
from sqlalchemy.orm.query import Query
56+
from sqlalchemy.sql.operators import ColumnOperators
57+
5458

5559
def datetime_to_string(value: DateTime | None) -> str | None:
5660
if value is None:
@@ -129,7 +133,7 @@ def get_mapped_summary(parent_instance, task_instances):
129133
}
130134

131135

132-
def encode_dag_run(dag_run: models.DagRun | None) -> dict[str, Any] | None:
136+
def encode_dag_run(dag_run: DagRun | None) -> dict[str, Any] | None:
133137
if not dag_run:
134138
return None
135139

@@ -436,6 +440,34 @@ def dag_run_link(attr):
436440
return Markup('<a href="{url}">{run_id}</a>').format(url=url, run_id=run_id)
437441

438442

443+
def _get_run_ordering_expr(name: str) -> ColumnOperators:
444+
expr = DagRun.__table__.columns[name]
445+
# Data interval columns are NULL for runs created before 2.3, but SQL's
446+
# NULL-sorting logic would make those old runs always appear first. In a
447+
# perfect world we'd want to sort by ``get_run_data_interval()``, but that's
448+
# not efficient, so instead the columns are coalesced into execution_date,
449+
# which is good enough in most cases.
450+
if name in ("data_interval_start", "data_interval_end"):
451+
expr = func.coalesce(expr, DagRun.execution_date)
452+
return expr.desc()
453+
454+
455+
def sorted_dag_runs(query: Query, *, ordering: Sequence[str], limit: int) -> Sequence[DagRun]:
456+
"""Produce DAG runs sorted by specified columns.
457+
458+
:param query: An ORM query object against *DagRun*.
459+
:param ordering: Column names to sort the runs. should generally come from a
460+
timetable's ``run_ordering``.
461+
:param limit: Number of runs to limit to.
462+
:return: A list of DagRun objects ordered by the specified columns. The list
463+
contains only the *last* objects, but in *ascending* order.
464+
"""
465+
ordering_exprs = (_get_run_ordering_expr(name) for name in ordering)
466+
runs = query.order_by(*ordering_exprs, DagRun.id.desc()).limit(limit).all()
467+
runs.reverse()
468+
return runs
469+
470+
439471
def format_map_index(attr: dict) -> str:
440472
"""Format map index for list columns in model view."""
441473
value = attr['map_index']
@@ -651,7 +683,7 @@ def is_utcdatetime(self, col_name):
651683
obj = self.list_columns[col_name].type
652684
return (
653685
isinstance(obj, UtcDateTime)
654-
or isinstance(obj, sqla.types.TypeDecorator)
686+
or isinstance(obj, types.TypeDecorator)
655687
and isinstance(obj.impl, UtcDateTime)
656688
)
657689
return False
@@ -664,7 +696,7 @@ def is_extendedjson(self, col_name):
664696
obj = self.list_columns[col_name].type
665697
return (
666698
isinstance(obj, ExtendedJSON)
667-
or isinstance(obj, sqla.types.TypeDecorator)
699+
or isinstance(obj, types.TypeDecorator)
668700
and isinstance(obj.impl, ExtendedJSON)
669701
)
670702
return False

airflow/www/views.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3454,10 +3454,7 @@ def grid_data(self):
34543454
if run_state:
34553455
query = query.filter(DagRun.state == run_state)
34563456

3457-
ordering = (DagRun.__table__.columns[name].desc() for name in dag.timetable.run_ordering)
3458-
dag_runs = query.order_by(*ordering, DagRun.id.desc()).limit(num_runs).all()
3459-
dag_runs.reverse()
3460-
3457+
dag_runs = wwwutils.sorted_dag_runs(query, ordering=dag.timetable.run_ordering, limit=num_runs)
34613458
encoded_runs = [wwwutils.encode_dag_run(dr) for dr in dag_runs]
34623459
data = {
34633460
'groups': dag_to_grid(dag, dag_runs, session),

0 commit comments

Comments
 (0)