1
+ # -*- coding: utf-8 -*-
2
+
3
+ # Licensed under the Apache License, Version 2.0 (the "License"); you may
4
+ # not use this file except in compliance with the License. You may obtain
5
+ # a copy of the License at
6
+ #
7
+ # http://www.apache.org/licenses/LICENSE-2.0
8
+ #
9
+ # Unless required by applicable law or agreed to in writing, software
10
+ # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11
+ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12
+ # License for the specific language governing permissions and limitations
13
+ # under the License.
14
+
15
+ """
16
+ test_k8sclient
17
+ ----------------------------------
18
+
19
+ Tests for `k8sclient` module. Deploy Kubernetes using:
20
+ http://kubernetes.io/docs/getting-started-guides/docker/
21
+
22
+ and then run this test
23
+ """
24
+
25
+ import unittest
26
+ import urllib3
27
+ import uuid
28
+
29
+ from kubernetes .client import api_client
30
+ from kubernetes .client .apis import core_v1_api
31
+
32
+
33
+ def _is_k8s_running ():
34
+ try :
35
+ urllib3 .PoolManager ().request ('GET' , '127.0.0.1:8080' )
36
+ return True
37
+ except urllib3 .exceptions .HTTPError :
38
+ return False
39
+
40
+
41
+ class TestK8sclient (unittest .TestCase ):
42
+ @unittest .skipUnless (
43
+ _is_k8s_running (), "Kubernetes is not available" )
44
+ def test_list_endpoints (self ):
45
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
46
+ api = core_v1_api .CoreV1Api (client )
47
+
48
+ endpoints = api .list_endpoints_for_all_namespaces ()
49
+ self .assertTrue (len (endpoints .items ) > 0 )
50
+
51
+ @unittest .skipUnless (
52
+ _is_k8s_running (), "Kubernetes is not available" )
53
+ def test_pod_apis (self ):
54
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
55
+ api = core_v1_api .CoreV1Api (client )
56
+
57
+ name = 'test-' + str (uuid .uuid4 ())
58
+
59
+ pod_manifest = {'apiVersion' : 'v1' ,
60
+ 'kind' : 'Pod' ,
61
+ 'metadata' : {'color' : 'blue' , 'name' : name },
62
+ 'spec' : {'containers' : [{'image' : 'dockerfile/redis' ,
63
+ 'name' : 'redis' }]}}
64
+
65
+ resp = api .create_namespaced_pod (body = pod_manifest ,
66
+ namespace = 'default' )
67
+ self .assertEqual (name , resp .metadata .name )
68
+ self .assertTrue (resp .status .phase )
69
+
70
+ resp = api .read_namespaced_pod (name = name ,
71
+ namespace = 'default' )
72
+ self .assertEqual (name , resp .metadata .name )
73
+ self .assertTrue (resp .status .phase )
74
+
75
+ number_of_pods = len (api .list_pod_for_all_namespaces ().items )
76
+ self .assertTrue (number_of_pods > 0 )
77
+
78
+ resp = api .delete_namespaced_pod (name = name , body = {},
79
+ namespace = 'default' )
80
+
81
+ @unittest .skipUnless (
82
+ _is_k8s_running (), "Kubernetes is not available" )
83
+ def test_service_apis (self ):
84
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
85
+ api = core_v1_api .CoreV1Api (client )
86
+
87
+ service_manifest = {'apiVersion' : 'v1' ,
88
+ 'kind' : 'Service' ,
89
+ 'metadata' : {'labels' : {'name' : 'frontend' },
90
+ 'name' : 'frontend' ,
91
+ 'resourceversion' : 'v1' },
92
+ 'spec' : {'ports' : [{'name' : 'port' ,
93
+ 'port' : 80 ,
94
+ 'protocol' : 'TCP' ,
95
+ 'targetPort' : 80 }],
96
+ 'selector' : {'name' : 'frontend' }}}
97
+
98
+ resp = api .create_namespaced_service (body = service_manifest ,
99
+ namespace = 'default' )
100
+ self .assertEqual ('frontend' , resp .metadata .name )
101
+ self .assertTrue (resp .status )
102
+
103
+ resp = api .read_namespaced_service (name = 'frontend' ,
104
+ namespace = 'default' )
105
+ self .assertEqual ('frontend' , resp .metadata .name )
106
+ self .assertTrue (resp .status )
107
+
108
+ # TODO(dims) : Fails with "json: cannot unmarshal object into
109
+ # Go value of type jsonpatch.Patch"
110
+ # service_manifest['spec']['ports'] = [{'name': 'new',
111
+ # 'port': 8080,
112
+ # 'protocol': 'TCP',
113
+ # 'targetPort': 8080}]
114
+ # resp = api.patch_namespaced_service(body=service_manifest,
115
+ # name='frontend',
116
+ # namespace='default')
117
+ # self.assertEqual(2, len(resp.spec.ports))
118
+ # self.assertTrue(resp.status)
119
+
120
+ resp = api .delete_namespaced_service (name = 'frontend' ,
121
+ namespace = 'default' )
122
+
123
+ @unittest .skipUnless (
124
+ _is_k8s_running (), "Kubernetes is not available" )
125
+ def test_replication_controller_apis (self ):
126
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
127
+ api = core_v1_api .CoreV1Api (client )
128
+
129
+ rc_manifest = {
130
+ 'apiVersion' : 'v1' ,
131
+ 'kind' : 'ReplicationController' ,
132
+ 'metadata' : {'labels' : {'name' : 'frontend' },
133
+ 'name' : 'frontend' },
134
+ 'spec' : {'replicas' : 2 ,
135
+ 'selector' : {'name' : 'frontend' },
136
+ 'template' : {'metadata' : {
137
+ 'labels' : {'name' : 'frontend' }},
138
+ 'spec' : {'containers' : [{
139
+ 'image' : 'nginx' ,
140
+ 'name' : 'nginx' ,
141
+ 'ports' : [{'containerPort' : 80 ,
142
+ 'protocol' : 'TCP' }]}]}}}}
143
+
144
+ resp = api .create_namespaced_replication_controller (
145
+ body = rc_manifest , namespace = 'default' )
146
+ self .assertEqual ('frontend' , resp .metadata .name )
147
+ self .assertEqual (2 , resp .spec .replicas )
148
+
149
+ resp = api .read_namespaced_replication_controller (
150
+ name = 'frontend' , namespace = 'default' )
151
+ self .assertEqual ('frontend' , resp .metadata .name )
152
+ self .assertEqual (2 , resp .spec .replicas )
153
+
154
+ resp = api .delete_namespaced_replication_controller (
155
+ name = 'frontend' , body = {}, namespace = 'default' )
156
+
157
+
158
+ @unittest .skipUnless (
159
+ _is_k8s_running (), "Kubernetes is not available" )
160
+ def test_configmap_apis (self ):
161
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
162
+ api = core_v1_api .CoreV1Api (client )
163
+
164
+ test_configmap = {
165
+ "kind" : "ConfigMap" ,
166
+ "apiVersion" : "v1" ,
167
+ "metadata" : {
168
+ "name" : "test-configmap" ,
169
+ },
170
+ "data" : {
171
+ "config.json" : "{\" command\" :\" /usr/bin/mysqld_safe\" }" ,
172
+ "frontend.cnf" : "[mysqld]\n bind-address = 10.0.0.3\n port = 3306\n "
173
+ }
174
+ }
175
+
176
+ resp = api .create_namespaced_config_map (
177
+ body = test_configmap , namespace = 'default'
178
+ )
179
+ self .assertEqual ('test-configmap' , resp .metadata .name )
180
+
181
+ resp = api .read_namespaced_config_map (
182
+ name = 'test-configmap' , namespace = 'default' )
183
+ self .assertEqual ('test-configmap' , resp .metadata .name )
184
+
185
+ # TODO(dims): Fails with "json: cannot unmarshal object
186
+ # into Go value of type jsonpatch.Patch"
187
+ # test_configmap['data']['config.json'] = "{}"
188
+ # resp = api.patch_namespaced_config_map(
189
+ # name='test-configmap', namespace='default', body=test_configmap)
190
+
191
+ resp = api .delete_namespaced_config_map (
192
+ name = 'test-configmap' , body = {}, namespace = 'default' )
193
+
194
+
195
+ @unittest .skipUnless (
196
+ _is_k8s_running (), "Kubernetes is not available" )
197
+ def test_node_apis (self ):
198
+ client = api_client .ApiClient ('http://127.0.0.1:8080/' )
199
+ api = core_v1_api .CoreV1Api (client )
200
+
201
+ for item in api .list_node ().items :
202
+ node = api .read_node (name = item .metadata .name )
203
+ self .assertTrue (len (node .metadata .labels ) > 0 )
204
+ self .assertTrue (isinstance (node .metadata .labels , dict ))
0 commit comments