Skip to content

Commit 547737b

Browse files
ulysses-you authored and cloud-fan committed
[SPARK-41959][SQL] Improve v1 writes with empty2null
### What changes were proposed in this pull request?
Clean up some unnecessary `Empty2Null`-related code.

### Why are the changes needed?
V1Writes already checks idempotency using WriteFiles, so it is unnecessary to check again whether empty2null already exists in the plan.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass CI.

Closes #39475 from ulysses-you/SPARK-41959.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
1 parent 70a098c commit 547737b

File tree

2 files changed

+4
-15
lines changed

2 files changed

+4
-15
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala

Lines changed: 2 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -206,13 +206,8 @@ object FileFormatWriter extends Logging {
206206
partitionColumns: Seq[Attribute],
207207
sortColumns: Seq[Attribute],
208208
orderingMatched: Boolean): Set[String] = {
209-
val hasEmpty2Null = plan.exists(p => V1WritesUtils.hasEmptyToNull(p.expressions))
210-
val empty2NullPlan = if (hasEmpty2Null) {
211-
plan
212-
} else {
213-
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
214-
if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
215-
}
209+
val projectList = V1WritesUtils.convertEmptyToNull(plan.output, partitionColumns)
210+
val empty2NullPlan = if (projectList.nonEmpty) ProjectExec(projectList, plan) else plan
216211

217212
writeAndCommit(job, description, committer) {
218213
val (planToExecute, concurrentOutputWriterSpec) = if (orderingMatched) {

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/V1Writes.scala

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -93,13 +93,8 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
9393
}
9494

9595
private def prepareQuery(write: V1WriteCommand, query: LogicalPlan): LogicalPlan = {
96-
val hasEmpty2Null = query.exists(p => hasEmptyToNull(p.expressions))
97-
val empty2NullPlan = if (hasEmpty2Null) {
98-
query
99-
} else {
100-
val projectList = convertEmptyToNull(query.output, write.partitionColumns)
101-
if (projectList.isEmpty) query else Project(projectList, query)
102-
}
96+
val projectList = convertEmptyToNull(query.output, write.partitionColumns)
97+
val empty2NullPlan = if (projectList.isEmpty) query else Project(projectList, query)
10398
assert(empty2NullPlan.output.length == query.output.length)
10499
val attrMap = AttributeMap(query.output.zip(empty2NullPlan.output))
105100

@@ -108,7 +103,6 @@ object V1Writes extends Rule[LogicalPlan] with SQLConfHelper {
108103
case a: Attribute => attrMap.getOrElse(a, a)
109104
}.asInstanceOf[SortOrder])
110105
val outputOrdering = query.outputOrdering
111-
// Check if the ordering is already matched to ensure the idempotency of the rule.
112106
val orderingMatched = isOrderingMatched(requiredOrdering, outputOrdering)
113107
if (orderingMatched) {
114108
empty2NullPlan

0 commit comments

Comments
 (0)