Changes

Summary

  1. [SPARK-35467][SPARK-35468][SPARK-35477][PYTHON] Fix disallow_untyped_defs mypy checks (commit: 1b75c24) (details)
  2. [SPARK-35495][R] Change SparkR maintainer for CRAN (commit: 1530876) (details)
  3. [SPARK-35449][SQL] Only extract common expressions from CaseWhen values if elseValue is set (commit: 6c0c617) (details)
  4. [SPARK-34981][SQL][FOLLOWUP] Use SpecificInternalRow in ApplyFunctionExpression (commit: c709efc) (details)
  5. [SPARK-35287][SQL] Allow RemoveRedundantProjects to preserve ProjectExec which generates UnsafeRow for DataSourceV2ScanRelation (commit: d4fb983) (details)
  6. [SPARK-35498][PYTHON] Add thread target wrapper API for pyspark pin thread mode (commit: fdd7ca5) (details)
  7. [SPARK-33122][SQL][FOLLOWUP] Extend RemoveRedundantAggregates optimizer rule to apply to more cases (commit: 548e37b) (details)
  8. [SPARK-35497][PYTHON] Enable plotly tests in pandas-on-Spark (commit: 4a6d844) (details)
  9. [SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer (commit: 631077d) (details)
  10. [SPARK-35486][CORE] TaskMemoryManager: retry if other task takes memory freed by partial self-spill (commit: 58d4da1) (details)
  11. [SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source (commit: a57afd4) (details)
  12. [SPARK-35447][SQL] Optimize skew join before coalescing shuffle partitions (commit: 859a534) (details)
  13. [SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation (commit: 7258f69) (details)
  14. [SPARK-35514][INFRA] Automatically update version index of DocSearch via release-tag.sh (commit: 321c654) (details)
  15. [SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas (commit: d67d73b) (details)
  16. [SPARK-35513][BUILD] Update joda-time to 2.10.10 (commit: 4ba1db9) (details)
  17. [SPARK-35506][PYTHON][INFRA] Run tests with Python 3.9 in GitHub Actions (commit: e47e615) (details)
  18. [SPARK-32194][PYTHON] Use proper exception classes instead of plain Exception (commit: 20750a3) (details)
  19. [SPARK-35440][SQL] Add function type to `ExpressionInfo` for UDF (commit: af1dba7) (details)
Commit 1b75c2494cda3d75600b18665fa500dda78cdebe by gurwls223
[SPARK-35467][SPARK-35468][SPARK-35477][PYTHON] Fix disallow_untyped_defs mypy checks

### What changes were proposed in this pull request?

Adds more type annotations in the files:

- `python/pyspark/pandas/spark/accessors.py`
- `python/pyspark/pandas/typedef/typehints.py`
- `python/pyspark/pandas/utils.py`

and fixes the mypy check failures.
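
As a quick illustration of what the `disallow_untyped_defs` check enforces, here is a hypothetical example (not taken from the PR):

```python
# Hypothetical illustration of the stricter mypy setting; this function is not
# one of the functions changed in this PR.
from typing import Optional

# Before: rejected once disallow_untyped_defs is enabled for the module,
# because the signature carries no annotations.
def resolve_name(label, default=None):
    return label or default

# After: fully annotated, so `mypy --disallow-untyped-defs` passes.
def resolve_name_typed(label: Optional[str], default: Optional[str] = None) -> Optional[str]:
    return label or default
```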

### Why are the changes needed?

We should enable more `disallow_untyped_defs` mypy checks.

### Does this PR introduce _any_ user-facing change?

Yes.
This PR adds more type annotations in the pandas APIs on Spark module, which can affect how users' development tools interact with those APIs.

### How was this patch tested?

The mypy check with a new configuration and existing tests should pass.

Closes #32627 from ueshin/issues/SPARK-35467_35468_35477/disallow_untyped_defs.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 1b75c24)
The file was modified python/mypy.ini (diff)
The file was modified python/pyspark/pandas/spark/accessors.py (diff)
The file was modified python/pyspark/pandas/base.py (diff)
The file was modified python/pyspark/pandas/namespace.py (diff)
The file was modified python/pyspark/pandas/typedef/typehints.py (diff)
The file was modified python/pyspark/pandas/utils.py (diff)
Commit 1530876615a64b009fff261da1ba064d03778fca by dhyun
[SPARK-35495][R] Change SparkR maintainer for CRAN

### What changes were proposed in this pull request?

As discussed, update the SparkR maintainer for future releases.

### Why are the changes needed?

Shivaram will not be able to work with this in the future, so we would like to migrate off the maintainer contact email.

Closes #32642 from felixcheung/sparkr-maintainer.

Authored-by: Felix Cheung <felixcheung@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 1530876)
The file was modified R/pkg/DESCRIPTION (diff)
Commit 6c0c617bd0639e52a7a37660c52db667d92b4fee by viirya
[SPARK-35449][SQL] Only extract common expressions from CaseWhen values if elseValue is set

### What changes were proposed in this pull request?

This PR fixes a bug with subexpression elimination for CaseWhen statements. https://github.com/apache/spark/pull/30245 added support for creating subexpressions that are present in all branches of conditional statements. However, for a statement to be in "all branches" of a CaseWhen statement, it must also be in the elseValue.

### Why are the changes needed?

Fix a bug where a subexpression can be created and run for branches of a conditional that don't pass. This can cause issues especially with a UDF in a branch that gets executed assuming the condition is true.

### Does this PR introduce _any_ user-facing change?

Yes, fixes a potential bug where a UDF could be eagerly executed even though it might expect to have already passed some form of validation. For example:
```
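// myUdf is assumed to be a UDF that is only safe to call when id < 0 (e.g. it validates its input)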
val col = when($"id" < 0, myUdf($"id"))
spark.range(1).select(when(col > 0, col)).show()
```

`myUdf($"id")` is considered a subexpression and eagerly evaluated, because it is pulled out as a common expression from both executions of the when clause, but if `id >= 0` it should never actually be run.

### How was this patch tested?

Updated existing test with new case.

Closes #32595 from Kimahriman/bug-case-subexpr-elimination.

Authored-by: Adam Binford <adamq43@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
(commit: 6c0c617)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala (diff)
Commit c709efc1e7952f90547ebdba1d3fd173773bb902 by gurwls223
[SPARK-34981][SQL][FOLLOWUP] Use SpecificInternalRow in ApplyFunctionExpression

### What changes were proposed in this pull request?

Use `SpecificInternalRow` instead of `GenericInternalRow` to avoid boxing / unboxing cost.

### Why are the changes needed?

Since it doesn't know the input row schema, `GenericInternalRow` potentially needs to box the input arguments. It's better to use `SpecificInternalRow` instead, since we know the input data types.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes #32647 from sunchao/specific-input-row.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: c709efc)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ApplyFunctionExpression.scala (diff)
The file was modified sql/core/benchmarks/V2FunctionBenchmark-jdk11-results.txt (diff)
The file was modified sql/core/benchmarks/V2FunctionBenchmark-results.txt (diff)
Commit d4fb98354a24e6343e8be66543c76cb445ec3a2c by wenchen
[SPARK-35287][SQL] Allow RemoveRedundantProjects to preserve ProjectExec which generates UnsafeRow for DataSourceV2ScanRelation

### What changes were proposed in this pull request?

This PR fixes an issue where `RemoveRedundantProjects` removes a `ProjectExec` that is needed to produce `UnsafeRow`s.
In `DataSourceV2Strategy`, a `ProjectExec` is inserted to ensure internal rows are `UnsafeRow`s.

```
  private def withProjectAndFilter(
      project: Seq[NamedExpression],
      filters: Seq[Expression],
      scan: LeafExecNode,
      needsUnsafeConversion: Boolean): SparkPlan = {
    val filterCondition = filters.reduceLeftOption(And)
    val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)

    if (withFilter.output != project || needsUnsafeConversion) {
      ProjectExec(project, withFilter)
    } else {
      withFilter
    }
  }
...
    case PhysicalOperation(project, filters, relation: DataSourceV2ScanRelation) =>
      // projection and filters were already pushed down in the optimizer.
      // this uses PhysicalOperation to get the projection and ensure that if the batch scan does
      // not support columnar, a projection is added to convert the rows to UnsafeRow.
      val batchExec = BatchScanExec(relation.output, relation.scan)
      withProjectAndFilter(project, filters, batchExec, !batchExec.supportsColumnar) :: Nil
```
So, the hierarchy of the partial tree should be like `ProjectExec(FilterExec(BatchScan))`.
But `RemoveRedundantProjects` doesn't account for this type of hierarchy, leading to a `ClassCastException`.

A concrete example reported to reproduce this issue:
```
import scala.collection.JavaConverters._

import org.apache.iceberg.{PartitionSpec, TableProperties}
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.spark.sql.{DataFrame, QueryTest, SparkSession}
import org.apache.spark.sql.internal.SQLConf

class RemoveRedundantProjectsTest extends QueryTest {
  override val spark: SparkSession = SparkSession
    .builder()
    .master("local[4]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .appName(suiteName)
    .getOrCreate()
  test("RemoveRedundantProjects removes non-redundant projects") {
    withSQLConf(
      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
      SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
      SQLConf.REMOVE_REDUNDANT_PROJECTS_ENABLED.key -> "true") {
      withTempDir { dir =>
        val path = dir.getCanonicalPath
        val data = spark.range(3).toDF
        val table = new HadoopTables().create(
          SparkSchemaUtil.convert(data.schema),
          PartitionSpec.unpartitioned(),
          Map(TableProperties.WRITE_NEW_DATA_LOCATION -> path).asJava,
          path)
        data.write.format("iceberg").mode("overwrite").save(path)
        table.refresh()

        val df = spark.read.format("iceberg").load(path)
        val dfX = df.as("x")
        val dfY = df.as("y")
        val join = dfX.filter(dfX("id") > 0).join(dfY, "id")
        join.explain("extended")
        assert(join.count() == 2)
      }
    }
  }
}
```
```
[info] - RemoveRedundantProjects removes non-redundant projects *** FAILED ***
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 4) (xeroxms100.northamerica.corp.microsoft.com executor driver): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericInternalRow cannot be cast to org.apache.spark.sql.catalyst.expressions.UnsafeRow
[info]  at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:226)
[info]  at org.apache.spark.sql.execution.SortExec.$anonfun$doExecute$1(SortExec.scala:119)
```

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #32606 from sarutak/fix-project-removal-issue.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: d4fb983)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala (diff)
Commit fdd7ca5f4e35a906090f3c6b160bdba9ac9fd0ca by gurwls223
[SPARK-35498][PYTHON] Add thread target wrapper API for pyspark pin thread mode

### What changes were proposed in this pull request?
Add thread target wrapper API for pyspark pin thread mode.

### Why are the changes needed?
A helper method that makes it easier for users to write threading code under pin thread mode.
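
A hedged sketch of how such a wrapper is typically used is below; the name `inheritable_thread_target` and its import location are assumptions based on the modified `python/pyspark/util.py`, not something this changelog confirms:

```python
# Hedged sketch: the wrapper name and import path below are assumptions.
import threading

from pyspark import inheritable_thread_target  # assumed public name of the wrapper
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

def work() -> None:
    # Under pin thread mode (PYSPARK_PIN_THREAD=true), wrapping the target lets
    # this child thread inherit the parent thread's JVM thread-local state
    # (e.g. job groups and local properties).
    print(spark.range(10).count())

t = threading.Thread(target=inheritable_thread_target(work))
t.start()
t.join()
```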

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manual.

Closes #32644 from WeichenXu123/add_thread_target_wrapper_api.

Authored-by: Weichen Xu <weichen.xu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: fdd7ca5)
The file was modified python/pyspark/util.py (diff)
Commit 548e37b00b0492bcc2d4703bf4983ca8ed45f878 by yamamuro
[SPARK-33122][SQL][FOLLOWUP] Extend RemoveRedundantAggregates optimizer rule to apply to more cases

### What changes were proposed in this pull request?

Addressed dongjoon-hyun's comments on the previous PR #30018.
Extended the `RemoveRedundantAggregates` rule to remove redundant aggregations in even more queries. For example in
```
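// `dataset` is an arbitrary Dataset; the dropDuplicates() below adds an Aggregate node that the extended rule can now remove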
dataset
    .dropDuplicates()
    .groupBy('a)
    .agg(max('b))
```
the `dropDuplicates` is not needed, because the result of `max` does not depend on duplicate values.

### Why are the changes needed?

Improve performance.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

UT

Closes #31914 from tanelk/SPARK-33122_redundant_aggs_followup.

Lead-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Co-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
(commit: 548e37b)
The file was added sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregates.scala
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAggregatesSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala (diff)
Commit 4a6d844184a5798e4a37b0e002d44fb0b05a2be4 by gurwls223
[SPARK-35497][PYTHON] Enable plotly tests in pandas-on-Spark

### What changes were proposed in this pull request?

This PR enables plot tests with plotly

```bash
./python/run-tests --python-executables=python3 --modules=pyspark-pandas
```

**Before**:

```
Traceback (most recent call last):
  File "/.../miniconda3/envs/python3.8/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/.../miniconda3/envs/python3.8/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/.../pyspark/pandas/tests/plot/test_frame_plot_plotly.py", line 42, in <module>
    plotly_requirement_message + " Or pandas<1.0; pandas<1.0 does not support latest plotly "
TypeError: unsupported operand type(s) for +: 'NoneType' and 'str'

```

**After**:

```
...
Starting test(python3): pyspark.pandas.tests.plot.test_series_plot_plotly
...
Finished test(python3): pyspark.pandas.tests.plot.test_series_plot_plotly (23s)
...
Tests passed in 1296 seconds
```
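
For context, the `TypeError` above comes from concatenating a `None` requirement message with a string; a minimal, hypothetical reproduction (not the PR's actual fix) is:

```python
# Hypothetical reproduction of the failure mode shown in the traceback above:
# when plotly *is* installed, the requirement message is None, and None + str
# raises TypeError at import time.
have_plotly = True
plotly_requirement_message = None if have_plotly else "plotly is not installed."

# This line fails with: unsupported operand type(s) for +: 'NoneType' and 'str'
#   skip_message = plotly_requirement_message + " Or pandas<1.0; ..."

# A None-safe construction avoids the import-time error:
skip_message = (plotly_requirement_message or "") + (
    " Or pandas<1.0; pandas<1.0 does not support latest plotly"
)
print(skip_message)
```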

### Why are the changes needed?

For test coverage.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

By running the tests.

Closes #32649 from HyukjinKwon/SPARK-35497.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 4a6d844)
The file was modified python/pyspark/pandas/tests/plot/test_series_plot_plotly.py (diff)
The file was modified python/pyspark/pandas/tests/plot/test_frame_plot_plotly.py (diff)
The file was modified .github/workflows/build_and_test.yml (diff)
Commit 631077db08a6b88e2c5fe66517ad6fa4e7051470 by wenchen
[SPARK-35455][SQL] Unify empty relation optimization between normal and AQE optimizer

### What changes were proposed in this pull request?

* Remove `EliminateUnnecessaryJoin` and use `AQEPropagateEmptyRelation` instead.
* Eliminate join, aggregate, limit, repartition, sort, and generate nodes where beneficial.

### Why are the changes needed?

Make the empty-relation optimization previously done by `EliminateUnnecessaryJoin` available in more cases.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add test.

Closes #32602 from ulysses-you/SPARK-35455.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 631077d)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelation.scala (diff)
The file was added sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala (diff)
The file was removed sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateUnnecessaryJoin.scala
Commit 58d4da1bdb7de254ebe7009d72d6207e054fe7c3 by yi.wu
[SPARK-35486][CORE] TaskMemoryManager: retry if other task takes memory freed by partial self-spill

### What changes were proposed in this pull request?

When a memory reservation triggers a self-spill, `ExecutionMemoryPool#releaseMemory()` will immediately notify waiting tasks that memory has been freed. If there are any waiting tasks with less than 1/2N of the memory pool, they may acquire the newly-freed memory before the current task has a chance to do so. This will cause the original memory reservation to fail. If the initial spill did not release all available memory, the reservation could have been satisfied by asking it to spill again.

This PR adds logic to TaskMemoryManager to detect this case and retry.
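
A conceptual sketch of the retry behavior described above (written as simplified Python pseudocode; names and structure are assumptions, not the actual `TaskMemoryManager` Java code):

```python
# Conceptual model only: `pool` and `consumer` are simplified stand-ins for the
# execution memory pool and a spillable MemoryConsumer.
def acquire_execution_memory(pool, consumer, required: int) -> int:
    acquired = pool.try_acquire(required)              # may return less than `required`
    while acquired < required:
        spilled = consumer.spill(required - acquired)  # partial self-spill
        if spilled == 0:
            return acquired                            # nothing left to spill; give up
        # The freed memory may already have been granted to a waiting task that
        # holds less than its fair share, so the next acquire can still come up
        # short. Rather than failing the reservation, retry the acquire/spill loop.
        acquired += pool.try_acquire(required - acquired)
    return acquired
```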

### Why are the changes needed?

This bug affects queries with a MemoryConsumer that can spill part of its memory, such as BytesToBytesMap. If the MemoryConsumer is using all available memory and there is a waiting task, then attempting to acquire more memory on the MemoryConsumer will trigger a partial self-spill. However, because the waiting task gets priority, the attempt to acquire memory will fail even if it could have been satisfied by another spill.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test to MemoryManagerSuite that previously failed and now passes.

Closes #32625 from ankurdave/SPARK-35486.

Authored-by: Ankur Dave <ankurdave@gmail.com>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(commit: 58d4da1)
The file was modified core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala (diff)
The file was modified core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java (diff)
The file was added core/src/test/java/org/apache/spark/memory/TestPartialSpillingMemoryConsumer.java
Commit a57afd442cf7758d6db494f44429c33f55271b7b by kabhwan.opensource
[SPARK-29223][SQL][SS] New option to specify timestamp on all subscribing topic-partitions in Kafka source

### What changes were proposed in this pull request?

This patch is a follow-up of SPARK-26848 (#23747). In SPARK-26848, we decided to open the possibility for end users to set an individual timestamp per partition. But in many cases, specifying a timestamp represents the intention to go back to a specific timestamp and reprocess records, which should be applied to all topics and partitions.

This patch provides a way to set a global timestamp across the topic-partitions the source subscribes to, so that end users can easily set all offsets to a specific timestamp. To keep configuration simple, each new option receives a single timestamp for the start/end position.

New options introduced in this PR:

* startingTimestamp
* endingTimestamp

Both options receive the timestamp as a string.

There are priorities among these options, since there are now three options for start offsets and another three for end offsets. The priorities are as follows:

* starting offsets: startingTimestamp -> startingOffsetsByTimestamp -> startingOffsets
* ending offsets: endingTimestamp -> endingOffsetsByTimestamp -> endingOffsets

### Why are the changes needed?

The existing option to specify a timestamp as an offset is quite verbose when there are many partitions across topics. If a topic has hundreds of partitions, the JSON must repeat the same timestamp hundreds of times.

Also, the number of partitions can change, which requires either:

* fixing the code if the JSON is statically created
* introducing a dependency on the Kafka client and dealing with the Kafka API to craft the JSON programmatically

Neither approach is acceptable for ad-hoc queries; no one wants to write code more complicated than the query itself. Flink [provides an option](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#kafka-consumers-start-position-configuration) to specify a timestamp for all topic-partitions, like this PR, and doesn't even provide an option to specify the timestamp per topic-partition.

With this PR, end users only need to provide a single timestamp value; there is no longer a complicated JSON structure to learn.
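
For illustration, a hedged PySpark sketch of the new options (the broker, topic, and epoch-millisecond timestamp values are placeholders, and an active SparkSession `spark` is assumed):

```python
# Illustrative sketch only: broker, topic, and timestamp values are placeholders.
df = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092")
    .option("subscribe", "events")
    # One timestamp (as a string) applied to every subscribed topic-partition,
    # instead of a per-partition JSON via startingOffsetsByTimestamp.
    .option("startingTimestamp", "1621490400000")
    .option("endingTimestamp", "1621494000000")
    .load()
)
```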

### Does this PR introduce _any_ user-facing change?

Yes, this PR introduces two new options, described in above section.

The doc changes are as follows:

![Screenshot 2021-05-21 12.01.02 PM](https://user-images.githubusercontent.com/1317309/119076244-3034e680-ba2d-11eb-8323-0e227932d2e5.png)
![Screenshot 2021-05-21 12.01.12 PM](https://user-images.githubusercontent.com/1317309/119076255-35923100-ba2d-11eb-9d79-538a7f9ee738.png)
![Screenshot 2021-05-21 12.01.24 PM](https://user-images.githubusercontent.com/1317309/119076264-39be4e80-ba2d-11eb-8265-ac158f55c360.png)
![Screenshot 2021-05-21 12.06.01 PM](https://user-images.githubusercontent.com/1317309/119076271-3d51d580-ba2d-11eb-98ea-35fd72b1bbfc.png)

### How was this patch tested?

New UTs covering new functionalities. Also manually tested via simple batch & streaming queries.

Closes #32609 from HeartSaVioR/SPARK-29223-v2.

Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(commit: a57afd4)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderAdmin.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReaderConsumer.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeLimit.scala (diff)
The file was modified external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala (diff)
The file was modified external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala (diff)
The file was modified external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala (diff)
The file was modified docs/structured-streaming-kafka-integration.md (diff)
Commit 859a53424abdfb04a6aca5c5c0ee96643ae7a07a by wenchen
[SPARK-35447][SQL] Optimize skew join before coalescing shuffle partitions

### What changes were proposed in this pull request?

This PR improves the interaction between partition coalescing and skew handling by moving the skew join rule ahead of the partition coalescing rule and making corresponding changes to the two rules:
1. Simplify `OptimizeSkewedJoin` as it doesn't need to handle `CustomShuffleReaderExec` anymore.
2. Update `CoalesceShufflePartitions` to support coalescing non-skewed partitions.

### Why are the changes needed?

It's a bit hard to reason about skew join if partitions have been coalesced. A skewed partition needs to be much larger than other partitions and we need to look at the raw sizes before coalescing.

It also makes `OptimizeSkewedJoin` more robust, as we don't need to worry about a skewed partition being coalesced with a small partition, which would break skew join handling.

It also helps with https://github.com/apache/spark/pull/31653 , which needs to move `OptimizeSkewedJoin` to an earlier phase and run before `CoalesceShufflePartitions`.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

new UT and existing tests

Closes #32594 from cloud-fan/shuffle.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 859a534)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ShufflePartitionsUtilSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CoalesceShufflePartitions.scala (diff)
Commit 7258f691887aedcf7ba3eb4e478d67a5637643b9 by srowen
[SPARK-35396] Add AutoCloseable close to BlockManager and InMemoryRelation

This PR proposes an add-on that supports manually closing entries in MemoryStore and InMemoryRelation.

### What changes were proposed in this pull request?
Currently:
    MemoryStore uses a LinkedHashMap[BlockId, MemoryEntry[_]] to store all OnHeap or OffHeap entries.
When memoryStore.remove(blockId) is called, the code simply removes one entry from the LinkedHashMap and relies on Java GC to do the release work.

This PR:
    We propose an add-on to manually close any object stored in MemoryStore and InMemoryRelation if that object implements AutoCloseable.

Verification:
    In our own use case, we implemented a user-defined off-heap hash relation for BHJ, and we verified that with this manual close, the off-heap hash relation is released when eviction is triggered.
    We also implemented a user-defined cachedBatch, which with this PR is released when InMemoryRelation.clearCache() is called.

### Why are the changes needed?
This change helps release user-defined off-heap objects that may be cached in InMemoryRelation or MemoryStore.

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
WIP

Signed-off-by: Chendi Xue <chendi.xue@intel.com>

Closes #32534 from xuechendi/support_manual_close_in_memorystore.

Authored-by: Chendi Xue <chendi.xue@intel.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
(commit: 7258f69)
The file was modified core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala (diff)
The file was modified core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala (diff)
Commit 321c6545b38976b8b051ac1e80666f96922d5950 by ltnwgl
[SPARK-35514][INFRA] Automatically update version index of DocSearch via release-tag.sh

### What changes were proposed in this pull request?

Automatically update the DocSearch version index via release-tag.sh when releasing a new documentation site, instead of the current manual update.

### Why are the changes needed?

Simplify the release process.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Manually run the following command and check the diff
```
R_NEXT_VERSION=3.2.0
sed -i".tmp8" "s/'facetFilters':.*$/'facetFilters': [\"version:$R_NEXT_VERSION\"]/g" docs/_config.yml
```

Closes #32662 from gengliangwang/updateDocsearchInRelease.

Authored-by: Gengliang Wang <ltnwgl@gmail.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
(commit: 321c654)
The file was modified dev/create-release/release-tag.sh (diff)
Commit d67d73b70860d4e56fdcd6fc61f826245a52d186 by ueshin
[SPARK-35505][PYTHON] Remove APIs which have been deprecated in Koalas

### What changes were proposed in this pull request?

Removes APIs which have been deprecated in Koalas.

### Why are the changes needed?

There are some APIs that have been deprecated in Koalas. We shouldn't have those in pandas APIs on Spark.

### Does this PR introduce _any_ user-facing change?

Yes, the APIs deprecated in Koalas will no longer be available.

### How was this patch tested?

Modified some tests which use the deprecated APIs, and the other existing tests should pass.

Closes #32656 from ueshin/issues/SPARK-35505/remove_deprecated.

Authored-by: Takuya UESHIN <ueshin@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(commit: d67d73b)
The file was modified python/pyspark/pandas/tests/test_dataframe.py (diff)
The file was modified python/pyspark/pandas/indexes/base.py (diff)
The file was modified python/pyspark/pandas/series.py (diff)
The file was modified python/pyspark/pandas/frame.py (diff)
The file was modified python/pyspark/pandas/base.py (diff)
The file was modified python/pyspark/pandas/indexes/multi.py (diff)
Commit 4ba1db91f03c2a3a83d03ee124fdcb026d311174 by dhyun
[SPARK-35513][BUILD] Update joda-time to 2.10.10

### What changes were proposed in this pull request?
This PR aims to upgrade joda-time from 2.10.5 to 2.10.10

### Why are the changes needed?
Improvement and bug fixes in joda-time
https://www.joda.org/joda-time/changes-report.html#a2.10.10

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Ran with the existing UTs

Closes #32661 from vinodkc/br_build_upgrade_joda_time.

Authored-by: Vinod KC <vinod.kc.in@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(commit: 4ba1db9)
The file was modified dev/deps/spark-deps-hadoop-2.7-hive-2.3 (diff)
The file was modified pom.xml (diff)
The file was modified dev/deps/spark-deps-hadoop-3.2-hive-2.3 (diff)
Commit e47e615c0ede9692fd3aa1098155e92e4fb50b7f by gurwls223
[SPARK-35506][PYTHON][INFRA] Run tests with Python 3.9 in GitHub Actions

### What changes were proposed in this pull request?

This PR enables GitHub Actions to test PySpark with Python 3.9.

### Why are the changes needed?

To verify the support of Python 3.9.

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Existing tests should cover.

Closes #32657 from HyukjinKwon/SPARK-35506.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: e47e615)
The file was modified .github/workflows/build_and_test.yml (diff)
The file was modified python/pyspark/pandas/tests/test_stats.py (diff)
The file was modified dev/run-tests.py (diff)
Commit 20750a3f9e13a2f02860859f87bbc38a18cba85e by gurwls223
[SPARK-32194][PYTHON] Use proper exception classes instead of plain Exception

### What changes were proposed in this pull request?

This PR proposes to use proper built-in exception classes instead of the plain `Exception` in Python.

While I am here, I also fixed another minor issue in `DataFrame.schema`:

```diff
- except AttributeError as e:
-     raise Exception(
-         "Unable to parse datatype from schema. %s" % e)
+ except Exception as e:
+     raise ValueError(
+         "Unable to parse datatype from schema. %s" % e) from e
```

Now it catches all exceptions during schema parsing and chains them to a `ValueError`. Previously it caught only `AttributeError`, which does not cover all cases.
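
For example (hypothetical caller-side code, assuming an existing DataFrame `df`), user code can now catch the more specific class:

```python
# Hypothetical caller-side snippet; assumes an existing DataFrame `df`.
try:
    schema = df.schema
except ValueError as e:
    # Previously this surfaced as a plain Exception; since ValueError is a
    # subclass of Exception, existing `except Exception` handlers still work.
    print(f"Unable to parse datatype from schema: {e}")
```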

### Why are the changes needed?

For users to expect the proper exceptions.

### Does this PR introduce _any_ user-facing change?

Yes, the exception classes are different, but this should be compatible because the previous exception was the plain `Exception`, from which the new exception classes inherit.

### How was this patch tested?

Existing unit tests should cover this.

Closes #31238

Closes #32650 from HyukjinKwon/SPARK-32194.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(commit: 20750a3)
The file was modified python/pyspark/accumulators.py (diff)
The file was modified sql/gen-sql-config-docs.py (diff)
The file was modified python/pyspark/taskcontext.py (diff)
The file was modified python/pyspark/tests/test_appsubmit.py (diff)
The file was modified python/pyspark/tests/test_context.py (diff)
The file was modified python/pyspark/java_gateway.py (diff)
The file was modified python/pyspark/sql/dataframe.py (diff)
The file was modified python/pyspark/streaming/context.py (diff)
The file was modified python/pyspark/statcounter.py (diff)
The file was modified python/pyspark/sql/streaming.py (diff)
The file was modified python/pyspark/rdd.py (diff)
The file was modified python/pyspark/mllib/__init__.py (diff)
The file was modified python/pyspark/worker.py (diff)
The file was modified python/pyspark/ml/linalg/__init__.py (diff)
The file was modified python/pyspark/broadcast.py (diff)
The file was modified python/pyspark/tests/test_worker.py (diff)
The file was modified python/pyspark/testing/utils.py (diff)
The file was modified python/run-tests.py (diff)
The file was modified python/pyspark/conf.py (diff)
The file was modified python/pyspark/sql/tests/test_arrow.py (diff)
The file was modified python/pyspark/sql/types.py (diff)
The file was modified python/pyspark/tests/test_rdd.py (diff)
The file was modified python/pyspark/tests/test_serializers.py (diff)
The file was modified python/pyspark/tests/test_taskcontext.py (diff)
The file was modified python/pyspark/streaming/tests/test_dstream.py (diff)
The file was modified python/pyspark/sql/tests/test_streaming.py (diff)
The file was modified python/pyspark/mllib/linalg/__init__.py (diff)
The file was modified python/pyspark/mllib/clustering.py (diff)
The file was modified python/pyspark/context.py (diff)
Commit af1dba7ca501fd9372b158793119163e3fcd1f24 by wenchen
[SPARK-35440][SQL] Add function type to `ExpressionInfo` for UDF

### What changes were proposed in this pull request?
Add the function type, such as "scala_udf", "python_udf", "java_udf", "hive", or "built-in", to the `ExpressionInfo` for UDFs.

### Why are the changes needed?
Make the `ExpressionInfo` of UDF more meaningful

### Does this PR introduce _any_ user-facing change?
no

### How was this patch tested?
existing and newly added UT

Closes #32587 from linhongliu-db/udf-language.

Authored-by: Linhong Liu <linhong.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: af1dba7)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/UDFSuite.scala (diff)
The file was modified sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionDescription.java (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala (diff)
The file was modified sql/core/src/test/java/test/org/apache/spark/sql/JavaUDFSuite.java (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/expressions/ExpressionInfoSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala (diff)