Commit 5940b98

peter-toth authored and wangyum committed
[SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
### What changes were proposed in this pull request?

Unfortunately #32298 introduced a regression from Spark 3.2 to 3.3: after that change a merged subquery can contain multiple distinct type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to get the correct results. This PR fixes that.

### Why are the changes needed?

The following query:
```
SELECT
  (SELECT count(distinct c1) FROM t1),
  (SELECT count(distinct c2) FROM t1)
```
currently fails with:
```
java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list.
	at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538)
```
but works again after this PR.

### Does this PR introduce _any_ user-facing change?

Yes, the above query works again.

### How was this patch tested?

Added new UT.

Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge.

Authored-by: Peter Toth <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
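For readers who want to try the failing query themselves, the following is a minimal sketch, not part of the commit. It assumes a Spark SQL dependency on the classpath and a local master; the object name `Spark42346Repro` and the app name are made up for illustration. The data matches the unit test added in this commit. On an affected 3.3 build the query is expected to fail during planning with the exception quoted above; with this change the test in this commit expects the single row `(2, 2)`.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical reproduction harness; the object name is illustrative only.
object Spark42346Repro {
  def main(args: Array[String]): Unit = {
    // Assumption: a local single-threaded session is enough to hit the optimizer path.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-42346-repro")
      .getOrCreate()
    import spark.implicits._

    // Same two rows as the unit test added in this commit.
    Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")

    // Two scalar subqueries over the same table, each with a different distinct aggregate.
    val df = spark.sql(
      """
        |SELECT
        |  (SELECT count(distinct c1) FROM t1),
        |  (SELECT count(distinct c2) FROM t1)
        |""".stripMargin)

    // Triggers planning and execution of the query.
    df.show()

    spark.stop()
  }
}
```

Calling `df.explain(true)` before `df.show()` prints the optimized plan, which is one way to check whether the two subqueries were merged.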
1 parent b6eadf0 commit 5940b98

2 files changed: +27 -1 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

Lines changed: 2 additions & 1 deletion
@@ -58,7 +58,8 @@ class SparkOptimizer(
       Batch("InjectRuntimeFilter", FixedPoint(1),
         InjectRuntimeFilter) :+
       Batch("MergeScalarSubqueries", Once,
-        MergeScalarSubqueries) :+
+        MergeScalarSubqueries,
+        RewriteDistinctAggregates) :+
       Batch("Pushdown Filters from PartitionPruning", fixedPoint,
         PushDownPredicates) :+
       Batch("Cleanup filters that cannot be pushed down", Once,
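The effect of adding a second rule to a `Once` batch can be illustrated with a toy model. This is only a sketch under stated assumptions: every type and function below (`ToyPlan`, `ToyAggregate`, `ToyExpandedAggregate`, `ToySubqueries`, `mergeSubqueries`, `rewriteDistinct`, `runOnce`, `canPlan`) is hypothetical and heavily simplified, and none of it is Spark's actual code. It models only the idea from the commit message: merging can produce one aggregate with several distinct groups, and a later rewrite in the same batch turns it into a form the planner accepts.

```scala
// Toy model only: hypothetical stand-ins, not Spark classes.
sealed trait ToyPlan
// An aggregate holding one entry per DISTINCT column group it computes.
final case class ToyAggregate(distinctGroups: Set[String]) extends ToyPlan
// Stand-in for the rewritten (Expand-based) form of a multi-distinct aggregate.
final case class ToyExpandedAggregate(distinctGroups: Set[String]) extends ToyPlan
// A query made of several scalar subqueries over the same table.
final case class ToySubqueries(plans: List[ToyPlan]) extends ToyPlan

object ToyOptimizer {
  type Rule = ToyPlan => ToyPlan

  // Merges plain aggregates into one, which may then hold several distinct groups.
  val mergeSubqueries: Rule = {
    case ToySubqueries(plans) if plans.nonEmpty && plans.forall(_.isInstanceOf[ToyAggregate]) =>
      ToyAggregate(plans.collect { case ToyAggregate(g) => g }.flatten.toSet)
    case other => other
  }

  // Rewrites an aggregate with more than one distinct group into the expanded form.
  val rewriteDistinct: Rule = {
    case ToyAggregate(groups) if groups.size > 1 => ToyExpandedAggregate(groups)
    case other => other
  }

  // A "Once" batch in this model: apply each rule a single time, in order.
  def runOnce(rules: Seq[Rule])(plan: ToyPlan): ToyPlan =
    rules.foldLeft(plan)((p, rule) => rule(p))

  // Stand-in for planning: a plain aggregate with several distinct groups is rejected.
  def canPlan(plan: ToyPlan): Boolean = plan match {
    case ToyAggregate(groups)    => groups.size <= 1
    case ToyExpandedAggregate(_) => true
    case ToySubqueries(plans)    => plans.forall(canPlan)
  }
}

object ToyDemo {
  def main(args: Array[String]): Unit = {
    import ToyOptimizer._
    val query = ToySubqueries(List(ToyAggregate(Set("c1")), ToyAggregate(Set("c2"))))

    // Batch with the merge rule only: the merged aggregate cannot be planned.
    val mergeOnly = runOnce(Seq(mergeSubqueries))(query)
    // Batch with the merge rule followed by the distinct rewrite.
    val mergeThenRewrite = runOnce(Seq(mergeSubqueries, rewriteDistinct))(query)

    println(s"merge only plannable: ${canPlan(mergeOnly)}")
    println(s"merge then rewrite plannable: ${canPlan(mergeThenRewrite)}")
  }
}
```

In this model the rule order inside the batch matters: running the rewrite before the merge would leave the merged aggregate untouched, which is why the sketch lists the rewrite second.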

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

Lines changed: 25 additions & 0 deletions
@@ -2598,4 +2598,29 @@ class SubquerySuite extends QueryTest
         Row("aa"))
     }
   }
+
+  test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
+    withTempView("t1") {
+      Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
+
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) FROM t1),
+          |  (SELECT count(distinct c2) FROM t1)
+          |""".stripMargin),
+        Row(2, 2))
+
+      // In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
+      // 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
+      // are inserted by the rewrite.
+      checkAnswer(sql(
+        """
+          |SELECT
+          |  (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
+          |  (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
+          |""".stripMargin),
+        Row(8, 6))
+    }
+  }
 }