Skip to content

Commit 0967259

Browse files
committed
Clear autoregistered DAGs if there are any import errors (#26398)
We need to clear any autoregistered DAGs that may already have been registered if we encounter an import error while parsing a given DAG file. This preserves the behavior from before DAGs were autoregistered. (cherry picked from commit 01e3fb7)
1 parent 62322ef commit 0967259

File tree

4 files changed

+55
-0
lines changed

4 files changed

+55
-0
lines changed

airflow/models/dagbag.py

+2
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,7 @@ def parse(mod_name, filepath):
326326
loader.exec_module(new_module)
327327
return [new_module]
328328
except Exception as e:
329+
DagContext.autoregistered_dags.clear()
329330
self.log.exception("Failed to import: %s", filepath)
330331
if self.dagbag_import_error_tracebacks:
331332
self.import_errors[filepath] = traceback.format_exc(
@@ -391,6 +392,7 @@ def _load_modules_from_zip(self, filepath, safe_mode):
391392
current_module = importlib.import_module(mod_name)
392393
mods.append(current_module)
393394
except Exception as e:
395+
DagContext.autoregistered_dags.clear()
394396
fileloc = os.path.join(filepath, zip_info.filename)
395397
self.log.exception("Failed to import: %s", fileloc)
396398
if self.dagbag_import_error_tracebacks:

tests/dags/test_invalid_dup_task.py

+30
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
from __future__ import annotations
18+
19+
from datetime import datetime
20+
21+
from airflow import DAG
22+
from airflow.operators.empty import EmptyOperator
23+
24+
with DAG(
25+
"test_invalid_dup_task",
26+
start_date=datetime(2021, 1, 1),
27+
schedule="@once",
28+
):
29+
EmptyOperator(task_id="hi")
30+
EmptyOperator(task_id="hi")

tests/jobs/test_scheduler_job.py

+1
Original file line numberDiff line numberDiff line change
@@ -2766,6 +2766,7 @@ def test_list_py_file_paths(self):
27662766
ignored_files = {
27672767
'no_dags.py',
27682768
'test_invalid_cron.py',
2769+
'test_invalid_dup_task.py',
27692770
'test_ignore_this.py',
27702771
'test_invalid_param.py',
27712772
'test_nested_dag.py',

tests/models/test_dagbag.py

+22
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,28 @@ def test_get_dag_registration(self, file_to_load, expected):
357357
assert dag, f"{dag_id} was bagged"
358358
assert dag.fileloc.endswith(path)
359359

360+
def test_dag_registration_with_failure(self):
361+
dagbag = models.DagBag(dag_folder=os.devnull, include_examples=False)
362+
found = dagbag.process_file(str(TEST_DAGS_FOLDER / 'test_invalid_dup_task.py'))
363+
assert [] == found
364+
365+
@pytest.fixture()
366+
def zip_with_valid_dag_and_dup_tasks(self, tmp_path: pathlib.Path) -> Iterator[str]:
367+
failing_dag_file = TEST_DAGS_FOLDER / 'test_invalid_dup_task.py'
368+
working_dag_file = TEST_DAGS_FOLDER / 'test_example_bash_operator.py'
369+
zipped = os.path.join(tmp_path, "test_zip_invalid_dup_task.zip")
370+
with zipfile.ZipFile(zipped, "w") as zf:
371+
zf.write(failing_dag_file, os.path.basename(failing_dag_file))
372+
zf.write(working_dag_file, os.path.basename(working_dag_file))
373+
yield zipped
374+
os.unlink(zipped)
375+
376+
def test_dag_registration_with_failure_zipped(self, zip_with_valid_dag_and_dup_tasks):
377+
dagbag = models.DagBag(dag_folder=os.devnull, include_examples=False)
378+
found = dagbag.process_file(zip_with_valid_dag_and_dup_tasks)
379+
assert 1 == len(found)
380+
assert ['test_example_bash_operator'] == [dag.dag_id for dag in found]
381+
360382
@patch.object(DagModel, "get_current")
361383
def test_refresh_py_dag(self, mock_dagmodel):
362384
"""

0 commit comments

Comments
 (0)