Commit 2cd0c8e

Fixing scala 2.13 problems in integration tests (#1880)
Fixing spark 3 integration tests to be compatible with scala 2.13 and 2.12.
1 parent c1dd804 commit 2cd0c8e

14 files changed: +75 additions, -34 deletions

spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@
 
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 import scala.Tuple2;
 
 public class AbstractHadoopBasicSparkTest implements Serializable {

spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java

Lines changed: 1 addition & 1 deletion

@@ -53,7 +53,7 @@
 
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 import scala.Tuple2;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java

Lines changed: 22 additions & 0 deletions (new file)

@@ -0,0 +1,22 @@
+package org.elasticsearch.spark.integration;
+
+import scala.collection.mutable.Map;
+import scala.collection.mutable.HashMap;
+
+import java.util.Properties;
+
+public class ScalaUtils {
+    /*
+     * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit
+     * jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with
+     * jdk.CollectionConverters once we no longer support scala older than 2.13.
+     */
+    public static scala.collection.Map<String, String> propertiesAsScalaMap(Properties props) {
+        Map scalaMap = new HashMap();
+        for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
+            scalaMap.put(entry.getKey(), entry.getValue());
+        }
+        return scalaMap;
+    }
+}
+
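
This helper is what the surrounding test diffs import in place of scala.collection.JavaConversions. A minimal usage sketch, not taken from the commit (assumes a Spark dependency on the classpath; the property key is illustrative, mirroring how the tests pass TestSettings.TESTING_PROPS to SparkConf.setAll):

import java.util.Properties
import org.apache.spark.SparkConf
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap

object PropsCompatDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("es.nodes", "localhost:9200") // illustrative test property
    // propertiesAsScalaMap returns scala.collection.Map[String, String] on 2.10-2.13
    // without touching JavaConversions (gone in 2.13) or jdk.CollectionConverters
    // (2.13+ only); a Scala Map is an Iterable[(String, String)], which setAll accepts.
    val conf = new SparkConf().setAll(propertiesAsScalaMap(props))
    assert(conf.get("es.nodes") == "localhost:9200")
  }
}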

spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala

Lines changed: 4 additions & 4 deletions

@@ -80,8 +80,8 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
-import scala.collection.JavaConverters.asScalaBufferConverter
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
+import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._
 
 object AbstractScalaEsScalaSpark {
   @transient val conf = new SparkConf()

@@ -95,7 +95,7 @@ object AbstractScalaEsScalaSpark {
 
   @BeforeClass
   def setup() {
-    conf.setAll(TestSettings.TESTING_PROPS);
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
     sc = new SparkContext(conf)
   }

@@ -141,7 +141,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
     // don't use the sc.read.json/textFile to avoid the whole Hadoop madness
     val path = Paths.get(uri)
     // because Windows
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines: Seq[String] = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
     sc.parallelize(lines)
   }
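
The new `Seq[String]` ascription and trailing `.toSeq` are what make this line cross-build: `asScala` on a `java.util.List` yields a mutable `Buffer`, which still conforms to `scala.Seq` on 2.12 (where `Seq` aliases `collection.Seq`) but not on 2.13 (where `Seq` aliases `immutable.Seq`). A standalone sketch of the difference, not taken from the commit:

import java.util.Arrays
import scala.collection.JavaConverters.asScalaBufferConverter // deprecated on 2.13, still compiles

object SeqCompatDemo {
  def main(args: Array[String]): Unit = {
    val buffer = Arrays.asList("a", "b").asScala // scala.collection.mutable.Buffer[String]
    // On 2.12 the Buffer itself conforms to scala.Seq (= collection.Seq);
    // on 2.13 scala.Seq = immutable.Seq, so an explicit immutable copy is needed:
    val lines: Seq[String] = buffer.toSeq
    println(lines.mkString(","))
  }
}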

spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala

Lines changed: 1 addition & 1 deletion

@@ -24,7 +24,7 @@ package org.elasticsearch.spark.rdd
  * warning in any Scala version.
  * From https://github.com/scala/scala-collection-compat/issues/208#issuecomment-497735669
  */
-private[rdd] object JDKCollectionConvertersCompat {
+private[elasticsearch] object JDKCollectionConvertersCompat {
   object Scope1 {
     object jdk {
       type CollectionConverters = Int
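
Widening the visibility from `private[rdd]` to `private[elasticsearch]` lets the integration tests (package `org.elasticsearch.spark.integration`) import the compat object. Its `Converters` member resolves, via the import-shadowing trick described at the linked issue, to `scala.jdk.CollectionConverters` on 2.13 and to `scala.collection.JavaConverters` on older versions, without a deprecation warning either way. A sketch of a call site (hypothetical demo object, placed under `org.elasticsearch` to satisfy the access modifier):

package org.elasticsearch.spark.integration

import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._

object ConvertersDemo {
  def main(args: Array[String]): Unit = {
    val javaList = java.util.Arrays.asList("a", "b", "c")
    // asScala comes from whichever converter object the current Scala version provides.
    println(javaList.asScala.toSeq.mkString(","))
  }
}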

spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala

Lines changed: 3 additions & 2 deletions

@@ -65,6 +65,7 @@ import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConversions.propertiesAsScalaMap
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.io.Codec
 import scala.io.Source
 

@@ -622,8 +623,8 @@ class AbstractScalaEsSparkStructuredStreaming(prefix: String, something: Boolean
       searchResult = RestUtils.get(target + "/_search?version=true")
       val result: java.util.Map[String, Object] = new ObjectMapper().readValue(searchResult, classOf[java.util.Map[String, Object]])
       val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]].get("hits").asInstanceOf[java.util.List[java.util.Map[String,
-        Object]]]
-      hits.forEach(hit => {
+        Object]]].asScala
+      hits.foreach(hit => {
         hit.get("_id").asInstanceOf[String] match {
           case "1" => {
             assertEquals(1, hit.get("_version"))
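
Alongside the added `.asScala`, the Java-side `java.util.List.forEach(hit => ...)` becomes a Scala-side `foreach`; iterating a Scala collection keeps the closure an ordinary `Function1` instead of leaning on SAM conversion to `java.util.function.Consumer`, whose inference differs between Scala versions (this motivation is inferred, not stated in the commit). A standalone sketch of the pattern with made-up data:

import scala.collection.JavaConverters.asScalaBufferConverter

object HitsIterationDemo {
  def main(args: Array[String]): Unit = {
    val hits = new java.util.ArrayList[java.util.Map[String, Object]]()
    val hit = new java.util.HashMap[String, Object]()
    hit.put("_id", "1")
    hits.add(hit)
    // Scala-side iteration: no Consumer SAM conversion involved.
    hits.asScala.foreach(h => println(h.get("_id")))
  }
}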

spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java

Lines changed: 1 addition & 1 deletion

@@ -63,7 +63,7 @@
 
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class AbstractJavaEsSparkSQLTest implements Serializable {

spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java

Lines changed: 1 addition & 1 deletion

@@ -78,7 +78,7 @@
 import static org.elasticsearch.hadoop.util.TestUtils.resource;
 import static org.hamcrest.Matchers.*;
 import static org.junit.Assert.*;
-import static scala.collection.JavaConversions.propertiesAsScalaMap;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @RunWith(Parameterized.class)
spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java

Lines changed: 21 additions & 0 deletions (new file)

@@ -0,0 +1,21 @@
+package org.elasticsearch.spark.integration;
+
+import scala.collection.mutable.Map;
+import scala.collection.mutable.HashMap;
+
+import java.util.Properties;
+
+public class ScalaUtils {
+    /*
+     * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit
+     * jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with
+     * jdk.CollectionConverters once we no longer support scala older than 2.13.
+     */
+    public static scala.collection.Map<String, String> propertiesAsScalaMap(Properties props) {
+        Map scalaMap = new HashMap();
+        for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
+            scalaMap.put(entry.getKey(), entry.getValue());
+        }
+        return scalaMap;
+    }
+}

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java

Lines changed: 1 addition & 1 deletion

@@ -60,7 +60,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
-import static scala.collection.JavaConversions.propertiesAsScalaMap;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @RunWith(Parameterized.class)

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala

Lines changed: 2 additions & 2 deletions

@@ -46,7 +46,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
 import org.junit.runners.{MethodSorters, Parameterized}
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
 import scala.collection.mutable
 import scala.reflect.ClassTag
 

@@ -62,7 +62,7 @@ object AbstractScalaEsScalaSparkStreaming {
 
   @BeforeClass
   def setup(): Unit = {
-    conf.setAll(TestSettings.TESTING_PROPS)
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
     sc = new SparkContext(conf)
   }
 

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala

Lines changed: 11 additions & 12 deletions

@@ -27,9 +27,8 @@ import java.nio.file.Paths
 import java.sql.Timestamp
 import java.{util => ju}
 import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions.propertiesAsScalaMap
-import scala.collection.JavaConverters.asScalaBufferConverter
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
+import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf

@@ -116,7 +115,7 @@ object AbstractScalaEsScalaSparkSQL {
 
   @BeforeClass
   def setup() {
-    conf.setAll(TestSettings.TESTING_PROPS);
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
     sc = new SparkContext(conf)
     sqc = SparkSession.builder().config(conf).getOrCreate().sqlContext
 

@@ -231,7 +230,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
   @Test
   def test1KryoScalaEsRow() {
     val kryo = SparkUtils.sparkSerializer(sc.getConf)
-    val row = new ScalaEsRow(new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala)
+    val row = new ScalaEsRow((new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala).toSeq)
 
     val storage = Array.ofDim[Byte](512)
     val output = new KryoOutput(storage)

@@ -733,7 +732,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
   def testEsDataFrame3WriteWithRichMapping() {
     val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri())
     // because Windows...
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
 
     val data = sc.parallelize(lines)
 

@@ -1614,7 +1613,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals("long", nested.asInstanceOf[ArrayType].elementType.typeName)
 
     val first = df.first
-    val vals = first.getStruct(0).getSeq[Seq[Long]](0)(0)
+    val vals = first.getStruct(0).getSeq[scala.collection.Seq[Long]](0)(0)
     assertEquals(50, vals(0))
     assertEquals(32, vals(1))
   }

@@ -2366,7 +2365,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
 
     // No "es.read.field.include", so everything is included:
     var df = reader.load("read_field_include_test")
-    var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    var result = df.select("features.hashtags").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, result(0).size)
     assertEquals("hello", result(0).getAs("text"))
     assertEquals("2", result(0).getAs("count"))

@@ -2413,7 +2412,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     val reader = sqc.read.schema(schema).format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","samples")
     var resultDf = reader.load("nested_fields_upsert_test")
     assertEquals(2, resultDf.count())
-    var samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
+    var samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("hello", samples(0).get(0))
     assertEquals("world", samples(1).get(0))

@@ -2425,7 +2424,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")
 
     resultDf = reader.load("nested_fields_upsert_test")
-    samples = resultDf.select("samples").where("id = '1'").first().getAs[IndexedSeq[Row]](0)
+    samples = resultDf.select("samples").where("id = '1'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("goodbye", samples(0).get(0))
     assertEquals("world", samples(1).get(0))

@@ -2437,7 +2436,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")
 
     resultDf = reader.load("nested_fields_upsert_test")
-    samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
+    samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("goodbye", samples(0).get(0))
     assertEquals("again", samples(1).get(0))

@@ -2561,7 +2560,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     // don't use the sc.read.json/textFile to avoid the whole Hadoop madness
     val path = Paths.get(uri)
     // because Windows
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
     sc.parallelize(lines)
   }
 }
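
The repeated `getAs[IndexedSeq[Row]]` to `getAs[scala.collection.IndexedSeq[Row]]` edits are the same alias shift again: on 2.13 a bare `IndexedSeq` means `immutable.IndexedSeq`, but Spark hands back array-backed wrappers that only satisfy the wider `scala.collection.IndexedSeq`, so the old cast would throw a `ClassCastException` there. A self-contained illustration (plain arrays stand in for Spark rows):

object IndexedSeqCompatDemo {
  def main(args: Array[String]): Unit = {
    // An array wraps to mutable.WrappedArray on 2.12 and mutable.ArraySeq on 2.13;
    // both satisfy scala.collection.IndexedSeq on either version.
    val wrapped: scala.collection.IndexedSeq[Int] = Array(1, 2, 3)
    // On 2.13 the wrapper is not an immutable.IndexedSeq, so this cast would fail there:
    // wrapped.asInstanceOf[IndexedSeq[Int]] // ClassCastException on 2.13 only
    println(wrapped.sum)
  }
}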

spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala

Lines changed: 2 additions & 2 deletions

@@ -64,7 +64,7 @@ import org.junit.runners.MethodSorters
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
 import scala.io.Codec
 import scala.io.Source
 

@@ -84,7 +84,7 @@ object AbstractScalaEsSparkStructuredStreaming {
 
   @BeforeClass
   def setup(): Unit = {
-    sparkConf.setAll(TestSettings.TESTING_PROPS)
+    sparkConf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
     spark = Some(
       SparkSession.builder()
         .config(sparkConf)

spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala

Lines changed: 4 additions & 6 deletions

@@ -20,7 +20,6 @@
 package org.elasticsearch.spark.sql
 
 import java.util
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.types.StructField

@@ -29,10 +28,7 @@ import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure
 import org.elasticsearch.hadoop.serialization.handler.write.impl.SerializationEventConverter
 import org.elasticsearch.hadoop.util.DateUtils
 import org.elasticsearch.hadoop.util.StringUtils
-import org.hamcrest.Matchers.equalTo
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertThat
-import org.junit.Assert.assertTrue
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
 class RowSerializationEventConverterTest {

@@ -52,7 +48,9 @@ class RowSerializationEventConverterTest {
       new util.ArrayList[String])
 
     val rawEvent = eventConverter.getRawEvent(iaeFailure)
-    assertThat(rawEvent, equalTo("(StructType(StructField(field1,StringType,true), " +
+
+    // Scala 2.13 changed what toString() looks like, so can't do an exact match here:
+    assertTrue(rawEvent.contains("(StructField(field1,StringType,true), " +
       "StructField(field2,StringType,true), StructField(field3,StringType,true)),[value1,value2,value3])"))
     val timestamp = eventConverter.getTimestamp(iaeFailure)
     assertTrue(StringUtils.hasText(timestamp))
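
The looser `contains` assertion reflects that `StructType`'s rendering is built on Scala's collection `toString` machinery, whose output changed in 2.13, so only the inner `StructField` portion is stable across versions. A hedged sketch of the same defensive pattern (the schema here is illustrative, not the one under test):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ToStringCompatDemo {
  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("field1", StringType, nullable = true),
      StructField("field2", StringType, nullable = true)))
    // The outer prefix of schema.toString is version-dependent, so assert on a
    // substring that looks the same on 2.12 and 2.13.
    assert(schema.toString.contains("StructField(field1,StringType,true)"))
  }
}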
