
Commit 62b4b86

Supporting scala 2.13 for spark 3.2.0 (#1841)
This commit adds support for spark 3.2 running scala 2.13. Closes #1815
1 parent 3ef547d commit 62b4b86

File tree

10 files changed: +59 −13 lines changed

gradle.properties

Lines changed: 2 additions & 0 deletions
@@ -39,6 +39,8 @@ scala211Version = 2.11.12
 scala211MajorVersion = 2.11
 scala212Version = 2.12.8
 scala212MajorVersion = 2.12
+scala213Version = 2.13.6
+scala213MajorVersion = 2.13
 
 stormVersion = 1.0.6
 

spark/core/build.gradle

Lines changed: 3 additions & 2 deletions
@@ -10,6 +10,7 @@ apply plugin: 'spark.variants'
 sparkVariants {
     capabilityGroup 'org.elasticsearch.spark.variant'
     setCoreDefaultVariant "spark20scala212", spark24Version, scala212Version
+    addCoreFeatureVariant "spark30scala213", spark30Version, scala213Version
     addCoreFeatureVariant "spark30scala212", spark30Version, scala212Version
     addCoreFeatureVariant "spark20scala211", spark24Version, scala211Version
     addCoreFeatureVariant "spark20scala210", spark22Version, scala210Version
@@ -50,7 +51,7 @@ sparkVariants {
         add(variant.configuration('implementation'), project(":elasticsearch-hadoop-mr"))
         add(variant.configuration('implementation'), "commons-logging:commons-logging:1.1.1")
 
-        add(variant.configuration('compileOnly'), "com.fasterxml.jackson.module:jackson-module-scala_${variant.scalaMajorVersion}:2.6.7.1")
+        add(variant.configuration('compileOnly'), "com.fasterxml.jackson.module:jackson-module-scala_${variant.scalaMajorVersion}:2.9.10")
         add(variant.configuration('compileOnly'), "com.fasterxml.jackson.core:jackson-annotations:2.6.7")
         add(variant.configuration('compileOnly'), "com.google.guava:guava:14.0.1")
         add(variant.configuration('compileOnly'), "com.google.protobuf:protobuf-java:2.5.0")
@@ -100,7 +101,7 @@ sparkVariants {
         String generatedJavaDirectory = "$buildDir/generated/java/${variant.name}"
         Configuration scalaCompilerPlugin = project.configurations.maybeCreate(variant.configuration('scalaCompilerPlugin'))
         scalaCompilerPlugin.defaultDependencies { dependencies ->
-            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.13"))
+            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.18"))
         }
 
         ScalaCompile compileScala = tasks.getByName(scalaCompileTaskName) as ScalaCompile

spark/core/src/main/scala/org/elasticsearch/spark/rdd/AbstractEsRDD.scala

Lines changed: 3 additions & 4 deletions
@@ -18,8 +18,7 @@
  */
 package org.elasticsearch.spark.rdd;
 
-import scala.collection.JavaConversions.collectionAsScalaIterable
-import scala.collection.JavaConversions.mapAsJavaMap
+import JDKCollectionConvertersCompat.Converters._
 import scala.reflect.ClassTag
 import org.apache.commons.logging.LogFactory
 import org.apache.spark.Partition
@@ -45,7 +44,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
   @transient protected lazy val logger = LogFactory.getLog(this.getClass())
 
   override def getPartitions: Array[Partition] = {
-    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
+    esPartitions.asScala.zipWithIndex.map { case(esPartition, idx) =>
       new EsPartition(id, idx, esPartition)
     }.toArray
   }
@@ -70,7 +69,7 @@ private[spark] abstract class AbstractEsRDD[T: ClassTag](
 
   @transient private[spark] lazy val esCfg = {
     val cfg = new SparkSettingsManager().load(sc.getConf).copy();
-    cfg.merge(params)
+    cfg.merge(params.asJava)
     InitializationUtils.setUserProviderIfNotSet(cfg, classOf[HadoopUserProvider], logger)
     cfg
   }
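
The substantive change in this file is moving off scala.collection.JavaConversions, whose implicit conversions were removed in Scala 2.13, onto explicit converter calls. A minimal sketch of the before/after, using plain JDK collections rather than the connector's own types (identifiers here are illustrative, not from the patch):

    import scala.collection.JavaConverters._  // on 2.13 the equivalent is scala.jdk.CollectionConverters._

    val javaList: java.util.List[String] = java.util.Arrays.asList("a", "b")

    // Old style (JavaConversions): javaList.map(...) compiled via an invisible
    // implicit conversion. New style: the Java/Scala boundary is spelled out.
    val upper: Seq[String] = javaList.asScala.map(_.toUpperCase).toSeq

    // The reverse direction, as in cfg.merge(params.asJava) above:
    val javaMap: java.util.Map[String, String] = Map("es.nodes" -> "localhost").asJava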
spark/core/src/main/scala/org/elasticsearch/spark/rdd/JDKCollectionConvertersCompat.scala

Lines changed: 45 additions & 0 deletions

@@ -0,0 +1,45 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.spark.rdd
+
+/**
+ * Magic to get cross-compiling access to scala.jdk.CollectionConverters
+ * with a fallback on scala.collection.JavaConverters, without deprecation
+ * warnings in any Scala version.
+ * From https://github.com/scala/scala-collection-compat/issues/208#issuecomment-497735669
+ */
+private[rdd] object JDKCollectionConvertersCompat {
+  object Scope1 {
+    object jdk {
+      type CollectionConverters = Int
+    }
+  }
+  import Scope1._
+
+  object Scope2 {
+    import scala.collection.{JavaConverters => CollectionConverters}
+    object Inner {
+      import scala._
+      import jdk.CollectionConverters
+      val Converters = CollectionConverters
+    }
+  }
+
+  val Converters = Scope2.Inner.Converters
+}
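
How the trick resolves: inside Inner, `import scala._` brings the real scala.jdk package into scope on 2.13, shadowing the dummy Scope1.jdk, so `import jdk.CollectionConverters` picks up scala.jdk.CollectionConverters. On 2.11/2.12 there is no scala.jdk, so `jdk` still means the dummy, whose CollectionConverters is only a type alias; the value name CollectionConverters therefore still refers to the renamed scala.collection.JavaConverters from Scope2. Either way, `Converters` ends up bound to the right object for that Scala version. A minimal usage sketch, written as a hypothetical file inside the same package (the object is private[rdd]):

    package org.elasticsearch.spark.rdd

    object ConvertersUsageSketch {  // hypothetical example, not part of the patch
      import JDKCollectionConvertersCompat.Converters._

      val total = java.util.Arrays.asList(1, 2, 3).asScala.sum  // 6
      val javaMap = Map("retries" -> 3).asJava                  // java.util.Map[String, Int]
    }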

spark/core/src/main/scala/org/elasticsearch/spark/serialization/ScalaMapFieldExtractor.scala

Lines changed: 0 additions & 2 deletions
@@ -18,10 +18,8 @@
  */
 package org.elasticsearch.spark.serialization
 
-import scala.collection.GenMapLike
 import scala.collection.Map
 
-import org.elasticsearch.hadoop.serialization.field.ConstantFieldExtractor
 import org.elasticsearch.hadoop.serialization.MapFieldExtractor
 import org.elasticsearch.hadoop.serialization.field.FieldExtractor._
 import org.elasticsearch.spark.serialization.{ ReflectionUtils => RU }

spark/sql-13/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -125,7 +125,7 @@ sparkVariants {
         String generatedJavaDirectory = "$buildDir/generated/java/${variant.name}"
         Configuration scalaCompilerPlugin = project.configurations.maybeCreate(variant.configuration('scalaCompilerPlugin'))
         scalaCompilerPlugin.defaultDependencies { dependencies ->
-            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.13"))
+            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.18"))
         }
 
         ScalaCompile compileScala = tasks.getByName(scalaCompileTaskName) as ScalaCompile

spark/sql-20/build.gradle

Lines changed: 1 addition & 1 deletion
@@ -144,7 +144,7 @@ sparkVariants {
         String generatedJavaDirectory = "$buildDir/generated/java/${variant.name}"
         Configuration scalaCompilerPlugin = project.configurations.maybeCreate(variant.configuration('scalaCompilerPlugin'))
         scalaCompilerPlugin.defaultDependencies { dependencies ->
-            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.13"))
+            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.18"))
         }
 
         ScalaCompile compileScala = tasks.getByName(scalaCompileTaskName) as ScalaCompile

spark/sql-30/build.gradle

Lines changed: 2 additions & 1 deletion
@@ -10,6 +10,7 @@ apply plugin: 'spark.variants'
 sparkVariants {
     capabilityGroup 'org.elasticsearch.spark.sql.variant'
     setDefaultVariant "spark30scala212", spark30Version, scala212Version
+    addFeatureVariant "spark30scala213", spark30Version, scala213Version
 
     all { SparkVariantPlugin.SparkVariant variant ->
         String scalaCompileTaskName = project.sourceSets
@@ -130,7 +131,7 @@ sparkVariants {
         String generatedJavaDirectory = "$buildDir/generated/java/${variant.name}"
         Configuration scalaCompilerPlugin = project.configurations.maybeCreate(variant.configuration('scalaCompilerPlugin'))
         scalaCompilerPlugin.defaultDependencies { dependencies ->
-            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.13"))
+            dependencies.add(project.dependencies.create("com.typesafe.genjavadoc:genjavadoc-plugin_${variant.scalaVersion}:0.18"))
         }
 
         ScalaCompile compileScala = tasks.getByName(scalaCompileTaskName) as ScalaCompile

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DefaultSource.scala

Lines changed: 1 addition & 1 deletion
@@ -351,7 +351,7 @@ private[sql] case class ElasticsearchRelation(parameters: Map[String, String], @
 
       case EqualTo(attribute, value) => {
         // if we get a null, translate it into a missing query (we're extra careful - Spark should translate the equals into isMissing anyway)
-        if (value == null || value == None || value == Unit) {
+        if (value == null || value == None || value == ()) {
           if (isES50) {
             s"""{"bool":{"must_not":{"exists":{"field":"$attribute"}}}}"""
           }
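
The old condition compared against `Unit`, which is the companion object scala.Unit rather than the unit value, so `value == Unit` could never be true; Scala 2.13 warns that such a comparison always yields false. `()` is the actual unit value. A quick illustration of the difference, assuming a plain REPL session:

    val v: Any = ()
    val u: Unit = ()

    v == u     // true: comparing against the unit value
    v == Unit  // false, always: Unit here is the companion object, not the value;
               // 2.13 warns that this comparison can never succeed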

spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/SchemaUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -261,7 +261,7 @@ private[sql] object SchemaUtils {
     for (prop <- rowOrderProps.asScala) {
       val value = StringUtils.tokenize(prop._2).asScala
       if (!value.isEmpty) {
-        order.put(prop._1, new ArrayBuffer() ++= value)
+        order.put(prop._1, value.toSeq)
       }
     }
 
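
Context for this change: in Scala 2.13 the default `Seq` is scala.collection.immutable.Seq, so a mutable ArrayBuffer no longer satisfies a `Seq`-typed slot, and `.toSeq` (which copies on 2.13) is the spelling that works on every version. A minimal sketch, with a hypothetical takesSeq helper standing in for the map's value type:

    import scala.collection.mutable.ArrayBuffer

    def takesSeq(xs: Seq[String]): Int = xs.length  // hypothetical helper

    val buf = ArrayBuffer("a", "b")
    // takesSeq(buf)      // fine on 2.12 (Seq meant collection.Seq), a type error on 2.13
    takesSeq(buf.toSeq)   // compiles on both; on 2.13 this copies into an immutable Seq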
