Skip to content

Commit 71d6401

Browse files
authored
BulkIngester - Running listener code in separate thread pool (#830)
* running listener code in separate thread * thread name * closing logic * comment * javadoc fixes
1 parent 3ef2887 commit 71d6401

File tree

1 file changed

+52
-31
lines changed
  • java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk

1 file changed

+52
-31
lines changed

java-client/src/main/java/co/elastic/clients/elasticsearch/_helpers/bulk/BulkIngester.java

Lines changed: 52 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ public class BulkIngester<Context> implements AutoCloseable {
6262

6363
private @Nullable ScheduledFuture<?> flushTask;
6464
private @Nullable ScheduledExecutorService scheduler;
65+
private boolean isExternalScheduler = false;
6566

6667
// Current state
6768
private List<BulkOperation> operations = new ArrayList<>();
@@ -82,7 +83,8 @@ private static class RequestExecution<Context> {
8283
public final List<Context> contexts;
8384
public final CompletionStage<BulkResponse> futureResponse;
8485

85-
RequestExecution(long id, BulkRequest request, List<Context> contexts, CompletionStage<BulkResponse> futureResponse) {
86+
RequestExecution(long id, BulkRequest request, List<Context> contexts,
87+
CompletionStage<BulkResponse> futureResponse) {
8688
this.id = id;
8789
this.request = request;
8890
this.contexts = contexts;
@@ -99,27 +101,25 @@ private BulkIngester(Builder<Context> builder) {
99101
this.maxOperations = builder.bulkOperations < 0 ? Integer.MAX_VALUE : builder.bulkOperations;
100102
this.listener = builder.listener;
101103
this.flushIntervalMillis = builder.flushIntervalMillis;
102-
103-
if (flushIntervalMillis != null) {
104-
long flushInterval = flushIntervalMillis;
105104

105+
if (flushIntervalMillis != null || listener != null) {
106106
// Create a scheduler if needed
107-
ScheduledExecutorService scheduler;
108107
if (builder.scheduler == null) {
109-
scheduler = Executors.newSingleThreadScheduledExecutor((r) -> {
110-
Thread t = Executors.defaultThreadFactory().newThread(r);
111-
t.setName("bulk-ingester-flusher#" + ingesterId);
112-
t.setDaemon(true);
113-
return t;
114-
});
115-
116-
// Keep it, we'll have to close it.
117-
this.scheduler = scheduler;
108+
this.scheduler = Executors.newScheduledThreadPool(maxRequests + 1, (r) -> {
109+
Thread t = Executors.defaultThreadFactory().newThread(r);
110+
t.setName("bulk-ingester-executor#" + ingesterId + "#" + t.getId());
111+
t.setDaemon(true);
112+
return t;
113+
});
118114
} else {
119115
// It's not ours, we will not close it.
120-
scheduler = builder.scheduler;
116+
this.scheduler = builder.scheduler;
117+
this.isExternalScheduler = true;
121118
}
122-
119+
}
120+
121+
if (flushIntervalMillis != null) {
122+
long flushInterval = flushIntervalMillis;
123123
this.flushTask = scheduler.scheduleWithFixedDelay(
124124
this::failsafeFlush,
125125
flushInterval, flushInterval,
@@ -221,7 +221,7 @@ public long requestCount() {
221221
* @see Builder#maxConcurrentRequests
222222
*/
223223
public long requestContentionsCount() {
224-
return this.sendRequestCondition.contentions();
224+
return this.sendRequestCondition.contentions();
225225
}
226226

227227
//----- Predicates for the condition variables
@@ -265,7 +265,7 @@ private BulkRequest.Builder newRequest() {
265265
private void failsafeFlush() {
266266
try {
267267
flush();
268-
} catch(Throwable thr) {
268+
} catch (Throwable thr) {
269269
// Log the error and continue
270270
logger.error("Error in background flush", thr);
271271
}
@@ -280,7 +280,8 @@ public void flush() {
280280
() -> {
281281
// Build the request
282282
BulkRequest request = newRequest().operations(operations).build();
283-
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(), null) : contexts;
283+
List<Context> requestContexts = contexts == null ? Collections.nCopies(operations.size(),
284+
null) : contexts;
284285

285286
// Prepare for next round
286287
operations = new ArrayList<>();
@@ -291,7 +292,8 @@ public void flush() {
291292
long id = sendRequestCondition.invocations();
292293

293294
if (listener != null) {
294-
listener.beforeBulk(id, request, requestContexts);
295+
BulkRequest finalRequest = request;
296+
scheduler.submit(() -> listener.beforeBulk(id, finalRequest, requestContexts));
295297
}
296298

297299
CompletionStage<BulkResponse> result = client.bulk(request);
@@ -303,7 +305,7 @@ public void flush() {
303305
}
304306

305307
return new RequestExecution<>(id, request, requestContexts, result);
306-
});
308+
});
307309

308310
if (exec != null) {
309311
// A request was actually sent
@@ -317,12 +319,14 @@ public void flush() {
317319
if (resp != null) {
318320
// Success
319321
if (listener != null) {
320-
listener.afterBulk(exec.id, exec.request, exec.contexts, resp);
322+
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
323+
exec.contexts, resp));
321324
}
322325
} else {
323326
// Failure
324327
if (listener != null) {
325-
listener.afterBulk(exec.id, exec.request, exec.contexts, thr);
328+
scheduler.submit(() -> listener.afterBulk(exec.id, exec.request,
329+
exec.contexts, thr));
326330
}
327331
}
328332
return null;
@@ -383,13 +387,14 @@ public void close() {
383387
// Flush buffered operations
384388
flush();
385389
// and wait for all requests to be completed
386-
closeCondition.whenReady(() -> {});
390+
closeCondition.whenReady(() -> {
391+
});
387392

388393
if (flushTask != null) {
389394
flushTask.cancel(false);
390395
}
391396

392-
if (scheduler != null) {
397+
if (scheduler != null && !isExternalScheduler) {
393398
scheduler.shutdownNow();
394399
}
395400
}
@@ -404,7 +409,7 @@ public static class Builder<Context> implements ObjectBuilder<BulkIngester<Conte
404409
private ElasticsearchAsyncClient client;
405410
private BulkRequest globalSettings;
406411
private int bulkOperations = 1000;
407-
private long bulkSize = 5*1024*1024;
412+
private long bulkSize = 5 * 1024 * 1024;
408413
private int maxConcurrentRequests = 1;
409414
private Long flushIntervalMillis;
410415
private BulkListener<Context> listener;
@@ -438,7 +443,8 @@ public Builder<Context> maxOperations(int count) {
438443
}
439444

440445
/**
441-
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A request is sent
446+
* Sets when to flush a new bulk request based on the size in bytes of actions currently added. A
447+
* request is sent
442448
* once that size has been exceeded. Defaults to 5 megabytes. Can be set to {@code -1} to disable it.
443449
*
444450
* @throws IllegalArgumentException if less than -1.
@@ -452,7 +458,8 @@ public Builder<Context> maxSize(long bytes) {
452458
}
453459

454460
/**
455-
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is allowed to be executed
461+
* Sets the number of concurrent requests allowed to be executed. A value of 1 means 1 request is
462+
* allowed to be executed
456463
* while accumulating new bulk requests. Defaults to {@code 1}.
457464
*
458465
* @throws IllegalArgumentException if less than 1.
@@ -468,7 +475,8 @@ public Builder<Context> maxConcurrentRequests(int max) {
468475
/**
469476
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
470477
* <p>
471-
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
478+
* Flushing is still subject to the maximum number of requests set with
479+
* {@link #maxConcurrentRequests}.
472480
*
473481
* @throws IllegalArgumentException if not a positive duration.
474482
*/
@@ -483,13 +491,25 @@ public Builder<Context> flushInterval(long value, TimeUnit unit) {
483491
/**
484492
* Sets an interval flushing any bulk actions pending if the interval passes. Defaults to not set.
485493
* <p>
486-
* Flushing is still subject to the maximum number of requests set with {@link #maxConcurrentRequests}.
494+
* Flushing is still subject to the maximum number of requests set with
495+
* {@link #maxConcurrentRequests}.
496+
* @deprecated use {@link #scheduler(ScheduledExecutorService)}
487497
*/
498+
@Deprecated
488499
public Builder<Context> flushInterval(long value, TimeUnit unit, ScheduledExecutorService scheduler) {
489500
this.scheduler = scheduler;
490501
return flushInterval(value, unit);
491502
}
492503

504+
/**
505+
* Sets a custom scheduler to run the flush thread and the listener logic. A default one is used if
506+
* not set.
507+
*/
508+
public Builder<Context> scheduler(ScheduledExecutorService scheduler) {
509+
this.scheduler = scheduler;
510+
return this;
511+
}
512+
493513
public Builder<Context> listener(BulkListener<Context> listener) {
494514
this.listener = listener;
495515
return this;
@@ -518,7 +538,8 @@ public Builder<Context> globalSettings(Function<BulkRequest.Builder, BulkRequest
518538
@Override
519539
public BulkIngester<Context> build() {
520540
// Ensure some chunking criteria are defined
521-
boolean hasCriteria = this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;
541+
boolean hasCriteria =
542+
this.bulkOperations >= 0 || this.bulkSize >= 0 || this.flushIntervalMillis != null;
522543

523544
if (!hasCriteria) {
524545
throw new IllegalStateException("No bulk operation chunking criteria have been set.");

0 commit comments

Comments
 (0)