From 2b0886d496c48dd0e3d7cc10dd04d9cefc87180d Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 25 Jan 2022 10:39:42 -0600 Subject: [PATCH 1/6] Making itests work with scala 2.13 --- .../AbstractHadoopBasicSparkTest.java | 2 +- .../integration/AbstractJavaEsSparkTest.java | 2 +- .../spark/integration/ScalaUtils.java | 38 +++++++++++++++++++ .../integration/AbstractScalaEsSpark.scala | 8 ++-- .../rdd/JDKCollectionConvertersCompat.scala | 2 +- 5 files changed, 45 insertions(+), 7 deletions(-) create mode 100644 spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java index 5b698f20d..a8924f03b 100644 --- a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java +++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractHadoopBasicSparkTest.java @@ -56,7 +56,7 @@ import static org.hamcrest.Matchers.*; -import static scala.collection.JavaConversions.*; +import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap; import scala.Tuple2; public class AbstractHadoopBasicSparkTest implements Serializable { diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java index aa4831445..32e20e706 100644 --- a/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java +++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkTest.java @@ -53,7 +53,7 @@ import static org.hamcrest.Matchers.*; -import static scala.collection.JavaConversions.*; +import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap; import scala.Tuple2; 
@FixMethodOrder(MethodSorters.NAME_ASCENDING) diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java new file mode 100644 index 000000000..0d8709fc7 --- /dev/null +++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java @@ -0,0 +1,38 @@ +package org.elasticsearch.spark.integration; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Properties; + +public class ScalaUtils { + /* + * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. + */ + public static scala.collection.Map propertiesAsScalaMap(Properties props) { + Class conversionsClass; + try { + conversionsClass = Class.forName("scala.collection.JavaConversions"); + } catch (ClassNotFoundException e) { + try { + conversionsClass = Class.forName("jdk.CollectionConverters"); + } catch (ClassNotFoundException classNotFoundException) { + throw new RuntimeException("No collection converter class found"); + } + } + Method method; + try { + method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); + } + try { + Object result = method.invoke(null, props); + return (scala.collection.Map) result; + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + } +} diff --git a/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala b/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala index e3bf73e98..16edb4434 100644 --- a/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala +++ 
b/spark/core/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSpark.scala @@ -80,8 +80,8 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import scala.collection.JavaConversions.propertiesAsScalaMap -import scala.collection.JavaConverters.asScalaBufferConverter +import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap +import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._ object AbstractScalaEsScalaSpark { @transient val conf = new SparkConf() @@ -95,7 +95,7 @@ object AbstractScalaEsScalaSpark { @BeforeClass def setup() { - conf.setAll(TestSettings.TESTING_PROPS); + conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS)); sc = new SparkContext(conf) } @@ -141,7 +141,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend // don't use the sc.read.json/textFile to avoid the whole Hadoop madness val path = Paths.get(uri) // because Windows - val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala + val lines: Seq[String] = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq sc.parallelize(lines) } diff --git a/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala b/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala index c32b379ea..d41998a5d 100644 --- a/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala +++ b/spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala @@ -24,7 +24,7 @@ package org.elasticsearch.spark.rdd * warning in any Scala version. 
* From https://github.com/scala/scala-collection-compat/issues/208#issuecomment-497735669 */ -private[rdd] object JDKCollectionConvertersCompat { +private[elasticsearch] object JDKCollectionConvertersCompat { object Scope1 { object jdk { type CollectionConverters = Int From 5e6602d6abb67c6cd36edf1545bb997ce852fab7 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 25 Jan 2022 11:17:31 -0600 Subject: [PATCH 2/6] Getting spark 3 itests to compile --- ...tractScalaEsSparkStructuredStreaming.scala | 5 ++- .../AbstractJavaEsSparkSQLTest.java | 35 ++++++++++++++++- .../AbstractJavaEsSparkStreamingTest.java | 2 +- .../spark/integration/ScalaUtils.java | 38 +++++++++++++++++++ ...actJavaEsSparkStructuredStreamingTest.java | 2 +- .../AbstractScalaEsScalaSparkStreaming.scala | 4 +- .../integration/AbstractScalaEsSparkSQL.scala | 13 +++---- ...tractScalaEsSparkStructuredStreaming.scala | 4 +- 8 files changed, 87 insertions(+), 16 deletions(-) create mode 100644 spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java diff --git a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala index a707c447a..a0206458b 100644 --- a/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala +++ b/spark/sql-20/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala @@ -65,6 +65,7 @@ import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters import scala.collection.JavaConversions.propertiesAsScalaMap +import scala.collection.JavaConverters.asScalaBufferConverter import scala.io.Codec import scala.io.Source @@ -622,8 +623,8 @@ class AbstractScalaEsSparkStructuredStreaming(prefix: String, something: Boolean searchResult = RestUtils.get(target + 
"/_search?version=true") val result: java.util.Map[String, Object] = new ObjectMapper().readValue(searchResult, classOf[java.util.Map[String, Object]]) val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]].get("hits").asInstanceOf[java.util.List[java.util.Map[String, - Object]]] - hits.forEach(hit => { + Object]]].asScala + hits.foreach(hit => { hit.get("_id").asInstanceOf[String] match { case "1" => { assertEquals(1, hit.get("_version")) diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java index f7f8d9b1d..0bcfee767 100644 --- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java +++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java @@ -19,12 +19,15 @@ package org.elasticsearch.spark.integration; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Timestamp; import java.util.List; +import java.util.Properties; import java.util.concurrent.TimeUnit; import javax.xml.bind.DatatypeConverter; @@ -63,7 +66,7 @@ import static org.hamcrest.Matchers.*; -import static scala.collection.JavaConversions.*; +import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap; @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class AbstractJavaEsSparkSQLTest implements Serializable { @@ -208,4 +211,34 @@ public Row call(String[] r) throws Exception { return sqc.createDataFrame(rowData, schema); } + + /* + * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. 
+ */ + public static scala.collection.Map propertiesAsScalaMap(Properties props) { + Class conversionsClass; + try { + conversionsClass = Class.forName("scala.collection.JavaConversions"); + } catch (ClassNotFoundException e) { + try { + conversionsClass = Class.forName("jdk.CollectionConverters"); + } catch (ClassNotFoundException classNotFoundException) { + throw new RuntimeException("No collection converter class found"); + } + } + Method method; + try { + method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); + } + try { + Object result = method.invoke(null, props); + return (scala.collection.Map) result; + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + } } diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java index d551398d0..ce4f1681a 100644 --- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java +++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkStreamingTest.java @@ -78,7 +78,7 @@ import static org.elasticsearch.hadoop.util.TestUtils.resource; import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; -import static scala.collection.JavaConversions.propertiesAsScalaMap; +import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap; @FixMethodOrder(MethodSorters.NAME_ASCENDING) @RunWith(Parameterized.class) diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java new file mode 100644 index 000000000..0d8709fc7 
--- /dev/null +++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java @@ -0,0 +1,38 @@ +package org.elasticsearch.spark.integration; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import java.util.Properties; + +public class ScalaUtils { + /* + * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. + */ + public static scala.collection.Map propertiesAsScalaMap(Properties props) { + Class conversionsClass; + try { + conversionsClass = Class.forName("scala.collection.JavaConversions"); + } catch (ClassNotFoundException e) { + try { + conversionsClass = Class.forName("jdk.CollectionConverters"); + } catch (ClassNotFoundException classNotFoundException) { + throw new RuntimeException("No collection converter class found"); + } + } + Method method; + try { + method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); + } catch (NoSuchMethodException e) { + throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); + } + try { + Object result = method.invoke(null, props); + return (scala.collection.Map) result; + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } catch (InvocationTargetException e) { + throw new RuntimeException(e); + } + } +} diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java index 654aea416..3ec67b7c0 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractJavaEsSparkStructuredStreamingTest.java @@ -60,7 +60,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static 
org.junit.Assert.assertTrue; -import static scala.collection.JavaConversions.propertiesAsScalaMap; +import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap; @FixMethodOrder(MethodSorters.NAME_ASCENDING) @RunWith(Parameterized.class) diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala index ff5d76880..6db9081c9 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsScalaSparkStreaming.scala @@ -46,7 +46,7 @@ import org.junit.runner.RunWith import org.junit.runners.Parameterized.Parameters import org.junit.runners.{MethodSorters, Parameterized} -import scala.collection.JavaConversions.propertiesAsScalaMap +import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap import scala.collection.mutable import scala.reflect.ClassTag @@ -62,7 +62,7 @@ object AbstractScalaEsScalaSparkStreaming { @BeforeClass def setup(): Unit = { - conf.setAll(TestSettings.TESTING_PROPS) + conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS)) sc = new SparkContext(conf) } diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala index da2087571..755869d86 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala @@ -27,9 +27,8 @@ import java.nio.file.Paths import java.sql.Timestamp import java.{util => ju} import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions.propertiesAsScalaMap -import 
scala.collection.JavaConverters.asScalaBufferConverter -import scala.collection.JavaConverters.mapAsJavaMapConverter +import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap +import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf @@ -116,7 +115,7 @@ object AbstractScalaEsScalaSparkSQL { @BeforeClass def setup() { - conf.setAll(TestSettings.TESTING_PROPS); + conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS)); sc = new SparkContext(conf) sqc = SparkSession.builder().config(conf).getOrCreate().sqlContext @@ -231,7 +230,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus @Test def test1KryoScalaEsRow() { val kryo = SparkUtils.sparkSerializer(sc.getConf) - val row = new ScalaEsRow(new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala) + val row = new ScalaEsRow((new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala).toSeq) val storage = Array.ofDim[Byte](512) val output = new KryoOutput(storage) @@ -733,7 +732,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus def testEsDataFrame3WriteWithRichMapping() { val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri()) // because Windows... 
- val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala + val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq val data = sc.parallelize(lines) @@ -2561,7 +2560,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus // don't use the sc.read.json/textFile to avoid the whole Hadoop madness val path = Paths.get(uri) // because Windows - val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala + val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq sc.parallelize(lines) } } \ No newline at end of file diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala index 5afaae8e1..cfa1de779 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkStructuredStreaming.scala @@ -64,7 +64,7 @@ import org.junit.runners.MethodSorters import org.junit.runners.Parameterized import org.junit.runners.Parameterized.Parameters -import scala.collection.JavaConversions.propertiesAsScalaMap +import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap import scala.io.Codec import scala.io.Source @@ -84,7 +84,7 @@ object AbstractScalaEsSparkStructuredStreaming { @BeforeClass def setup(): Unit = { - sparkConf.setAll(TestSettings.TESTING_PROPS) + sparkConf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS)) spark = Some( SparkSession.builder() .config(sparkConf) From 132a7cfbc08fa00e7311644699fdb52b88e27fe5 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 25 Jan 2022 11:20:17 -0600 Subject: [PATCH 3/6] Removing unused code --- .../AbstractJavaEsSparkSQLTest.java | 33 ------------------- 1 file changed, 33 deletions(-) 
diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java index 0bcfee767..00cb02640 100644 --- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java +++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/AbstractJavaEsSparkSQLTest.java @@ -19,15 +19,12 @@ package org.elasticsearch.spark.integration; import java.io.Serializable; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; import java.sql.Timestamp; import java.util.List; -import java.util.Properties; import java.util.concurrent.TimeUnit; import javax.xml.bind.DatatypeConverter; @@ -211,34 +208,4 @@ public Row call(String[] r) throws Exception { return sqc.createDataFrame(rowData, schema); } - - /* - * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. 
- */ - public static scala.collection.Map propertiesAsScalaMap(Properties props) { - Class conversionsClass; - try { - conversionsClass = Class.forName("scala.collection.JavaConversions"); - } catch (ClassNotFoundException e) { - try { - conversionsClass = Class.forName("jdk.CollectionConverters"); - } catch (ClassNotFoundException classNotFoundException) { - throw new RuntimeException("No collection converter class found"); - } - } - Method method; - try { - method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); - } catch (NoSuchMethodException e) { - throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); - } - try { - Object result = method.invoke(null, props); - return (scala.collection.Map) result; - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); - } - } } From 5ca89e49d8a7637a7710001783c15fbfb058e370 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 25 Jan 2022 13:22:34 -0600 Subject: [PATCH 4/6] Handling change in toString() --- .../spark/sql/RowSerializationEventConverterTest.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala index 3f7ff7bf3..74138e27c 100644 --- a/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala +++ b/spark/sql-30/src/test/scala/org/elasticsearch/spark/sql/RowSerializationEventConverterTest.scala @@ -20,7 +20,6 @@ package org.elasticsearch.spark.sql import java.util - import org.apache.spark.sql.Row import org.apache.spark.sql.types.StringType import org.apache.spark.sql.types.StructField @@ -29,10 +28,7 @@ import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure import 
org.elasticsearch.hadoop.serialization.handler.write.impl.SerializationEventConverter import org.elasticsearch.hadoop.util.DateUtils import org.elasticsearch.hadoop.util.StringUtils -import org.hamcrest.Matchers.equalTo -import org.junit.Assert.assertEquals -import org.junit.Assert.assertThat -import org.junit.Assert.assertTrue +import org.junit.Assert.{assertEquals, assertTrue} import org.junit.Test class RowSerializationEventConverterTest { @@ -52,7 +48,9 @@ class RowSerializationEventConverterTest { new util.ArrayList[String]) val rawEvent = eventConverter.getRawEvent(iaeFailure) - assertThat(rawEvent, equalTo("(StructType(StructField(field1,StringType,true), " + + + // Scala 2.13 changed what toString() looks like, so can't do an exact match here: + assertTrue(rawEvent.contains("(StructField(field1,StringType,true), " + "StructField(field2,StringType,true), StructField(field3,StringType,true)),[value1,value2,value3])")) val timestamp = eventConverter.getTimestamp(iaeFailure) assertTrue(StringUtils.hasText(timestamp)) From 79afc47ae3ab1ad2c14ae1937f8c5296326ff8e2 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 25 Jan 2022 18:05:57 -0600 Subject: [PATCH 5/6] Removing reflection code that did not work --- .../spark/integration/ScalaUtils.java | 38 ++++++------------ .../spark/integration/ScalaUtils.java | 39 ++++++------------- 2 files changed, 22 insertions(+), 55 deletions(-) diff --git a/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java index 0d8709fc7..3161e73f9 100644 --- a/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java +++ b/spark/core/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java @@ -1,38 +1,22 @@ package org.elasticsearch.spark.integration; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; +import scala.collection.mutable.Map; 
+import scala.collection.mutable.HashMap; + import java.util.Properties; public class ScalaUtils { /* - * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. + * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit + * scala.jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with + * scala.jdk.CollectionConverters once we no longer support Scala older than 2.13. */ public static scala.collection.Map propertiesAsScalaMap(Properties props) { - Class conversionsClass; - try { - conversionsClass = Class.forName("scala.collection.JavaConversions"); - } catch (ClassNotFoundException e) { - try { - conversionsClass = Class.forName("jdk.CollectionConverters"); - } catch (ClassNotFoundException classNotFoundException) { - throw new RuntimeException("No collection converter class found"); - } - } - Method method; - try { - method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); - } catch (NoSuchMethodException e) { - throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); - } - try { - Object result = method.invoke(null, props); - return (scala.collection.Map) result; - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); + Map scalaMap = new HashMap(); + for (java.util.Map.Entry entry : props.entrySet()) { + scalaMap.put(entry.getKey(), entry.getValue()); } + return scalaMap; } } + diff --git a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java index 0d8709fc7..2c1b9ee48 100644 --- a/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java +++ b/spark/sql-30/src/itest/java/org/elasticsearch/spark/integration/ScalaUtils.java @@ -1,38 +1,21 @@ package
org.elasticsearch.spark.integration; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; +import scala.collection.mutable.Map; +import scala.collection.mutable.HashMap; + import java.util.Properties; public class ScalaUtils { /* - * Scala renamed scala.collection.JavaConversions to jdk.CollectionConverters in 2.13. + * Scala removed scala.collection.JavaConversions.propertiesAsScalaMap() in 2.13, replacing it with an implicit + * scala.jdk.CollectionConverters.asScala. This is an attempt to get a method that works in 2.10-2.13. It can be removed and replaced with + * scala.jdk.CollectionConverters once we no longer support Scala older than 2.13. */ public static scala.collection.Map propertiesAsScalaMap(Properties props) { - Class conversionsClass; - try { - conversionsClass = Class.forName("scala.collection.JavaConversions"); - } catch (ClassNotFoundException e) { - try { - conversionsClass = Class.forName("jdk.CollectionConverters"); - } catch (ClassNotFoundException classNotFoundException) { - throw new RuntimeException("No collection converter class found"); - } - } - Method method; - try { - method = conversionsClass.getMethod("propertiesAsScalaMap", Properties.class); - } catch (NoSuchMethodException e) { - throw new RuntimeException("No propertiesAsScalaMap method on " + conversionsClass); - } - try { - Object result = method.invoke(null, props); - return (scala.collection.Map) result; - } catch (IllegalAccessException e) { - throw new RuntimeException(e); - } catch (InvocationTargetException e) { - throw new RuntimeException(e); + Map scalaMap = new HashMap(); + for (java.util.Map.Entry entry : props.entrySet()) { + scalaMap.put(entry.getKey(), entry.getValue()); } + return scalaMap; } -} +} \ No newline at end of file From 6b1f7f666f87d97edadc736fc9eb5834c4dd48cb Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Wed, 26 Jan 2022 09:35:00 -0600 Subject: [PATCH 6/6] Fixing ClassCastExceptions in integration
tests --- .../spark/integration/AbstractScalaEsSparkSQL.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala index 755869d86..12cc701fc 100644 --- a/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala +++ b/spark/sql-30/src/itest/scala/org/elasticsearch/spark/integration/AbstractScalaEsSparkSQL.scala @@ -1613,7 +1613,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus assertEquals("long", nested.asInstanceOf[ArrayType].elementType.typeName) val first = df.first - val vals = first.getStruct(0).getSeq[Seq[Long]](0)(0) + val vals = first.getStruct(0).getSeq[scala.collection.Seq[Long]](0)(0) assertEquals(50, vals(0)) assertEquals(32, vals(1)) } @@ -2365,7 +2365,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus // No "es.read.field.include", so everything is included: var df = reader.load("read_field_include_test") - var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0) + var result = df.select("features.hashtags").first().getAs[scala.collection.IndexedSeq[Row]](0) assertEquals(2, result(0).size) assertEquals("hello", result(0).getAs("text")) assertEquals("2", result(0).getAs("count")) @@ -2412,7 +2412,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus val reader = sqc.read.schema(schema).format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","samples") var resultDf = reader.load("nested_fields_upsert_test") assertEquals(2, resultDf.count()) - var samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0) + var samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0) assertEquals(2, 
samples.size) assertEquals("hello", samples(0).get(0)) assertEquals("world", samples(1).get(0)) @@ -2424,7 +2424,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test") resultDf = reader.load("nested_fields_upsert_test") - samples = resultDf.select("samples").where("id = '1'").first().getAs[IndexedSeq[Row]](0) + samples = resultDf.select("samples").where("id = '1'").first().getAs[scala.collection.IndexedSeq[Row]](0) assertEquals(2, samples.size) assertEquals("goodbye", samples(0).get(0)) assertEquals("world", samples(1).get(0)) @@ -2436,7 +2436,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test") resultDf = reader.load("nested_fields_upsert_test") - samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0) + samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0) assertEquals(2, samples.size) assertEquals("goodbye", samples(0).get(0)) assertEquals("again", samples(1).get(0))