Skip to content

Commit d4ee83f

Browse files
committed
Upgrade Elasticsearch 8.x to version 8.15.0. [DOX-412]
Incompatible change in elastic/elasticsearch-java#830. (Totally meaningless release notes entry: "Fixed bug in BulkIngester") https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.15/release-highlights.html
1 parent ca09bd4 commit d4ee83f

File tree

2 files changed

+21
-1
lines changed

2 files changed

+21
-1
lines changed

settings.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ gradle.rootProject {
1414
'commons-cli' : '1.3.1',
1515
'commons-io' : '2.7',
1616
'elasticsearch2' : '2.2.1',
17-
'elasticsearch8' : '8.14.1',
17+
'elasticsearch8' : '8.15.0',
1818
'htsjdk' : '4.0.1',
1919
'jackson' : '2.13.4.2',
2020
'jdk' : '17',

src/main/java/hbz/limetrans/ElasticsearchClientV8.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@
4545
import java.util.List;
4646
import java.util.Map;
4747
import java.util.Set;
48+
import java.util.concurrent.Executors;
49+
import java.util.concurrent.ScheduledExecutorService;
50+
import java.util.concurrent.TimeUnit;
4851
import java.util.function.BiConsumer;
4952
import java.util.function.Consumer;
5053
import java.util.function.Function;
@@ -64,6 +67,7 @@ public class ElasticsearchClientV8 extends ElasticsearchClient { // checkstyle-d
6467

6568
private BulkIngester<Void> mBulkIngester;
6669
private RestClientTransport mTransport;
70+
private ScheduledExecutorService mBulkScheduler;
6771
private co.elastic.clients.elasticsearch.ElasticsearchClient mClient;
6872

6973
public ElasticsearchClientV8(final Settings aSettings) {
@@ -75,6 +79,7 @@ public ElasticsearchClientV8(final Settings aSettings) {
7579
@Override
7680
public void reset() {
7781
mBulkIngester = null;
82+
mBulkScheduler = null;
7883

7984
super.reset();
8085
}
@@ -269,8 +274,16 @@ private void addBulk(final Function<BulkOperation.Builder, ObjectBuilder<BulkOpe
269274

270275
@Override
271276
protected void createBulk(final int aBulkActions, final int aBulkRequests) {
277+
mBulkScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
278+
final Thread t = Executors.defaultThreadFactory().newThread(r);
279+
t.setName("limetrans-bulk-ingester-executor#");
280+
t.setDaemon(true);
281+
return t;
282+
});
283+
272284
mBulkIngester = BulkIngester.of(b -> b
273285
.client(mClient)
286+
.scheduler(mBulkScheduler)
274287
.listener(new ElasticsearchBulkListener(this))
275288
.maxOperations(aBulkActions)
276289
.maxSize(mBulkSizeValue)
@@ -287,10 +300,17 @@ protected boolean isBulkClosed() {
287300
protected boolean closeBulk() throws InterruptedException {
288301
try {
289302
mBulkIngester.close();
303+
mBulkScheduler.shutdown();
304+
305+
if (!mBulkScheduler.awaitTermination(2, TimeUnit.MINUTES)) {
306+
getLogger().warn("Some bulk ingester tasks still pending");
307+
}
308+
290309
return mBulkIngester.pendingRequests() == 0;
291310
}
292311
finally {
293312
mBulkIngester = null;
313+
mBulkScheduler = null;
294314
}
295315
}
296316

0 commit comments

Comments
 (0)