Changes

Summary

  1. Revert "[SPARK-34674][CORE][K8S] Close SparkContext after the Main (details)
  2. [SPARK-35002][INFRA] Fix the java.net.BindException when testing with (details)
  3. [SPARK-34886][PYTHON] Port/integrate Koalas DataFrame unit test into (details)
  4. [SPARK-34881][SQL][FOLLOWUP] Implement toString() and sql() methods for (details)
  5. [SPARK-35002][INFRA][FOLLOW-UP] Use localhost instead of 127.0.0.1 at (details)
  6. [SPARK-34989] Improve the performance of mapChildren and withNewChildren (details)
  7. [SPARK-35004][TEST] Fix Incorrect assertion of `master/worker web ui (details)
  8. [SPARK-35003][SQL] Improve performance for reading smallint in (details)
  9. [SPARK-34963][SQL] Fix nested column pruning for extracting (details)
Commit ed3f103ee81b4f429b6fa8c5d4e7d866e14c6bfd by dhyun
Revert "[SPARK-34674][CORE][K8S] Close SparkContext after the Main method has finished"

This reverts commit ab97db75b2ab3ebb4d527610e2801df89cc23e2d.
The file was modified core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (diff)
Commit 9663c4061ae07634697345560c84359574a75692 by dhyun
[SPARK-35002][INFRA] Fix the java.net.BindException when testing with Github Action

### What changes were proposed in this pull request?

This PR tries to fix the `java.net.BindException` that occurs when testing with GitHub Actions:
```
[info] org.apache.spark.sql.kafka010.producer.InternalKafkaProducerPoolSuite *** ABORTED *** (282 milliseconds)
[info]   java.net.BindException: Cannot assign requested address: Service 'sparkDriver' failed after 100 retries (on a random free port)! Consider explicitly setting the appropriate binding address for the service 'sparkDriver' (for example spark.driver.bindAddress for SparkDriver) to the correct binding address.
[info]   at sun.nio.ch.Net.bind0(Native Method)
[info]   at sun.nio.ch.Net.bind(Net.java:461)
[info]   at sun.nio.ch.Net.bind(Net.java:453)
```

https://github.com/apache/spark/pull/32090/checks?check_run_id=2295418529

### Why are the changes needed?

Fix test framework.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested with GitHub Actions.

Closes #32096 from wangyum/SPARK_LOCAL_IP=localhost.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
The file was modified .github/workflows/build_and_test.yml (diff)
The file was modified .github/workflows/benchmark.yml (diff)
Commit 3af2c1bb9c7433edcce4a7b4309aefe321829de2 by gurwls223
[SPARK-34886][PYTHON] Port/integrate Koalas DataFrame unit test into PySpark

### What changes were proposed in this pull request?
Now that we merged the Koalas main code into the PySpark code base (#32036), we should port the Koalas DataFrame unit test to PySpark.

### Why are the changes needed?
Currently, the pandas-on-Spark modules are not tested at all. We should enable the DataFrame unit test first.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Enable the DataFrame unit test.

Closes #32083 from xinrong-databricks/port.test_dataframe.

Authored-by: Xinrong Meng <xinrong.meng@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
The file was modified dev/sparktestsupport/modules.py (diff)
The file was modified python/run-tests.py (diff)
The file was added python/pyspark/pandas/tests/__init__.py
The file was added python/pyspark/pandas/testing/utils.py
The file was added python/pyspark/pandas/tests/test_dataframe.py
The file was added python/pyspark/pandas/testing/__init__.py
Commit bfba7fadd2e65c853971fb2983bdea1c52d1ed7f by ltnwgl
[SPARK-34881][SQL][FOLLOWUP] Implement toString() and sql() methods for TRY_CAST

### What changes were proposed in this pull request?

Implement toString() and sql() methods for TRY_CAST

### Why are the changes needed?

The new expression should have a different name from `CAST` in SQL/String representation.

### Does this PR introduce _any_ user-facing change?

Yes, in the result of `explain()`, users can see try_cast if the new expression is used.
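
As a rough illustration of why a distinct name matters, here is a toy sketch in plain Scala. It is not Spark's `Cast`/`TryCast` code: `ToyExpr`, `Col`, `ToyCastBase`, `ToyCast`, `ToyTryCast` and the output format are all hypothetical. It shows two expressions that share their rendering logic and differ only in the name they print:

```scala
// Toy model only: these classes and the string formats are hypothetical and
// do not mirror Spark's real expression classes or plan strings.
trait ToyExpr { def sql: String }

case class Col(name: String) extends ToyExpr {
  def sql: String = name
  override def toString: String = name
}

abstract class ToyCastBase extends ToyExpr {
  def child: ToyExpr
  def dataType: String
  // Each subclass picks the name used in both representations.
  protected def castName: String
  // A concrete toString here suppresses the case-class-generated one.
  override def toString: String = s"$castName($child as $dataType)"
  def sql: String =
    s"${castName.toUpperCase}(${child.sql} AS ${dataType.toUpperCase})"
}

case class ToyCast(child: ToyExpr, dataType: String) extends ToyCastBase {
  protected def castName: String = "cast"
}

case class ToyTryCast(child: ToyExpr, dataType: String) extends ToyCastBase {
  protected def castName: String = "try_cast"
}
```

Without the per-class name, both toy expressions would render identically and a reader of the plan could not tell them apart.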

### How was this patch tested?

Unit tests.

Closes #32098 from gengliangwang/tryCastString.

Authored-by: Gengliang Wang <ltnwgl@gmail.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TryCastSuite.scala (diff)
The file was modified sql/core/src/test/resources/sql-tests/results/try_cast.sql.out (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TryCast.scala (diff)
Commit a3d1e00317d82bc2296a6f339b9604b5d6bc6754 by yumwang
[SPARK-35002][INFRA][FOLLOW-UP] Use localhost instead of 127.0.0.1 at SPARK_LOCAL_IP in GA builds

### What changes were proposed in this pull request?

This PR replaces `127.0.0.1` with `localhost`.

### Why are the changes needed?

- https://github.com/apache/spark/pull/32096#discussion_r610349269
- https://github.com/apache/spark/pull/32096#issuecomment-816442481

### Does this PR introduce _any_ user-facing change?

No, dev-only.

### How was this patch tested?

I didn't test it because it's a CI-specific issue. I will test it in the GitHub Actions build in this PR.

Closes #32102 from HyukjinKwon/SPARK-35002.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Yuming Wang <yumwang@ebay.com>
The file was modified .github/workflows/build_and_test.yml (diff)
The file was modified .github/workflows/benchmark.yml (diff)
Commit 0945baf90660a101ae0f86a39d4c91ca74ae5ee3 by herman
[SPARK-34989] Improve the performance of mapChildren and withNewChildren methods

### What changes were proposed in this pull request?
One of the main performance bottlenecks in query compilation is overly-generic tree transformation methods, namely `mapChildren` and `withNewChildren` (defined in `TreeNode`). These methods have an overly-generic implementation to iterate over the children and rely on reflection to create new instances. We have observed that, especially for queries with large query plans, a significant number of CPU cycles is wasted in these methods. In this PR we make these methods more efficient by delegating the iteration and instantiation to concrete node types. The benchmarks show that we can expect a significant improvement in total query compilation time: 30-80% for queries with large query plans, and about 20% on average.

#### Problem detail
The `mapChildren` method in `TreeNode` is overly generic and costly. To be more specific, this method:
- iterates over all the fields of a node using Scala’s product iterator. While the iteration is not reflection-based, thanks to the Scala compiler generating code for `Product`, we create many anonymous functions and visit many nested structures (recursive calls). The anonymous functions (presumably compiled to Java anonymous inner classes) also show up quite high on the list in the object allocation profiles, so we are putting unnecessary pressure on GC here.
- does a lot of comparisons. For each element returned from the product iterator, we check whether it is a child (contained in the list of children) and then transform it. We could avoid that by iterating over the children only, but in the current implementation we need to gather all the fields (transforming only the children) so that we can instantiate the object using reflection.
- creates objects using reflection, by delegating to the `makeCopy` method, which is several orders of magnitude slower than using the constructor.

#### Solution
The proposed solution in this PR is rather straightforward: we rewrite the `mapChildren` method using the `children` and `withNewChildren` methods. The default `withNewChildren` method suffers from the same problems as `mapChildren`, so we need to make it more efficient by specializing it in concrete classes. Similar to how each concrete query plan node already defines its children, it should also define how it can be constructed given a new list of children. The implementation is quite simple in most cases: a one-liner, thanks to the `copy` method present in Scala case classes. Note that we cannot abstract over the `copy` method, because the compiler generates it for a case class only if no other type higher in the hierarchy defines it. For most concrete nodes, the implementation of `withNewChildren` looks like this:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = copy(children = newChildren)
```
The current `withNewChildren` method has two properties that we should preserve:

- It returns the same instance if the provided children are the same as its children, i.e., it preserves referential equality.
- It copies tags and maintains the origin links when a new copy is created.

These properties are hard to enforce in the concrete node type implementations. Therefore, we propose a template method `withNewChildrenInternal` that should be implemented by the concrete classes, and let the `withNewChildren` method take care of referential equality and copying:
```
override def withNewChildren(newChildren: Seq[LogicalPlan]): LogicalPlan = {
  if (childrenFastEquals(children, newChildren)) {
    this
  } else {
    CurrentOrigin.withOrigin(origin) {
      val res = withNewChildrenInternal(newChildren)
      res.copyTagsFrom(this)
      res
    }
  }
}
```
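
The split between the shared contract and the per-node construction can be sketched with a self-contained toy tree. This is only an illustration under simplifying assumptions: `ToyNode`, `Lit` and `Sum` are hypothetical names, tags and origin tracking are omitted, and plain reference equality (`eq`) stands in for `childrenFastEquals`:

```scala
// Toy sketch: hypothetical stand-ins for TreeNode subclasses.
abstract class ToyNode {
  def children: Seq[ToyNode]

  // Concrete nodes only say how to rebuild themselves from new children.
  protected def withNewChildrenInternal(newChildren: Seq[ToyNode]): ToyNode

  // The base class keeps the shared contract: return `this` when every
  // new child is the same instance as the corresponding old child.
  final def withNewChildren(newChildren: Seq[ToyNode]): ToyNode = {
    val unchanged = children.length == newChildren.length &&
      children.zip(newChildren).forall { case (a, b) => a eq b }
    if (unchanged) this else withNewChildrenInternal(newChildren)
  }

  // mapChildren is expressed through children and withNewChildren.
  def mapChildren(f: ToyNode => ToyNode): ToyNode =
    if (children.isEmpty) this else withNewChildren(children.map(f))
}

case class Lit(value: Int) extends ToyNode {
  def children: Seq[ToyNode] = Nil
  protected def withNewChildrenInternal(newChildren: Seq[ToyNode]): ToyNode = this
}

case class Sum(children: Seq[ToyNode]) extends ToyNode {
  // The one-liner made possible by the compiler-generated copy method.
  protected def withNewChildrenInternal(newChildren: Seq[ToyNode]): ToyNode =
    copy(children = newChildren)
}
```

In this toy, mapping the identity function over a `Sum` returns the very same instance, while a function that rewrites a child produces a fresh copy through the constructor rather than through reflection.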

With the refactoring done in a previous PR (https://github.com/apache/spark/pull/31932) most tree node types fall in one of the categories of `Leaf`, `Unary`, `Binary` or `Ternary`. These traits have a more efficient implementation for `mapChildren` and define a more specialized version of `withNewChildrenInternal` that avoids creating unnecessary lists. For example, the `mapChildren` method in `UnaryLike` is defined as follows:
```
  override final def mapChildren(f: T => T): T = {
    val newChild = f(child)
    if (newChild fastEquals child) {
      this.asInstanceOf[T]
    } else {
      CurrentOrigin.withOrigin(origin) {
        val res = withNewChildInternal(newChild)
        res.copyTagsFrom(this.asInstanceOf[T])
        res
      }
    }
  }
```
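
To see the single-child specialization in isolation, the following toy version can be compiled on its own. It is a sketch, not Spark's `UnaryLike`: `ToyPlan`, `ToyUnary`, `Scan` and `Limit` are hypothetical names, origin and tag handling are left out, and `eq` is used in place of `fastEquals`:

```scala
// Toy sketch: hypothetical names, simplified relative to the real traits.
trait ToyPlan {
  def children: Seq[ToyPlan]
  def mapChildren(f: ToyPlan => ToyPlan): ToyPlan
}

trait ToyUnary extends ToyPlan {
  def child: ToyPlan
  // Built lazily, at most once per node.
  final lazy val children: Seq[ToyPlan] = child :: Nil

  protected def withNewChildInternal(newChild: ToyPlan): ToyPlan

  // Works on the single child directly: no Seq is built or compared here.
  final def mapChildren(f: ToyPlan => ToyPlan): ToyPlan = {
    val newChild = f(child)
    if (newChild eq child) this else withNewChildInternal(newChild)
  }
}

case class Scan(table: String) extends ToyPlan {
  def children: Seq[ToyPlan] = Nil
  def mapChildren(f: ToyPlan => ToyPlan): ToyPlan = this
}

case class Limit(n: Int, child: ToyPlan) extends ToyUnary {
  protected def withNewChildInternal(newChild: ToyPlan): ToyPlan =
    copy(child = newChild)
}
```

The concrete node contributes only the `copy(child = newChild)` call; the identity check and the traversal live in the trait.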

#### Results
With this PR, we have observed significant performance improvements in query compilation time, more specifically in the analysis and optimization phases. The table below shows the TPC-DS queries that had more than 25% speedup in compilation times. Biggest speedups are observed in queries with large query plans.
| Query  | Speedup |
| ------------- | ------------- |
|q4    |29%|
|q9    |81%|
|q14a  |31%|
|q14b  |28%|
|q22   |33%|
|q33   |29%|
|q34   |25%|
|q39   |27%|
|q41   |27%|
|q44   |26%|
|q47   |28%|
|q48   |76%|
|q49   |46%|
|q56   |26%|
|q58   |43%|
|q59   |46%|
|q60   |50%|
|q65   |59%|
|q66   |46%|
|q67   |52%|
|q69   |31%|
|q70   |30%|
|q96   |26%|
|q98   |32%|

#### Binary incompatibility
Changing `withNewChildren` in `TreeNode` breaks binary compatibility for code compiled against older versions of Spark, because concrete `TreeNode` subclasses are now all expected to implement the `withNewChildrenInternal` method. This is a problem, for example, when users write custom expressions. This change is the right choice, since it forces all expressions newly added to Catalyst to implement it in an efficient manner and will prevent future regressions.
Please note that we have not completely removed the old implementation; we renamed it to `legacyWithNewChildren`. This method will be removed in the future and for now helps the transition. There are expressions such as `UpdateFields` that have a complex way of defining children. Writing `withNewChildren` for them requires refactoring the expression. For now, these expressions use the old, slow method. We will address these expressions in a future PR.

### Does this PR introduce _any_ user-facing change?

This PR does not introduce user-facing changes but may break binary compatibility for code compiled against older versions. See the binary incompatibility section.

### How was this patch tested?

This PR is mainly a refactoring and passes existing tests.

Closes #32030 from dbaliafroozeh/ImprovedMapChildren.

Authored-by: Ali Afroozeh <ali.afroozeh@databricks.com>
Signed-off-by: herman <herman@databricks.com>
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PivotFirst.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExec.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SparkScriptTransformationExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CountIf.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConvertToLocalRelationSuite.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeExtractors.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/explain.txt (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/regexpExpressions.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestingTypedCount.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryBroadcastExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/MapInPandasExec.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/bitwiseExpressions.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveDirCommand.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproxCountDistinctForIntervals.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/hash.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/constraintExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/joins/CartesianProductExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Count.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ReferenceSort.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SubExprEvaluationRuntime.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/mathExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/arithmetic.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Average.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/WindowInPandasExec.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q77/explain.txt (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/bitwiseAggregates.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanIntegritySuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameFunctionsSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/window/WindowExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/csvExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTablesCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CountMinSketchAgg.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/TypedImperativeAggregateSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PartitionTransforms.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Max.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/randomExpressions.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Product.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TypedAggregateExpression.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ScriptTransformation.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/InsertIntoDataSourceDirCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/grouping.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q5/explain.txt (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastNestedLoopJoinExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/functions.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80.sf100/explain.txt (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala (diff)
The file was modified mllib/src/main/scala/org/apache/spark/ml/stat/Summarizer.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/MaxByAndMinBy.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/ExpandExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/ApproximatePercentile.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/HyperLogLogPlusPlus.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ColumnarRulesSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q80/explain.txt (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Corr.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TryCast.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/DynamicPruning.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStream.scala (diff)
The file was modified external/avro/src/main/scala/org/apache/spark/sql/avro/CatalystDataToAvro.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/CallMethodViaReflection.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/UnevaluableAggs.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q77.sf100/explain.txt (diff)
The file was modified sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q5.sf100/explain.txt (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Min.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/xml/xpath.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/LogicalPlanSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NormalizeFloatingNumbers.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/streamingLimits.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/streaming/WriteToStreamStatement.scala (diff)
The file was modified sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala (diff)
Commit c06758834e6192b1888b4a885c612a151588b390 by ltnwgl
[SPARK-35004][TEST] Fix Incorrect assertion of `master/worker web ui available behind front-end reverseProxy` in MasterSuite

### What changes were proposed in this pull request?
IntelliJ IDEA flags line 425 in `MasterSuite` as an unused expression:

https://github.com/apache/spark/blob/bfba7fadd2e65c853971fb2983bdea1c52d1ed7f/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala#L421-L426

If we merge lines 424 and 425 into one as:

```
System.getProperty("spark.ui.proxyBase") should startWith (s"$reverseProxyUrl/proxy/worker-")
```

this assertion will fail:

```
- master/worker web ui available behind front-end reverseProxy *** FAILED ***
  The code passed to eventually never returned normally. Attempted 45 times over 5.091914027 seconds. Last failure message: "http://proxyhost:8080/path/to/spark" did not start with substring "http://proxyhost:8080/path/to/spark/proxy/worker-". (MasterSuite.scala:405)
```

`System.getProperty("spark.ui.proxyBase")` should equal `reverseProxyUrl`, because neither `Master#onStart` nor `Worker#handleRegisterResponse` changes it.

The main purpose of this PR is therefore to fix the condition of this assertion.

### Why are the changes needed?
Bug fix.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

- Pass Jenkins or GitHub Actions

- Manual test:

1. merge lines 424 and 425 in `MasterSuite` into one to eliminate the unused expression:

```
System.getProperty("spark.ui.proxyBase") should startWith (s"$reverseProxyUrl/proxy/worker-")
```

2. execute `mvn clean test -pl core -Dtest=none -DwildcardSuites=org.apache.spark.deploy.master.MasterSuite`

**Before**

```
- master/worker web ui available behind front-end reverseProxy *** FAILED ***
  The code passed to eventually never returned normally. Attempted 45 times over 5.091914027 seconds. Last failure message: "http://proxyhost:8080/path/to/spark" did not start with substring "http://proxyhost:8080/path/to/spark/proxy/worker-". (MasterSuite.scala:405)

Run completed in 1 minute, 14 seconds.
Total number of tests run: 32
Suites: completed 2, aborted 0
Tests: succeeded 31, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

```

**After**

```
Run completed in 1 minute, 11 seconds.
Total number of tests run: 32
Suites: completed 2, aborted 0
Tests: succeeded 32, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

Closes #32105 from LuciferYang/SPARK-35004.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Gengliang Wang <ltnwgl@gmail.com>
The file was modified core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala (diff)
Commit ee7bf7d9628384548d09ac20e30992dd705c20f4 by dhyun
[SPARK-35003][SQL] Improve performance for reading smallint in vectorized Parquet reader

### What changes were proposed in this pull request?

Implements `readShorts` in `VectorizedPlainValuesReader`, which decodes `total` shorts from the input buffer in a single pass, as is already done for other types.

### Why are the changes needed?

Currently `VectorizedRleValuesReader` reads short integers in the following way:

```java
for (int i = 0; i < n; i++) {
  c.putShort(rowId + i, (short)data.readInteger());
}
```
For PLAIN encoding, `data.readInteger` is implemented as:

```java
public final int readInteger() {
  return getBuffer(4).getInt();
}
```
which means it calls `slice` on the buffer repeatedly, once for every value in the batch. This is more expensive than slicing one large chunk once and then reading the ints out of it.
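
The difference between the two access patterns can be sketched in plain Java as follows. This is a minimal illustration, not the code in `VectorizedPlainValuesReader`: the class `ShortBatchSketch` and its methods `readOneByOne`, `readBatch`, and `encode` are hypothetical names, and the sketch assumes each short is stored as a 4-byte little-endian integer, in line with the `getBuffer(4).getInt()` call above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical sketch: NOT Spark's reader, only the two access patterns.
// Assumes every short occupies 4 little-endian bytes in the input buffer.
final class ShortBatchSketch {

  // Per-value pattern: carve out a fresh 4-byte slice for every value.
  static short[] readOneByOne(ByteBuffer in, int total) {
    short[] out = new short[total];
    for (int i = 0; i < total; i++) {
      ByteBuffer four = in.slice().order(ByteOrder.LITTLE_ENDIAN);
      four.limit(4);
      out[i] = (short) four.getInt();
      in.position(in.position() + 4);  // advance past the consumed value
    }
    return out;
  }

  // Batched pattern: slice once for the whole batch, then read ints from it.
  static short[] readBatch(ByteBuffer in, int total) {
    int bytes = total * 4;
    ByteBuffer chunk = in.slice().order(ByteOrder.LITTLE_ENDIAN);
    chunk.limit(bytes);
    short[] out = new short[total];
    for (int i = 0; i < total; i++) {
      out[i] = (short) chunk.getInt();
    }
    in.position(in.position() + bytes);  // advance past the whole batch
    return out;
  }

  // Test helper: writes each short as a 4-byte little-endian int.
  static ByteBuffer encode(short... values) {
    ByteBuffer buf = ByteBuffer.allocate(values.length * 4).order(ByteOrder.LITTLE_ENDIAN);
    for (short v : values) {
      buf.putInt(v);
    }
    buf.flip();
    return buf;
  }
}
```

Both methods return the same values; the batched variant simply creates one slice per batch instead of one per value, which is the saving the paragraph above describes.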

A micro benchmark via `DataSourceReadBenchmark` showed a ~35% throughput improvement for the vectorized Parquet reader (rate from 111.9 to 151.0 M/s).

Before:
```
[info] OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-9880H CPU  2.30GHz
[info] SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                           10249          10271          32          1.5         651.6       1.0X
[info] SQL Json                                           5963           5982          28          2.6         379.1       1.7X
[info] SQL Parquet Vectorized                              141            151          15        111.9           8.9      72.9X
[info] SQL Parquet MR                                     1454           1491          52         10.8          92.4       7.0X
[info] SQL ORC Vectorized                                  160            164           3         98.3          10.2      64.1X
[info] SQL ORC MR                                         1133           1164          44         13.9          72.0       9.0X
```

After:
```
[info] OpenJDK 64-Bit Server VM 11.0.8+10-LTS on Mac OS X 10.16
[info] Intel(R) Core(TM) i9-9880H CPU  2.30GHz
[info] SQL Single SMALLINT Column Scan:          Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] SQL CSV                                           10489          10535          65          1.5         666.8       1.0X
[info] SQL Json                                           5864           5888          34          2.7         372.8       1.8X
[info] SQL Parquet Vectorized                              104            111           8        151.0           6.6     100.7X
[info] SQL Parquet MR                                     1458           1472          20         10.8          92.7       7.2X
[info] SQL ORC Vectorized                                  157            166           7        100.0          10.0      66.7X
[info] SQL ORC MR                                         1121           1147          37         14.0          71.2       9.4X
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests

Closes #32104 from sunchao/smallint.

Authored-by: Chao Sun <sunchao@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java (diff)
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java (diff)
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java (diff)
Commit 364d1eaf10f51c357f507325557fb076140ced2c by viirya
[SPARK-34963][SQL] Fix nested column pruning for extracting case-insensitive struct field from array of struct

### What changes were proposed in this pull request?

This patch fixes nested column pruning when a struct field is extracted case-insensitively from an array of structs.

### Why are the changes needed?

In case-insensitive mode, the nested column pruning rule cannot correctly push down the extractor of a struct field from an array of structs, e.g.,

```scala
val query = spark.table("contacts").select("friends.First", "friends.MiDDle")
```

Error stack:
```
[info]   java.lang.IllegalArgumentException: Field "First" does not exist.
[info] Available fields:
[info]   at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
[info]   at org.apache.spark.sql.types.StructType$$anonfun$apply$1.apply(StructType.scala:274)
[info]   at scala.collection.MapLike$class.getOrElse(MapLike.scala:128)
[info]   at scala.collection.AbstractMap.getOrElse(Map.scala:59)
[info]   at org.apache.spark.sql.types.StructType.apply(StructType.scala:273)
[info]   at org.apache.spark.sql.execution.ProjectionOverSchema$$anonfun$getProjection$3.apply(ProjectionOverSchema.scala:44)
[info]   at org.apache.spark.sql.execution.ProjectionOverSchema$$anonfun$getProjection$3.apply(ProjectionOverSchema.scala:41)
```
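
The failure mode in the stack trace, an exact-name lookup of `First` against a schema that spells the field differently, can be illustrated with a small Java sketch. This is not Spark's `ProjectionOverSchema` or `StructType` code: `FieldResolverSketch`, its `resolve` method, and the field names `first`, `middle`, `last` are assumptions made for illustration only.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch: resolves a requested field name against schema field names.
final class FieldResolverSketch {

  // Returns the schema's own spelling of the field, if one matches.
  // With caseSensitive = false, "First" resolves to "first".
  static Optional<String> resolve(List<String> schemaFields, String requested,
                                  boolean caseSensitive) {
    for (String field : schemaFields) {
      boolean matches = caseSensitive
          ? field.equals(requested)
          : field.equalsIgnoreCase(requested);
      if (matches) {
        return Optional.of(field);
      }
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    // Assumed field names, for illustration only.
    List<String> fields = Arrays.asList("first", "middle", "last");
    System.out.println(resolve(fields, "First", true).isPresent());   // exact match fails
    System.out.println(resolve(fields, "First", false).orElse("?"));  // resolves to schema spelling
  }
}
```

The point of the sketch is that any later lookup should use the resolved schema spelling rather than the spelling typed in the query; otherwise an exact-match lookup fails in the way the stack trace shows.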

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

Closes #32059 from viirya/fix-array-nested-pruning.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ProjectionOverSchema.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SelectedField.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala (diff)