0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.catalyst.expressions;
0019
0020 import java.io.*;
0021 import java.math.BigDecimal;
0022 import java.math.BigInteger;
0023 import java.nio.ByteBuffer;
0024 import java.util.Arrays;
0025 import java.util.Collections;
0026 import java.util.HashSet;
0027 import java.util.Set;
0028
0029 import com.esotericsoftware.kryo.Kryo;
0030 import com.esotericsoftware.kryo.KryoSerializable;
0031 import com.esotericsoftware.kryo.io.Input;
0032 import com.esotericsoftware.kryo.io.Output;
0033
0034 import org.apache.spark.sql.catalyst.InternalRow;
0035 import org.apache.spark.sql.types.*;
0036 import org.apache.spark.unsafe.Platform;
0037 import org.apache.spark.unsafe.array.ByteArrayMethods;
0038 import org.apache.spark.unsafe.bitset.BitSetMethods;
0039 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
0040 import org.apache.spark.unsafe.types.CalendarInterval;
0041 import org.apache.spark.unsafe.types.UTF8String;
0042
0043 import static org.apache.spark.sql.types.DataTypes.*;
0044 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
0045
0046
0047
0048
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062 public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
0063
/** Number of bytes in one word; fixed-width field slots in this class are 8 bytes apart. */
public static final int WORD_SIZE = 8;
0065
0066
0067
0068
0069
0070 public static int calculateBitSetWidthInBytes(int numFields) {
0071 return ((numFields + 63)/ 64) * 8;
0072 }
0073
0074
0075
0076
/**
 * Data types that {@link #isFixedLength(DataType)} and {@link #isMutable(DataType)} accept by
 * simple set membership. The set is unmodifiable.
 */
public static final Set<DataType> mutableFieldTypes;

static {
  final Set<DataType> fixedWidthTypes = new HashSet<>();
  Collections.addAll(
    fixedWidthTypes,
    NullType,
    BooleanType,
    ByteType,
    ShortType,
    IntegerType,
    LongType,
    FloatType,
    DoubleType,
    DateType,
    TimestampType);
  mutableFieldTypes = Collections.unmodifiableSet(fixedWidthTypes);
}
0096
0097 public static boolean isFixedLength(DataType dt) {
0098 if (dt instanceof UserDefinedType) {
0099 return isFixedLength(((UserDefinedType) dt).sqlType());
0100 }
0101
0102 if (dt instanceof DecimalType) {
0103 return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS();
0104 } else {
0105 return mutableFieldTypes.contains(dt);
0106 }
0107 }
0108
0109 public static boolean isMutable(DataType dt) {
0110 if (dt instanceof UserDefinedType) {
0111 return isMutable(((UserDefinedType) dt).sqlType());
0112 }
0113
0114 return mutableFieldTypes.contains(dt) || dt instanceof DecimalType ||
0115 dt instanceof CalendarIntervalType;
0116 }
0117
0118
0119
0120
0121
/** Object holding the row data; passed together with {@link #baseOffset} to Platform calls. */
private Object baseObject;
/** Offset of the first byte of this row, relative to {@link #baseObject}. */
private long baseOffset;

/** Number of fields in this row. */
private int numFields;

/** Size of the row data in bytes, as last set by pointTo, setTotalSize, copyFrom or a read. */
private int sizeInBytes;

/** Width in bytes of the bitset at the start of the row; derived from {@link #numFields}. */
private int bitSetWidthInBytes;
0133
0134 private long getFieldOffset(int ordinal) {
0135 return baseOffset + bitSetWidthInBytes + ordinal * 8L;
0136 }
0137
0138 private void assertIndexIsValid(int index) {
0139 assert index >= 0 : "index (" + index + ") should >= 0";
0140 assert index < numFields : "index (" + index + ") should < " + numFields;
0141 }
0142
0143
0144
0145
0146
0147
0148
0149
0150
0151
0152
/**
 * Creates a row with the given number of fields. The row has no backing data yet; call one of
 * the {@code pointTo} methods before reading or writing fields.
 *
 * @param numFields number of fields in the row
 */
public UnsafeRow(int numFields) {
  this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
  this.numFields = numFields;
}
0157
0158
/**
 * No-argument constructor. All fields are left at their defaults; {@code readExternal} and the
 * Kryo {@code read} method populate every field of an instance.
 */
public UnsafeRow() {}
0160
0161 public Object getBaseObject() { return baseObject; }
0162 public long getBaseOffset() { return baseOffset; }
0163 public int getSizeInBytes() { return sizeInBytes; }
0164
0165 @Override
0166 public int numFields() { return numFields; }
0167
0168
0169
0170
0171
0172
0173
0174
0175 public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
0176 assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
0177 assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
0178 this.baseObject = baseObject;
0179 this.baseOffset = baseOffset;
0180 this.sizeInBytes = sizeInBytes;
0181 }
0182
0183
0184
0185
0186
0187
0188
0189 public void pointTo(byte[] buf, int sizeInBytes) {
0190 pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0191 }
0192
0193 public void setTotalSize(int sizeInBytes) {
0194 assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
0195 this.sizeInBytes = sizeInBytes;
0196 }
0197
0198 public void setNotNullAt(int i) {
0199 assertIndexIsValid(i);
0200 BitSetMethods.unset(baseObject, baseOffset, i);
0201 }
0202
0203 @Override
0204 public void setNullAt(int i) {
0205 assertIndexIsValid(i);
0206 BitSetMethods.set(baseObject, baseOffset, i);
0207
0208
0209
0210 Platform.putLong(baseObject, getFieldOffset(i), 0);
0211 }
0212
/**
 * Generic, untyped updates are not supported by this row.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public void update(int ordinal, Object value) {
  throw new UnsupportedOperationException();
}
0217
0218 @Override
0219 public void setInt(int ordinal, int value) {
0220 assertIndexIsValid(ordinal);
0221 setNotNullAt(ordinal);
0222 Platform.putInt(baseObject, getFieldOffset(ordinal), value);
0223 }
0224
0225 @Override
0226 public void setLong(int ordinal, long value) {
0227 assertIndexIsValid(ordinal);
0228 setNotNullAt(ordinal);
0229 Platform.putLong(baseObject, getFieldOffset(ordinal), value);
0230 }
0231
0232 @Override
0233 public void setDouble(int ordinal, double value) {
0234 assertIndexIsValid(ordinal);
0235 setNotNullAt(ordinal);
0236 Platform.putDouble(baseObject, getFieldOffset(ordinal), value);
0237 }
0238
0239 @Override
0240 public void setBoolean(int ordinal, boolean value) {
0241 assertIndexIsValid(ordinal);
0242 setNotNullAt(ordinal);
0243 Platform.putBoolean(baseObject, getFieldOffset(ordinal), value);
0244 }
0245
0246 @Override
0247 public void setShort(int ordinal, short value) {
0248 assertIndexIsValid(ordinal);
0249 setNotNullAt(ordinal);
0250 Platform.putShort(baseObject, getFieldOffset(ordinal), value);
0251 }
0252
0253 @Override
0254 public void setByte(int ordinal, byte value) {
0255 assertIndexIsValid(ordinal);
0256 setNotNullAt(ordinal);
0257 Platform.putByte(baseObject, getFieldOffset(ordinal), value);
0258 }
0259
0260 @Override
0261 public void setFloat(int ordinal, float value) {
0262 assertIndexIsValid(ordinal);
0263 setNotNullAt(ordinal);
0264 Platform.putFloat(baseObject, getFieldOffset(ordinal), value);
0265 }
0266
0267
0268
0269
0270
0271
0272
0273 @Override
0274 public void setDecimal(int ordinal, Decimal value, int precision) {
0275 assertIndexIsValid(ordinal);
0276 if (precision <= Decimal.MAX_LONG_DIGITS()) {
0277
0278 if (value == null) {
0279 setNullAt(ordinal);
0280 } else {
0281 setLong(ordinal, value.toUnscaledLong());
0282 }
0283 } else {
0284
0285 long cursor = getLong(ordinal) >>> 32;
0286 assert cursor > 0 : "invalid cursor " + cursor;
0287
0288 Platform.putLong(baseObject, baseOffset + cursor, 0L);
0289 Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
0290
0291 if (value == null) {
0292 setNullAt(ordinal);
0293
0294 Platform.putLong(baseObject, getFieldOffset(ordinal), cursor << 32);
0295 } else {
0296
0297 final BigInteger integer = value.toJavaBigDecimal().unscaledValue();
0298 byte[] bytes = integer.toByteArray();
0299 assert(bytes.length <= 16);
0300
0301
0302 Platform.copyMemory(
0303 bytes, Platform.BYTE_ARRAY_OFFSET, baseObject, baseOffset + cursor, bytes.length);
0304 setLong(ordinal, (cursor << 32) | ((long) bytes.length));
0305 }
0306 }
0307 }
0308
0309 @Override
0310 public void setInterval(int ordinal, CalendarInterval value) {
0311 assertIndexIsValid(ordinal);
0312 long cursor = getLong(ordinal) >>> 32;
0313 assert cursor > 0 : "invalid cursor " + cursor;
0314 if (value == null) {
0315 setNullAt(ordinal);
0316
0317 Platform.putLong(baseObject, baseOffset + cursor, 0L);
0318 Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
0319
0320 Platform.putLong(baseObject, getFieldOffset(ordinal), (cursor << 32) | 16L);
0321 } else {
0322 Platform.putInt(baseObject, baseOffset + cursor, value.months);
0323 Platform.putInt(baseObject, baseOffset + cursor + 4, value.days);
0324 Platform.putLong(baseObject, baseOffset + cursor + 8, value.microseconds);
0325 setLong(ordinal, (cursor << 32) | 16L);
0326 }
0327 }
0328
0329 @Override
0330 public Object get(int ordinal, DataType dataType) {
0331 return SpecializedGettersReader.read(this, ordinal, dataType, true, true);
0332 }
0333
0334 @Override
0335 public boolean isNullAt(int ordinal) {
0336 assertIndexIsValid(ordinal);
0337 return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
0338 }
0339
0340 @Override
0341 public boolean getBoolean(int ordinal) {
0342 assertIndexIsValid(ordinal);
0343 return Platform.getBoolean(baseObject, getFieldOffset(ordinal));
0344 }
0345
0346 @Override
0347 public byte getByte(int ordinal) {
0348 assertIndexIsValid(ordinal);
0349 return Platform.getByte(baseObject, getFieldOffset(ordinal));
0350 }
0351
0352 @Override
0353 public short getShort(int ordinal) {
0354 assertIndexIsValid(ordinal);
0355 return Platform.getShort(baseObject, getFieldOffset(ordinal));
0356 }
0357
0358 @Override
0359 public int getInt(int ordinal) {
0360 assertIndexIsValid(ordinal);
0361 return Platform.getInt(baseObject, getFieldOffset(ordinal));
0362 }
0363
0364 @Override
0365 public long getLong(int ordinal) {
0366 assertIndexIsValid(ordinal);
0367 return Platform.getLong(baseObject, getFieldOffset(ordinal));
0368 }
0369
0370 @Override
0371 public float getFloat(int ordinal) {
0372 assertIndexIsValid(ordinal);
0373 return Platform.getFloat(baseObject, getFieldOffset(ordinal));
0374 }
0375
0376 @Override
0377 public double getDouble(int ordinal) {
0378 assertIndexIsValid(ordinal);
0379 return Platform.getDouble(baseObject, getFieldOffset(ordinal));
0380 }
0381
0382 @Override
0383 public Decimal getDecimal(int ordinal, int precision, int scale) {
0384 if (isNullAt(ordinal)) {
0385 return null;
0386 }
0387 if (precision <= Decimal.MAX_LONG_DIGITS()) {
0388 return Decimal.createUnsafe(getLong(ordinal), precision, scale);
0389 } else {
0390 byte[] bytes = getBinary(ordinal);
0391 BigInteger bigInteger = new BigInteger(bytes);
0392 BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
0393 return Decimal.apply(javaDecimal, precision, scale);
0394 }
0395 }
0396
0397 @Override
0398 public UTF8String getUTF8String(int ordinal) {
0399 if (isNullAt(ordinal)) return null;
0400 final long offsetAndSize = getLong(ordinal);
0401 final int offset = (int) (offsetAndSize >> 32);
0402 final int size = (int) offsetAndSize;
0403 return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
0404 }
0405
0406 @Override
0407 public byte[] getBinary(int ordinal) {
0408 if (isNullAt(ordinal)) {
0409 return null;
0410 } else {
0411 final long offsetAndSize = getLong(ordinal);
0412 final int offset = (int) (offsetAndSize >> 32);
0413 final int size = (int) offsetAndSize;
0414 final byte[] bytes = new byte[size];
0415 Platform.copyMemory(
0416 baseObject,
0417 baseOffset + offset,
0418 bytes,
0419 Platform.BYTE_ARRAY_OFFSET,
0420 size
0421 );
0422 return bytes;
0423 }
0424 }
0425
0426 @Override
0427 public CalendarInterval getInterval(int ordinal) {
0428 if (isNullAt(ordinal)) {
0429 return null;
0430 } else {
0431 final long offsetAndSize = getLong(ordinal);
0432 final int offset = (int) (offsetAndSize >> 32);
0433 final int months = Platform.getInt(baseObject, baseOffset + offset);
0434 final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
0435 final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
0436 return new CalendarInterval(months, days, microseconds);
0437 }
0438 }
0439
0440 @Override
0441 public UnsafeRow getStruct(int ordinal, int numFields) {
0442 if (isNullAt(ordinal)) {
0443 return null;
0444 } else {
0445 final long offsetAndSize = getLong(ordinal);
0446 final int offset = (int) (offsetAndSize >> 32);
0447 final int size = (int) offsetAndSize;
0448 final UnsafeRow row = new UnsafeRow(numFields);
0449 row.pointTo(baseObject, baseOffset + offset, size);
0450 return row;
0451 }
0452 }
0453
0454 @Override
0455 public UnsafeArrayData getArray(int ordinal) {
0456 if (isNullAt(ordinal)) {
0457 return null;
0458 } else {
0459 final long offsetAndSize = getLong(ordinal);
0460 final int offset = (int) (offsetAndSize >> 32);
0461 final int size = (int) offsetAndSize;
0462 final UnsafeArrayData array = new UnsafeArrayData();
0463 array.pointTo(baseObject, baseOffset + offset, size);
0464 return array;
0465 }
0466 }
0467
0468 @Override
0469 public UnsafeMapData getMap(int ordinal) {
0470 if (isNullAt(ordinal)) {
0471 return null;
0472 } else {
0473 final long offsetAndSize = getLong(ordinal);
0474 final int offset = (int) (offsetAndSize >> 32);
0475 final int size = (int) offsetAndSize;
0476 final UnsafeMapData map = new UnsafeMapData();
0477 map.pointTo(baseObject, baseOffset + offset, size);
0478 return map;
0479 }
0480 }
0481
0482
0483
0484
0485
0486 @Override
0487 public UnsafeRow copy() {
0488 UnsafeRow rowCopy = new UnsafeRow(numFields);
0489 final byte[] rowDataCopy = new byte[sizeInBytes];
0490 Platform.copyMemory(
0491 baseObject,
0492 baseOffset,
0493 rowDataCopy,
0494 Platform.BYTE_ARRAY_OFFSET,
0495 sizeInBytes
0496 );
0497 rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0498 return rowCopy;
0499 }
0500
0501
0502
0503
0504
0505 public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
0506 final UnsafeRow row = new UnsafeRow(numFields);
0507 row.pointTo(new byte[numBytes], numBytes);
0508 return row;
0509 }
0510
0511
0512
0513
0514
0515 public void copyFrom(UnsafeRow row) {
0516
0517 assert (baseObject instanceof byte[]) && baseOffset == Platform.BYTE_ARRAY_OFFSET;
0518 if (row.sizeInBytes > this.sizeInBytes) {
0519
0520 this.baseObject = new byte[row.sizeInBytes];
0521 }
0522 Platform.copyMemory(
0523 row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
0524
0525 this.sizeInBytes = row.sizeInBytes;
0526 }
0527
0528
0529
0530
0531
0532
0533
0534
0535
0536 public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
0537 if (baseObject instanceof byte[]) {
0538 int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
0539 out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
0540 } else {
0541 int dataRemaining = sizeInBytes;
0542 long rowReadPosition = baseOffset;
0543 while (dataRemaining > 0) {
0544 int toTransfer = Math.min(writeBuffer.length, dataRemaining);
0545 Platform.copyMemory(
0546 baseObject, rowReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
0547 out.write(writeBuffer, 0, toTransfer);
0548 rowReadPosition += toTransfer;
0549 dataRemaining -= toTransfer;
0550 }
0551 }
0552 }
0553
0554 @Override
0555 public int hashCode() {
0556 return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, 42);
0557 }
0558
0559 @Override
0560 public boolean equals(Object other) {
0561 if (other instanceof UnsafeRow) {
0562 UnsafeRow o = (UnsafeRow) other;
0563 return (sizeInBytes == o.sizeInBytes) &&
0564 ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset,
0565 sizeInBytes);
0566 }
0567 return false;
0568 }
0569
0570
0571
0572
0573 public byte[] getBytes() {
0574 return UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0575 }
0576
0577
0578 @Override
0579 public String toString() {
0580 StringBuilder build = new StringBuilder("[");
0581 for (int i = 0; i < sizeInBytes; i += 8) {
0582 if (i != 0) build.append(',');
0583 build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
0584 }
0585 build.append(']');
0586 return build.toString();
0587 }
0588
0589 @Override
0590 public boolean anyNull() {
0591 return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
0592 }
0593
0594
0595
0596
0597
0598
0599 public void writeToMemory(Object target, long targetOffset) {
0600 Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
0601 }
0602
0603 public void writeTo(ByteBuffer buffer) {
0604 assert (buffer.hasArray());
0605 byte[] target = buffer.array();
0606 int offset = buffer.arrayOffset();
0607 int pos = buffer.position();
0608 writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
0609 buffer.position(pos + sizeInBytes);
0610 }
0611
0612
0613
0614
0615
0616
0617 public void writeFieldTo(int ordinal, ByteBuffer buffer) {
0618 final long offsetAndSize = getLong(ordinal);
0619 final int offset = (int) (offsetAndSize >> 32);
0620 final int size = (int) offsetAndSize;
0621
0622 buffer.putInt(size);
0623 int pos = buffer.position();
0624 buffer.position(pos + size);
0625 Platform.copyMemory(
0626 baseObject,
0627 baseOffset + offset,
0628 buffer.array(),
0629 Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos,
0630 size);
0631 }
0632
0633 @Override
0634 public void writeExternal(ObjectOutput out) throws IOException {
0635 byte[] bytes = getBytes();
0636 out.writeInt(bytes.length);
0637 out.writeInt(this.numFields);
0638 out.write(bytes);
0639 }
0640
0641 @Override
0642 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
0643 this.baseOffset = BYTE_ARRAY_OFFSET;
0644 this.sizeInBytes = in.readInt();
0645 this.numFields = in.readInt();
0646 this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
0647 this.baseObject = new byte[sizeInBytes];
0648 in.readFully((byte[]) baseObject);
0649 }
0650
0651 @Override
0652 public void write(Kryo kryo, Output out) {
0653 byte[] bytes = getBytes();
0654 out.writeInt(bytes.length);
0655 out.writeInt(this.numFields);
0656 out.write(bytes);
0657 }
0658
0659 @Override
0660 public void read(Kryo kryo, Input in) {
0661 this.baseOffset = BYTE_ARRAY_OFFSET;
0662 this.sizeInBytes = in.readInt();
0663 this.numFields = in.readInt();
0664 this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
0665 this.baseObject = new byte[sizeInBytes];
0666 in.read((byte[]) baseObject);
0667 }
0668 }