Skip to content
This repository was archived by the owner on Mar 13, 2022. It is now read-only.

Commit a9c0f20

Browse files
committed
Add dynamic client
1 parent 474e9fb commit a9c0f20

File tree

6 files changed

+1237
-0
lines changed

6 files changed

+1237
-0
lines changed

dynamic/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import * # NOQA

dynamic/apply.py

+95
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
from copy import deepcopy
2+
import json
3+
4+
from .exceptions import NotFoundError
5+
6+
LAST_APPLIED_CONFIG_ANNOTATION = 'kubectl.kubernetes.io/last-applied-configuration'
7+
8+
def apply(resource, definition):
9+
desired_annotation = dict(
10+
metadata=dict(
11+
annotations={
12+
LAST_APPLIED_CONFIG_ANNOTATION: json.dumps(definition, separators=(',', ':'), indent=None)
13+
}
14+
)
15+
)
16+
try:
17+
actual = resource.get(name=definition['metadata']['name'], namespace=definition['metadata'].get('namespace'))
18+
except NotFoundError:
19+
return resource.create(body=dict_merge(definition, desired_annotation), namespace=definition['metadata'].get('namespace'))
20+
last_applied = actual.metadata.get('annotations',{}).get(LAST_APPLIED_CONFIG_ANNOTATION)
21+
22+
if last_applied:
23+
last_applied = json.loads(last_applied)
24+
actual_dict = actual.to_dict()
25+
del actual_dict['metadata']['annotations'][LAST_APPLIED_CONFIG_ANNOTATION]
26+
patch = merge(last_applied, definition, actual_dict)
27+
if patch:
28+
return resource.patch(body=dict_merge(patch, desired_annotation),
29+
name=definition['metadata']['name'],
30+
namespace=definition['metadata'].get('namespace'),
31+
content_type='application/merge-patch+json')
32+
else:
33+
return actual
34+
else:
35+
return resource.patch(
36+
body=definition,
37+
name=definition['metadata']['name'],
38+
namespace=definition['metadata'].get('namespace'),
39+
content_type='application/merge-patch+json')
40+
41+
42+
# The patch is the difference from actual to desired without deletions, plus deletions
43+
# from last_applied to desired. To find it, we compute deletions, which are the deletions from
44+
# last_applied to desired, and delta, which is the difference from actual to desired without
45+
# deletions, and then apply delta to deletions as a patch, which should be strictly additive.
46+
def merge(last_applied, desired, actual):
47+
deletions = get_deletions(last_applied, desired)
48+
delta = get_delta(actual, desired)
49+
return dict_merge(deletions, delta)
50+
51+
52+
# dict_merge taken from Ansible's module_utils.common.dict_transformations
53+
def dict_merge(a, b):
54+
'''recursively merges dicts. not just simple a['key'] = b['key'], if
55+
both a and b have a key whose value is a dict then dict_merge is called
56+
on both values and the result stored in the returned dictionary.'''
57+
if not isinstance(b, dict):
58+
return b
59+
result = deepcopy(a)
60+
for k, v in b.items():
61+
if k in result and isinstance(result[k], dict):
62+
result[k] = dict_merge(result[k], v)
63+
else:
64+
result[k] = deepcopy(v)
65+
return result
66+
67+
68+
def get_deletions(last_applied, desired):
69+
patch = {}
70+
for k, last_applied_value in last_applied.items():
71+
desired_value = desired.get(k)
72+
if desired_value is None:
73+
patch[k] = None
74+
elif type(last_applied_value) != type(desired_value):
75+
patch[k] = desired_value
76+
elif isinstance(last_applied_value, dict):
77+
p = get_deletions(last_applied_value, desired_value)
78+
if p:
79+
patch[k] = p
80+
elif last_applied_value != desired_value:
81+
patch[k] = desired_value
82+
return patch
83+
84+
85+
def get_delta(actual, desired):
86+
patch = {}
87+
for k, desired_value in desired.items():
88+
actual_value = actual.get(k)
89+
if actual_value is None:
90+
patch[k] = desired_value
91+
elif isinstance(desired_value, dict):
92+
p = get_delta(actual_value, desired_value)
93+
if p:
94+
patch[k] = p
95+
return patch

dynamic/client.py

+274
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,274 @@
1+
import six
2+
import json
3+
4+
from kubernetes import watch
5+
from kubernetes.client.rest import ApiException
6+
7+
from .apply import apply
8+
from .discovery import EagerDiscoverer, LazyDiscoverer
9+
from .exceptions import api_exception, KubernetesValidateMissing
10+
from .resource import Resource, ResourceList, Subresource, ResourceInstance, ResourceField
11+
12+
try:
13+
import kubernetes_validate
14+
HAS_KUBERNETES_VALIDATE = True
15+
except ImportError:
16+
HAS_KUBERNETES_VALIDATE = False
17+
18+
try:
19+
from kubernetes_validate.utils import VersionNotSupportedError
20+
except ImportError:
21+
class VersionNotSupportedError(NotImplementedError):
22+
pass
23+
24+
__all__ = [
25+
'DynamicClient',
26+
'ResourceInstance',
27+
'Resource',
28+
'ResourceList',
29+
'Subresource',
30+
'EagerDiscoverer',
31+
'LazyDiscoverer',
32+
'ResourceField',
33+
]
34+
35+
36+
def meta_request(func):
37+
""" Handles parsing response structure and translating API Exceptions """
38+
def inner(self, *args, **kwargs):
39+
serialize_response = kwargs.pop('serialize', True)
40+
serializer = kwargs.pop('serializer', ResourceInstance)
41+
try:
42+
resp = func(self, *args, **kwargs)
43+
except ApiException as e:
44+
raise api_exception(e)
45+
if serialize_response:
46+
try:
47+
if six.PY2:
48+
return serializer(self, json.loads(resp.data))
49+
return serializer(self, json.loads(resp.data.decode('utf8')))
50+
except ValueError:
51+
if six.PY2:
52+
return resp.data
53+
return resp.data.decode('utf8')
54+
return resp
55+
56+
return inner
57+
58+
59+
class DynamicClient(object):
60+
""" A kubernetes client that dynamically discovers and interacts with
61+
the kubernetes API
62+
"""
63+
64+
def __init__(self, client, cache_file=None, discoverer=None):
65+
# Setting default here to delay evaluation of LazyDiscoverer class
66+
# until constructor is called
67+
discoverer = discoverer or LazyDiscoverer
68+
69+
self.client = client
70+
self.configuration = client.configuration
71+
self.__discoverer = discoverer(self, cache_file)
72+
73+
@property
74+
def resources(self):
75+
return self.__discoverer
76+
77+
@property
78+
def version(self):
79+
return self.__discoverer.version
80+
81+
def ensure_namespace(self, resource, namespace, body):
82+
namespace = namespace or body.get('metadata', {}).get('namespace')
83+
if not namespace:
84+
raise ValueError("Namespace is required for {}.{}".format(resource.group_version, resource.kind))
85+
return namespace
86+
87+
def serialize_body(self, body):
88+
if hasattr(body, 'to_dict'):
89+
return body.to_dict()
90+
return body or {}
91+
92+
def get(self, resource, name=None, namespace=None, **kwargs):
93+
path = resource.path(name=name, namespace=namespace)
94+
return self.request('get', path, **kwargs)
95+
96+
def create(self, resource, body=None, namespace=None, **kwargs):
97+
body = self.serialize_body(body)
98+
if resource.namespaced:
99+
namespace = self.ensure_namespace(resource, namespace, body)
100+
path = resource.path(namespace=namespace)
101+
return self.request('post', path, body=body, **kwargs)
102+
103+
def delete(self, resource, name=None, namespace=None, label_selector=None, field_selector=None, **kwargs):
104+
if not (name or label_selector or field_selector):
105+
raise ValueError("At least one of name|label_selector|field_selector is required")
106+
if resource.namespaced and not (label_selector or field_selector or namespace):
107+
raise ValueError("At least one of namespace|label_selector|field_selector is required")
108+
path = resource.path(name=name, namespace=namespace)
109+
return self.request('delete', path, label_selector=label_selector, field_selector=field_selector, **kwargs)
110+
111+
def replace(self, resource, body=None, name=None, namespace=None, **kwargs):
112+
body = self.serialize_body(body)
113+
name = name or body.get('metadata', {}).get('name')
114+
if not name:
115+
raise ValueError("name is required to replace {}.{}".format(resource.group_version, resource.kind))
116+
if resource.namespaced:
117+
namespace = self.ensure_namespace(resource, namespace, body)
118+
path = resource.path(name=name, namespace=namespace)
119+
return self.request('put', path, body=body, **kwargs)
120+
121+
def patch(self, resource, body=None, name=None, namespace=None, **kwargs):
122+
body = self.serialize_body(body)
123+
name = name or body.get('metadata', {}).get('name')
124+
if not name:
125+
raise ValueError("name is required to patch {}.{}".format(resource.group_version, resource.kind))
126+
if resource.namespaced:
127+
namespace = self.ensure_namespace(resource, namespace, body)
128+
129+
content_type = kwargs.pop('content_type', 'application/strategic-merge-patch+json')
130+
path = resource.path(name=name, namespace=namespace)
131+
132+
return self.request('patch', path, body=body, content_type=content_type, **kwargs)
133+
134+
def apply(self, resource, body=None, name=None, namespace=None):
135+
body = self.serialize_body(body)
136+
body['metadata'] = body.get('metadata', dict())
137+
name = name or body['metadata'].get('name')
138+
if not name:
139+
raise ValueError("name is required to apply {}.{}".format(resource.group_version, resource.kind))
140+
if resource.namespaced:
141+
body['metadata']['namespace'] = self.ensure_namespace(resource, namespace, body)
142+
return apply(resource, body)
143+
144+
def watch(self, resource, namespace=None, name=None, label_selector=None, field_selector=None, resource_version=None, timeout=None):
145+
"""
146+
Stream events for a resource from the Kubernetes API
147+
148+
:param resource: The API resource object that will be used to query the API
149+
:param namespace: The namespace to query
150+
:param name: The name of the resource instance to query
151+
:param label_selector: The label selector with which to filter results
152+
:param field_selector: The field selector with which to filter results
153+
:param resource_version: The version with which to filter results. Only events with
154+
a resource_version greater than this value will be returned
155+
:param timeout: The amount of time in seconds to wait before terminating the stream
156+
157+
:return: Event object with these keys:
158+
'type': The type of event such as "ADDED", "DELETED", etc.
159+
'raw_object': a dict representing the watched object.
160+
'object': A ResourceInstance wrapping raw_object.
161+
162+
Example:
163+
client = DynamicClient(k8s_client)
164+
v1_pods = client.resources.get(api_version='v1', kind='Pod')
165+
166+
for e in v1_pods.watch(resource_version=0, namespace=default, timeout=5):
167+
print(e['type'])
168+
print(e['object'].metadata)
169+
"""
170+
watcher = watch.Watch()
171+
for event in watcher.stream(
172+
resource.get,
173+
namespace=namespace,
174+
name=name,
175+
field_selector=field_selector,
176+
label_selector=label_selector,
177+
resource_version=resource_version,
178+
serialize=False,
179+
timeout_seconds=timeout
180+
):
181+
event['object'] = ResourceInstance(resource, event['object'])
182+
yield event
183+
184+
@meta_request
185+
def request(self, method, path, body=None, **params):
186+
if not path.startswith('/'):
187+
path = '/' + path
188+
189+
path_params = params.get('path_params', {})
190+
query_params = params.get('query_params', [])
191+
if params.get('pretty') is not None:
192+
query_params.append(('pretty', params['pretty']))
193+
if params.get('_continue') is not None:
194+
query_params.append(('continue', params['_continue']))
195+
if params.get('include_uninitialized') is not None:
196+
query_params.append(('includeUninitialized', params['include_uninitialized']))
197+
if params.get('field_selector') is not None:
198+
query_params.append(('fieldSelector', params['field_selector']))
199+
if params.get('label_selector') is not None:
200+
query_params.append(('labelSelector', params['label_selector']))
201+
if params.get('limit') is not None:
202+
query_params.append(('limit', params['limit']))
203+
if params.get('resource_version') is not None:
204+
query_params.append(('resourceVersion', params['resource_version']))
205+
if params.get('timeout_seconds') is not None:
206+
query_params.append(('timeoutSeconds', params['timeout_seconds']))
207+
if params.get('watch') is not None:
208+
query_params.append(('watch', params['watch']))
209+
210+
header_params = params.get('header_params', {})
211+
form_params = []
212+
local_var_files = {}
213+
# HTTP header `Accept`
214+
header_params['Accept'] = self.client.select_header_accept([
215+
'application/json',
216+
'application/yaml',
217+
'application/vnd.kubernetes.protobuf'
218+
])
219+
220+
# HTTP header `Content-Type`
221+
if params.get('content_type'):
222+
header_params['Content-Type'] = params['content_type']
223+
else:
224+
header_params['Content-Type'] = self.client.select_header_content_type(['*/*'])
225+
226+
# Authentication setting
227+
auth_settings = ['BearerToken']
228+
229+
return self.client.call_api(
230+
path,
231+
method.upper(),
232+
path_params,
233+
query_params,
234+
header_params,
235+
body=body,
236+
post_params=form_params,
237+
async_req=params.get('async_req'),
238+
files=local_var_files,
239+
auth_settings=auth_settings,
240+
_preload_content=False,
241+
_return_http_data_only=params.get('_return_http_data_only', True)
242+
)
243+
244+
def validate(self, definition, version=None, strict=False):
245+
"""validate checks a kubernetes resource definition
246+
247+
Args:
248+
definition (dict): resource definition
249+
version (str): version of kubernetes to validate against
250+
strict (bool): whether unexpected additional properties should be considered errors
251+
252+
Returns:
253+
warnings (list), errors (list): warnings are missing validations, errors are validation failures
254+
"""
255+
if not HAS_KUBERNETES_VALIDATE:
256+
raise KubernetesValidateMissing()
257+
258+
errors = list()
259+
warnings = list()
260+
try:
261+
if version is None:
262+
try:
263+
version = self.version['kubernetes']['gitVersion']
264+
except KeyError:
265+
version = kubernetes_validate.latest_version()
266+
kubernetes_validate.validate(definition, version, strict)
267+
except kubernetes_validate.utils.ValidationError as e:
268+
errors.append("resource definition validation error at %s: %s" % ('.'.join([str(item) for item in e.path]), e.message)) # noqa: B306
269+
except VersionNotSupportedError:
270+
errors.append("Kubernetes version %s is not supported by kubernetes-validate" % version)
271+
except kubernetes_validate.utils.SchemaNotFoundError as e:
272+
warnings.append("Could not find schema for object kind %s with API version %s in Kubernetes version %s (possibly Custom Resource?)" %
273+
(e.kind, e.api_version, e.version))
274+
return warnings, errors

0 commit comments

Comments
 (0)