Skip to content

Commit 261bf9c

Browse files
XD-DENGCloud Composer Team
authored and
Cloud Composer Team
committed
Clearer code for PodGenerator.deserialize_model_file (#26641)
Especially for how it handles non-existent file. When the file path received doesn't exist, the current way is to use yaml.safe_load() to process it, and it returns the path as a string. Then this string is passed to deserialize_model_dict() and results in an empty object. Passing 'None' to deserialize_model_dict() will do the same, but the code will become clearer, and less misleading. Meanwhile, when the model file path received doesn't exist, there should be a warning in the log. (This change shouldn't cause any behaviour change) GitOrigin-RevId: 35deda4f44997195d95855fcb19a701c25a0494b
1 parent 32b7abc commit 261bf9c

File tree

3 files changed

+43
-36
lines changed

3 files changed

+43
-36
lines changed

airflow/kubernetes/pod_generator.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import copy
2626
import datetime
2727
import hashlib
28+
import logging
2829
import os
2930
import re
3031
import uuid
@@ -40,6 +41,8 @@
4041
from airflow.utils import yaml
4142
from airflow.version import version as airflow_version
4243

44+
log = logging.getLogger(__name__)
45+
4346
MAX_LABEL_LEN = 63
4447

4548

@@ -412,24 +415,25 @@ def deserialize_model_file(path: str) -> k8s.V1Pod:
412415
"""
413416
:param path: Path to the file
414417
:return: a kubernetes.client.models.V1Pod
415-
416-
Unfortunately we need access to the private method
417-
``_ApiClient__deserialize_model`` from the kubernetes client.
418-
This issue is tracked here; https://github.com/kubernetes-client/python/issues/977.
419418
"""
420419
if os.path.exists(path):
421420
with open(path) as stream:
422421
pod = yaml.safe_load(stream)
423422
else:
424-
pod = yaml.safe_load(path)
423+
pod = None
424+
log.warning("Model file %s does not exist", path)
425425

426426
return PodGenerator.deserialize_model_dict(pod)
427427

428428
@staticmethod
429-
def deserialize_model_dict(pod_dict: dict) -> k8s.V1Pod:
429+
def deserialize_model_dict(pod_dict: dict | None) -> k8s.V1Pod:
430430
"""
431431
Deserializes python dictionary to k8s.V1Pod
432432
433+
Unfortunately we need access to the private method
434+
``_ApiClient__deserialize_model`` from the kubernetes client.
435+
This issue is tracked here; https://github.com/kubernetes-client/python/issues/977.
436+
433437
:param pod_dict: Serialized dict of k8s.V1Pod object
434438
:return: De-serialized k8s.V1Pod
435439
"""

tests/executors/test_kubernetes_executor.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import random
2121
import re
2222
import string
23+
import sys
2324
import unittest
2425
from datetime import datetime, timedelta
2526
from unittest import mock
2627

2728
import pytest
29+
import yaml
2830
from kubernetes.client import models as k8s
2931
from kubernetes.client.rest import ApiException
3032
from urllib3 import HTTPResponse
@@ -100,14 +102,33 @@ def test_create_pod_id(self):
100102
@mock.patch("airflow.kubernetes.pod_generator.PodGenerator")
101103
@mock.patch("airflow.executors.kubernetes_executor.KubeConfig")
102104
def test_get_base_pod_from_template(self, mock_kubeconfig, mock_generator):
105+
# Provide non-existent file path,
106+
# so None will be passed to deserialize_model_dict().
103107
pod_template_file_path = "/bar/biz"
104108
get_base_pod_from_template(pod_template_file_path, None)
105109
assert "deserialize_model_dict" == mock_generator.mock_calls[0][0]
106-
assert pod_template_file_path == mock_generator.mock_calls[0][1][0]
110+
assert mock_generator.mock_calls[0][1][0] is None
111+
107112
mock_kubeconfig.pod_template_file = "/foo/bar"
108113
get_base_pod_from_template(None, mock_kubeconfig)
109114
assert "deserialize_model_dict" == mock_generator.mock_calls[1][0]
110-
assert "/foo/bar" == mock_generator.mock_calls[1][1][0]
115+
assert mock_generator.mock_calls[1][1][0] is None
116+
117+
# Provide existent file path,
118+
# so loaded YAML file content should be used to call deserialize_model_dict(), rather than None.
119+
path = sys.path[0] + '/tests/kubernetes/pod.yaml'
120+
with open(path) as stream:
121+
expected_pod_dict = yaml.safe_load(stream)
122+
123+
pod_template_file_path = path
124+
get_base_pod_from_template(pod_template_file_path, None)
125+
assert "deserialize_model_dict" == mock_generator.mock_calls[2][0]
126+
assert mock_generator.mock_calls[2][1][0] == expected_pod_dict
127+
128+
mock_kubeconfig.pod_template_file = path
129+
get_base_pod_from_template(None, mock_kubeconfig)
130+
assert "deserialize_model_dict" == mock_generator.mock_calls[3][0]
131+
assert mock_generator.mock_calls[3][1][0] == expected_pod_dict
111132

112133
def test_make_safe_label_value(self):
113134
for dag_id, task_id in self._cases():
@@ -228,8 +249,6 @@ def test_run_next_exception_requeue(
228249
- 400 BadRequest is returned when your parameters are invalid e.g. asking for cpu=100ABC123.
229250
230251
"""
231-
import sys
232-
233252
path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
234253

235254
response = HTTPResponse(body='{"message": "any message"}', status=status)
@@ -283,8 +302,6 @@ def test_run_next_pod_reconciliation_error(self, mock_get_kube_client, mock_kube
283302
"""
284303
When construct_pod raises PodReconciliationError, we should fail the task.
285304
"""
286-
import sys
287-
288305
path = sys.path[0] + '/tests/kubernetes/pod_generator_base_with_secrets.yaml'
289306

290307
mock_kube_client = mock.patch('kubernetes.client.CoreV1Api', autospec=True)

tests/kubernetes/test_pod_generator.py

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -712,11 +712,20 @@ def test_reconcile_specs_init_containers(self):
712712
res = PodGenerator.reconcile_specs(base_spec, client_spec)
713713
assert res.init_containers == base_spec.init_containers + client_spec.init_containers
714714

715-
def test_deserialize_model_file(self):
715+
def test_deserialize_model_file(self, caplog):
716716
path = sys.path[0] + '/tests/kubernetes/pod.yaml'
717717
result = PodGenerator.deserialize_model_file(path)
718718
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
719719
assert sanitized_res == self.deserialize_result
720+
assert len(caplog.records) == 0
721+
722+
def test_deserialize_non_existent_model_file(self, caplog):
723+
path = sys.path[0] + '/tests/kubernetes/non_existent.yaml'
724+
result = PodGenerator.deserialize_model_file(path)
725+
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
726+
assert sanitized_res == {}
727+
assert len(caplog.records) == 1
728+
assert 'does not exist' in caplog.text
720729

721730
@parameterized.expand(
722731
(
@@ -761,29 +770,6 @@ def test_pod_name_is_valid(self, pod_id, expected_starts_with):
761770

762771
assert name.rsplit("-", 1)[0] == expected_starts_with
763772

764-
def test_deserialize_model_string(self):
765-
fixture = """
766-
apiVersion: v1
767-
kind: Pod
768-
metadata:
769-
name: memory-demo
770-
namespace: mem-example
771-
spec:
772-
containers:
773-
- name: memory-demo-ctr
774-
image: ghcr.io/apache/airflow-stress:1.0.4-2021.07.04
775-
resources:
776-
limits:
777-
memory: "200Mi"
778-
requests:
779-
memory: "100Mi"
780-
command: ["stress"]
781-
args: ["--vm", "1", "--vm-bytes", "150M", "--vm-hang", "1"]
782-
"""
783-
result = PodGenerator.deserialize_model_file(fixture)
784-
sanitized_res = self.k8s_client.sanitize_for_serialization(result)
785-
assert sanitized_res == self.deserialize_result
786-
787773
def test_validate_pod_generator(self):
788774
with pytest.raises(AirflowConfigException):
789775
PodGenerator(pod=k8s.V1Pod(), pod_template_file='k')

0 commit comments

Comments
 (0)