Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.execution.datasources.orc;
0019 
0020 import java.math.BigDecimal;
0021 
0022 import org.apache.hadoop.hive.ql.exec.vector.*;
0023 
0024 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
0025 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
0026 import org.apache.spark.sql.types.DataType;
0027 import org.apache.spark.sql.types.DateType;
0028 import org.apache.spark.sql.types.Decimal;
0029 import org.apache.spark.sql.types.TimestampType;
0030 import org.apache.spark.sql.vectorized.ColumnarArray;
0031 import org.apache.spark.sql.vectorized.ColumnarMap;
0032 import org.apache.spark.unsafe.types.UTF8String;
0033 
0034 /**
0035  * A column vector class wrapping Hive's ColumnVector. Because Spark ColumnarBatch only accepts
0036  * Spark's vectorized.ColumnVector, this column vector is used to adapt Hive ColumnVector with
0037  * Spark ColumnarVector.
0038  */
0039 public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
0040   private ColumnVector baseData;
0041   private LongColumnVector longData;
0042   private DoubleColumnVector doubleData;
0043   private BytesColumnVector bytesData;
0044   private DecimalColumnVector decimalData;
0045   private TimestampColumnVector timestampData;
0046   private final boolean isTimestamp;
0047   private final boolean isDate;
0048 
0049   private int batchSize;
0050 
0051   OrcColumnVector(DataType type, ColumnVector vector) {
0052     super(type);
0053 
0054     if (type instanceof TimestampType) {
0055       isTimestamp = true;
0056     } else {
0057       isTimestamp = false;
0058     }
0059 
0060     if (type instanceof DateType) {
0061       isDate = true;
0062     } else {
0063       isDate = false;
0064     }
0065 
0066     baseData = vector;
0067     if (vector instanceof LongColumnVector) {
0068       longData = (LongColumnVector) vector;
0069     } else if (vector instanceof DoubleColumnVector) {
0070       doubleData = (DoubleColumnVector) vector;
0071     } else if (vector instanceof BytesColumnVector) {
0072       bytesData = (BytesColumnVector) vector;
0073     } else if (vector instanceof DecimalColumnVector) {
0074       decimalData = (DecimalColumnVector) vector;
0075     } else if (vector instanceof TimestampColumnVector) {
0076       timestampData = (TimestampColumnVector) vector;
0077     } else {
0078       throw new UnsupportedOperationException();
0079     }
0080   }
0081 
0082   public void setBatchSize(int batchSize) {
0083     this.batchSize = batchSize;
0084   }
0085 
0086   @Override
0087   public void close() {
0088 
0089   }
0090 
0091   @Override
0092   public boolean hasNull() {
0093     return !baseData.noNulls;
0094   }
0095 
0096   @Override
0097   public int numNulls() {
0098     if (baseData.isRepeating) {
0099       if (baseData.isNull[0]) {
0100         return batchSize;
0101       } else {
0102         return 0;
0103       }
0104     } else if (baseData.noNulls) {
0105       return 0;
0106     } else {
0107       int count = 0;
0108       for (int i = 0; i < batchSize; i++) {
0109         if (baseData.isNull[i]) count++;
0110       }
0111       return count;
0112     }
0113   }
0114 
0115   /* A helper method to get the row index in a column. */
0116   private int getRowIndex(int rowId) {
0117     return baseData.isRepeating ? 0 : rowId;
0118   }
0119 
0120   @Override
0121   public boolean isNullAt(int rowId) {
0122     return baseData.isNull[getRowIndex(rowId)];
0123   }
0124 
0125   @Override
0126   public boolean getBoolean(int rowId) {
0127     return longData.vector[getRowIndex(rowId)] == 1;
0128   }
0129 
0130   @Override
0131   public byte getByte(int rowId) {
0132     return (byte) longData.vector[getRowIndex(rowId)];
0133   }
0134 
0135   @Override
0136   public short getShort(int rowId) {
0137     return (short) longData.vector[getRowIndex(rowId)];
0138   }
0139 
0140   @Override
0141   public int getInt(int rowId) {
0142     int value = (int) longData.vector[getRowIndex(rowId)];
0143     if (isDate) {
0144       return RebaseDateTime.rebaseJulianToGregorianDays(value);
0145     } else {
0146       return value;
0147     }
0148   }
0149 
0150   @Override
0151   public long getLong(int rowId) {
0152     int index = getRowIndex(rowId);
0153     if (isTimestamp) {
0154       return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
0155     } else {
0156       return longData.vector[index];
0157     }
0158   }
0159 
0160   @Override
0161   public float getFloat(int rowId) {
0162     return (float) doubleData.vector[getRowIndex(rowId)];
0163   }
0164 
0165   @Override
0166   public double getDouble(int rowId) {
0167     return doubleData.vector[getRowIndex(rowId)];
0168   }
0169 
0170   @Override
0171   public Decimal getDecimal(int rowId, int precision, int scale) {
0172     if (isNullAt(rowId)) return null;
0173     BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
0174     return Decimal.apply(data, precision, scale);
0175   }
0176 
0177   @Override
0178   public UTF8String getUTF8String(int rowId) {
0179     if (isNullAt(rowId)) return null;
0180     int index = getRowIndex(rowId);
0181     BytesColumnVector col = bytesData;
0182     return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
0183   }
0184 
0185   @Override
0186   public byte[] getBinary(int rowId) {
0187     if (isNullAt(rowId)) return null;
0188     int index = getRowIndex(rowId);
0189     byte[] binary = new byte[bytesData.length[index]];
0190     System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
0191     return binary;
0192   }
0193 
0194   @Override
0195   public ColumnarArray getArray(int rowId) {
0196     throw new UnsupportedOperationException();
0197   }
0198 
0199   @Override
0200   public ColumnarMap getMap(int rowId) {
0201     throw new UnsupportedOperationException();
0202   }
0203 
0204   @Override
0205   public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
0206     throw new UnsupportedOperationException();
0207   }
0208 }