Changes

Summary

  1. [SPARK-30553][DOCS] fix structured-streaming java example error (details)
Commit 0300d4bde7343c2e260430e4f470505e9e721ab0 by dhyun
[SPARK-30553][DOCS] fix structured-streaming java example error
### What changes were proposed in this pull request?
Fix structured-streaming java example error.
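```java
Dataset<Row> windowedCounts = words
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
```
As written, this example does not clean up old state, which may cause an OOM. The aggregation keys in the plan are not tagged with the watermark (`window#13` before the fix versus `window#13-T600000ms` after it).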
> Before the fix
```scala
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter48e331f0
+- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)],
output=[window#13, word#4, count#12L])
  +- StateStoreSave [window#13, word#4], state info [ checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state,
runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6,
numPartitions = 1], Update, 1579530890886, 2
     +- *(3) HashAggregate(keys=[window#13, word#4],
functions=[merge_count(1)], output=[window#13, word#4, count#23L])
        +- StateStoreRestore [window#13, word#4], state info [
checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-91124080-0e20-41c0-9150-91735bdc22c0/state,
runId = 5c425536-a3ae-4385-8167-5fa529e6760d, opId = 0, ver = 6,
numPartitions = 1], 2
           +- *(2) HashAggregate(keys=[window#13, word#4],
functions=[merge_count(1)], output=[window#13, word#4, count#23L])
              +- Exchange hashpartitioning(window#13, word#4, 1)
                 +- *(1) HashAggregate(keys=[window#13, word#4],
functions=[partial_count(1)], output=[window#13, word#4, count#23L])
                    +- *(1) Project [window#13, word#4]
                       +- *(1) Filter (((isnotnull(timestamp#5) &&
isnotnull(window#13)) && (timestamp#5 >= window#13.start)) &&
(timestamp#5 < window#13.end))
                          +- *(1) Expand [List(named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5, TimestampType, LongType)
- 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) + 0),
LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5, TimestampType, LongType)
- 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) END + 0) - 2) * 300000000) +
600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms),
List(named_struct(start, precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5, TimestampType, LongType)
- 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) + 0),
LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5, TimestampType, LongType)
- 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5, TimestampType,
LongType) - 0) as double) / 3.0E8)) END + 1) - 2) * 300000000) +
600000000), LongType, TimestampType)), word#4, timestamp#5-T600000ms)],
[window#13, word#4, timestamp#5-T600000ms]
                             +- EventTimeWatermark timestamp#5:
timestamp, interval 10 minutes
                                +- LocalTableScan <empty>, [word#4,
timestamp#5]
```
> After the fix
```scala
== Physical Plan ==
WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter1df12a96
+- *(4) HashAggregate(keys=[window#13-T600000ms, word#4],
functions=[count(1)], output=[window#8-T600000ms, word#4, count#12L])
  +- StateStoreSave [window#13-T600000ms, word#4], state info [
checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state,
runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4,
numPartitions = 1], Update, 1579529975342, 2
     +- *(3) HashAggregate(keys=[window#13-T600000ms, word#4],
functions=[merge_count(1)], output=[window#13-T600000ms, word#4,
count#23L])
        +- StateStoreRestore [window#13-T600000ms, word#4], state info [
checkpoint =
file:/C:/Users/chenhao/AppData/Local/Temp/temporary-95ac74cc-aca6-42eb-827d-7586aa69bcd3/state,
runId = 91fa311d-d47e-4726-9d0a-f21ef268d9d0, opId = 0, ver = 4,
numPartitions = 1], 2
           +- *(2) HashAggregate(keys=[window#13-T600000ms, word#4],
functions=[merge_count(1)], output=[window#13-T600000ms, word#4,
count#23L])
              +- Exchange hashpartitioning(window#13-T600000ms, word#4,
1)
                 +- *(1) HashAggregate(keys=[window#13-T600000ms,
word#4], functions=[partial_count(1)], output=[window#13-T600000ms,
word#4, count#23L])
                    +- *(1) Project [window#13-T600000ms, word#4]
                       +- *(1) Filter
(((isnotnull(timestamp#5-T600000ms) && isnotnull(window#13-T600000ms))
&& (timestamp#5-T600000ms >= window#13-T600000ms.start)) &&
(timestamp#5-T600000ms < window#13-T600000ms.end))
                          +- *(1) Expand [List(named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType,
LongType) - 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) *
300000000) + 0), LongType, TimestampType), end,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType,
LongType) - 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) END + 0) - 2) *
300000000) + 600000000), LongType, TimestampType)), word#4,
timestamp#5-T600000ms), List(named_struct(start,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType,
LongType) - 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) *
300000000) + 0), LongType, TimestampType), end,
precisetimestampconversion(((((CASE WHEN
(cast(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) as double) =
(cast((precisetimestampconversion(timestamp#5-T600000ms, TimestampType,
LongType) - 0) as double) / 3.0E8)) THEN
(CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) + 1) ELSE
CEIL((cast((precisetimestampconversion(timestamp#5-T600000ms,
TimestampType, LongType) - 0) as double) / 3.0E8)) END + 1) - 2) *
300000000) + 600000000), LongType, TimestampType)), word#4,
timestamp#5-T600000ms)], [window#13-T600000ms, word#4,
timestamp#5-T600000ms]
                             +- EventTimeWatermark timestamp#5:
timestamp, interval 10 minutes
                                +- LocalTableScan <empty>, [word#4,
timestamp#5]
```
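The `Expand` and `Filter` nodes in both plans encode the sliding-window assignment for a 10-minute window with a 5-minute slide. The following is a minimal sketch of that arithmetic in plain Java, read off the plan text rather than taken from Spark's source. The class and method names are hypothetical, timestamps are assumed to be microseconds, and the constant `2` mirrors the `- 2` in the plan.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {
    static final long WINDOW_US = 600_000_000L; // 10 minutes, the "+ 600000000" in the plan
    static final long SLIDE_US = 300_000_000L;  // 5 minutes, the "3.0E8" / "* 300000000" in the plan
    static final int OVERLAP = 2;               // the "- 2" in the plan (assumed: window / slide)

    /** Returns the {start, end} pairs (in microseconds) of every window that contains tsUs. */
    static List<long[]> windowsFor(long tsUs) {
        double ratio = (double) tsUs / SLIDE_US;
        long ceil = (long) Math.ceil(ratio);
        // CASE WHEN ceil(x) = x THEN ceil(x) + 1 ELSE ceil(x) END
        long windowId = (ceil == ratio) ? ceil + 1 : ceil;

        List<long[]> result = new ArrayList<>();
        for (int i = 0; i < OVERLAP; i++) {      // one candidate per List(...) in Expand
            long start = (windowId + i - OVERLAP) * SLIDE_US;
            long end = start + WINDOW_US;
            // The Filter node keeps only candidates that really contain the timestamp.
            if (tsUs >= start && tsUs < end) {
                result.add(new long[] {start, end});
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // An event at minute 7 falls into [0, 10 min) and [5 min, 15 min).
        for (long[] w : windowsFor(420_000_000L)) {
            System.out.println(w[0] + " .. " + w[1]);
        }
    }
}
```

The window computation is the same before and after the fix. The two plans differ only in whether the columns involved carry the `-T600000ms` watermark tag.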
### Why are the changes needed?
If we write the code according to the documentation, it does not clean up old state, which may cause an OOM.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
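```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// The statements below belong inside a method that declares `throws Exception`.
SparkSession spark = SparkSession.builder().appName("test").master("local[*]")
        .config("spark.sql.shuffle.partitions", 1)
        .getOrCreate();
// Read lines from a socket; includeTimestamp adds an arrival-time column.
Dataset<Row> lines = spark.readStream().format("socket")
        .option("host", "skynet")
        .option("includeTimestamp", true)
        .option("port", 8888).load();
Dataset<Row> words = lines.toDF("word", "timestamp");
// Use col(...) rather than words.col(...) so the keys resolve against the watermarked data.
Dataset<Row> windowedCounts = words
        .withWatermark("timestamp", "10 minutes")
        .groupBy(
                window(col("timestamp"), "10 minutes", "5 minutes"),
                col("word"))
        .count();
StreamingQuery start = windowedCounts.writeStream()
        .outputMode("update")
        .format("console").start();
start.awaitTermination();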
```
We can write an example like this, feed it some data, and then check the following:
1. Look at the metric `stateOnCurrentVersionSizeBytes` in the log. Is it increasing all the time?
2. Look at the physical plan. Does it contain something like `HashAggregate(keys=[window#11-T10000ms, value#39]`?
3. Debug into `storeManager.remove(store, keyRow)`. Does it remove the old state?
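The plan check in step 2 can be automated on the plan text. Below is a minimal sketch in plain Java, assuming the plan has already been captured as a string (how it is obtained is left out). The class name, method name, and regular expressions are hypothetical and only pattern-match the `-T…ms` suffix seen in the plans above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlanWatermarkCheck {
    // Key list of a HashAggregate node, e.g. keys=[window#13-T600000ms, word#4]
    private static final Pattern KEYS =
            Pattern.compile("HashAggregate\\(keys=\\[([^\\]]*)\\]");
    // Watermark tag on an attribute, e.g. #13-T600000ms
    private static final Pattern WATERMARK_TAG = Pattern.compile("#\\d+-T\\d+ms");

    /** True if the plan has HashAggregate nodes and each key list carries a watermark tag. */
    static boolean aggregationKeysCarryWatermark(String plan) {
        Matcher m = KEYS.matcher(plan);
        boolean sawAggregate = false;
        while (m.find()) {
            sawAggregate = true;
            if (!WATERMARK_TAG.matcher(m.group(1)).find()) {
                return false; // at least one aggregation is keyed on untagged columns
            }
        }
        return sawAggregate;
    }

    public static void main(String[] args) {
        String before = "+- *(4) HashAggregate(keys=[window#13, word#4], functions=[count(1)]";
        String after = "+- *(4) HashAggregate(keys=[window#13-T600000ms, word#4], functions=[count(1)]";
        System.out.println(aggregationKeysCarryWatermark(before)); // false
        System.out.println(aggregationKeysCarryWatermark(after));  // true
    }
}
```

A text match like this is only a quick sanity check. Watching the state size and stepping through the state removal, as in steps 1 and 3, remain the more direct evidence.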
Closes #27268 from bettermouse/spark-30553.
Authored-by: bettermouse <qq5375631>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 3c4e61918fc8266368bd33ea0612144de77571e6)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
The file was modified: docs/structured-streaming-programming-guide.md