Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.vectorized;
0019 
0020 import io.netty.buffer.ArrowBuf;
0021 import org.apache.arrow.vector.*;
0022 import org.apache.arrow.vector.complex.*;
0023 import org.apache.arrow.vector.holders.NullableVarCharHolder;
0024 
0025 import org.apache.spark.sql.util.ArrowUtils;
0026 import org.apache.spark.sql.types.*;
0027 import org.apache.spark.unsafe.types.UTF8String;
0028 
0029 /**
0030  * A column vector backed by Apache Arrow. Currently calendar interval type and map type are not
0031  * supported.
0032  */
0033 public final class ArrowColumnVector extends ColumnVector {
0034 
0035   private final ArrowVectorAccessor accessor;
0036   private ArrowColumnVector[] childColumns;
0037 
0038   @Override
0039   public boolean hasNull() {
0040     return accessor.getNullCount() > 0;
0041   }
0042 
0043   @Override
0044   public int numNulls() {
0045     return accessor.getNullCount();
0046   }
0047 
0048   @Override
0049   public void close() {
0050     if (childColumns != null) {
0051       for (int i = 0; i < childColumns.length; i++) {
0052         childColumns[i].close();
0053         childColumns[i] = null;
0054       }
0055       childColumns = null;
0056     }
0057     accessor.close();
0058   }
0059 
0060   @Override
0061   public boolean isNullAt(int rowId) {
0062     return accessor.isNullAt(rowId);
0063   }
0064 
0065   @Override
0066   public boolean getBoolean(int rowId) {
0067     return accessor.getBoolean(rowId);
0068   }
0069 
0070   @Override
0071   public byte getByte(int rowId) {
0072     return accessor.getByte(rowId);
0073   }
0074 
0075   @Override
0076   public short getShort(int rowId) {
0077     return accessor.getShort(rowId);
0078   }
0079 
0080   @Override
0081   public int getInt(int rowId) {
0082     return accessor.getInt(rowId);
0083   }
0084 
0085   @Override
0086   public long getLong(int rowId) {
0087     return accessor.getLong(rowId);
0088   }
0089 
0090   @Override
0091   public float getFloat(int rowId) {
0092     return accessor.getFloat(rowId);
0093   }
0094 
0095   @Override
0096   public double getDouble(int rowId) {
0097     return accessor.getDouble(rowId);
0098   }
0099 
0100   @Override
0101   public Decimal getDecimal(int rowId, int precision, int scale) {
0102     if (isNullAt(rowId)) return null;
0103     return accessor.getDecimal(rowId, precision, scale);
0104   }
0105 
0106   @Override
0107   public UTF8String getUTF8String(int rowId) {
0108     if (isNullAt(rowId)) return null;
0109     return accessor.getUTF8String(rowId);
0110   }
0111 
0112   @Override
0113   public byte[] getBinary(int rowId) {
0114     if (isNullAt(rowId)) return null;
0115     return accessor.getBinary(rowId);
0116   }
0117 
0118   @Override
0119   public ColumnarArray getArray(int rowId) {
0120     if (isNullAt(rowId)) return null;
0121     return accessor.getArray(rowId);
0122   }
0123 
0124   @Override
0125   public ColumnarMap getMap(int rowId) {
0126     if (isNullAt(rowId)) return null;
0127     return accessor.getMap(rowId);
0128   }
0129 
0130   @Override
0131   public ArrowColumnVector getChild(int ordinal) { return childColumns[ordinal]; }
0132 
0133   public ArrowColumnVector(ValueVector vector) {
0134     super(ArrowUtils.fromArrowField(vector.getField()));
0135 
0136     if (vector instanceof BitVector) {
0137       accessor = new BooleanAccessor((BitVector) vector);
0138     } else if (vector instanceof TinyIntVector) {
0139       accessor = new ByteAccessor((TinyIntVector) vector);
0140     } else if (vector instanceof SmallIntVector) {
0141       accessor = new ShortAccessor((SmallIntVector) vector);
0142     } else if (vector instanceof IntVector) {
0143       accessor = new IntAccessor((IntVector) vector);
0144     } else if (vector instanceof BigIntVector) {
0145       accessor = new LongAccessor((BigIntVector) vector);
0146     } else if (vector instanceof Float4Vector) {
0147       accessor = new FloatAccessor((Float4Vector) vector);
0148     } else if (vector instanceof Float8Vector) {
0149       accessor = new DoubleAccessor((Float8Vector) vector);
0150     } else if (vector instanceof DecimalVector) {
0151       accessor = new DecimalAccessor((DecimalVector) vector);
0152     } else if (vector instanceof VarCharVector) {
0153       accessor = new StringAccessor((VarCharVector) vector);
0154     } else if (vector instanceof VarBinaryVector) {
0155       accessor = new BinaryAccessor((VarBinaryVector) vector);
0156     } else if (vector instanceof DateDayVector) {
0157       accessor = new DateAccessor((DateDayVector) vector);
0158     } else if (vector instanceof TimeStampMicroTZVector) {
0159       accessor = new TimestampAccessor((TimeStampMicroTZVector) vector);
0160     } else if (vector instanceof MapVector) {
0161       MapVector mapVector = (MapVector) vector;
0162       accessor = new MapAccessor(mapVector);
0163     } else if (vector instanceof ListVector) {
0164       ListVector listVector = (ListVector) vector;
0165       accessor = new ArrayAccessor(listVector);
0166     } else if (vector instanceof StructVector) {
0167       StructVector structVector = (StructVector) vector;
0168       accessor = new StructAccessor(structVector);
0169 
0170       childColumns = new ArrowColumnVector[structVector.size()];
0171       for (int i = 0; i < childColumns.length; ++i) {
0172         childColumns[i] = new ArrowColumnVector(structVector.getVectorById(i));
0173       }
0174     } else {
0175       throw new UnsupportedOperationException();
0176     }
0177   }
0178 
0179   private abstract static class ArrowVectorAccessor {
0180 
0181     private final ValueVector vector;
0182 
0183     ArrowVectorAccessor(ValueVector vector) {
0184       this.vector = vector;
0185     }
0186 
0187     // TODO: should be final after removing ArrayAccessor workaround
0188     boolean isNullAt(int rowId) {
0189       return vector.isNull(rowId);
0190     }
0191 
0192     final int getNullCount() {
0193       return vector.getNullCount();
0194     }
0195 
0196     final void close() {
0197       vector.close();
0198     }
0199 
0200     boolean getBoolean(int rowId) {
0201       throw new UnsupportedOperationException();
0202     }
0203 
0204     byte getByte(int rowId) {
0205       throw new UnsupportedOperationException();
0206     }
0207 
0208     short getShort(int rowId) {
0209       throw new UnsupportedOperationException();
0210     }
0211 
0212     int getInt(int rowId) {
0213       throw new UnsupportedOperationException();
0214     }
0215 
0216     long getLong(int rowId) {
0217       throw new UnsupportedOperationException();
0218     }
0219 
0220     float getFloat(int rowId) {
0221       throw new UnsupportedOperationException();
0222     }
0223 
0224     double getDouble(int rowId) {
0225       throw new UnsupportedOperationException();
0226     }
0227 
0228     Decimal getDecimal(int rowId, int precision, int scale) {
0229       throw new UnsupportedOperationException();
0230     }
0231 
0232     UTF8String getUTF8String(int rowId) {
0233       throw new UnsupportedOperationException();
0234     }
0235 
0236     byte[] getBinary(int rowId) {
0237       throw new UnsupportedOperationException();
0238     }
0239 
0240     ColumnarArray getArray(int rowId) {
0241       throw new UnsupportedOperationException();
0242     }
0243 
0244     ColumnarMap getMap(int rowId) {
0245       throw new UnsupportedOperationException();
0246     }
0247   }
0248 
0249   private static class BooleanAccessor extends ArrowVectorAccessor {
0250 
0251     private final BitVector accessor;
0252 
0253     BooleanAccessor(BitVector vector) {
0254       super(vector);
0255       this.accessor = vector;
0256     }
0257 
0258     @Override
0259     final boolean getBoolean(int rowId) {
0260       return accessor.get(rowId) == 1;
0261     }
0262   }
0263 
0264   private static class ByteAccessor extends ArrowVectorAccessor {
0265 
0266     private final TinyIntVector accessor;
0267 
0268     ByteAccessor(TinyIntVector vector) {
0269       super(vector);
0270       this.accessor = vector;
0271     }
0272 
0273     @Override
0274     final byte getByte(int rowId) {
0275       return accessor.get(rowId);
0276     }
0277   }
0278 
0279   private static class ShortAccessor extends ArrowVectorAccessor {
0280 
0281     private final SmallIntVector accessor;
0282 
0283     ShortAccessor(SmallIntVector vector) {
0284       super(vector);
0285       this.accessor = vector;
0286     }
0287 
0288     @Override
0289     final short getShort(int rowId) {
0290       return accessor.get(rowId);
0291     }
0292   }
0293 
0294   private static class IntAccessor extends ArrowVectorAccessor {
0295 
0296     private final IntVector accessor;
0297 
0298     IntAccessor(IntVector vector) {
0299       super(vector);
0300       this.accessor = vector;
0301     }
0302 
0303     @Override
0304     final int getInt(int rowId) {
0305       return accessor.get(rowId);
0306     }
0307   }
0308 
0309   private static class LongAccessor extends ArrowVectorAccessor {
0310 
0311     private final BigIntVector accessor;
0312 
0313     LongAccessor(BigIntVector vector) {
0314       super(vector);
0315       this.accessor = vector;
0316     }
0317 
0318     @Override
0319     final long getLong(int rowId) {
0320       return accessor.get(rowId);
0321     }
0322   }
0323 
0324   private static class FloatAccessor extends ArrowVectorAccessor {
0325 
0326     private final Float4Vector accessor;
0327 
0328     FloatAccessor(Float4Vector vector) {
0329       super(vector);
0330       this.accessor = vector;
0331     }
0332 
0333     @Override
0334     final float getFloat(int rowId) {
0335       return accessor.get(rowId);
0336     }
0337   }
0338 
0339   private static class DoubleAccessor extends ArrowVectorAccessor {
0340 
0341     private final Float8Vector accessor;
0342 
0343     DoubleAccessor(Float8Vector vector) {
0344       super(vector);
0345       this.accessor = vector;
0346     }
0347 
0348     @Override
0349     final double getDouble(int rowId) {
0350       return accessor.get(rowId);
0351     }
0352   }
0353 
0354   private static class DecimalAccessor extends ArrowVectorAccessor {
0355 
0356     private final DecimalVector accessor;
0357 
0358     DecimalAccessor(DecimalVector vector) {
0359       super(vector);
0360       this.accessor = vector;
0361     }
0362 
0363     @Override
0364     final Decimal getDecimal(int rowId, int precision, int scale) {
0365       if (isNullAt(rowId)) return null;
0366       return Decimal.apply(accessor.getObject(rowId), precision, scale);
0367     }
0368   }
0369 
0370   private static class StringAccessor extends ArrowVectorAccessor {
0371 
0372     private final VarCharVector accessor;
0373     private final NullableVarCharHolder stringResult = new NullableVarCharHolder();
0374 
0375     StringAccessor(VarCharVector vector) {
0376       super(vector);
0377       this.accessor = vector;
0378     }
0379 
0380     @Override
0381     final UTF8String getUTF8String(int rowId) {
0382       accessor.get(rowId, stringResult);
0383       if (stringResult.isSet == 0) {
0384         return null;
0385       } else {
0386         return UTF8String.fromAddress(null,
0387           stringResult.buffer.memoryAddress() + stringResult.start,
0388           stringResult.end - stringResult.start);
0389       }
0390     }
0391   }
0392 
0393   private static class BinaryAccessor extends ArrowVectorAccessor {
0394 
0395     private final VarBinaryVector accessor;
0396 
0397     BinaryAccessor(VarBinaryVector vector) {
0398       super(vector);
0399       this.accessor = vector;
0400     }
0401 
0402     @Override
0403     final byte[] getBinary(int rowId) {
0404       return accessor.getObject(rowId);
0405     }
0406   }
0407 
0408   private static class DateAccessor extends ArrowVectorAccessor {
0409 
0410     private final DateDayVector accessor;
0411 
0412     DateAccessor(DateDayVector vector) {
0413       super(vector);
0414       this.accessor = vector;
0415     }
0416 
0417     @Override
0418     final int getInt(int rowId) {
0419       return accessor.get(rowId);
0420     }
0421   }
0422 
0423   private static class TimestampAccessor extends ArrowVectorAccessor {
0424 
0425     private final TimeStampMicroTZVector accessor;
0426 
0427     TimestampAccessor(TimeStampMicroTZVector vector) {
0428       super(vector);
0429       this.accessor = vector;
0430     }
0431 
0432     @Override
0433     final long getLong(int rowId) {
0434       return accessor.get(rowId);
0435     }
0436   }
0437 
0438   private static class ArrayAccessor extends ArrowVectorAccessor {
0439 
0440     private final ListVector accessor;
0441     private final ArrowColumnVector arrayData;
0442 
0443     ArrayAccessor(ListVector vector) {
0444       super(vector);
0445       this.accessor = vector;
0446       this.arrayData = new ArrowColumnVector(vector.getDataVector());
0447     }
0448 
0449     @Override
0450     final boolean isNullAt(int rowId) {
0451       // TODO: Workaround if vector has all non-null values, see ARROW-1948
0452       if (accessor.getValueCount() > 0 && accessor.getValidityBuffer().capacity() == 0) {
0453         return false;
0454       } else {
0455         return super.isNullAt(rowId);
0456       }
0457     }
0458 
0459     @Override
0460     final ColumnarArray getArray(int rowId) {
0461       ArrowBuf offsets = accessor.getOffsetBuffer();
0462       int index = rowId * ListVector.OFFSET_WIDTH;
0463       int start = offsets.getInt(index);
0464       int end = offsets.getInt(index + ListVector.OFFSET_WIDTH);
0465       return new ColumnarArray(arrayData, start, end - start);
0466     }
0467   }
0468 
0469   /**
0470    * Any call to "get" method will throw UnsupportedOperationException.
0471    *
0472    * Access struct values in a ArrowColumnVector doesn't use this accessor. Instead, it uses
0473    * getStruct() method defined in the parent class. Any call to "get" method in this class is a
0474    * bug in the code.
0475    *
0476    */
0477   private static class StructAccessor extends ArrowVectorAccessor {
0478 
0479     StructAccessor(StructVector vector) {
0480       super(vector);
0481     }
0482   }
0483 
0484   private static class MapAccessor extends ArrowVectorAccessor {
0485     private final MapVector accessor;
0486     private final ArrowColumnVector keys;
0487     private final ArrowColumnVector values;
0488 
0489     MapAccessor(MapVector vector) {
0490       super(vector);
0491       this.accessor = vector;
0492       StructVector entries = (StructVector) vector.getDataVector();
0493       this.keys = new ArrowColumnVector(entries.getChild(MapVector.KEY_NAME));
0494       this.values = new ArrowColumnVector(entries.getChild(MapVector.VALUE_NAME));
0495     }
0496 
0497     @Override
0498     final ColumnarMap getMap(int rowId) {
0499       int index = rowId * MapVector.OFFSET_WIDTH;
0500       int offset = accessor.getOffsetBuffer().getInt(index);
0501       int length = accessor.getInnerValueCountAt(rowId);
0502       return new ColumnarMap(keys, values, offset, length);
0503     }
0504   }
0505 }