Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.execution.vectorized;
0018 
0019 import java.nio.ByteBuffer;
0020 import java.nio.ByteOrder;
0021 import java.util.Arrays;
0022 
0023 import org.apache.spark.sql.types.*;
0024 import org.apache.spark.unsafe.Platform;
0025 import org.apache.spark.unsafe.types.UTF8String;
0026 
0027 /**
0028  * A column backed by an in memory JVM array. This stores the NULLs as a byte per value
0029  * and a java array for the values.
0030  */
0031 public final class OnHeapColumnVector extends WritableColumnVector {
0032 
0033   private static final boolean bigEndianPlatform =
0034     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
0035 
0036   /**
0037    * Allocates columns to store elements of each field of the schema on heap.
0038    * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
0039    * in number of elements, not number of bytes.
0040    */
0041   public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
0042     return allocateColumns(capacity, schema.fields());
0043   }
0044 
0045   /**
0046    * Allocates columns to store elements of each field on heap.
0047    * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
0048    * in number of elements, not number of bytes.
0049    */
0050   public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
0051     OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
0052     for (int i = 0; i < fields.length; i++) {
0053       vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
0054     }
0055     return vectors;
0056   }
0057 
0058   // The data stored in these arrays need to maintain binary compatible. We can
0059   // directly pass this buffer to external components.
0060 
0061   // This is faster than a boolean array and we optimize this over memory footprint.
0062   private byte[] nulls;
0063 
0064   // Array for each type. Only 1 is populated for any type.
0065   private byte[] byteData;
0066   private short[] shortData;
0067   private int[] intData;
0068   private long[] longData;
0069   private float[] floatData;
0070   private double[] doubleData;
0071 
0072   // Only set if type is Array or Map.
0073   private int[] arrayLengths;
0074   private int[] arrayOffsets;
0075 
0076   public OnHeapColumnVector(int capacity, DataType type) {
0077     super(capacity, type);
0078 
0079     reserveInternal(capacity);
0080     reset();
0081   }
0082 
0083   @Override
0084   public void close() {
0085     super.close();
0086     nulls = null;
0087     byteData = null;
0088     shortData = null;
0089     intData = null;
0090     longData = null;
0091     floatData = null;
0092     doubleData = null;
0093     arrayLengths = null;
0094     arrayOffsets = null;
0095   }
0096 
0097   //
0098   // APIs dealing with nulls
0099   //
0100 
0101   @Override
0102   public void putNotNull(int rowId) {
0103     nulls[rowId] = (byte)0;
0104   }
0105 
0106   @Override
0107   public void putNull(int rowId) {
0108     nulls[rowId] = (byte)1;
0109     ++numNulls;
0110   }
0111 
0112   @Override
0113   public void putNulls(int rowId, int count) {
0114     for (int i = 0; i < count; ++i) {
0115       nulls[rowId + i] = (byte)1;
0116     }
0117     numNulls += count;
0118   }
0119 
0120   @Override
0121   public void putNotNulls(int rowId, int count) {
0122     if (!hasNull()) return;
0123     for (int i = 0; i < count; ++i) {
0124       nulls[rowId + i] = (byte)0;
0125     }
0126   }
0127 
0128   @Override
0129   public boolean isNullAt(int rowId) {
0130     return nulls[rowId] == 1;
0131   }
0132 
0133   //
0134   // APIs dealing with Booleans
0135   //
0136 
0137   @Override
0138   public void putBoolean(int rowId, boolean value) {
0139     byteData[rowId] = (byte)((value) ? 1 : 0);
0140   }
0141 
0142   @Override
0143   public void putBooleans(int rowId, int count, boolean value) {
0144     byte v = (byte)((value) ? 1 : 0);
0145     for (int i = 0; i < count; ++i) {
0146       byteData[i + rowId] = v;
0147     }
0148   }
0149 
0150   @Override
0151   public boolean getBoolean(int rowId) {
0152     return byteData[rowId] == 1;
0153   }
0154 
0155   @Override
0156   public boolean[] getBooleans(int rowId, int count) {
0157     assert(dictionary == null);
0158     boolean[] array = new boolean[count];
0159     for (int i = 0; i < count; ++i) {
0160       array[i] = (byteData[rowId + i] == 1);
0161     }
0162    return array;
0163   }
0164 
0165   //
0166 
0167   //
0168   // APIs dealing with Bytes
0169   //
0170 
0171   @Override
0172   public void putByte(int rowId, byte value) {
0173     byteData[rowId] = value;
0174   }
0175 
0176   @Override
0177   public void putBytes(int rowId, int count, byte value) {
0178     for (int i = 0; i < count; ++i) {
0179       byteData[i + rowId] = value;
0180     }
0181   }
0182 
0183   @Override
0184   public void putBytes(int rowId, int count, byte[] src, int srcIndex) {
0185     System.arraycopy(src, srcIndex, byteData, rowId, count);
0186   }
0187 
0188   @Override
0189   public byte getByte(int rowId) {
0190     if (dictionary == null) {
0191       return byteData[rowId];
0192     } else {
0193       return (byte) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0194     }
0195   }
0196 
0197   @Override
0198   public byte[] getBytes(int rowId, int count) {
0199     assert(dictionary == null);
0200     byte[] array = new byte[count];
0201     System.arraycopy(byteData, rowId, array, 0, count);
0202     return array;
0203   }
0204 
0205   @Override
0206   protected UTF8String getBytesAsUTF8String(int rowId, int count) {
0207     return UTF8String.fromBytes(byteData, rowId, count);
0208   }
0209 
0210   //
0211   // APIs dealing with Shorts
0212   //
0213 
0214   @Override
0215   public void putShort(int rowId, short value) {
0216     shortData[rowId] = value;
0217   }
0218 
0219   @Override
0220   public void putShorts(int rowId, int count, short value) {
0221     for (int i = 0; i < count; ++i) {
0222       shortData[i + rowId] = value;
0223     }
0224   }
0225 
0226   @Override
0227   public void putShorts(int rowId, int count, short[] src, int srcIndex) {
0228     System.arraycopy(src, srcIndex, shortData, rowId, count);
0229   }
0230 
0231   @Override
0232   public void putShorts(int rowId, int count, byte[] src, int srcIndex) {
0233     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, shortData,
0234       Platform.SHORT_ARRAY_OFFSET + rowId * 2L, count * 2L);
0235   }
0236 
0237   @Override
0238   public short getShort(int rowId) {
0239     if (dictionary == null) {
0240       return shortData[rowId];
0241     } else {
0242       return (short) dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0243     }
0244   }
0245 
0246   @Override
0247   public short[] getShorts(int rowId, int count) {
0248     assert(dictionary == null);
0249     short[] array = new short[count];
0250     System.arraycopy(shortData, rowId, array, 0, count);
0251     return array;
0252   }
0253 
0254 
0255   //
0256   // APIs dealing with Ints
0257   //
0258 
0259   @Override
0260   public void putInt(int rowId, int value) {
0261     intData[rowId] = value;
0262   }
0263 
0264   @Override
0265   public void putInts(int rowId, int count, int value) {
0266     for (int i = 0; i < count; ++i) {
0267       intData[i + rowId] = value;
0268     }
0269   }
0270 
0271   @Override
0272   public void putInts(int rowId, int count, int[] src, int srcIndex) {
0273     System.arraycopy(src, srcIndex, intData, rowId, count);
0274   }
0275 
0276   @Override
0277   public void putInts(int rowId, int count, byte[] src, int srcIndex) {
0278     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, intData,
0279       Platform.INT_ARRAY_OFFSET + rowId * 4L, count * 4L);
0280   }
0281 
0282   @Override
0283   public void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0284     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0285     for (int i = 0; i < count; ++i, srcOffset += 4) {
0286       intData[i + rowId] = Platform.getInt(src, srcOffset);
0287       if (bigEndianPlatform) {
0288         intData[i + rowId] = java.lang.Integer.reverseBytes(intData[i + rowId]);
0289       }
0290     }
0291   }
0292 
0293   @Override
0294   public int getInt(int rowId) {
0295     if (dictionary == null) {
0296       return intData[rowId];
0297     } else {
0298       return dictionary.decodeToInt(dictionaryIds.getDictId(rowId));
0299     }
0300   }
0301 
0302   @Override
0303   public int[] getInts(int rowId, int count) {
0304     assert(dictionary == null);
0305     int[] array = new int[count];
0306     System.arraycopy(intData, rowId, array, 0, count);
0307     return array;
0308   }
0309 
0310   /**
0311    * Returns the dictionary Id for rowId.
0312    * This should only be called when the ColumnVector is dictionaryIds.
0313    * We have this separate method for dictionaryIds as per SPARK-16928.
0314    */
0315   public int getDictId(int rowId) {
0316     assert(dictionary == null)
0317             : "A ColumnVector dictionary should not have a dictionary for itself.";
0318     return intData[rowId];
0319   }
0320 
0321   //
0322   // APIs dealing with Longs
0323   //
0324 
0325   @Override
0326   public void putLong(int rowId, long value) {
0327     longData[rowId] = value;
0328   }
0329 
0330   @Override
0331   public void putLongs(int rowId, int count, long value) {
0332     for (int i = 0; i < count; ++i) {
0333       longData[i + rowId] = value;
0334     }
0335   }
0336 
0337   @Override
0338   public void putLongs(int rowId, int count, long[] src, int srcIndex) {
0339     System.arraycopy(src, srcIndex, longData, rowId, count);
0340   }
0341 
0342   @Override
0343   public void putLongs(int rowId, int count, byte[] src, int srcIndex) {
0344     Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, longData,
0345       Platform.LONG_ARRAY_OFFSET + rowId * 8L, count * 8L);
0346   }
0347 
0348   @Override
0349   public void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
0350     int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
0351     for (int i = 0; i < count; ++i, srcOffset += 8) {
0352       longData[i + rowId] = Platform.getLong(src, srcOffset);
0353       if (bigEndianPlatform) {
0354         longData[i + rowId] = java.lang.Long.reverseBytes(longData[i + rowId]);
0355       }
0356     }
0357   }
0358 
0359   @Override
0360   public long getLong(int rowId) {
0361     if (dictionary == null) {
0362       return longData[rowId];
0363     } else {
0364       return dictionary.decodeToLong(dictionaryIds.getDictId(rowId));
0365     }
0366   }
0367 
0368   @Override
0369   public long[] getLongs(int rowId, int count) {
0370     assert(dictionary == null);
0371     long[] array = new long[count];
0372     System.arraycopy(longData, rowId, array, 0, count);
0373     return array;
0374   }
0375 
0376   //
0377   // APIs dealing with floats
0378   //
0379 
0380   @Override
0381   public void putFloat(int rowId, float value) { floatData[rowId] = value; }
0382 
0383   @Override
0384   public void putFloats(int rowId, int count, float value) {
0385     Arrays.fill(floatData, rowId, rowId + count, value);
0386   }
0387 
0388   @Override
0389   public void putFloats(int rowId, int count, float[] src, int srcIndex) {
0390     System.arraycopy(src, srcIndex, floatData, rowId, count);
0391   }
0392 
0393   @Override
0394   public void putFloats(int rowId, int count, byte[] src, int srcIndex) {
0395     if (!bigEndianPlatform) {
0396       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, floatData,
0397           Platform.DOUBLE_ARRAY_OFFSET + rowId * 4L, count * 4L);
0398     } else {
0399       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0400       for (int i = 0; i < count; ++i) {
0401         floatData[i + rowId] = bb.getFloat(srcIndex + (4 * i));
0402       }
0403     }
0404   }
0405 
0406   @Override
0407   public float getFloat(int rowId) {
0408     if (dictionary == null) {
0409       return floatData[rowId];
0410     } else {
0411       return dictionary.decodeToFloat(dictionaryIds.getDictId(rowId));
0412     }
0413   }
0414 
0415   @Override
0416   public float[] getFloats(int rowId, int count) {
0417     assert(dictionary == null);
0418     float[] array = new float[count];
0419     System.arraycopy(floatData, rowId, array, 0, count);
0420     return array;
0421   }
0422 
0423   //
0424   // APIs dealing with doubles
0425   //
0426 
0427   @Override
0428   public void putDouble(int rowId, double value) {
0429     doubleData[rowId] = value;
0430   }
0431 
0432   @Override
0433   public void putDoubles(int rowId, int count, double value) {
0434     Arrays.fill(doubleData, rowId, rowId + count, value);
0435   }
0436 
0437   @Override
0438   public void putDoubles(int rowId, int count, double[] src, int srcIndex) {
0439     System.arraycopy(src, srcIndex, doubleData, rowId, count);
0440   }
0441 
0442   @Override
0443   public void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
0444     if (!bigEndianPlatform) {
0445       Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
0446           Platform.DOUBLE_ARRAY_OFFSET + rowId * 8L, count * 8L);
0447     } else {
0448       ByteBuffer bb = ByteBuffer.wrap(src).order(ByteOrder.BIG_ENDIAN);
0449       for (int i = 0; i < count; ++i) {
0450         doubleData[i + rowId] = bb.getDouble(srcIndex + (8 * i));
0451       }
0452     }
0453   }
0454 
0455   @Override
0456   public double getDouble(int rowId) {
0457     if (dictionary == null) {
0458       return doubleData[rowId];
0459     } else {
0460       return dictionary.decodeToDouble(dictionaryIds.getDictId(rowId));
0461     }
0462   }
0463 
0464   @Override
0465   public double[] getDoubles(int rowId, int count) {
0466     assert(dictionary == null);
0467     double[] array = new double[count];
0468     System.arraycopy(doubleData, rowId, array, 0, count);
0469     return array;
0470   }
0471 
0472   //
0473   // APIs dealing with Arrays
0474   //
0475 
0476   @Override
0477   public int getArrayLength(int rowId) {
0478     return arrayLengths[rowId];
0479   }
0480   @Override
0481   public int getArrayOffset(int rowId) {
0482     return arrayOffsets[rowId];
0483   }
0484 
0485   @Override
0486   public void putArray(int rowId, int offset, int length) {
0487     arrayOffsets[rowId] = offset;
0488     arrayLengths[rowId] = length;
0489   }
0490 
0491   //
0492   // APIs dealing with Byte Arrays
0493   //
0494 
0495   @Override
0496   public int putByteArray(int rowId, byte[] value, int offset, int length) {
0497     int result = arrayData().appendBytes(length, value, offset);
0498     arrayOffsets[rowId] = result;
0499     arrayLengths[rowId] = length;
0500     return result;
0501   }
0502 
0503   // Spilt this function out since it is the slow path.
0504   @Override
0505   protected void reserveInternal(int newCapacity) {
0506     if (isArray() || type instanceof MapType) {
0507       int[] newLengths = new int[newCapacity];
0508       int[] newOffsets = new int[newCapacity];
0509       if (this.arrayLengths != null) {
0510         System.arraycopy(this.arrayLengths, 0, newLengths, 0, capacity);
0511         System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, capacity);
0512       }
0513       arrayLengths = newLengths;
0514       arrayOffsets = newOffsets;
0515     } else if (type instanceof BooleanType) {
0516       if (byteData == null || byteData.length < newCapacity) {
0517         byte[] newData = new byte[newCapacity];
0518         if (byteData != null) System.arraycopy(byteData, 0, newData, 0, capacity);
0519         byteData = newData;
0520       }
0521     } else if (type instanceof ByteType) {
0522       if (byteData == null || byteData.length < newCapacity) {
0523         byte[] newData = new byte[newCapacity];
0524         if (byteData != null) System.arraycopy(byteData, 0, newData, 0, capacity);
0525         byteData = newData;
0526       }
0527     } else if (type instanceof ShortType) {
0528       if (shortData == null || shortData.length < newCapacity) {
0529         short[] newData = new short[newCapacity];
0530         if (shortData != null) System.arraycopy(shortData, 0, newData, 0, capacity);
0531         shortData = newData;
0532       }
0533     } else if (type instanceof IntegerType || type instanceof DateType ||
0534       DecimalType.is32BitDecimalType(type)) {
0535       if (intData == null || intData.length < newCapacity) {
0536         int[] newData = new int[newCapacity];
0537         if (intData != null) System.arraycopy(intData, 0, newData, 0, capacity);
0538         intData = newData;
0539       }
0540     } else if (type instanceof LongType || type instanceof TimestampType ||
0541         DecimalType.is64BitDecimalType(type)) {
0542       if (longData == null || longData.length < newCapacity) {
0543         long[] newData = new long[newCapacity];
0544         if (longData != null) System.arraycopy(longData, 0, newData, 0, capacity);
0545         longData = newData;
0546       }
0547     } else if (type instanceof FloatType) {
0548       if (floatData == null || floatData.length < newCapacity) {
0549         float[] newData = new float[newCapacity];
0550         if (floatData != null) System.arraycopy(floatData, 0, newData, 0, capacity);
0551         floatData = newData;
0552       }
0553     } else if (type instanceof DoubleType) {
0554       if (doubleData == null || doubleData.length < newCapacity) {
0555         double[] newData = new double[newCapacity];
0556         if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, capacity);
0557         doubleData = newData;
0558       }
0559     } else if (childColumns != null) {
0560       // Nothing to store.
0561     } else {
0562       throw new RuntimeException("Unhandled " + type);
0563     }
0564 
0565     byte[] newNulls = new byte[newCapacity];
0566     if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, capacity);
0567     nulls = newNulls;
0568 
0569     capacity = newCapacity;
0570   }
0571 
0572   @Override
0573   protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
0574     return new OnHeapColumnVector(capacity, type);
0575   }
0576 }