Skip to content

Commit 201a9c5

Browse files
Add accept_none to SQLTableCheckOperator (#63210)
* issue-30082: Adding accept_none to SQLTableCheckOperator * Update providers/common/sql/src/airflow/providers/common/sql/operators/sql.py Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com> * fix static checks --------- Co-authored-by: Elad Kalif <45845474+eladkal@users.noreply.github.com>
1 parent 48ef5d2 commit 201a9c5

File tree

2 files changed

+28
-0
lines changed
  • providers/common/sql
    • src/airflow/providers/common/sql/operators
    • tests/unit/common/sql/operators

2 files changed

+28
-0
lines changed

providers/common/sql/src/airflow/providers/common/sql/operators/sql.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -714,6 +714,7 @@ class SQLTableCheckOperator(BaseSQLOperator):
714714
715715
:param conn_id: the connection ID used to connect to the database
716716
:param database: name of database which overwrite the defined one in connection
717+
:param accept_none: If True, an empty table (row count=0) will not trigger a failure.
717718
718719
.. seealso::
719720
For more information on how to use this operator, take a look at the guide:
@@ -738,20 +739,32 @@ def __init__(
738739
partition_clause: str | None = None,
739740
conn_id: str | None = None,
740741
database: str | None = None,
742+
accept_none: bool = False,
741743
**kwargs,
742744
):
743745
super().__init__(conn_id=conn_id, database=database, **kwargs)
744746

745747
self.table = table
746748
self.checks = checks
747749
self.partition_clause = _initialize_partition_clause(partition_clause)
750+
self.accept_none = accept_none
748751
self.sql = f"SELECT check_name, check_result FROM ({self._generate_sql_query()}) AS check_table"
749752

750753
def execute(self, context: Context):
751754
hook = self.get_db_hook()
752755
records = hook.get_records(self.sql)
753756

754757
if not records:
758+
# accept_none prevents an error from being thrown if there are no records in the table
759+
if self.accept_none:
760+
self.log.warning(
761+
"No records found for table: %s, but accept_none=True, "
762+
"therefore, no tests are being marked as failed.",
763+
self.table,
764+
)
765+
return
766+
767+
# Otherwise, we'll raise an exception
755768
self._raise_exception(f"The following query returned zero rows: {self.sql}")
756769

757770
self.log.info("Record:\n%s", records)

providers/common/sql/tests/unit/common/sql/operators/test_sql.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,21 @@ def test_sql_check_partition_clause_templating(self, conn_id):
605605
finally:
606606
hook.run(["DROP TABLE employees"])
607607

608+
def test_sql_check_accept_none_true(self, monkeypatch):
609+
"""Validate that empty table does not fail when accept_none=True."""
610+
records = []
611+
operator = self._construct_operator(monkeypatch, self.checks, records)
612+
operator.accept_none = True
613+
operator.execute(context=MagicMock())
614+
615+
def test_sql_check_accept_none_false(self, monkeypatch):
616+
"""Validate that empty table throws an exception when accept_none=False."""
617+
records = []
618+
operator = self._construct_operator(monkeypatch, self.checks, records)
619+
operator.accept_none = False # This is default, technically
620+
with pytest.raises(AirflowException):
621+
operator.execute(context=MagicMock())
622+
608623
def test_pass_all_checks_check(self, monkeypatch):
609624
records = [("row_count_check", 1), ("column_sum_check", "y")]
610625
operator = self._construct_operator(monkeypatch, self.checks, records)

0 commit comments

Comments
 (0)