Description
Elasticsearch Version
7.17.9
Installed Plugins
No response
Java Version
bundled
OS Version
macOS Monterey version 12.6.2
Problem Description
BulkIngester hangs:
I add 100 documents to the Elasticsearch index with the BulkIngester.
When using maxConcurrentRequests(0) on the bulk ingester builder and invoking bulkIngester.flush() after adding the 100 documents to the bulk ingester, the ingester hangs in the method whenReadyIf(BooleanSupplier canRun, Supplier<T> fn) of the class FnCondition, at line 82: condition.awaitUninterruptibly();
When using maxConcurrentRequests(1), the documents are ingested.
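A minimal sketch of the pattern that triggers the hang (esClient and "test-index" are placeholders, not the production code; the full code is under Steps to Reproduce below):

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch.core.bulk.IndexOperation;

import java.util.Map;

void reproduceHang(ElasticsearchClient esClient) {
    // No flush interval and operation-count flushing disabled, so flush() is called explicitly.
    BulkIngester<Void> ingester = BulkIngester.of(b -> b
            .client(esClient)
            .maxConcurrentRequests(0) // the flush() below never returns with this setting
            .maxOperations(-1)
            .maxSize(5 * 1024 * 1024));

    // Add 100 simple documents.
    for (int i = 0; i < 100; i++) {
        ingester.add(new IndexOperation.Builder<Map<String, Integer>>()
                .index("test-index")
                .id(String.valueOf(i))
                .document(Map.of("value", i))
                .build()
                ._toBulkOperation());
    }

    ingester.flush(); // hangs in FnCondition.whenReadyIf(...) at condition.awaitUninterruptibly()
    ingester.close();
}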
Steps to Reproduce
This is the code using the BulkIngester:
private void handleBulkPerIndexSet(Collection<E> indexSetEvents) {
    // Create bulk listener.
    BulkListener<String> bulkListener = new BulkListener<String>() {
        @Override
        public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
        }

        @Override
        public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
        }
    };

    // Create bulk ingester.
    BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder ->
            ingesterBuilder
                    .client(elasticSearch.getESClient())
                    .maxConcurrentRequests(1)
                    .maxOperations(-1)
                    .maxSize(5 * 1024 * 1024)
                    .globalSettings(gsBuilder ->
                            gsBuilder
                                    .waitForActiveShards(asBuilder -> asBuilder.count(1)))
                    .refresh(Refresh.True)
                    .listener(bulkListener)
    );

    try {
        // Add events to bulk ingester.
        for (E event : indexSetEvents) {
            // Add request to bulk processor.
            switch (event.action()) {
                case create:
                    bulkIngester.add(new CreateOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                            .build()
                            ._toBulkOperation());
                    break;
                case update:
                    bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                            .build()
                            ._toBulkOperation());
                    break;
                case delete:
                    // Soft delete event.
                    event.deleted(true);
                    bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                            .build()
                            ._toBulkOperation());
                    break;
                case undelete:
                    // Soft undelete event.
                    event.deleted(false);
                    bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                            .index(event.esIndex())
                            .id(event.id())
                            .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                            .build()
                            ._toBulkOperation());
                    break;
                case purge:
                    // Real physical delete.
                    bulkIngester.add(new DeleteOperation.Builder()
                            .index(event.esIndex())
                            .id(event.id())
                            .build()
                            ._toBulkOperation());
                    break;
                default:
                    // Should not get here. Log anyway.
                    logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
                    break;
            }
        }
        bulkIngester.flush();
        bulkIngester.close();
    }
    catch (Exception e) {
        bulkException = new EventBulkProcessorException("Failed to process %d events. %s".formatted(indexSetEvents.size(), e.getMessage()));
        logger.error(bulkException.getMessage());
    }
}
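Note that the code above is shown with maxConcurrentRequests(1), the setting that works; the hang is reproduced by changing only that builder call to 0. A minimal sketch of the failing versus working configuration (esClient again stands in for elasticSearch.getESClient()):

// Hangs: with maxConcurrentRequests(0), flush() blocks in
// FnCondition.whenReadyIf(...) at condition.awaitUninterruptibly().
BulkIngester<String> hangingIngester = BulkIngester.of(b -> b
        .client(esClient)
        .maxConcurrentRequests(0)
        .maxOperations(-1)
        .maxSize(5 * 1024 * 1024));

// Works: the 100 documents are ingested and flush()/close() return normally.
BulkIngester<String> workingIngester = BulkIngester.of(b -> b
        .client(esClient)
        .maxConcurrentRequests(1)
        .maxOperations(-1)
        .maxSize(5 * 1024 * 1024));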
Logs (if relevant)
No response