SuccessChanges

Summary

  1. [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception
  2. [SPARK-35652][SQL] joinWith on two tables generated from the same one
  3. [SPARK-35695][SQL] Collect observed metrics from cached and adaptive execution sub-trees
Commit 0f3a251af0795bfa4af75ce1efa6a845a31362fa by dhyun
[SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

### What changes were proposed in this pull request?

A follow-up to SPARK-32975 to avoid an unexpected `None.get` exception.

When running SparkPi with Docker Desktop, `podName` is an `Option` that may be empty, so we get:
```log
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
at scala.None$.get(Option.scala:529)
at scala.None$.get(Option.scala:527)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
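
As an illustration of the fix, a minimal Scala sketch of the defensive `Option` handling this follow-up applies (the names below are hypothetical, not the actual `ExecutorPodsAllocator` internals):

```scala
// Hypothetical stand-in for the optional driver pod name; under some
// deployments (e.g. Docker Desktop) it is simply not set.
val driverPodName: Option[String] = None

// Before: calling .get on a None throws java.util.NoSuchElementException.
// val pod = lookupDriverPod(driverPodName.get)

// After: act only when a name is actually present.
driverPodName.foreach { name =>
  println(s"resolving driver pod $name") // placeholder for the real pod lookup
}
```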

### Why are the changes needed?

Fixes a regression.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual.

Closes #32830 from yaooqinn/SPARK-32975.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
The file was modified: resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
Commit 6b398a4d832d78e2d0caee5acf22eac961024ee3 by wenchen
[SPARK-35652][SQL] joinWith on two tables generated from the same one

### What changes were proposed in this pull request?

It seems that Spark's inner join performs a cartesian join when self-joining via `joinWith`.

To reproduce the issue:
```scala
val df = spark.range(0,3)
df.joinWith(df, df("id") === df("id")).show()
```

Before this pull request, the result is:

```
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  0|  1|
|  0|  2|
|  1|  0|
|  1|  1|
|  1|  2|
|  2|  0|
|  2|  1|
|  2|  2|
+---+---+
```

The expected result is:

```
+---+---+
| _1| _2|
+---+---+
|  0|  0|
|  1|  1|
|  2|  2|
+---+---+
```
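
For context, both sides of `df("id") === df("id")` resolve to the same attribute, so after plan deduplication the condition can degenerate into a trivially true predicate, which effectively yields a cross join. Below is a minimal sketch of the usual self-join disambiguation pattern, shown only for illustration (alias names are arbitrary):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val df = spark.range(0, 3)

// Aliasing each side gives the analyzer two distinct attribute sets, so
// the equality compares the left id with the right id rather than an
// attribute with itself.
val left  = df.as("l")
val right = df.as("r")
left.joinWith(right, $"l.id" === $"r.id").show()
```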

### Why are the changes needed?

Correctness.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test.

Closes #32863 from dgd-contributor/SPARK-35652_join_and_joinWith_in_seft_joining.

Authored-by: dgd-contributor <dgd_contributor@viettel.com.vn>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 6e1aa15679b5fed249c62b2340151a0299401b18)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
Commit 8aa23fb73c074ae14529172e176723f5ee44b9d7 by wenchen
[SPARK-35695][SQL] Collect observed metrics from cached and adaptive execution sub-trees

### What changes were proposed in this pull request?

Collect observed metrics from cached and adaptive execution sub-trees.

### Why are the changes needed?

Currently, persisting/caching hides all observed metrics in that sub-tree from the `QueryExecutionListener`s. Adaptive query execution can likewise prevent the metrics from reaching the listeners.
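
For illustration, a minimal sketch of the scenario this fixes, where an observation sits below a `persist()` (assuming a local `SparkSession`; the metric name is arbitrary):

```scala
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.sql.{SparkSession, functions => F}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Print observed metrics as queries complete.
spark.listenerManager.register(new QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    qe.observedMetrics.foreach { case (name, row) => println(s"$name -> $row") }
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = ()
})

val observed = spark.range(10)
  .observe("my_metrics", F.count(F.lit(1)).as("rows")) // observation below the cache
  .persist()                                           // caching used to hide the metric

observed.count() // before this fix, "my_metrics" never reached the listener
```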

### Does this PR introduce _any_ user-facing change?

No, this is a bug fix.

### How was this patch tested?

New unit tests.

Closes #32862 from tanelk/SPARK-35695_collect_metrics_persist.

Lead-authored-by: Tanel Kiis <tanel.kiis@gmail.com>
Co-authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 692dc66c4a3660665c1f156df6eeb9ce6f86195e)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/CollectMetricsExec.scala