Commit ee170e1

Add docs for the BulkIngester (#481)
1 parent 02c6585 commit ee170e1

2 files changed: +143 −23 lines changed

docs/usage/indexing-bulk.asciidoc (+43 −5)
@@ -30,23 +30,61 @@ include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-objects]
 [discrete]
 ==== Indexing raw JSON data
 
-The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. In the example below we will use the {java-client}'s `JsonData` object to read json files from a log directory and send them in a bulk request.
+The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. However, data that is ingested in bulk is often available as JSON text (e.g. files on disk), and parsing this JSON just to re-serialize it for the bulk request would be a waste of resources. So documents in bulk operations can also be of type `BinaryData`, which is sent verbatim (without parsing) to the {es} server.
 
-Since `JsonData` doesn't allow reading directly from an input stream (this will be added in a future release), we will use the following function for that:
+In the example below we will use the {java-client}'s `BinaryData` to read JSON files from a log directory and send them in a bulk request.
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[read-json]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
--------------------------------------------------
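
For reference, the `bulk-json` tag above points at the test added in this commit (see IndexingBulkTest.java below). Condensed, the pattern is the following sketch; the final `bulk()` call is an assumption, since the tagged region stops at building the request:

["source","java"]
--------------------------------------------------
File logDir = new File("/path/to/logs"); // illustrative directory
BulkRequest.Builder br = new BulkRequest.Builder();

for (File file : logDir.listFiles(f -> f.getName().matches("log-.*\\.json"))) {
    // Read the raw JSON bytes; BinaryData is sent verbatim, without parsing
    BinaryData data = BinaryData.of(IOUtils.toByteArray(new FileInputStream(file)));
    br.operations(op -> op
        .index(idx -> idx
            .index("logs")
            .document(data)
        )
    );
}

esClient.bulk(br.build()); // assumed: send the accumulated operations
--------------------------------------------------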
 
-We can now read the contents of the log directory and send it to {es}:
+[discrete]
+==== Streaming ingestion with the Bulk Ingester
+
+The `BulkIngester` simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently grouped in bulk requests. You only have to `add()` bulk operations to the ingester, and it will take care of grouping and sending them in bulk according to its configuration.
+
+The ingester will send a bulk request when one of the following criteria is met:
+
+- the number of operations exceeds a maximum (defaults to 1000)
+- the bulk request size in bytes exceeds a maximum (defaults to 5 MiB)
+- a delay since the last request has expired (periodic flush, no default)
+
+Additionally, you can define a maximum number of concurrent requests waiting to be executed by {es} (defaults to 1). When that maximum is reached and the maximum number of operations has been collected, adding a new operation to the ingester will block. This avoids overloading the {es} server by putting backpressure on the client application.
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-setup]
 --------------------------------------------------
+<1> Sets the {es} client used to send bulk requests.
+<2> Sets the maximum number of operations to collect before sending a bulk request.
+<3> Sets the flush interval.
+<4> Adds a bulk operation to the ingester.
+<5> Closes the ingester to flush the pending operations and release resources.
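
The setup example above configures only the client, the operation count threshold, and the flush interval. As a complement, here is a minimal sketch of how the other flush and backpressure thresholds listed earlier might be configured; the builder method names `maxSize` and `maxConcurrentRequests` are assumptions, as they do not appear in this commit's examples:

["source","java"]
--------------------------------------------------
BulkIngester<Void> ingester = BulkIngester.of(b -> b
    .client(esClient)
    .maxOperations(1000)                 // flush after 1000 operations (the documented default)
    .maxSize(5 * 1024 * 1024)            // flush after 5 MiB of payload (assumed builder name)
    .flushInterval(1, TimeUnit.SECONDS)  // periodic flush, no default
    .maxConcurrentRequests(1)            // at most one in-flight bulk request (assumed builder name)
);
--------------------------------------------------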
+
+Additionally, the bulk ingester accepts a listener so that your application can be notified of bulk requests that are
+sent and their result. To allow correlating bulk operations to application context, the `add()` method optionally
+accepts a `context` parameter. The type of this context parameter is used as the generic parameter of the `BulkIngester`
+object. You may have noticed the `Void` type in `BulkIngester<Void>` above: this is because we did not register a listener,
+and therefore did not care about context values.
+
+The following example shows how you can use context values to implement a bulk ingestion listener: as previously, it
+sends JSON log files in bulk, but tracks bulk request errors and failed operations. When an operation fails, depending on the error type you may want to re-add it to the ingester.
 
+["source","java"]
+--------------------------------------------------
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-context]
+--------------------------------------------------
+<1> Creates a listener where context values are strings for the ingested file name.
+<2> Registers the listener on the bulk ingester.
+<3> Sets the file name as the context value for a bulk operation.
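
The re-adding of failed operations mentioned above is left to the reader. As a minimal sketch, a listener could record retryable failures (here, HTTP 429) in a queue and re-add them later; the `toRetry` queue and the status check are illustrative assumptions, not part of this commit:

["source","java"]
--------------------------------------------------
Queue<String> toRetry = new ConcurrentLinkedQueue<>(); // illustrative retry queue

// Inside the listener's afterBulk(executionId, request, contexts, response):
for (int i = 0; i < contexts.size(); i++) {
    BulkResponseItem item = response.items().get(i);
    // 429 (too many requests) is usually transient, so remember the file for a retry
    if (item.error() != null && item.status() == 429) {
        toRetry.add(contexts.get(i));
    }
}
--------------------------------------------------

Re-adding from a separate step, rather than from inside the listener callback itself, avoids blocking the ingester's flush path.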
 
+The bulk ingester also exposes statistics that allow monitoring the ingestion process and tuning its configuration:
 
+- number of operations added,
+- number of calls to `add()` that were blocked because the maximum number of concurrent requests was reached (contention),
+- number of bulk requests sent,
+- number of bulk requests that were blocked because the maximum number of concurrent requests was reached.
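
A sketch of how these counters might be inspected after ingestion; the accessor names below are assumptions, since the statistics API is not shown in this commit:

["source","java"]
--------------------------------------------------
// Accessor names are assumed for illustration; check the BulkIngester javadoc.
logger.info("operations added:    " + ingester.operationsCount());
logger.info("add() contentions:   " + ingester.operationContentionsCount());
logger.info("requests sent:       " + ingester.requestCount());
logger.info("request contentions: " + ingester.requestContentionsCount());
--------------------------------------------------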
 
 {doc-tests-blurb}

java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java (+100 −18)
@@ -21,30 +21,31 @@
 
 import co.elastic.clients.documentation.DocTestsTransport;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
+import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.model.ModelTestCase;
-import co.elastic.clients.json.JsonData;
-import co.elastic.clients.json.JsonpMapper;
-import jakarta.json.spi.JsonProvider;
+import co.elastic.clients.util.BinaryData;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 public class IndexingBulkTest extends ModelTestCase {
 
     private final DocTestsTransport transport = new DocTestsTransport();
     private final ElasticsearchClient esClient = new ElasticsearchClient(transport);
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private final Log logger = LogFactory.getLog(this.getClass());
 
     BulkResponse result = BulkResponse.of(r -> r
         .errors(false)

@@ -92,15 +93,6 @@ public void indexBulk() throws Exception {
         //end::bulk-objects
     }
 
-    //tag::read-json
-    public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
-        JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
-        JsonProvider jsonProvider = jsonpMapper.jsonProvider();
-
-        return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
-    }
-    //end::read-json
-
     @Test
     public void indexBulkJson() throws Exception {
         transport.setResult(result);

@@ -116,15 +108,105 @@ public void indexBulkJson() throws Exception {
         BulkRequest.Builder br = new BulkRequest.Builder();
 
         for (File file: logFiles) {
-            JsonData json = readJson(new FileInputStream(file), esClient);
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
 
             br.operations(op -> op
                 .index(idx -> idx
                     .index("logs")
-                    .document(json)
+                    .document(data)
                 )
             );
         }
         //end::bulk-json
     }
+
+    @Test
+    public void useBulkIndexer() throws Exception {
+
+        File logDir = new File(".");
+        File[] logFiles = logDir.listFiles(
+            file -> file.getName().matches("log-.*\\.json")
+        );
+
+        //tag::bulk-ingester-setup
+        BulkIngester<Void> ingester = BulkIngester.of(b -> b
+            .client(esClient) // <1>
+            .maxOperations(100) // <2>
+            .flushInterval(1, TimeUnit.SECONDS) // <3>
+        );
+
+        for (File file: logFiles) {
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
+
+            ingester.add(op -> op // <4>
+                .index(idx -> idx
+                    .index("logs")
+                    .document(data)
+                )
+            );
+        }
+
+        ingester.close(); // <5>
+        //end::bulk-ingester-setup
+
+    }
+
+    @Test
+    public void useBulkIndexerWithContext() throws Exception {
+
+        File[] logFiles = new File[]{};
+
+        //tag::bulk-ingester-context
+        BulkListener<String> listener = new BulkListener<String>() { // <1>
+            @Override
+            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
+                // The request was accepted, but may contain failed items.
+                // The "context" list gives the file name for each bulk item.
+                logger.debug("Bulk request " + executionId + " completed");
+                for (int i = 0; i < contexts.size(); i++) {
+                    BulkResponseItem item = response.items().get(i);
+                    if (item.error() != null) {
+                        // Inspect the failure cause
+                        logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
+                    }
+                }
+            }
+
+            @Override
+            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
+                // The request could not be sent
+                logger.debug("Bulk request " + executionId + " failed", failure);
+            }
+        };
+
+        BulkIngester<String> ingester = BulkIngester.of(b -> b
+            .client(esClient)
+            .maxOperations(100)
+            .flushInterval(1, TimeUnit.SECONDS)
+            .listener(listener) // <2>
+        );
+
+        for (File file: logFiles) {
+            FileInputStream input = new FileInputStream(file);
+            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
+
+            ingester.add(op -> op
+                .index(idx -> idx
+                    .index("logs")
+                    .document(data)
+                ),
+                file.getName() // <3>
+            );
+        }
+
+        ingester.close();
+        //end::bulk-ingester-context
+
+    }
 }
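
One detail worth noting if you adapt these tests: the `FileInputStream` instances are never closed. A try-with-resources variant of the read, as a minimal sketch:

["source","java"]
--------------------------------------------------
BinaryData data;
try (FileInputStream input = new FileInputStream(file)) {
    // Closing the stream after reading avoids leaking file handles
    data = BinaryData.of(IOUtils.toByteArray(input));
}
--------------------------------------------------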
