SuccessChanges

Summary

  1. [SPARK-22951][SQL][BRANCH-2.2] fix aggregation after dropDuplicates on empty dataframes
Commit f0c6f1da39550ac0cae274348dc885eba861c383 by dongjoon
[SPARK-22951][SQL][BRANCH-2.2] fix aggregation after dropDuplicates on
empty dataframes
## What changes were proposed in this pull request?
(courtesy of liancheng)
Spark SQL supports both global aggregation and grouping aggregation.
Global aggregation always returns a single row containing the initial
aggregation state as its output, even when there are zero input rows.
Spark implements this by simply checking the number of grouping keys: an
aggregation with zero grouping keys is treated as a global aggregation.
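For illustration, here is a small spark-shell style example of the two modes (our own example, not from the patch; it assumes a `SparkSession` named `spark` with the shell's implicits in scope):
```scala
import org.apache.spark.sql.functions._

// A one-column DataFrame with zero rows.
val empty = spark.range(0).toDF("x")

// Global aggregation (zero grouping keys): always one output row, holding
// the initial aggregation state, here count = 0.
empty.agg(count($"x")).show()
// +--------+
// |count(x)|
// +--------+
// |       0|
// +--------+

// Grouping aggregation: one output row per group, so zero input rows
// produce zero output rows.
empty.groupBy($"x").agg(count($"x")).show()
// +---+--------+
// |  x|count(x)|
// +---+--------+
// +---+--------+
```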
However, this simple principle drops the ball in the following case:
```scala
spark.emptyDataFrame.dropDuplicates().agg(count($"*") as "c").show()
// +---+
// | c |
// +---+
// | 1 |
// +---+
```
The reason is that:
1. `df.dropDuplicates()` is roughly translated into something equivalent
to:
```scala
val allColumns = df.columns.map { col }
df.groupBy(allColumns: _*).agg(allColumns.head, allColumns.tail: _*)
```
This translation is implemented in the rule
`ReplaceDeduplicateWithAggregate`.
2. `spark.emptyDataFrame` contains zero columns and zero rows.
Therefore, the rule `ReplaceDeduplicateWithAggregate` performs a
confusing transformation, roughly equivalent to the following:
```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy().agg(Map.empty[String, String])
```
The above transformation is confusing because the resulting aggregate
operator contains no grouping keys (since `emptyDataFrame` contains no
columns) and is therefore recognized as a global aggregation. As a
result, Spark SQL allocates a single row filled with the initial
aggregation state, uses it as the output, and returns a wrong result.
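To make the mechanics concrete, here is a minimal sketch of what such a rewrite rule can look like in Catalyst. This is our illustration, not the Spark source: the real `ReplaceDeduplicateWithAggregate` also preserves non-key columns through `first(...)` aggregate expressions, and the exact shape of the `Deduplicate` node varies across Spark versions.
```scala
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Sketch of the pre-fix rewrite: group by the deduplication keys (all
// columns for dropDuplicates()). If the child has zero output columns, the
// grouping key list is empty and the Aggregate is misread as global.
object SketchReplaceDeduplicate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplicate(keys, child) =>
      Aggregate(keys, child.output, child)
  }
}
```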
To fix this issue, this PR tweaks `ReplaceDeduplicateWithAggregate` by
appending a literal `1` to the grouping key list of the resulting
`Aggregate` operator when the input plan contains zero output columns.
In this way, `spark.emptyDataFrame.dropDuplicates()` is now translated
into a grouping aggregation, roughly depicted as:
```scala
spark.emptyDataFrame.dropDuplicates()
=> spark.emptyDataFrame.groupBy(lit(1)).agg(Map.empty[String, String])
```
The query is now properly treated as a grouping aggregation and returns
the correct answer.
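In terms of the sketch above, the tweak amounts to something like the following (again illustrative, not the verbatim patch; the actual change is in `Optimizer.scala`):
```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Deduplicate, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

object SketchReplaceDeduplicateFixed extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Deduplicate(keys, child) =>
      // SPARK-22951: an empty grouping key list would make this Aggregate a
      // global aggregation; group by a constant literal instead.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      Aggregate(nonemptyKeys, child.output, child)
  }
}
```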
## How was this patch tested?
New unit tests added
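The regression test is roughly of the following shape (sketched from the PR description, assuming the `QueryTest` helpers available inside `DataFrameAggregateSuite`; see the modified files below for the actual code):
```scala
// Inside DataFrameAggregateSuite (extends QueryTest): aggregating over a
// deduplicated empty DataFrame must return a count of 0, not the bogus 1.
test("SPARK-22951: aggregation after dropDuplicates on an empty data frame") {
  checkAnswer(
    spark.emptyDataFrame.dropDuplicates().agg(count($"*")),
    Seq(Row(0)))
}
```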
Closes #23434 from dongjoon-hyun/SPARK-22951-2.
Authored-by: Feng Liu <fengliu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
The file was modified: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala (diff)
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAggregateSuite.scala (diff)
The file was modified: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (diff)