45
45
import java .util .List ;
46
46
import java .util .Map ;
47
47
import java .util .Set ;
48
+ import java .util .concurrent .Executors ;
49
+ import java .util .concurrent .ScheduledExecutorService ;
50
+ import java .util .concurrent .TimeUnit ;
48
51
import java .util .function .BiConsumer ;
49
52
import java .util .function .Consumer ;
50
53
import java .util .function .Function ;
@@ -64,6 +67,7 @@ public class ElasticsearchClientV8 extends ElasticsearchClient { // checkstyle-d
64
67
65
68
private BulkIngester <Void > mBulkIngester ;
66
69
private RestClientTransport mTransport ;
70
+ private ScheduledExecutorService mBulkScheduler ;
67
71
private co .elastic .clients .elasticsearch .ElasticsearchClient mClient ;
68
72
69
73
public ElasticsearchClientV8 (final Settings aSettings ) {
@@ -75,6 +79,7 @@ public ElasticsearchClientV8(final Settings aSettings) {
75
79
@ Override
76
80
public void reset () {
77
81
mBulkIngester = null ;
82
+ mBulkScheduler = null ;
78
83
79
84
super .reset ();
80
85
}
@@ -269,8 +274,16 @@ private void addBulk(final Function<BulkOperation.Builder, ObjectBuilder<BulkOpe
269
274
270
275
@ Override
271
276
protected void createBulk (final int aBulkActions , final int aBulkRequests ) {
277
+ mBulkScheduler = Executors .newSingleThreadScheduledExecutor (r -> {
278
+ final Thread t = Executors .defaultThreadFactory ().newThread (r );
279
+ t .setName ("limetrans-bulk-ingester-executor#" );
280
+ t .setDaemon (true );
281
+ return t ;
282
+ });
283
+
272
284
mBulkIngester = BulkIngester .of (b -> b
273
285
.client (mClient )
286
+ .scheduler (mBulkScheduler )
274
287
.listener (new ElasticsearchBulkListener (this ))
275
288
.maxOperations (aBulkActions )
276
289
.maxSize (mBulkSizeValue )
@@ -287,10 +300,13 @@ protected boolean isBulkClosed() {
287
300
protected boolean closeBulk () throws InterruptedException {
288
301
try {
289
302
mBulkIngester .close ();
303
+ mBulkScheduler .shutdown ();
304
+ mBulkScheduler .awaitTermination (2 , TimeUnit .MINUTES );
290
305
return mBulkIngester .pendingRequests () == 0 ;
291
306
}
292
307
finally {
293
308
mBulkIngester = null ;
309
+ mBulkScheduler = null ;
294
310
}
295
311
}
296
312
0 commit comments