Skip to content

Commit b86bb24

Browse files
vVv-AAJasmin
and
Jasmin
authored
37437 (#43107)
Allow SqlSensor to inspect the entire result row by adding a selector field. This is useful to customize the success/failure criteria instead of just the first cell. Co-authored-by: Jasmin <[email protected]>
1 parent 15f09c7 commit b86bb24

File tree

2 files changed

+39
-9
lines changed
  • providers

2 files changed

+39
-9
lines changed

providers/src/airflow/providers/common/sql/sensors/sql.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from operator import itemgetter
1920
from typing import TYPE_CHECKING, Any, Callable, Mapping, Sequence
2021

2122
from airflow.exceptions import AirflowException
@@ -46,10 +47,12 @@ class SqlSensor(BaseSensorOperator):
4647
:param sql: The SQL to run. To pass, it needs to return at least one cell
4748
that contains a non-zero / empty string value.
4849
:param parameters: The parameters to render the SQL query with (optional).
49-
:param success: Success criteria for the sensor is a Callable that takes the first_cell's value
50-
as the only argument, and returns a boolean (optional).
51-
:param failure: Failure criteria for the sensor is a Callable that takes the first_cell's value
52-
as the only argument and returns a boolean (optional).
50+
:param success: Success criteria for the sensor is a Callable that takes the output
51+
of selector as the only argument, and returns a boolean (optional).
52+
:param failure: Failure criteria for the sensor is a Callable that takes the output
53+
of selector as the only argument and returns a boolean (optional).
54+
:param selector: Function which takes the resulting row and transforms it before
55+
it is passed to success or failure (optional). Takes the first cell by default.
5356
:param fail_on_empty: Explicitly fail on no rows returned.
5457
:param hook_params: Extra config params to be passed to the underlying hook.
5558
Should match the desired hook constructor params.
@@ -67,6 +70,7 @@ def __init__(
6770
parameters: Mapping[str, Any] | None = None,
6871
success: Callable[[Any], bool] | None = None,
6972
failure: Callable[[Any], bool] | None = None,
73+
selector: Callable[[tuple[Any]], Any] | None = itemgetter(0),
7074
fail_on_empty: bool = False,
7175
hook_params: Mapping[str, Any] | None = None,
7276
**kwargs,
@@ -76,6 +80,7 @@ def __init__(
7680
self.parameters = parameters
7781
self.success = success
7882
self.failure = failure
83+
self.selector = selector
7984
self.fail_on_empty = fail_on_empty
8085
self.hook_params = hook_params
8186
super().__init__(**kwargs)
@@ -102,20 +107,20 @@ def poke(self, context: Context) -> bool:
102107
else:
103108
return False
104109

105-
first_cell = records[0][0]
110+
condition = self.selector(records[0])
106111
if self.failure is not None:
107112
if callable(self.failure):
108-
if self.failure(first_cell):
109-
message = f"Failure criteria met. self.failure({first_cell}) returned True"
113+
if self.failure(condition):
114+
message = f"Failure criteria met. self.failure({condition}) returned True"
110115
raise AirflowException(message)
111116
else:
112117
message = f"self.failure is present, but not callable -> {self.failure}"
113118
raise AirflowException(message)
114119

115120
if self.success is not None:
116121
if callable(self.success):
117-
return self.success(first_cell)
122+
return self.success(condition)
118123
else:
119124
message = f"self.success is present, but not callable -> {self.success}"
120125
raise AirflowException(message)
121-
return bool(first_cell)
126+
return bool(condition)

providers/tests/common/sql/sensors/test_sql.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,31 @@ def test_sql_sensor_postgres_poke_invalid_success(
264264
op.poke({})
265265
assert "self.success is present, but not callable -> [1]" == str(ctx.value)
266266

267+
@pytest.mark.backend("postgres")
268+
def test_sql_sensor_postgres_with_selector(self):
269+
op1 = SqlSensor(
270+
task_id="sql_sensor_check_1",
271+
conn_id="postgres_default",
272+
sql="SELECT 0, 1",
273+
dag=self.dag,
274+
success=lambda x: x in [1],
275+
failure=lambda x: x in [0],
276+
selector=lambda x: x[1],
277+
)
278+
op1.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)
279+
280+
op2 = SqlSensor(
281+
task_id="sql_sensor_check_2",
282+
conn_id="postgres_default",
283+
sql="SELECT 0, 1",
284+
dag=self.dag,
285+
success=lambda x: x in [1],
286+
failure=lambda x: x in [0],
287+
selector=lambda x: x[0],
288+
)
289+
with pytest.raises(AirflowException):
290+
op2.poke({})
291+
267292
@pytest.mark.db_test
268293
def test_sql_sensor_hook_params(self):
269294
op = SqlSensor(

0 commit comments

Comments
 (0)