Commit 363cb56

Revert "Add docs for the BulkIngester (#481)"
This reverts commit 03b3d48. Unfortunately, the 8.6.0 release was cut before this commit landed; it will be part of 8.7.0.
1 parent 575b1d6 commit 363cb56

File tree: 2 files changed, +23 -143 lines changed

docs/usage/indexing-bulk.asciidoc (+5 -43)
@@ -30,61 +30,23 @@ include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-objects]
 [discrete]
 ==== Indexing raw JSON data
 
-The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. However, data that is ingested in bulk is often available as JSON text (e.g. files on disk), and parsing this JSON just to re-serialize it to send the bulk request would be a waste of resources. So documents in bulk operations can also be of type `BinaryData`, which is sent verbatim (without parsing) to the {es} server.
+The `document` property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client's JSON mapper. In the example below we will use the {java-client}'s `JsonData` object to read JSON files from a log directory and send them in a bulk request.
 
-In the example below we will use the {java-client}'s `BinaryData` to read JSON files from a log directory and send them in a bulk request.
+Since `JsonData` doesn't allow reading directly from an input stream (this will be added in a future release), we will use the following function for that:
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[read-json]
 --------------------------------------------------
 
-[discrete]
-==== Streaming ingestion with the Bulk Ingester
-
-The `BulkIngester` simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently grouped in bulk requests. You only have to `add()` bulk operations to the ingester and
-it will take care of grouping and sending them in bulk according to its configuration.
-
-The ingester will send a bulk request when one of the following criteria is met:
-
-- the number of operations exceeds a maximum (defaults to 1000)
-- the bulk request size in bytes exceeds a maximum (defaults to 5 MiB)
-- a delay since the last request has expired (periodic flush, no default)
-
-Additionally, you can define a maximum number of concurrent requests waiting to be executed by {es} (defaults to 1). When that maximum is reached and the maximum number of operations has been collected, adding a new operation to the ingester will block. This avoids overloading the {es} server by putting backpressure on the client application.
+We can now read the contents of the log directory and send it to {es}:
 
 ["source","java"]
 --------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-setup]
+include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-json]
 --------------------------------------------------
-<1> Sets the {es} client used to send bulk requests.
-<2> Sets the maximum number of operations to collect before sending a bulk request.
-<3> Sets the flush interval.
-<4> Adds a bulk operation to the ingester.
-<5> Closes the ingester to flush the pending operations and release resources.
-
-Additionally, the bulk ingester accepts a listener so that your application can be notified of bulk requests that are
-sent and their result. To allow correlating bulk operations to application context, the `add()` method optionally
-accepts a `context` parameter. The type of this context parameter is used as the generic parameter of the `BulkIngester`
-object. You may have noticed the `Void` type in `BulkIngester<Void>` above: this is because we did not register a listener,
-and therefore did not care about context values.
-
-The following example shows how you can use context values to implement a bulk ingestion listener: as previously, it
-sends JSON log files in bulk, but tracks bulk request errors and failed operations. When an operation fails, depending on the error type, you may want to re-add it to the ingester.
 
-["source","java"]
---------------------------------------------------
-include-tagged::{doc-tests-src}/usage/IndexingBulkTest.java[bulk-ingester-context]
---------------------------------------------------
-<1> Creates a listener where context values are strings for the ingested file name.
-<2> Registers the listener on the bulk ingester.
-<3> Sets the file name as the context value for a bulk operation.
 
-The bulk ingester also exposes statistics that allow monitoring the ingestion process and tuning its configuration:
 
-- number of operations added,
-- number of calls to `add()` that were blocked because the maximum number of concurrent requests was reached (contention),
-- number of bulk requests sent,
-- number of bulk requests that were blocked because the maximum number of concurrent requests was reached.
 
 {doc-tests-blurb}
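The removed documentation describes the `BulkIngester`'s flush criteria: send a bulk request once an operation count or a byte-size threshold is exceeded (or a flush interval elapses). The count/size part of that logic can be sketched in plain Java, independent of the client library. `TinyBulkBuffer` is a hypothetical illustration, not the real `BulkIngester` API, and the thresholds here (2 operations) are chosen for demonstration rather than the documented defaults (1000 operations, 5 MiB); the periodic-flush timer and concurrency backpressure are omitted.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Minimal sketch of the flush criteria described above (hypothetical class):
// buffer operations and flush when either the operation count or the
// accumulated payload size reaches its limit.
class TinyBulkBuffer {
    private final int maxOperations;
    private final long maxBytes;
    private final Consumer<List<String>> sender; // stands in for sending a bulk request
    private final List<String> buffer = new ArrayList<>();
    private long bytes = 0;

    TinyBulkBuffer(int maxOperations, long maxBytes, Consumer<List<String>> sender) {
        this.maxOperations = maxOperations;
        this.maxBytes = maxBytes;
        this.sender = sender;
    }

    void add(String operation) {
        buffer.add(operation);
        bytes += operation.getBytes(StandardCharsets.UTF_8).length;
        // Flush as soon as either threshold is reached.
        if (buffer.size() >= maxOperations || bytes >= maxBytes) {
            flush();
        }
    }

    // Also called on close() in the real ingester, to drain pending operations.
    void flush() {
        if (!buffer.isEmpty()) {
            sender.accept(new ArrayList<>(buffer));
            buffer.clear();
            bytes = 0;
        }
    }
}
```

With `maxOperations = 2`, adding three operations produces one automatic flush of two items, and a final explicit flush drains the remaining one.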

java-client/src/test/java/co/elastic/clients/documentation/usage/IndexingBulkTest.java (+18 -100)
@@ -21,31 +21,30 @@
 
 import co.elastic.clients.documentation.DocTestsTransport;
 import co.elastic.clients.elasticsearch.ElasticsearchClient;
-import co.elastic.clients.elasticsearch._helpers.bulk.BulkIngester;
-import co.elastic.clients.elasticsearch._helpers.bulk.BulkListener;
 import co.elastic.clients.elasticsearch.core.BulkRequest;
 import co.elastic.clients.elasticsearch.core.BulkResponse;
 import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
 import co.elastic.clients.elasticsearch.model.ModelTestCase;
-import co.elastic.clients.util.BinaryData;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import co.elastic.clients.json.JsonData;
+import co.elastic.clients.json.JsonpMapper;
+import jakarta.json.spi.JsonProvider;
 import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 public class IndexingBulkTest extends ModelTestCase {
 
     private final DocTestsTransport transport = new DocTestsTransport();
     private final ElasticsearchClient esClient = new ElasticsearchClient(transport);
 
-    private final Log logger = LogFactory.getLog(this.getClass());
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     BulkResponse result = BulkResponse.of(r -> r
         .errors(false)
.errors(false)
@@ -93,6 +92,15 @@ public void indexBulk() throws Exception {
         //end::bulk-objects
     }
 
+    //tag::read-json
+    public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
+        JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
+        JsonProvider jsonProvider = jsonpMapper.jsonProvider();
+
+        return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
+    }
+    //end::read-json
+
     @Test
     public void indexBulkJson() throws Exception {
         transport.setResult(result);
@@ -108,105 +116,15 @@ public void indexBulkJson() throws Exception {
         BulkRequest.Builder br = new BulkRequest.Builder();
 
         for (File file: logFiles) {
-            FileInputStream input = new FileInputStream(file);
-            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
+            JsonData json = readJson(new FileInputStream(file), esClient);
 
             br.operations(op -> op
                 .index(idx -> idx
                     .index("logs")
-                    .document(data)
+                    .document(json)
                 )
             );
         }
         //end::bulk-json
     }
-
-    @Test
-    public void useBulkIndexer() throws Exception {
-
-        File logDir = new File(".");
-        File[] logFiles = logDir.listFiles(
-            file -> file.getName().matches("log-.*\\.json")
-        );
-
-        //tag::bulk-ingester-setup
-        BulkIngester<Void> ingester = BulkIngester.of(b -> b
-            .client(esClient) // <1>
-            .maxOperations(100) // <2>
-            .flushInterval(1, TimeUnit.SECONDS) // <3>
-        );
-
-        for (File file: logFiles) {
-            FileInputStream input = new FileInputStream(file);
-            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
-
-            ingester.add(op -> op // <4>
-                .index(idx -> idx
-                    .index("logs")
-                    .document(data)
-                )
-            );
-        }
-
-        ingester.close(); // <5>
-        //end::bulk-ingester-setup
-
-    }
-
-    @Test
-    public void useBulkIndexerWithContext() throws Exception {
-
-        File[] logFiles = new File[]{};
-
-        //tag::bulk-ingester-context
-        BulkListener<String> listener = new BulkListener<String>() { // <1>
-            @Override
-            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
-                // The request was accepted, but may contain failed items.
-                // The "context" list gives the file name for each bulk item.
-                logger.debug("Bulk request " + executionId + " completed");
-                for (int i = 0; i < contexts.size(); i++) {
-                    BulkResponseItem item = response.items().get(i);
-                    if (item.error() != null) {
-                        // Inspect the failure cause
-                        logger.error("Failed to index file " + contexts.get(i) + " - " + item.error().reason());
-                    }
-                }
-            }
-
-            @Override
-            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
-                // The request could not be sent
-                logger.debug("Bulk request " + executionId + " failed", failure);
-            }
-        };
-
-        BulkIngester<String> ingester = BulkIngester.of(b -> b
-            .client(esClient)
-            .maxOperations(100)
-            .flushInterval(1, TimeUnit.SECONDS)
-            .listener(listener) // <2>
-        );
-
-        for (File file: logFiles) {
-            FileInputStream input = new FileInputStream(file);
-            BinaryData data = BinaryData.of(IOUtils.toByteArray(input));
-
-            ingester.add(op -> op
-                .index(idx -> idx
-                    .index("logs")
-                    .document(data)
-                ),
-                file.getName() // <3>
-            );
-        }
-
-        ingester.close();
-        //end::bulk-ingester-context
-
-    }
 }
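The removed listener test relies on a positional correlation pattern: each context value passed to `add()` lines up by index with the corresponding item in the bulk response. That pattern can be sketched without the client library; `Op`, `Result`, and `ContextBatch` below are hypothetical stand-ins for the real `BulkOperation`, `BulkResponseItem`, and ingester types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for a bulk operation and its per-item result.
record Op(String document) {}
record Result(String error) {} // error == null means the item succeeded

class ContextBatch<C> {
    private final List<Op> ops = new ArrayList<>();
    private final List<C> contexts = new ArrayList<>();

    // Each operation is stored alongside its caller-supplied context value,
    // mirroring the optional context parameter of BulkIngester.add().
    void add(Op op, C context) {
        ops.add(op);
        contexts.add(context);
    }

    // Bulk results come back in the same order the operations were added,
    // so contexts.get(i) identifies the source of results.get(i).
    List<C> failedContexts(List<Result> results) {
        List<C> failed = new ArrayList<>();
        for (int i = 0; i < results.size(); i++) {
            if (results.get(i).error() != null) {
                failed.add(contexts.get(i));
            }
        }
        return failed;
    }
}
```

This is the same index-based loop the removed `afterBulk` callback uses to log the file name of each failed item, reduced to its essentials.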

0 commit comments