SuccessChanges

Summary

  1. [SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid runtime error for a large query (commit: f3efbfa4b973cdb8cf992e30540609d0006e0cfe)
  2. [SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z (commit: 0663b61193b37094b9d00c7f2cbb0268ad946e25)
  3. [SPARK-23642][DOCS] AccumulatorV2 subclass isZero scaladoc fix (commit: a9d0784e6733666a0608e8236322f1dc380e96b7)
  4. [SPARK-23695][PYTHON] Fix the error message for Kinesis streaming tests (commit: 72c13ed844d6be6510ce2c5e3526c234d1d5e10f)
  5. [SPARK-23658][LAUNCHER] InProcessAppHandle uses the wrong class in getLogger (commit: 2e1e274ed9d7a30656555e71c68e7de34a336a8a)
  6. [SPARK-23671][CORE] Fix condition to enable the SHS thread pool. (commit: 52a52d5d26fc1650e788eec62ce478c76f627470)
  7. [SPARK-23608][CORE][WEBUI] Add synchronization in SHS between attachSparkUI and detachSparkUI functions to avoid concurrent modification issue to Jetty Handlers (commit: 99f5c0bc7a6c77917b4ccd498724b8ccc0c21473)
Commit f3efbfa4b973cdb8cf992e30540609d0006e0cfe by hvanhovell
[SPARK-23598][SQL] Make methods in BufferedRowIterator public to avoid
runtime error for a large query
## What changes were proposed in this pull request?
This PR fixes a runtime error that occurs for a large query when the generated
code has split classes. The issue is that `append()`, `stopEarly()`, and
other methods are not accessible from split classes that are not
subclasses of `BufferedRowIterator`. This PR fixes the issue by making
them `public`.
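A minimal Scala sketch of the shape of the change, for illustration only (the real class is Java, in sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java, and the bodies below are placeholders rather than Spark's implementation):
```scala
// Illustration only: the split classes generated for a large query are nested
// classes of the generated iterator. They call back into methods inherited
// from BufferedRowIterator, so those methods must be public (not protected)
// for the calls to pass JVM access checks at runtime.
abstract class BufferedRowIteratorSketch {
  protected var partitionIndex: Int = -1

  // Before the fix these were protected; the fix widens them to public.
  def shouldStop(): Boolean = false
  def stopEarly(): Boolean = false
  def append(row: AnyRef): Unit = ()
}
```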
Before applying the PR, running the attached program with
`CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD=-1` produces the following exception.
```
test("SPARK-23598") {
  // When CodeGenerator.GENERATED_CLASS_SIZE_THRESHOLD is set to -1, an exception is thrown
  val df_pet_age = Seq((8, "bat"), (15, "mouse"), (5, "horse")).toDF("age", "name")
  df_pet_age.groupBy("name").avg("age").show()
}
```
Exception:
```
19:40:52.591 WARN org.apache.hadoop.util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19:41:32.319 ERROR org.apache.spark.executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalAccessError: tried to access method org.apache.spark.sql.execution.BufferedRowIterator.shouldStop()Z from class org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1$agg_NestedClass1.agg_doAggregateWithKeys$(generated.java:203)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(generated.java:160)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:616)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
...
```
Generated code (line 195 calls `stopEarly()`):
```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIteratorForCodegenStage1(references);
/* 003 */ }
/* 004 */
/* 005 */ // codegenStageId=1
/* 006 */ final class GeneratedIteratorForCodegenStage1 extends
org.apache.spark.sql.execution.BufferedRowIterator {
/* 007 */   private Object[] references;
/* 008 */   private scala.collection.Iterator[] inputs;
/* 009 */   private boolean agg_initAgg;
/* 010 */   private boolean agg_bufIsNull;
/* 011 */   private double agg_bufValue;
/* 012 */   private boolean agg_bufIsNull1;
/* 013 */   private long agg_bufValue1;
/* 014 */   private agg_FastHashMap agg_fastHashMap;
/* 015 */   private org.apache.spark.unsafe.KVIterator<UnsafeRow,
UnsafeRow> agg_fastHashMapIter;
/* 016 */   private org.apache.spark.unsafe.KVIterator agg_mapIter;
/* 017 */   private
org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap
agg_hashMap;
/* 018 */   private
org.apache.spark.sql.execution.UnsafeKVExternalSorter agg_sorter;
/* 019 */   private scala.collection.Iterator inputadapter_input;
/* 020 */   private boolean agg_agg_isNull11;
/* 021 */   private boolean agg_agg_isNull25;
/* 022 */   private
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[]
agg_mutableStateArray1 = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[2];
/* 023 */   private
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[]
agg_mutableStateArray2 = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[2];
/* 024 */   private UnsafeRow[] agg_mutableStateArray = new
UnsafeRow[2];
/* 025 */
/* 026 */   public GeneratedIteratorForCodegenStage1(Object[]
references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[]
inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */
/* 034 */     agg_fastHashMap = new
agg_FastHashMap(((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getTaskMemoryManager(),
((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).getEmptyAggregationBuffer());
/* 035 */     agg_hashMap =
((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).createHashMap();
/* 036 */     inputadapter_input = inputs[0];
/* 037 */     agg_mutableStateArray[0] = new UnsafeRow(1);
/* 038 */     agg_mutableStateArray1[0] = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[0],
32);
/* 039 */     agg_mutableStateArray2[0] = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[0],
1);
/* 040 */     agg_mutableStateArray[1] = new UnsafeRow(3);
/* 041 */     agg_mutableStateArray1[1] = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_mutableStateArray[1],
32);
/* 042 */     agg_mutableStateArray2[1] = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(agg_mutableStateArray1[1],
3);
/* 043 */
/* 044 */   }
/* 045 */
/* 046 */   public class agg_FastHashMap {
/* 047 */     private
org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch batch;
/* 048 */     private int[] buckets;
/* 049 */     private int capacity = 1 << 16;
/* 050 */     private double loadFactor = 0.5;
/* 051 */     private int numBuckets = (int) (capacity / loadFactor);
/* 052 */     private int maxSteps = 2;
/* 053 */     private int numRows = 0;
/* 054 */     private org.apache.spark.sql.types.StructType keySchema =
new org.apache.spark.sql.types.StructType().add(((java.lang.String)
references[1] /* keyName */),
org.apache.spark.sql.types.DataTypes.StringType);
/* 055 */     private org.apache.spark.sql.types.StructType valueSchema
= new org.apache.spark.sql.types.StructType().add(((java.lang.String)
references[2] /* keyName */),
org.apache.spark.sql.types.DataTypes.DoubleType)
/* 056 */     .add(((java.lang.String) references[3] /* keyName */),
org.apache.spark.sql.types.DataTypes.LongType);
/* 057 */     private Object emptyVBase;
/* 058 */     private long emptyVOff;
/* 059 */     private int emptyVLen;
/* 060 */     private boolean isBatchFull = false;
/* 061 */
/* 062 */     public agg_FastHashMap(
/* 063 */       org.apache.spark.memory.TaskMemoryManager
taskMemoryManager,
/* 064 */       InternalRow emptyAggregationBuffer) {
/* 065 */       batch =
org.apache.spark.sql.catalyst.expressions.RowBasedKeyValueBatch
/* 066 */       .allocate(keySchema, valueSchema, taskMemoryManager,
capacity);
/* 067 */
/* 068 */       final UnsafeProjection valueProjection =
UnsafeProjection.create(valueSchema);
/* 069 */       final byte[] emptyBuffer =
valueProjection.apply(emptyAggregationBuffer).getBytes();
/* 070 */
/* 071 */       emptyVBase = emptyBuffer;
/* 072 */       emptyVOff = Platform.BYTE_ARRAY_OFFSET;
/* 073 */       emptyVLen = emptyBuffer.length;
/* 074 */
/* 075 */       buckets = new int[numBuckets];
/* 076 */       java.util.Arrays.fill(buckets, -1);
/* 077 */     }
/* 078 */
/* 079 */     public org.apache.spark.sql.catalyst.expressions.UnsafeRow
findOrInsert(UTF8String agg_key) {
/* 080 */       long h = hash(agg_key);
/* 081 */       int step = 0;
/* 082 */       int idx = (int) h & (numBuckets - 1);
/* 083 */       while (step < maxSteps) {
/* 084 */         // Return bucket index if it's either an empty slot or
already contains the key
/* 085 */         if (buckets[idx] == -1) {
/* 086 */           if (numRows < capacity && !isBatchFull) {
/* 087 */             // creating the unsafe for new entry
/* 088 */             UnsafeRow agg_result = new UnsafeRow(1);
/* 089 */           
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder
agg_holder
/* 090 */             = new
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(agg_result,
/* 091 */               32);
/* 092 */           
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter
agg_rowWriter
/* 093 */             = new
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(
/* 094 */               agg_holder,
/* 095 */               1);
/* 096 */             agg_holder.reset(); //TODO: investigate if reset
or zeroout are actually needed
/* 097 */             agg_rowWriter.zeroOutNullBytes();
/* 098 */             agg_rowWriter.write(0, agg_key);
/* 099 */             agg_result.setTotalSize(agg_holder.totalSize());
/* 100 */             Object kbase = agg_result.getBaseObject();
/* 101 */             long koff = agg_result.getBaseOffset();
/* 102 */             int klen = agg_result.getSizeInBytes();
/* 103 */
/* 104 */             UnsafeRow vRow
/* 105 */             = batch.appendRow(kbase, koff, klen, emptyVBase,
emptyVOff, emptyVLen);
/* 106 */             if (vRow == null) {
/* 107 */               isBatchFull = true;
/* 108 */             } else {
/* 109 */               buckets[idx] = numRows++;
/* 110 */             }
/* 111 */             return vRow;
/* 112 */           } else {
/* 113 */             // No more space
/* 114 */             return null;
/* 115 */           }
/* 116 */         } else if (equals(idx, agg_key)) {
/* 117 */           return batch.getValueRow(buckets[idx]);
/* 118 */         }
/* 119 */         idx = (idx + 1) & (numBuckets - 1);
/* 120 */         step++;
/* 121 */       }
/* 122 */       // Didn't find it
/* 123 */       return null;
/* 124 */     }
/* 125 */
/* 126 */     private boolean equals(int idx, UTF8String agg_key) {
/* 127 */       UnsafeRow row = batch.getKeyRow(buckets[idx]);
/* 128 */       return (row.getUTF8String(0).equals(agg_key));
/* 129 */     }
/* 130 */
/* 131 */     private long hash(UTF8String agg_key) {
/* 132 */       long agg_hash = 0;
/* 133 */
/* 134 */       int agg_result = 0;
/* 135 */       byte[] agg_bytes = agg_key.getBytes();
/* 136 */       for (int i = 0; i < agg_bytes.length; i++) {
/* 137 */         int agg_hash1 = agg_bytes[i];
/* 138 */         agg_result = (agg_result ^ (0x9e3779b9)) + agg_hash1 +
(agg_result << 6) + (agg_result >>> 2);
/* 139 */       }
/* 140 */
/* 141 */       agg_hash = (agg_hash ^ (0x9e3779b9)) + agg_result +
(agg_hash << 6) + (agg_hash >>> 2);
/* 142 */
/* 143 */       return agg_hash;
/* 144 */     }
/* 145 */
/* 146 */     public org.apache.spark.unsafe.KVIterator<UnsafeRow,
UnsafeRow> rowIterator() {
/* 147 */       return batch.rowIterator();
/* 148 */     }
/* 149 */
/* 150 */     public void close() {
/* 151 */       batch.close();
/* 152 */     }
/* 153 */
/* 154 */   }
/* 155 */
/* 156 */   protected void processNext() throws java.io.IOException {
/* 157 */     if (!agg_initAgg) {
/* 158 */       agg_initAgg = true;
/* 159 */       long wholestagecodegen_beforeAgg = System.nanoTime();
/* 160 */       agg_nestedClassInstance1.agg_doAggregateWithKeys();
/* 161 */       ((org.apache.spark.sql.execution.metric.SQLMetric)
references[8] /* aggTime */).add((System.nanoTime() -
wholestagecodegen_beforeAgg) / 1000000);
/* 162 */     }
/* 163 */
/* 164 */     // output the result
/* 165 */
/* 166 */     while (agg_fastHashMapIter.next()) {
/* 167 */       UnsafeRow agg_aggKey = (UnsafeRow)
agg_fastHashMapIter.getKey();
/* 168 */       UnsafeRow agg_aggBuffer = (UnsafeRow)
agg_fastHashMapIter.getValue();
/* 169 */     
wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey,
agg_aggBuffer);
/* 170 */
/* 171 */       if (shouldStop()) return;
/* 172 */     }
/* 173 */     agg_fastHashMap.close();
/* 174 */
/* 175 */     while (agg_mapIter.next()) {
/* 176 */       UnsafeRow agg_aggKey = (UnsafeRow) agg_mapIter.getKey();
/* 177 */       UnsafeRow agg_aggBuffer = (UnsafeRow)
agg_mapIter.getValue();
/* 178 */     
wholestagecodegen_nestedClassInstance.agg_doAggregateWithKeysOutput(agg_aggKey,
agg_aggBuffer);
/* 179 */
/* 180 */       if (shouldStop()) return;
/* 181 */     }
/* 182 */
/* 183 */     agg_mapIter.close();
/* 184 */     if (agg_sorter == null) {
/* 185 */       agg_hashMap.free();
/* 186 */     }
/* 187 */   }
/* 188 */
/* 189 */   private wholestagecodegen_NestedClass
wholestagecodegen_nestedClassInstance = new
wholestagecodegen_NestedClass();
/* 190 */   private agg_NestedClass1 agg_nestedClassInstance1 = new
agg_NestedClass1();
/* 191 */   private agg_NestedClass agg_nestedClassInstance = new
agg_NestedClass();
/* 192 */
/* 193 */   private class agg_NestedClass1 {
/* 194 */     private void agg_doAggregateWithKeys() throws
java.io.IOException {
/* 195 */       while (inputadapter_input.hasNext() && !stopEarly()) {
/* 196 */         InternalRow inputadapter_row = (InternalRow)
inputadapter_input.next();
/* 197 */         int inputadapter_value = inputadapter_row.getInt(0);
/* 198 */         boolean inputadapter_isNull1 =
inputadapter_row.isNullAt(1);
/* 199 */         UTF8String inputadapter_value1 = inputadapter_isNull1
?
/* 200 */         null : (inputadapter_row.getUTF8String(1));
/* 201 */
/* 202 */       
agg_nestedClassInstance.agg_doConsume(inputadapter_row,
inputadapter_value, inputadapter_value1, inputadapter_isNull1);
/* 203 */         if (shouldStop()) return;
/* 204 */       }
/* 205 */
/* 206 */       agg_fastHashMapIter = agg_fastHashMap.rowIterator();
/* 207 */       agg_mapIter =
((org.apache.spark.sql.execution.aggregate.HashAggregateExec)
references[0] /* plan */).finishAggregate(agg_hashMap, agg_sorter,
((org.apache.spark.sql.execution.metric.SQLMetric) references[4] /*
peakMemory */), ((org.apache.spark.sql.execution.metric.SQLMetric)
references[5] /* spillSize */),
((org.apache.spark.sql.execution.metric.SQLMetric) references[6] /*
avgHashProbe */));
/* 208 */
/* 209 */     }
/* 210 */
/* 211 */   }
/* 212 */
/* 213 */   private class wholestagecodegen_NestedClass {
/* 214 */     private void agg_doAggregateWithKeysOutput(UnsafeRow
agg_keyTerm, UnsafeRow agg_bufferTerm)
/* 215 */     throws java.io.IOException {
/* 216 */       ((org.apache.spark.sql.execution.metric.SQLMetric)
references[7] /* numOutputRows */).add(1);
/* 217 */
/* 218 */       boolean agg_isNull35 = agg_keyTerm.isNullAt(0);
/* 219 */       UTF8String agg_value37 = agg_isNull35 ?
/* 220 */       null : (agg_keyTerm.getUTF8String(0));
/* 221 */       boolean agg_isNull36 = agg_bufferTerm.isNullAt(0);
/* 222 */       double agg_value38 = agg_isNull36 ?
/* 223 */       -1.0 : (agg_bufferTerm.getDouble(0));
/* 224 */       boolean agg_isNull37 = agg_bufferTerm.isNullAt(1);
/* 225 */       long agg_value39 = agg_isNull37 ?
/* 226 */       -1L : (agg_bufferTerm.getLong(1));
/* 227 */
/* 228 */       agg_mutableStateArray1[1].reset();
/* 229 */
/* 230 */       agg_mutableStateArray2[1].zeroOutNullBytes();
/* 231 */
/* 232 */       if (agg_isNull35) {
/* 233 */         agg_mutableStateArray2[1].setNullAt(0);
/* 234 */       } else {
/* 235 */         agg_mutableStateArray2[1].write(0, agg_value37);
/* 236 */       }
/* 237 */
/* 238 */       if (agg_isNull36) {
/* 239 */         agg_mutableStateArray2[1].setNullAt(1);
/* 240 */       } else {
/* 241 */         agg_mutableStateArray2[1].write(1, agg_value38);
/* 242 */       }
/* 243 */
/* 244 */       if (agg_isNull37) {
/* 245 */         agg_mutableStateArray2[1].setNullAt(2);
/* 246 */       } else {
/* 247 */         agg_mutableStateArray2[1].write(2, agg_value39);
/* 248 */       }
/* 249 */     
agg_mutableStateArray[1].setTotalSize(agg_mutableStateArray1[1].totalSize());
/* 250 */       append(agg_mutableStateArray[1]);
/* 251 */
/* 252 */     }
/* 253 */
/* 254 */   }
/* 255 */
/* 256 */   private class agg_NestedClass {
/* 257 */     private void agg_doConsume(InternalRow inputadapter_row,
int agg_expr_0, UTF8String agg_expr_1, boolean agg_exprIsNull_1) throws
java.io.IOException {
/* 258 */       UnsafeRow agg_unsafeRowAggBuffer = null;
/* 259 */       UnsafeRow agg_fastAggBuffer = null;
/* 260 */
/* 261 */       if (true) {
/* 262 */         if (!agg_exprIsNull_1) {
/* 263 */           agg_fastAggBuffer = agg_fastHashMap.findOrInsert(
/* 264 */             agg_expr_1);
/* 265 */         }
/* 266 */       }
/* 267 */       // Cannot find the key in fast hash map, try regular
hash map.
/* 268 */       if (agg_fastAggBuffer == null) {
/* 269 */         // generate grouping key
/* 270 */         agg_mutableStateArray1[0].reset();
/* 271 */
/* 272 */         agg_mutableStateArray2[0].zeroOutNullBytes();
/* 273 */
/* 274 */         if (agg_exprIsNull_1) {
/* 275 */           agg_mutableStateArray2[0].setNullAt(0);
/* 276 */         } else {
/* 277 */           agg_mutableStateArray2[0].write(0, agg_expr_1);
/* 278 */         }
/* 279 */       
agg_mutableStateArray[0].setTotalSize(agg_mutableStateArray1[0].totalSize());
/* 280 */         int agg_value7 = 42;
/* 281 */
/* 282 */         if (!agg_exprIsNull_1) {
/* 283 */           agg_value7 =
org.apache.spark.unsafe.hash.Murmur3_x86_32.hashUnsafeBytes(agg_expr_1.getBaseObject(),
agg_expr_1.getBaseOffset(), agg_expr_1.numBytes(), agg_value7);
/* 284 */         }
/* 285 */         if (true) {
/* 286 */           // try to get the buffer from hash map
/* 287 */           agg_unsafeRowAggBuffer =
/* 288 */         
agg_hashMap.getAggregationBufferFromUnsafeRow(agg_mutableStateArray[0],
agg_value7);
/* 289 */         }
/* 290 */         // Can't allocate buffer from the hash map. Spill the
map and fallback to sort-based
/* 291 */         // aggregation after processing all input rows.
/* 292 */         if (agg_unsafeRowAggBuffer == null) {
/* 293 */           if (agg_sorter == null) {
/* 294 */             agg_sorter =
agg_hashMap.destructAndCreateExternalSorter();
/* 295 */           } else {
/* 296 */           
agg_sorter.merge(agg_hashMap.destructAndCreateExternalSorter());
/* 297 */           }
/* 298 */
/* 299 */           // the hash map had be spilled, it should have
enough memory now,
/* 300 */           // try to allocate buffer again.
/* 301 */           agg_unsafeRowAggBuffer =
agg_hashMap.getAggregationBufferFromUnsafeRow(
/* 302 */             agg_mutableStateArray[0], agg_value7);
/* 303 */           if (agg_unsafeRowAggBuffer == null) {
/* 304 */             // failed to allocate the first page
/* 305 */             throw new OutOfMemoryError("No enough memory for
aggregation");
/* 306 */           }
/* 307 */         }
/* 308 */
/* 309 */       }
/* 310 */
/* 311 */       if (agg_fastAggBuffer != null) {
/* 312 */         // common sub-expressions
/* 313 */         boolean agg_isNull21 = false;
/* 314 */         long agg_value23 = -1L;
/* 315 */         if (!false) {
/* 316 */           agg_value23 = (long) agg_expr_0;
/* 317 */         }
/* 318 */         // evaluate aggregate function
/* 319 */         boolean agg_isNull23 = true;
/* 320 */         double agg_value25 = -1.0;
/* 321 */
/* 322 */         boolean agg_isNull24 = agg_fastAggBuffer.isNullAt(0);
/* 323 */         double agg_value26 = agg_isNull24 ?
/* 324 */         -1.0 : (agg_fastAggBuffer.getDouble(0));
/* 325 */         if (!agg_isNull24) {
/* 326 */           agg_agg_isNull25 = true;
/* 327 */           double agg_value27 = -1.0;
/* 328 */           do {
/* 329 */             boolean agg_isNull26 = agg_isNull21;
/* 330 */             double agg_value28 = -1.0;
/* 331 */             if (!agg_isNull21) {
/* 332 */               agg_value28 = (double) agg_value23;
/* 333 */             }
/* 334 */             if (!agg_isNull26) {
/* 335 */               agg_agg_isNull25 = false;
/* 336 */               agg_value27 = agg_value28;
/* 337 */               continue;
/* 338 */             }
/* 339 */
/* 340 */             boolean agg_isNull27 = false;
/* 341 */             double agg_value29 = -1.0;
/* 342 */             if (!false) {
/* 343 */               agg_value29 = (double) 0;
/* 344 */             }
/* 345 */             if (!agg_isNull27) {
/* 346 */               agg_agg_isNull25 = false;
/* 347 */               agg_value27 = agg_value29;
/* 348 */               continue;
/* 349 */             }
/* 350 */
/* 351 */           } while (false);
/* 352 */
/* 353 */           agg_isNull23 = false; // resultCode could change
nullability.
/* 354 */           agg_value25 = agg_value26 + agg_value27;
/* 355 */
/* 356 */         }
/* 357 */         boolean agg_isNull29 = false;
/* 358 */         long agg_value31 = -1L;
/* 359 */         if (!false && agg_isNull21) {
/* 360 */           boolean agg_isNull31 =
agg_fastAggBuffer.isNullAt(1);
/* 361 */           long agg_value33 = agg_isNull31 ?
/* 362 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 363 */           agg_isNull29 = agg_isNull31;
/* 364 */           agg_value31 = agg_value33;
/* 365 */         } else {
/* 366 */           boolean agg_isNull32 = true;
/* 367 */           long agg_value34 = -1L;
/* 368 */
/* 369 */           boolean agg_isNull33 =
agg_fastAggBuffer.isNullAt(1);
/* 370 */           long agg_value35 = agg_isNull33 ?
/* 371 */           -1L : (agg_fastAggBuffer.getLong(1));
/* 372 */           if (!agg_isNull33) {
/* 373 */             agg_isNull32 = false; // resultCode could change
nullability.
/* 374 */             agg_value34 = agg_value35 + 1L;
/* 375 */
/* 376 */           }
/* 377 */           agg_isNull29 = agg_isNull32;
/* 378 */           agg_value31 = agg_value34;
/* 379 */         }
/* 380 */         // update fast row
/* 381 */         if (!agg_isNull23) {
/* 382 */           agg_fastAggBuffer.setDouble(0, agg_value25);
/* 383 */         } else {
/* 384 */           agg_fastAggBuffer.setNullAt(0);
/* 385 */         }
/* 386 */
/* 387 */         if (!agg_isNull29) {
/* 388 */           agg_fastAggBuffer.setLong(1, agg_value31);
/* 389 */         } else {
/* 390 */           agg_fastAggBuffer.setNullAt(1);
/* 391 */         }
/* 392 */       } else {
/* 393 */         // common sub-expressions
/* 394 */         boolean agg_isNull7 = false;
/* 395 */         long agg_value9 = -1L;
/* 396 */         if (!false) {
/* 397 */           agg_value9 = (long) agg_expr_0;
/* 398 */         }
/* 399 */         // evaluate aggregate function
/* 400 */         boolean agg_isNull9 = true;
/* 401 */         double agg_value11 = -1.0;
/* 402 */
/* 403 */         boolean agg_isNull10 =
agg_unsafeRowAggBuffer.isNullAt(0);
/* 404 */         double agg_value12 = agg_isNull10 ?
/* 405 */         -1.0 : (agg_unsafeRowAggBuffer.getDouble(0));
/* 406 */         if (!agg_isNull10) {
/* 407 */           agg_agg_isNull11 = true;
/* 408 */           double agg_value13 = -1.0;
/* 409 */           do {
/* 410 */             boolean agg_isNull12 = agg_isNull7;
/* 411 */             double agg_value14 = -1.0;
/* 412 */             if (!agg_isNull7) {
/* 413 */               agg_value14 = (double) agg_value9;
/* 414 */             }
/* 415 */             if (!agg_isNull12) {
/* 416 */               agg_agg_isNull11 = false;
/* 417 */               agg_value13 = agg_value14;
/* 418 */               continue;
/* 419 */             }
/* 420 */
/* 421 */             boolean agg_isNull13 = false;
/* 422 */             double agg_value15 = -1.0;
/* 423 */             if (!false) {
/* 424 */               agg_value15 = (double) 0;
/* 425 */             }
/* 426 */             if (!agg_isNull13) {
/* 427 */               agg_agg_isNull11 = false;
/* 428 */               agg_value13 = agg_value15;
/* 429 */               continue;
/* 430 */             }
/* 431 */
/* 432 */           } while (false);
/* 433 */
/* 434 */           agg_isNull9 = false; // resultCode could change
nullability.
/* 435 */           agg_value11 = agg_value12 + agg_value13;
/* 436 */
/* 437 */         }
/* 438 */         boolean agg_isNull15 = false;
/* 439 */         long agg_value17 = -1L;
/* 440 */         if (!false && agg_isNull7) {
/* 441 */           boolean agg_isNull17 =
agg_unsafeRowAggBuffer.isNullAt(1);
/* 442 */           long agg_value19 = agg_isNull17 ?
/* 443 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 444 */           agg_isNull15 = agg_isNull17;
/* 445 */           agg_value17 = agg_value19;
/* 446 */         } else {
/* 447 */           boolean agg_isNull18 = true;
/* 448 */           long agg_value20 = -1L;
/* 449 */
/* 450 */           boolean agg_isNull19 =
agg_unsafeRowAggBuffer.isNullAt(1);
/* 451 */           long agg_value21 = agg_isNull19 ?
/* 452 */           -1L : (agg_unsafeRowAggBuffer.getLong(1));
/* 453 */           if (!agg_isNull19) {
/* 454 */             agg_isNull18 = false; // resultCode could change
nullability.
/* 455 */             agg_value20 = agg_value21 + 1L;
/* 456 */
/* 457 */           }
/* 458 */           agg_isNull15 = agg_isNull18;
/* 459 */           agg_value17 = agg_value20;
/* 460 */         }
/* 461 */         // update unsafe row buffer
/* 462 */         if (!agg_isNull9) {
/* 463 */           agg_unsafeRowAggBuffer.setDouble(0, agg_value11);
/* 464 */         } else {
/* 465 */           agg_unsafeRowAggBuffer.setNullAt(0);
/* 466 */         }
/* 467 */
/* 468 */         if (!agg_isNull15) {
/* 469 */           agg_unsafeRowAggBuffer.setLong(1, agg_value17);
/* 470 */         } else {
/* 471 */           agg_unsafeRowAggBuffer.setNullAt(1);
/* 472 */         }
/* 473 */
/* 474 */       }
/* 475 */
/* 476 */     }
/* 477 */
/* 478 */   }
/* 479 */
/* 480 */ }
```
## How was this patch tested?
Added a unit test to `WholeStageCodegenSuite`.
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes #20779 from kiszk/SPARK-23598.
(cherry picked from commit 1098933b0ac5cdb18101d3aebefa773c2ce05a50)
Signed-off-by: Herman van Hovell <hvanhovell@databricks.com>
(commit: f3efbfa4b973cdb8cf992e30540609d0006e0cfe)
The file was modified sql/core/src/main/java/org/apache/spark/sql/execution/BufferedRowIterator.java (diff)
The file was modified sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala (diff)
Commit 0663b61193b37094b9d00c7f2cbb0268ad946e25 by joseph
[SPARK-22915][MLLIB] Streaming tests for spark.ml.feature, from N to Z
## What changes were proposed in this pull request?
Adds structured streaming tests using testTransformer for these suites:
- NGramSuite
- NormalizerSuite
- OneHotEncoderEstimatorSuite
- OneHotEncoderSuite
- PCASuite
- PolynomialExpansionSuite
- QuantileDiscretizerSuite
- RFormulaSuite
- SQLTransformerSuite
- StandardScalerSuite
- StopWordsRemoverSuite
- StringIndexerSuite
- TokenizerSuite
- RegexTokenizerSuite
- VectorAssemblerSuite
- VectorIndexerSuite
- VectorSizeHintSuite
- VectorSlicerSuite
- Word2VecSuite
## How was this patch tested?
They are unit tests.
Author: “attilapiros” <piros.attila.zsolt@gmail.com>
Closes #20686 from attilapiros/SPARK-22915.
(cherry picked from commit 279b3db8970809104c30941254e57e3d62da5041)
Signed-off-by: Joseph K. Bradley <joseph@databricks.com>
(commit: 0663b61193b37094b9d00c7f2cbb0268ad946e25)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/Word2VecSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/util/MLTest.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/StandardScalerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/VectorSlicerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/PCASuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/NGramSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/StopWordsRemoverSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/VectorSizeHintSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/TokenizerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderEstimatorSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/SQLTransformerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/VectorIndexerSuite.scala (diff)
The file was modified mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala (diff)
Commit a9d0784e6733666a0608e8236322f1dc380e96b7 by hyukjinkwon
[SPARK-23642][DOCS] AccumulatorV2 subclass isZero scaladoc fix
Added/corrected scaladoc for isZero on the DoubleAccumulator,
CollectionAccumulator, and LongAccumulator subclasses of AccumulatorV2,
particularly noting where there are requirements in addition to having a
value of zero in order to return true.
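For example (a hedged illustration of the documented behavior, not code from this patch), a `LongAccumulator` whose running sum has returned to zero is still not "zero", because values have been added and its count is non-zero:
```scala
import org.apache.spark.sql.SparkSession

// Illustration of the isZero nuance: "zero" means no values were added at all,
// not merely that the current sum happens to be 0.
object IsZeroExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("isZero").getOrCreate()
    val acc = spark.sparkContext.longAccumulator("example")

    println(acc.isZero) // true: nothing has been added yet

    acc.add(5L)
    acc.add(-5L)
    println(acc.sum)    // 0: the sum is back to zero
    println(acc.isZero) // false: two values were added, so the count is non-zero

    spark.stop()
  }
}
```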
## What changes were proposed in this pull request?
Three scaladoc comments are updated in AccumulatorV2.scala. No changes
outside of comment blocks were made.
## How was this patch tested?
Running "sbt unidoc", fixing the style errors found, and reviewing the
resulting local scaladoc in Firefox.
Author: smallory <s.mallory@gmail.com>
Closes #20790 from smallory/patch-1.
(cherry picked from commit 4f5bad615b47d743b8932aea1071652293981604)
Signed-off-by: hyukjinkwon <gurwls223@gmail.com>
(commit: a9d0784e6733666a0608e8236322f1dc380e96b7)
The file was modified core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala (diff)
Commit 72c13ed844d6be6510ce2c5e3526c234d1d5e10f by ueshin
[SPARK-23695][PYTHON] Fix the error message for Kinesis streaming tests
## What changes were proposed in this pull request?
This PR proposes to fix the error message for Kinesis in PySpark when
its jar is missing but the tests are explicitly enabled.
```bash
ENABLE_KINESIS_TESTS=1 SPARK_TESTING=1 bin/pyspark pyspark.streaming.tests
```
Before:
```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1
Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1
Traceback (most recent call last):
File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
   "__main__", fname, loader, pkg_name)
File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
   exec code in run_globals
File "/.../spark/python/pyspark/streaming/tests.py", line 1572, in <module>
   % kinesis_asl_assembly_dir) +
NameError: name 'kinesis_asl_assembly_dir' is not defined
```
After:
```
Skipped test_flume_stream (enable by setting environment variable ENABLE_FLUME_TESTS=1
Skipped test_kafka_stream (enable by setting environment variable ENABLE_KAFKA_0_8_TESTS=1
Traceback (most recent call last):
File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 174, in _run_module_as_main
   "__main__", fname, loader, pkg_name)
File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/runpy.py", line 72, in _run_code
   exec code in run_globals
File "/.../spark/python/pyspark/streaming/tests.py", line 1576, in <module>
   "You need to build Spark with 'build/sbt -Pkinesis-asl "
Exception: Failed to find Spark Streaming Kinesis assembly jar in /.../spark/external/kinesis-asl-assembly. You need to build Spark with 'build/sbt -Pkinesis-asl assembly/package streaming-kinesis-asl-assembly/assembly'or 'build/mvn -Pkinesis-asl package' before running this test.
```
## How was this patch tested?
Manually tested.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #20834 from HyukjinKwon/minor-variable.
(cherry picked from commit 56e8f48a43eb51e8582db2461a585b13a771a00a)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(commit: 72c13ed844d6be6510ce2c5e3526c234d1d5e10f)
The file was modified python/pyspark/streaming/tests.py (diff)
Commit 2e1e274ed9d7a30656555e71c68e7de34a336a8a by vanzin
[SPARK-23658][LAUNCHER] InProcessAppHandle uses the wrong class in
getLogger
## What changes were proposed in this pull request?
Changed `Logger` in `InProcessAppHandle` to use `InProcessAppHandle` instead of `ChildProcAppHandle`.
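A minimal sketch of the pattern involved, for illustration (the real fix is a one-line change in the Java launcher code, and the class names below are stand-ins):
```scala
import java.util.logging.Logger

// Illustration: a logger should be named after the class that owns it, not
// after a sibling class left over from copy/paste.
class InProcessAppHandleSketch {
  // Wrong (the bug): Logger.getLogger(classOf[ChildProcAppHandleSketch].getName)
  // Right: name the logger after this class.
  private val log = Logger.getLogger(classOf[InProcessAppHandleSketch].getName)

  def kill(): Unit = log.warning("killing " + getClass.getSimpleName)
}
```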
Author: Sahil Takiar <stakiar@cloudera.com>
Closes #20815 from sahilTakiar/master.
(cherry picked from commit 7618896e855579f111dd92cd76794a5672a087e5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(commit: 2e1e274ed9d7a30656555e71c68e7de34a336a8a)
The file was modified launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java (diff)
Commit 52a52d5d26fc1650e788eec62ce478c76f627470 by vanzin
[SPARK-23671][CORE] Fix condition to enable the SHS thread pool.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #20814 from vanzin/SPARK-23671.
(cherry picked from commit 18f8575e0166c6997569358d45bdae2cf45bf624)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(commit: 52a52d5d26fc1650e788eec62ce478c76f627470)
The file was modified core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala (diff)
Commit 99f5c0bc7a6c77917b4ccd498724b8ccc0c21473 by vanzin
[SPARK-23608][CORE][WEBUI] Add synchronization in SHS between
attachSparkUI and detachSparkUI functions to avoid concurrent
modification issue to Jetty Handlers
Jetty handlers are dynamically attached and detached while the SHS is running,
but the attach and detach operations can take place at the same time because
of the asynchronous load/clear callbacks of the Guava cache.
## What changes were proposed in this pull request?
Add synchronization between attachSparkUI and detachSparkUI in SHS.
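A minimal sketch of the approach, assuming handler attachment is reduced to simple list mutation (illustrative only, not the HistoryServer implementation):
```scala
import scala.collection.mutable.ListBuffer

// Illustration: serialize attach/detach so the Guava cache's asynchronous load
// and eviction callbacks cannot modify the handler collection concurrently.
class HistoryServerSketch {
  private val handlers = ListBuffer.empty[String]

  def attachSparkUI(appId: String): Unit = synchronized {
    handlers += appId // attach this application's UI handlers to Jetty
  }

  def detachSparkUI(appId: String): Unit = synchronized {
    handlers -= appId // detach this application's UI handlers from Jetty
  }
}
```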
## How was this patch tested?
With this patch, the missing Jetty handler issue no longer occurs in the SHS on our production cluster.
Author: Ye Zhou <yezhou@linkedin.com>
Closes #20744 from zhouyejoe/SPARK-23608.
(cherry picked from commit 3675af7247e841e9a689666dc20891ba55c612b3)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(commit: 99f5c0bc7a6c77917b4ccd498724b8ccc0c21473)
The file was modified core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala (diff)