
Add docs for the BulkIngester #481

Merged · 1 commit · Jan 10, 2023
48 changes: 43 additions & 5 deletions docs/usage/indexing-bulk.asciidoc
@@ -30,23 +30,61 @@ include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-objects]
[discrete]
==== Indexing raw JSON data

The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. In the example below we will use the {java-client}'s `JsonData` object to read json files from a log directory and send them in a bulk request.
The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. However, data that is ingested in bulk is often available as JSON text (e.g. files on disk), and parsing this JSON just to re-serialize it for the bulk request would be a waste of resources. Documents in bulk operations can therefore also be of type `BinaryData`, which is sent verbatim (without parsing) to the {es} server.

Since `JsonData` doesn't allow reading directly from an input stream (this will be added in a future release), we will use the following function for that:
In the example below we will use the {java-client}'s `BinaryData` to read JSON files from a log directory and send them in a bulk request.

["source","java"]
--------------------------------------------------
include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[read-json]
include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
--------------------------------------------------
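
Once all the log files have been added to the request builder, the bulk request can be sent and checked for per-item failures. The following is a minimal sketch, reusing the `esClient` and `br` variables from the snippet above and assuming a `logger` is available; production code would use more elaborate error handling.

["source","java"]
--------------------------------------------------
BulkResponse result = esClient.bulk(br.build());

// The request was accepted, but individual operations may still have failed.
if (result.errors()) {
    for (BulkResponseItem item : result.items()) {
        if (item.error() != null) {
            logger.error("Failed to index document: " + item.error().reason());
        }
    }
}
--------------------------------------------------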

We can now read the contents of the log directory and send it to {es}:
[discrete]
==== Streaming ingestion with the Bulk Ingester

The `BulkIngester` simplifies the usage of the Bulk API by providing a utility class that transparently groups index/update/delete operations into bulk requests. You only have to `add()` bulk operations to the ingester, and it will take care of grouping and sending them in bulk according to its configuration.

The ingester will send a bulk request when one of the following criteria is met:

- the number of operations exceeds a maximum (defaults to 1000)
- the bulk request size in bytes exceeds a maximum (defaults to 5 MiB)
- a delay since the last request has expired (periodic flush, no default)

Additionally, you can define the maximum number of concurrent requests waiting to be executed by {es} (defaults to 1). When that maximum is reached and the maximum number of operations has been collected, adding a new operation to the ingester will block: this avoids overloading the {es} server by putting backpressure on the client application.

["source","java"]
--------------------------------------------------
include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-setup]
--------------------------------------------------
<1> Sets the {es} client used to send bulk requests.
<2> Sets the maximum number of operations to collect before sending a bulk request.
<3> Sets the flush interval.
<4> Adds a bulk operation to the ingester.
<5> Closes the ingester to flush the pending operations and release resources.
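
The example above relies on the default values for the size and concurrency limits. The sketch below shows a more explicit configuration; it assumes the builder also exposes `maxSize` (maximum request size in bytes) and `maxConcurrentRequests` options matching the criteria described earlier — check the `BulkIngester` builder for the exact option names.

["source","java"]
--------------------------------------------------
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(1000)                 // flush after 1000 operations
    .maxSize(5 * 1024 * 1024)            // or after 5 MiB of payload
    .flushInterval(1, TimeUnit.SECONDS)  // or after 1 second without a flush
    .maxConcurrentRequests(2)            // allow at most 2 bulk requests in flight
);
--------------------------------------------------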

Additionally, the bulk ingester accepts a listener so that your application can be notified of bulk requests that are
sent and their result. To allow correlating bulk operations to application context, the `add()` method optionally
accepts a `context` parameter. The type of this context parameter is used as the generic parameter of the `BulkIngester`
object. You may have noticed the `Void` type in `BulkIngester<Void>` above: this is because we did not register a listener,
and therefore did not care about context values.

The following example shows how you can use context values to implement a bulk ingestion listener: as previously, it sends JSON log files in bulk, but it also tracks bulk request errors and failed operations. When an operation fails, depending on the error type, you may want to re-add it to the ingester.

["source","java"]
--------------------------------------------------
include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-context]
--------------------------------------------------
<1> Creates a listener where context values are strings for the ingested file name.
<2> Registers the listener on the bulk ingester.
<3> Sets the file name as the context value for a bulk operation.

The bulk ingester also exposes statistics that allow you to monitor the ingestion process and tune its configuration:

- number of operations added,
- number of calls to `add()` that were blocked because the maximum number of concurrent requests was reached (contention),
- number of bulk requests sent,
- number of bulk requests that were blocked because the maximum number of concurrent requests was reached.
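
A minimal sketch of reading these counters after ingestion is shown below. The accessor names are assumptions derived from the statistics listed above and may differ in your client version; check the `BulkIngester` Javadoc for the exact names.

["source","java"]
--------------------------------------------------
// Assumed accessor names -- verify against the BulkIngester Javadoc.
logger.info("Operations added:   " + ingester.operationsCount());
logger.info("add() contentions:  " + ingester.operationContentionsCount());
logger.info("Bulk requests sent: " + ingester.requestCount());
logger.info("Requests contended: " + ingester.requestContentionsCount());
--------------------------------------------------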

{doc-tests-blurb}
IndexingBulkTest.java
@@ -21,30 +21,31 @@

import co.elastic.clients.documentation.DocTestsTransport;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.model.ModelTestCase;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.JsonpMapper;
import jakarta.json.spi.JsonProvider;
import co.elastic.clients.util.BinaryData;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IndexingBulkTest extends ModelTestCase {

private final DocTestsTransport transport = new DocTestsTransport();
private final ElasticsearchClient esClient = new ElasticsearchClient(transport);

private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Log logger = LogFactory.getLog(this.getClass());

BulkResponse result = BulkResponse.of(r -> r
.errors(false)
@@ -92,15 +93,6 @@ public void indexBulk() throws Exception {
//end::bulk-objects
}

//tag::read-json
public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
JsonProvider jsonProvider = jsonpMapper.jsonProvider();

return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
}
//end::read-json

@Test
public void indexBulkJson() throws Exception {
transport.setResult(result);
@@ -116,15 +108,105 @@ public void indexBulkJson() throws Exception {
BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
JsonData json = readJson(new FileInputStream(file), esClient);
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input));

br.operations(op -> op
.index(idx -> idx
.index("logs")
.document(json)
.document(data)
)
);
}
//end::bulk-json
}

@Test
public void useBulkIndexer() throws Exception {

File logDir = new File(".");
File[] logFiles = logDir.listFiles(
file -> file.getName().matches("log-.*\\.json")
);

//tag::bulk-ingester-setup
BulkIngester<Void> ingester = BulkIngester.of(b -> b
.client(esClient) // <1>
.maxOperations(100) // <2>
.flushInterval(1, TimeUnit.SECONDS) // <3>
);

for (File file: logFiles) {
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input));

ingester.add(op -> op // <4>
.index(idx -> idx
.index("logs")
.document(data)
)
);
}

ingester.close(); // <5>
//end::bulk-ingester-setup

}

@Test
public void useBulkIndexerWithContext() throws Exception {

File[] logFiles = new File[]{};

//tag::bulk-ingester-context
BulkListener<String> listener = new BulkListener<String>() { // <1>
@Override
public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
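// Called just before the bulk request is sent. The "contexts" list is in the
// same order as the operations in the request.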
}

@Override
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
// The request was accepted, but may contain failed items.
// The "context" list gives the file name for each bulk item.
logger.debug("Bulk request " + executionId + " completed");
for (int i = 0; i < contexts.size(); i++) {
BulkResponseItem item = response.items().get(i);
if (item.error() != null) {
// Inspect the failure cause
logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
}
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
// The request could not be sent
logger.debug("Bulk request " + executionId + " failed", failure);
}
};

BulkIngester<String> ingester = BulkIngester.of(b -> b
.client(esClient)
.maxOperations(100)
.flushInterval(1, TimeUnit.SECONDS)
.listener(listener) // <2>
);

for (File file: logFiles) {
FileInputStream input = new FileInputStream(file);
BinaryData data = BinaryData.of(IOUtils.toByteArray(input));

ingester.add(op -> op
.index(idx -> idx
.index("logs")
.document(data)
),
file.getName() // <3>
);
}

ingester.close();
//end::bulk-ingester-context

}
}