Skip to content

Commit 8232d7e

Browse files
add custom sleep parameter to async_bulk and async_streaming_bulk
1 parent e1603f4 commit 8232d7e

File tree

1 file changed

+7
-2
lines changed

1 file changed

+7
-2
lines changed

elasticsearch/_async/helpers.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
Any,
2222
AsyncIterable,
2323
AsyncIterator,
24+
Awaitable,
2425
Callable,
2526
Collection,
2627
Dict,
@@ -167,6 +168,7 @@ async def async_streaming_bulk(
167168
expand_action_callback: Callable[
168169
[_TYPE_BULK_ACTION], _TYPE_BULK_ACTION_HEADER_AND_BODY
169170
] = expand_action,
171+
sleep: Callable[[float],Awaitable[None]] = asyncio.sleep,
170172
raise_on_exception: bool = True,
171173
max_retries: int = 0,
172174
initial_backoff: float = 2,
@@ -202,6 +204,7 @@ async def async_streaming_bulk(
202204
:arg expand_action_callback: callback executed on each action passed in,
203205
should return a tuple containing the action line and the data line
204206
(`None` if data line should be omitted).
207+
:arg sleep: custom callable defined for custom action on cancelling
205208
:arg retry_on_status: HTTP status code that will trigger a retry.
206209
(if `None` is specified only status 429 will retry).
207210
:arg max_retries: maximum number of times a document will be retried when
@@ -246,7 +249,7 @@ async def map_actions() -> AsyncIterable[_TYPE_BULK_ACTION_HEADER_AND_BODY]:
246249
]
247250
] = []
248251
if attempt:
249-
await asyncio.sleep(
252+
await sleep(
250253
min(max_backoff, initial_backoff * 2 ** (attempt - 1))
251254
)
252255

@@ -304,6 +307,7 @@ async def async_bulk(
304307
client: AsyncElasticsearch,
305308
actions: Union[Iterable[_TYPE_BULK_ACTION], AsyncIterable[_TYPE_BULK_ACTION]],
306309
stats_only: bool = False,
310+
sleep: Callable[[float],Awaitable[None]] = asyncio.sleep,
307311
ignore_status: Union[int, Collection[int]] = (),
308312
*args: Any,
309313
**kwargs: Any,
@@ -329,6 +333,7 @@ async def async_bulk(
329333
:arg actions: iterator containing the actions
330334
:arg stats_only: if `True` only report number of successful/failed
331335
operations instead of just number of successful and a list of error responses
336+
:arg sleep: custom callable defined for custom action on cancelling
332337
:arg ignore_status: list of HTTP status code that you want to ignore
333338
334339
Any additional keyword arguments will be passed to
@@ -344,7 +349,7 @@ async def async_bulk(
344349
# make streaming_bulk yield successful results so we can count them
345350
kwargs["yield_ok"] = True
346351
async for ok, item in async_streaming_bulk(
347-
client, actions, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc]
352+
client, actions, sleep=sleep, ignore_status=ignore_status, *args, **kwargs # type: ignore[misc]
348353
):
349354
# go through request-response pairs and detect failures
350355
if not ok:

0 commit comments

Comments
 (0)