diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java
index 5b698f20d..a8924f03b 100644
--- a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java
+++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java
@@ -56,7 +56,7 @@
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 import scala.Tuple2;
 
 public class AbstractHadoopBasicSparkTest implements Serializable {
diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java
index aa4831445..32e20e706 100644
--- a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java
+++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java
@@ -53,7 +53,7 @@
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 import scala.Tuple2;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java
new file mode 100644
index 000000000..3161e73f9
--- /dev/null
+++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java
@@ -0,0 +1,22 @@
+package org.elasticsearch.spark.integration;
+
+import scala.collection.mutable.Map;
+import scala.collection.mutable.HashMap;
+
+import java.util.Properties;
+
+public class ScalaUtils {
+    /*
+     * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit
+     * jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with
+     * jdk.CollectionConverters once we no longer support scala older than 2.13.
+     */
+    public static scala.collection.Map propertiesAsScalaMap(Properties props) {
+        Map scalaMap = new HashMap();
+        for (java.util.Map.Entry entry : props.entrySet()) {
+            scalaMap.put(entry.getKey(), entry.getValue());
+        }
+        return scalaMap;
+    }
+}
+
diff --git a/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala b/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala
index e3bf73e98..16edb4434 100644
--- a/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala
+++ b/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala
@@ -80,8 +80,8 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
-import scala.collection.JavaConverters.asScalaBufferConverter
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
+import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._
 
 object AbstractScalaEsScalaSpark {
   @transient val conf = new SparkConf()
@@ -95,7 +95,7 @@ object AbstractScalaEsScalaSpark {
 
   @BeforeClass
   def setup() {
-    conf.setAll(TestSettings.TESTING_PROPS);
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
     sc = new SparkContext(conf)
   }
 
@@ -141,7 +141,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
     // don't use the sc.read.json/textFile to avoid the whole Hadoop madness
     val path = Paths.get(uri)
     // because Windows
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines: Seq[String] = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
     sc.parallelize(lines)
   }
 
diff --git a/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala b/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala
index c32b379ea..d41998a5d 100644
--- a/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala
+++ b/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala
@@ -24,7 +24,7 @@ package org.elasticsearch.spark.rdd
  * warning in any Scala version.
  * From https://github.com/scala/scala-collection-compat/issues/208#issuecomment-497735669
  */
-private[rdd] object JDKCollectionConvertersCompat {
+private[elasticsearch] object JDKCollectionConvertersCompat {
   object Scope1 {
     object jdk {
       type CollectionConverters = Int
diff --git a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
index a707c447a..a0206458b 100644
--- a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
+++ b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
@@ -65,6 +65,7 @@ import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
 import scala.collection.JavaConversions.propertiesAsScalaMap
+import scala.collection.JavaConverters.asScalaBufferConverter
 import scala.io.Codec
 import scala.io.Source
 
@@ -622,8 +623,8 @@ class AbstractScalaEsSparkStructuredStreaming(prefix: String, something: Boolean
     searchResult = RestUtils.get(target + "/_search?version=true")
     val result: java.util.Map[String, Object] = new ObjectMapper().readValue(searchResult, classOf[java.util.Map[String, Object]])
     val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]].get("hits").asInstanceOf[java.util.List[java.util.Map[String,
-      Object]]]
-    hits.forEach(hit => {
+      Object]]].asScala
+    hits.foreach(hit => {
       hit.get("_id").asInstanceOf[String] match {
         case "1" => {
           assertEquals(1, hit.get("_version"))
diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java
index f7f8d9b1d..00cb02640 100644
--- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java
+++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java
@@ -63,7 +63,7 @@
 import static org.hamcrest.Matchers.*;
 
-import static scala.collection.JavaConversions.*;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 public class AbstractJavaEsSparkSQLTest implements Serializable {
 
diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java
index d551398d0..ce4f1681a 100644
--- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java
+++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java
@@ -78,7 +78,7 @@ import static org.elasticsearch.hadoop.util.TestUtils.resource;
 import static org.hamcrest.Matchers.*;
 import static org.junit.Assert.*;
 
-import static scala.collection.JavaConversions.propertiesAsScalaMap;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @RunWith(Parameterized.class)
diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java
new file mode 100644
index 000000000..2c1b9ee48
--- /dev/null
+++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java
@@ -0,0 +1,21 @@
+package org.elasticsearch.spark.integration;
+
+import scala.collection.mutable.Map;
+import scala.collection.mutable.HashMap;
+
+import java.util.Properties;
+
+public class ScalaUtils {
+    /*
+     * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit
+     * jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with
+     * jdk.CollectionConverters once we no longer support scala older than 2.13.
+     */
+    public static scala.collection.Map propertiesAsScalaMap(Properties props) {
+        Map scalaMap = new HashMap();
+        for (java.util.Map.Entry entry : props.entrySet()) {
+            scalaMap.put(entry.getKey(), entry.getValue());
+        }
+        return scalaMap;
+    }
+}
\ No newline at end of file
diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java
index 654aea416..3ec67b7c0 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java
@@ -60,7 +60,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-import static scala.collection.JavaConversions.propertiesAsScalaMap;
+import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
 
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
 @RunWith(Parameterized.class)
diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala
index ff5d76880..6db9081c9 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala
@@ -46,7 +46,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized.Parameters
 import org.junit.runners.{MethodSorters, Parameterized}
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
 import scala.collection.mutable
 import scala.reflect.ClassTag
 
@@ -62,7 +62,7 @@ object AbstractScalaEsScalaSparkStreaming {
 
   @BeforeClass
   def setup(): Unit = {
-    conf.setAll(TestSettings.TESTING_PROPS)
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
     sc = new SparkContext(conf)
   }
 
diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
index da2087571..12cc701fc 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala
@@ -27,9 +27,8 @@ import java.nio.file.Paths
 import java.sql.Timestamp
 import java.{util => ju}
 import java.util.concurrent.TimeUnit
-import scala.collection.JavaConversions.propertiesAsScalaMap
-import scala.collection.JavaConverters.asScalaBufferConverter
-import scala.collection.JavaConverters.mapAsJavaMapConverter
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
+import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._
 import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.SparkConf
@@ -116,7 +115,7 @@ object AbstractScalaEsScalaSparkSQL {
 
   @BeforeClass
   def setup() {
-    conf.setAll(TestSettings.TESTING_PROPS);
+    conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
     sc = new SparkContext(conf)
     sqc = SparkSession.builder().config(conf).getOrCreate().sqlContext
 
@@ -231,7 +230,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
   @Test
   def test1KryoScalaEsRow() {
     val kryo = SparkUtils.sparkSerializer(sc.getConf)
-    val row = new ScalaEsRow(new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala)
+    val row = new ScalaEsRow((new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala).toSeq)
 
     val storage = Array.ofDim[Byte](512)
     val output = new KryoOutput(storage)
@@ -733,7 +732,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
   def testEsDataFrame3WriteWithRichMapping() {
     val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri())
     // because Windows...
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
 
     val data = sc.parallelize(lines)
 
@@ -1614,7 +1613,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     assertEquals("long", nested.asInstanceOf[ArrayType].elementType.typeName)
 
     val first = df.first
-    val vals = first.getStruct(0).getSeq[Seq[Long]](0)(0)
+    val vals = first.getStruct(0).getSeq[scala.collection.Seq[Long]](0)(0)
     assertEquals(50, vals(0))
     assertEquals(32, vals(1))
   }
@@ -2366,7 +2365,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
 
     // No "es.read.field.include", so everything is included:
     var df = reader.load("read_field_include_test")
-    var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
+    var result = df.select("features.hashtags").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, result(0).size)
     assertEquals("hello", result(0).getAs("text"))
     assertEquals("2", result(0).getAs("count"))
@@ -2413,7 +2412,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     val reader = sqc.read.schema(schema).format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","samples")
     var resultDf = reader.load("nested_fields_upsert_test")
     assertEquals(2, resultDf.count())
-    var samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
+    var samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("hello", samples(0).get(0))
     assertEquals("world", samples(1).get(0))
@@ -2425,7 +2424,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")
 
     resultDf = reader.load("nested_fields_upsert_test")
-    samples = resultDf.select("samples").where("id = '1'").first().getAs[IndexedSeq[Row]](0)
+    samples = resultDf.select("samples").where("id = '1'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("goodbye", samples(0).get(0))
     assertEquals("world", samples(1).get(0))
@@ -2437,7 +2436,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")
 
     resultDf = reader.load("nested_fields_upsert_test")
-    samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
+    samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
     assertEquals(2, samples.size)
     assertEquals("goodbye", samples(0).get(0))
     assertEquals("again", samples(1).get(0))
@@ -2561,7 +2560,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
     // don't use the sc.read.json/textFile to avoid the whole Hadoop madness
     val path = Paths.get(uri)
     // because Windows
-    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
+    val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
     sc.parallelize(lines)
   }
 }
\ No newline at end of file
diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
index 5afaae8e1..cfa1de779 100644
--- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
+++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala
@@ -64,7 +64,7 @@ import org.junit.runners.MethodSorters
 import org.junit.runners.Parameterized
 import org.junit.runners.Parameterized.Parameters
 
-import scala.collection.JavaConversions.propertiesAsScalaMap
+import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
 import scala.io.Codec
 import scala.io.Source
 
@@ -84,7 +84,7 @@ object AbstractScalaEsSparkStructuredStreaming {
 
   @BeforeClass
   def setup(): Unit = {
-    sparkConf.setAll(TestSettings.TESTING_PROPS)
+    sparkConf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
     spark = Some(
       SparkSession.builder()
         .config(sparkConf)
diff --git a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala
index 3f7ff7bf3..74138e27c 100644
--- a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala
+++ b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala
@@ -20,7 +20,6 @@ package org.elasticsearch.spark.sql
 
 import java.util
-
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.types.StructField
@@ -29,10 +28,7 @@ import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure
 import org.elasticsearch.hadoop.serialization.handler.write.impl.SerializationEventConverter
 import org.elasticsearch.hadoop.util.DateUtils
 import org.elasticsearch.hadoop.util.StringUtils
-import org.hamcrest.Matchers.equalTo
-import org.junit.Assert.assertEquals
-import org.junit.Assert.assertThat
-import org.junit.Assert.assertTrue
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
 class RowSerializationEventConverterTest {
@@ -52,7 +48,9 @@ class RowSerializationEventConverterTest {
       new util.ArrayList[String])
 
     val rawEvent = eventConverter.getRawEvent(iaeFailure)
-    assertThat(rawEvent, equalTo("(StructType(StructField(field1,StringType,true), " +
+
+    // Scala 2.13 changed what toString() looks like, so can't do an exact match here:
+    assertTrue(rawEvent.contains("(StructField(field1,StringType,true), " +
       "StructField(field2,StringType,true), StructField(field3,StringType,true)),[value1,value2,value3])"))
     val timestamp = eventConverter.getTimestamp(iaeFailure)
     assertTrue(StringUtils.hasText(timestamp))
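
The test-setup pattern the patch converges on is easiest to see in isolation. Below is a minimal sketch of the new idiom: the explicit `ScalaUtils.propertiesAsScalaMap` call replaces the old `scala.collection.JavaConversions` implicit that Scala 2.13 removed. The `CompatDemo` object, the local master, and the `es.nodes` value are illustrative assumptions; the real suites pull their settings from `TestSettings.TESTING_PROPS`.

```scala
import java.util.Properties

import org.apache.spark.SparkConf
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap

object CompatDemo {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("es.nodes", "localhost:9200")

    val conf = new SparkConf().setAppName("compat-demo").setMaster("local[*]")
    // Before (Scala 2.12 and older only, via the implicit
    // scala.collection.JavaConversions.propertiesAsScalaMap):
    //   conf.setAll(props)
    // After: the conversion is explicit and compiles on 2.10 through 2.13.
    conf.setAll(propertiesAsScalaMap(props))
  }
}
```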
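The repeated `.asScala.toSeq` change deserves a note as well. `asScala` turns a `java.util.List` into a `mutable.Buffer`, and in Scala 2.13 `scala.Seq` became an alias for `scala.collection.immutable.Seq`, so the buffer no longer satisfies a `Seq[String]` annotation or parameter; `.toSeq` converts on 2.13 and is essentially free on 2.12 (where `Seq.toSeq` returns `this`). A small sketch under those assumptions; `ToSeqDemo` and the file path are hypothetical:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

// Version-agnostic asScala, as used throughout the patch:
import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._

object ToSeqDemo {
  def main(args: Array[String]): Unit = {
    val path = Paths.get("/tmp/sample-artists.dat") // placeholder path
    // Without .toSeq this line fails to compile on 2.13, because the
    // mutable Buffer returned by asScala is not an immutable Seq there.
    val lines: Seq[String] =
      Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
    lines.foreach(println)
  }
}
```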
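Likewise for the `getAs[scala.collection.IndexedSeq[Row]]` changes: on 2.13 the unqualified `IndexedSeq` alias points at `scala.collection.immutable.IndexedSeq`, while Spark rows hold array-backed values that implement only the base `scala.collection.IndexedSeq`, so the old unqualified cast blows up at runtime on 2.13. A hedged sketch of that failure mode; `IndexedSeqDemo` and the row contents are hypothetical, not taken from the test data:

```scala
import org.apache.spark.sql.Row

object IndexedSeqDemo {
  def main(args: Array[String]): Unit = {
    // An array-backed, mutable value of the kind Spark puts inside rows:
    val row = Row(scala.collection.mutable.ArraySeq("hello", "world"))
    // Throws ClassCastException on 2.13, where IndexedSeq means
    // scala.collection.immutable.IndexedSeq:
    //   row.getAs[IndexedSeq[String]](0)
    // Works on 2.10 through 2.13:
    val values = row.getAs[scala.collection.IndexedSeq[String]](0)
    assert(values.head == "hello")
  }
}
```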