/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;

import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.*;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.datasources.DataSourceUtils;
import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;

/**
 * Decoder that reads batches of values from a single Parquet column into a column vector.
 */
public class VectorizedColumnReader {
  /**
   * Total number of values read.
   */
  private long valuesRead;

  /**
   * Value that indicates the end of the current page. That is,
   * if valuesRead == endOfPageValueCount, we are at the end of the page.
   */
  private long endOfPageValueCount;

  /**
   * The dictionary, if this column has dictionary encoding.
   */
  private final Dictionary dictionary;

  /**
   * If true, the current page is dictionary encoded.
   */
  private boolean isCurrentPageDictionaryEncoded;

  /**
   * Maximum definition level for this column.
   */
  private final int maxDefLevel;

  /**
   * Repetition/Definition/Value readers.
   */
  private SpecificParquetRecordReaderBase.IntIterator repetitionLevelColumn;
  private SpecificParquetRecordReaderBase.IntIterator definitionLevelColumn;
  private ValuesReader dataColumn;

  // Only set if vectorized decoding is true. This is used instead of the row by row decoding
  // with `definitionLevelColumn`.
  private VectorizedRleValuesReader defColumn;

  /**
   * Total number of values in this column (in this row group).
   */
  private final long totalValueCount;

  /**
   * Total values in the current page.
   */
  private int pageValueCount;

  private final PageReader pageReader;
  private final ColumnDescriptor descriptor;
  private final OriginalType originalType;
  // The timezone conversion to apply to int96 timestamps. Null if no conversion.
  private final ZoneId convertTz;
  private static final ZoneId UTC = ZoneOffset.UTC;
  private final String datetimeRebaseMode;

  public VectorizedColumnReader(
      ColumnDescriptor descriptor,
      OriginalType originalType,
      PageReader pageReader,
      ZoneId convertTz,
      String datetimeRebaseMode) throws IOException {
    this.descriptor = descriptor;
    this.pageReader = pageReader;
    this.convertTz = convertTz;
    this.originalType = originalType;
    this.maxDefLevel = descriptor.getMaxDefinitionLevel();

    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
    if (dictionaryPage != null) {
      try {
        this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
        this.isCurrentPageDictionaryEncoded = true;
      } catch (IOException e) {
        throw new IOException("could not decode the dictionary for " + descriptor, e);
      }
    } else {
      this.dictionary = null;
      this.isCurrentPageDictionaryEncoded = false;
    }
    this.totalValueCount = pageReader.getTotalValueCount();
    if (totalValueCount == 0) {
      throw new IOException("totalValueCount == 0");
    }
    assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) ||
      "CORRECTED".equals(datetimeRebaseMode);
    this.datetimeRebaseMode = datetimeRebaseMode;
  }

  /**
   * Advances to the next value. Returns true if the value is non-null.
   */
  private boolean next() throws IOException {
    if (valuesRead >= endOfPageValueCount) {
      if (valuesRead >= totalValueCount) {
        // How do we get here? Throw end of stream exception?
        return false;
      }
      readPage();
    }
    ++valuesRead;
    // TODO: Don't read for flat schemas
    //repetitionLevel = repetitionLevelColumn.nextInt();
    return definitionLevelColumn.nextInt() == maxDefLevel;
  }

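  /**
   * Whether values of the given physical type can stay dictionary-encoded in the column
   * vector and be decoded lazily on access, rather than materialized here. Lazy decoding is
   * not possible when values still need post-processing: DATE and TIMESTAMP_MICROS columns
   * that may require a Julian-to-Gregorian rebase, and TIMESTAMP_MILLIS columns, whose
   * values must be converted to microsecond precision.
   */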
  private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
    boolean isSupported = false;
    switch (typeName) {
      case INT32:
        isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
        break;
      case INT64:
        if (originalType == OriginalType.TIMESTAMP_MICROS) {
          isSupported = "CORRECTED".equals(datetimeRebaseMode);
        } else {
          isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
        }
        break;
      case FLOAT:
      case DOUBLE:
      case BINARY:
        isSupported = true;
        break;
    }
    return isSupported;
  }

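  /**
   * Rebases a days-since-epoch value from the hybrid Julian calendar to the Proleptic
   * Gregorian calendar. When `failIfRebase` is set (rebase mode EXCEPTION), days that fall
   * before the calendar switch raise an exception instead of being rebased.
   */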
  static int rebaseDays(int julianDays, final boolean failIfRebase) {
    if (failIfRebase) {
      if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
      } else {
        return julianDays;
      }
    } else {
      return RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
    }
  }

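  /**
   * Same as `rebaseDays`, but for microseconds-since-epoch timestamps: rebases from the
   * hybrid Julian calendar to the Proleptic Gregorian calendar, or fails for pre-switch
   * values when `failIfRebase` is set.
   */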
  static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
    if (failIfRebase) {
      if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
        throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
      } else {
        return julianMicros;
      }
    } else {
      return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
    }
  }

  /**
   * Reads `total` values from this columnReader into column.
   */
  void readBatch(int total, WritableColumnVector column) throws IOException {
    int rowId = 0;
    WritableColumnVector dictionaryIds = null;
    if (dictionary != null) {
      // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to
      // decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded
      // page.
      dictionaryIds = column.reserveDictionaryIds(total);
    }
    while (total > 0) {
      // Compute the number of values we want to read in this page.
      int leftInPage = (int) (endOfPageValueCount - valuesRead);
      if (leftInPage == 0) {
        readPage();
        leftInPage = (int) (endOfPageValueCount - valuesRead);
      }
      int num = Math.min(total, leftInPage);
      PrimitiveType.PrimitiveTypeName typeName =
        descriptor.getPrimitiveType().getPrimitiveTypeName();
      if (isCurrentPageDictionaryEncoded) {
        // Read and decode dictionary ids.
        defColumn.readIntegers(
            num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

        // TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post process
        // the values to add microseconds precision.
        if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
          // Column vector supports lazy decoding of dictionary values so just set the dictionary.
          // We can't do this if rowId != 0 AND the column doesn't have a dictionary (i.e. some
          // non-dictionary encoded values have already been added).
          column.setDictionary(new ParquetDictionary(dictionary));
        } else {
          decodeDictionaryIds(rowId, num, column, dictionaryIds);
        }
      } else {
        if (column.hasDictionary() && rowId != 0) {
          // This batch already has dictionary encoded values but this new page is not. The batch
          // does not support a mix of dictionary and not so we will decode the dictionary.
          decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
        }
        column.setDictionary(null);
        switch (typeName) {
          case BOOLEAN:
            readBooleanBatch(rowId, num, column);
            break;
          case INT32:
            readIntBatch(rowId, num, column);
            break;
          case INT64:
            readLongBatch(rowId, num, column);
            break;
          case INT96:
            readBinaryBatch(rowId, num, column);
            break;
          case FLOAT:
            readFloatBatch(rowId, num, column);
            break;
          case DOUBLE:
            readDoubleBatch(rowId, num, column);
            break;
          case BINARY:
            readBinaryBatch(rowId, num, column);
            break;
          case FIXED_LEN_BYTE_ARRAY:
            readFixedLenByteArrayBatch(
              rowId, num, column, descriptor.getPrimitiveType().getTypeLength());
            break;
          default:
            throw new IOException("Unsupported type: " + typeName);
        }
      }

      valuesRead += num;
      rowId += num;
      total -= num;
    }
  }

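  // True when INT96 timestamp values must be shifted from `convertTz` to UTC; no shift is
  // needed when no source zone was supplied or it is already UTC.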
  private boolean shouldConvertTimestamps() {
    return convertTz != null && !convertTz.equals(UTC);
  }

  /**
   * Helper function to construct exception for parquet schema mismatch.
   */
  private SchemaColumnConvertNotSupportedException constructConvertNotSupportedException(
      ColumnDescriptor descriptor,
      WritableColumnVector column) {
    return new SchemaColumnConvertNotSupportedException(
      Arrays.toString(descriptor.getPath()),
      descriptor.getPrimitiveType().getPrimitiveTypeName().toString(),
      column.dataType().catalogString());
  }

  /**
   * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
   */
  private void decodeDictionaryIds(
      int rowId,
      int num,
      WritableColumnVector column,
      WritableColumnVector dictionaryIds) {
    switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
      case INT32:
        if (column.dataType() == DataTypes.IntegerType ||
            DecimalType.is32BitDecimalType(column.dataType()) ||
            (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else if (column.dataType() == DataTypes.ByteType) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else if (column.dataType() == DataTypes.ShortType) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
            }
          }
        } else if (column.dataType() == DataTypes.DateType) {
          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
              column.putInt(i, rebaseDays(julianDays, failIfRebase));
            }
          }
        } else {
          throw constructConvertNotSupportedException(descriptor, column);
        }
        break;

      case INT64:
        if (column.dataType() == DataTypes.LongType ||
            DecimalType.is64BitDecimalType(column.dataType()) ||
            (originalType == OriginalType.TIMESTAMP_MICROS &&
              "CORRECTED".equals(datetimeRebaseMode))) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
            }
          }
        } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
          if ("CORRECTED".equals(datetimeRebaseMode)) {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
                column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
              }
            }
          } else {
            final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
                long julianMicros = DateTimeUtils.fromMillis(julianMillis);
                column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
              }
            }
          }
        } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
          final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
              column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
            }
          }
        } else {
          throw constructConvertNotSupportedException(descriptor, column);
        }
        break;

      case FLOAT:
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
          }
        }
        break;

      case DOUBLE:
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
          }
        }
        break;
      case INT96:
        if (column.dataType() == DataTypes.TimestampType) {
          if (!shouldConvertTimestamps()) {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
                column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
              }
            }
          } else {
            for (int i = rowId; i < rowId + num; ++i) {
              if (!column.isNullAt(i)) {
                Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
                long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v);
                long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
                column.putLong(i, adjTime);
              }
            }
          }
        } else {
          throw constructConvertNotSupportedException(descriptor, column);
        }
        break;
      case BINARY:
        // TODO: this is incredibly inefficient as it blows up the dictionary right here. We
        // need to do this better. We should probably add the dictionary data to the ColumnVector
        // and reuse it across batches. This should mean adding a ByteArray would just update
        // the length and offset.
        for (int i = rowId; i < rowId + num; ++i) {
          if (!column.isNullAt(i)) {
            Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
            column.putByteArray(i, v.getBytes());
          }
        }
        break;
      case FIXED_LEN_BYTE_ARRAY:
        // DecimalType written in the legacy mode
        if (DecimalType.is32BitDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
            }
          }
        } else if (DecimalType.is64BitDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
            }
          }
        } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
          for (int i = rowId; i < rowId + num; ++i) {
            if (!column.isNullAt(i)) {
              Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
              column.putByteArray(i, v.getBytes());
            }
          }
        } else {
          throw constructConvertNotSupportedException(descriptor, column);
        }
        break;

      default:
        throw new UnsupportedOperationException(
          "Unsupported type: " + descriptor.getPrimitiveType().getPrimitiveTypeName());
    }
  }

  /**
   * For all the read*Batch functions, reads `num` values from this columnReader into column. It
   * is guaranteed that num does not exceed the number of values left in the current page.
   */

  private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
      throws IOException {
    if (column.dataType() != DataTypes.BooleanType) {
      throw constructConvertNotSupportedException(descriptor, column);
    }
    defColumn.readBooleans(
        num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
  }

  private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (column.dataType() == DataTypes.IntegerType ||
        DecimalType.is32BitDecimalType(column.dataType())) {
      defColumn.readIntegers(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (column.dataType() == DataTypes.ByteType) {
      defColumn.readBytes(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (column.dataType() == DataTypes.ShortType) {
      defColumn.readShorts(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (column.dataType() == DataTypes.DateType) {
      if ("CORRECTED".equals(datetimeRebaseMode)) {
        defColumn.readIntegers(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
      } else {
        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
        defColumn.readIntegersWithRebase(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
      }
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

  private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    if (column.dataType() == DataTypes.LongType ||
        DecimalType.is64BitDecimalType(column.dataType())) {
      defColumn.readLongs(
        num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
      if ("CORRECTED".equals(datetimeRebaseMode)) {
        defColumn.readLongs(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
      } else {
        boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
        defColumn.readLongsWithRebase(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
      }
    } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
      if ("CORRECTED".equals(datetimeRebaseMode)) {
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
          } else {
            column.putNull(rowId + i);
          }
        }
      } else {
        final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            long julianMicros = DateTimeUtils.fromMillis(dataColumn.readLong());
            column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
          } else {
            column.putNull(rowId + i);
          }
        }
      }
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

  private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    // TODO: support implicit cast to double?
    if (column.dataType() == DataTypes.FloatType) {
      defColumn.readFloats(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

  private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (column.dataType() == DataTypes.DoubleType) {
      defColumn.readDoubles(
          num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

  private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
    if (column.dataType() == DataTypes.StringType || column.dataType() == DataTypes.BinaryType
            || DecimalType.isByteArrayDecimalType(column.dataType())) {
      defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
    } else if (column.dataType() == DataTypes.TimestampType) {
      if (!shouldConvertTimestamps()) {
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            // Read 12 bytes for INT96
            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
            column.putLong(rowId + i, rawTime);
          } else {
            column.putNull(rowId + i);
          }
        }
      } else {
        for (int i = 0; i < num; i++) {
          if (defColumn.readInteger() == maxDefLevel) {
            // Read 12 bytes for INT96
            long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
            long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
            column.putLong(rowId + i, adjTime);
          } else {
            column.putNull(rowId + i);
          }
        }
      }
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

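  /**
   * Reads `num` FIXED_LEN_BYTE_ARRAY values of `arrayLen` bytes each. This physical type is
   * used for decimals written in legacy mode: each value holds a big-endian unscaled integer,
   * decoded into an int, a long, or a byte array depending on the decimal's precision.
   */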
  private void readFixedLenByteArrayBatch(
      int rowId,
      int num,
      WritableColumnVector column,
      int arrayLen) {
    VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
    // This is where we implement support for the valid type conversions.
    // TODO: implement remaining type conversions
    if (DecimalType.is32BitDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putInt(rowId + i,
              (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
        } else {
          column.putNull(rowId + i);
        }
      }
    } else if (DecimalType.is64BitDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putLong(rowId + i,
              ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
        } else {
          column.putNull(rowId + i);
        }
      }
    } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
      for (int i = 0; i < num; i++) {
        if (defColumn.readInteger() == maxDefLevel) {
          column.putByteArray(rowId + i, data.readBinary(arrayLen).getBytes());
        } else {
          column.putNull(rowId + i);
        }
      }
    } else {
      throw constructConvertNotSupportedException(descriptor, column);
    }
  }

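  /**
   * Reads the next data page and initializes the level readers and the data reader for it,
   * dispatching on the v1 or v2 page format.
   */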
  private void readPage() {
    DataPage page = pageReader.readPage();
    // TODO: Why is this a visitor?
    page.accept(new DataPage.Visitor<Void>() {
      @Override
      public Void visit(DataPageV1 dataPageV1) {
        try {
          readPageV1(dataPageV1);
          return null;
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }

      @Override
      public Void visit(DataPageV2 dataPageV2) {
        try {
          readPageV2(dataPageV2);
          return null;
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
    });
  }

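  /**
   * Sets up `dataColumn` for the page's value encoding: dictionary-encoded pages
   * (PLAIN_DICTIONARY or RLE_DICTIONARY) are read as RLE-packed dictionary ids, while PLAIN
   * pages are read directly. Any other encoding is rejected.
   */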
  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
    this.endOfPageValueCount = valuesRead + pageValueCount;
    if (dataEncoding.usesDictionary()) {
      this.dataColumn = null;
      if (dictionary == null) {
        throw new IOException(
            "could not read page in col " + descriptor +
                " as the dictionary was missing for encoding " + dataEncoding);
      }
      @SuppressWarnings("deprecation")
      Encoding plainDict = Encoding.PLAIN_DICTIONARY; // var to allow warning suppression
      if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
      }
      this.dataColumn = new VectorizedRleValuesReader();
      this.isCurrentPageDictionaryEncoded = true;
    } else {
      if (dataEncoding != Encoding.PLAIN) {
        throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
      }
      this.dataColumn = new VectorizedPlainValuesReader();
      this.isCurrentPageDictionaryEncoded = false;
    }

    try {
      dataColumn.initFromPage(pageValueCount, in);
    } catch (IOException e) {
      throw new IOException("could not read page in col " + descriptor, e);
    }
  }

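  /**
   * In the v1 format, repetition levels, definition levels, and values are concatenated in a
   * single (possibly compressed) buffer, so the three readers are initialized sequentially
   * from the same input stream.
   */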
  private void readPageV1(DataPageV1 page) throws IOException {
    this.pageValueCount = page.getValueCount();
    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
    ValuesReader dlReader;

    // Initialize the decoders.
    if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
      throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
    }
    int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
    this.defColumn = new VectorizedRleValuesReader(bitWidth);
    dlReader = this.defColumn;
    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
    try {
      BytesInput bytes = page.getBytes();
      ByteBufferInputStream in = bytes.toInputStream();
      rlReader.initFromPage(pageValueCount, in);
      dlReader.initFromPage(pageValueCount, in);
      initDataReader(page.getValueEncoding(), in);
    } catch (IOException e) {
      throw new IOException("could not read page " + page + " in col " + descriptor, e);
    }
  }

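  /**
   * In the v2 format, the levels are stored in separate uncompressed sections of the page,
   * so the definition-level reader is created without reading a length header and the data
   * reader gets its own input stream.
   */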
  private void readPageV2(DataPageV2 page) throws IOException {
    this.pageValueCount = page.getValueCount();
    this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
        page.getRepetitionLevels(), descriptor);

    int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
    // do not read the length from the stream. v2 pages handle dividing the page bytes.
    this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
    this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
    this.defColumn.initFromPage(
        this.pageValueCount, page.getDefinitionLevels().toInputStream());
    try {
      initDataReader(page.getDataEncoding(), page.getData().toInputStream());
    } catch (IOException e) {
      throw new IOException("could not read page " + page + " in col " + descriptor, e);
    }
  }
}