0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.catalyst.expressions;
0019
0020 import java.io.Externalizable;
0021 import java.io.IOException;
0022 import java.io.ObjectInput;
0023 import java.io.ObjectOutput;
0024 import java.nio.ByteBuffer;
0025
0026 import com.esotericsoftware.kryo.Kryo;
0027 import com.esotericsoftware.kryo.KryoSerializable;
0028 import com.esotericsoftware.kryo.io.Input;
0029 import com.esotericsoftware.kryo.io.Output;
0030
0031 import org.apache.spark.sql.catalyst.util.MapData;
0032 import org.apache.spark.unsafe.Platform;
0033
0034 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047 public final class UnsafeMapData extends MapData implements Externalizable, KryoSerializable {
0048
0049 private Object baseObject;
0050 private long baseOffset;
0051
0052
0053
0054
0055 private int sizeInBytes;
0056
0057 public Object getBaseObject() { return baseObject; }
0058 public long getBaseOffset() { return baseOffset; }
0059 public int getSizeInBytes() { return sizeInBytes; }
0060
0061 private final UnsafeArrayData keys;
0062 private final UnsafeArrayData values;
0063
0064
0065
0066
0067
0068
0069 public UnsafeMapData() {
0070 keys = new UnsafeArrayData();
0071 values = new UnsafeArrayData();
0072 }
0073
0074
0075
0076
0077
0078
0079
0080
0081 public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
0082
0083 final long keyArraySize = Platform.getLong(baseObject, baseOffset);
0084 assert keyArraySize >= 0 : "keyArraySize (" + keyArraySize + ") should >= 0";
0085 assert keyArraySize <= Integer.MAX_VALUE :
0086 "keyArraySize (" + keyArraySize + ") should <= Integer.MAX_VALUE";
0087 final int valueArraySize = sizeInBytes - (int)keyArraySize - 8;
0088 assert valueArraySize >= 0 : "valueArraySize (" + valueArraySize + ") should >= 0";
0089
0090 keys.pointTo(baseObject, baseOffset + 8, (int)keyArraySize);
0091 values.pointTo(baseObject, baseOffset + 8 + keyArraySize, valueArraySize);
0092
0093 assert keys.numElements() == values.numElements();
0094
0095 this.baseObject = baseObject;
0096 this.baseOffset = baseOffset;
0097 this.sizeInBytes = sizeInBytes;
0098 }
0099
0100 @Override
0101 public int numElements() {
0102 return keys.numElements();
0103 }
0104
0105 @Override
0106 public UnsafeArrayData keyArray() {
0107 return keys;
0108 }
0109
0110 @Override
0111 public UnsafeArrayData valueArray() {
0112 return values;
0113 }
0114
0115 public void writeToMemory(Object target, long targetOffset) {
0116 Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
0117 }
0118
0119 public void writeTo(ByteBuffer buffer) {
0120 assert(buffer.hasArray());
0121 byte[] target = buffer.array();
0122 int offset = buffer.arrayOffset();
0123 int pos = buffer.position();
0124 writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
0125 buffer.position(pos + sizeInBytes);
0126 }
0127
0128 @Override
0129 public UnsafeMapData copy() {
0130 UnsafeMapData mapCopy = new UnsafeMapData();
0131 final byte[] mapDataCopy = new byte[sizeInBytes];
0132 Platform.copyMemory(
0133 baseObject, baseOffset, mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0134 mapCopy.pointTo(mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0135 return mapCopy;
0136 }
0137
0138 @Override
0139 public void writeExternal(ObjectOutput out) throws IOException {
0140 byte[] bytes = UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0141 out.writeInt(bytes.length);
0142 out.write(bytes);
0143 }
0144
0145 @Override
0146 public void readExternal(ObjectInput in) throws IOException {
0147 this.baseOffset = BYTE_ARRAY_OFFSET;
0148 this.sizeInBytes = in.readInt();
0149 this.baseObject = new byte[sizeInBytes];
0150 in.readFully((byte[]) baseObject);
0151 pointTo(baseObject, baseOffset, sizeInBytes);
0152 }
0153
0154 @Override
0155 public void write(Kryo kryo, Output output) {
0156 byte[] bytes = UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0157 output.writeInt(bytes.length);
0158 output.write(bytes);
0159 }
0160
0161 @Override
0162 public void read(Kryo kryo, Input input) {
0163 this.baseOffset = BYTE_ARRAY_OFFSET;
0164 this.sizeInBytes = input.readInt();
0165 this.baseObject = new byte[sizeInBytes];
0166 input.read((byte[]) baseObject);
0167 pointTo(baseObject, baseOffset, sizeInBytes);
0168 }
0169 }