From 57daf8ccd5027c796a2fd3934b8e88d3982d300e Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Wed, 10 Jul 2019 18:34:23 +0200 Subject: [PATCH 1/4] Add support for messagepack --- influxdb/client.py | 42 +++++++++++++++++++++++++++++------ influxdb/tests/client_test.py | 23 +++++++++++++++++++ requirements.txt | 2 ++ 3 files changed, 60 insertions(+), 7 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 8ac557d3..39f45b88 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -10,7 +10,10 @@ import random import json +import struct +import datetime import socket +import msgpack import requests import requests.exceptions from six.moves import xrange @@ -128,7 +131,7 @@ def __init__(self, self._headers = { 'Content-Type': 'application/json', - 'Accept': 'text/plain' + 'Accept': 'application/x-msgpack' } @property @@ -277,13 +280,22 @@ def request(self, url, method='GET', params=None, data=None, time.sleep((2 ** _try) * random.random() / 100.0) if not retry: raise + + def reformat_error(response): + err = self._parse_msgpack(response) + if err: + return json.dumps(err, separators=(',', ':')) + else: + return response.content + # if there's not an error, there must have been a successful response if 500 <= response.status_code < 600: - raise InfluxDBServerError(response.content) + raise InfluxDBServerError(reformat_error(response)) elif response.status_code == expected_response_code: return response else: - raise InfluxDBClientError(response.content, response.status_code) + err_msg = reformat_error(response) + raise InfluxDBClientError(err_msg, response.status_code) def write(self, data, params=None, expected_response_code=204, protocol='json'): @@ -342,6 +354,21 @@ def _read_chunked_response(response, raise_errors=True): _key, []).extend(result[_key]) return ResultSet(result_set, raise_errors=raise_errors) + @staticmethod + def _parse_msgpack(response): + """Return the decoded response if it is encoded as msgpack.""" + def hook(code, data): + if code == 5: + (epoch_s, epoch_ns) = struct.unpack(">QI", data) + time = datetime.datetime.utcfromtimestamp(epoch_s) + time += datetime.timedelta(microseconds=(epoch_ns / 1000)) + return time.isoformat() + 'Z' + return msgpack.ExtType(code, data) + + headers = response.headers + if headers and headers["Content-Type"] == "application/x-msgpack": + return msgpack.unpackb(response.content, ext_hook=hook, raw=False) + def query(self, query, params=None, @@ -434,10 +461,11 @@ def query(self, expected_response_code=expected_response_code ) - if chunked: - return self._read_chunked_response(response) - - data = response.json() + data = self._parse_msgpack(response) + if not data: + if chunked: + return self._read_chunked_response(response) + data = response.json() results = [ ResultSet(result, raise_errors=raise_errors) diff --git a/influxdb/tests/client_test.py b/influxdb/tests/client_test.py index e4cc7e11..3b89a03d 100644 --- a/influxdb/tests/client_test.py +++ b/influxdb/tests/client_test.py @@ -465,6 +465,29 @@ def test_query(self): [{'value': 0.64, 'time': '2009-11-10T23:00:00Z'}] ) + def test_query_msgpack(self): + """Test query method with a messagepack response.""" + example_response = bytes(bytearray.fromhex( + "81a7726573756c74739182ac73746174656d656e745f696400a673657269" + "65739183a46e616d65a161a7636f6c756d6e7392a474696d65a176a67661" + "6c7565739192c70c05000000005d26178a019096c8cb3ff0000000000000" + )) + + with requests_mock.Mocker() as m: + m.register_uri( + requests_mock.GET, + "http://localhost:8086/query", + request_headers={"Accept": "application/x-msgpack"}, + headers={"Content-Type": "application/x-msgpack"}, + content=example_response + ) + rs = self.cli.query('select * from a') + + self.assertListEqual( + list(rs.get_points()), + [{'v': 1.0, 'time': '2019-07-10T16:51:22.026253Z'}] + ) + def test_select_into_post(self): """Test SELECT.*INTO is POSTed.""" example_response = ( diff --git a/requirements.txt b/requirements.txt index db5f6f85..46f6cae7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,5 @@ python-dateutil>=2.6.0 pytz requests>=2.17.0 six>=1.10.0 +msgpack==0.6.1 + From 5f196fda6cd4c82c1949d6da1f8f581096a48425 Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Fri, 12 Jul 2019 01:48:30 +0200 Subject: [PATCH 2/4] Remove unnecessary blank line Fixes https://github.com/influxdata/influxdb-python/pull/734/files/57daf8ccd5027c796a2fd3934b8e88d3982d300e#r302769403 --- requirements.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 46f6cae7..77d7306f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,3 @@ pytz requests>=2.17.0 six>=1.10.0 msgpack==0.6.1 - From e18333eeb3a95a72b48b86e4b32b8cf6997f9ad1 Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Fri, 12 Jul 2019 02:35:59 +0200 Subject: [PATCH 3/4] Small code reorganization --- influxdb/client.py | 48 +++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 39f45b88..01a7e8cf 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -6,13 +6,13 @@ from __future__ import print_function from __future__ import unicode_literals -import time -import random - -import json -import struct import datetime +import json +import random import socket +import struct +import time + import msgpack import requests import requests.exceptions @@ -281,10 +281,16 @@ def request(self, url, method='GET', params=None, data=None, if not retry: raise + content_type = response.headers and response.headers.get("Content-Type") + if content_type == "application/x-msgpack" and response.content: + response._msgpack = msgpack.unpackb( + packed=response.content, + ext_hook=_msgpack_parse_hook, + raw=False) + def reformat_error(response): - err = self._parse_msgpack(response) - if err: - return json.dumps(err, separators=(',', ':')) + if response._msgpack: + return json.dumps(response._msgpack, separators=(',', ':')) else: return response.content @@ -354,21 +360,6 @@ def _read_chunked_response(response, raise_errors=True): _key, []).extend(result[_key]) return ResultSet(result_set, raise_errors=raise_errors) - @staticmethod - def _parse_msgpack(response): - """Return the decoded response if it is encoded as msgpack.""" - def hook(code, data): - if code == 5: - (epoch_s, epoch_ns) = struct.unpack(">QI", data) - time = datetime.datetime.utcfromtimestamp(epoch_s) - time += datetime.timedelta(microseconds=(epoch_ns / 1000)) - return time.isoformat() + 'Z' - return msgpack.ExtType(code, data) - - headers = response.headers - if headers and headers["Content-Type"] == "application/x-msgpack": - return msgpack.unpackb(response.content, ext_hook=hook, raw=False) - def query(self, query, params=None, @@ -461,7 +452,7 @@ def query(self, expected_response_code=expected_response_code ) - data = self._parse_msgpack(response) + data = getattr(response, '_msgpack', None) if not data: if chunked: return self._read_chunked_response(response) @@ -1131,3 +1122,12 @@ def _parse_netloc(netloc): 'password': info.password or None, 'host': info.hostname or 'localhost', 'port': info.port or 8086} + + +def _msgpack_parse_hook(code, data): + if code == 5: + (epoch_s, epoch_ns) = struct.unpack(">QI", data) + timestamp = datetime.datetime.utcfromtimestamp(epoch_s) + timestamp += datetime.timedelta(microseconds=(epoch_ns / 1000)) + return timestamp.isoformat() + 'Z' + return msgpack.ExtType(code, data) From 0d5204099b3e7353e1e2a64c8ad0bdc29f4e6eed Mon Sep 17 00:00:00 2001 From: Ophir LOJKINE Date: Fri, 12 Jul 2019 08:21:52 +0200 Subject: [PATCH 4/4] Small code reorganization Fixes https://github.com/influxdata/influxdb-python/pull/734#discussion_r302770011 --- influxdb/client.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/influxdb/client.py b/influxdb/client.py index 01a7e8cf..c5bc2cf4 100644 --- a/influxdb/client.py +++ b/influxdb/client.py @@ -281,12 +281,14 @@ def request(self, url, method='GET', params=None, data=None, if not retry: raise - content_type = response.headers and response.headers.get("Content-Type") - if content_type == "application/x-msgpack" and response.content: + type_header = response.headers and response.headers.get("Content-Type") + if type_header == "application/x-msgpack" and response.content: response._msgpack = msgpack.unpackb( packed=response.content, ext_hook=_msgpack_parse_hook, raw=False) + else: + response._msgpack = None def reformat_error(response): if response._msgpack: @@ -452,7 +454,7 @@ def query(self, expected_response_code=expected_response_code ) - data = getattr(response, '_msgpack', None) + data = response._msgpack if not data: if chunked: return self._read_chunked_response(response)