45
45
import java .util .List ;
46
46
import java .util .Map ;
47
47
import java .util .Set ;
48
+ import java .util .concurrent .CountDownLatch ;
49
+ import java .util .concurrent .Executors ;
50
+ import java .util .concurrent .ScheduledExecutorService ;
51
+ import java .util .concurrent .TimeUnit ;
52
+ import java .util .concurrent .atomic .LongAdder ;
48
53
import java .util .function .BiConsumer ;
49
54
import java .util .function .Consumer ;
50
55
import java .util .function .Function ;
@@ -63,7 +68,9 @@ public class ElasticsearchClientV8 extends ElasticsearchClient { // checkstyle-d
63
68
private final long mBulkSizeValue ;
64
69
65
70
private BulkIngester <Void > mBulkIngester ;
71
+ private ElasticsearchBulkListener mBulkListener ;
66
72
private RestClientTransport mTransport ;
73
+ private ScheduledExecutorService mBulkScheduler ;
67
74
private co .elastic .clients .elasticsearch .ElasticsearchClient mClient ;
68
75
69
76
public ElasticsearchClientV8 (final Settings aSettings ) {
@@ -75,6 +82,8 @@ public ElasticsearchClientV8(final Settings aSettings) {
75
82
@ Override
76
83
public void reset () {
77
84
mBulkIngester = null ;
85
+ mBulkListener = null ;
86
+ mBulkScheduler = null ;
78
87
79
88
super .reset ();
80
89
}
@@ -269,9 +278,19 @@ private void addBulk(final Function<BulkOperation.Builder, ObjectBuilder<BulkOpe
269
278
270
279
@ Override
271
280
protected void createBulk (final int aBulkActions , final int aBulkRequests ) {
281
+ mBulkScheduler = Executors .newScheduledThreadPool (aBulkRequests + 1 , r -> {
282
+ final Thread t = Executors .defaultThreadFactory ().newThread (r );
283
+ t .setName ("bulk-ingester-executor#" + t .getId ());
284
+ t .setDaemon (true );
285
+ return t ;
286
+ });
287
+
288
+ mBulkListener = new ElasticsearchBulkListener (this );
289
+
272
290
mBulkIngester = BulkIngester .of (b -> b
273
291
.client (mClient )
274
- .listener (new ElasticsearchBulkListener (this ))
292
+ .listener (mBulkListener )
293
+ .scheduler (mBulkScheduler )
275
294
.maxOperations (aBulkActions )
276
295
.maxSize (mBulkSizeValue )
277
296
.maxConcurrentRequests (aBulkRequests )
@@ -287,10 +306,23 @@ protected boolean isBulkClosed() {
287
306
protected boolean closeBulk () throws InterruptedException {
288
307
try {
289
308
mBulkIngester .close ();
309
+
310
+ if (!mBulkListener .awaitTermination (1 , TimeUnit .MINUTES )) {
311
+ getLogger ().warn ("Some bulk listener tasks still pending" );
312
+ }
313
+
314
+ mBulkScheduler .shutdown ();
315
+
316
+ if (!mBulkScheduler .awaitTermination (1 , TimeUnit .MINUTES )) {
317
+ getLogger ().warn ("Some bulk scheduler tasks still pending" );
318
+ }
319
+
290
320
return mBulkIngester .pendingRequests () == 0 ;
291
321
}
292
322
finally {
293
323
mBulkIngester = null ;
324
+ mBulkListener = null ;
325
+ mBulkScheduler = null ;
294
326
}
295
327
}
296
328
@@ -349,19 +381,24 @@ private interface IOFunction<T, R> {
349
381
350
382
private static class ElasticsearchBulkListener implements BulkListener <Void > {
351
383
384
+ private static final int PENDING_TASKS_WAIT = 100 ;
385
+
352
386
private final ElasticsearchClient mClient ;
387
+ private final LongAdder mBulkHandlers = new LongAdder ();
353
388
354
389
private ElasticsearchBulkListener (final ElasticsearchClient aClient ) {
355
390
mClient = aClient ;
356
391
}
357
392
358
393
@ Override
359
394
public void beforeBulk (final long aId , final BulkRequest aRequest , final List <Void > aContexts ) {
395
+ mBulkHandlers .increment ();
360
396
mClient .beforeBulk (aId , aRequest .operations ().size (), -1 );
361
397
}
362
398
363
399
@ Override
364
400
public void afterBulk (final long aId , final BulkRequest aRequest , final List <Void > aContexts , final BulkResponse aResponse ) {
401
+ mBulkHandlers .decrement ();
365
402
mClient .afterBulk (aId , aResponse .took (), (d , s , f ) -> aResponse .items ().forEach (r -> {
366
403
if (r .operationType () == OperationType .Delete ) {
367
404
d .run ();
@@ -381,9 +418,29 @@ public void afterBulk(final long aId, final BulkRequest aRequest, final List<Voi
381
418
382
419
@ Override
383
420
public void afterBulk (final long aId , final BulkRequest aRequest , final List <Void > aContexts , final Throwable aThrowable ) {
421
+ mBulkHandlers .decrement ();
384
422
mClient .afterBulk (aId , aThrowable );
385
423
}
386
424
425
+ public boolean awaitTermination (final long aTimeout , final TimeUnit aUnit ) throws InterruptedException {
426
+ final CountDownLatch latch = new CountDownLatch (1 );
427
+
428
+ new Thread (() -> {
429
+ while (mBulkHandlers .sum () != 0 ) {
430
+ try {
431
+ Thread .sleep (PENDING_TASKS_WAIT );
432
+ }
433
+ catch (final InterruptedException e ) {
434
+ throw new RuntimeException (e );
435
+ }
436
+ }
437
+
438
+ latch .countDown ();
439
+ }).start ();
440
+
441
+ return latch .await (aTimeout , aUnit );
442
+ }
443
+
387
444
}
388
445
389
446
}
0 commit comments