Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.execution.vectorized;
0018 
0019 import java.nio.ByteBuffer;
0020 import java.nio.ByteOrder;
0021 
0022 import com.google.common.annotations.VisibleForTesting;
0023 
0024 import org.apache.spark.sql.types.*;
0025 import org.apache.spark.unsafe.Platform;
0026 import org.apache.spark.unsafe.types.UTF8String;
0027 
0028 /**
0029  * Column data backed using offheap memory.
0030  */
0031 public final class OffHeapColumnVector extends WritableColumnVector {
0032 
0033   private static final boolean bigEndianPlatform =
0034     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
0035 
0036   /**
0037    * Allocates columns to store elements of each field of the schema off heap.
0038    * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
0039    * in number of elements, not number of bytes.
0040    */
0041   public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
0042     return allocateColumns(capacity, schema.fields());
0043   }
0044 
0045   /**
0046    * Allocates columns to store elements of each field off heap.
0047    * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
0048    * in number of elements, not number of bytes.
0049    */
0050   public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
0051     OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
0052     for (int i = 0; i < fields.length; i++) {
0053       vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
0054     }
0055     return vectors;
0056   }
0057 
0058   // The data stored in these two allocations need to maintain binary compatible. We can
0059   // directly pass this buffer to external components.
0060   private long nulls;
0061   private long data;
0062 
0063   // Only set if type is Array or Map.
0064   private long lengthData;
0065   private long offsetData;
0066 
0067   public OffHeapColumnVector(int capacity, DataType type) {
0068     super(capacity, type);
0069 
0070     nulls = 0;
0071     data = 0;
0072     lengthData = 0;
0073     offsetData = 0;
0074 
0075     reserveInternal(capacity);
0076     reset();
0077   }
0078 
0079   /**
0080    * Returns the off heap pointer for the values buffer.
0081    */
0082   @VisibleForTesting
0083   public long valuesNativeAddress() {
0084     return data;
0085   }
0086 
0087   @Override
0088   public void close() {
0089     super.close();
0090     Platform.freeMemory(nulls);
0091     Platform.freeMemory(data);
0092     Platform.freeMemory(lengthData);
0093     Platform.freeMemory(offsetData);
0094     nulls = 0;
0095     data = 0;
0096     lengthData = 0;
0097     offsetData = 0;
0098   }
0099 
0100   //
0101   // APIs dealing with nulls
0102   //
0103 
0104   @Override
0105   public void putNotNull(int rowId) {
0106     Platform.putByte(null, nulls + rowId, (byte) 0);
0107   }
0108 
0109   @Override
0110   public void putNull(int rowId) {
0111     Platform.putByte(null, nulls + rowId, (byte) 1);
0112     ++numNulls;
0113   }
0114 
0115   @Override
0116   public void putNulls(int rowId, int count) {
0117     long offset = nulls + rowId;
0118     for (int i = 0; i < count; ++i, ++offset) {
0119       Platform.putByte(null, offset, (byte) 1);
0120     }
0121     numNulls += count;
0122   }
0123 
0124   @Override
0125   public void putNotNulls(int rowId, int count) {
0126     if (!hasNull()) return;
0127     long offset = nulls + rowId;
0128     for (int i = 0; i < count; ++i, ++offset) {
0129       Platform.putByte(null, offset, (byte) 0);
0130     }
0131   }
0132 
0133   @Override
0134   public boolean isNullAt(int rowId) {
0135     return Platform.getByte(null, nulls + rowId) == 1;
0136   }
0137 
0138   //
0139   // APIs dealing with Booleans
0140   //
0141 
0142   @Override
0143   public void putBoolean(int rowId, boolean value) {
0144     Platform.putByte(null, data + rowId, (byte)((value) ? 1 : 0));
0145   }
0146 
0147   @Override
0148   public void putBooleans(int rowId, int count, boolean value) {
0149     byte v = (byte)((value) ? 1 : 0);
0150     for (int i = 0; i < count; ++i) {
0151       Platform.putByte(null, data + rowId + i, v);
0152     }
0153   }
0154 
0155   @Override
0156   public boolean getBoolean(int rowId) { return Platform.getByte(null, data + rowId) == 1; }
0157 
0158   @Override
0159   public boolean[] getBooleans(int rowId, int count) {
0160     assert(dictionary == null);
0161     boolean[] array = new boolean[count];
0162     for (int i = 0; i < count; ++i) {
0163       array[i] = (Platform.getByte(null, data + rowId + i) == 1);
0164     }
0165     return array;
0166   }
0167 
0168   //
0169   // APIs dealing with Bytes
0170   //
0171 
0172   @Override
0173   public void putByte(int rowId, byte value) {
0174     Platform.putByte(null, data + rowId, value);
0175 
0176   }
0177 
0178   @Override
0179   public void putBytes(int rowId, int count, byte value) {
0180     for (int i = 0; i < count; ++i) {
0181       Platform.putByte(null, data + rowId + i, value);
0182     }
0183   }
0184 
0185   @Override
0186   public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
0187     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
0188   }
0189 
0190   @Override
0191   public byte getByte(int rowId) {
0192     if (dictionary == null) {
0193       return Platform.getByte(null, data + rowId);
0194     } else {
0195       return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0196     }
0197   }
0198 
0199   @Override
0200   public byte[] getBytes(int rowId, int count) {
0201     assert(dictionary == null);
0202     byte[] array = new byte[count];
0203     Platform.copyMemory(null, data + rowId, array, Platform.BYTE_ARRAY_OFFSET, count);
0204     return array;
0205   }
0206 
0207   @Override
0208   protected UTF8String getBytesAsUTF8String(int rowId, int count) {
0209     return UTF8String.fromAddress(null, data + rowId, count);
0210   }
0211 
0212   //
0213   // APIs dealing with shorts
0214   //
0215 
0216   @Override
0217   public void putShort(int rowId, short value) {
0218     Platform.putShort(null, data + 2L * rowId, value);
0219   }
0220 
0221   @Override
0222   public void putShorts(int rowId, int count, short value) {
0223     long offset = data + 2L * rowId;
0224     for (int i = 0; i < count; ++i, offset += 2) {
0225       Platform.putShort(null, offset, value);
0226     }
0227   }
0228 
0229   @Override
0230   public void putShorts(int rowId, int count, short[] src, int srcIndex) {
0231     Platform.copyMemory(src, Platform.SHORT_ARRAY_OFFSET + srcIndex * 2L,
0232         null, data + 2L * rowId, count * 2L);
0233   }
0234 
0235   @Override
0236   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
0237     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0238       null, data + rowId * 2L, count * 2L);
0239   }
0240 
0241   @Override
0242   public short getShort(int rowId) {
0243     if (dictionary == null) {
0244       return Platform.getShort(null, data + 2L * rowId);
0245     } else {
0246       return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0247     }
0248   }
0249 
0250   @Override
0251   public short[] getShorts(int rowId, int count) {
0252     assert(dictionary == null);
0253     short[] array = new short[count];
0254     Platform.copyMemory(null, data + rowId * 2L, array, Platform.SHORT_ARRAY_OFFSET, count * 2L);
0255     return array;
0256   }
0257 
0258   //
0259   // APIs dealing with ints
0260   //
0261 
0262   @Override
0263   public void putInt(int rowId, int value) {
0264     Platform.putInt(null, data + 4L * rowId, value);
0265   }
0266 
0267   @Override
0268   public void putInts(int rowId, int count, int value) {
0269     long offset = data + 4L * rowId;
0270     for (int i = 0; i < count; ++i, offset += 4) {
0271       Platform.putInt(null, offset, value);
0272     }
0273   }
0274 
0275   @Override
0276   public void putInts(int rowId, int count, int[] src, int srcIndex) {
0277     Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4L,
0278         null, data + 4L * rowId, count * 4L);
0279   }
0280 
0281   @Override
0282   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
0283     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0284       null, data + rowId * 4L, count * 4L);
0285   }
0286 
0287   @Override
0288   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0289     if (!bigEndianPlatform) {
0290       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
0291           null, data + 4L * rowId, count * 4L);
0292     } else {
0293       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0294       long offset = data + 4L * rowId;
0295       for (int i = 0; i < count; ++i, offset += 4, srcOffset += 4) {
0296         Platform.putInt(null, offset,
0297             java.lang.Integer.reverseBytes(Platform.getInt(src, srcOffset)));
0298       }
0299     }
0300   }
0301 
0302   @Override
0303   public int getInt(int rowId) {
0304     if (dictionary == null) {
0305       return Platform.getInt(null, data + 4L * rowId);
0306     } else {
0307       return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0308     }
0309   }
0310 
0311   @Override
0312   public int[] getInts(int rowId, int count) {
0313     assert(dictionary == null);
0314     int[] array = new int[count];
0315     Platform.copyMemory(null, data + rowId * 4L, array, Platform.INT_ARRAY_OFFSET, count * 4L);
0316     return array;
0317   }
0318 
0319   /**
0320    * Returns the dictionary Id for rowId.
0321    * This should only be called when the ColumnVector is dictionaryIds.
0322    * We have this separate method for dictionaryIds as per SPARK-16928.
0323    */
0324   public int getDictId(int rowId) {
0325     assert(dictionary == null)
0326             : "A ColumnVector dictionary should not have a dictionary for itself.";
0327     return Platform.getInt(null, data + 4L * rowId);
0328   }
0329 
0330   //
0331   // APIs dealing with Longs
0332   //
0333 
0334   @Override
0335   public void putLong(int rowId, long value) {
0336     Platform.putLong(null, data + 8L * rowId, value);
0337   }
0338 
0339   @Override
0340   public void putLongs(int rowId, int count, long value) {
0341     long offset = data + 8L * rowId;
0342     for (int i = 0; i < count; ++i, offset += 8) {
0343       Platform.putLong(null, offset, value);
0344     }
0345   }
0346 
0347   @Override
0348   public void putLongs(int rowId, int count, long[] src, int srcIndex) {
0349     Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8L,
0350         null, data + 8L * rowId, count * 8L);
0351   }
0352 
0353   @Override
0354   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
0355     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0356       null, data + rowId * 8L, count * 8L);
0357   }
0358 
0359   @Override
0360   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0361     if (!bigEndianPlatform) {
0362       Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
0363           null, data + 8L * rowId, count * 8L);
0364     } else {
0365       int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0366       long offset = data + 8L * rowId;
0367       for (int i = 0; i < count; ++i, offset += 8, srcOffset += 8) {
0368         Platform.putLong(null, offset,
0369             java.lang.Long.reverseBytes(Platform.getLong(src, srcOffset)));
0370       }
0371     }
0372   }
0373 
0374   @Override
0375   public long getLong(int rowId) {
0376     if (dictionary == null) {
0377       return Platform.getLong(null, data + 8L * rowId);
0378     } else {
0379       return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
0380     }
0381   }
0382 
0383   @Override
0384   public long[] getLongs(int rowId, int count) {
0385     assert(dictionary == null);
0386     long[] array = new long[count];
0387     Platform.copyMemory(null, data + rowId * 8L, array, Platform.LONG_ARRAY_OFFSET, count * 8L);
0388     return array;
0389   }
0390 
0391   //
0392   // APIs dealing with floats
0393   //
0394 
0395   @Override
0396   public void putFloat(int rowId, float value) {
0397     Platform.putFloat(null, data + rowId * 4L, value);
0398   }
0399 
0400   @Override
0401   public void putFloats(int rowId, int count, float value) {
0402     long offset = data + 4L * rowId;
0403     for (int i = 0; i < count; ++i, offset += 4) {
0404       Platform.putFloat(null, offset, value);
0405     }
0406   }
0407 
0408   @Override
0409   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
0410     Platform.copyMemory(src, Platform.FLOAT_ARRAY_OFFSET + srcIndex * 4L,
0411         null, data + 4L * rowId, count * 4L);
0412   }
0413 
0414   @Override
0415   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
0416     if (!bigEndianPlatform) {
0417       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0418           null, data + rowId * 4L, count * 4L);
0419     } else {
0420       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0421       long offset = data + 4L * rowId;
0422       for (int i = 0; i < count; ++i, offset += 4) {
0423         Platform.putFloat(null, offset, bb.getFloat(srcIndex + (4 * i)));
0424       }
0425     }
0426   }
0427 
0428   @Override
0429   public float getFloat(int rowId) {
0430     if (dictionary == null) {
0431       return Platform.getFloat(null, data + rowId * 4L);
0432     } else {
0433       return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
0434     }
0435   }
0436 
0437   @Override
0438   public float[] getFloats(int rowId, int count) {
0439     assert(dictionary == null);
0440     float[] array = new float[count];
0441     Platform.copyMemory(null, data + rowId * 4L, array, Platform.FLOAT_ARRAY_OFFSET, count * 4L);
0442     return array;
0443   }
0444 
0445 
0446   //
0447   // APIs dealing with doubles
0448   //
0449 
0450   @Override
0451   public void putDouble(int rowId, double value) {
0452     Platform.putDouble(null, data + rowId * 8L, value);
0453   }
0454 
0455   @Override
0456   public void putDoubles(int rowId, int count, double value) {
0457     long offset = data + 8L * rowId;
0458     for (int i = 0; i < count; ++i, offset += 8) {
0459       Platform.putDouble(null, offset, value);
0460     }
0461   }
0462 
0463   @Override
0464   public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
0465     Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8L,
0466       null, data + 8L * rowId, count * 8L);
0467   }
0468 
0469   @Override
0470   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
0471     if (!bigEndianPlatform) {
0472       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
0473         null, data + rowId * 8L, count * 8L);
0474     } else {
0475       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0476       long offset = data + 8L * rowId;
0477       for (int i = 0; i < count; ++i, offset += 8) {
0478         Platform.putDouble(null, offset, bb.getDouble(srcIndex + (8 * i)));
0479       }
0480     }
0481   }
0482 
0483   @Override
0484   public double getDouble(int rowId) {
0485     if (dictionary == null) {
0486       return Platform.getDouble(null, data + rowId * 8L);
0487     } else {
0488       return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
0489     }
0490   }
0491 
0492   @Override
0493   public double[] getDoubles(int rowId, int count) {
0494     assert(dictionary == null);
0495     double[] array = new double[count];
0496     Platform.copyMemory(null, data + rowId * 8L, array, Platform.DOUBLE_ARRAY_OFFSET, count * 8L);
0497     return array;
0498   }
0499 
0500   //
0501   // APIs dealing with Arrays.
0502   //
0503   @Override
0504   public void putArray(int rowId, int offset, int length) {
0505     assert(offset >= 0 && offset + length <= childColumns[0].capacity);
0506     Platform.putInt(null, lengthData + 4L * rowId, length);
0507     Platform.putInt(null, offsetData + 4L * rowId, offset);
0508   }
0509 
0510   @Override
0511   public int getArrayLength(int rowId) {
0512     return Platform.getInt(null, lengthData + 4L * rowId);
0513   }
0514 
0515   @Override
0516   public int getArrayOffset(int rowId) {
0517     return Platform.getInt(null, offsetData + 4L * rowId);
0518   }
0519 
0520   // APIs dealing with ByteArrays
0521   @Override
0522   public int putByteArray(int rowId, byte[] value, int offset, int length) {
0523     int result = arrayData().appendBytes(length, value, offset);
0524     Platform.putInt(null, lengthData + 4L * rowId, length);
0525     Platform.putInt(null, offsetData + 4L * rowId, result);
0526     return result;
0527   }
0528 
0529   // Split out the slow path.
0530   @Override
0531   protected void reserveInternal(int newCapacity) {
0532     int oldCapacity = (nulls == 0L) ? 0 : capacity;
0533     if (isArray() || type instanceof MapType) {
0534       this.lengthData =
0535           Platform.reallocateMemory(lengthData, oldCapacity * 4L, newCapacity * 4L);
0536       this.offsetData =
0537           Platform.reallocateMemory(offsetData, oldCapacity * 4L, newCapacity * 4L);
0538     } else if (type instanceof ByteType || type instanceof BooleanType) {
0539       this.data = Platform.reallocateMemory(data, oldCapacity, newCapacity);
0540     } else if (type instanceof ShortType) {
0541       this.data = Platform.reallocateMemory(data, oldCapacity * 2L, newCapacity * 2L);
0542     } else if (type instanceof IntegerType || type instanceof FloatType ||
0543         type instanceof DateType || DecimalType.is32BitDecimalType(type)) {
0544       this.data = Platform.reallocateMemory(data, oldCapacity * 4L, newCapacity * 4L);
0545     } else if (type instanceof LongType || type instanceof DoubleType ||
0546         DecimalType.is64BitDecimalType(type) || type instanceof TimestampType) {
0547       this.data = Platform.reallocateMemory(data, oldCapacity * 8L, newCapacity * 8L);
0548     } else if (childColumns != null) {
0549       // Nothing to store.
0550     } else {
0551       throw new RuntimeException("Unhandled " + type);
0552     }
0553     this.nulls = Platform.reallocateMemory(nulls, oldCapacity, newCapacity);
0554     Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
0555     capacity = newCapacity;
0556   }
0557 
0558   @Override
0559   protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
0560     return new OffHeapColumnVector(capacity, type);
0561   }
0562 }