Skip to content

Commit 2740c1c

Browse files
jschefflpotiuk
authored andcommitted
Fix core tests from start to SkipMixin for Database Isolation Mode (#41369)
* Skip core tests from start to SkipMixin for Database Isolation Mode * Skip core tests from start to SkipMixin for Database Isolation Mode, uups * Skip core tests from start to SkipMixin for Database Isolation Mode, uups (cherry picked from commit b87f987)
1 parent dc0548e commit 2740c1c

11 files changed

+104
-7
lines changed

tests/jobs/test_base_job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ def test_essential_attr(self, mock_getuser, mock_hostname, mock_init_executors,
267267
assert test_job.executor == mock_sequential_executor
268268
assert test_job.executors == [mock_sequential_executor]
269269

270+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
270271
def test_heartbeat(self, frozen_sleep, monkeypatch):
271272
monkeypatch.setattr("airflow.jobs.job.sleep", frozen_sleep)
272273
with create_session() as session:

tests/jobs/test_local_task_job.py

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
109109
of LocalTaskJob can be assigned with
110110
proper values without intervention
111111
"""
112-
with dag_maker("test_localtaskjob_essential_attr"):
112+
with dag_maker("test_localtaskjob_essential_attr", serialized=True):
113113
op1 = EmptyOperator(task_id="op1")
114114

115115
dr = dag_maker.create_dagrun()
@@ -127,6 +127,7 @@ def test_localtaskjob_essential_attr(self, dag_maker):
127127
check_result_2 = [getattr(job1, attr) is not None for attr in essential_attr]
128128
assert all(check_result_2)
129129

130+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
130131
def test_localtaskjob_heartbeat(self, dag_maker):
131132
session = settings.Session()
132133
with dag_maker("test_localtaskjob_heartbeat"):
@@ -173,6 +174,7 @@ def test_localtaskjob_heartbeat(self, dag_maker):
173174
assert not job1.task_runner.run_as_user
174175
job_runner.heartbeat_callback()
175176

177+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
176178
@mock.patch("subprocess.check_call")
177179
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
178180
def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker):
@@ -227,6 +229,7 @@ def test_localtaskjob_heartbeat_with_run_as_user(self, psutil_mock, _, dag_maker
227229
assert ti.pid != job1.task_runner.process.pid
228230
job_runner.heartbeat_callback()
229231

232+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
230233
@conf_vars({("core", "default_impersonation"): "testuser"})
231234
@mock.patch("subprocess.check_call")
232235
@mock.patch("airflow.jobs.local_task_job_runner.psutil")
@@ -282,6 +285,7 @@ def test_localtaskjob_heartbeat_with_default_impersonation(self, psutil_mock, _,
282285
assert ti.pid != job1.task_runner.process.pid
283286
job_runner.heartbeat_callback()
284287

288+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
285289
def test_heartbeat_failed_fast(self):
286290
"""
287291
Test that task heartbeat will sleep when it fails fast
@@ -323,6 +327,7 @@ def test_heartbeat_failed_fast(self):
323327
delta = (time2 - time1).total_seconds()
324328
assert abs(delta - job.heartrate) < 0.8
325329

330+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
326331
@conf_vars({("core", "task_success_overtime"): "1"})
327332
def test_mark_success_no_kill(self, caplog, get_test_dag, session):
328333
"""
@@ -354,6 +359,7 @@ def test_mark_success_no_kill(self, caplog, get_test_dag, session):
354359
"State of this instance has been externally set to success. Terminating instance." in caplog.text
355360
)
356361

362+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
357363
def test_localtaskjob_double_trigger(self):
358364
dag = self.dagbag.dags.get("test_localtaskjob_double_trigger")
359365
task = dag.get_task("test_localtaskjob_double_trigger_task")
@@ -392,6 +398,7 @@ def test_localtaskjob_double_trigger(self):
392398

393399
session.close()
394400

401+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
395402
@patch.object(StandardTaskRunner, "return_code")
396403
@mock.patch("airflow.jobs.scheduler_job_runner.Stats.incr", autospec=True)
397404
def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code, create_dummy_dag):
@@ -424,6 +431,7 @@ def test_local_task_return_code_metric(self, mock_stats_incr, mock_return_code,
424431
]
425432
)
426433

434+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
427435
@patch.object(StandardTaskRunner, "return_code")
428436
def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create_dummy_dag):
429437
dag, task = create_dummy_dag("test_localtaskjob_double_trigger")
@@ -456,6 +464,7 @@ def test_localtaskjob_maintain_heart_rate(self, mock_return_code, caplog, create
456464
assert time_end - time_start < job1.heartrate
457465
assert "Task exited with return code 0" in caplog.text
458466

467+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
459468
def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
460469
"""
461470
Test that ensures that mark_failure in the UI fails
@@ -488,6 +497,7 @@ def test_mark_failure_on_failure_callback(self, caplog, get_test_dag):
488497
"State of this instance has been externally set to failed. Terminating instance."
489498
) in caplog.text
490499

500+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
491501
def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
492502
"""
493503
Test that ensures that if a running task is externally skipped (due to a dagrun timeout)
@@ -520,6 +530,7 @@ def test_dagrun_timeout_logged_in_task_logs(self, caplog, get_test_dag):
520530
assert ti.state == State.SKIPPED
521531
assert "DagRun timed out after " in caplog.text
522532

533+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
523534
def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, tmp_path, get_test_dag):
524535
"""
525536
Ensure failure callback of a task is run by the airflow run --raw process
@@ -555,6 +566,7 @@ def test_failure_callback_called_by_airflow_run_raw_process(self, monkeypatch, t
555566
assert m, "pid expected in output."
556567
assert os.getpid() != int(m.group(1))
557568

569+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
558570
@conf_vars({("core", "task_success_overtime"): "5"})
559571
def test_mark_success_on_success_callback(self, caplog, get_test_dag):
560572
"""
@@ -586,6 +598,7 @@ def test_mark_success_on_success_callback(self, caplog, get_test_dag):
586598
"State of this instance has been externally set to success. Terminating instance." in caplog.text
587599
)
588600

601+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
589602
def test_success_listeners_executed(self, caplog, get_test_dag):
590603
"""
591604
Test that ensures that when listeners are executed, the task is not killed before they finish
@@ -623,6 +636,7 @@ def test_success_listeners_executed(self, caplog, get_test_dag):
623636
)
624637
lm.clear()
625638

639+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
626640
@conf_vars({("core", "task_success_overtime"): "3"})
627641
def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
628642
"""
@@ -659,6 +673,7 @@ def test_success_slow_listeners_executed_kill(self, caplog, get_test_dag):
659673
)
660674
lm.clear()
661675

676+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
662677
@conf_vars({("core", "task_success_overtime"): "3"})
663678
def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, caplog, get_test_dag):
664679
"""
@@ -698,6 +713,7 @@ def test_success_slow_task_not_killed_by_overtime_but_regular_timeout(self, capl
698713
)
699714
lm.clear()
700715

716+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
701717
@pytest.mark.parametrize("signal_type", [signal.SIGTERM, signal.SIGKILL])
702718
def test_process_os_signal_calls_on_failure_callback(
703719
self, monkeypatch, tmp_path, get_test_dag, signal_type
@@ -792,6 +808,7 @@ def send_signal(ti, signal_sent, sig):
792808
lines = f.readlines()
793809
assert len(lines) == 0
794810

811+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
795812
@pytest.mark.parametrize(
796813
"conf, init_state, first_run_state, second_run_state, task_ids_to_run, error_message",
797814
[
@@ -876,6 +893,7 @@ def test_fast_follow(
876893
if scheduler_job_runner.processor_agent:
877894
scheduler_job_runner.processor_agent.end()
878895

896+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
879897
@conf_vars({("scheduler", "schedule_after_task_execution"): "True"})
880898
def test_mini_scheduler_works_with_wait_for_upstream(self, caplog, get_test_dag):
881899
dag = get_test_dag("test_dagrun_fast_follow")
@@ -944,7 +962,7 @@ def task_function(ti):
944962

945963
os.kill(psutil.Process(os.getpid()).ppid(), signal.SIGSEGV)
946964

947-
with dag_maker(dag_id="test_segmentation_fault"):
965+
with dag_maker(dag_id="test_segmentation_fault", serialized=True):
948966
task = PythonOperator(
949967
task_id="test_sigsegv",
950968
python_callable=task_function,
@@ -975,7 +993,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
975993
mock_get_task_runner.return_value.return_code.side_effects = [[0], codes]
976994

977995
unique_prefix = str(uuid.uuid4())
978-
with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries"):
996+
with dag_maker(dag_id=f"{unique_prefix}_test_number_of_queries", serialized=True):
979997
task = EmptyOperator(task_id="test_state_succeeded1")
980998

981999
dr = dag_maker.create_dagrun(run_id=unique_prefix, state=State.NONE)
@@ -992,6 +1010,7 @@ def test_number_of_queries_single_loop(mock_get_task_runner, dag_maker):
9921010
class TestSigtermOnRunner:
9931011
"""Test receive SIGTERM on Task Runner."""
9941012

1013+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
9951014
@pytest.mark.parametrize(
9961015
"daemon", [pytest.param(True, id="daemon"), pytest.param(False, id="non-daemon")]
9971016
)

tests/jobs/test_triggerer_job.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ def create_trigger_in_db(session, trigger, operator=None):
113113
return dag_model, run, trigger_orm, task_instance
114114

115115

116+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
116117
def test_trigger_logging_sensitive_info(session, caplog):
117118
"""
118119
Checks that when a trigger fires, it doesn't log any sensitive
@@ -176,6 +177,7 @@ def test_is_alive():
176177
assert not triggerer_job.is_alive(), "Completed jobs even with recent heartbeat should not be alive"
177178

178179

180+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
179181
def test_is_needed(session):
180182
"""Checks the triggerer-is-needed logic"""
181183
# No triggers, no need
@@ -219,6 +221,7 @@ def test_capacity_decode():
219221
TriggererJobRunner(job=job, capacity=input_str)
220222

221223

224+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
222225
def test_trigger_lifecycle(session):
223226
"""
224227
Checks that the triggerer will correctly see a new Trigger in the database
@@ -309,6 +312,7 @@ def test_update_trigger_with_triggerer_argument_change(
309312
assert "got an unexpected keyword argument 'not_exists_arg'" in caplog.text
310313

311314

315+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
312316
@pytest.mark.asyncio
313317
async def test_trigger_create_race_condition_38599(session, tmp_path):
314318
"""
@@ -389,6 +393,7 @@ async def test_trigger_create_race_condition_38599(session, tmp_path):
389393
assert path.read_text() == "hi\n"
390394

391395

396+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
392397
def test_trigger_create_race_condition_18392(session, tmp_path):
393398
"""
394399
This verifies the resolution of race condition documented in github issue #18392.
@@ -499,6 +504,7 @@ def handle_events(self):
499504
assert len(instances) == 1
500505

501506

507+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
502508
def test_trigger_from_dead_triggerer(session, create_task_instance):
503509
"""
504510
Checks that the triggerer will correctly claim a Trigger that is assigned to a
@@ -526,6 +532,7 @@ def test_trigger_from_dead_triggerer(session, create_task_instance):
526532
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]
527533

528534

535+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
529536
def test_trigger_from_expired_triggerer(session, create_task_instance):
530537
"""
531538
Checks that the triggerer will correctly claim a Trigger that is assigned to a
@@ -560,6 +567,7 @@ def test_trigger_from_expired_triggerer(session, create_task_instance):
560567
assert [x for x, y in job_runner.trigger_runner.to_create] == [1]
561568

562569

570+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
563571
def test_trigger_runner_exception_stops_triggerer(session):
564572
"""
565573
Checks that if an exception occurs when creating triggers, that the triggerer
@@ -603,6 +611,7 @@ async def create_triggers(self):
603611
thread.join()
604612

605613

614+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
606615
def test_trigger_firing(session):
607616
"""
608617
Checks that when a trigger fires, it correctly makes it into the
@@ -633,6 +642,7 @@ def test_trigger_firing(session):
633642
job_runner.trigger_runner.join(30)
634643

635644

645+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
636646
def test_trigger_failing(session):
637647
"""
638648
Checks that when a trigger fails, it correctly makes it into the
@@ -667,6 +677,7 @@ def test_trigger_failing(session):
667677
job_runner.trigger_runner.join(30)
668678

669679

680+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
670681
def test_trigger_cleanup(session):
671682
"""
672683
Checks that the triggerer will correctly clean up triggers that do not
@@ -686,6 +697,7 @@ def test_trigger_cleanup(session):
686697
assert session.query(Trigger).count() == 0
687698

688699

700+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
689701
def test_invalid_trigger(session, dag_maker):
690702
"""
691703
Checks that the triggerer will correctly fail task instances that depend on

tests/models/test_baseoperator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1037,6 +1037,7 @@ def get_states(dr):
10371037
return dict(ti_dict)
10381038

10391039

1040+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
10401041
@pytest.mark.db_test
10411042
def test_teardown_and_fail_stop(dag_maker):
10421043
"""
@@ -1082,6 +1083,7 @@ def my_teardown():
10821083
assert states == expected
10831084

10841085

1086+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
10851087
@pytest.mark.db_test
10861088
def test_get_task_instances(session):
10871089
import pendulum

tests/models/test_baseoperatormeta.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ def setup_method(self):
4747
def teardown_method(self, method):
4848
ExecutorSafeguard.test_mode = conf.getboolean("core", "unit_test_mode")
4949

50+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
5051
@pytest.mark.db_test
5152
def test_executor_when_classic_operator_called_from_dag(self, dag_maker):
5253
with dag_maker() as dag:
@@ -55,6 +56,7 @@ def test_executor_when_classic_operator_called_from_dag(self, dag_maker):
5556
dag_run = dag.test()
5657
assert dag_run.state == DagRunState.SUCCESS
5758

59+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
5860
@pytest.mark.parametrize(
5961
"state, exception, retries",
6062
[
@@ -101,6 +103,7 @@ def _raise_if_exception():
101103
assert ti.next_kwargs is None
102104
assert ti.state == state
103105

106+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
104107
@pytest.mark.db_test
105108
def test_executor_when_classic_operator_called_from_decorated_task_with_allow_nested_operators_false(
106109
self, dag_maker
@@ -117,6 +120,7 @@ def say_hello(**context):
117120
dag_run = dag.test()
118121
assert dag_run.state == DagRunState.FAILED
119122

123+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
120124
@pytest.mark.db_test
121125
@patch.object(HelloWorldOperator, "log")
122126
def test_executor_when_classic_operator_called_from_decorated_task_without_allow_nested_operators(
@@ -139,6 +143,7 @@ def say_hello(**context):
139143
"HelloWorldOperator.execute cannot be called outside TaskInstance!"
140144
)
141145

146+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
142147
@pytest.mark.db_test
143148
def test_executor_when_classic_operator_called_from_python_operator_with_allow_nested_operators_false(
144149
self,
@@ -159,6 +164,7 @@ def say_hello(**context):
159164
dag_run = dag.test()
160165
assert dag_run.state == DagRunState.FAILED
161166

167+
@pytest.mark.skip_if_database_isolation_mode # Does not work in db isolation mode
162168
@pytest.mark.db_test
163169
@patch.object(HelloWorldOperator, "log")
164170
def test_executor_when_classic_operator_called_from_python_operator_without_allow_nested_operators(

0 commit comments

Comments
 (0)