Skip to content

Commit 17b7123

Browse files
peter-tothwangyum
authored andcommitted
[SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge
### What changes were proposed in this pull request? Unfortunately #32298 introduced a regression from Spark 3.2 to 3.3 as after that change a merged subquery can contain multiple distict type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to get the correct results. This PR fixed that. ### Why are the changes needed? The following query: ``` SELECT (SELECT count(distinct c1) FROM t1), (SELECT count(distinct c2) FROM t1) ``` currently fails with: ``` java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list. at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538) ``` but works again after this PR. ### Does this PR introduce _any_ user-facing change? Yes, the above query works again. ### How was this patch tested? Added new UT. Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge. Authored-by: Peter Toth <[email protected]> Signed-off-by: Yuming Wang <[email protected]> (cherry picked from commit 5940b98) Signed-off-by: Yuming Wang <[email protected]>
1 parent cdb494b commit 17b7123

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala

+2-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ class SparkOptimizer(
5555
InjectRuntimeFilter,
5656
RewritePredicateSubquery) :+
5757
Batch("MergeScalarSubqueries", Once,
58-
MergeScalarSubqueries) :+
58+
MergeScalarSubqueries,
59+
RewriteDistinctAggregates) :+
5960
Batch("Pushdown Filters from PartitionPruning", fixedPoint,
6061
PushDownPredicates) :+
6162
Batch("Cleanup filters that cannot be pushed down", Once,

sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala

+25
Original file line numberDiff line numberDiff line change
@@ -2269,4 +2269,29 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
22692269
assert(findProject(df2).size == 3)
22702270
}
22712271
}
2272+
2273+
test("SPARK-42346: Rewrite distinct aggregates after merging subqueries") {
2274+
withTempView("t1") {
2275+
Seq((1, 2), (3, 4)).toDF("c1", "c2").createOrReplaceTempView("t1")
2276+
2277+
checkAnswer(sql(
2278+
"""
2279+
|SELECT
2280+
| (SELECT count(distinct c1) FROM t1),
2281+
| (SELECT count(distinct c2) FROM t1)
2282+
|""".stripMargin),
2283+
Row(2, 2))
2284+
2285+
// In this case we don't merge the subqueries as `RewriteDistinctAggregates` kicks off for the
2286+
// 2 subqueries first but `MergeScalarSubqueries` is not prepared for the `Expand` nodes that
2287+
// are inserted by the rewrite.
2288+
checkAnswer(sql(
2289+
"""
2290+
|SELECT
2291+
| (SELECT count(distinct c1) + sum(distinct c2) FROM t1),
2292+
| (SELECT count(distinct c2) + sum(distinct c1) FROM t1)
2293+
|""".stripMargin),
2294+
Row(8, 6))
2295+
}
2296+
}
22722297
}

0 commit comments

Comments
 (0)