Skip to content

Commit 4bb4363

Browse files
committed
feat: (WIP) adds example error_handling.py and tests of headers in error types
1 parent 45e6607 commit 4bb4363

File tree

4 files changed

+145
-0
lines changed

4 files changed

+145
-0
lines changed

examples/error_handling.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import asyncio
2+
import time
3+
from datetime import datetime, timezone
4+
5+
from influxdb_client.client.exceptions import InfluxDBError
6+
7+
from influxdb_client.rest import ApiException
8+
9+
from influxdb_client.client.write_api import SYNCHRONOUS
10+
11+
from influxdb_client import WritePrecision, InfluxDBClient, Point
12+
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
13+
14+
15+
class BatchCB(object):
16+
17+
@staticmethod
18+
def success(self, conf: (str, str, str), data: str):
19+
print(f"Write success: {conf}, data: {data}")
20+
21+
@staticmethod
22+
def error(self, conf: (str, str, str), data: str, exception: InfluxDBError):
23+
print(f"Write failed: {conf}, data: {data}, error: {exception.message}")
24+
print(f" Date: {exception.headers.get("Date")}")
25+
print(f" X-Influxdb-Build: {exception.headers.get("X-Influxdb-Build")}")
26+
print(f" X-Influxdb-Version: {exception.headers.get("X-Influxdb-Version")}")
27+
print(f" X-Platform-Error-Code: {exception.headers.get("X-Platform-Error-Code")}")
28+
print(f" Retry-After: {exception.headers.get("Retry-After")}")
29+
30+
@staticmethod
31+
def retry(self, conf: (str, str, str), data: str, exception: InfluxDBError):
32+
print(f"Write failed but retryable: {conf}, data: {data}, error: {exception}")
33+
34+
35+
def report_ping(ping: bool):
36+
if not ping:
37+
raise ValueError("InfluxDB: Failed to ping server")
38+
else:
39+
print("InfluxDB: ready")
40+
41+
42+
def use_sync():
43+
print("Using sync")
44+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
45+
report_ping(client.ping())
46+
try:
47+
client.write_api(write_options=SYNCHRONOUS).write(bucket="my-bucket", record="cpu,location=G4 usage=")
48+
except ApiException as ae:
49+
print("\nCaught ae: ", ae.message)
50+
print(" Date: ", ae.headers.get("Date"))
51+
print(" X-Influxdb-Build: ", ae.headers.get("X-Influxdb-Build"))
52+
print(" X-Influxdb-Version: ", ae.headers.get("X-Influxdb-Version"))
53+
print(" X-Platform-Error-Code: ", ae.headers.get("X-Platform-Error-Code"))
54+
print(" Retry-After: ", ae.headers.get("Retry-After")) # Should be None
55+
56+
print("Sync write done")
57+
58+
59+
def use_batch():
60+
print("Using batch")
61+
with InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org") as client:
62+
cb = BatchCB()
63+
with client.write_api(success_callback=cb.success,
64+
error_callback=cb.error,
65+
retry_callback=cb.retry) as write_api:
66+
write_api.write(bucket="my-bucket", record="cpu,location=G9 usage=")
67+
print("Batch write sent")
68+
69+
70+
async def use_async():
71+
print("Using async")
72+
async with InfluxDBClientAsync(url="http://localhost:8086", token="my-token", org="my-org") as client:
73+
report_ping(await client.ping())
74+
try:
75+
await client.write_api().write(bucket="my-bucket", record="cpu,location=G7 usage=")
76+
except InfluxDBError as ie:
77+
print("\nCaught ie: ", ie)
78+
print("Async write done")
79+
80+
81+
if __name__ == "__main__":
82+
use_sync()
83+
print("\n Continuing...")
84+
use_batch()
85+
print("\n Continuing...")
86+
asyncio.run(use_async())

influxdb_client/client/exceptions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ def __init__(self, response: HTTPResponse = None, message: str = None):
1616
self.response = response
1717
self.message = self._get_message(response)
1818
if isinstance(response, HTTPResponse): # response is HTTPResponse
19+
self.headers = response.headers
1920
self.retry_after = response.headers.get('Retry-After')
2021
else: # response is RESTResponse
22+
self.headers = response.getheaders()
2123
self.retry_after = response.getheader('Retry-After')
2224
else:
2325
self.response = None

tests/test_InfluxDBClientAsync.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import re
34
import unittest
45
import os
56
from datetime import datetime, timezone
@@ -389,6 +390,25 @@ async def test_query_exception_propagation(self):
389390
with pytest.raises(InfluxDBError) as e:
390391
await self.client.query_api().query("buckets()", "my-org")
391392
self.assertEqual("unauthorized access", e.value.message)
393+
print("DEBUG e.headers: ", e.value.headers)
394+
395+
@async_test
396+
async def test_write_exception_propagation(self):
397+
await self.client.close()
398+
self.client = InfluxDBClientAsync(url="http://localhost:8086", token="wrong", org="my-org")
399+
400+
with pytest.raises(InfluxDBError) as e:
401+
await self.client.write_api().write(bucket="my_bucket",
402+
record="temperature,location=hic cels=")
403+
self.assertEqual("unauthorized access", e.value.message)
404+
headers = e.value.headers
405+
self.assertIsNotNone(headers)
406+
self.assertIsNotNone(headers.get("Content-Length"))
407+
self.assertIsNotNone(headers.get("Date"))
408+
self.assertIsNotNone(headers.get("X-Platform-Error-Code"))
409+
self.assertIn("application/json", headers.get("Content-Type"))
410+
self.assertTrue(re.compile("^v.*").match(headers.get("X-Influxdb-Version")))
411+
self.assertEqual("OSS", headers.get("X-Influxdb-Build"))
392412

393413
@async_test
394414
@aioresponses()

tests/test_WriteApi.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,16 @@
33
from __future__ import absolute_import
44

55
import datetime
6+
import json
7+
import logging
68
import os
9+
import re
710
import sys
811
import unittest
912
from collections import namedtuple
1013
from datetime import timedelta
1114
from multiprocessing.pool import ApplyResult
15+
from types import SimpleNamespace
1216

1317
import httpretty
1418
import pytest
@@ -190,6 +194,17 @@ def test_write_error(self):
190194

191195
self.assertEqual(400, exception.status)
192196
self.assertEqual("Bad Request", exception.reason)
197+
# assert headers
198+
self.assertIsNotNone(exception.headers)
199+
self.assertIsNotNone(exception.headers.get("Content-Length"))
200+
self.assertIsNotNone(exception.headers.get("Date"))
201+
self.assertIsNotNone(exception.headers.get("X-Platform-Error-Code"))
202+
self.assertIn("application/json", exception.headers.get("Content-Type"))
203+
self.assertTrue(re.compile("^v.*").match(exception.headers.get("X-Influxdb-Version")))
204+
self.assertEqual("OSS", exception.headers.get("X-Influxdb-Build"))
205+
# assert body
206+
b = json.loads(exception.body, object_hook=lambda d: SimpleNamespace(**d))
207+
self.assertTrue(re.compile("^unable to parse.*invalid field format").match(b.message))
193208

194209
def test_write_dictionary(self):
195210
_bucket = self.create_test_bucket()
@@ -609,6 +624,28 @@ def test_write_result(self):
609624
self.assertEqual(None, result.get())
610625
self.delete_test_bucket(_bucket)
611626

627+
def test_write_error(self):
628+
_bucket = self.create_test_bucket()
629+
630+
_record = "h2o_feet,location=coyote_creek level\\ water_level="
631+
result = self.write_client.write(_bucket.name, self.org, _record)
632+
633+
with self.assertRaises(ApiException) as cm:
634+
result.get()
635+
self.assertEqual(400, cm.exception.status)
636+
self.assertEqual("Bad Request", cm.exception.reason)
637+
# assert headers
638+
self.assertIsNotNone(cm.exception.headers)
639+
self.assertIsNotNone(cm.exception.headers.get("Content-Length"))
640+
self.assertIsNotNone(cm.exception.headers.get("Date"))
641+
self.assertIsNotNone(cm.exception.headers.get("X-Platform-Error-Code"))
642+
self.assertIn("application/json", cm.exception.headers.get("Content-Type"))
643+
self.assertTrue(re.compile("^v.*").match(cm.exception.headers.get("X-Influxdb-Version")))
644+
self.assertEqual("OSS", cm.exception.headers.get("X-Influxdb-Build"))
645+
# assert body
646+
b = json.loads(cm.exception.body, object_hook=lambda d: SimpleNamespace(**d))
647+
self.assertTrue(re.compile("^unable to parse.*missing field value").match(b.message))
648+
612649
def test_write_dictionaries(self):
613650
bucket = self.create_test_bucket()
614651

0 commit comments

Comments
 (0)