Regression
org.apache.spark.sql.kafka010.KafkaMicroBatchV2SourceSuite.delete a topic when a Spark job is running
Failing for the past 1 build (Since #5031)

Error Message
org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 8a461d8a-ce02-4d23-895b-60a40680fff0, runId = 377d8e7f-6575-4d5a-b140-d2f8db7730db] terminated with exception: Writing job aborted.
Stacktrace
sbt.ForkMain$ForkError: org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 8a461d8a-ce02-4d23-895b-60a40680fff0, runId = 377d8e7f-6575-4d5a-b140-d2f8db7730db] terminated with exception: Writing job aborted.
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:354)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:361)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.writeWithV2(WriteToDataSourceV2Exec.scala:322)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.run(WriteToDataSourceV2Exec.scala:329)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:39)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:45)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3625)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2938)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3616)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3614)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:2938)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:576)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:571)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:571)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:333)
    ... 1 more
Caused by: sbt.ForkMain$ForkError: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 198.0 failed 1 times, most recent failure: Lost task 0.0 in stage 198.0 (TID 366, amp-jenkins-worker-04.amp, executor driver): org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 248 times over 1.2391863584333331 minutes. Last failure message: Session expired either before or while waiting for connection.
    at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:432)
    at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439)
    at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
    at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:308)
    at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:307)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.verifyTopicDeletionWithRetries(KafkaTestUtils.scala:636)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.deleteTopic(KafkaTestUtils.scala:428)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anon$1.open(KafkaMicroBatchSourceSuite.scala:607)
    at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.<init>(ForeachWriterTable.scala:131)
    at org.apache.spark.sql.execution.streaming.sources.ForeachWriterFactory.createWriter(ForeachWriterTable.scala:110)
    at org.apache.spark.sql.execution.streaming.sources.ForeachWriterFactory.createWriter(ForeachWriterTable.scala:102)
    at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriterFactory.createWriter(MicroBatchWrite.scala:48)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:433)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
    at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:267)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:255)
    at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:249)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:249)
    at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1708)
    at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1678)
    at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1673)
    at kafka.zk.KafkaZkClient.getAllTopicsInCluster(KafkaZkClient.scala:462)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.verifyTopicDeletion(KafkaTestUtils.scala:627)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$verifyTopicDeletionWithRetries$1(KafkaTestUtils.scala:638)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.scalatest.concurrent.Eventually.makeAValiantAttempt$1(Eventually.scala:395)
    at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:409)
    ... 23 more
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:382)
    ... 37 more
Caused by: sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 248 times over 1.2391863584333331 minutes. Last failure message: Session expired either before or while waiting for connection.
    at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:432)
    at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:439)
    at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:391)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
    at org.scalatest.concurrent.Eventually.eventually(Eventually.scala:308)
    at org.scalatest.concurrent.Eventually.eventually$(Eventually.scala:307)
    at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:479)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.verifyTopicDeletionWithRetries(KafkaTestUtils.scala:636)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.deleteTopic(KafkaTestUtils.scala:428)
    at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase$$anon$1.open(KafkaMicroBatchSourceSuite.scala:607)
    at org.apache.spark.sql.execution.streaming.sources.ForeachDataWriter.<init>(ForeachWriterTable.scala:131)
    at org.apache.spark.sql.execution.streaming.sources.ForeachWriterFactory.createWriter(ForeachWriterTable.scala:110)
    at org.apache.spark.sql.execution.streaming.sources.ForeachWriterFactory.createWriter(ForeachWriterTable.scala:102)
    at org.apache.spark.sql.execution.streaming.sources.MicroBatchWriterFactory.createWriter(MicroBatchWrite.scala:48)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:433)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:385)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: sbt.ForkMain$ForkError: kafka.zookeeper.ZooKeeperClientExpiredException: Session expired either before or while waiting for connection
    at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$3(ZooKeeperClient.scala:267)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:255)
    at kafka.zookeeper.ZooKeeperClient.$anonfun$waitUntilConnected$1(ZooKeeperClient.scala:249)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:253)
    at kafka.zookeeper.ZooKeeperClient.waitUntilConnected(ZooKeeperClient.scala:249)
    at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1708)
    at kafka.zk.KafkaZkClient.retryRequestsUntilConnected(KafkaZkClient.scala:1678)
    at kafka.zk.KafkaZkClient.retryRequestUntilConnected(KafkaZkClient.scala:1673)
    at kafka.zk.KafkaZkClient.getAllTopicsInCluster(KafkaZkClient.scala:462)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.verifyTopicDeletion(KafkaTestUtils.scala:627)
    at org.apache.spark.sql.kafka010.KafkaTestUtils.$anonfun$verifyTopicDeletionWithRetries$1(KafkaTestUtils.scala:638)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.scalatest.concurrent.Eventually.makeAValiantAttempt$1(Eventually.scala:395)
    at org.scalatest.concurrent.Eventually.tryTryAgain$1(Eventually.scala:409)
    ... 23 more
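
Root cause, per the innermost frames: the test's ForeachWriter.open calls KafkaTestUtils.deleteTopic, whose verifyTopicDeletionWithRetries loop (ScalaTest's eventually around KafkaZkClient.getAllTopicsInCluster) kept hitting ZooKeeperClientExpiredException on every attempt until the patience window elapsed after 248 tries. A minimal sketch of that retry pattern, assuming only the public ScalaTest Eventually API; the helper name, exception type, and timeout below are illustrative stand-ins, not the suite's actual code:

    import org.scalatest.concurrent.Eventually._
    import org.scalatest.time.{Seconds, Span}

    object TopicDeletionRetrySketch {
      // Illustrative stand-in for the ZooKeeper-backed lookup; in the trace
      // this path is KafkaZkClient.getAllTopicsInCluster, which throws once
      // the ZooKeeper session has expired.
      def topicStillExists(topic: String): Boolean =
        throw new IllegalStateException(
          "Session expired either before or while waiting for connection")

      def verifyTopicDeletion(topic: String): Unit = {
        // `eventually` swallows each failing attempt and retries until its
        // patience window elapses, then throws TestFailedDueToTimeoutException
        // wrapping the last failure, matching the "Attempted 248 times over
        // 1.239... minutes" message above. The 75-second window is an assumed
        // value chosen to roughly match that duration.
        eventually(timeout(Span(75, Seconds))) {
          assert(!topicStillExists(topic), s"topic $topic still exists")
        }
      }
    }

Under this pattern, an expired ZooKeeper session that never recovers turns every retry into the same failure, which is consistent with the identical "Session expired" message reported for each attempt in the trace.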