Skip to content

BulkIngester hangs #532

Closed
Closed
@frank-montyne

Description

@frank-montyne

Elasticsearch Version

7.17.9

Installed Plugins

No response

Java Version

bundled

OS Version

macOS Monterey version 12.6.2

Problem Description

BulkIngester hangs:

I add 100 documents to the ElasticSearch index with the BulkIngester.

When using maxConcurrentRequests(0) on the bulk ingester builder and invoking bulkIngester.flush() after adding the 100 documents to to the bulk ingester, the ingester hangs in the function public T whenReadyIf(BooleanSupplier canRun, Supplier fn) of the class FnCondition on line 82 condition.awaitUninterruptibly();

When using maxConcurrentRequests(1) the documents are ingested.

Steps to Reproduce

This is to code using the BulkIngester:

private void handleBulkPerIndexSet(Collection<E> indexSetEvents) {
   // Create bulk listener.
   BulkListener<String> bulkListener = new BulkListener<String>() {
      @Override
      public void beforeBulk(long executionId, BulkRequest bulkRequest, List<String> contexts) {
      }

      @Override
      public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, BulkResponse bulkResponse) {
      }

      @Override
      public void afterBulk(long executionId, BulkRequest bulkRequest, List<String> contexts, Throwable failure) {
      }
   };

   // Create bulk ingester.
   BulkIngester<String> bulkIngester = BulkIngester.of(ingesterBuilder -> 
      ingesterBuilder
         .client(elasticSearch.getESClient())
         .maxConcurrentRequests(1)
         .maxOperations(-1)
         .maxSize(5 * 1024 * 1024)
         .globalSettings(gsBuilder -> 
            gsBuilder
               .waitForActiveShards(asBuilder -> asBuilder.count(1)))
               .refresh(Refresh.True)
         .listener(bulkListener)
      );
   
   try {
      // Add events to bulk ingester.
      for (E event : indexSetEvents) {
         // Add request to bulk processor.
         switch (event.action()) {
            case create:
               bulkIngester.add(new CreateOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());
               break;

            case update:
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case delete:
               // Soft delete event.
               event.deleted(true);
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case undelete:
               // Soft undelete event.
               event.deleted(false);
               bulkIngester.add(new IndexOperation.Builder<BinaryData>()
                     .index(event.esIndex())
                     .id(event.id())
                     .document(BinaryData.of(event.toESJson().getBytes(Charsets.UTF_8)))
                     .build()
                     ._toBulkOperation());					
               break;

            case purge:
               // Real physical delete.
               bulkIngester.add(new DeleteOperation.Builder()
                     .index(event.esIndex())
                     .id(event.id())
                     .build()
                     ._toBulkOperation());					
               break;

            default:
               // Should not get here. Log anyway.
               logger.error(String.format("Skipped event with unsupported action '%s' -> %s", event.action().name(), event.toJson()));
               break;
         }
      }
      bulkIngester.flush();
      bulkIngester.close();
   }
   catch (Exception e) {
      bulkException = new EventBulkProcessorException("Failed to process %d events. %s".formatted(indexSetEvents.size(), e.getMessage()));
      logger.error(bulkException.getMessage());
   }
}

Logs (if relevant)

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions