SuccessChanges

Summary

  1. [SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays (details)
  2. [SPARK-30430][PYTHON][DOCS] Add a note that UserDefinedFunction's (details)
  3. [SPARK-19784][SPARK-25403][SQL] Refresh the table even table stats is (details)
  4. [SPARK-30433][SQL] Make conflict attributes resolution more scalable in (details)
  5. [SPARK-30338][SQL] Avoid unnecessary InternalRow copies in (details)
  6. [SPARK-30414][SQL] ParquetRowConverter optimizations: arrays, maps, plus (details)
  7. [SPARK-30335][SQL][DOCS] Add a note first, last, collect_list and (details)
Commit 88542bc3d9e506b1a0e852f3e9c632920d3fe553 by meng
[SPARK-30154][ML] PySpark UDF to convert MLlib vectors to dense arrays
### What changes were proposed in this pull request?
PySpark UDF to convert MLlib vectors to dense arrays. Example:
```
from pyspark.ml.functions import vector_to_array
df.select(vector_to_array(col("features")))
```
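For reference, a more complete, runnable sketch of the same example (the session setup and sample data below are illustrative, not part of the PR):
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array

spark = SparkSession.builder.getOrCreate()

# Illustrative data: one MLlib vector column named "features".
df = spark.createDataFrame(
    [(Vectors.dense([1.0, 2.0, 3.0]),), (Vectors.sparse(3, [0], [4.0]),)],
    ["features"],
)

# Both dense and sparse vectors come back as array<double>, with the
# conversion done in the JVM rather than in a Python UDF.
df.select(vector_to_array(col("features")).alias("features_arr")).show(truncate=False)
```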
### Why are the changes needed?
If a PySpark user wants to convert MLlib sparse/dense vectors in a
DataFrame into dense arrays, an efficient approach is to do the
conversion in the JVM. However, that requires the PySpark user to write
Scala code and register it as a UDF, which is often infeasible for a
pure Python project.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT.
Closes #26910 from WeichenXu123/vector_to_array.
Authored-by: WeichenXu <weichen.xu@databricks.com> Signed-off-by:
Xiangrui Meng <meng@databricks.com>
The file was modified dev/sparktestsupport/modules.py (diff)
The file was added python/pyspark/ml/functions.py
The file was added mllib/src/test/scala/org/apache/spark/ml/FunctionsSuite.scala
The file was modified python/docs/pyspark.ml.rst (diff)
The file was added mllib/src/main/scala/org/apache/spark/ml/functions.scala
Commit 3ba175ef9a5c011a48d8f5e4f6ab6b21e7f5377b by gurwls223
[SPARK-30430][PYTHON][DOCS] Add a note that UserDefinedFunction's
constructor is private
### What changes were proposed in this pull request?
This PR adds a note that UserDefinedFunction's constructor is private.
### Why are the changes needed?
To match the Scala side, which does not have a public constructor at all.
### Does this PR introduce any user-facing change?
Documentation-only change, but it explicitly declares that
UserDefinedFunction's constructor is private.
### How was this patch tested?
Jenkins
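For illustration, the supported way to create Python UDFs goes through the public factory functions rather than the class constructor (a small sketch, not taken from the PR):
```python
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# Supported: create UDFs through the public `udf` (or `pandas_udf`) helpers.
@udf(returnType=IntegerType())
def add_one(x):
    return None if x is None else x + 1

# Discouraged: constructing pyspark.sql.udf.UserDefinedFunction directly;
# per this change, its constructor is documented as private/internal.
```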
Closes #27101 from HyukjinKwon/SPARK-30430.
Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by:
HyukjinKwon <gurwls223@apache.org>
The file was modified python/pyspark/sql/udf.py (diff)
Commit 17881a467a1ac4224a50247458107f8b141850d2 by wenchen
[SPARK-19784][SPARK-25403][SQL] Refresh the table even table stats is
empty
## What changes were proposed in this pull request?
We invalidate the table relation once table data is changed, per
[SPARK-21237](https://issues.apache.org/jira/browse/SPARK-21237). But
there is a situation where we do not invalidate it
(`spark.sql.statistics.size.autoUpdate.enabled=false` and
`table.stats.isEmpty`):
https://github.com/apache/spark/blob/07c4b9bd1fb055f283af076b2a995db8f6efe7a5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L44-L54
This will introduce some issues, e.g.
[SPARK-19784](https://issues.apache.org/jira/browse/SPARK-19784),
[SPARK-19845](https://issues.apache.org/jira/browse/SPARK-19845),
[SPARK-25403](https://issues.apache.org/jira/browse/SPARK-25403),
[SPARK-25332](https://issues.apache.org/jira/browse/SPARK-25332) and
[SPARK-28413](https://issues.apache.org/jira/browse/SPARK-28413).
This is an example to reproduce
[SPARK-19784](https://issues.apache.org/jira/browse/SPARK-19784):
```scala
val path = "/tmp/spark/parquet"
spark.sql("CREATE TABLE t (a INT) USING parquet")
spark.sql("INSERT INTO TABLE t VALUES (1)")
spark.range(5).toDF("a").write.parquet(path)
spark.sql(s"ALTER TABLE t SET LOCATION '${path}'")
spark.table("t").count() // returns 1
spark.sql("refresh table t")
spark.table("t").count() // returns 5
```
This PR invalidates the table relation in this
case (`spark.sql.statistics.size.autoUpdate.enabled=false` and
`table.stats.isEmpty`) to fix this issue.
## How was this patch tested?
unit tests
Closes #22721 from wangyum/SPARK-25403.
Authored-by: Yuming Wang <yumwang@ebay.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala (diff)
The file was modified sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetMetastoreSuite.scala (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala (diff)
Commit da076153aa569cfedc7322332898ae7031044f25 by wenchen
[SPARK-30433][SQL] Make conflict attributes resolution more scalable in
ResolveReferences
### What changes were proposed in this pull request?
This PR tries to make conflict attributes resolution in
`ResolveReferences` more scalable by doing resolution in batch way.
### Why are the changes needed?
Currently, the `ResolveReferences` rule only resolves the conflicting
attributes of a single conflicting plan pair per iteration, which can be
inefficient when there are many conflicts.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Covered by existing tests.
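As a rough illustration of where such conflicts come from (a hypothetical example, not taken from the PR), self-joins reuse the same attribute IDs on both sides, and `ResolveReferences` must de-duplicate them:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

df = spark.range(10).withColumnRenamed("id", "k")

# Both sides of a self-join initially share the same attribute IDs, so the
# analyzer has to generate fresh IDs for one side. Plans with many such
# conflicting pairs are what this change resolves in a single batch.
joined = df.alias("l").join(df.alias("r"), col("l.k") == col("r.k"))
joined.explain(extended=True)
```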
Closes #27105 from Ngone51/resolve-conflict-columns-in-batch.
Authored-by: yi.wu <yi.wu@databricks.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala (diff)
Commit 93d3ab88cd38c41f7f60cdf9c579f953a3f5f3be by wenchen
[SPARK-30338][SQL] Avoid unnecessary InternalRow copies in
ParquetRowConverter
### What changes were proposed in this pull request?
This PR modifies `ParquetRowConverter` to remove unnecessary
`InternalRow.copy()` calls for structs that are directly nested in other
structs.
### Why are the changes needed?
These changes can significantly improve performance when reading
Parquet files that contain deeply-nested structs with many fields.
The `ParquetRowConverter` uses per-field `Converter`s for handling
individual fields. Internally, these converters may have mutable state
and may return mutable objects. In most cases, each `converter` is only
invoked once per Parquet record (this is true for top-level fields, for
example). However, arrays and maps may call their child element
converters multiple times per Parquet record: in these cases we must be
careful to copy any mutable outputs returned by child converters.
In the existing code, `InternalRow`s are copied whenever they are stored
into _any_ parent container (not just maps and arrays). This copying can
be especially expensive for deeply-nested fields, since a deep copy is
performed at every level of nesting.
This PR modifies the code to avoid copies for structs that are directly
nested in structs; see inline code comments for an argument for why this
is safe.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
**Correctness**:  I added new test cases to `ParquetIOSuite` to increase
coverage of nested structs, including structs nested in arrays:
previously this suite didn't test that case, so we used to lack mutation
coverage of this `copy()` code (the suite's tests still passed if I
incorrectly removed the `.copy()` in all cases). I also added a test for
maps with struct keys and modified the existing "map with struct values"
test case to include maps with two elements (since the incorrect omission
of a `copy()` can only be detected if the map has multiple elements).
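As a rough sketch (hypothetical column names, not taken from the PR's test code), the data shapes those new correctness cases cover look like this:
```python
from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: structs nested inside arrays, and maps with two entries
# whose values are structs - the shapes where a missing copy() in a child
# converter would surface as corrupted rows.
rows = [
    Row(
        arr=[Row(a=1, b="x"), Row(a=2, b="y")],
        m={"k1": Row(a=1, b="x"), "k2": Row(a=2, b="y")},
    )
]
df = spark.createDataFrame(rows)
df.write.mode("overwrite").parquet("/tmp/parquet-nested-structs")
spark.read.parquet("/tmp/parquet-nested-structs").show(truncate=False)
```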
**Performance**: I put together a simple local benchmark demonstrating
the performance problems:
First, construct a nested schema:
```scala
case class Inner(
   f1: Int,
   f2: Long,
   f3: String,
   f4: Int,
   f5: Long,
   f6: String,
   f7: Int,
   f8: Long,
   f9: String,
   f10: Int
)
case class Wrapper1(inner: Inner)
case class Wrapper2(wrapper1: Wrapper1)
case class Wrapper3(wrapper2: Wrapper2)
```
`Wrapper3`'s schema looks like:
```
root
|-- wrapper2: struct (nullable = true)
|    |-- wrapper1: struct (nullable = true)
|    |    |-- inner: struct (nullable = true)
|    |    |    |-- f1: integer (nullable = true)
|    |    |    |-- f2: long (nullable = true)
|    |    |    |-- f3: string (nullable = true)
|    |    |    |-- f4: integer (nullable = true)
|    |    |    |-- f5: long (nullable = true)
|    |    |    |-- f6: string (nullable = true)
|    |    |    |-- f7: integer (nullable = true)
|    |    |    |-- f8: long (nullable = true)
|    |    |    |-- f9: string (nullable = true)
|    |    |    |-- f10: integer (nullable = true)
```
Next, generate some fake data:
```scala
val data = spark.range(1, 1000 * 1000 * 25, 1, 1).map { i =>
   Wrapper3(Wrapper2(Wrapper1(Inner(
     i.toInt,
     i * 2,
     (i * 3).toString,
     (i * 4).toInt,
     i * 5,
     (i * 6).toString,
     (i * 7).toInt,
     i * 8,
     (i * 9).toString,
     (i * 10).toInt
   ))))
}
data.write.mode("overwrite").parquet("/tmp/parquet-test")
```
I then ran a simple benchmark consisting of
```
spark.read.parquet("/tmp/parquet-test").selectExpr("hash(*)").rdd.count()
```
where the `hash(*)` is designed to force decoding of all Parquet fields
but avoids `RowEncoder` costs in the `.rdd.count()` stage.
In the old code, expensive copying takes place at every level of
nesting; this is apparent in the following flame graph:
![image](https://user-images.githubusercontent.com/50748/71389014-88a15380-25af-11ea-9537-3e87a2aef179.png)
After this PR's changes, the above toy benchmark runs ~30% faster.
Closes #26993 from
JoshRosen/joshrosen/faster-parquet-nested-scan-by-avoiding-copies.
Lead-authored-by: Josh Rosen <rosenville@gmail.com> Co-authored-by: Josh
Rosen <joshrosen@stripe.com> Signed-off-by: Wenchen Fan
<wenchen@databricks.com>
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala (diff)
Commit 7a1a5db35f2c204554951964a783051ba72171d6 by gurwls223
[SPARK-30414][SQL] ParquetRowConverter optimizations: arrays, maps, plus
misc. constant factors
### What changes were proposed in this pull request?
This PR implements multiple performance optimizations for
`ParquetRowConverter`, achieving some modest constant-factor wins for
all fields and larger wins for map and array fields:
- Add `private[this]` to several `val`s
(90cebf080a5d3857ea8cf2a89e8e060b8b5a2fbf)
- Keep a `fieldUpdaters` array, saving two `.updater()` calls per field
(7318785d350cc924198d7514e40973fd76d54ad5): I suspect that these are
often megamorphic calls, so cutting these out seems like it could be a
relatively large performance win.
- Only call `currentRow.numFields` once per `start()` call
(e05de150813b639929c18af1df09ec718d2d16fc): previously we'd call it once
per field and this had a significant enough cost that it was visible
during profiling.
- Reuse buffers in array and map converters
(c7d1534685fbad5d2280b082f37bed6d75848e76,
6d16f596ef6af9fd8946a062f79d0eeace9e1959): previously we would create a
brand-new Scala `ArrayBuffer` for each field read, but this isn't
actually necessary because the data is already copied into a fresh array
when `end()` constructs a `GenericArrayData`.
### Why are the changes needed?
To improve Parquet read performance; this is complementary to #26993's
(orthogonal) improvements for nested struct read performance.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests, plus manual benchmarking with both synthetic and
realistic schemas (similar to the ones in #26993). I've seen ~10%+
improvements in scan performance on certain real-world datasets.
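A sketch of the kind of synthetic benchmark described here (the path, sizes, and column names are made up), exercising the array and map converters on the scan side:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import array, create_map, col, lit

spark = SparkSession.builder.getOrCreate()

# Synthetic dataset with array and map columns, the field types whose
# converters this change optimizes.
df = spark.range(0, 5000000).select(
    array(col("id"), col("id") * 2, col("id") * 3).alias("arr"),
    create_map(lit("a"), col("id"), lit("b"), col("id") + 1).alias("m"),
)
df.write.mode("overwrite").parquet("/tmp/parquet-array-map-bench")

# Scan-side measurement mirroring #26993: hash(*) forces full decoding of
# every field, while .rdd.count() avoids row-encoding/collection costs.
spark.read.parquet("/tmp/parquet-array-map-bench").selectExpr("hash(*)").rdd.count()
```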
Closes #27089 from
JoshRosen/joshrosen/more-ParquetRowConverter-optimizations.
Lead-authored-by: Josh Rosen <rosenville@gmail.com> Co-authored-by: Josh
Rosen <joshrosen@stripe.com> Signed-off-by: HyukjinKwon
<gurwls223@apache.org>
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala (diff)
Commit 866b7df348bb2b3ed69383501a344bf0866c3451 by gurwls223
[SPARK-30335][SQL][DOCS] Add a note first, last, collect_list and
collect_set can be non-deterministic in SQL function docs as well
### What changes were proposed in this pull request?
This PR adds a note that first, last, collect_list and collect_set can be
non-deterministic in the SQL function docs as well. This is already
documented in `functions.scala`.
### Why are the changes needed?
Some people read only the SQL docs.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Jenkins will test.
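For illustration (a hypothetical example, not from the PR), the non-determinism and one way to avoid it:
```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

df = spark.range(0, 100, 1, numPartitions=8).withColumn("g", F.lit(1))

# first/last (and the element order of collect_list/collect_set) depend on
# the order rows reach the aggregation, which can change across runs after
# shuffles, so the value below is not guaranteed to be 0.
df.groupBy("g").agg(F.first("id")).show()

# A deterministic alternative: pick the first row per group by an explicit
# ordering (or use min/max where that is what is actually wanted).
w = Window.partitionBy("g").orderBy("id")
df.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn").show()
```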
Closes #27099 from HyukjinKwon/SPARK-30335.
Authored-by: HyukjinKwon <gurwls223@apache.org> Signed-off-by:
HyukjinKwon <gurwls223@apache.org>
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/First.scala (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala (diff)
The file was modified R/pkg/R/functions.R (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/functions.scala (diff)
The file was modified python/pyspark/sql/functions.py (diff)
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Last.scala (diff)