Fixing scala 2.13 problems in integration tests #1880

Merged: 6 commits, Jan 26, 2022
Changes from all commits
@@ -56,7 +56,7 @@

import static org.hamcrest.Matchers.*;

import static scala.collection.JavaConversions.*;
import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
import scala.Tuple2;

public class AbstractHadoopBasicSparkTest implements Serializable {
@@ -53,7 +53,7 @@

import static org.hamcrest.Matchers.*;

import static scala.collection.JavaConversions.*;
import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;
import scala.Tuple2;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@@ -0,0 +1,22 @@
package org.elasticsearch.spark.integration;

import scala.collection.mutable.Map;
import scala.collection.mutable.HashMap;

import java.util.Properties;

public class ScalaUtils {
/*
* Scala 2.13 removed scala.collection.JavaConversions.propertiesAsScalaMap(), replacing it with the implicit
* asScala conversion provided by scala.jdk.CollectionConverters. This method is an attempt to provide an equivalent
* that works on Scala 2.10 through 2.13. It can be removed and replaced with scala.jdk.CollectionConverters once we
* no longer support Scala versions older than 2.13.
*/
public static scala.collection.Map<String, String> propertiesAsScalaMap(Properties props) {
Map scalaMap = new HashMap();
for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
scalaMap.put(entry.getKey(), entry.getValue());
}
return scalaMap;
}
}
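Once support for Scala 2.11 and 2.12 is dropped, a helper like the one above is no longer needed; the 2.13 standard library covers it directly. A minimal sketch of that 2.13-only form (the property name is illustrative, not taken from this PR):

import java.util.Properties
import scala.jdk.CollectionConverters._   // Scala 2.13+: replaces both JavaConversions and JavaConverters

val props = new Properties()
props.setProperty("es.nodes", "localhost")   // illustrative setting

// asScala on java.util.Properties yields a scala.collection.mutable.Map[String, String],
// which conforms to the scala.collection.Map[String, String] returned by the helper above.
val scalaProps: scala.collection.Map[String, String] = props.asScala
scalaProps.foreach { case (k, v) => println(s"$k -> $v") }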

@@ -80,8 +80,8 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

import scala.collection.JavaConversions.propertiesAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._

object AbstractScalaEsScalaSpark {
@transient val conf = new SparkConf()
@@ -95,7 +95,7 @@

@BeforeClass
def setup() {
conf.setAll(TestSettings.TESTING_PROPS);
conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
sc = new SparkContext(conf)
}

@@ -141,7 +141,7 @@ class AbstractScalaEsScalaSpark(prefix: String, readMetadata: jl.Boolean) extend
// don't use the sc.read.json/textFile to avoid the whole Hadoop madness
val path = Paths.get(uri)
// because Windows
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
val lines: Seq[String] = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
sc.parallelize(lines)
}
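Two Scala 2.13 details are visible in the hunk above: asScala now comes from the version-agnostic JDKCollectionConvertersCompat.Converters import instead of JavaConverters, and the explicit toSeq is required because the unqualified Seq alias means scala.collection.immutable.Seq on 2.13, which the mutable Buffer produced by asScala does not satisfy. A minimal sketch of the pattern, assuming the compat object re-exports whichever of JavaConverters or scala.jdk.CollectionConverters the compiler provides:

import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._

val javaLines: java.util.List[String] = java.util.Arrays.asList("foo", "bar")
val buffer = javaLines.asScala            // scala.collection.mutable.Buffer[String] on every Scala version
val lines: Seq[String] = buffer.toSeq     // copies into an immutable Seq on 2.13; effectively a no-op on 2.12
// sc.parallelize(lines)                  // sc being the SparkContext created in the test's setup()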

@@ -24,7 +24,7 @@ package org.elasticsearch.spark.rdd
* warning in any Scala version.
* From https://github.com/scala/scala-collection-compat/issues/208#issuecomment-497735669
*/
private[rdd] object JDKCollectionConvertersCompat {
private[elasticsearch] object JDKCollectionConvertersCompat {
object Scope1 {
object jdk {
type CollectionConverters = Int
@@ -65,6 +65,7 @@ import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

import scala.collection.JavaConversions.propertiesAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.io.Codec
import scala.io.Source

@@ -622,8 +623,8 @@ class AbstractScalaEsSparkStructuredStreaming(prefix: String, something: Boolean
searchResult = RestUtils.get(target + "/_search?version=true")
val result: java.util.Map[String, Object] = new ObjectMapper().readValue(searchResult, classOf[java.util.Map[String, Object]])
val hits = result.get("hits").asInstanceOf[java.util.Map[String, Object]].get("hits").asInstanceOf[java.util.List[java.util.Map[String,
Object]]]
hits.forEach(hit => {
Object]]].asScala
hits.foreach(hit => {
hit.get("_id").asInstanceOf[String] match {
case "1" => {
assertEquals(1, hit.get("_version"))
@@ -63,7 +63,7 @@

import static org.hamcrest.Matchers.*;

import static scala.collection.JavaConversions.*;
import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
public class AbstractJavaEsSparkSQLTest implements Serializable {
@@ -78,7 +78,7 @@
import static org.elasticsearch.hadoop.util.TestUtils.resource;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import static scala.collection.JavaConversions.propertiesAsScalaMap;
import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
@@ -0,0 +1,21 @@
package org.elasticsearch.spark.integration;

import scala.collection.mutable.Map;
import scala.collection.mutable.HashMap;

import java.util.Properties;

public class ScalaUtils {
/*
* Scala 2.13 removed scala.collection.JavaConversions.propertiesAsScalaMap(), replacing it with the implicit
* asScala conversion provided by scala.jdk.CollectionConverters. This method is an attempt to provide an equivalent
* that works on Scala 2.10 through 2.13. It can be removed and replaced with scala.jdk.CollectionConverters once we
* no longer support Scala versions older than 2.13.
*/
public static scala.collection.Map<String, String> propertiesAsScalaMap(Properties props) {
Map scalaMap = new HashMap();
for (java.util.Map.Entry<Object, Object> entry : props.entrySet()) {
scalaMap.put(entry.getKey(), entry.getValue());
}
return scalaMap;
}
}
@@ -60,7 +60,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static scala.collection.JavaConversions.propertiesAsScalaMap;
import static org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap;

@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@RunWith(Parameterized.class)
@@ -46,7 +46,7 @@ import org.junit.runner.RunWith
import org.junit.runners.Parameterized.Parameters
import org.junit.runners.{MethodSorters, Parameterized}

import scala.collection.JavaConversions.propertiesAsScalaMap
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
import scala.collection.mutable
import scala.reflect.ClassTag

@@ -62,7 +62,7 @@ object AbstractScalaEsScalaSparkStreaming {

@BeforeClass
def setup(): Unit = {
conf.setAll(TestSettings.TESTING_PROPS)
conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
sc = new SparkContext(conf)
}

@@ -27,9 +27,8 @@ import java.nio.file.Paths
import java.sql.Timestamp
import java.{util => ju}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions.propertiesAsScalaMap
import scala.collection.JavaConverters.asScalaBufferConverter
import scala.collection.JavaConverters.mapAsJavaMapConverter
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
import org.elasticsearch.spark.rdd.JDKCollectionConvertersCompat.Converters._
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkConf
@@ -116,7 +115,7 @@ object AbstractScalaEsScalaSparkSQL {

@BeforeClass
def setup() {
conf.setAll(TestSettings.TESTING_PROPS);
conf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS));
sc = new SparkContext(conf)
sqc = SparkSession.builder().config(conf).getOrCreate().sqlContext

@@ -231,7 +230,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
@Test
def test1KryoScalaEsRow() {
val kryo = SparkUtils.sparkSerializer(sc.getConf)
val row = new ScalaEsRow(new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala)
val row = new ScalaEsRow((new ArrayBuffer() ++= StringUtils.tokenize("foo,bar,tar").asScala).toSeq)

val storage = Array.ofDim[Byte](512)
val output = new KryoOutput(storage)
@@ -733,7 +732,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
def testEsDataFrame3WriteWithRichMapping() {
val path = Paths.get(AbstractScalaEsScalaSparkSQL.testData.sampleArtistsDatUri())
// because Windows...
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq

val data = sc.parallelize(lines)

@@ -1614,7 +1613,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
assertEquals("long", nested.asInstanceOf[ArrayType].elementType.typeName)

val first = df.first
val vals = first.getStruct(0).getSeq[Seq[Long]](0)(0)
val vals = first.getStruct(0).getSeq[scala.collection.Seq[Long]](0)(0)
assertEquals(50, vals(0))
assertEquals(32, vals(1))
}
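The scala.collection. prefixes added here and in the hunks below are presumably needed because on 2.13 the unqualified Seq and IndexedSeq aliases point at the immutable collections, while the sequences Spark materializes inside a Row are only guaranteed to be scala.collection.Seq instances, so the erased cast inserted by getSeq/getAs can fail at runtime. A small illustration of the difference (the ArrayBuffer merely stands in for whatever sequence type Spark actually returns):

val cell: AnyRef = scala.collection.mutable.ArrayBuffer(50L, 32L)

val general = cell.asInstanceOf[scala.collection.Seq[Long]]   // succeeds on 2.12 and 2.13
// val strict = cell.asInstanceOf[Seq[Long]]                  // succeeds on 2.12, throws ClassCastException
//                                                            // on 2.13, where Seq means immutable.Seq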
@@ -2366,7 +2365,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus

// No "es.read.field.include", so everything is included:
var df = reader.load("read_field_include_test")
var result = df.select("features.hashtags").first().getAs[IndexedSeq[Row]](0)
var result = df.select("features.hashtags").first().getAs[scala.collection.IndexedSeq[Row]](0)
assertEquals(2, result(0).size)
assertEquals("hello", result(0).getAs("text"))
assertEquals("2", result(0).getAs("count"))
@@ -2413,7 +2412,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
val reader = sqc.read.schema(schema).format("org.elasticsearch.spark.sql").option("es.read.field.as.array.include","samples")
var resultDf = reader.load("nested_fields_upsert_test")
assertEquals(2, resultDf.count())
var samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
var samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
assertEquals(2, samples.size)
assertEquals("hello", samples(0).get(0))
assertEquals("world", samples(1).get(0))
@@ -2425,7 +2424,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")

resultDf = reader.load("nested_fields_upsert_test")
samples = resultDf.select("samples").where("id = '1'").first().getAs[IndexedSeq[Row]](0)
samples = resultDf.select("samples").where("id = '1'").first().getAs[scala.collection.IndexedSeq[Row]](0)
assertEquals(2, samples.size)
assertEquals("goodbye", samples(0).get(0))
assertEquals("world", samples(1).get(0))
@@ -2437,7 +2436,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
df.write.format("org.elasticsearch.spark.sql").options(es_conf).mode(SaveMode.Append).save("nested_fields_upsert_test")

resultDf = reader.load("nested_fields_upsert_test")
samples = resultDf.select("samples").where("id = '2'").first().getAs[IndexedSeq[Row]](0)
samples = resultDf.select("samples").where("id = '2'").first().getAs[scala.collection.IndexedSeq[Row]](0)
assertEquals(2, samples.size)
assertEquals("goodbye", samples(0).get(0))
assertEquals("again", samples(1).get(0))
@@ -2561,7 +2560,7 @@ class AbstractScalaEsScalaSparkSQL(prefix: String, readMetadata: jl.Boolean, pus
// don't use the sc.read.json/textFile to avoid the whole Hadoop madness
val path = Paths.get(uri)
// because Windows
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala
val lines = Files.readAllLines(path, StandardCharsets.ISO_8859_1).asScala.toSeq
sc.parallelize(lines)
}
}
@@ -64,7 +64,7 @@ import org.junit.runners.MethodSorters
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

import scala.collection.JavaConversions.propertiesAsScalaMap
import org.elasticsearch.spark.integration.ScalaUtils.propertiesAsScalaMap
import scala.io.Codec
import scala.io.Source

@@ -84,7 +84,7 @@ object AbstractScalaEsSparkStructuredStreaming {

@BeforeClass
def setup(): Unit = {
sparkConf.setAll(TestSettings.TESTING_PROPS)
sparkConf.setAll(propertiesAsScalaMap(TestSettings.TESTING_PROPS))
spark = Some(
SparkSession.builder()
.config(sparkConf)
@@ -20,7 +20,6 @@
package org.elasticsearch.spark.sql

import java.util

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
@@ -29,10 +28,7 @@ import org.elasticsearch.hadoop.serialization.handler.write.SerializationFailure
import org.elasticsearch.hadoop.serialization.handler.write.impl.SerializationEventConverter
import org.elasticsearch.hadoop.util.DateUtils
import org.elasticsearch.hadoop.util.StringUtils
import org.hamcrest.Matchers.equalTo
import org.junit.Assert.assertEquals
import org.junit.Assert.assertThat
import org.junit.Assert.assertTrue
import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test

class RowSerializationEventConverterTest {
@@ -52,7 +48,9 @@ class RowSerializationEventConverterTest {
new util.ArrayList[String])

val rawEvent = eventConverter.getRawEvent(iaeFailure)
assertThat(rawEvent, equalTo("(StructType(StructField(field1,StringType,true), " +

// Scala 2.13 changed what toString() looks like, so can't do an exact match here:
assertTrue(rawEvent.contains("(StructField(field1,StringType,true), " +
"StructField(field2,StringType,true), StructField(field3,StringType,true)),[value1,value2,value3])"))
val timestamp = eventConverter.getTimestamp(iaeFailure)
assertTrue(StringUtils.hasText(timestamp))
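If a tighter check than a single contains is wanted without depending on the version-specific collection wrapper in toString, one alternative is to assert on each field's rendering separately; a sketch reusing the rawEvent value from the assertion above:

val expectedFieldRenderings = Seq(
  "StructField(field1,StringType,true)",
  "StructField(field2,StringType,true)",
  "StructField(field3,StringType,true)"
)
expectedFieldRenderings.foreach { field =>
  assertTrue(s"raw event should contain $field", rawEvent.contains(field))
}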