Skip to content

Commit 5da843e

Browse files
author
Dat Tran
committed
[BEAM-5107] Add support for ES-6.x to ElasticsearchIO
1 parent 2e13877 commit 5da843e

File tree

16 files changed

+565
-147
lines changed

16 files changed

+565
-147
lines changed

sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,10 @@
1717
*/
1818
package org.apache.beam.sdk.io.elasticsearch;
1919

20-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
2120
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
22-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
23-
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
24-
import static org.junit.Assert.assertEquals;
2521

26-
import java.util.List;
27-
import org.apache.beam.sdk.io.BoundedSource;
2822
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
2923
import org.apache.beam.sdk.options.PipelineOptionsFactory;
30-
import org.apache.beam.sdk.testing.SourceTestUtils;
3124
import org.apache.beam.sdk.testing.TestPipeline;
3225
import org.elasticsearch.client.RestClient;
3326
import org.junit.AfterClass;
@@ -93,26 +86,7 @@ public static void afterClass() throws Exception {
9386

9487
@Test
9588
public void testSplitsVolume() throws Exception {
96-
Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
97-
BoundedElasticsearchSource initialSource =
98-
new BoundedElasticsearchSource(read, null, null, null);
99-
// desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
100-
// as many bundles as ES shards and bundle size is shard size
101-
long desiredBundleSizeBytes = 0;
102-
List<? extends BoundedSource<String>> splits =
103-
initialSource.split(desiredBundleSizeBytes, options);
104-
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
105-
// this is the number of ES shards
106-
// (By default, each index in Elasticsearch is allocated 5 primary shards)
107-
long expectedNumSplits = 5;
108-
assertEquals(expectedNumSplits, splits.size());
109-
int nonEmptySplits = 0;
110-
for (BoundedSource<String> subSource : splits) {
111-
if (readFromSource(subSource, options).size() > 0) {
112-
nonEmptySplits += 1;
113-
}
114-
}
115-
assertEquals(expectedNumSplits, nonEmptySplits);
89+
elasticsearchIOTestCommon.testSplit(0);
11690
}
11791

11892
@Test

sdks/java/io/elasticsearch-tests/elasticsearch-tests-2/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,13 @@
1717
*/
1818
package org.apache.beam.sdk.io.elasticsearch;
1919

20-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
2120
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
22-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
23-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
2421
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
25-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
2622
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
27-
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
28-
import static org.hamcrest.Matchers.lessThan;
29-
import static org.junit.Assert.assertEquals;
30-
import static org.junit.Assert.assertThat;
3123

3224
import java.io.IOException;
3325
import java.io.Serializable;
3426
import java.net.ServerSocket;
35-
import java.util.List;
36-
import org.apache.beam.sdk.io.BoundedSource;
37-
import org.apache.beam.sdk.options.PipelineOptions;
38-
import org.apache.beam.sdk.options.PipelineOptionsFactory;
39-
import org.apache.beam.sdk.testing.SourceTestUtils;
4027
import org.apache.beam.sdk.testing.TestPipeline;
4128
import org.elasticsearch.client.RestClient;
4229
import org.elasticsearch.common.settings.Settings;
@@ -173,32 +160,7 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception {
173160

174161
@Test
175162
public void testSplit() throws Exception {
176-
ElasticSearchIOTestUtils.insertTestDocuments(
177-
connectionConfiguration, NUM_DOCS_UTESTS, restClient);
178-
PipelineOptions options = PipelineOptionsFactory.create();
179-
Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
180-
BoundedElasticsearchSource initialSource =
181-
new BoundedElasticsearchSource(read, null, null, null);
182-
//desiredBundleSize is ignored because in ES 2.x there is no way to split shards. So we get
183-
// as many bundles as ES shards and bundle size is shard size
184-
int desiredBundleSizeBytes = 0;
185-
List<? extends BoundedSource<String>> splits =
186-
initialSource.split(desiredBundleSizeBytes, options);
187-
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
188-
//this is the number of ES shards
189-
// (By default, each index in Elasticsearch is allocated 5 primary shards)
190-
int expectedNumSources = 5;
191-
assertEquals("Wrong number of splits", expectedNumSources, splits.size());
192-
int emptySplits = 0;
193-
for (BoundedSource<String> subSource : splits) {
194-
if (readFromSource(subSource, options).isEmpty()) {
195-
emptySplits += 1;
196-
}
197-
}
198-
assertThat(
199-
"There are too many empty splits, parallelism is sub-optimal",
200-
emptySplits,
201-
lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
163+
elasticsearchIOTestCommon.testSplit(0);
202164
}
203165

204166
@Test
@@ -216,7 +178,7 @@ public void testWriteWithIndexFn() throws Exception {
216178
@Test
217179
public void testWriteWithTypeFn() throws Exception {
218180
elasticsearchIOTestCommon.setPipeline(pipeline);
219-
elasticsearchIOTestCommon.testWriteWithTypeFn();
181+
elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
220182
}
221183

222184
@Test

sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/contrib/create_elk_container.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
#
1919
################################################################################
2020

21-
#Create an ELK (Elasticsearch Logstash Kibana) container for ES v2.4 and compatible Logstash and Kibana versions,
21+
#Create an ELK (Elasticsearch Logstash Kibana) container for ES v5.4.3 and compatible Logstash and Kibana versions,
2222
#bind them on host ports, allow shell access to container and mount current directory on /home/$USER inside the container
2323

24-
docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-2.4 sebp/elk:es240_l240_k460
24+
docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-5.4.3 sebp/elk:543

sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOIT.java

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,10 @@
1717
*/
1818
package org.apache.beam.sdk.io.elasticsearch;
1919

20-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
2120
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
22-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
23-
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
24-
import static org.junit.Assert.assertEquals;
2521

26-
import java.util.List;
27-
import org.apache.beam.sdk.io.BoundedSource;
2822
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOITCommon.ElasticsearchPipelineOptions;
2923
import org.apache.beam.sdk.options.PipelineOptionsFactory;
30-
import org.apache.beam.sdk.testing.SourceTestUtils;
3124
import org.apache.beam.sdk.testing.TestPipeline;
3225
import org.elasticsearch.client.RestClient;
3326
import org.junit.AfterClass;
@@ -93,24 +86,7 @@ public static void afterClass() throws Exception {
9386

9487
@Test
9588
public void testSplitsVolume() throws Exception {
96-
Read read = ElasticsearchIO.read().withConnectionConfiguration(readConnectionConfiguration);
97-
BoundedElasticsearchSource initialSource =
98-
new BoundedElasticsearchSource(read, null, null, null);
99-
int desiredBundleSizeBytes = 10000;
100-
List<? extends BoundedSource<String>> splits =
101-
initialSource.split(desiredBundleSizeBytes, options);
102-
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
103-
long indexSize = BoundedElasticsearchSource.estimateIndexSize(readConnectionConfiguration);
104-
float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
105-
int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
106-
assertEquals(expectedNumSources, splits.size());
107-
int nonEmptySplits = 0;
108-
for (BoundedSource<String> subSource : splits) {
109-
if (readFromSource(subSource, options).size() > 0) {
110-
nonEmptySplits += 1;
111-
}
112-
}
113-
assertEquals("Wrong number of empty splits", expectedNumSources, nonEmptySplits);
89+
elasticsearchIOTestCommon.testSplit(10_000);
11490
}
11591

11692
@Test

sdks/java/io/elasticsearch-tests/elasticsearch-tests-5/src/test/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIOTest.java

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,16 @@
1717
*/
1818
package org.apache.beam.sdk.io.elasticsearch;
1919

20-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BoundedElasticsearchSource;
2120
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;
22-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.Read;
23-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE;
2421
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.ES_TYPE;
25-
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.NUM_DOCS_UTESTS;
2622
import static org.apache.beam.sdk.io.elasticsearch.ElasticsearchIOTestCommon.getEsIndex;
27-
import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
28-
import static org.hamcrest.Matchers.lessThan;
2923

3024
import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
3125
import java.io.IOException;
3226
import java.io.Serializable;
3327
import java.net.InetSocketAddress;
3428
import java.util.ArrayList;
3529
import java.util.Collection;
36-
import java.util.List;
37-
import org.apache.beam.sdk.io.BoundedSource;
38-
import org.apache.beam.sdk.options.PipelineOptions;
39-
import org.apache.beam.sdk.options.PipelineOptionsFactory;
40-
import org.apache.beam.sdk.testing.SourceTestUtils;
4130
import org.apache.beam.sdk.testing.TestPipeline;
4231
import org.elasticsearch.common.settings.Settings;
4332
import org.elasticsearch.plugins.Plugin;
@@ -160,33 +149,10 @@ public void testWriteWithMaxBatchSizeBytes() throws Exception {
160149

161150
@Test
162151
public void testSplit() throws Exception {
163-
//need to create the index using the helper method (not create it at first insertion)
152+
// need to create the index using the helper method (not create it at first insertion)
164153
// for the indexSettings() to be run
165154
createIndex(getEsIndex());
166-
ElasticSearchIOTestUtils.insertTestDocuments(
167-
connectionConfiguration, NUM_DOCS_UTESTS, getRestClient());
168-
PipelineOptions options = PipelineOptionsFactory.create();
169-
Read read = ElasticsearchIO.read().withConnectionConfiguration(connectionConfiguration);
170-
BoundedElasticsearchSource initialSource =
171-
new BoundedElasticsearchSource(read, null, null, null);
172-
int desiredBundleSizeBytes = 2000;
173-
List<? extends BoundedSource<String>> splits =
174-
initialSource.split(desiredBundleSizeBytes, options);
175-
SourceTestUtils.assertSourcesEqualReferenceSource(initialSource, splits, options);
176-
long indexSize = BoundedElasticsearchSource.estimateIndexSize(connectionConfiguration);
177-
float expectedNumSourcesFloat = (float) indexSize / desiredBundleSizeBytes;
178-
int expectedNumSources = (int) Math.ceil(expectedNumSourcesFloat);
179-
assertEquals("Wrong number of splits", expectedNumSources, splits.size());
180-
int emptySplits = 0;
181-
for (BoundedSource<String> subSource : splits) {
182-
if (readFromSource(subSource, options).isEmpty()) {
183-
emptySplits += 1;
184-
}
185-
}
186-
assertThat(
187-
"There are too many empty splits, parallelism is sub-optimal",
188-
emptySplits,
189-
lessThan((int) (ACCEPTABLE_EMPTY_SPLITS_PERCENTAGE * splits.size())));
155+
elasticsearchIOTestCommon.testSplit(2_000);
190156
}
191157

192158
@Test
@@ -204,7 +170,7 @@ public void testWriteWithIndexFn() throws Exception {
204170
@Test
205171
public void testWriteWithTypeFn() throws Exception {
206172
elasticsearchIOTestCommon.setPipeline(pipeline);
207-
elasticsearchIOTestCommon.testWriteWithTypeFn();
173+
elasticsearchIOTestCommon.testWriteWithTypeFn2x5x();
208174
}
209175

210176
@Test
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
apply plugin: org.apache.beam.gradle.BeamModulePlugin
20+
applyJavaNature()
21+
provideIntegrationTestingDependencies()
22+
enableJavaPerformanceTesting()
23+
24+
description = "Apache Beam :: SDKs :: Java :: IO :: Elasticsearch-Tests :: 6.x"
25+
ext.summary = "Tests of ElasticsearchIO on Elasticsearch 6.x"
26+
27+
test {
28+
// needed for ESIntegTestCase
29+
systemProperty "tests.security.manager", "false"
30+
}
31+
32+
def jna_version = "4.1.0"
33+
def log4j_version = "2.6.2"
34+
def elastic_search_version = "6.4.0"
35+
36+
configurations.all {
37+
resolutionStrategy {
38+
// Make sure the log4j versions for api and core match instead of taking the default
39+
// Gradle rule of using the latest.
40+
force "org.apache.logging.log4j:log4j-api:$log4j_version"
41+
force "org.apache.logging.log4j:log4j-core:$log4j_version"
42+
}
43+
}
44+
45+
dependencies {
46+
testCompile project(path: ":beam-sdks-java-io-elasticsearch-tests-common", configuration: "shadowTest")
47+
testCompile "org.elasticsearch.test:framework:$elastic_search_version"
48+
testCompile "org.elasticsearch.plugin:transport-netty4-client:$elastic_search_version"
49+
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:2.5.2"
50+
testCompile "org.elasticsearch:elasticsearch:$elastic_search_version"
51+
52+
testCompile project(path: ":beam-sdks-java-core", configuration: "shadow")
53+
testCompile project(path: ":beam-sdks-java-io-elasticsearch", configuration: "shadow")
54+
testCompile project(path: ":beam-sdks-java-io-common", configuration: "shadowTest")
55+
testCompile project(path: ":beam-runners-direct-java", configuration: "shadow")
56+
testCompile "org.apache.logging.log4j:log4j-core:$log4j_version"
57+
testCompile "org.apache.logging.log4j:log4j-api:$log4j_version"
58+
testCompile library.java.slf4j_api
59+
testCompile "net.java.dev.jna:jna:$jna_version"
60+
testCompile library.java.hamcrest_core
61+
testCompile library.java.hamcrest_library
62+
testCompile library.java.slf4j_jdk14
63+
testCompile library.java.commons_io_1x
64+
testCompile library.java.junit
65+
testCompile "org.elasticsearch.client:elasticsearch-rest-client:$elastic_search_version"
66+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/sh
2+
################################################################################
3+
#
4+
# Licensed to the Apache Software Foundation (ASF) under one or more
5+
# contributor license agreements. See the NOTICE file distributed with
6+
# this work for additional information regarding copyright ownership.
7+
# The ASF licenses this file to You under the Apache License, Version 2.0
8+
# (the "License"); you may not use this file except in compliance with
9+
# the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
#
19+
################################################################################
20+
21+
#Create an ELK (Elasticsearch Logstash Kibana) container for ES v6.4.0 and compatible Logstash and Kibana versions,
22+
#bind them on host ports, allow shell access to container and mount current directory on /home/$USER inside the container
23+
24+
docker create -p 5601:5601 -p 9200:9200 -p 5044:5044 -p 5000:5000 -p 9300:9300 -it -v $(pwd):/home/$USER/ --name elk-6.4.0 sebp/elk:640

0 commit comments

Comments
 (0)