
Commit 490d69c

Fixed bug in async_streaming_bulk()
1 parent 262c1bb commit 490d69c

9 files changed: 186 additions & 178 deletions


elasticsearch/_async/compat.py

Lines changed: 2 additions & 7 deletions
@@ -26,22 +26,17 @@ def get_sleep():

     async def sleep(duration):
         await asyncio.sleep(duration, loop=loop)
+
     return sleep


 def azip(*iterables):
-    print("AZIP", iterables)
     iterators = [aiter(x) for x in iterables]
-    print("AZIPTOR", iterators)

     async def generator():
         while True:
             try:
-                tuple_items = []
-                for iterator in iterators:
-                    tuple_items.append(await iterator.__anext__())
-                print("azip tuple", tuple_items)
-                yield tuple(tuple_items)
+                yield tuple([await i.__anext__() for i in iterators])
             except StopAsyncIteration:
                 break

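For reference, azip() above is the async counterpart of the builtin zip(): it advances each async iterator in lockstep and stops with the shortest input. A minimal sketch of its behavior (the numbers()/letters() generators are illustrative only, and the import path reflects this branch's layout):

import asyncio

from elasticsearch._async.compat import azip


async def numbers():
    for n in (1, 2, 3):
        yield n


async def letters():
    for c in "ab":
        yield c


async def main():
    # Stops after two pairs, like builtin zip() on the shortest iterable.
    async for pair in azip(numbers(), letters()):
        print(pair)  # (1, 'a') then (2, 'b')


asyncio.run(main())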

New file

Lines changed: 4 additions & 0 deletions
@@ -0,0 +1,4 @@
+# Licensed to Elasticsearch B.V under one or more agreements.
+# Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
+# See the LICENSE file in the project root for more information
+

elasticsearch/_async/helpers/actions.py

Lines changed: 24 additions & 15 deletions
@@ -115,7 +115,6 @@ async def _process_bulk_chunk(
     """
     Send a bulk request to elasticsearch and process the output.
     """
-    print("BULK DATA", bulk_data)
     # if raise on error is set, we need to collect errors per chunk before raising them
     errors = []
@@ -226,7 +225,10 @@ async def actions_generator():

     async def generator():
         async for bulk_data, bulk_actions in _chunk_actions(
-            actions_generator(), chunk_size, max_chunk_bytes, client.transport.serializer
+            aiter(actions_generator()),
+            chunk_size,
+            max_chunk_bytes,
+            client.transport.serializer,
         ):

             for attempt in range(max_retries + 1):
@@ -235,17 +237,18 @@ async def generator():
                 await sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))

                 try:
-                    print("before zip", bulk_actions, bulk_data)
-                    async for data, (ok, info) in azip(bulk_actions, _process_bulk_chunk(
-                        client,
-                        bulk_actions,
+                    async for data, (ok, info) in azip(
                         bulk_data,
-                        raise_on_exception,
-                        raise_on_error,
-                        *args,
-                        **kwargs
-                    )):
-                        print("zipped", data, ok, info)
+                        _process_bulk_chunk(
+                            client,
+                            bulk_actions,
+                            bulk_data,
+                            raise_on_exception,
+                            raise_on_error,
+                            *args,
+                            **kwargs
+                        ),
+                    ):
                         if not ok:
                             action, info = info.popitem()
                             # retry if retries enabled, we get 429, and we are not
@@ -257,7 +260,6 @@ async def generator():
                             ):
                                 # _process_bulk_chunk expects strings so we need to
                                 # re-serialize the data
-                                print("RETRY", data)
                                 to_retry.extend(
                                     map(client.transport.serializer.dumps, data)
                                 )
@@ -442,6 +444,7 @@ def scan(
        )

    """
+
    async def generator(query, scroll_kwargs):
        scroll_kwargs = scroll_kwargs or {}

@@ -451,7 +454,11 @@ async def generator(query, scroll_kwargs):

        # initial search
        resp = await client.search(
-            body=query, scroll=scroll, size=size, request_timeout=request_timeout, **kwargs
+            body=query,
+            scroll=scroll,
+            size=size,
+            request_timeout=request_timeout,
+            **kwargs
        )
        scroll_id = resp.get("_scroll_id")

@@ -487,7 +494,9 @@ async def generator(query, scroll_kwargs):

        finally:
            if scroll_id and clear_scroll:
-                await client.clear_scroll(body={"scroll_id": [scroll_id]}, ignore=(404,))
+                await client.clear_scroll(
+                    body={"scroll_id": [scroll_id]}, ignore=(404,)
+                )

    return aiter(generator(query, scroll_kwargs))
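The substantive fix is in the azip() call: async_streaming_bulk() previously zipped the per-item results from _process_bulk_chunk() against bulk_actions (the serialized request strings) rather than bulk_data (the original action objects). Zipping against bulk_data mirrors the sync streaming_bulk() and is what the 429-retry path needs when it re-serializes data. A hedged sketch of consuming the fixed helper (index name and documents are illustrative; import paths follow this branch's layout and may differ in released packages):

import asyncio

from elasticsearch import AsyncElasticsearch
from elasticsearch._async.helpers.actions import async_streaming_bulk


async def main():
    client = AsyncElasticsearch()
    actions = ({"_index": "demo", "_source": {"n": n}} for n in range(100))
    try:
        # Each (ok, info) pair now lines up with the action it describes,
        # so failures can be inspected or retried per document.
        async for ok, info in async_streaming_bulk(client, actions, chunk_size=50):
            if not ok:
                print("failed:", info)
    finally:
        # Assumes the async client exposes an awaitable close().
        await client.close()


asyncio.run(main())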

elasticsearch/client/__init__.py

Lines changed: 3 additions & 0 deletions
@@ -229,6 +229,9 @@ def __enter__(self):
         return self

     def __exit__(self, *_):
+        self.close()
+
+    def close(self):
         self.transport.close()

     # AUTO-GENERATED-API-DEFINITIONS #
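This change extracts an explicit close() method from __exit__, so the transport can be released without a with block. A minimal sketch of both styles (assuming the sync Elasticsearch client):

from elasticsearch import Elasticsearch

# Context-manager style: __exit__ now simply delegates to close().
with Elasticsearch() as es:
    print(es.info())

# Explicit style, for clients that outlive a single block.
es = Elasticsearch()
try:
    print(es.info())
finally:
    es.close()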

elasticsearch/compat.py

Lines changed: 1 addition & 19 deletions
@@ -17,6 +17,7 @@
 string_types = str, bytes
 from urllib.parse import quote, quote_plus, urlencode, urlparse, unquote

+zip = zip
 map = map
 from queue import Queue
@@ -25,25 +26,6 @@ def get_sleep():
     return time.sleep


-def zip(*iterables):
-    print("ZIP", iterables)
-    iterators = [iter(x) for x in iterables]
-    print("ZIPTOR", iterators)
-
-    def generator():
-        while True:
-            try:
-                tuple_items = []
-                for iterator in iterators:
-                    tuple_items.append(iterator.__next__())
-                print("zip tuple", tuple_items)
-                yield tuple(tuple_items)
-            except StopIteration:
-                break
-
-    return generator().__iter__()
-
-
 # These match against 'anext' and 'aiter'
 next = next
 iter = iter
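With the debug-instrumented wrapper deleted, elasticsearch.compat.zip is once again just the builtin, matching the neighboring map/next/iter re-exports. A quick check (assuming this module layout):

import builtins

from elasticsearch.compat import zip

# The compat name now resolves to the builtin, with no debug printing.
assert zip is builtins.zip
print(list(zip((1, 2), "ab")))  # [(1, 'a'), (2, 'b')]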
