SuccessChanges

Summary

  1. [SPARK-30353][SQL] Add IsNotNull check in SimplifyBinaryComparison (details)
  2. [SPARK-27296][SQL] Allows Aggregator to be registered as a UDF (details)
Commit 823e3d309c51528e69893f71c4be0f5bc8552d99 by wenchen
[SPARK-30353][SQL] Add IsNotNull check in SimplifyBinaryComparison
optimization
### What changes were proposed in this pull request?
When `spark.sql.constraintPropagation.enabled` is true, Spark propagates constraints during SQL optimization, so `where c = 1` is rewritten to `where c = 1 and c is not null`. We can also use these constraints in `SimplifyBinaryComparison`. Currently `SimplifyBinaryComparison` only simplifies a comparison whose two sides are semantically equal and not nullable; with this change it can also simplify when the expression is inferred to be `IsNotNull`.
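As an illustration only, here is a minimal sketch (not the actual Spark rule in `expressions.scala`) of how a self-comparison can be folded once the operand is known to be non-null:
```scala
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical helper, not the real SimplifyBinaryComparison rule: fold a
// self-comparison when `notNull` reports the operand is known to be non-null,
// either because the attribute is not nullable or because IsNotNull(e) was
// inferred by constraint propagation.
def simplifySelfComparison(cond: Expression, notNull: Expression => Boolean): Expression =
  cond transform {
    case EqualTo(l, r) if l.semanticEquals(r) && notNull(l)     => Literal.TrueLiteral
    case GreaterThan(l, r) if l.semanticEquals(r) && notNull(l) => Literal.FalseLiteral
  }
```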
### Why are the changes needed?
Simplify SQL.
```
create table test (c1 string);

explain extended select c1 from test where c1 = c1 limit 10;

-- before
GlobalLimit 10
+- LocalLimit 10
   +- Filter (isnotnull(c1#20) AND (c1#20 = c1#20))
      +- Relation[c1#20]

-- after
GlobalLimit 10
+- LocalLimit 10
   +- Filter isnotnull(c1#20)
      +- Relation[c1#20]

explain extended select c1 from test where c1 > c1 limit 10;

-- before
GlobalLimit 10
+- LocalLimit 10
   +- Filter (isnotnull(c1#20) && (c1#20 > c1#20))
      +- Relation[c1#20]

-- after
LocalRelation <empty>, [c1#20]
```
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Add UT.
Closes #27008 from ulysses-you/SPARK-30353.
Authored-by: ulysses <youxiduo@weidian.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala (diff)
The file was modified sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala (diff)
Commit 1f50a5875b46885a40668c058a1a28e736776244 by wenchen
[SPARK-27296][SQL] Allows Aggregator to be registered as a UDF
## What changes were proposed in this pull request?
Defines a new subclass of UDF: `UserDefinedAggregator`. Also allows `Aggregator` to be registered as a UDF. Under the hood, the implementation is based on the internal `TypedImperativeAggregate` class that Spark's predefined aggregators make use of. The effect is that custom user-defined aggregators are now serialized only on partition boundaries instead of being serialized and deserialized at each input row.
The two new modes of using `Aggregator` are as follows:
```scala
val agg: Aggregator[IN, BUF, OUT] = // typed aggregator
val udaf1 = UserDefinedAggregator(agg)
val udaf2 = spark.udf.register("agg", agg)
```
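For context, a minimal self-contained sketch of the registration path described above, assuming a toy `MySum` aggregator and a column named `v` (both illustrative, not part of this PR):
```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator

// Illustrative typed aggregator: a plain sum over Double values.
object MySum extends Aggregator[Double, Double, Double] {
  def zero: Double = 0.0
  def reduce(b: Double, a: Double): Double = b + a
  def merge(b1: Double, b2: Double): Double = b1 + b2
  def finish(b: Double): Double = b
  def bufferEncoder: Encoder[Double] = Encoders.scalaDouble
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Register the Aggregator as a UDF; the returned function can be applied to
// Columns, and the registered name can be used from SQL.
val mySum = spark.udf.register("my_sum", MySum)
val df = Seq(1.0, 2.0, 3.0).toDF("v")
df.agg(mySum($"v")).show()
spark.sql("SELECT my_sum(v) FROM VALUES (1.0), (2.0), (3.0) AS t(v)").show()
```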
## How was this patch tested?
Unit testing has been added that corresponds to the testing suites for `UserDefinedAggregateFunction`. Additionally, unit tests explicitly count the number of aggregator ser/de cycles to ensure that it is governed only by the number of data partitions.
To evaluate the performance impact, I did two comparisons. The code and REPL results are recorded in [this gist](https://gist.github.com/erikerlandson/b0e106a4dbaf7f80b4f4f3a21f05f892). To characterize its behavior I benchmarked both a relatively simple aggregator and an aggregator with a complex structure (a t-digest).

### Performance
The following compares the new `Aggregator`-based aggregation against UDAF. In this scenario, the new aggregation is about 100x faster. The size of the improvement depends on the complexity of the aggregator; for very simple aggregators (e.g. implementing 'sum'), the improvement is more like 25-30%.
```scala
scala> import scala.util.Random._, org.apache.spark.sql.Row, org.apache.spark.tdigest._
import scala.util.Random._
import org.apache.spark.sql.Row
import org.apache.spark.tdigest._

scala> val data = sc.parallelize(Vector.fill(50000){(nextInt(2), nextGaussian, nextGaussian.toFloat)}, 5).toDF("cat", "x1", "x2")
data: org.apache.spark.sql.DataFrame = [cat: int, x1: double ... 1 more field]

scala> val udaf = TDigestUDAF(0.5, 0)
udaf: org.apache.spark.tdigest.TDigestUDAF = TDigestUDAF(0.5,0)

scala> val bs = Benchmark.sample(10) { data.agg(udaf($"x1"), udaf($"x2")).first }
bs: Array[(Double, org.apache.spark.sql.Row)] = Array((16.523,[TDigestSQL(TDigest(0.5,0,130,TDigestMap(-4.9171836327285225 -> (1.0, 1.0), -3.9615949140987685 -> (1.0, 2.0), -3.792874086327091 -> (0.7500781537109753, 2.7500781537109753), -3.720534874164185 -> (1.796754196108008, 4.546832349818983), -3.702105588052377 -> (0.4531676501810167, 5.0), -3.665883591332569 -> (2.3434687534153142, 7.343468753415314), -3.649982231368131 -> (0.6565312465846858, 8.0), -3.5914188829817744 -> (4.0, 12.0), -3.530472305581248 -> (4.0, 16.0), -3.4060489584449467 -> (2.9372251939818383, 18.93722519398184), -3.3000694035428486 -> (8.12412890252889, 27.061354096510726), -3.2250016655261877 -> (8.30564453211017, 35.3669986286209), -3.180537395623448 -> (6.001782561137285, 41.3687811...

scala> bs.map(_._1)
res0: Array[Double] = Array(16.523, 17.138, 17.863, 17.801, 17.769, 17.786, 17.744, 17.8, 17.939, 17.854)

scala> val agg = TDigestAggregator(0.5, 0)
agg: org.apache.spark.tdigest.TDigestAggregator = TDigestAggregator(0.5,0)

scala> val udaa = spark.udf.register("tdigest", agg)
udaa: org.apache.spark.sql.expressions.UserDefinedAggregator[Double,org.apache.spark.tdigest.TDigestSQL,org.apache.spark.tdigest.TDigestSQL] = UserDefinedAggregator(TDigestAggregator(0.5,0),None,true,true)

scala> val bs = Benchmark.sample(10) { data.agg(udaa($"x1"), udaa($"x2")).first }
bs: Array[(Double, org.apache.spark.sql.Row)] = Array((0.313,[TDigestSQL(TDigest(0.5,0,130,TDigestMap(-4.9171836327285225 -> (1.0, 1.0), -3.9615949140987685 -> (1.0, 2.0), -3.792874086327091 -> (0.7500781537109753, 2.7500781537109753), -3.720534874164185 -> (1.796754196108008, 4.546832349818983), -3.702105588052377 -> (0.4531676501810167, 5.0), -3.665883591332569 -> (2.3434687534153142, 7.343468753415314), -3.649982231368131 -> (0.6565312465846858, 8.0), -3.5914188829817744 -> (4.0, 12.0), -3.530472305581248 -> (4.0, 16.0), -3.4060489584449467 -> (2.9372251939818383, 18.93722519398184), -3.3000694035428486 -> (8.12412890252889, 27.061354096510726), -3.2250016655261877 -> (8.30564453211017, 35.3669986286209), -3.180537395623448 -> (6.001782561137285, 41.36878118...

scala> bs.map(_._1)
res1: Array[Double] = Array(0.313, 0.193, 0.175, 0.185, 0.174, 0.176, 0.16, 0.186, 0.171, 0.179)
```
Closes #25024 from erikerlandson/spark-27296.
Authored-by: Erik Erlandson <eerlands@redhat.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
The file was modified sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala (diff)
The file was added sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/UDAQuerySuite.scala
The file was modified sql/core/src/main/scala/org/apache/spark/sql/UDFRegistration.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/functions.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/expressions/UserDefinedFunction.scala (diff)
The file was modified sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala (diff)