Commit 6ebd9ec
bersprockets authored and SandishKumarHN committed
[SPARK-41035][SQL] Don't patch foldable children of aggregate functions in RewriteDistinctAggregates
### What changes were proposed in this pull request?

`RewriteDistinctAggregates` doesn't typically patch foldable children of the distinct aggregation functions except in one odd case (and seemingly by accident). This PR extends the policy of not patching foldables to that odd case.

### Why are the changes needed?

This query produces incorrect results:
```
select a, count(distinct 100) as cnt1, count(distinct b, 100) as cnt2
from values (1, 2), (4, 5) as data(a, b)
group by a;

+---+----+----+
|a  |cnt1|cnt2|
+---+----+----+
|1  |1   |0   |
|4  |1   |0   |
+---+----+----+
```
The values for `cnt2` should be 1 and 1 (not 0 and 0).

If you change the literal used in the first aggregate function, the second aggregate function now works correctly:
```
select a, count(distinct 101) as cnt1, count(distinct b, 100) as cnt2
from values (1, 2), (4, 5) as data(a, b)
group by a;

+---+----+----+
|a  |cnt1|cnt2|
+---+----+----+
|1  |1   |1   |
|4  |1   |1   |
+---+----+----+
```
The bug is in the rule `RewriteDistinctAggregates`. When a distinct aggregation has only foldable children, `RewriteDistinctAggregates` uses the first child as the grouping key (_grouping key_ in this context means the function children of distinct aggregate functions: `RewriteDistinctAggregates` groups distinct aggregations by function children to determine the `Expand` projections it needs to create). Therefore, the first foldable child gets included in the `Expand` projection associated with the aggregation, with a corresponding output attribute that is also included in the map for patching aggregate functions in the final aggregation. The `Expand` projections for all other distinct aggregate groups will have `null` in the slot associated with that output attribute. If the same foldable expression is used in a distinct aggregation associated with a different group, `RewriteDistinctAggregates` will improperly patch the associated aggregate function to use the previous aggregation's output attribute. Since the output attribute is associated with a different group, the value of that slot in the `Expand` projection will always be `null`.

In the example above, `count(distinct 100) as cnt1` is the aggregation with only foldable children, and `count(distinct b, 100) as cnt2` is the aggregation that gets inappropriately patched with the wrong group's output attribute. As a result `count(distinct b, 100) as cnt2` (from the first example above) essentially becomes `count(distinct b, null) as cnt2`, which is always zero.

`RewriteDistinctAggregates` doesn't typically patch foldable children of the distinct aggregation functions in the final aggregation. It potentially patches foldable expressions only when there is a distinct aggregation with only foldable children, and even then it doesn't patch the aggregation that has only foldable children, but instead some other unlucky aggregate function that happened to use the same foldable expression.

This PR skips patching any foldable expressions in the aggregate functions to avoid patching an aggregation with a different group's output attribute.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

Closes apache#38565 from bersprockets/distinct_literal_issue.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
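
The mechanism described above can be illustrated with a toy model in plain Scala. This is only a sketch, not Spark code: `ExpandedRow`, `expand`, `countDistinct`, and the slot names `lit100` and `b` are hypothetical stand-ins for the `Expand` output and the final aggregation, and the model assumes (as the commit message states) that a distinct count skips any tuple containing `null`.

```scala
// Toy model only: none of these names exist in Spark.
object ExpandSketch {
  // One row produced by a simplified Expand: the grouping column `a`,
  // a group id, and one nullable slot per distinct-aggregate child attribute.
  final case class ExpandedRow(a: Int, gid: Int, slots: Map[String, Option[Int]])

  type Child = ExpandedRow => Option[Int]

  // Group 1 models count(distinct 100): the literal occupies slot "lit100".
  // Group 2 models count(distinct b, 100): only `b` has a slot, "lit100" is null.
  def expand(a: Int, b: Int): Seq[ExpandedRow] = Seq(
    ExpandedRow(a, 1, Map("lit100" -> Some(100), "b" -> None)),
    ExpandedRow(a, 2, Map("lit100" -> None, "b" -> Some(b)))
  )

  // count(distinct c1, c2, ...) restricted to one group id:
  // tuples containing a null are skipped, the rest are de-duplicated.
  def countDistinct(rows: Seq[ExpandedRow], gid: Int, children: Seq[Child]): Int =
    rows.filter(_.gid == gid)
      .map(r => children.map(child => child(r)))
      .filter(_.forall(_.isDefined))
      .distinct
      .size

  // Returns (buggy cnt2, fixed cnt2) for the group a = 1 of the example query.
  def demo(): (Int, Int) = {
    val rows = Seq((1, 2), (4, 5)).flatMap { case (a, b) => expand(a, b) }
    val forA1 = rows.filter(_.a == 1)

    val bSlot: Child = _.slots("b")
    val patchedLiteral: Child = _.slots("lit100") // wrong group's attribute
    val plainLiteral: Child = _ => Some(100)      // literal left unpatched

    val buggy = countDistinct(forA1, 2, Seq(bSlot, patchedLiteral))
    val fixed = countDistinct(forA1, 2, Seq(bSlot, plainLiteral))
    (buggy, fixed)
  }

  def main(args: Array[String]): Unit = {
    val (buggy, fixed) = demo()
    println(s"buggy=$buggy fixed=$fixed")
  }
}
```

In this model the patched literal reads a slot that is only populated for group 1, so every group-2 tuple contains a `null` and the count collapses to zero; leaving the literal unpatched yields the expected count of 1.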
1 parent 956e140 commit 6ebd9ec

2 files changed: +11 -1 lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
       }.unzip3

       // Setup expand & aggregate operators for distinct aggregate expressions.
-      val distinctAggChildAttrLookup = distinctAggChildAttrMap.toMap
+      val distinctAggChildAttrLookup = distinctAggChildAttrMap.filter(!_._1.foldable).toMap
       val distinctAggFilterAttrLookup = Utils.toMap(distinctAggFilters, maxConds.map(_.toAttribute))
       val distinctAggOperatorMap = distinctAggGroups.toSeq.zipWithIndex.map {
         case ((group, expressions), i) =>
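
The effect of the one-line change can be sketched outside Spark. The snippet below is a hedged illustration, not the rule's actual code: `Expr` is a hypothetical stand-in for a Catalyst expression (only a SQL string and a `foldable` flag), the attribute names `attr_100` and `attr_b` are made up, and `patch` is an assumed simplification of how the lookup is applied to a function's children.

```scala
// Toy model only: Expr, the attribute names, and patch are hypothetical.
object FoldableFilterSketch {
  final case class Expr(sql: String, foldable: Boolean)

  val literal100: Expr = Expr("100", foldable = true)
  val colB: Expr = Expr("b", foldable = false)

  // (child expression, output attribute) pairs, in the spirit of distinctAggChildAttrMap.
  val childAttrMap: Seq[(Expr, String)] =
    Seq(literal100 -> "attr_100", colB -> "attr_b")

  // Before the change: every child, foldable or not, is eligible for patching.
  val before: Map[Expr, String] = childAttrMap.toMap

  // After the change: foldable children never enter the lookup.
  val after: Map[Expr, String] = childAttrMap.filter(!_._1.foldable).toMap

  // Replace each child with its mapped attribute when the lookup has one.
  def patch(children: Seq[Expr], lookup: Map[Expr, String]): Seq[String] =
    children.map(c => lookup.getOrElse(c, c.sql))
}
```

With these definitions, patching the children of `count(distinct b, 100)` against `before` rewrites both children to attributes, while patching against `after` rewrites only `b` and leaves the literal `100` in place.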

sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala

Lines changed: 10 additions & 0 deletions
@@ -1527,6 +1527,16 @@ class DataFrameAggregateSuite extends QueryTest
         |""".stripMargin)
     checkAnswer(res3, Row(1, 7, 4.5, 1) :: Row(2, 7, 4.5, 2) :: Nil)
   }
+
+  test("SPARK-41035: Reuse of literal in distinct aggregations should work") {
+    val res = sql(
+      """select a, count(distinct 100), count(distinct b, 100)
+        |from values (1, 2), (4, 5), (4, 6) as data(a, b)
+        |group by a;
+        |""".stripMargin
+    )
+    checkAnswer(res, Row(1, 1, 1) :: Row(4, 1, 2) :: Nil)
+  }
 }

 case class B(c: Option[Double])
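
As a cross-check on the rows passed to `checkAnswer`, the expected answer for the new test can be recomputed with plain Scala collections. This is a sketch that assumes ordinary distinct-count semantics over non-null values; `ExpectedAnswerSketch` and its members are hypothetical names, and nothing here touches Spark.

```scala
// Toy recomputation of the test's expected rows; not part of the patch.
object ExpectedAnswerSketch {
  // Same input rows as the test: data(a, b).
  val data: Seq[(Int, Int)] = Seq((1, 2), (4, 5), (4, 6))

  // Per group `a`: (a, count(distinct 100), count(distinct b, 100)).
  def expected: Seq[(Int, Int, Int)] =
    data.groupBy(_._1).toSeq.sortBy(_._1).map { case (a, rows) =>
      val cnt1 = rows.map(_ => 100).distinct.size
      val cnt2 = rows.map { case (_, b) => (b, 100) }.distinct.size
      (a, cnt1, cnt2)
    }
}
```

The group `a = 4` has two distinct values of `b`, so its second count is 2 rather than 1, which lets the test distinguish a correct distinct count from a constant.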
