1. [SPARK-24589][CORE] Correctly identify tasks in output commit (commit: 3a4b6f3be50fe2b400565c303675f6d9c23ecf41)
2. [SPARK-24588][SS] streaming join should require HashClusteredPartitioning from children (commit: a1e9640079d4a567adb87a78eee8e4bcb1431a14)
Commit 3a4b6f3be50fe2b400565c303675f6d9c23ecf41 by tgraves
[SPARK-24589][CORE] Correctly identify tasks in output commit
When an output stage is retried, it's possible that tasks from the
previous attempt are still running. In that case, there would be a new
task for the same partition in the new attempt, and the coordinator
would allow both tasks to commit their output since it did not keep
track of stage attempts.
The change adds more information to the stage state tracked by the
coordinator, so that only one task is allowed to commit the output in
the above case. The stage state in the coordinator is also maintained
across stage retries, so that a stray speculative task from a previous
stage attempt is not allowed to commit.
This also removes some code added in SPARK-18113 that allowed for
duplicate commit requests; with the RPC code used in Spark 2, that
situation cannot happen, so there is no need to handle it.
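The idea can be sketched as follows. This is a hypothetical toy model, not Spark's actual `OutputCommitCoordinator` API: the coordinator keys its state by stage and partition, and remembers which (stage attempt, task attempt) pair won the commit race, so a stray task from an earlier stage attempt can no longer be authorized.

```scala
// Hypothetical sketch of attempt-aware commit tracking (names and
// signatures are illustrative, not Spark's real coordinator API).
object CommitCoordinatorSketch {
  // Per (stage, partition): the (stageAttempt, taskAttempt) that won
  // the commit race. State is kept across stage retries.
  private val committers =
    scala.collection.mutable.Map[(Int, Int), (Int, Int)]()

  // Only the first task asking to commit a partition is authorized.
  // A speculative task from a previous stage attempt, or another task
  // attempt in the same stage, is denied once a winner exists.
  def canCommit(stage: Int, stageAttempt: Int,
                partition: Int, taskAttempt: Int): Boolean = synchronized {
    committers.get((stage, partition)) match {
      case Some((sa, ta)) => sa == stageAttempt && ta == taskAttempt
      case None =>
        committers((stage, partition)) = (stageAttempt, taskAttempt)
        true
    }
  }
}
```

Because the map is keyed by stage and partition only, retrying the stage does not reset the winner: a task from the new attempt asking to commit an already-claimed partition is refused, which is exactly the duplicate-commit scenario described above.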
Author: Marcelo Vanzin <>
Closes #21577 from vanzin/SPARK-24552.
(cherry picked from commit c8e909cd498b67b121fa920ceee7631c652dac38)
Signed-off-by: Thomas Graves <>
(commit: 3a4b6f3be50fe2b400565c303675f6d9c23ecf41)
The file was modified core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala (diff)
Commit a1e9640079d4a567adb87a78eee8e4bcb1431a14 by gatorsmile
[SPARK-24588][SS] streaming join should require
HashClusteredPartitioning from children
## What changes were proposed in this pull request?
In an earlier change we simplified the
distribution/partitioning framework and made all the join-like
operators require `HashClusteredDistribution` from children.
Unfortunately, the streaming join operator was missed.
This can cause wrong results. Consider:
```
val input1 = MemoryStream[Int]
val input2 = MemoryStream[Int]
val df1 = input1.toDF.select('value as 'a, 'value * 2 as 'b)
val df2 = input2.toDF.select('value as 'a, 'value * 2 as 'b).repartition('b)
val joined = df1.join(df2, Seq("a", "b")).select('a)
```
The physical plan is:
```
*(3) Project [a#5]
+- StreamingSymmetricHashJoin [a#5, b#6], [a#10, b#11], Inner, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 54e31fce-f055-4686-b75d-fcd2b076f8d8, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left = null, right = null ]
   :- Exchange hashpartitioning(a#5, b#6, 5)
   :  +- *(1) Project [value#1 AS a#5, (value#1 * 2) AS b#6]
   :     +- StreamingRelation MemoryStream[value#1], [value#1]
   +- Exchange hashpartitioning(b#11, 5)
      +- *(2) Project [value#3 AS a#10, (value#3 * 2) AS b#11]
         +- StreamingRelation MemoryStream[value#3], [value#3]
```
The left table is hash partitioned by `a, b`, while the right table is
hash partitioned by `b` only. As a result, two matching records may land
in different partitions, so they should appear in the join output but do
not.
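The effect can be illustrated with a toy partitioner (a sketch for illustration only; this is not Spark's internal hashing): hashing the full key `(a, b)` on one side but only `b` on the other generally routes the same logical key to different partition ids, so a join that assumes co-partitioned children silently drops matches.

```scala
// Toy illustration of the partitioning mismatch (not Spark's hashing).
object PartitioningSketch {
  val numPartitions = 5

  // Hash the given key columns and map into a partition id.
  def part(keys: Any*): Int =
    math.floorMod(keys.toList.hashCode, numPartitions)

  // Does a row with key (a, b) land in the same partition when the
  // left side hashes (a, b) but the right side hashes only b?
  def samePartition(a: Int, b: Int): Boolean = part(a, b) == part(b)
}
```

For most key values `samePartition` is false, which is why requiring the same `HashClusteredDistribution` (here, on `a, b`) from both children, and inserting the missing shuffle on the right side, is needed for correctness.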
## How was this patch tested?
Author: Wenchen Fan <>
Closes #21587 from cloud-fan/join.
(cherry picked from commit dc8a6befa5dad861a731b4d7865f3ccf37482ae0)
Signed-off-by: Xiao Li <>
(commit: a1e9640079d4a567adb87a78eee8e4bcb1431a14)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/DistributionSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourcePartitioning.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala (diff)