Changes

Summary

  1. [SPARK-37048][PYTHON] Clean up inlining type hints under SQL module (details)
  2. [SPARK-37025][BUILD] Update RoaringBitmap version to 0.9.22 (details)
  3. [SPARK-37061][SQL] Fix CustomMetrics when using Inner Classes (details)
  4. [SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py (details)
  5. [SPARK-36952][PYTHON] Inline type hints for python/pyspark/resource/information.py and python/pyspark/resource/profile.py (details)
  6. [SPARK-37079][PYTHON][SQL] Fix DataFrameWriterV2.partitionedBy to send the arguments to JVM properly (details)
  7. [SPARK-37078][CORE] Support old 3-parameter Sink constructors (details)
  8. [SPARK-37080][INFRA] Add benchmark tool guide in pull request template (details)
  9. [SPARK-37076][SQL] Implement StructType.toString explicitly for Scala 2.13 (details)
  10. [SPARK-37075][SQL] Move UDAF expression building from sql/catalyst to sql/core (details)
  11. [SPARK-33277][PYSPARK][SQL] Writer thread must not access input after task completion listener returns (details)
Commit 64eabb6292baaaf18ee4e31cb48b204ef64aa488 by ueshin
[SPARK-37048][PYTHON] Clean up inlining type hints under SQL module

### What changes were proposed in this pull request?

Cleans up the inlined type hints under the SQL module.

### Why are the changes needed?

Now that most of the type hints under the SQL module are inlined, we should clean up the module.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

`lint-python` and existing tests should pass.

Closes #34318 from ueshin/issues/SPARK-37048/cleanup.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
The file was modified python/pyspark/sql/dataframe.py (diff)
The file was modified python/pyspark/pandas/spark/functions.py (diff)
The file was modified python/pyspark/sql/pandas/conversion.py (diff)
The file was modified python/pyspark/sql/context.py (diff)
The file was modified python/pyspark/pandas/frame.py (diff)
The file was modified python/pyspark/pandas/window.py (diff)
The file was modified python/pyspark/sql/streaming.py (diff)
The file was modified python/pyspark/sql/readwriter.py (diff)
The file was modified python/pyspark/sql/udf.py (diff)
The file was modified python/pyspark/pandas/data_type_ops/base.py (diff)
The file was modified python/pyspark/sql/avro/functions.py (diff)
The file was modified python/pyspark/pandas/generic.py (diff)
The file was modified python/pyspark/sql/group.py (diff)
The file was modified python/pyspark/sql/window.py (diff)
The file was modified python/pyspark/sql/types.py (diff)
The file was modified python/pyspark/sql/functions.py (diff)
The file was modified python/pyspark/sql/pandas/group_ops.py (diff)
The file was modified python/pyspark/sql/catalog.py (diff)
The file was modified python/pyspark/sql/column.py (diff)
The file was modified python/pyspark/sql/observation.py (diff)
The file was modified python/pyspark/sql/session.py (diff)
The file was modified python/pyspark/sql/utils.py (diff)
The file was modified python/pyspark/sql/conf.py (diff)
The file was modified python/pyspark/sql/tests/test_functions.py (diff)
Commit eda3fd0948b7328ec496ee6623e9c30e2dacbfb3 by srowen
[SPARK-37025][BUILD] Update RoaringBitmap version to 0.9.22

### What changes were proposed in this pull request?
RoaringBitmap library subminor version upgrade

### Why are the changes needed?
Pull in latest fixes and performance improvements for RoaringBitmap

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Spark CI

Closes #34300 from medb/roaringbitmap-0.9.22.

Authored-by: Igor Dvorzhak <idv@google.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
The file was modified pom.xml (diff)
The file was modified dev/deps/spark-deps-hadoop-3.2-hive-2.3 (diff)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-2.3 (diff)
Commit 2ce551e3e14cbba09ab67bb54e8d79f5062312be by dongjoon
[SPARK-37061][SQL] Fix CustomMetrics when using Inner Classes

### What changes were proposed in this pull request?
Previously, CustomMetrics used Class.getCanonicalName when attempting to get the
class name of CustomMetric implementations. getCanonicalName replaces the special
characters that mark inner classes (such as `$`) with `.`. While those names are
appropriate for referring to classes within source files, they do not work for
reflection, where the Class.getName output should be used.
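The distinction has a close Python analogue: a nested class's dotted `__qualname__` (like `getCanonicalName`) is fine for display, but cannot be used for a single-step lookup the way reflection requires. A hedged sketch with illustrative class names:

```python
import sys

class Outer:
    class Inner:
        pass

def resolve(qualname: str):
    """Resolve a dotted qualified name segment by segment — the analogue
    of needing the reflection-safe Class.getName ('Outer$Inner') rather
    than the source-style getCanonicalName ('Outer.Inner')."""
    obj = sys.modules[__name__]
    for part in qualname.split("."):
        obj = getattr(obj, part)
    return obj

# The dotted form is readable, but a naive one-shot lookup by it fails:
print(Outer.Inner.__qualname__)               # Outer.Inner
print(resolve("Outer.Inner") is Outer.Inner)  # True
```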

### Why are the changes needed?
Inner classes could never be found when they were used as custom metrics.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tests modified so they access both an independent metric class as well as an inner class.

Closes #34345 from RussellSpitzer/SPARK-37061.

Authored-by: Russell Spitzer <russell.spitzer@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/metric/CustomMetrics.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala (diff)
Commit 857d9f93a889f35220011f2e4a45aaf747c1e894 by gurwls223
[SPARK-37033][PYTHON] Inline type hints for python/pyspark/resource/requests.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/resource/requests.py

### Why are the changes needed?
Currently, there is a type hint stub file, python/pyspark/resource/requests.pyi, to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints.
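For illustration, inlining moves the hints from a separate `.pyi` stub into the `.py` source itself, so type checkers can also verify the function bodies. A minimal sketch (the class and signature here are illustrative, not the exact PySpark API):

```python
# Before: hints lived only in a stub file (requests.pyi), e.g.
#   class ResourceRequest:
#       def __init__(self, resource_name: str, amount: int) -> None: ...
# After: the same hints are written inline in requests.py.

class ResourceRequest:
    def __init__(self, resource_name: str, amount: int) -> None:
        self.resource_name = resource_name
        self.amount = amount

req = ResourceRequest("gpu", 2)
print(req.resource_name, req.amount)  # gpu 2
```

With the hints inline, `mypy` checks both callers and the implementation in one pass, and the stub file can be deleted.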

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing tests

Closes #34321 from dchvn/SPARK-37033.

Authored-by: dchvn <dgd_contributor@viettel.com.vn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was removed python/pyspark/resource/requests.pyi
The file was modified python/pyspark/resource/requests.py (diff)
Commit 482eeca4168925e55d9b550f3c066717d53f60c4 by gurwls223
[SPARK-36952][PYTHON] Inline type hints for python/pyspark/resource/information.py and python/pyspark/resource/profile.py

### What changes were proposed in this pull request?
Inline type hints for python/pyspark/resource/information.py and python/pyspark/resource/profile.py

### Why are the changes needed?
Currently, there are type hint stub files to show the expected types for functions, but we can also take advantage of static type checking within the functions by inlining the type hints.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing test.

Closes #34222 from dchvn/SPARK-36952.

Authored-by: dch nguyen <dgd_contributor@viettel.com.vn>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified python/pyspark/resource/information.py (diff)
The file was removed python/pyspark/resource/profile.pyi
The file was modified python/pyspark/resource/profile.py (diff)
The file was removed python/pyspark/resource/information.pyi
Commit 33deeb35f1c994328b577970d4577e6d9288bfc2 by gurwls223
[SPARK-37079][PYTHON][SQL] Fix DataFrameWriterV2.partitionedBy to send the arguments to JVM properly

### What changes were proposed in this pull request?

Fix `DataFrameWriterV2.partitionedBy` to send the arguments to JVM properly.

### Why are the changes needed?

In PySpark, `DataFrameWriterV2.partitionedBy` doesn't send the arguments to JVM properly.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually checked whether the arguments are sent to JVM or not.

Closes #34347 from ueshin/issues/SPARK-37079/partitionBy.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified python/pyspark/sql/readwriter.py (diff)
Commit 427816d165c5754e0e017bec3fdc7ae34a6786c1 by dongjoon
[SPARK-37078][CORE] Support old 3-parameter Sink constructors

### What changes were proposed in this pull request?

This PR aims to support 3-parameter Sink constructors which have `SecurityManager` as the 3rd parameter.

### Why are the changes needed?

Apache Spark 3.2.0 cannot load old Sink libraries because SPARK-34520 removed the `SecurityManager` parameter and tries to create Sink classes with the new two-parameter constructor only.
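The fallback pattern — prefer the new constructor arity, fall back to the legacy one — can be sketched in Python (names are illustrative; the real code uses Scala/JVM reflection):

```python
import inspect

class LegacySink:
    # old-style sink: (properties, registry, security_manager)
    def __init__(self, properties, registry, security_manager):
        self.args = (properties, registry, security_manager)

class NewSink:
    # new-style sink: (properties, registry)
    def __init__(self, properties, registry):
        self.args = (properties, registry)

def create_sink(cls, properties, registry, security_manager):
    """Prefer the two-parameter constructor; fall back to the old
    three-parameter form so pre-3.2 sink libraries still load."""
    n_params = len(inspect.signature(cls.__init__).parameters) - 1  # drop self
    if n_params == 2:
        return cls(properties, registry)
    return cls(properties, registry, security_manager)

print(len(create_sink(NewSink, {}, "registry", None).args))     # 2
print(len(create_sink(LegacySink, {}, "registry", None).args))  # 3
```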

### Does this PR introduce _any_ user-facing change?

This will recover the breaking change.

### How was this patch tested?

Pass the CIs with newly added test coverage.

Closes #34348 from dongjoon-hyun/SPARK-37078.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
The file was modified core/src/test/scala/org/apache/spark/metrics/MetricsSystemSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala (diff)
Commit 6f0d109513f47ff85afc1828cfe207865cd549de by gurwls223
[SPARK-37080][INFRA] Add benchmark tool guide in pull request template

### What changes were proposed in this pull request?

Add benchmark tool guide in pull request template

### Why are the changes needed?

Add benchmark tool guide in pull request template to help developers find it.

### Does this PR introduce _any_ user-facing change?

Yes, the PR template changed.

### How was this patch tested?

not needed

Closes #34349 from ulysses-you/benchmark-guide1.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified .github/PULL_REQUEST_TEMPLATE (diff)
Commit c3c58b8b1cf1417d9c820d6f14dcf92dd5c7f8ec by gurwls223
[SPARK-37076][SQL] Implement StructType.toString explicitly for Scala 2.13

### What changes were proposed in this pull request?

This PR fixes an issue that the string returned by `StructType.toString` is different between Scala 2.12 and 2.13.

* Scala 2.12
```
val st = StructType(StructField("a", IntegerType) :: Nil)
st.toString
res0: String = StructType(StructField(a,IntegerType,true))
```

* Scala 2.13
```
val st = StructType(StructField("a", IntegerType) :: Nil)
st.toString
val res0: String = Seq(StructField(a,IntegerType,true))
```

It's because the logic that builds the prefix of the string was changed in Scala 2.13.

Scala 2.12: https://github.com/scala/scala/blob/v2.12.15/src/library/scala/collection/TraversableLike.scala#L804
Scala 2.13: https://github.com/scala/scala/blob/v2.13.5/src/library/scala/collection/Seq.scala#L46

One option which solves this issue would be to override `className` because it's used as the prefix of the string returned by `toString`.
https://github.com/scala/scala/blob/v2.13.5/src/library/scala/collection/Iterable.scala#L76

But it doesn't work with Scala 2.12 because `className` was only introduced in Scala 2.13.
Another option would be to override `stringPrefix` and this should work with both Scala 2.12 and 2.13.
https://github.com/scala/scala/blob/v2.12.15/src/library/scala/collection/TraversableLike.scala#L796
https://github.com/scala/scala/blob/v2.13.5/src/library/scala/collection/Iterable.scala#L56

But we need to make the method `public` for Scala 2.12 though it is expected to be used only from `toString`, while the method is `private[this]` for Scala 2.13.

So, rather than exposing a new public method, this PR proposes to implement `StructType.toString` explicitly.
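The same situation arises in Python when a collection subclass inherits a repr it doesn't control; defining `__repr__` explicitly, as this PR does for `toString`, pins the output regardless of the base class's (version-dependent) internals. A toy sketch, not Spark's actual implementation:

```python
class StructType(list):
    """Toy analogue of the fix: spell out the string form explicitly
    instead of relying on the inherited default."""
    def __repr__(self) -> str:
        return f"StructType({', '.join(map(str, self))})"

st = StructType(["StructField(a,IntegerType,true)"])
print(repr(st))  # StructType(StructField(a,IntegerType,true))
```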

### Why are the changes needed?

To fix a kind of compatibility issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test is added and confirmed it passed with the following command.
```
build/sbt  -Pscala-2.13 "testOnly org.apache.spark.sql.types.StructTypeSuite -- -z SPARK-37076"
```

Closes #34341 from sarutak/fix-structtype-tostring.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/types/StructTypeSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala (diff)
Commit 32aa114671e914f51f5a758b5f1a925feac3d9b8 by wenchen
[SPARK-37075][SQL] Move UDAF expression building from sql/catalyst to sql/core

### What changes were proposed in this pull request?

This PR adds a new internal interface `FunctionExpressionBuilder`, to replace `SessionCatalog.makeFunctionExpression`. Then we can put the interface implementation in sql/core, to avoid using reflection in `SessionCatalog.makeFunctionExpression`, because the class `UserDefinedAggregateFunction` is not available in sql/catalyst.
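The dependency-inversion shape of `FunctionExpressionBuilder` can be sketched in Python (names mirror the PR, but this is a hedged analogy, not Spark's API):

```python
from abc import ABC, abstractmethod

class FunctionExpressionBuilder(ABC):
    """Interface defined in the lower layer (analogous to sql/catalyst)."""
    @abstractmethod
    def make_expression(self, name: str, inputs: list): ...

class SessionCatalog:
    """The lower layer depends only on the interface, so it never needs
    reflection to reach classes it cannot see."""
    def __init__(self, builder: FunctionExpressionBuilder):
        self._builder = builder

    def make_function_expression(self, name: str, inputs: list):
        return self._builder.make_expression(name, inputs)

class CoreFunctionExpressionBuilder(FunctionExpressionBuilder):
    """Implementation lives in the upper layer (analogous to sql/core),
    where UserDefinedAggregateFunction is actually visible."""
    def make_expression(self, name: str, inputs: list):
        return f"UDAF({name}, {len(inputs)} args)"

catalog = SessionCatalog(CoreFunctionExpressionBuilder())
print(catalog.make_function_expression("myAgg", [1, 2]))  # UDAF(myAgg, 2 args)
```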

### Why are the changes needed?

Code cleanup; it also makes it easier to support using `Aggregator` as a UDAF later (https://github.com/apache/spark/pull/34303).

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #34340 from cloud-fan/function.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala (diff)
The file was added sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/FunctionExpressionBuilder.scala
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala (diff)
Commit dfca1d153a8350ae90c29c52e741f3dd5c5343cd by wenchen
[SPARK-33277][PYSPARK][SQL] Writer thread must not access input after task completion listener returns

### What changes were proposed in this pull request?

Python UDFs in Spark SQL are run in a separate Python process. The Python process is fed input by a dedicated thread (`BasePythonRunner.WriterThread`). This writer thread drives the child plan by pulling rows from its output iterator and serializing them across a socket.

When the child exec node is the off-heap vectorized Parquet reader, these rows are backed by off-heap memory. The child node uses a task completion listener to free the off-heap memory at the end of the task, which invalidates the output iterator and any rows it has produced. Since task completion listeners are registered bottom-up and executed in reverse order of registration, this is safe as long as an exec node never accesses its input after its task completion listener has executed.[^1]

The BasePythonRunner task completion listener violates this assumption. It interrupts the writer thread, but does not wait for it to exit. This causes a race condition that can lead to an executor crash:
1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread, but the writer thread does not check it immediately.
4. The child plan's task completion listener frees its off-heap memory, invalidating the row that the Python writer thread is processing.
5. The Python writer thread attempts to access the invalidated row. The use-after-free triggers a segfault that crashes the executor.

This PR fixes the bug by making the BasePythonRunner task completion listener wait for the writer thread to exit before returning. This prevents its input from being invalidated while the thread is running. The sequence of events is now as follows:
1. The Python writer thread is processing a row backed by off-heap memory.
2. The task finishes, for example because it has reached a row limit.
3. The BasePythonRunner task completion listener sets the interrupt status of the writer thread and waits for the writer thread to exit.
4. The child plan's task completion listener can safely free its off-heap memory without invalidating live rows.
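The signal-then-join pattern can be sketched with Python threads (a simplified analogy of the Scala fix, with an Event standing in for the interrupt status):

```python
import threading

buffer = {"freed": False}   # stands in for the off-heap row memory
stop = threading.Event()    # stands in for the thread's interrupt status
errors = []

def writer():
    """Simulates the writer thread touching task input until told to stop."""
    while not stop.is_set():
        if buffer["freed"]:
            errors.append("use after free!")
            return

writer_thread = threading.Thread(target=writer)
writer_thread.start()

# Task completion listener (the fixed version):
stop.set()               # 1. signal the writer (analogous to interrupt())
writer_thread.join()     # 2. wait for it to exit BEFORE freeing input
buffer["freed"] = True   # 3. now the child's listener can free memory safely

print(errors)  # []
```

Without the `join()`, step 3 could race with a writer still mid-row, which is exactly the use-after-free described above.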

TaskContextImpl previously held a lock while invoking the task completion listeners. This would now cause a deadlock because the writer thread's exception handler calls `TaskContextImpl#isCompleted()`, which needs to acquire the same lock. To avoid deadlock, this PR modifies TaskContextImpl to release the lock before invoking the listeners, while still maintaining sequential execution of listeners.
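Releasing the lock before invoking listeners, while keeping their execution sequential, can be sketched as follows (a hedged analogy of the TaskContextImpl change, not its actual code):

```python
import threading

class TaskContext:
    def __init__(self):
        self._lock = threading.Lock()
        self._listeners = []
        self._completed = False

    def add_listener(self, listener):
        with self._lock:
            self._listeners.append(listener)

    def is_completed(self) -> bool:
        with self._lock:            # would deadlock if the invoking thread
            return self._completed  # still held the (non-reentrant) lock

    def mark_task_completed(self):
        with self._lock:
            self._completed = True
            # snapshot under the lock; listeners run in reverse
            # registration order, bottom-up in the plan
            listeners = list(reversed(self._listeners))
        # invoke OUTSIDE the lock, still strictly one at a time
        for listener in listeners:
            listener(self)

ctx = TaskContext()
ctx.add_listener(lambda c: print("completed?", c.is_completed()))
ctx.mark_task_completed()  # prints: completed? True
```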

[^1]: This guarantee was not historically recognized, leading to similar bugs as far back as 2014 ([SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019?focusedCommentId=13953661&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-13953661)). The root cause was the lack of a reliably-ordered mechanism for operators to free resources at the end of a task. Such a mechanism (task completion listeners) was added and gradually refined, and we can now make this guarantee explicit. (An alternative approach is to use closeable iterators everywhere, but this would be a major change.)

### Why are the changes needed?

Without this PR, attempting to use Python UDFs while the off-heap vectorized Parquet reader is enabled (`spark.sql.columnVector.offheap.enabled true`) can cause executors to segfault.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

A [previous PR](https://github.com/apache/spark/pull/30177) reduced the likelihood of encountering this race condition, but did not eliminate it. The accompanying tests were therefore flaky and had to be disabled. This PR eliminates the race condition, allowing us to re-enable these tests. One of the tests, `test_pandas_udf_scalar`, previously failed 30/1000 times and now always succeeds.

An internal workload previously failed with a segfault about 40% of the time when run with `spark.sql.columnVector.offheap.enabled true`, and now succeeds 100% of the time.

Closes #34245 from ankurdave/SPARK-33277-thread-join.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified python/pyspark/sql/tests/test_pandas_map.py (diff)
The file was modified python/pyspark/sql/tests/test_pandas_udf_scalar.py (diff)
The file was modified core/src/main/scala/org/apache/spark/TaskContextImpl.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala (diff)
The file was modified core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala (diff)
The file was modified python/pyspark/sql/tests/test_udf.py (diff)
The file was modified core/src/main/scala/org/apache/spark/TaskContext.scala (diff)