0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.execution.datasources.orc;
0019
0020 import java.math.BigDecimal;
0021
0022 import org.apache.orc.storage.ql.exec.vector.*;
0023
0024 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
0025 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
0026 import org.apache.spark.sql.types.DataType;
0027 import org.apache.spark.sql.types.DateType;
0028 import org.apache.spark.sql.types.Decimal;
0029 import org.apache.spark.sql.types.TimestampType;
0030 import org.apache.spark.sql.vectorized.ColumnarArray;
0031 import org.apache.spark.sql.vectorized.ColumnarMap;
0032 import org.apache.spark.unsafe.types.UTF8String;
0033
0034
0035
0036
0037
0038
0039 public class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
0040 private ColumnVector baseData;
0041 private LongColumnVector longData;
0042 private DoubleColumnVector doubleData;
0043 private BytesColumnVector bytesData;
0044 private DecimalColumnVector decimalData;
0045 private TimestampColumnVector timestampData;
0046 private final boolean isTimestamp;
0047 private final boolean isDate;
0048
0049 private int batchSize;
0050
0051 OrcColumnVector(DataType type, ColumnVector vector) {
0052 super(type);
0053
0054 if (type instanceof TimestampType) {
0055 isTimestamp = true;
0056 } else {
0057 isTimestamp = false;
0058 }
0059
0060 if (type instanceof DateType) {
0061 isDate = true;
0062 } else {
0063 isDate = false;
0064 }
0065
0066 baseData = vector;
0067 if (vector instanceof LongColumnVector) {
0068 longData = (LongColumnVector) vector;
0069 } else if (vector instanceof DoubleColumnVector) {
0070 doubleData = (DoubleColumnVector) vector;
0071 } else if (vector instanceof BytesColumnVector) {
0072 bytesData = (BytesColumnVector) vector;
0073 } else if (vector instanceof DecimalColumnVector) {
0074 decimalData = (DecimalColumnVector) vector;
0075 } else if (vector instanceof TimestampColumnVector) {
0076 timestampData = (TimestampColumnVector) vector;
0077 } else {
0078 throw new UnsupportedOperationException();
0079 }
0080 }
0081
0082 public void setBatchSize(int batchSize) {
0083 this.batchSize = batchSize;
0084 }
0085
0086 @Override
0087 public void close() {
0088
0089 }
0090
0091 @Override
0092 public boolean hasNull() {
0093 return !baseData.noNulls;
0094 }
0095
0096 @Override
0097 public int numNulls() {
0098 if (baseData.isRepeating) {
0099 if (baseData.isNull[0]) {
0100 return batchSize;
0101 } else {
0102 return 0;
0103 }
0104 } else if (baseData.noNulls) {
0105 return 0;
0106 } else {
0107 int count = 0;
0108 for (int i = 0; i < batchSize; i++) {
0109 if (baseData.isNull[i]) count++;
0110 }
0111 return count;
0112 }
0113 }
0114
0115
0116 private int getRowIndex(int rowId) {
0117 return baseData.isRepeating ? 0 : rowId;
0118 }
0119
0120 @Override
0121 public boolean isNullAt(int rowId) {
0122 return baseData.isNull[getRowIndex(rowId)];
0123 }
0124
0125 @Override
0126 public boolean getBoolean(int rowId) {
0127 return longData.vector[getRowIndex(rowId)] == 1;
0128 }
0129
0130 @Override
0131 public byte getByte(int rowId) {
0132 return (byte) longData.vector[getRowIndex(rowId)];
0133 }
0134
0135 @Override
0136 public short getShort(int rowId) {
0137 return (short) longData.vector[getRowIndex(rowId)];
0138 }
0139
0140 @Override
0141 public int getInt(int rowId) {
0142 int value = (int) longData.vector[getRowIndex(rowId)];
0143 if (isDate) {
0144 return RebaseDateTime.rebaseJulianToGregorianDays(value);
0145 } else {
0146 return value;
0147 }
0148 }
0149
0150 @Override
0151 public long getLong(int rowId) {
0152 int index = getRowIndex(rowId);
0153 if (isTimestamp) {
0154 return DateTimeUtils.fromJavaTimestamp(timestampData.asScratchTimestamp(index));
0155 } else {
0156 return longData.vector[index];
0157 }
0158 }
0159
0160 @Override
0161 public float getFloat(int rowId) {
0162 return (float) doubleData.vector[getRowIndex(rowId)];
0163 }
0164
0165 @Override
0166 public double getDouble(int rowId) {
0167 return doubleData.vector[getRowIndex(rowId)];
0168 }
0169
0170 @Override
0171 public Decimal getDecimal(int rowId, int precision, int scale) {
0172 if (isNullAt(rowId)) return null;
0173 BigDecimal data = decimalData.vector[getRowIndex(rowId)].getHiveDecimal().bigDecimalValue();
0174 return Decimal.apply(data, precision, scale);
0175 }
0176
0177 @Override
0178 public UTF8String getUTF8String(int rowId) {
0179 if (isNullAt(rowId)) return null;
0180 int index = getRowIndex(rowId);
0181 BytesColumnVector col = bytesData;
0182 return UTF8String.fromBytes(col.vector[index], col.start[index], col.length[index]);
0183 }
0184
0185 @Override
0186 public byte[] getBinary(int rowId) {
0187 if (isNullAt(rowId)) return null;
0188 int index = getRowIndex(rowId);
0189 byte[] binary = new byte[bytesData.length[index]];
0190 System.arraycopy(bytesData.vector[index], bytesData.start[index], binary, 0, binary.length);
0191 return binary;
0192 }
0193
0194 @Override
0195 public ColumnarArray getArray(int rowId) {
0196 throw new UnsupportedOperationException();
0197 }
0198
0199 @Override
0200 public ColumnarMap getMap(int rowId) {
0201 throw new UnsupportedOperationException();
0202 }
0203
0204 @Override
0205 public org.apache.spark.sql.vectorized.ColumnVector getChild(int ordinal) {
0206 throw new UnsupportedOperationException();
0207 }
0208 }