SuccessChanges

Summary

  1. [SPARK-23524] Big local shuffle blocks should not be checked for corruption (commit: 86ca91551522832141aedc17ba1e47dbeb44d970)
  2. [SPARK-23490][BACKPORT][SQL] Check storage.locationUri with existing table in CreateTable (commit: 1dd37ff3b8c84c60858b159e745339ce19e53432)
  3. [SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3 (commit: 404f7e2013ecfdf993a17fd942d8890d9a8100e7)
Commit 86ca91551522832141aedc17ba1e47dbeb44d970 by wenchen
[SPARK-23524] Big local shuffle blocks should not be checked for
corruption.
## What changes were proposed in this pull request?
In the current code, all local blocks are checked for corruption regardless of whether they are big or not. The reasons are as follows:
1. The size in FetchResult for a local block is set to 0
(https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L327).
2. SPARK-4105 meant to check only the small blocks (size < maxBytesInFlight/3), but because of reason 1 the check below is ineffective for local blocks, since a size of 0 always counts as small:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala#L420
We can fix this and avoid the OOM.
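A hedged sketch of the intended behavior (the function and parameter names below are illustrative, not the actual ShuffleBlockFetcherIterator code):
```
// Illustrative sketch only: with the real block size recorded in FetchResult
// (instead of the 0 placeholder), only blocks small enough to be buffered in
// memory are checked for corruption; big blocks skip the check, avoiding OOM.
def shouldCheckCorruption(
    blockSize: Long,         // actual size of the local block
    maxBytesInFlight: Long,  // fetch budget from SPARK-4105
    detectCorrupt: Boolean): Boolean = {
  detectCorrupt && blockSize < maxBytesInFlight / 3
}
```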
## How was this patch tested?
Unit test added.
Author: jx158167 <jx158167@antfin.com>
Closes #20685 from jinxing64/SPARK-23524.
(cherry picked from commit 77c91cc746f93e609c412f3a220495d9e931f696)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(commit: 86ca91551522832141aedc17ba1e47dbeb44d970)
The file was modified core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
The file was modified core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
Commit 1dd37ff3b8c84c60858b159e745339ce19e53432 by gatorsmile
[SPARK-23490][BACKPORT][SQL] Check storage.locationUri with existing
table in CreateTable
Backport of #20660 to branch-2.3.
## What changes were proposed in this pull request?
For CreateTable with Append mode, we should check whether `storage.locationUri` is the same as that of the existing table in `PreprocessTableCreation`.
In the current code, when the `storage.locationUri` differs from that of the existing table, the only error raised is the unhelpful
`org.apache.spark.sql.AnalysisException: Table or view not found:`
which can be improved.
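A hedged sketch of the kind of check this adds inside `PreprocessTableCreation` (the names `checkLocation`, `existingTable`, and `tableDesc` are illustrative, not the exact Spark code; note that `AnalysisException`'s constructor is only visible inside Spark's sql packages):
```
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Illustrative sketch only: when appending to an existing table, fail fast
// with a clear message if the specified location differs from the table's
// existing location, instead of the misleading "Table or view not found".
def checkLocation(existingTable: CatalogTable, tableDesc: CatalogTable): Unit = {
  val existingLocation = existingTable.storage.locationUri
  val specifiedLocation = tableDesc.storage.locationUri
  if (specifiedLocation.isDefined && specifiedLocation != existingLocation) {
    throw new AnalysisException(
      s"The location of the existing table ${existingTable.identifier.quotedString} " +
      s"is $existingLocation. It doesn't match the specified location $specifiedLocation.")
  }
}
```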
## How was this patch tested?
Unit test
Author: Wang Gengliang <gengliang.wang@databricks.com>
Closes #20766 from gengliangwang/backport_20660_to_2.3.
(commit: 1dd37ff3b8c84c60858b159e745339ce19e53432)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
Commit 404f7e2013ecfdf993a17fd942d8890d9a8100e7 by tathagata.das1565
[SPARK-23406][SS] Enable stream-stream self-joins for branch-2.3
This is a backport of #20598.
## What changes were proposed in this pull request?
This fixes two bugs to enable stream-stream self-joins.
### Incorrect analysis due to missing MultiInstanceRelation trait
Streaming leaf nodes did not extend MultiInstanceRelation, which is necessary for the Catalyst analyzer to convert the self-join logical plan DAG into a tree (by creating new instances of the leaf relations). This was causing the error `Failure when resolving conflicting references in Join:` (see the JIRA for details).
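A simplified sketch of the fix pattern (abbreviated; not the exact Spark source, which carries additional constructor arguments):
```
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
import org.apache.spark.sql.execution.streaming.Source

// Simplified sketch: extending MultiInstanceRelation lets the analyzer create
// a fresh copy of the leaf with new expression IDs for each side of a
// self-join, so the plan DAG can be converted back into a tree.
case class StreamingExecutionRelation(source: Source, output: Seq[Attribute])
  extends LeafNode with MultiInstanceRelation {

  override def newInstance(): LogicalPlan =
    this.copy(output = output.map(_.newInstance()))
}
```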
### Incorrect attribute rewrite when splicing batch plans in MicroBatchExecution
When splicing the source's batch plan into the streaming plan (by replacing the StreamingExecutionRelation), we were rewriting the attribute references in the streaming plan with the new attribute references from the batch plan. This incorrectly handled the scenario where multiple StreamingExecutionRelations point to the same source, and therefore eventually point to the same batch plan returned by the source. Here is an example query and its corresponding plan transformations.
```
val df = input.toDF
val join =
  df.select('value % 5 as "key", 'value).join(
    df.select('value % 5 as "key", 'value), "key")
```
Streaming logical plan before splicing the batch plan:
```
``` Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
  :- Project [(value#1 % 5) AS key#6, value#1]
  :  +- StreamingExecutionRelation Memory[#1], value#1
  +- Project [(value#12 % 5) AS key#9, value#12]
     +- StreamingExecutionRelation Memory[#1], value#12  // two different leaves pointing to the same source
```
Batch logical plan after splicing the batch plan and before rewriting:
```
``` Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
  :- Project [(value#1 % 5) AS key#6, value#1]
  :  +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#1
  +- Project [(value#12 % 5) AS key#9, value#12]
     +- LocalRelation [value#66]           // replaces StreamingExecutionRelation Memory[#1], value#12
```
Batch logical plan after rewriting the attributes. Specifically, for each spliced plan, the new output attribute (value#66) replaces the earlier output attributes (value#1 and value#12, one for each StreamingExecutionRelation).
```
Project [key#6, value#66, value#66]       // both value#1 and value#12 replaced by value#66
+- Join Inner, (key#6 = key#9)
  :- Project [(value#66 % 5) AS key#6, value#66]
  :  +- LocalRelation [value#66]
  +- Project [(value#66 % 5) AS key#9, value#66]
     +- LocalRelation [value#66]
```
This causes the optimizer to eliminate value#66 from one side of the join:
```
Project [key#6, value#66, value#66]
+- Join Inner, (key#6 = key#9)
  :- Project [(value#66 % 5) AS key#6, value#66]
  :  +- LocalRelation [value#66]
  +- Project [(value#66 % 5) AS key#9]   // this does not generate value, causing incorrect join results
     +- LocalRelation [value#66]
```
**Solution**: Instead of rewriting attributes, use a Project to introduce aliases between the original output attribute references and the new references generated by the spliced plans. The analyzer and optimizer will take care of the rest. A hedged sketch of this aliasing step follows the plan below.
```
Project [key#6, value#1, value#12]
+- Join Inner, (key#6 = key#9)
  :- Project [(value#1 % 5) AS key#6, value#1]
  :  +- Project [value#66 AS value#1]   // solution: project with aliases
  :     +- LocalRelation [value#66]
  +- Project [(value#12 % 5) AS key#9, value#12]
     +- Project [value#66 AS value#12]    // solution: project with aliases
        +- LocalRelation [value#66]
```
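A minimal sketch of this aliasing step (the names `spliceWithAliases`, `originalOutput`, and `newPlan` are illustrative; this is not the exact MicroBatchExecution code):
```
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

// Illustrative sketch only: rather than rewriting attribute references in the
// streaming plan, alias each fresh attribute from the spliced batch plan back
// to the original attribute's name and expression ID. Duplicate leaves keep
// distinct outputs, and the analyzer/optimizer take care of the rest.
def spliceWithAliases(originalOutput: Seq[Attribute], newPlan: LogicalPlan): LogicalPlan = {
  val aliases = originalOutput.zip(newPlan.output).map { case (original, fresh) =>
    Alias(fresh, original.name)(exprId = original.exprId)
  }
  Project(aliases, newPlan)
}
```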
## How was this patch tested?
New unit test
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #20765 from tdas/SPARK-23406-2.3.
(commit: 404f7e2013ecfdf993a17fd942d8890d9a8100e7)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala