1. [SPARK-23243][CORE][2.3] Fix RDD.repartition() data correctness issue
Commit d22379ec2a01fb3aa2121c312a863f057a5761ed by wenchen
backport to 2.3
An alternative fix for
When Spark reruns tasks for an RDD, there are 3 different behaviors:
1. determinate: always returns the same result in the same order when rerun.
2. unordered: returns the same data set in a random order when rerun.
3. indeterminate: returns a different result when rerun.
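As a rough illustration of the three behaviors, here is a standalone Scala toy model (not Spark code; the object and method names are hypothetical). Each method plays the role of one task's compute function for a single partition, and the `Random` argument stands in for whatever varies between a first run and a rerun.

```scala
import scala.util.Random

// Toy model of the three rerun behaviors for one partition.
// Hypothetical names; this is not Spark's API.
object RerunBehaviors {
  private val base = Vector(3, 1, 4, 1, 5)

  // determinate: same elements in the same order on every run
  def determinate(rnd: Random): Vector[Int] = base

  // unordered: same multiset of elements, but the order may change between runs
  def unordered(rnd: Random): Vector[Int] = rnd.shuffle(base)

  // indeterminate: the element values themselves may change between runs
  def indeterminate(rnd: Random): Vector[Int] = base.map(_ + rnd.nextInt(100))
}
```

Comparing two runs of `unordered` only after sorting them reflects the idea that the data set, not its order, is what stays stable.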
Normally Spark doesn't need to care about this. Spark runs stages one by
one, and when a task fails, it simply reruns that task. Although the rerun
task may return a different result, users will not be surprised.
However, Spark may rerun a finished stage when it sees fetch failures.
When this happens, Spark needs to rerun all the tasks of all the
succeeding stages if the RDD output is indeterminate, because the input
of the succeeding stages has changed.
If the RDD output is determinate, we only need to rerun the failed tasks
of the succeeding stages, because the input doesn't change.
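The rerun rule in the two paragraphs above can be sketched as a small function. This is a hypothetical, simplified model, not the `DAGScheduler` implementation: it only decides which partitions of a succeeding stage must be recomputed once a parent stage has been rerun after a fetch failure. The unordered level is grouped with determinate for the reason given in the next paragraph.

```scala
// Hypothetical model of the rerun rule; these names are illustrative and are
// not Spark's internal API.
object RerunPolicy {
  sealed trait OutputLevel
  case object Determinate extends OutputLevel
  case object Unordered extends OutputLevel
  case object Indeterminate extends OutputLevel

  /** Partitions of a succeeding stage to recompute after its parent stage
    * was rerun because of a fetch failure. */
  def tasksToRerun(
      parentLevel: OutputLevel,
      allPartitions: Set[Int],
      failedPartitions: Set[Int]): Set[Int] = parentLevel match {
    // The parent's output may have changed, so even finished tasks read stale input.
    case Indeterminate => allPartitions
    // The input data set is unchanged, so only the failed tasks run again.
    case Determinate | Unordered => failedPartitions
  }
}
```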
If the RDD output is unordered, it's the same as determinate, because the
shuffle partitioner is always deterministic (the round-robin partitioner is
not a shuffle partitioner that extends `org.apache.spark.Partitioner`),
so the reducers still get the same input data set.
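To see why unordered output is harmless behind a shuffle, the sketch below assigns records to reducers with a key-based hash function, similar in spirit to Spark's `HashPartitioner` (the object and method names here are hypothetical). Because the target reducer depends only on each record, permuting the input leaves every reducer's data set unchanged.

```scala
// Toy hash partitioning: the reducer for a record depends only on the record
// itself, never on its position in the input.
object HashPartitionDemo {
  def partitionOf(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod // keep the index non-negative
  }

  /** Input seen by each reducer, sorted so that only the data set is compared. */
  def reducerInputs(records: Seq[String], numPartitions: Int): Map[Int, Seq[String]] =
    records
      .groupBy(partitionOf(_, numPartitions))
      .map { case (p, rs) => p -> rs.sorted }
}
```

For example, `reducerInputs(data, 3) == reducerInputs(data.reverse, 3)` holds for any `data`.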
This PR fixes the failure handling for `repartition` to avoid
correctness issues.
`repartition` applies a stateful map function to generate a
round-robin id, which is order-sensitive and makes the RDD's output
indeterminate. When a stage containing `repartition` reruns, we must
also rerun all the tasks of all the succeeding stages.
**future improvement:**
1. Currently we can't roll back and rerun a shuffle map stage, so the job just fails. We should fix this later.
2. Currently we can't roll back and rerun a result stage, so the job just fails. We should fix this later.
3. We should provide a public API that allows users to tag the random level of an RDD's computing function.
Tested with a new test case.
Closes #22354 from cloud-fan/repartition.
Authored-by: Wenchen Fan <>
Signed-off-by: Wenchen Fan <>
The file was modified core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
The file was modified core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
The file was modified core/src/main/scala/org/apache/spark/rdd/RDD.scala
The file was modified core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
The file was modified core/src/main/scala/org/apache/spark/Partitioner.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala