1. [SPARK-25497][SQL] Limit operation within whole stage codegen should not
Commit e3133f4abf1cd5667abe5f0d05fa0af0df3033ae by ishizaki
[SPARK-25497][SQL] Limit operation within whole stage codegen should not
consume all the inputs
## What changes were proposed in this pull request?
This PR is inspired by, but
proposes a safer fix.
The current limit whole stage codegen has 2 problems:
1. It's only applied to `InputAdapter`, so many leaf nodes can't stop
early w.r.t. limit.
2. It needs to override a method, which will break if we have more than
one limit in the whole-stage.
The first problem is easy to fix: figure out which nodes can stop
earlier w.r.t. limit, and update them. This PR updates `RangeExec`,
`ColumnarBatchScan`, `SortExec`, and `HashAggregateExec`.
The second problem is hard to fix. This PR proposes to propagate the
limit counter variable name upstream, so that the upstream leaf/blocking
nodes can check the limit counter and quit the loop earlier.
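The idea of sharing the limit counter with an upstream loop can be sketched in plain Java. This is only an illustration, not the code Spark generates: the class `LimitCounterSketch`, its fields, and the even-number filter are hypothetical names chosen for the example. The leaf loop reads the counter owned by the limit and stops reading as soon as the limit is reached.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a fused stage: leaf -> filter -> limit.
final class LimitCounterSketch {
    // Stand-in for the counter variable owned by the limit operator.
    private int limitCount = 0;
    private final int limit;

    // How many input rows the leaf actually read.
    int rowsRead = 0;
    // Rows that made it past the limit.
    final List<Integer> output = new ArrayList<>();

    LimitCounterSketch(int limit) {
        this.limit = limit;
    }

    // Downstream part: a filter (keep even values) followed by the limit.
    private void consume(int row) {
        if (row % 2 != 0) {
            return; // filtered out, the limit counter does not move
        }
        if (limitCount < limit) {
            limitCount++;
            output.add(row);
        }
    }

    // Upstream leaf loop: it knows the counter, so it can quit early.
    void produce(int[] input) {
        for (int i = 0; i < input.length && limitCount < limit; i++) {
            rowsRead++;
            consume(input[i]);
        }
    }
}
```

With a limit of 2 and the input 1 through 10, the leaf in this sketch reads only four rows before the loop condition fails. The check uses the limit's own counter rather than the loop index because operators in between, such as the filter here, may drop rows.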
For better performance, the implementation here follows
`CodegenSupport.needStopCheck`, so that we codegen the check only
if there is a limit in the query. For columnar nodes like range, we check
the limit counter per-batch instead of per-row, to make the inner loop
tight and fast.
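The per-batch check can be sketched as follows. Again this is a hypothetical illustration (`BatchedRangeSketch`, `batchSize`, and the counters are assumed names, not Spark's generated code): the outer loop tests the limit counter once per batch, and the inner loop runs without any limit check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a range-like leaf that checks the limit per batch.
final class BatchedRangeSketch {
    // Stand-in for the counter variable owned by the limit operator.
    private int limitCount = 0;
    private final int limit;

    // How many values the range produced, including ones the limit dropped.
    long rowsProduced = 0;
    final List<Long> output = new ArrayList<>();

    BatchedRangeSketch(int limit) {
        this.limit = limit;
    }

    // Downstream limit: still enforces correctness row by row.
    private void consume(long value) {
        if (limitCount < limit) {
            limitCount++;
            output.add(value);
        }
    }

    void produce(long start, long end, int batchSize) {
        long next = start;
        // The limit counter is checked once per batch only.
        while (next < end && limitCount < limit) {
            long batchEnd = Math.min(next + batchSize, end);
            // Tight inner loop: no limit check here.
            for (long v = next; v < batchEnd; v++) {
                rowsProduced++;
                consume(v);
            }
            next = batchEnd;
        }
    }
}
```

The trade-off in this sketch is that the leaf may produce up to one batch of extra values after the limit is reached. The result stays correct because the limit itself still drops them.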
Why is this safer?
1. The leaf/blocking nodes don't have to check the limit counter and
stop earlier. It's only for performance. (This is the same as before.)
2. The blocking operators can stop propagating the limit counter name,
because the counter of a limit after a blocking operator will never
increase before the blocking operator has consumed all the data from its
upstream operators. So the upstream operators don't care about a limit
after blocking operators. This is also for performance only; it's OK if
we forget to do it for some new blocking operators.
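The second point can be illustrated with a sort-like blocking operator. This is a hypothetical sketch (`BlockingSortSketch` and its fields are assumed names): while the operator buffers its input, the downstream limit counter cannot change, so the input phase has nothing to check. Only the output loop looks at the counter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a blocking operator (sort) followed by a limit.
final class BlockingSortSketch {
    // Stand-in for the counter variable owned by the limit operator.
    private int limitCount = 0;
    private final int limit;

    int rowsRead = 0;    // rows consumed from upstream
    int rowsEmitted = 0; // rows sent downstream after sorting
    final List<Integer> output = new ArrayList<>();

    BlockingSortSketch(int limit) {
        this.limit = limit;
    }

    // Downstream limit.
    private void consume(int row) {
        if (limitCount < limit) {
            limitCount++;
            output.add(row);
        }
    }

    void produce(int[] input) {
        // Input phase: nothing is emitted yet, so the limit counter stays
        // at zero and upstream has no reason to check it.
        int[] sorted = input.clone();
        rowsRead = sorted.length;
        Arrays.sort(sorted);

        // Output phase: this loop can stop early once the limit is hit.
        for (int i = 0; i < sorted.length && limitCount < limit; i++) {
            rowsEmitted++;
            consume(sorted[i]);
        }
    }
}
```

In this sketch all five input rows are read regardless of the limit, but only as many sorted rows are emitted as the limit accepts.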
## How was this patch tested?
a new test
Closes #22630 from cloud-fan/limit.
Authored-by: Wenchen Fan
Signed-off-by: Kazuaki Ishizaki
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
The file was modified: sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
The file was modified: sql/core/src/main/java/org/apache/spark/sql/execution/
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
The file was modified: sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarBatchScan.scala