Changes

Summary

  1. [SPARK-24687][CORE] Avoid job hanging when generate task binary causes (commit: a22a11b3a1160b6564e5c39571a4b13e29b14936)
Commit a22a11b3a1160b6564e5c39571a4b13e29b14936 by sean.owen
[SPARK-24687][CORE] Avoid job hanging when generate task binary causes fatal error
## What changes were proposed in this pull request?
When a NoClassDefFoundError is thrown, the job hangs.
`Exception in thread "dag-scheduler-event-loop" java.lang.NoClassDefFoundError: Lcom/xxx/data/recommend/aggregator/queue/QueueName;
    at java.lang.Class.getDeclaredFields0(Native Method)
    at java.lang.Class.privateGetDeclaredFields(Class.java:2436)
    at java.lang.Class.getDeclaredField(Class.java:1946)
    at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1659)
    at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:72)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:480)
    at java.io.ObjectStreamClass$2.run(ObjectStreamClass.java:468)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:468)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:365)
    at java.io.ObjectOutputStream.writeClass(ObjectOutputStream.java:1212)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1119)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)`
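This happens because the NoClassDefFoundError is not caught during task serialization: it is a LinkageError, which Scala's NonFatal extractor treats as fatal and does not match, so neither catch clause in the code below handles it.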
`var taskBinary: Broadcast[Array[Byte]] = null
try {
  // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
  // For ResultTask, serialize and broadcast (rdd, func).
  val taskBinaryBytes: Array[Byte] = stage match {
    case stage: ShuffleMapStage =>
      JavaUtils.bufferToArray(
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
    case stage: ResultStage =>
      JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
  }
  taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
  // In the case of a failure during serialization, abort the stage.
  case e: NotSerializableException =>
    abortStage(stage, "Task not serializable: " + e.toString, Some(e))
    runningStages -= stage
    // Abort execution
    return
  case NonFatal(e) =>
    abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
    runningStages -= stage
    return
}`
The images below show that stage 33 is blocked and is never scheduled.
<img width="1273" alt="2018-06-28 4 28 42"
src="https://user-images.githubusercontent.com/26762018/42621188-b87becca-85ef-11e8-9a0b-0ddf07504c96.png">
<img width="569" alt="2018-06-28 4 28 49"
src="https://user-images.githubusercontent.com/26762018/42621191-b8b260e8-85ef-11e8-9d10-e97a5918baa6.png">
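The NonFatal behavior described above can be checked in isolation. The following is a minimal sketch, not code from the patch: `NonFatalDemo` and `classify` are hypothetical names, and the catch-all `Throwable` case is there only to make the escape visible.

```scala
import scala.util.control.NonFatal

// Hypothetical demo object; not part of Spark.
object NonFatalDemo {
  // Runs `body` and reports which handler, if any, saw the thrown error.
  def classify(body: => Unit): String =
    try {
      body
      "completed"
    } catch {
      // Matches ordinary exceptions, but not LinkageError and its subclasses.
      case NonFatal(_) => "handled by NonFatal"
      // Catch-all used here only to show what NonFatal let through.
      case _: Throwable => "escaped NonFatal"
    }

  def main(args: Array[String]): Unit = {
    println(classify(throw new RuntimeException("boom")))
    // NoClassDefFoundError extends LinkageError, so NonFatal does not match it.
    println(classify(throw new NoClassDefFoundError("com/example/Missing")))
  }
}
```

In this sketch a RuntimeException reaches the NonFatal case, while a NoClassDefFoundError falls through to the catch-all. Without such a catch-all, the error would propagate out of the try block with no handler having run.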
## How was this patch tested?
Unit tests.
Closes #21664 from caneGuy/zhoukang/fix-noclassdeferror.
Authored-by: zhoukang <zhoukang199191@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
(cherry picked from commit 7c8f4756c34a0b00931c2987c827a18d989e6c08)
Signed-off-by: Sean Owen <sean.owen@databricks.com>
The file was modified: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala