From 748dc04feb1e242453922ea7122b37cc42b85c03 Mon Sep 17 00:00:00 2001
From: Sylvain Wallez
Date: Tue, 10 Jan 2023 15:54:09 +0100
Subject: [PATCH] Add docs for the BulkIngester

---
 docs/usage/indexing-bulk.asciidoc             |  48 ++++++-
 .../documentation/usage/IndexingBulkTest.java | 118 +++++++++++++++---
 2 files changed, 143 insertions(+), 23 deletions(-)

diff --git a/docs/usage/indexing-bulk.asciidoc b/docs/usage/indexing-bulk.asciidoc
index 67c0a3a22..5d29e0b86 100644
--- a/docs/usage/indexing-bulk.asciidoc
+++ b/docs/usage/indexing-bulk.asciidoc
@@ -30,23 +30,61 @@ include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-objects]
 
 [discrete]
 ==== Indexing raw JSON data
 
-The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. In the example below we will use the {java-client}'s `JsonData` object to read json files from a log directory and send them in a bulk request.
+The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. However, data that is ingested in bulk is often available as JSON text (e.g. files on disk), and parsing this JSON just to re-serialize it to send the bulk request would be a waste of resources. So documents in bulk operations can also be of type `BinaryData`; these are sent verbatim (without parsing) to the {es} server.
 
-Since `JsonData` doesn't allow reading directly from an input stream (this will be added in a future release), we will use the following function for that:
+In the example below we will use the {java-client}'s `BinaryData` to read JSON files from a log directory and send them in a bulk request.
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[read-json]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
 --------------------------------------------------
 
-We can now read the contents of the log directory and send it to {es}:
+[discrete]
+==== Streaming ingestion with the Bulk Ingester
+
+The `BulkIngester` simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently grouped in bulk requests. You only have to `add()` bulk operations to the ingester and
+it will take care of grouping and sending them in bulk according to its configuration.
+
+The ingester will send a bulk request when one of the following criteria is met:
+
+- the number of operations exceeds a maximum (defaults to 1000)
+- the bulk request size in bytes exceeds a maximum (defaults to 5 MiB)
+- a delay since the last request has expired (periodic flush, no default)
+
+Additionally, you can define a maximum number of concurrent requests waiting to be executed by {es} (defaults to 1). When that maximum is reached and the maximum number of operations has been collected, adding a new operation to the ingester will block. This avoids overloading the {es} server by putting backpressure on the client application.
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-setup]
 --------------------------------------------------
+<1> Sets the {es} client used to send bulk requests.
+<2> Sets the maximum number of operations to collect before sending a bulk request.
+<3> Sets the flush interval.
+<4> Adds a bulk operation to the ingester.
+<5> Closes the ingester to flush the pending operations and release resources.
+
+Additionally, the bulk ingester accepts a listener so that your application can be notified of bulk requests that are
+sent and their result. To allow correlating bulk operations to application context, the `add()` method optionally
+accepts a `context` parameter. The type of this context parameter is used as the generic parameter of the `BulkIngester`
+object. You may have noticed the `Void` type in `BulkIngester<Void>` above: this is because we did not register a listener,
+and therefore did not care about context values.
+
+The following example shows how you can use context values to implement a bulk ingestion listener: as previously, it
+sends JSON log files in bulk, but tracks bulk request errors and failed operations. When an operation fails, depending
+on the error type you may want to re-add it to the ingester.
+
+["source","java"]
+--------------------------------------------------
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-context]
+--------------------------------------------------
+<1> Creates a listener where context values are strings: the name of the ingested file.
+<2> Registers the listener on the bulk ingester.
+<3> Sets the file name as the context value for a bulk operation.
+
+The bulk ingester also exposes statistics that allow you to monitor the ingestion process and tune its configuration:
+
+- the number of operations added,
+- the number of calls to `add()` that were blocked because the maximum number of concurrent requests was reached (contention),
+- the number of bulk requests sent,
+- the number of bulk requests that were blocked because the maximum number of concurrent requests was reached.
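+
+For example, after closing the ingester an application could log these counters. The snippet below is only a sketch:
+the accessor names (`operationsCount()`, `operationContentionsCount()`, `requestCount()`, `requestContentionsCount()`)
+are assumptions used for illustration and should be checked against the actual `BulkIngester` API.
+
+["source","java"]
+--------------------------------------------------
+// Assumed statistics accessors -- verify the method names on BulkIngester before relying on them.
+logger.info("Operations added: " + ingester.operationsCount());
+logger.info("add() calls that blocked (contention): " + ingester.operationContentionsCount());
+logger.info("Bulk requests sent: " + ingester.requestCount());
+logger.info("Bulk requests that blocked (contention): " + ingester.requestContentionsCount());
+--------------------------------------------------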
 
 {doc-tests-blurb}
diff --git a/java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java b/java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java
index 3c9d56253..8b514135a 100644
--- a/java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java
+++ b/java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java
@@ -21,30 +21,31 @@ import co.elastic.clients.documentation.DocTestsTransport;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.model.ModelTestCase;
-import co.elastic.clients.json.JsonData;
-import co.elastic.clients.json.JsonpMapper;
-import jakarta.json.spi.JsonProvider;
+import co.elastic.clients.util.BinaryData;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class IndexingBulkTest extends ModelTestCase {
 
     private final DocTestsTransport transport = new DocTestsTransport();
     private final ElasticsearchClient esClient = new ElasticsearchClient(transport);
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final Log logger = LogFactory.getLog(this.getClass());
 
     BulkResponse result = BulkResponse.of(r -> r
         .errors(false)
@@ -92,15 +93,6 @@ public void indexBulk() throws Exception {
         //end::bulk-objects
     }
 
-    //tag::read-json
-    public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
-        JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
-        JsonProvider jsonProvider = jsonpMapper.jsonProvider();
-
-        return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
-    }
-    //end::read-json
-
     @Test
     public void indexBulkJson() throws Exception {
         transport.setResult(result);
@@ -116,15 +108,105 @@ public void indexBulkJson() throws Exception {
         BulkRequest.Builder br = new BulkRequest.Builder();
 
         for (File file: logFiles) {
-            JsonData json = readJson(new FileInputStream(file), esClient);
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
 
             br.operations(op -> op
                 .index(idx -> idx
                     .index("logs")
-                    .document(json)
+                    .document(data)
                 )
             );
         }
         //end::bulk-json
     }
+
+    @Test
+    public void useBulkIndexer() throws Exception {
+
+        File logDir = new File(".");
+        File[] logFiles = logDir.listFiles(
+            file -> file.getName().matches("log-.*\\.json")
+        );
+
+        //tag::bulk-ingester-setup
+        BulkIngester<Void> ingester = BulkIngester.of(b -> b
+            .client(esClient)   // <1>
+            .maxOperations(100) // <2>
+            .flushInterval(1, TimeUnit.SECONDS) // <3>
+        );
+
+        for (File file: logFiles) {
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
+
+            ingester.add(op -> op // <4>
+                .index(idx -> idx
+                    .index("logs")
+                    .document(data)
+                )
+            );
+        }
+
+        ingester.close(); // <5>
+        //end::bulk-ingester-setup
+
+    }
+
+    @Test
+    public void useBulkIndexerWithContext() throws Exception {
+
+        File[] logFiles = new File[]{};
+
+        //tag::bulk-ingester-context
+        BulkListener<String> listener = new BulkListener<String>() { // <1>
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
+                // The request was accepted, but may contain failed items.
+                // The "context" list gives the file name for each bulk item.
+                logger.debug("Bulk request " + executionId + " completed");
+                for (int i = 0; i < contexts.size(); i++) {
+                    BulkResponseItem item = response.items().get(i);
+                    if (item.error() != null) {
+                        // Inspect the failure cause
+                        logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
+                    }
+                }
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
+                // The request could not be sent
+                logger.debug("Bulk request " + executionId + " failed", failure);
+            }
+        };
+
+        BulkIngester<String> ingester = BulkIngester.of(b -> b
+            .client(esClient)
+            .maxOperations(100)
+            .flushInterval(1, TimeUnit.SECONDS)
+            .listener(listener) // <2>
+        );
+
+        for (File file: logFiles) {
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
+
+            ingester.add(op -> op
+                .index(idx -> idx
+                    .index("logs")
+                    .document(data)
+                ),
+                file.getName() // <3>
+            );
+        }
+
+        ingester.close();
+        //end::bulk-ingester-context
+
+    }
 }
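
Beyond what the examples above configure (`maxOperations` and `flushInterval`), the size and concurrency limits described in the documentation could be set the same way. The sketch below is illustrative only: the builder methods `maxSize()` and `maxConcurrentRequests()` are assumed names, to be verified against the actual `BulkIngester.Builder` API.

["source","java"]
--------------------------------------------------
// Sketch only: maxSize() and maxConcurrentRequests() are assumed builder methods.
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(1000)                 // flush after 1000 operations (documented default)
    .maxSize(5 * 1024 * 1024)            // flush after 5 MiB of payload (documented default)
    .maxConcurrentRequests(1)            // at most one concurrent bulk request (documented default)
    .flushInterval(5, TimeUnit.SECONDS)  // periodic flush every 5 seconds
);
--------------------------------------------------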