Skip to content

Commit a8f3ab4

Browse files
committed
Patch kubernetes Api Client to work around lock contention issues
Unclear whether the benefits are worth the risk of breaking changes here, but this is a workaround for the signifcant lock contention issues with the k8s client surfaced in #23933 (reply in thread). This appears to be a known issue with the k8s python client - see kubernetes-client/python#2284. Test Plan: BK
1 parent 3bf5c78 commit a8f3ab4

File tree

1 file changed

+45
-2
lines changed
  • python_modules/libraries/dagster-k8s/dagster_k8s

1 file changed

+45
-2
lines changed

python_modules/libraries/dagster-k8s/dagster_k8s/client.py

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,14 @@
66

77
import kubernetes.client
88
import kubernetes.client.rest
9+
import six
910
from dagster import (
1011
DagsterInstance,
1112
_check as check,
1213
)
1314
from dagster._core.storage.dagster_run import DagsterRunStatus
15+
from kubernetes.client.api_client import ApiClient
16+
from kubernetes.client.configuration import Configuration
1417
from kubernetes.client.models import V1Job, V1JobStatus
1518

1619
try:
@@ -91,6 +94,42 @@ class DagsterK8sJobStatusException(Exception):
9194
]
9295

9396

97+
shared_k8s_model_configuration = Configuration()
98+
99+
100+
class PatchedApiClient(ApiClient):
101+
# Forked from ApiClient implementation to pass configuration object down into created model
102+
# objects, avoiding lock contention issues. See https://github.com/kubernetes-client/python/issues/2284
103+
def __deserialize_model(self, data, klass):
104+
"""Deserializes list or dict to model.
105+
106+
:param data: dict, list.
107+
:param klass: class literal.
108+
:return: model object.
109+
"""
110+
if not klass.openapi_types and not hasattr(klass, "get_real_child_model"):
111+
return data
112+
113+
# Below is the only change from the base ApiClient implementation - pass through the
114+
# Configuration object to each newly created model so that each one does not have to create
115+
# one and acquire a lock
116+
kwargs = {"local_vars_configuration": shared_k8s_model_configuration}
117+
118+
if data is not None and klass.openapi_types is not None and isinstance(data, (list, dict)):
119+
for attr, attr_type in six.iteritems(klass.openapi_types):
120+
if klass.attribute_map[attr] in data:
121+
value = data[klass.attribute_map[attr]]
122+
kwargs[attr] = self.__deserialize(value, attr_type)
123+
124+
instance = klass(**kwargs)
125+
126+
if hasattr(instance, "get_real_child_model"):
127+
klass_name = instance.get_real_child_model(data)
128+
if klass_name:
129+
instance = self.__deserialize(data, klass_name)
130+
return instance
131+
132+
94133
def k8s_api_retry(
95134
fn: Callable[..., T],
96135
max_retries: int,
@@ -209,8 +248,12 @@ def __init__(self, batch_api, core_api, logger, sleeper, timer):
209248
@staticmethod
210249
def production_client(batch_api_override=None, core_api_override=None):
211250
return DagsterKubernetesClient(
212-
batch_api=batch_api_override or kubernetes.client.BatchV1Api(),
213-
core_api=core_api_override or kubernetes.client.CoreV1Api(),
251+
batch_api=(
252+
batch_api_override or kubernetes.client.BatchV1Api(api_client=PatchedApiClient())
253+
),
254+
core_api=(
255+
core_api_override or kubernetes.client.CoreV1Api(api_client=PatchedApiClient())
256+
),
214257
logger=logging.info,
215258
sleeper=time.sleep,
216259
timer=time.time,

0 commit comments

Comments
 (0)