Skip to content

Commit 4a0f821

Browse files
committed
Aiohtt>=2.x compatibility
Thanks @thecodingrobot and @evertlammerts for the work on this patch!
1 parent ef5ad1e commit 4a0f821

File tree

2 files changed

+23
-26
lines changed

2 files changed

+23
-26
lines changed

elasticsearch_async/connection.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
import asyncio
22

33
import aiohttp
4-
from aiohttp.errors import FingerprintMismatch, ClientError
4+
from aiohttp.client_exceptions import ServerFingerprintMismatch
55

66
from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, SSLError
77
from elasticsearch.connection import Connection
88
from elasticsearch.compat import urlencode
99

10-
from .helpers import ensure_future
1110

1211
class AIOHttpConnection(Connection):
1312
def __init__(self, host='localhost', port=9200, http_auth=None,
@@ -26,10 +25,10 @@ def __init__(self, host='localhost', port=9200, http_auth=None,
2625

2726
self.session = aiohttp.ClientSession(
2827
auth=http_auth,
28+
conn_timeout=self.timeout,
2929
connector=aiohttp.TCPConnector(
3030
loop=self.loop,
3131
verify_ssl=verify_certs,
32-
conn_timeout=self.timeout,
3332
use_dns_cache=use_dns_cache,
3433
)
3534
)
@@ -40,7 +39,7 @@ def __init__(self, host='localhost', port=9200, http_auth=None,
4039
)
4140

4241
def close(self):
43-
return ensure_future(self.session.close())
42+
return self.session.close()
4443

4544
@asyncio.coroutine
4645
def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()):
@@ -59,7 +58,7 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign
5958

6059
except Exception as e:
6160
self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e)
62-
if isinstance(e, FingerprintMismatch):
61+
if isinstance(e, ServerFingerprintMismatch):
6362
raise SSLError('N/A', str(e), e)
6463
if isinstance(e, asyncio.TimeoutError):
6564
raise ConnectionTimeout('TIMEOUT', str(e), e)

test_elasticsearch_async/conftest.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,38 @@
11
import asyncio
22
import json
3-
from urllib.parse import urlparse, parse_qsl
4-
5-
from aiohttp.server import ServerHttpProtocol
6-
from aiohttp import Response
73

84
from pytest import yield_fixture, fixture
95

6+
import aiohttp.web
107
from elasticsearch_async import AIOHttpConnection, AsyncElasticsearch
118

129

1310
@yield_fixture
1411
def connection(event_loop, server, port):
1512
connection = AIOHttpConnection(port=port, loop=event_loop)
1613
yield connection
17-
event_loop.run_until_complete(connection.close())
14+
connection.close()
15+
16+
17+
class DummyElasticsearch(aiohttp.web.Server):
1818

19-
class DummyElasticsearch(ServerHttpProtocol):
2019
def __init__(self, **kwargs):
21-
super().__init__(**kwargs)
20+
super().__init__(handler=self.handler, **kwargs)
2221
self._responses = {}
2322
self.calls = []
2423

2524
def register_response(self, path, response={}, status=200):
2625
self._responses[path] = status, response
2726

2827
@asyncio.coroutine
29-
def handle_request(self, message, payload):
30-
url = urlparse(message.path)
28+
def handler(self, request):
29+
url = request.url
3130

32-
params = dict(parse_qsl(url.query))
33-
body = yield from payload.read()
31+
params = dict(request.query)
32+
body = yield from request.read()
3433
body = json.loads(body.decode('utf-8')) if body else ''
3534

36-
self.calls.append((message.method, url.path, body, params))
35+
self.calls.append((request.method, url.path, body, params))
3736

3837
if url.path in self._responses:
3938
status, body = self._responses.pop(url.path)
@@ -42,18 +41,16 @@ def handle_request(self, message, payload):
4241
else:
4342
status = 200
4443
body = {
45-
'method': message.method,
44+
'method': request.method,
4645
'params': params,
4746
'path': url.path,
4847
'body': body
4948
}
5049

51-
out = json.dumps(body).encode('utf-8')
50+
out = json.dumps(body)
51+
52+
return aiohttp.web.Response(body=out, status=status, content_type='application/json')
5253

53-
response = Response(self.writer, status)
54-
response.send_headers()
55-
response.write(out)
56-
yield from response.write_eof()
5754

5855
i = 0
5956
@fixture
@@ -62,12 +59,13 @@ def port():
6259
i += 1
6360
return 8080 + i
6461

65-
@fixture
62+
@yield_fixture
6663
def server(event_loop, port):
6764
server = DummyElasticsearch(debug=True, keep_alive=75)
68-
f = event_loop.create_server(lambda: server, '127.0.0.1', port)
65+
f = event_loop.create_server(server, '127.0.0.1', port)
6966
event_loop.run_until_complete(f)
70-
return server
67+
yield server
68+
event_loop.run_until_complete(server.shutdown(timeout=.5))
7169

7270
@yield_fixture
7371
def client(event_loop, server, port):

0 commit comments

Comments
 (0)