Success

Changes

Summary

  1. [SPARK-23004][SS] Ensure StateStore.commit is called only once in a streaming aggregation task (commit: 8eb9a411d89e43110e9553e0d19a16cdc37bf789)
  2. Revert "[SPARK-23799][SQL] FilterEstimation.evaluateInSet produces division by zero in a case of empty table with analyzed statistics" (commit: 1c3e8205d04d8e40a9d73633631534a728d7b1fe)
Commit 8eb9a411d89e43110e9553e0d19a16cdc37bf789 by tathagata.das1565
[SPARK-23004][SS] Ensure StateStore.commit is called only once in a streaming aggregation task
## What changes were proposed in this pull request?
A structured streaming query with a streaming aggregation can throw the
following error in rare cases. 
```
java.lang.IllegalStateException: Cannot commit after already committed or aborted
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider.org$apache$spark$sql$execution$streaming$state$HDFSBackedStateStoreProvider$$verify(HDFSBackedStateStoreProvider.scala:643)
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(HDFSBackedStateStoreProvider.scala:135)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2$$anonfun$hasNext$2.apply$mcV$sp(statefulOperators.scala:359)
    at org.apache.spark.sql.execution.streaming.StateStoreWriter$class.timeTakenMs(statefulOperators.scala:102)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec.timeTakenMs(statefulOperators.scala:251)
    at org.apache.spark.sql.execution.streaming.StateStoreSaveExec$$anonfun$doExecute$3$$anon$2.hasNext(statefulOperators.scala:359)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:188)
    at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:78)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:114)
    at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2.apply(ObjectHashAggregateExec.scala:105)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:42)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:336)
```
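The trace shows `commit` being invoked from inside `StateStoreSaveExec`'s `hasNext`. The failure mode can be sketched in plain Scala, assuming the simplified model that the wrapping iterator commits the store as a side effect of `hasNext` returning `false`. `ToyStateStore` and `CommitOnExhaustIterator` are hypothetical stand-ins, not Spark classes; only the error message is taken from the trace.

```scala
// Hypothetical stand-in for a state store: mirrors only the
// "commit at most once" check seen in the stack trace.
final class ToyStateStore {
  private var committed = false
  var commitCount = 0

  def commit(): Unit = {
    if (committed) {
      throw new IllegalStateException("Cannot commit after already committed or aborted")
    }
    committed = true
    commitCount += 1
  }
}

// Hypothetical naive wrapper: commits the store whenever hasNext finds
// the input exhausted, with no flag guarding against a repeated call.
final class CommitOnExhaustIterator[T](underlying: Iterator[T], store: ToyStateStore)
    extends Iterator[T] {
  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more) store.commit() // runs on every hasNext call after exhaustion
    more
  }
  override def next(): T = underlying.next()
}

object NaiveDemo {
  def main(args: Array[String]): Unit = {
    val store = new ToyStateStore
    val it = new CommitOnExhaustIterator(Iterator(1, 2, 3), store)
    while (it.hasNext) it.next() // the first `false` commits successfully
    try {
      it.hasNext // callers may legitimately ask again, which commits a second time
    } catch {
      case e: IllegalStateException => println(s"second hasNext failed: ${e.getMessage}")
    }
  }
}
```

Any consumer that calls `hasNext` once more after it has already returned `false` triggers the second commit. Callers are generally allowed to do this, so the side effect needs its own once-only guard.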
This can happen when all of the following conditions happen to hold:
- Streaming aggregation with an aggregation function that is a subclass of
[`TypedImperativeAggregate`](https://github.com/apache/spark/blob/76b8b840ddc951ee6203f9cccd2c2b9671c1b5e8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala#L473)
(for example, `collect_set`, `collect_list`, `percentile`, etc.).
- Query running in `update` mode.
- After the shuffle, a partition has exactly 128 records.
This causes `StateStore.commit` to be called twice. See the
[JIRA](https://issues.apache.org/jira/browse/SPARK-23004) for a more
detailed explanation. The solution is to use `NextIterator` or
`CompletionIterator`, each of which has a flag that prevents the
"onCompletion" task from being called more than once. In this PR, I
chose to implement the fix using `NextIterator`.
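The once-only guard can be illustrated with a standalone sketch. It is modeled loosely on the `getNext` / `close` / `finished` pattern of Spark's internal `NextIterator`, but `OnceClosingIterator`, `CommittingIterator`, and `ToyStateStore` are hypothetical re-creations written for this example, not the code from this PR. The assumption is that the completion action (here, committing the store) lives in `close()`, which a private flag keeps from running twice.

```scala
// Hypothetical toy store: throws on a second commit, like the real error.
final class ToyStateStore {
  private var committed = false
  var commitCount = 0
  def commit(): Unit = {
    if (committed) throw new IllegalStateException("Cannot commit after already committed or aborted")
    committed = true
    commitCount += 1
  }
}

// Hypothetical iterator in the style of a "next iterator": subclasses produce
// elements via getNext() and set `finished` when the input is exhausted.
abstract class OnceClosingIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var nextValue: U = null.asInstanceOf[U]
  private var closed = false
  protected var finished = false

  protected def getNext(): U
  protected def close(): Unit

  // The flag makes close() run at most once, however often hasNext is called.
  private def closeIfNeeded(): Unit =
    if (!closed) { closed = true; close() }

  override def hasNext: Boolean = {
    if (!finished && !gotNext) {
      nextValue = getNext()
      if (finished) closeIfNeeded()
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}

// Hypothetical wrapper that commits the store once, when the input runs out.
final class CommittingIterator[T](underlying: Iterator[T], store: ToyStateStore)
    extends OnceClosingIterator[T] {
  override protected def getNext(): T =
    if (underlying.hasNext) underlying.next()
    else { finished = true; null.asInstanceOf[T] }
  override protected def close(): Unit = store.commit()
}

object FixedDemo {
  def main(args: Array[String]): Unit = {
    val store = new ToyStateStore
    val it = new CommittingIterator(Iterator(1, 2, 3), store)
    val seen = it.toList
    it.hasNext; it.hasNext // extra calls after exhaustion no longer commit
    println(s"$seen commits=${store.commitCount}")
  }
}
```

Running `FixedDemo` should print `List(1, 2, 3) commits=1`. Because `hasNext` becomes idempotent once `finished` is set, a consumer that polls it again after exhaustion cannot trigger a second commit.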
## How was this patch tested?
Added a unit test that I have confirmed fails without the fix.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #21124 from tdas/SPARK-23004.
(cherry picked from commit 770add81c3474e754867d7105031a5eaf27159bd)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
(commit: 8eb9a411d89e43110e9553e0d19a16cdc37bf789)
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingAggregationSuite.scala
Commit 1c3e8205d04d8e40a9d73633631534a728d7b1fe by gatorsmile
Revert "[SPARK-23799][SQL] FilterEstimation.evaluateInSet produces
division by zero in a case of empty table with analyzed statistics"
This reverts commit c2f4ee7baf07501cc1f8a23dd21d14aea53606c7.
(commit: 1c3e8205d04d8e40a9d73633631534a728d7b1fe)
The file was modified: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
The file was modified: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala