0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.execution.vectorized;
0018
0019 import java.math.BigDecimal;
0020 import java.math.BigInteger;
0021
0022 import com.google.common.annotations.VisibleForTesting;
0023
0024 import org.apache.spark.sql.internal.SQLConf;
0025 import org.apache.spark.sql.types.*;
0026 import org.apache.spark.sql.vectorized.ColumnVector;
0027 import org.apache.spark.sql.vectorized.ColumnarArray;
0028 import org.apache.spark.sql.vectorized.ColumnarMap;
0029 import org.apache.spark.unsafe.array.ByteArrayMethods;
0030 import org.apache.spark.unsafe.types.CalendarInterval;
0031 import org.apache.spark.unsafe.types.UTF8String;
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048 public abstract class WritableColumnVector extends ColumnVector {
0049
0050
0051
0052
0053 public void reset() {
0054 if (isConstant) return;
0055
0056 if (childColumns != null) {
0057 for (ColumnVector c: childColumns) {
0058 ((WritableColumnVector) c).reset();
0059 }
0060 }
0061 elementsAppended = 0;
0062 if (numNulls > 0) {
0063 putNotNulls(0, capacity);
0064 numNulls = 0;
0065 }
0066 }
0067
0068 @Override
0069 public void close() {
0070 if (childColumns != null) {
0071 for (int i = 0; i < childColumns.length; i++) {
0072 childColumns[i].close();
0073 childColumns[i] = null;
0074 }
0075 childColumns = null;
0076 }
0077 if (dictionaryIds != null) {
0078 dictionaryIds.close();
0079 dictionaryIds = null;
0080 }
0081 dictionary = null;
0082 }
0083
0084 public void reserve(int requiredCapacity) {
0085 if (requiredCapacity < 0) {
0086 throwUnsupportedException(requiredCapacity, null);
0087 } else if (requiredCapacity > capacity) {
0088 int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
0089 if (requiredCapacity <= newCapacity) {
0090 try {
0091 reserveInternal(newCapacity);
0092 } catch (OutOfMemoryError outOfMemoryError) {
0093 throwUnsupportedException(requiredCapacity, outOfMemoryError);
0094 }
0095 } else {
0096 throwUnsupportedException(requiredCapacity, null);
0097 }
0098 }
0099 }
0100
0101 private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
0102 String message = "Cannot reserve additional contiguous bytes in the vectorized reader (" +
0103 (requiredCapacity >= 0 ? "requested " + requiredCapacity + " bytes" : "integer overflow") +
0104 "). As a workaround, you can reduce the vectorized reader batch size, or disable the " +
0105 "vectorized reader, or disable " + SQLConf.BUCKETING_ENABLED().key() + " if you read " +
0106 "from bucket table. For Parquet file format, refer to " +
0107 SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().key() +
0108 " (default " + SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().defaultValueString() +
0109 ") and " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + "; for ORC file format, " +
0110 "refer to " + SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().key() +
0111 " (default " + SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().defaultValueString() +
0112 ") and " + SQLConf.ORC_VECTORIZED_READER_ENABLED().key() + ".";
0113 throw new RuntimeException(message, cause);
0114 }
0115
0116 @Override
0117 public boolean hasNull() {
0118 return numNulls > 0;
0119 }
0120
0121 @Override
0122 public int numNulls() { return numNulls; }
0123
0124
0125
0126
0127
0128
0129
/**
 * Returns the dictionary id stored at {@code rowId}. Within this class it is invoked on
 * {@code dictionaryIds} and the result is handed to {@code Dictionary.decodeToBinary}.
 */
0130 public abstract int getDictId(int rowId);
0131
0132
0133
0134
0135
0136
/**
 * Dictionary used to decode values. When non-null, {@code getUTF8String} and {@code getBinary}
 * decode through it instead of reading the child byte data; {@code close()} sets it to null.
 */
0137 protected Dictionary dictionary;
0138
0139
0140
0141
/**
 * Vector holding the dictionary id of each row. Created on first use by
 * {@code reserveDictionaryIds(int)} as an integer column and released by {@code close()}.
 */
0142 protected WritableColumnVector dictionaryIds;
0143
0144
0145
0146
0147 public boolean hasDictionary() { return this.dictionary != null; }
0148
0149
0150
0151
0152 public WritableColumnVector getDictionaryIds() {
0153 return dictionaryIds;
0154 }
0155
0156
0157
0158
0159 public void setDictionary(Dictionary dictionary) {
0160 this.dictionary = dictionary;
0161 }
0162
0163
0164
0165
0166 public WritableColumnVector reserveDictionaryIds(int capacity) {
0167 if (dictionaryIds == null) {
0168 dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
0169 } else {
0170 dictionaryIds.reset();
0171 dictionaryIds.reserve(capacity);
0172 }
0173 return dictionaryIds;
0174 }
0175
0176
0177
0178
0179
/**
 * Allocation hook called by {@code reserve(int)} with the new capacity once it has decided
 * that the vector must grow. An {@link OutOfMemoryError} thrown from here is caught by
 * {@code reserve(int)} and rethrown as a {@link RuntimeException}.
 * NOTE(review): implementations presumably update the {@code capacity} field; this class
 * never does so itself - confirm in subclasses.
 */
0180 protected abstract void reserveInternal(int capacity);
0181
0182
0183
0184
/** Marks the row at {@code rowId} as not null; {@code appendNotNull()} delegates here. */
0185 public abstract void putNotNull(int rowId);
/** Marks the row at {@code rowId} as null; {@code appendNull()} delegates here. */
0186 public abstract void putNull(int rowId);
0187
0188
0189
0190
/** Marks {@code count} consecutive rows starting at {@code rowId} as null. */
0191 public abstract void putNulls(int rowId, int count);
/**
 * Marks {@code count} consecutive rows starting at {@code rowId} as not null.
 * {@code reset()} calls this with {@code (0, capacity)} to clear all null flags.
 */
0192 public abstract void putNotNulls(int rowId, int count);
0193
0194
0195
0196
/**
 * Stores {@code value} at {@code rowId}. {@code appendBoolean} calls {@code reserve} before
 * delegating here.
 */
0197 public abstract void putBoolean(int rowId, boolean value);
0198
0199
0200
0201
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0202 public abstract void putBooleans(int rowId, int count, boolean value);
0203
0204
0205
0206
/**
 * Stores {@code value} at {@code rowId}. {@code appendByte} calls {@code reserve} before
 * delegating here.
 */
0207 public abstract void putByte(int rowId, byte value);
0208
0209
0210
0211
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0212 public abstract void putBytes(int rowId, int count, byte value);
0213
0214
0215
0216
/**
 * Copies {@code count} bytes from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first byte read from
 * {@code src} - confirm in subclasses.
 */
0217 public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
0218
0219
0220
0221
/**
 * Stores {@code value} at {@code rowId}. {@code appendShort} calls {@code reserve} before
 * delegating here.
 */
0222 public abstract void putShort(int rowId, short value);
0223
0224
0225
0226
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0227 public abstract void putShorts(int rowId, int count, short value);
0228
0229
0230
0231
/**
 * Copies {@code count} shorts from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first element read from
 * {@code src} - confirm in subclasses.
 */
0232 public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
0233
0234
0235
0236
0237
/**
 * Writes {@code count} shorts into consecutive rows starting at {@code rowId}, taking the
 * data from the byte array {@code src}.
 * NOTE(review): the byte order expected in {@code src}, and whether {@code srcIndex} is a
 * byte offset, are not visible in this file - confirm in subclasses.
 */
0238 public abstract void putShorts(int rowId, int count, byte[] src, int srcIndex);
0239
0240
0241
0242
/**
 * Stores {@code value} at {@code rowId}. {@code appendInt} calls {@code reserve} before
 * delegating here; {@code putDecimal} and {@code putInterval} also write through this method.
 */
0243 public abstract void putInt(int rowId, int value);
0244
0245
0246
0247
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0248 public abstract void putInts(int rowId, int count, int value);
0249
0250
0251
0252
/**
 * Copies {@code count} ints from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first element read from
 * {@code src} - confirm in subclasses.
 */
0253 public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
0254
0255
0256
0257
0258
/**
 * Writes {@code count} ints into consecutive rows starting at {@code rowId}, taking the data
 * from the byte array {@code src}.
 * NOTE(review): the byte order expected in {@code src}, and whether {@code srcIndex} is a
 * byte offset, are not visible in this file - confirm in subclasses.
 */
0259 public abstract void putInts(int rowId, int count, byte[] src, int srcIndex);
0260
0261
0262
0263
0264
/**
 * Writes {@code count} ints into consecutive rows starting at {@code rowId}, taking the data
 * from the byte array {@code src}.
 * NOTE(review): the method name indicates that {@code src} holds little-endian encoded
 * values; the exact layout is not visible in this file - confirm in subclasses.
 */
0265 public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
0266
0267
0268
0269
/**
 * Stores {@code value} at {@code rowId}. {@code appendLong} calls {@code reserve} before
 * delegating here; {@code putDecimal} and {@code putInterval} also write through this method.
 */
0270 public abstract void putLong(int rowId, long value);
0271
0272
0273
0274
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0275 public abstract void putLongs(int rowId, int count, long value);
0276
0277
0278
0279
/**
 * Copies {@code count} longs from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first element read from
 * {@code src} - confirm in subclasses.
 */
0280 public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
0281
0282
0283
0284
0285
/**
 * Writes {@code count} longs into consecutive rows starting at {@code rowId}, taking the
 * data from the byte array {@code src}.
 * NOTE(review): the byte order expected in {@code src}, and whether {@code srcIndex} is a
 * byte offset, are not visible in this file - confirm in subclasses.
 */
0286 public abstract void putLongs(int rowId, int count, byte[] src, int srcIndex);
0287
0288
0289
0290
0291
/**
 * Writes {@code count} longs into consecutive rows starting at {@code rowId}, taking the
 * data from the byte array {@code src}.
 * NOTE(review): the method name indicates that {@code src} holds little-endian encoded
 * values; the exact layout is not visible in this file - confirm in subclasses.
 */
0292 public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
0293
0294
0295
0296
/**
 * Stores {@code value} at {@code rowId}. {@code appendFloat} calls {@code reserve} before
 * delegating here.
 */
0297 public abstract void putFloat(int rowId, float value);
0298
0299
0300
0301
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0302 public abstract void putFloats(int rowId, int count, float value);
0303
0304
0305
0306
/**
 * Copies {@code count} floats from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first element read from
 * {@code src} - confirm in subclasses.
 */
0307 public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
0308
0309
0310
0311
0312
/**
 * Writes {@code count} floats into consecutive rows starting at {@code rowId}, taking the
 * data from the byte array {@code src}.
 * NOTE(review): the byte order expected in {@code src}, and whether {@code srcIndex} is a
 * byte offset, are not visible in this file - confirm in subclasses.
 */
0313 public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
0314
0315
0316
0317
/**
 * Stores {@code value} at {@code rowId}. {@code appendDouble} calls {@code reserve} before
 * delegating here.
 */
0318 public abstract void putDouble(int rowId, double value);
0319
0320
0321
0322
/** Stores {@code value} in {@code count} consecutive rows starting at {@code rowId}. */
0323 public abstract void putDoubles(int rowId, int count, double value);
0324
0325
0326
0327
/**
 * Copies {@code count} doubles from {@code src} into consecutive rows starting at
 * {@code rowId}. {@code srcIndex} is presumably the index of the first element read from
 * {@code src} - confirm in subclasses.
 */
0328 public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
0329
0330
0331
0332
0333
/**
 * Writes {@code count} doubles into consecutive rows starting at {@code rowId}, taking the
 * data from the byte array {@code src}.
 * NOTE(review): the byte order expected in {@code src}, and whether {@code srcIndex} is a
 * byte offset, are not visible in this file - confirm in subclasses.
 */
0334 public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
0335
0336
0337
0338
/**
 * Records, for the row at {@code rowId}, the {@code offset} and {@code length} of its
 * elements. In this class the offset passed in is a position within {@code arrayData()}
 * (see {@code appendByteArray} and {@code appendArray}).
 */
0339 public abstract void putArray(int rowId, int offset, int length);
0340
0341
0342
0343
/**
 * Stores a byte-array value for the row at {@code rowId}, using {@code count} bytes of
 * {@code value} beginning at {@code offset}.
 * NOTE(review): the meaning of the returned int is not visible in this file - presumably
 * the position of the copied bytes in the child byte data; confirm in subclasses.
 */
0344 public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
0345 public final int putByteArray(int rowId, byte[] value) {
0346 return putByteArray(rowId, value, 0, value.length);
0347 }
0348
0349 @Override
0350 public Decimal getDecimal(int rowId, int precision, int scale) {
0351 if (isNullAt(rowId)) return null;
0352 if (precision <= Decimal.MAX_INT_DIGITS()) {
0353 return Decimal.createUnsafe(getInt(rowId), precision, scale);
0354 } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
0355 return Decimal.createUnsafe(getLong(rowId), precision, scale);
0356 } else {
0357
0358 byte[] bytes = getBinary(rowId);
0359 BigInteger bigInteger = new BigInteger(bytes);
0360 BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
0361 return Decimal.apply(javaDecimal, precision, scale);
0362 }
0363 }
0364
0365 public void putDecimal(int rowId, Decimal value, int precision) {
0366 if (precision <= Decimal.MAX_INT_DIGITS()) {
0367 putInt(rowId, (int) value.toUnscaledLong());
0368 } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
0369 putLong(rowId, value.toUnscaledLong());
0370 } else {
0371 BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
0372 putByteArray(rowId, bigInteger.toByteArray());
0373 }
0374 }
0375
0376 public void putInterval(int rowId, CalendarInterval value) {
0377 getChild(0).putInt(rowId, value.months);
0378 getChild(1).putInt(rowId, value.days);
0379 getChild(2).putLong(rowId, value.microseconds);
0380 }
0381
0382 @Override
0383 public UTF8String getUTF8String(int rowId) {
0384 if (isNullAt(rowId)) return null;
0385 if (dictionary == null) {
0386 return arrayData().getBytesAsUTF8String(getArrayOffset(rowId), getArrayLength(rowId));
0387 } else {
0388 byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
0389 return UTF8String.fromBytes(bytes);
0390 }
0391 }
0392
0393
0394
0395
0396
0397
/**
 * Returns {@code count} bytes of this vector, starting at {@code rowId}, as a
 * {@code UTF8String}. {@code getUTF8String} calls this on {@code arrayData()} with a row's
 * array offset and array length.
 */
0398 protected abstract UTF8String getBytesAsUTF8String(int rowId, int count);
0399
0400 @Override
0401 public byte[] getBinary(int rowId) {
0402 if (isNullAt(rowId)) return null;
0403 if (dictionary == null) {
0404 return arrayData().getBytes(getArrayOffset(rowId), getArrayLength(rowId));
0405 } else {
0406 return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
0407 }
0408 }
0409
0410
0411
0412
0413
0414
0415
0416 public final int appendNull() {
0417 assert (!(dataType() instanceof StructType));
0418 reserve(elementsAppended + 1);
0419 putNull(elementsAppended);
0420 return elementsAppended++;
0421 }
0422
0423 public final int appendNotNull() {
0424 reserve(elementsAppended + 1);
0425 putNotNull(elementsAppended);
0426 return elementsAppended++;
0427 }
0428
0429 public final int appendNulls(int count) {
0430 assert (!(dataType() instanceof StructType));
0431 reserve(elementsAppended + count);
0432 int result = elementsAppended;
0433 putNulls(elementsAppended, count);
0434 elementsAppended += count;
0435 return result;
0436 }
0437
0438 public final int appendNotNulls(int count) {
0439 assert (!(dataType() instanceof StructType));
0440 reserve(elementsAppended + count);
0441 int result = elementsAppended;
0442 putNotNulls(elementsAppended, count);
0443 elementsAppended += count;
0444 return result;
0445 }
0446
0447 public final int appendBoolean(boolean v) {
0448 reserve(elementsAppended + 1);
0449 putBoolean(elementsAppended, v);
0450 return elementsAppended++;
0451 }
0452
0453 public final int appendBooleans(int count, boolean v) {
0454 reserve(elementsAppended + count);
0455 int result = elementsAppended;
0456 putBooleans(elementsAppended, count, v);
0457 elementsAppended += count;
0458 return result;
0459 }
0460
0461 public final int appendByte(byte v) {
0462 reserve(elementsAppended + 1);
0463 putByte(elementsAppended, v);
0464 return elementsAppended++;
0465 }
0466
0467 public final int appendBytes(int count, byte v) {
0468 reserve(elementsAppended + count);
0469 int result = elementsAppended;
0470 putBytes(elementsAppended, count, v);
0471 elementsAppended += count;
0472 return result;
0473 }
0474
0475 public final int appendBytes(int length, byte[] src, int offset) {
0476 reserve(elementsAppended + length);
0477 int result = elementsAppended;
0478 putBytes(elementsAppended, length, src, offset);
0479 elementsAppended += length;
0480 return result;
0481 }
0482
0483 public final int appendShort(short v) {
0484 reserve(elementsAppended + 1);
0485 putShort(elementsAppended, v);
0486 return elementsAppended++;
0487 }
0488
0489 public final int appendShorts(int count, short v) {
0490 reserve(elementsAppended + count);
0491 int result = elementsAppended;
0492 putShorts(elementsAppended, count, v);
0493 elementsAppended += count;
0494 return result;
0495 }
0496
0497 public final int appendShorts(int length, short[] src, int offset) {
0498 reserve(elementsAppended + length);
0499 int result = elementsAppended;
0500 putShorts(elementsAppended, length, src, offset);
0501 elementsAppended += length;
0502 return result;
0503 }
0504
0505 public final int appendInt(int v) {
0506 reserve(elementsAppended + 1);
0507 putInt(elementsAppended, v);
0508 return elementsAppended++;
0509 }
0510
0511 public final int appendInts(int count, int v) {
0512 reserve(elementsAppended + count);
0513 int result = elementsAppended;
0514 putInts(elementsAppended, count, v);
0515 elementsAppended += count;
0516 return result;
0517 }
0518
0519 public final int appendInts(int length, int[] src, int offset) {
0520 reserve(elementsAppended + length);
0521 int result = elementsAppended;
0522 putInts(elementsAppended, length, src, offset);
0523 elementsAppended += length;
0524 return result;
0525 }
0526
0527 public final int appendLong(long v) {
0528 reserve(elementsAppended + 1);
0529 putLong(elementsAppended, v);
0530 return elementsAppended++;
0531 }
0532
0533 public final int appendLongs(int count, long v) {
0534 reserve(elementsAppended + count);
0535 int result = elementsAppended;
0536 putLongs(elementsAppended, count, v);
0537 elementsAppended += count;
0538 return result;
0539 }
0540
0541 public final int appendLongs(int length, long[] src, int offset) {
0542 reserve(elementsAppended + length);
0543 int result = elementsAppended;
0544 putLongs(elementsAppended, length, src, offset);
0545 elementsAppended += length;
0546 return result;
0547 }
0548
0549 public final int appendFloat(float v) {
0550 reserve(elementsAppended + 1);
0551 putFloat(elementsAppended, v);
0552 return elementsAppended++;
0553 }
0554
0555 public final int appendFloats(int count, float v) {
0556 reserve(elementsAppended + count);
0557 int result = elementsAppended;
0558 putFloats(elementsAppended, count, v);
0559 elementsAppended += count;
0560 return result;
0561 }
0562
0563 public final int appendFloats(int length, float[] src, int offset) {
0564 reserve(elementsAppended + length);
0565 int result = elementsAppended;
0566 putFloats(elementsAppended, length, src, offset);
0567 elementsAppended += length;
0568 return result;
0569 }
0570
0571 public final int appendDouble(double v) {
0572 reserve(elementsAppended + 1);
0573 putDouble(elementsAppended, v);
0574 return elementsAppended++;
0575 }
0576
0577 public final int appendDoubles(int count, double v) {
0578 reserve(elementsAppended + count);
0579 int result = elementsAppended;
0580 putDoubles(elementsAppended, count, v);
0581 elementsAppended += count;
0582 return result;
0583 }
0584
0585 public final int appendDoubles(int length, double[] src, int offset) {
0586 reserve(elementsAppended + length);
0587 int result = elementsAppended;
0588 putDoubles(elementsAppended, length, src, offset);
0589 elementsAppended += length;
0590 return result;
0591 }
0592
0593 public final int appendByteArray(byte[] value, int offset, int length) {
0594 int copiedOffset = arrayData().appendBytes(length, value, offset);
0595 reserve(elementsAppended + 1);
0596 putArray(elementsAppended, copiedOffset, length);
0597 return elementsAppended++;
0598 }
0599
0600 public final int appendArray(int length) {
0601 reserve(elementsAppended + 1);
0602 putArray(elementsAppended, arrayData().elementsAppended, length);
0603 return elementsAppended++;
0604 }
0605
0606
0607
0608
0609
0610
0611
0612 public final int appendStruct(boolean isNull) {
0613 if (isNull) {
0614
0615 reserve(elementsAppended + 1);
0616 putNull(elementsAppended);
0617 elementsAppended++;
0618 for (WritableColumnVector c: childColumns) {
0619 if (c.type instanceof StructType) {
0620 c.appendStruct(true);
0621 } else {
0622 c.appendNull();
0623 }
0624 }
0625 } else {
0626 appendNotNull();
0627 }
0628 return elementsAppended;
0629 }
0630
0631
0632
0633 @Override
0634 public final ColumnarArray getArray(int rowId) {
0635 if (isNullAt(rowId)) return null;
0636 return new ColumnarArray(arrayData(), getArrayOffset(rowId), getArrayLength(rowId));
0637 }
0638
0639
0640
0641 @Override
0642 public final ColumnarMap getMap(int rowId) {
0643 if (isNullAt(rowId)) return null;
0644 return new ColumnarMap(getChild(0), getChild(1), getArrayOffset(rowId), getArrayLength(rowId));
0645 }
0646
0647 public WritableColumnVector arrayData() {
0648 return childColumns[0];
0649 }
0650
/**
 * Returns the array length recorded for the row at {@code rowId}. {@code getArray},
 * {@code getMap}, {@code getBinary} and {@code getUTF8String} use it to bound their reads
 * of the child data.
 */
0651 public abstract int getArrayLength(int rowId);
0652
/**
 * Returns the array offset recorded for the row at {@code rowId}. {@code getArray},
 * {@code getMap}, {@code getBinary} and {@code getUTF8String} use it as the start position
 * in the child data.
 */
0653 public abstract int getArrayOffset(int rowId);
0654
0655 @Override
0656 public WritableColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
0657
0658
0659
0660
0661 public final int getElementsAppended() { return elementsAppended; }
0662
0663
0664
0665
0666 public final void setIsConstant() { isConstant = true; }
0667
0668
0669
0670
/**
 * Row capacity of this vector. Initialised from the constructor argument and compared
 * against the requested size in {@code reserve(int)}.
 * NOTE(review): this class never updates it after construction - presumably
 * {@code reserveInternal} implementations do; confirm in subclasses.
 */
0671 protected int capacity;
0672
0673
0674
0675
/**
 * Upper bound on the capacity that {@code reserve(int)} will request. It is a non-final
 * instance field, presumably so that tests can lower it.
 */
0676 @VisibleForTesting
0677 protected int MAX_CAPACITY = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
0678
0679
0680
0681
/**
 * Number of nulls recorded in this vector; reported by {@code numNulls()} and zeroed by
 * {@code reset()}. This class never increments it - presumably the subclass
 * {@code putNull}/{@code putNulls} implementations do; confirm there.
 */
0682 protected int numNulls;
0683
0684
0685
0686
0687
/**
 * When true, {@code reset()} returns immediately and leaves the contents untouched.
 * Set by {@code setIsConstant()}; nothing in this class clears it again.
 */
0688 protected boolean isConstant;
0689
0690
0691
0692
/**
 * Multiplier the constructor applies to the row capacity when sizing the byte child vector
 * of array-like types other than {@code ArrayType}.
 */
0693 protected static final int DEFAULT_ARRAY_LENGTH = 4;
0694
0695
0696
0697
/**
 * Append counter: the row id the next {@code append*} call writes to. Advanced by every
 * {@code append*} method and set back to zero by {@code reset()}.
 */
0698 protected int elementsAppended;
0699
0700
0701
0702
/**
 * Child vectors for complex types, or {@code null} when the type has none. The constructor
 * creates one child for array-like types, one per field for structs, two (key, value) for
 * maps and three for calendar intervals. {@code close()} closes them and nulls the array.
 */
0703 protected WritableColumnVector[] childColumns;
0704
0705
0706
0707
/**
 * Factory for a new vector of the given capacity and type; used for child vectors and for
 * the dictionary-id vector.
 * NOTE(review): this is invoked from the constructor of this class, so implementations
 * presumably must not rely on subclass fields that are initialised later - confirm.
 */
0708 protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
0709
0710 protected boolean isArray() {
0711 return type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType ||
0712 DecimalType.isByteArrayDecimalType(type);
0713 }
0714
0715
0716
0717
0718
0719 protected WritableColumnVector(int capacity, DataType type) {
0720 super(type);
0721 this.capacity = capacity;
0722
0723 if (isArray()) {
0724 DataType childType;
0725 int childCapacity = capacity;
0726 if (type instanceof ArrayType) {
0727 childType = ((ArrayType)type).elementType();
0728 } else {
0729 childType = DataTypes.ByteType;
0730 childCapacity *= DEFAULT_ARRAY_LENGTH;
0731 }
0732 this.childColumns = new WritableColumnVector[1];
0733 this.childColumns[0] = reserveNewColumn(childCapacity, childType);
0734 } else if (type instanceof StructType) {
0735 StructType st = (StructType)type;
0736 this.childColumns = new WritableColumnVector[st.fields().length];
0737 for (int i = 0; i < childColumns.length; ++i) {
0738 this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
0739 }
0740 } else if (type instanceof MapType) {
0741 MapType mapType = (MapType) type;
0742 this.childColumns = new WritableColumnVector[2];
0743 this.childColumns[0] = reserveNewColumn(capacity, mapType.keyType());
0744 this.childColumns[1] = reserveNewColumn(capacity, mapType.valueType());
0745 } else if (type instanceof CalendarIntervalType) {
0746
0747 this.childColumns = new WritableColumnVector[3];
0748 this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
0749 this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
0750 this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
0751 } else {
0752 this.childColumns = null;
0753 }
0754 }
0755 }