SuccessChanges

Summary

  1. [SPARK-24002][SQL][BACKPORT-2.3] Task not serializable caused by (commit: 1708de27e84fa722510b6220471dd1746a4f581f) (details)
  2. [SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in (commit: 28973e152f7de73f82583ab373ffdecc55dcabc2) (details)
Commit 1708de27e84fa722510b6220471dd1746a4f581f by wenchen
[SPARK-24002][SQL][BACKPORT-2.3] Task not serializable caused by
org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes
This PR is to backport https://github.com/apache/spark/pull/21086 to
Apache Spark 2.3
----
```
Py4JJavaError: An error occurred while calling o153.sql.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:223)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:189)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$6.apply(Dataset.scala:190)
at org.apache.spark.sql.Dataset$$anonfun$59.apply(Dataset.scala:3021)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:89)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:127)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3020)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:190)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:646)
at sun.reflect.GeneratedMethodAccessor153.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:293)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:226)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Exception thrown in Future.get:
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:190)
at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:267)
at org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec.doConsume(BroadcastNestedLoopJoinExec.scala:530)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:37)
at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:69)
at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155)
at org.apache.spark.sql.execution.FilterExec.consume(basicPhysicalOperators.scala:144)
...
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
... 23 more
Caused by: java.util.concurrent.ExecutionException: org.apache.spark.SparkException: Task not serializable
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:206)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:179)
... 276 more
Caused by: org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2380)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:850)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1.apply(RDD.scala:849)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:371)
at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:849)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:417)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:89)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:125)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:116)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:116)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:271)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:181)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:414)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:123)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:118)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$3.apply(SparkPlan.scala:152)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:149)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:118)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:61)
at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:70)
at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:264)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:93)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1$$anonfun$call$1.apply(BroadcastExchangeExec.scala:81)
at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:150)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:80)
at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec$$anon$1.call(BroadcastExchangeExec.scala:76)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.nio.BufferUnderflowException
at java.nio.HeapByteBuffer.get(HeapByteBuffer.java:151)
at java.nio.ByteBuffer.get(ByteBuffer.java:715)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytes(Binary.java:405)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.getBytesUnsafe(Binary.java:414)
at org.apache.parquet.io.api.Binary$ByteBufferBackedBinary.writeObject(Binary.java:484)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1128)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
```
The Parquet filters are serializable but not thread-safe. SparkPlan.prepare() can be called from different threads (BroadcastExchange calls it in a thread pool), so the same Parquet filter can end up being serialized concurrently. This is not easy to reproduce. The fix is to avoid serializing these Parquet filters in the driver: this PR moves the Parquet filter generation from the driver to the executors.
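The shape of that change can be illustrated with a hedged PySpark sketch: capture only cheap, serializable inputs in the task closure and build the problematic object inside the partition function, so it is constructed on the executor instead of being serialized from the driver. This is only an illustration of the pattern; the actual fix lives in ParquetFileFormat.scala, the `build_filter` helper below is hypothetical, and Python closures would pickle fine here anyway (the serialization hazard described above is on the JVM side).
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
rdd = spark.sparkContext.parallelize(range(100), 4)

# Hypothetical stand-in for a non-thread-safe, serialization-sensitive filter.
def build_filter(threshold):
    return lambda x: x > threshold

# Problematic shape: the filter object is built on the driver and captured by
# the task closure, so it must go through closure serialization (which, on the
# JVM side, can happen concurrently when BroadcastExchange calls
# SparkPlan.prepare() from a thread pool).
driver_side_filter = build_filter(50)
captured = rdd.filter(driver_side_filter)

# Shape of the fix: only the threshold is captured; the filter itself is
# constructed inside the partition function, i.e. on the executor.
def filter_partition(partition, threshold=50):
    executor_side_filter = build_filter(threshold)  # built per task
    return (x for x in partition if executor_side_filter(x))

deferred = rdd.mapPartitions(filter_partition)
print(captured.count(), deferred.count())
```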
## How was this patch tested?
Tested with two queries, a 1000-line SQL query and a 3000-line SQL query. They need to run for at least an hour with a heavy write workload to reproduce the issue once.
Author: gatorsmile <gatorsmile@gmail.com>
Closes #21351 from gatorsmile/backportSPARK-24002.
(commit: 1708de27e84fa722510b6220471dd1746a4f581f)
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala (diff)
Commit 28973e152f7de73f82583ab373ffdecc55dcabc2 by vanzin
[SPARK-21945][YARN][PYTHON] Make --py-files work with PySpark shell in
Yarn client mode
When we run the _PySpark shell in Yarn client mode_, the files specified with `--py-files` are not recognised on the _driver side_.
Here are the steps I took to check:
```bash
$ cat /home/spark/tmp.py
def testtest():
    return 1
```
```bash
$ ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.py
```
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()  # executor side
[1]
>>> test()  # driver side
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```
Unlike Yarn cluster mode and Yarn client mode with spark-submit, Yarn client mode with the PySpark shell specifically works as follows:
1. It first runs Python shell via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L158
as pointed out by tgravescs in the JIRA.
2. This triggers shell.py, which submits another application to launch a Py4J gateway:
https://github.com/apache/spark/blob/209b9361ac8a4410ff797cff1115e1888e2f7e66/python/pyspark/java_gateway.py#L45-L60
3. It runs a Py4J gateway:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L425
4. It copies (or downloads) `--py-files` into a local temp directory:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L365-L376
and these files are then set in `spark.submit.pyFiles`.
5. The Py4J JVM is launched and then the Python paths are set via:
https://github.com/apache/spark/blob/7013eea11cb32b1e0038dc751c485da5c94a484b/python/pyspark/context.py#L209-L216
However, these paths are not actually set, because the files were copied into a tmp directory in step 4, whereas this code path looks under `SparkFiles.getRootDirectory()`, where files are placed only when `SparkContext.addFile()` is called (see the sketch after this list).
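A rough Python approximation of step 5 may help to see why it fails; this is a hedged paraphrase of the linked `context.py` logic, not the actual code, and the helper name is made up for illustration.
```python
import os
import sys

from pyspark import SparkFiles

def add_py_files_to_driver_path(py_files_conf):
    """Hedged approximation of the driver-side sys.path setup in step 5."""
    # SparkFiles.getRootDirectory() only contains files that went through
    # SparkContext.addFile(); in Yarn client mode with the PySpark shell the
    # --py-files were copied to a different temp directory instead.
    root_dir = SparkFiles.getRootDirectory()
    for path in py_files_conf.split(","):
        if not path:
            continue
        filename = os.path.basename(path)
        # This location does not exist in the shell case above, so the module
        # never becomes importable on the driver.
        sys.path.insert(1, os.path.join(root_dir, filename))
```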
In other cluster modes, `spark.files` is set via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L554-L555
and those files are explicitly added via:
https://github.com/apache/spark/blob/ecb8b383af1cf1b67f3111c148229e00c9c17c40/core/src/main/scala/org/apache/spark/SparkContext.scala#L395
So we are fine in other modes.
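For comparison, a short sketch of the `SparkContext.addFile()` mechanism those modes rely on: once a file is added through `addFile`, it is materialized under `SparkFiles.getRootDirectory()` and resolvable with `SparkFiles.get()`, which is exactly where the path setup above looks. The file path reuses the hypothetical repro path from earlier.
```python
from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Explicitly adding a file materializes it under SparkFiles' root directory.
sc.addFile("/home/spark/tmp.py")  # same path as in the repro above

print(SparkFiles.getRootDirectory())   # driver-side root for added files
print(SparkFiles.get("tmp.py"))        # resolves inside that root directory
```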
In the case of Yarn client and cluster mode with _spark-submit_, these are handled manually. In particular, https://github.com/apache/spark/pull/6360 added most of the logic. In this case, the Python path appears to be set manually via, for example, `deploy.PythonRunner`. We don't use `spark.files` here.
I tried to keep this approach as isolated as possible: simply copy the .py or .zip files into `SparkFiles.getRootDirectory()` on the driver side if they are not already there. Another possible way is to set `spark.files`, but that does unnecessary extra work and feels a bit invasive.
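A minimal sketch of that idea, continuing the hedged helper from the earlier sketch: if a `--py-files` entry is not already present under `SparkFiles.getRootDirectory()`, copy it there before inserting it into `sys.path`. This illustrates the described approach, not the exact patch applied to `context.py`.
```python
import os
import shutil
import sys

from pyspark import SparkFiles

def ensure_py_files_on_driver_path(py_files_conf):
    """Sketch of the fix: copy missing --py-files into SparkFiles' root dir."""
    root_dir = SparkFiles.getRootDirectory()
    for path in py_files_conf.split(","):
        if not path:
            continue
        filename = os.path.basename(path)
        target = os.path.join(root_dir, filename)
        if not os.path.exists(target):
            # In Yarn client mode with the PySpark shell these files were only
            # copied to an unrelated temp directory, so materialize them here.
            shutil.copyfile(path, target)
        sys.path.insert(1, target)
```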
**Before**
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 2, in test
ImportError: No module named tmp
```
**After**
```python
>>> def test():
...     import tmp
...     return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
1
```
I manually tested in standalone mode and in a Yarn cluster with the PySpark shell. .zip and .py files were also tested with steps similar to the ones above. It is difficult to add an automated test.
Author: hyukjinkwon <gurwls223@apache.org>
Closes #21267 from HyukjinKwon/SPARK-21945.
(cherry picked from commit 9a641e7f721d01d283afb09dccefaf32972d3c04)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(commit: 28973e152f7de73f82583ab373ffdecc55dcabc2)
The file was modified: python/pyspark/context.py (diff)