|
4 | 4 | import threading
|
5 | 5 | import unittest
|
6 | 6 |
|
| 7 | +import httpretty |
7 | 8 | import pytest
|
8 | 9 | from urllib3.exceptions import NewConnectionError, HTTPError
|
9 | 10 |
|
@@ -175,11 +176,6 @@ def test_timeout_as_float(self):
|
175 | 176 | write_api.write(bucket="my-bucket", org="my-org", record="mem,tag=a value=1")
|
176 | 177 | self.assertIn("Failed to establish a new connection", str(e.value))
|
177 | 178 |
|
178 |
| - def test_init_without_token(self): |
179 |
| - self.client = InfluxDBClient("http://localhost:8086") |
180 |
| - self.assertIsNotNone(self.client) |
181 |
| - self.client.query_api().query("buckets()", "my-org") |
182 |
| - |
183 | 179 |
|
184 | 180 | class InfluxDBClientTestIT(BaseTest):
|
185 | 181 | httpRequest = []
|
@@ -249,6 +245,24 @@ def do_GET(self):
|
249 | 245 | self.httpd_thread.start()
|
250 | 246 |
|
251 | 247 |
|
| 248 | +class InfluxDBClientTestMock(unittest.TestCase): |
| 249 | + |
| 250 | + def setUp(self) -> None: |
| 251 | + httpretty.enable() |
| 252 | + httpretty.reset() |
| 253 | + |
| 254 | + def tearDown(self) -> None: |
| 255 | + if self.influxdb_client: |
| 256 | + self.influxdb_client.close() |
| 257 | + httpretty.disable() |
| 258 | + |
| 259 | + def test_init_without_token(self): |
| 260 | + httpretty.register_uri(httpretty.POST, uri="http://localhost/api/v2/query", status=200, body="") |
| 261 | + self.influxdb_client = InfluxDBClient("http://localhost") |
| 262 | + self.assertIsNotNone(self.influxdb_client) |
| 263 | + self.influxdb_client.query_api().query("buckets()", "my-org") |
| 264 | + |
| 265 | + |
252 | 266 | class ServerWithSelfSingedSSL(http.server.SimpleHTTPRequestHandler):
|
253 | 267 | def _set_headers(self):
|
254 | 268 | self.send_response(200)
|
|
0 commit comments