Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Commit edd52c0

Browse files
committed
Add support for messagepack
1 parent d5d1249 commit edd52c0

File tree

3 files changed

+63
-7
lines changed

3 files changed

+63
-7
lines changed

influxdb/client.py

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
import random
1111

1212
import json
13+
import struct
14+
import datetime
1315
import socket
16+
import msgpack
1417
import requests
1518
import requests.exceptions
1619
from six.moves import xrange
@@ -21,6 +24,13 @@
2124
from .exceptions import InfluxDBClientError
2225
from .exceptions import InfluxDBServerError
2326

27+
def _msgpack_ext_hook(code, data):
28+
if code == 5:
29+
(epoch_s, epoch_ns) = struct.unpack(">QI", data)
30+
time = datetime.datetime.utcfromtimestamp(epoch_s)
31+
time += datetime.timedelta(microseconds=epoch_ns/1000)
32+
return time.isoformat() + 'Z'
33+
return msgpack.ExtType(code, data)
2434

2535
class InfluxDBClient(object):
2636
"""InfluxDBClient primary client object to connect InfluxDB.
@@ -128,7 +138,7 @@ def __init__(self,
128138

129139
self._headers = {
130140
'Content-Type': 'application/json',
131-
'Accept': 'text/plain'
141+
'Accept': 'application/x-msgpack'
132142
}
133143

134144
@property
@@ -277,13 +287,21 @@ def request(self, url, method='GET', params=None, data=None,
277287
time.sleep((2 ** _try) * random.random() / 100.0)
278288
if not retry:
279289
raise
290+
291+
def reformat_error(response):
292+
err = self._parse_msgpack(response)
293+
if err:
294+
return json.dumps(err, separators=(',',':'))
295+
else:
296+
return response.content
297+
280298
# if there's not an error, there must have been a successful response
281299
if 500 <= response.status_code < 600:
282-
raise InfluxDBServerError(response.content)
300+
raise InfluxDBServerError(reformat_error(response))
283301
elif response.status_code == expected_response_code:
284302
return response
285303
else:
286-
raise InfluxDBClientError(response.content, response.status_code)
304+
raise InfluxDBClientError(reformat_error(response), response.status_code)
287305

288306
def write(self, data, params=None, expected_response_code=204,
289307
protocol='json'):
@@ -342,6 +360,18 @@ def _read_chunked_response(response, raise_errors=True):
342360
_key, []).extend(result[_key])
343361
return ResultSet(result_set, raise_errors=raise_errors)
344362

363+
@staticmethod
364+
def _parse_msgpack(response):
365+
""" Returns the decoded response if it is encoded as msgpack,
366+
otherwise return None
367+
"""
368+
headers = response.headers
369+
if headers and headers["Content-Type"] == "application/x-msgpack":
370+
return msgpack.unpackb(
371+
response.content,
372+
ext_hook=_msgpack_ext_hook,
373+
raw=False)
374+
345375
def query(self,
346376
query,
347377
params=None,
@@ -434,10 +464,11 @@ def query(self,
434464
expected_response_code=expected_response_code
435465
)
436466

437-
if chunked:
438-
return self._read_chunked_response(response)
439-
440-
data = response.json()
467+
data = self._parse_msgpack(response)
468+
if not data:
469+
if chunked:
470+
return self._read_chunked_response(response)
471+
data = response.json()
441472

442473
results = [
443474
ResultSet(result, raise_errors=raise_errors)

influxdb/tests/client_test.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,29 @@ def test_query(self):
465465
[{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}]
466466
)
467467

468+
def test_query_msgpack(self):
469+
"""Test query method with a messagepack response"""
470+
example_response = bytes(bytearray.fromhex(
471+
"81a7726573756c74739182ac73746174656d656e745f696400a673657269"
472+
"65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661"
473+
"6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000"
474+
))
475+
476+
with requests_mock.Mocker() as m:
477+
m.register_uri(
478+
requests_mock.GET,
479+
"http://localhost:8086/query",
480+
request_headers={"Accept": "application/x-msgpack"},
481+
headers={"Content-Type": "application/x-msgpack"},
482+
content=example_response
483+
)
484+
rs = self.cli.query('select * from a')
485+
486+
self.assertListEqual(
487+
list(rs.get_points()),
488+
[{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}]
489+
)
490+
468491
def test_select_into_post(self):
469492
"""Test SELECT.*INTO is POSTed."""
470493
example_response = (

requirements.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,5 @@ python-dateutil>=2.6.0
22
pytz
33
requests>=2.17.0
44
six>=1.10.0
5+
msgpack==0.6.1
6+

0 commit comments

Comments
 (0)