Success

Changes

Summary

  1. [SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1]
Commit 4d2d3d47e00e78893b1ecd5a9a9070adc5243ac9 by gatorsmile
[SPARK-23207][SPARK-22905][SPARK-24564][SPARK-25114][SQL][BACKPORT-2.1]
Shuffle+Repartition on a DataFrame could lead to incorrect answers
## What changes were proposed in this pull request?
    Backport of #20393 and #22079.
    Currently, shuffle repartition uses RoundRobinPartitioning, so the
generated result is nondeterministic: the order of the input rows is not
deterministic.
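    To make the order dependence concrete, here is a minimal plain-Scala sketch (no Spark) of round-robin assignment. It assumes the counter starts at 0 on every attempt, so a row's partition depends only on its arrival position; `RoundRobinSketch` and `assign` are hypothetical names, not Spark's RoundRobinPartitioning.

```scala
// Hypothetical sketch, not Spark's RoundRobinPartitioning: each row goes to
// partition (arrival index % numPartitions).
// Assumption: the counter starts at 0 on every attempt.
object RoundRobinSketch {
  def assign[T](rows: Seq[T], numPartitions: Int): Map[T, Int] =
    rows.zipWithIndex.map { case (row, i) => row -> (i % numPartitions) }.toMap

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq(1L, 2L, 3L, 4L)
    val retryAttempt = Seq(3L, 1L, 4L, 2L) // same rows, different arrival order
    // Row 1 lands in partition 0 on the first attempt but in partition 1 on the retry.
    println(assign(firstAttempt, 2)(1L)) // 0
    println(assign(retryAttempt, 2)(1L)) // 1
  }
}
```

The same set of rows produces a different row-to-partition mapping purely because the arrival order changed.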
    The bug can be triggered when a repartition call follows a shuffle
(which leads to nondeterministic row ordering), as in the pattern below:
   upstream stage -> repartition stage -> result stage
   (-> indicates a shuffle)
   When one of the executor processes goes down, some tasks in the
repartition stage are retried and produce a different row ordering, and
some tasks in the result stage are then retried and generate different
data, so some rows can be lost or duplicated.
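    The failure mode can be simulated without Spark. In this minimal sketch (hypothetical names, plain Scala), the upstream output is assumed to arrive in reverse order on the retry; result-stage partition 0 keeps its output from the first attempt while partition 1 is recomputed from the retried output.

```scala
// Hypothetical simulation of a partial retry after a round-robin repartition.
// Assumption: the retried upstream output arrives in reverse order.
object RetrySketch {
  // Split rows into numPartitions buckets by arrival index.
  def roundRobin[T](rows: Seq[T], numPartitions: Int): Vector[Seq[T]] =
    Vector.tabulate(numPartitions) { p =>
      rows.zipWithIndex.collect { case (row, i) if i % numPartitions == p => row }
    }

  def main(args: Array[String]): Unit = {
    val rows = 0L until 10L
    val firstAttempt = roundRobin(rows, 2)         // original ordering
    val retryAttempt = roundRobin(rows.reverse, 2) // ordering seen by the retry
    // Partition 0 finished on the first attempt; partition 1 was recomputed.
    val result = firstAttempt(0) ++ retryAttempt(1)
    println(s"rows=${result.size} distinct=${result.distinct.size}") // rows=10 distinct=5
  }
}
```

Both halves contain the even rows, so every odd row is lost and every even row is duplicated, which is the same kind of discrepancy as the distinct count falling short of 1000000.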
    The following code returns 931532, instead of 1000000:
   ```
   import scala.sys.process._
   import org.apache.spark.TaskContext

   val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
     x
   }.repartition(200).map { x =>
     // On the first attempt of partitions 0 and 1, kill the Java processes
     // on the executor's host (pkill -f java) to force task retries.
     if (TaskContext.get.attemptNumber == 0 &&
         TaskContext.get.partitionId < 2) {
       throw new Exception("pkill -f java".!!)
     }
     x
   }
   res.distinct().count()
   ```
    In this PR, we propose the most straightforward way to fix this
problem: perform a local sort before partitioning. Once the input row
ordering is deterministic, the function from rows to partitions is fully
deterministic too.
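    A minimal plain-Scala sketch of the idea: sort the rows locally, then assign them round-robin. Rows are modeled as byte arrays compared lexicographically as unsigned bytes, loosely in the spirit of the RecordBinaryComparator added by this commit (see the file list); the comparator and the names below are hypothetical and are not Spark's code.

```scala
// Hypothetical sketch: local sort followed by round-robin assignment.
object SortThenRoundRobinSketch {
  // Lexicographic ordering over Array[Byte], treating bytes as unsigned.
  val binaryOrdering: Ordering[Array[Byte]] = new Ordering[Array[Byte]] {
    def compare(a: Array[Byte], b: Array[Byte]): Int = {
      val n = math.min(a.length, b.length)
      var i = 0
      while (i < n) {
        val cmp = (a(i) & 0xff) - (b(i) & 0xff)
        if (cmp != 0) return cmp
        i += 1
      }
      a.length - b.length // a shorter prefix sorts first
    }
  }

  // Sort first, then assign by position; keyed by String for readability.
  def assign(rows: Seq[Array[Byte]], numPartitions: Int): Map[String, Int] =
    rows.sorted(binaryOrdering).zipWithIndex.map { case (row, i) =>
      new String(row, "UTF-8") -> (i % numPartitions)
    }.toMap

  def main(args: Array[String]): Unit = {
    val rows = Seq("d", "a", "c", "b").map(_.getBytes("UTF-8"))
    // Any arrival order now yields the same row -> partition mapping.
    println(assign(rows, 2) == assign(rows.reverse, 2)) // true
  }
}
```

Equal rows are byte-identical, so their relative order after the sort does not affect which values land in which partition.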
    The downside of this approach is that the extra local sort reduces
the performance of repartition(), so we add a new config named
`spark.sql.execution.sortBeforeRepartition` to control whether this
patch is applied. The patch is enabled by default to be safe by default,
but users may choose to turn it off manually to avoid the performance
regression.
    This patch also changes the output row ordering of repartition(),
which causes a number of test cases to fail because they compare the
results directly.
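    One way a test can stop depending on row order is to compare results as multisets. This is a minimal sketch with a hypothetical helper `sameRows`; it is not a Spark test API and not necessarily how the affected suites were updated.

```scala
// Hypothetical helper: order-insensitive comparison of two result sets.
object OrderInsensitiveCompare {
  // Treat both results as multisets: same rows with the same multiplicities.
  def sameRows[T](actual: Seq[T], expected: Seq[T]): Boolean =
    actual.groupBy(identity).map { case (k, v) => k -> v.size } ==
      expected.groupBy(identity).map { case (k, v) => k -> v.size }

  def main(args: Array[String]): Unit = {
    println(sameRows(Seq(3, 1, 2, 2), Seq(2, 1, 2, 3))) // true
    println(Seq(3, 1, 2, 2) == Seq(2, 1, 2, 3))         // false: direct comparison is order-sensitive
  }
}
```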
    Add a unit test in ExchangeSuite.
    With this patch (and `spark.sql.execution.sortBeforeRepartition` set
to true), the following query returns 1000000:
   ```
   import scala.sys.process._
   import org.apache.spark.TaskContext

   spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
   val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
     x
   }.repartition(200).map { x =>
     // Same fault injection as above: kill executors on the first attempt.
     if (TaskContext.get.attemptNumber == 0 &&
         TaskContext.get.partitionId < 2) {
       throw new Exception("pkill -f java".!!)
     }
     x
   }
   res.distinct().count()
   // res7: Long = 1000000
   ```
    Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Author: Henry Robinson <henry@apache.org>
Closes #22211 from henryr/spark-23207-branch-2.1.
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/RecordComparator.java
The file was added sql/core/src/test/java/test/org/apache/spark/sql/execution/sort/RecordBinaryComparatorSuite.java
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
The file was modified core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
The file was added sql/catalyst/src/main/java/org/apache/spark/sql/execution/RecordBinaryComparator.java
The file was modified core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
The file was modified core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
The file was modified mllib/src/main/scala/org/apache/spark/mllib/feature/ChiSqSelector.scala
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
The file was modified core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
The file was modified mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixtureModel.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchange.scala
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
The file was modified core/src/main/scala/org/apache/spark/rdd/RDD.scala