0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.execution.vectorized;
0018
0019 import java.nio.ByteBuffer;
0020 import java.nio.ByteOrder;
0021 import java.util.Arrays;
0022
0023 import org.apache.spark.sql.types.*;
0024 import org.apache.spark.unsafe.Platform;
0025 import org.apache.spark.unsafe.types.UTF8String;
0026
0027
0028
0029
0030
0031 public final class OnHeapColumnVector extends WritableColumnVector {
0032
0033 private static final boolean bigEndianPlatform =
0034 ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
0035
0036
0037
0038
0039
0040
0041 public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
0042 return allocateColumns(capacity, schema.fields());
0043 }
0044
0045
0046
0047
0048
0049
0050 public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
0051 OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
0052 for (int i = 0; i < fields.length; i++) {
0053 vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
0054 }
0055 return vectors;
0056 }
0057
0058
0059
0060
0061
0062 private byte[] nulls;
0063
0064
0065 private byte[] byteData;
0066 private short[] shortData;
0067 private int[] intData;
0068 private long[] longData;
0069 private float[] floatData;
0070 private double[] doubleData;
0071
0072
0073 private int[] arrayLengths;
0074 private int[] arrayOffsets;
0075
0076 public OnHeapColumnVector(int capacity, DataType type) {
0077 super(capacity, type);
0078
0079 reserveInternal(capacity);
0080 reset();
0081 }
0082
0083 @Override
0084 public void close() {
0085 super.close();
0086 nulls = null;
0087 byteData = null;
0088 shortData = null;
0089 intData = null;
0090 longData = null;
0091 floatData = null;
0092 doubleData = null;
0093 arrayLengths = null;
0094 arrayOffsets = null;
0095 }
0096
0097
0098
0099
0100
0101 @Override
0102 public void putNotNull(int rowId) {
0103 nulls[rowId] = (byte)0;
0104 }
0105
0106 @Override
0107 public void putNull(int rowId) {
0108 nulls[rowId] = (byte)1;
0109 ++numNulls;
0110 }
0111
0112 @Override
0113 public void putNulls(int rowId, int count) {
0114 for (int i = 0; i < count; ++i) {
0115 nulls[rowId + i] = (byte)1;
0116 }
0117 numNulls += count;
0118 }
0119
0120 @Override
0121 public void putNotNulls(int rowId, int count) {
0122 if (!hasNull()) return;
0123 for (int i = 0; i < count; ++i) {
0124 nulls[rowId + i] = (byte)0;
0125 }
0126 }
0127
0128 @Override
0129 public boolean isNullAt(int rowId) {
0130 return nulls[rowId] == 1;
0131 }
0132
0133
0134
0135
0136
0137 @Override
0138 public void putBoolean(int rowId, boolean value) {
0139 byteData[rowId] = (byte)((value) ? 1 : 0);
0140 }
0141
0142 @Override
0143 public void putBooleans(int rowId, int count, boolean value) {
0144 byte v = (byte)((value) ? 1 : 0);
0145 for (int i = 0; i < count; ++i) {
0146 byteData[i + rowId] = v;
0147 }
0148 }
0149
0150 @Override
0151 public boolean getBoolean(int rowId) {
0152 return byteData[rowId] == 1;
0153 }
0154
0155 @Override
0156 public boolean[] getBooleans(int rowId, int count) {
0157 assert(dictionary == null);
0158 boolean[] array = new boolean[count];
0159 for (int i = 0; i < count; ++i) {
0160 array[i] = (byteData[rowId + i] == 1);
0161 }
0162 return array;
0163 }
0164
0165
0166
0167
0168
0169
0170
0171 @Override
0172 public void putByte(int rowId, byte value) {
0173 byteData[rowId] = value;
0174 }
0175
0176 @Override
0177 public void putBytes(int rowId, int count, byte value) {
0178 for (int i = 0; i < count; ++i) {
0179 byteData[i + rowId] = value;
0180 }
0181 }
0182
0183 @Override
0184 public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
0185 System.arraycopy(src, srcIndex, byteData, rowId, count);
0186 }
0187
0188 @Override
0189 public byte getByte(int rowId) {
0190 if (dictionary == null) {
0191 return byteData[rowId];
0192 } else {
0193 return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0194 }
0195 }
0196
0197 @Override
0198 public byte[] getBytes(int rowId, int count) {
0199 assert(dictionary == null);
0200 byte[] array = new byte[count];
0201 System.arraycopy(byteData, rowId, array, 0, count);
0202 return array;
0203 }
0204
0205 @Override
0206 protected UTF8String getBytesAsUTF8String(int rowId, int count) {
0207 return UTF8String.fromBytes(byteData, rowId, count);
0208 }
0209
0210
0211
0212
0213
0214 @Override
0215 public void putShort(int rowId, short value) {
0216 shortData[rowId] = value;
0217 }
0218
0219 @Override
0220 public void putShorts(int rowId, int count, short value) {
0221 for (int i = 0; i < count; ++i) {
0222 shortData[i + rowId] = value;
0223 }
0224 }
0225
0226 @Override
0227 public void putShorts(int rowId, int count, short[] src, int srcIndex) {
0228 System.arraycopy(src, srcIndex, shortData, rowId, count);
0229 }
0230
0231 @Override
0232 public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
0233 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
0234 Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
0235 }
0236
0237 @Override
0238 public short getShort(int rowId) {
0239 if (dictionary == null) {
0240 return shortData[rowId];
0241 } else {
0242 return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0243 }
0244 }
0245
0246 @Override
0247 public short[] getShorts(int rowId, int count) {
0248 assert(dictionary == null);
0249 short[] array = new short[count];
0250 System.arraycopy(shortData, rowId, array, 0, count);
0251 return array;
0252 }
0253
0254
0255
0256
0257
0258
0259 @Override
0260 public void putInt(int rowId, int value) {
0261 intData[rowId] = value;
0262 }
0263
0264 @Override
0265 public void putInts(int rowId, int count, int value) {
0266 for (int i = 0; i < count; ++i) {
0267 intData[i + rowId] = value;
0268 }
0269 }
0270
0271 @Override
0272 public void putInts(int rowId, int count, int[] src, int srcIndex) {
0273 System.arraycopy(src, srcIndex, intData, rowId, count);
0274 }
0275
0276 @Override
0277 public void putInts(int rowId, int count, byte[] src, int srcIndex) {
0278 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
0279 Platform.INT_ARRAY_OFFSET + rowId * 4L, count * 4L);
0280 }
0281
0282 @Override
0283 public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0284 int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0285 for (int i = 0; i < count; ++i, srcOffset += 4) {
0286 intData[i + rowId] = Platform.getInt(src, srcOffset);
0287 if (bigEndianPlatform) {
0288 intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + rowId]);
0289 }
0290 }
0291 }
0292
0293 @Override
0294 public int getInt(int rowId) {
0295 if (dictionary == null) {
0296 return intData[rowId];
0297 } else {
0298 return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0299 }
0300 }
0301
0302 @Override
0303 public int[] getInts(int rowId, int count) {
0304 assert(dictionary == null);
0305 int[] array = new int[count];
0306 System.arraycopy(intData, rowId, array, 0, count);
0307 return array;
0308 }
0309
0310
0311
0312
0313
0314
0315 public int getDictId(int rowId) {
0316 assert(dictionary == null)
0317 : "A ColumnVector dictionary should not have a dictionary for itself.";
0318 return intData[rowId];
0319 }
0320
0321
0322
0323
0324
0325 @Override
0326 public void putLong(int rowId, long value) {
0327 longData[rowId] = value;
0328 }
0329
0330 @Override
0331 public void putLongs(int rowId, int count, long value) {
0332 for (int i = 0; i < count; ++i) {
0333 longData[i + rowId] = value;
0334 }
0335 }
0336
0337 @Override
0338 public void putLongs(int rowId, int count, long[] src, int srcIndex) {
0339 System.arraycopy(src, srcIndex, longData, rowId, count);
0340 }
0341
0342 @Override
0343 public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
0344 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
0345 Platform.LONG_ARRAY_OFFSET + rowId * 8L, count * 8L);
0346 }
0347
0348 @Override
0349 public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0350 int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0351 for (int i = 0; i < count; ++i, srcOffset += 8) {
0352 longData[i + rowId] = Platform.getLong(src, srcOffset);
0353 if (bigEndianPlatform) {
0354 longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]);
0355 }
0356 }
0357 }
0358
0359 @Override
0360 public long getLong(int rowId) {
0361 if (dictionary == null) {
0362 return longData[rowId];
0363 } else {
0364 return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
0365 }
0366 }
0367
0368 @Override
0369 public long[] getLongs(int rowId, int count) {
0370 assert(dictionary == null);
0371 long[] array = new long[count];
0372 System.arraycopy(longData, rowId, array, 0, count);
0373 return array;
0374 }
0375
0376
0377
0378
0379
0380 @Override
0381 public void putFloat(int rowId, float value) { floatData[rowId] = value; }
0382
0383 @Override
0384 public void putFloats(int rowId, int count, float value) {
0385 Arrays.fill(floatData, rowId, rowId + count, value);
0386 }
0387
0388 @Override
0389 public void putFloats(int rowId, int count, float[] src, int srcIndex) {
0390 System.arraycopy(src, srcIndex, floatData, rowId, count);
0391 }
0392
0393 @Override
0394 public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
0395 if (!bigEndianPlatform) {
0396 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
0397 Platform.DOUBLE_ARRAY_OFFSET + rowId * 4L, count * 4L);
0398 } else {
0399 ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0400 for (int i = 0; i < count; ++i) {
0401 floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
0402 }
0403 }
0404 }
0405
0406 @Override
0407 public float getFloat(int rowId) {
0408 if (dictionary == null) {
0409 return floatData[rowId];
0410 } else {
0411 return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
0412 }
0413 }
0414
0415 @Override
0416 public float[] getFloats(int rowId, int count) {
0417 assert(dictionary == null);
0418 float[] array = new float[count];
0419 System.arraycopy(floatData, rowId, array, 0, count);
0420 return array;
0421 }
0422
0423
0424
0425
0426
0427 @Override
0428 public void putDouble(int rowId, double value) {
0429 doubleData[rowId] = value;
0430 }
0431
0432 @Override
0433 public void putDoubles(int rowId, int count, double value) {
0434 Arrays.fill(doubleData, rowId, rowId + count, value);
0435 }
0436
0437 @Override
0438 public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
0439 System.arraycopy(src, srcIndex, doubleData, rowId, count);
0440 }
0441
0442 @Override
0443 public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
0444 if (!bigEndianPlatform) {
0445 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
0446 Platform.DOUBLE_ARRAY_OFFSET + rowId * 8L, count * 8L);
0447 } else {
0448 ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0449 for (int i = 0; i < count; ++i) {
0450 doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
0451 }
0452 }
0453 }
0454
0455 @Override
0456 public double getDouble(int rowId) {
0457 if (dictionary == null) {
0458 return doubleData[rowId];
0459 } else {
0460 return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
0461 }
0462 }
0463
0464 @Override
0465 public double[] getDoubles(int rowId, int count) {
0466 assert(dictionary == null);
0467 double[] array = new double[count];
0468 System.arraycopy(doubleData, rowId, array, 0, count);
0469 return array;
0470 }
0471
0472
0473
0474
0475
0476 @Override
0477 public int getArrayLength(int rowId) {
0478 return arrayLengths[rowId];
0479 }
0480 @Override
0481 public int getArrayOffset(int rowId) {
0482 return arrayOffsets[rowId];
0483 }
0484
0485 @Override
0486 public void putArray(int rowId, int offset, int length) {
0487 arrayOffsets[rowId] = offset;
0488 arrayLengths[rowId] = length;
0489 }
0490
0491
0492
0493
0494
0495 @Override
0496 public int putByteArray(int rowId, byte[] value, int offset, int length) {
0497 int result = arrayData().appendBytes(length, value, offset);
0498 arrayOffsets[rowId] = result;
0499 arrayLengths[rowId] = length;
0500 return result;
0501 }
0502
0503
0504 @Override
0505 protected void reserveInternal(int newCapacity) {
0506 if (isArray() || type instanceof MapType) {
0507 int[] newLengths = new int[newCapacity];
0508 int[] newOffsets = new int[newCapacity];
0509 if (this.arrayLengths != null) {
0510 System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
0511 System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
0512 }
0513 arrayLengths = newLengths;
0514 arrayOffsets = newOffsets;
0515 } else if (type instanceof BooleanType) {
0516 if (byteData == null || byteData.length < newCapacity) {
0517 byte[] newData = new byte[newCapacity];
0518 if (byteData != null) System.arraycopy(byteData, 0, newData, 0, capacity);
0519 byteData = newData;
0520 }
0521 } else if (type instanceof ByteType) {
0522 if (byteData == null || byteData.length < newCapacity) {
0523 byte[] newData = new byte[newCapacity];
0524 if (byteData != null) System.arraycopy(byteData, 0, newData, 0, capacity);
0525 byteData = newData;
0526 }
0527 } else if (type instanceof ShortType) {
0528 if (shortData == null || shortData.length < newCapacity) {
0529 short[] newData = new short[newCapacity];
0530 if (shortData != null) System.arraycopy(shortData, 0, newData, 0, capacity);
0531 shortData = newData;
0532 }
0533 } else if (type instanceof IntegerType || type instanceof DateType ||
0534 DecimalType.is32BitDecimalType(type)) {
0535 if (intData == null || intData.length < newCapacity) {
0536 int[] newData = new int[newCapacity];
0537 if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity);
0538 intData = newData;
0539 }
0540 } else if (type instanceof LongType || type instanceof TimestampType ||
0541 DecimalType.is64BitDecimalType(type)) {
0542 if (longData == null || longData.length < newCapacity) {
0543 long[] newData = new long[newCapacity];
0544 if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
0545 longData = newData;
0546 }
0547 } else if (type instanceof FloatType) {
0548 if (floatData == null || floatData.length < newCapacity) {
0549 float[] newData = new float[newCapacity];
0550 if (floatData != null) System.arraycopy(floatData, 0, newData, 0, capacity);
0551 floatData = newData;
0552 }
0553 } else if (type instanceof DoubleType) {
0554 if (doubleData == null || doubleData.length < newCapacity) {
0555 double[] newData = new double[newCapacity];
0556 if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, capacity);
0557 doubleData = newData;
0558 }
0559 } else if (childColumns != null) {
0560
0561 } else {
0562 throw new RuntimeException("Unhandled " + type);
0563 }
0564
0565 byte[] newNulls = new byte[newCapacity];
0566 if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, capacity);
0567 nulls = newNulls;
0568
0569 capacity = newCapacity;
0570 }
0571
0572 @Override
0573 protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
0574 return new OnHeapColumnVector(capacity, type);
0575 }
0576 }