0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.execution.vectorized;
0018
0019 import java.nio.ByteBuffer;
0020 import java.nio.ByteOrder;
0021
0022 import com.google.common.annotations.VisibleForTesting;
0023
0024 import org.apache.spark.sql.types.*;
0025 import org.apache.spark.unsafe.Platform;
0026 import org.apache.spark.unsafe.types.UTF8String;
0027
0028
0029
0030
0031 public final class OffHeapColumnVector extends WritableColumnVector {
0032
0033 private static final boolean bigEndianPlatform =
0034 ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
0035
0036
0037
0038
0039
0040
0041 public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
0042 return allocateColumns(capacity, schema.fields());
0043 }
0044
0045
0046
0047
0048
0049
0050 public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
0051 OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
0052 for (int i = 0; i < fields.length; i++) {
0053 vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
0054 }
0055 return vectors;
0056 }
0057
0058
0059
0060 private long nulls;
0061 private long data;
0062
0063
0064 private long lengthData;
0065 private long offsetData;
0066
0067 public OffHeapColumnVector(int capacity, DataType type) {
0068 super(capacity, type);
0069
0070 nulls = 0;
0071 data = 0;
0072 lengthData = 0;
0073 offsetData = 0;
0074
0075 reserveInternal(capacity);
0076 reset();
0077 }
0078
0079
0080
0081
0082 @VisibleForTesting
0083 public long valuesNativeAddress() {
0084 return data;
0085 }
0086
0087 @Override
0088 public void close() {
0089 super.close();
0090 Platform.freeMemory(nulls);
0091 Platform.freeMemory(data);
0092 Platform.freeMemory(lengthData);
0093 Platform.freeMemory(offsetData);
0094 nulls = 0;
0095 data = 0;
0096 lengthData = 0;
0097 offsetData = 0;
0098 }
0099
0100
0101
0102
0103
0104 @Override
0105 public void putNotNull(int rowId) {
0106 Platform.putByte(null, nulls + rowId, (byte) 0);
0107 }
0108
0109 @Override
0110 public void putNull(int rowId) {
0111 Platform.putByte(null, nulls + rowId, (byte) 1);
0112 ++numNulls;
0113 }
0114
0115 @Override
0116 public void putNulls(int rowId, int count) {
0117 long offset = nulls + rowId;
0118 for (int i = 0; i < count; ++i, ++offset) {
0119 Platform.putByte(null, offset, (byte) 1);
0120 }
0121 numNulls += count;
0122 }
0123
0124 @Override
0125 public void putNotNulls(int rowId, int count) {
0126 if (!hasNull()) return;
0127 long offset = nulls + rowId;
0128 for (int i = 0; i < count; ++i, ++offset) {
0129 Platform.putByte(null, offset, (byte) 0);
0130 }
0131 }
0132
0133 @Override
0134 public boolean isNullAt(int rowId) {
0135 return Platform.getByte(null, nulls + rowId) == 1;
0136 }
0137
0138
0139
0140
0141
0142 @Override
0143 public void putBoolean(int rowId, boolean value) {
0144 Platform.putByte(null, data + rowId, (byte)((value) ? 1 : 0));
0145 }
0146
0147 @Override
0148 public void putBooleans(int rowId, int count, boolean value) {
0149 byte v = (byte)((value) ? 1 : 0);
0150 for (int i = 0; i < count; ++i) {
0151 Platform.putByte(null, data + rowId + i, v);
0152 }
0153 }
0154
0155 @Override
0156 public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
0157
0158 @Override
0159 public boolean[] getBooleans(int rowId, int count) {
0160 assert(dictionary == null);
0161 boolean[] array = new boolean[count];
0162 for (int i = 0; i < count; ++i) {
0163 array[i] = (Platform.getByte(null, data + rowId + i) == 1);
0164 }
0165 return array;
0166 }
0167
0168
0169
0170
0171
0172 @Override
0173 public void putByte(int rowId, byte value) {
0174 Platform.putByte(null, data + rowId, value);
0175
0176 }
0177
0178 @Override
0179 public void putBytes(int rowId, int count, byte value) {
0180 for (int i = 0; i < count; ++i) {
0181 Platform.putByte(null, data + rowId + i, value);
0182 }
0183 }
0184
0185 @Override
0186 public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
0187 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
0188 }
0189
0190 @Override
0191 public byte getByte(int rowId) {
0192 if (dictionary == null) {
0193 return Platform.getByte(null, data + rowId);
0194 } else {
0195 return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0196 }
0197 }
0198
0199 @Override
0200 public byte[] getBytes(int rowId, int count) {
0201 assert(dictionary == null);
0202 byte[] array = new byte[count];
0203 Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
0204 return array;
0205 }
0206
0207 @Override
0208 protected UTF8String getBytesAsUTF8String(int rowId, int count) {
0209 return UTF8String.fromAddress(null, data + rowId, count);
0210 }
0211
0212
0213
0214
0215
0216 @Override
0217 public void putShort(int rowId, short value) {
0218 Platform.putShort(null, data + 2L * rowId, value);
0219 }
0220
0221 @Override
0222 public void putShorts(int rowId, int count, short value) {
0223 long offset = data + 2L * rowId;
0224 for (int i = 0; i < count; ++i, offset += 2) {
0225 Platform.putShort(null, offset, value);
0226 }
0227 }
0228
0229 @Override
0230 public void putShorts(int rowId, int count, short[] src, int srcIndex) {
0231 Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2L,
0232 null, data + 2L * rowId, count * 2L);
0233 }
0234
0235 @Override
0236 public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
0237 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0238 null, data + rowId * 2L, count * 2L);
0239 }
0240
0241 @Override
0242 public short getShort(int rowId) {
0243 if (dictionary == null) {
0244 return Platform.getShort(null, data + 2L * rowId);
0245 } else {
0246 return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0247 }
0248 }
0249
0250 @Override
0251 public short[] getShorts(int rowId, int count) {
0252 assert(dictionary == null);
0253 short[] array = new short[count];
0254 Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L);
0255 return array;
0256 }
0257
0258
0259
0260
0261
0262 @Override
0263 public void putInt(int rowId, int value) {
0264 Platform.putInt(null, data + 4L * rowId, value);
0265 }
0266
0267 @Override
0268 public void putInts(int rowId, int count, int value) {
0269 long offset = data + 4L * rowId;
0270 for (int i = 0; i < count; ++i, offset += 4) {
0271 Platform.putInt(null, offset, value);
0272 }
0273 }
0274
0275 @Override
0276 public void putInts(int rowId, int count, int[] src, int srcIndex) {
0277 Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4L,
0278 null, data + 4L * rowId, count * 4L);
0279 }
0280
0281 @Override
0282 public void putInts(int rowId, int count, byte[] src, int srcIndex) {
0283 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0284 null, data + rowId * 4L, count * 4L);
0285 }
0286
0287 @Override
0288 public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0289 if (!bigEndianPlatform) {
0290 Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
0291 null, data + 4L * rowId, count * 4L);
0292 } else {
0293 int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0294 long offset = data + 4L * rowId;
0295 for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
0296 Platform.putInt(null, offset,
0297 java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
0298 }
0299 }
0300 }
0301
0302 @Override
0303 public int getInt(int rowId) {
0304 if (dictionary == null) {
0305 return Platform.getInt(null, data + 4L * rowId);
0306 } else {
0307 return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0308 }
0309 }
0310
0311 @Override
0312 public int[] getInts(int rowId, int count) {
0313 assert(dictionary == null);
0314 int[] array = new int[count];
0315 Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L);
0316 return array;
0317 }
0318
0319
0320
0321
0322
0323
0324 public int getDictId(int rowId) {
0325 assert(dictionary == null)
0326 : "A ColumnVector dictionary should not have a dictionary for itself.";
0327 return Platform.getInt(null, data + 4L * rowId);
0328 }
0329
0330
0331
0332
0333
0334 @Override
0335 public void putLong(int rowId, long value) {
0336 Platform.putLong(null, data + 8L * rowId, value);
0337 }
0338
0339 @Override
0340 public void putLongs(int rowId, int count, long value) {
0341 long offset = data + 8L * rowId;
0342 for (int i = 0; i < count; ++i, offset += 8) {
0343 Platform.putLong(null, offset, value);
0344 }
0345 }
0346
0347 @Override
0348 public void putLongs(int rowId, int count, long[] src, int srcIndex) {
0349 Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8L,
0350 null, data + 8L * rowId, count * 8L);
0351 }
0352
0353 @Override
0354 public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
0355 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0356 null, data + rowId * 8L, count * 8L);
0357 }
0358
0359 @Override
0360 public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0361 if (!bigEndianPlatform) {
0362 Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
0363 null, data + 8L * rowId, count * 8L);
0364 } else {
0365 int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0366 long offset = data + 8L * rowId;
0367 for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
0368 Platform.putLong(null, offset,
0369 java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
0370 }
0371 }
0372 }
0373
0374 @Override
0375 public long getLong(int rowId) {
0376 if (dictionary == null) {
0377 return Platform.getLong(null, data + 8L * rowId);
0378 } else {
0379 return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
0380 }
0381 }
0382
0383 @Override
0384 public long[] getLongs(int rowId, int count) {
0385 assert(dictionary == null);
0386 long[] array = new long[count];
0387 Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L);
0388 return array;
0389 }
0390
0391
0392
0393
0394
0395 @Override
0396 public void putFloat(int rowId, float value) {
0397 Platform.putFloat(null, data + rowId * 4L, value);
0398 }
0399
0400 @Override
0401 public void putFloats(int rowId, int count, float value) {
0402 long offset = data + 4L * rowId;
0403 for (int i = 0; i < count; ++i, offset += 4) {
0404 Platform.putFloat(null, offset, value);
0405 }
0406 }
0407
0408 @Override
0409 public void putFloats(int rowId, int count, float[] src, int srcIndex) {
0410 Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4L,
0411 null, data + 4L * rowId, count * 4L);
0412 }
0413
0414 @Override
0415 public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
0416 if (!bigEndianPlatform) {
0417 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0418 null, data + rowId * 4L, count * 4L);
0419 } else {
0420 ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0421 long offset = data + 4L * rowId;
0422 for (int i = 0; i < count; ++i, offset += 4) {
0423 Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
0424 }
0425 }
0426 }
0427
0428 @Override
0429 public float getFloat(int rowId) {
0430 if (dictionary == null) {
0431 return Platform.getFloat(null, data + rowId * 4L);
0432 } else {
0433 return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
0434 }
0435 }
0436
0437 @Override
0438 public float[] getFloats(int rowId, int count) {
0439 assert(dictionary == null);
0440 float[] array = new float[count];
0441 Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L);
0442 return array;
0443 }
0444
0445
0446
0447
0448
0449
0450 @Override
0451 public void putDouble(int rowId, double value) {
0452 Platform.putDouble(null, data + rowId * 8L, value);
0453 }
0454
0455 @Override
0456 public void putDoubles(int rowId, int count, double value) {
0457 long offset = data + 8L * rowId;
0458 for (int i = 0; i < count; ++i, offset += 8) {
0459 Platform.putDouble(null, offset, value);
0460 }
0461 }
0462
0463 @Override
0464 public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
0465 Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8L,
0466 null, data + 8L * rowId, count * 8L);
0467 }
0468
0469 @Override
0470 public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
0471 if (!bigEndianPlatform) {
0472 Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0473 null, data + rowId * 8L, count * 8L);
0474 } else {
0475 ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0476 long offset = data + 8L * rowId;
0477 for (int i = 0; i < count; ++i, offset += 8) {
0478 Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
0479 }
0480 }
0481 }
0482
0483 @Override
0484 public double getDouble(int rowId) {
0485 if (dictionary == null) {
0486 return Platform.getDouble(null, data + rowId * 8L);
0487 } else {
0488 return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
0489 }
0490 }
0491
0492 @Override
0493 public double[] getDoubles(int rowId, int count) {
0494 assert(dictionary == null);
0495 double[] array = new double[count];
0496 Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8L);
0497 return array;
0498 }
0499
0500
0501
0502
0503 @Override
0504 public void putArray(int rowId, int offset, int length) {
0505 assert(offset >= 0 && offset + length <= childColumns[0].capacity);
0506 Platform.putInt(null, lengthData + 4L * rowId, length);
0507 Platform.putInt(null, offsetData + 4L * rowId, offset);
0508 }
0509
0510 @Override
0511 public int getArrayLength(int rowId) {
0512 return Platform.getInt(null, lengthData + 4L * rowId);
0513 }
0514
0515 @Override
0516 public int getArrayOffset(int rowId) {
0517 return Platform.getInt(null, offsetData + 4L * rowId);
0518 }
0519
0520
0521 @Override
0522 public int putByteArray(int rowId, byte[] value, int offset, int length) {
0523 int result = arrayData().appendBytes(length, value, offset);
0524 Platform.putInt(null, lengthData + 4L * rowId, length);
0525 Platform.putInt(null, offsetData + 4L * rowId, result);
0526 return result;
0527 }
0528
0529
0530 @Override
0531 protected void reserveInternal(int newCapacity) {
0532 int oldCapacity = (nulls == 0L) ? 0 : capacity;
0533 if (isArray() || type instanceof MapType) {
0534 this.lengthData =
0535 Platform.reallocateMemory(lengthData, oldCapacity * 4L, newCapacity * 4L);
0536 this.offsetData =
0537 Platform.reallocateMemory(offsetData, oldCapacity * 4L, newCapacity * 4L);
0538 } else if (type instanceof ByteType || type instanceof BooleanType) {
0539 this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
0540 } else if (type instanceof ShortType) {
0541 this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
0542 } else if (type instanceof IntegerType || type instanceof FloatType ||
0543 type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
0544 this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
0545 } else if (type instanceof LongType || type instanceof DoubleType ||
0546 DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
0547 this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
0548 } else if (childColumns != null) {
0549
0550 } else {
0551 throw new RuntimeException("Unhandled " + type);
0552 }
0553 this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
0554 Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
0555 capacity = newCapacity;
0556 }
0557
0558 @Override
0559 protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
0560 return new OffHeapColumnVector(capacity, type);
0561 }
0562 }