Skip to content

Commit c0a4f62

Browse files
authored
xDS interop: Refresh k8s auth on 401 to pick up updated auth token (#32210)
This PR adds retries on create/get requests from the test driver to the K8s API when 401 Unauthorized error is encountered. K8S python library expects the ApiClient to be cycled on auth token refreshes. The problem is described in kubernetes-client/python#741. Currently we don't have any hypotheses why we weren't affected by this problem before. To force the ApiClient to pick up the new credentials, I shut down the current client, create a new one, and replace api_client properties on all k8s APIs we manage. This should also work with the Watch-based log collector recovering from an error. To support that, I replace default Configuration so that the next time Watch creates ApiClient implicitly, the Configuration with updated token will be used.
1 parent cf9d1c6 commit c0a4f62

File tree

2 files changed

+67
-18
lines changed

2 files changed

+67
-18
lines changed

framework/infrastructure/k8s.py

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
# added to get around circular dependencies caused by k8s.py clashing with
1616
# k8s/__init__.py
1717
import datetime
18-
import functools
1918
import json
2019
import logging
2120
import pathlib
@@ -33,51 +32,78 @@
3332
from framework.infrastructure.k8s_internal import k8s_port_forwarder
3433

3534
logger = logging.getLogger(__name__)
35+
3636
# Type aliases
3737
_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml
3838
PodLogCollector = k8s_log_collector.PodLogCollector
3939
PortForwarder = k8s_port_forwarder.PortForwarder
40+
ApiClient = client.ApiClient
4041
V1Deployment = client.V1Deployment
4142
V1ServiceAccount = client.V1ServiceAccount
4243
V1Pod = client.V1Pod
4344
V1PodList = client.V1PodList
4445
V1Service = client.V1Service
4546
V1Namespace = client.V1Namespace
4647
ApiException = client.ApiException
48+
FailToCreateError = utils.FailToCreateError
4749

4850

49-
def simple_resource_get(func):
51+
def _simple_resource_get(func):
5052

51-
def wrap_not_found_return_none(*args, **kwargs):
53+
def _wrap_simple_resource_get(self: 'KubernetesNamespace', *args, **kwargs):
5254
try:
53-
return func(*args, **kwargs)
54-
except client.ApiException as e:
55+
return func(self, *args, **kwargs)
56+
except ApiException as e:
5557
if e.status == 404:
56-
# Ignore 404
58+
# Instead of trowing an error when a resource doesn't exist,
59+
# just return None.
5760
return None
61+
elif e.status == 401:
62+
# 401 Unauthorized: token might be expired, attempt auth refresh
63+
self.refresh_auth()
64+
return func(self, *args, **kwargs)
65+
# Reraise for anything else.
5866
raise
5967

60-
return wrap_not_found_return_none
68+
return _wrap_simple_resource_get
6169

6270

6371
def label_dict_to_selector(labels: dict) -> str:
6472
return ','.join(f'{k}=={v}' for k, v in labels.items())
6573

6674

6775
class KubernetesApiManager:
76+
_client: ApiClient
77+
context: str
78+
apps: client.AppsV1Api
79+
core: client.CoreV1Api
80+
_apis: set
6881

69-
def __init__(self, context):
82+
def __init__(self, context: str):
7083
self.context = context
71-
self.client = self._cached_api_client_for_context(context)
84+
self._client = self._new_client_from_context(context)
7285
self.apps = client.AppsV1Api(self.client)
7386
self.core = client.CoreV1Api(self.client)
87+
self._apis = {self.apps, self.core}
88+
89+
@property
90+
def client(self) -> ApiClient:
91+
return self._client
7492

7593
def close(self):
7694
self.client.close()
7795

78-
@classmethod
79-
@functools.lru_cache(None)
80-
def _cached_api_client_for_context(cls, context: str) -> client.ApiClient:
96+
def reload(self):
97+
self.close()
98+
self._client = self._new_client_from_context(self.context)
99+
# Update default configuration so that modules that initialize
100+
# ApiClient implicitly (e.g. kubernetes.watch.Watch) get the updates.
101+
client.Configuration.set_default(self._client.configuration)
102+
for api in self._apis:
103+
api.api_client = self._client
104+
105+
@staticmethod
106+
def _new_client_from_context(context: str) -> ApiClient:
81107
client_instance = kubernetes.config.new_client_from_config(
82108
context=context)
83109
logger.info('Using kubernetes context "%s", active host: %s', context,
@@ -101,16 +127,20 @@ def __init__(self, api: KubernetesApiManager, name: str):
101127
self.name = name
102128
self.api = api
103129

130+
def refresh_auth(self):
131+
logger.info('Reloading k8s api client to refresh the auth.')
132+
self.api.reload()
133+
104134
def apply_manifest(self, manifest):
105135
return utils.create_from_dict(self.api.client,
106136
manifest,
107137
namespace=self.name)
108138

109-
@simple_resource_get
139+
@_simple_resource_get
110140
def get_service(self, name) -> V1Service:
111141
return self.api.core.read_namespaced_service(name, self.name)
112142

113-
@simple_resource_get
143+
@_simple_resource_get
114144
def get_service_account(self, name) -> V1Service:
115145
return self.api.core.read_namespaced_service_account(name, self.name)
116146

@@ -134,7 +164,7 @@ def delete_service_account(self,
134164
propagation_policy='Foreground',
135165
grace_period_seconds=grace_period_seconds))
136166

137-
@simple_resource_get
167+
@_simple_resource_get
138168
def get(self) -> V1Namespace:
139169
return self.api.core.read_namespace(self.name)
140170

@@ -202,7 +232,7 @@ def get_service_neg(self, service_name: str,
202232
neg_zones: List[str] = neg_info['zones']
203233
return neg_name, neg_zones
204234

205-
@simple_resource_get
235+
@_simple_resource_get
206236
def get_deployment(self, name) -> V1Deployment:
207237
return self.api.apps.read_namespaced_deployment(name, self.name)
208238

framework/test_app/runners/k8s/k8s_base_runner.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def _template_file_from_name(cls, template_name):
121121
cls.TEMPLATE_DIR_RELATIVE_PATH)
122122
return templates_path.joinpath(template_name).resolve()
123123

124-
def _create_from_template(self, template_name, **kwargs):
124+
def _create_from_template(self, template_name, **kwargs) -> object:
125125
template_file = self._template_file_from_name(template_name)
126126
logger.debug("Loading k8s manifest template: %s", template_file)
127127

@@ -135,7 +135,26 @@ def _create_from_template(self, template_name, **kwargs):
135135
if next(manifests, False):
136136
raise _RunnerError('Exactly one document expected in manifest '
137137
f'{template_file}')
138-
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
138+
139+
# TODO(sergiitk, b/178378578): add a retryer.
140+
try:
141+
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
142+
except k8s.FailToCreateError as err_create:
143+
# Since we verified this is not a multi-doc yaml, we should
144+
# expect a single exception. Otherwise, something went horribly
145+
# wrong, or API promises got broken.
146+
if len(err_create.api_exceptions) != 1:
147+
raise
148+
149+
api_exception: k8s.ApiException = err_create.api_exceptions[0]
150+
if api_exception.status == 401:
151+
# 401 Unauthorized: token might be expired, attempt auth refresh
152+
self.k8s_namespace.refresh_auth()
153+
k8s_objects = self.k8s_namespace.apply_manifest(manifest)
154+
else:
155+
# Reraise for anything else.
156+
raise
157+
139158
if len(k8s_objects) != 1:
140159
raise _RunnerError('Expected exactly one object must created from '
141160
f'manifest {template_file}')

0 commit comments

Comments
 (0)