1. [SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean (commit: 9b0f6f530dcc86c0865744f0d51c9577b12aa216) (details)
  2. [SPARK-24392][PYTHON] Label pandas_udf as Experimental (commit: 8bb6c2285c6017f28d8c94f4030df518f6d3048d) (details)
  3. [SPARK-24373][SQL] Add AnalysisBarrier to RelationalGroupedDataset's and (commit: a9700cb4a622b5dd2fd379292f293ad4a603e87d) (details)
Commit 9b0f6f530dcc86c0865744f0d51c9577b12aa216 by hyukjinkwon
[SPARK-24334] Fix race condition in ArrowPythonRunner causes unclean
shutdown of Arrow memory allocator
## What changes were proposed in this pull request?
There is a race condition of closing Arrow VectorSchemaRoot and
Allocator in the writer thread of ArrowPythonRunner.
The race results in memory leak exception when closing the allocator.
This patch removes the closing routine from the TaskCompletionListener
and make the writer thread responsible for cleaning up the Arrow memory.
This issue be reproduced by this test:
``` def test_memory_leak(self):
   from pyspark.sql.functions import pandas_udf, col, PandasUDFType,
array, lit, explode
   # Have all data in a single executor thread so it can trigger the
race condition easier
   with self.sql_conf({'spark.sql.shuffle.partitions': 1}):
       df = self.spark.range(0, 1000)
       df = df.withColumn('id', array([lit(i) for i in range(0, 300)]))
                  .withColumn('id', explode(col('id'))) \
                  .withColumn('v',  array([lit(i) for i in range(0,
       pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
      def foo(pdf):
          return pdf
       result = df.groupby('id').apply(foo)
       with QuietTest(
          with self.assertRaises(py4j.protocol.Py4JJavaError) as
          self.assertTrue('Memory leaked' not in str(context.exception))
Note: Because of the race condition, the test case cannot reproduce the
issue reliably so it's not added to test cases.
## How was this patch tested?
Because of the race condition, the bug cannot be unit test easily. So
far it has only happens on large amount of data. This is currently
tested manually.
Author: Li Jin <>
Closes #21397 from icexelloss/SPARK-24334-arrow-memory-leak.
(cherry picked from commit 672209f2909a95e891f3c779bfb2f0e534239851)
Signed-off-by: hyukjinkwon <>
(commit: 9b0f6f530dcc86c0865744f0d51c9577b12aa216)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowPythonRunner.scala (diff)
Commit 8bb6c2285c6017f28d8c94f4030df518f6d3048d by hyukjinkwon
[SPARK-24392][PYTHON] Label pandas_udf as Experimental
The pandas_udf functionality was introduced in 2.3.0, but is not
completely stable and still evolving.  This adds a label to indicate it
is still an experimental API.
Author: Bryan Cutler <>
Closes #21435 from
(cherry picked from commit fa2ae9d2019f839647d17932d8fea769e7622777)
Signed-off-by: hyukjinkwon <>
(commit: 8bb6c2285c6017f28d8c94f4030df518f6d3048d)
The file was modifieddocs/ (diff)
The file was modifiedpython/pyspark/sql/ (diff)
The file was modifiedpython/pyspark/sql/ (diff)
The file was modifiedpython/pyspark/sql/ (diff)
The file was modifiedpython/pyspark/sql/ (diff)
Commit a9700cb4a622b5dd2fd379292f293ad4a603e87d by wenchen
[SPARK-24373][SQL] Add AnalysisBarrier to RelationalGroupedDataset's and
KeyValueGroupedDataset's child
When we create a `RelationalGroupedDataset` or a
`KeyValueGroupedDataset` we set its child to the `logicalPlan` of the
`DataFrame` we need to aggregate. Since the `logicalPlan` is already
analyzed, we should not analyze it again. But this happens when the new
plan of the aggregate is analyzed.
The current behavior in most of the cases is likely to produce no harm,
but in other cases re-analyzing an analyzed plan can change it, since
the analysis is not idempotent. This can cause issues like the one
described in the JIRA (missing to find a cached plan).
The PR adds an `AnalysisBarrier` to the `logicalPlan` which is used as
child of `RelationalGroupedDataset` or a `KeyValueGroupedDataset`.
added UT
Author: Marco Gaido <>
Closes #21432 from mgaido91/SPARK-24373.
(cherry picked from commit de01a8d50c9c3e196591db057d544f5d7b24d95f)
Signed-off-by: Wenchen Fan <>
(commit: a9700cb4a622b5dd2fd379292f293ad4a603e87d)
The file was addedsql/core/src/test/scala/org/apache/spark/sql/GroupedDatasetSuite.scala
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/Dataset.scala (diff)
The file was modifiedsql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala (diff)