Skip to content

Commit 77a4aa0

Browse files
Raghu AngadiHyukjinKwon
Raghu Angadi
authored andcommitted
[SPARK-44337][PROTOBUF] Any fields set to 'Any.getDefaultInstance' cause parse errors
### What changes were proposed in this pull request? While building the Protobuf descriptor from FileDescriptorSet, this PR uses bundled Java descriptor for 'google/protobuf/any.proto'. This works around a bug in Protobuf's JsonFormat while parsing Any fields initialized with `Any.getDefaultInstance()`. An existing test for Any fields is updated to test this case. Without the fix, the test passes with Java classes, but fails with FileDescriptorSet bytes. The latter is the common use case. The bug in JsonFormat is related to how it checks for Any values at runtime that were originally initialized with `Any.getDefaultInstance()`. Specifically the conditional [here](https://github.com/protocolbuffers/protobuf/blob/51f79d4415338cfe068a39d7ce466870df223245/java/util/src/main/java/com/google/protobuf/util/JsonFormat.java#L860) in JsonFormat.java fails since it tries to compare with actual default instance in Java from Java descriptor. If we build the same descriptor separately, JsonFormat does not recognize it. ### Why are the changes needed? Without this fix, input that includes fields initialized to `Any.getDefaultInstance` can not be parsed. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? An existing unit test is updated to test this case. Closes #41897 from rangadi/any-default. Authored-by: Raghu Angadi <[email protected]> Signed-off-by: Hyukjin Kwon <[email protected]>
1 parent b7c6c84 commit 77a4aa0

File tree

2 files changed

+18
-3
lines changed

2 files changed

+18
-3
lines changed

connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala

+10-1
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,16 @@ private[sql] object ProtobufUtils extends Logging {
266266
val fileDescriptorList = fileDescriptorProto.getDependencyList().asScala.map { dependency =>
267267
fileDescriptorProtoMap.get(dependency) match {
268268
case Some(dependencyProto) =>
269-
buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
269+
if (dependencyProto.getName == "google/protobuf/any.proto"
270+
&& dependencyProto.getPackage == "google.protobuf") {
271+
// For Any, use the descriptor already included as part of the Java dependency.
272+
// Without this, JsonFormat used for converting Any fields fails when
273+
// an Any field in input is set to `Any.getDefaultInstance()`.
274+
com.google.protobuf.AnyProto.getDescriptor
275+
// Should we do the same for timestamp.proto and empty.proto?
276+
} else {
277+
buildFileDescriptor(dependencyProto, fileDescriptorProtoMap)
278+
}
270279
case None =>
271280
throw QueryCompilationErrors.protobufDescriptorDependencyError(dependency)
272281
}

connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala

+8-2
Original file line numberDiff line numberDiff line change
@@ -1214,9 +1214,10 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
12141214

12151215
checkWithFileAndClassName("ProtoWithAnyArray") { case (name, descFilePathOpt) =>
12161216

1217-
// proto: message { string description = 1; repeated google.protobuf.Any items = 2;
1217+
// proto: message { string description = 1; repeated google.protobuf.Any items = 2 };
12181218

1219-
// Use two different types of protos for 'items'. One with an Any field, and one without.
1219+
// Use 3 different types of protos for 'items'. One with an Any field, and one without,
1220+
// and one with default instance of Any. The last one triggers JsonFormat bug.
12201221

12211222
val simpleProto = SimpleMessage.newBuilder() // Json: {"id":10,"string_value":"galaxy"}
12221223
.setId(10)
@@ -1232,6 +1233,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
12321233
.setDescription("nested any demo")
12331234
.addItems(AnyProto.pack(simpleProto)) // A simple proto
12341235
.addItems(AnyProto.pack(protoWithAny)) // A proto with any field inside it.
1236+
.addItems(AnyProto.getDefaultInstance) // An Any field initialized to default instance.
12351237
.build()
12361238
.toByteArray
12371239

@@ -1246,6 +1248,9 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
12461248
"items: ARRAY<STRUCT<type_url: STRING, value: BINARY>>>"
12471249
)
12481250

1251+
// Print df to see how the Any fields look like without json conversion.
1252+
log.info(s"Input row without json conversion: ${df.collect()(0)}")
1253+
12491254
// String for items with 'convert.to.json' option enabled.
12501255
val options = Map(ProtobufOptions.CONVERT_ANY_FIELDS_TO_JSON_CONFIG -> "true")
12511256
val dfJson = inputDF.select(from_protobuf_wrapper(
@@ -1276,6 +1281,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Prot
12761281
| "string_value":"galaxy"
12771282
| }
12781283
| }""".stripMargin))
1284+
assert(items.get(2) == "{}") // 3rd field is empty (Any.getDefaultInstance)
12791285
}
12801286
}
12811287

0 commit comments

Comments
 (0)