0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.execution.datasources.parquet;
0019
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Arrays;
0024
0025 import org.apache.parquet.bytes.ByteBufferInputStream;
0026 import org.apache.parquet.bytes.BytesInput;
0027 import org.apache.parquet.bytes.BytesUtils;
0028 import org.apache.parquet.column.ColumnDescriptor;
0029 import org.apache.parquet.column.Dictionary;
0030 import org.apache.parquet.column.Encoding;
0031 import org.apache.parquet.column.page.*;
0032 import org.apache.parquet.column.values.ValuesReader;
0033 import org.apache.parquet.io.api.Binary;
0034 import org.apache.parquet.schema.OriginalType;
0035 import org.apache.parquet.schema.PrimitiveType;
0036
0037 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
0038 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
0039 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
0040 import org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException;
0041 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
0042 import org.apache.spark.sql.types.DataTypes;
0043 import org.apache.spark.sql.types.DecimalType;
0044
0045 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
0046 import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.ValuesReaderIntIterator;
0047 import static org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.createRLEIterator;
0048
0049
0050
0051
0052 public class VectorizedColumnReader {
0053
0054
0055
/**
 * Number of values of this column read so far, counted across all pages of the column chunk.
 * Advanced one at a time by {@code next()} and in bulk by {@code readBatch}.
 */
private long valuesRead;

/**
 * Value of {@code valuesRead} at which the current page is exhausted. Set to
 * {@code valuesRead + pageValueCount} whenever a new page is loaded.
 */
private long endOfPageValueCount;

/**
 * Dictionary decoded from the column chunk's dictionary page, or null if the chunk has no
 * dictionary page.
 */
private final Dictionary dictionary;

/**
 * True if the values of the current page are dictionary ids rather than plain values.
 */
private boolean isCurrentPageDictionaryEncoded;

/**
 * Maximum definition level of this column. A value is treated as non-null exactly when its
 * definition level equals this.
 */
private final int maxDefLevel;

/**
 * Repetition-level, definition-level and value readers for the current page. They are
 * replaced every time a new page is loaded.
 */
private SpecificParquetRecordReaderBase.IntIterator repetitionLevelColumn;
private SpecificParquetRecordReaderBase.IntIterator definitionLevelColumn;
private ValuesReader dataColumn;

/**
 * Vectorized reader over the current page's definition levels. It also backs
 * {@code definitionLevelColumn} and drives the batched value reads.
 */
private VectorizedRleValuesReader defColumn;

/**
 * Total number of values in this column chunk, as reported by the page reader.
 */
private final long totalValueCount;

/**
 * Number of values in the current page.
 */
private int pageValueCount;

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
// Logical type annotation of the column (compared with ==, so presumably may be null for
// unannotated columns -- TODO confirm).
private final OriginalType originalType;

// Zone that timestamps decoded from binary (INT96) values are converted from into UTC;
// no conversion is done when this is null or equal to UTC.
private final ZoneId convertTz;
private static final ZoneId UTC = ZoneOffset.UTC;
// One of "LEGACY", "EXCEPTION" or "CORRECTED". It decides whether DATE and TIMESTAMP values
// are rebased from the Julian to the Proleptic Gregorian calendar (LEGACY), rejected when they
// precede the last calendar switch day/instant (EXCEPTION), or read as-is (CORRECTED).
private final String datetimeRebaseMode;
0107
0108 public VectorizedColumnReader(
0109 ColumnDescriptor descriptor,
0110 OriginalType originalType,
0111 PageReader pageReader,
0112 ZoneId convertTz,
0113 String datetimeRebaseMode) throws IOException {
0114 this.descriptor = descriptor;
0115 this.pageReader = pageReader;
0116 this.convertTz = convertTz;
0117 this.originalType = originalType;
0118 this.maxDefLevel = descriptor.getMaxDefinitionLevel();
0119
0120 DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
0121 if (dictionaryPage != null) {
0122 try {
0123 this.dictionary = dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage);
0124 this.isCurrentPageDictionaryEncoded = true;
0125 } catch (IOException e) {
0126 throw new IOException("could not decode the dictionary for " + descriptor, e);
0127 }
0128 } else {
0129 this.dictionary = null;
0130 this.isCurrentPageDictionaryEncoded = false;
0131 }
0132 this.totalValueCount = pageReader.getTotalValueCount();
0133 if (totalValueCount == 0) {
0134 throw new IOException("totalValueCount == 0");
0135 }
0136 assert "LEGACY".equals(datetimeRebaseMode) || "EXCEPTION".equals(datetimeRebaseMode) ||
0137 "CORRECTED".equals(datetimeRebaseMode);
0138 this.datetimeRebaseMode = datetimeRebaseMode;
0139 }
0140
0141
0142
0143
0144 private boolean next() throws IOException {
0145 if (valuesRead >= endOfPageValueCount) {
0146 if (valuesRead >= totalValueCount) {
0147
0148 return false;
0149 }
0150 readPage();
0151 }
0152 ++valuesRead;
0153
0154
0155 return definitionLevelColumn.nextInt() == maxDefLevel;
0156 }
0157
0158 private boolean isLazyDecodingSupported(PrimitiveType.PrimitiveTypeName typeName) {
0159 boolean isSupported = false;
0160 switch (typeName) {
0161 case INT32:
0162 isSupported = originalType != OriginalType.DATE || "CORRECTED".equals(datetimeRebaseMode);
0163 break;
0164 case INT64:
0165 if (originalType == OriginalType.TIMESTAMP_MICROS) {
0166 isSupported = "CORRECTED".equals(datetimeRebaseMode);
0167 } else {
0168 isSupported = originalType != OriginalType.TIMESTAMP_MILLIS;
0169 }
0170 break;
0171 case FLOAT:
0172 case DOUBLE:
0173 case BINARY:
0174 isSupported = true;
0175 break;
0176 }
0177 return isSupported;
0178 }
0179
0180 static int rebaseDays(int julianDays, final boolean failIfRebase) {
0181 if (failIfRebase) {
0182 if (julianDays < RebaseDateTime.lastSwitchJulianDay()) {
0183 throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0184 } else {
0185 return julianDays;
0186 }
0187 } else {
0188 return RebaseDateTime.rebaseJulianToGregorianDays(julianDays);
0189 }
0190 }
0191
0192 static long rebaseMicros(long julianMicros, final boolean failIfRebase) {
0193 if (failIfRebase) {
0194 if (julianMicros < RebaseDateTime.lastSwitchJulianTs()) {
0195 throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0196 } else {
0197 return julianMicros;
0198 }
0199 } else {
0200 return RebaseDateTime.rebaseJulianToGregorianMicros(julianMicros);
0201 }
0202 }
0203
0204
0205
0206
/**
 * Reads {@code total} values of this column into {@code column}, filling rows starting at 0 and
 * loading new pages whenever the current page is exhausted.
 *
 * <p>For a dictionary-encoded page the ids are read first. They are then either kept encoded,
 * with the dictionary attached to the vector for lazy decoding, or decoded right away via
 * {@code decodeDictionaryIds}.
 *
 * <p>When a plain page follows dictionary-encoded rows in the same batch, those earlier rows are
 * decoded first. Only then is the vector's dictionary removed and the plain values appended.
 *
 * @param total number of values to read
 * @param column destination vector
 * @throws IOException if a page cannot be read or the physical type is unsupported
 */
void readBatch(int total, WritableColumnVector column) throws IOException {
  int rowId = 0;
  WritableColumnVector dictionaryIds = null;
  if (dictionary != null) {
    // One dictionary-id vector per batch, sized for the whole batch. It is used by every
    // dictionary-encoded page read in this batch.
    dictionaryIds = column.reserveDictionaryIds(total);
  }
  while (total > 0) {
    // Number of values still available in the current page; load the next page if none.
    int leftInPage = (int) (endOfPageValueCount - valuesRead);
    if (leftInPage == 0) {
      readPage();
      leftInPage = (int) (endOfPageValueCount - valuesRead);
    }
    int num = Math.min(total, leftInPage);
    PrimitiveType.PrimitiveTypeName typeName =
      descriptor.getPrimitiveType().getPrimitiveTypeName();
    if (isCurrentPageDictionaryEncoded) {
      // Read `num` dictionary ids into dictionaryIds. Null rows are presumably marked on
      // `column`, since decodeDictionaryIds skips rows where column.isNullAt(i) -- TODO confirm.
      defColumn.readIntegers(
          num, dictionaryIds, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

      // Values may stay encoded only if the vector already holds a dictionary (earlier pages of
      // this batch stayed encoded), or nothing has been written yet and the type supports it.
      // TIMESTAMP_MILLIS, for example, needs post-processing and is always decoded eagerly.
      if (column.hasDictionary() || (rowId == 0 && isLazyDecodingSupported(typeName))) {
        // Keep ids encoded; the vector decodes through the attached dictionary.
        column.setDictionary(new ParquetDictionary(dictionary));
      } else {
        // Some rows of this batch are already plain values, so decode these ids now.
        decodeDictionaryIds(rowId, num, column, dictionaryIds);
      }
    } else {
      if (column.hasDictionary() && rowId != 0) {
        // This plain page follows dictionary-encoded rows in the same batch. Decode those
        // earlier rows before the dictionary is removed from the vector below.
        decodeDictionaryIds(0, rowId, column, column.getDictionaryIds());
      }
      column.setDictionary(null);
      switch (typeName) {
        case BOOLEAN:
          readBooleanBatch(rowId, num, column);
          break;
        case INT32:
          readIntBatch(rowId, num, column);
          break;
        case INT64:
          readLongBatch(rowId, num, column);
          break;
        case INT96:
          // INT96 values are read as 12-byte binaries (see readBinaryBatch).
          readBinaryBatch(rowId, num, column);
          break;
        case FLOAT:
          readFloatBatch(rowId, num, column);
          break;
        case DOUBLE:
          readDoubleBatch(rowId, num, column);
          break;
        case BINARY:
          readBinaryBatch(rowId, num, column);
          break;
        case FIXED_LEN_BYTE_ARRAY:
          readFixedLenByteArrayBatch(
            rowId, num, column, descriptor.getPrimitiveType().getTypeLength());
          break;
        default:
          throw new IOException("Unsupported type: " + typeName);
      }
    }

    valuesRead += num;
    rowId += num;
    total -= num;
  }
}
0284
0285 private boolean shouldConvertTimestamps() {
0286 return convertTz != null && !convertTz.equals(UTC);
0287 }
0288
0289
0290
0291
0292 private SchemaColumnConvertNotSupportedException constructConvertNotSupportedException(
0293 ColumnDescriptor descriptor,
0294 WritableColumnVector column) {
0295 return new SchemaColumnConvertNotSupportedException(
0296 Arrays.toString(descriptor.getPath()),
0297 descriptor.getPrimitiveType().getPrimitiveTypeName().toString(),
0298 column.dataType().catalogString());
0299 }
0300
0301
0302
0303
0304 private void decodeDictionaryIds(
0305 int rowId,
0306 int num,
0307 WritableColumnVector column,
0308 WritableColumnVector dictionaryIds) {
0309 switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
0310 case INT32:
0311 if (column.dataType() == DataTypes.IntegerType ||
0312 DecimalType.is32BitDecimalType(column.dataType()) ||
0313 (column.dataType() == DataTypes.DateType && "CORRECTED".equals(datetimeRebaseMode))) {
0314 for (int i = rowId; i < rowId + num; ++i) {
0315 if (!column.isNullAt(i)) {
0316 column.putInt(i, dictionary.decodeToInt(dictionaryIds.getDictId(i)));
0317 }
0318 }
0319 } else if (column.dataType() == DataTypes.ByteType) {
0320 for (int i = rowId; i < rowId + num; ++i) {
0321 if (!column.isNullAt(i)) {
0322 column.putByte(i, (byte) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
0323 }
0324 }
0325 } else if (column.dataType() == DataTypes.ShortType) {
0326 for (int i = rowId; i < rowId + num; ++i) {
0327 if (!column.isNullAt(i)) {
0328 column.putShort(i, (short) dictionary.decodeToInt(dictionaryIds.getDictId(i)));
0329 }
0330 }
0331 } else if (column.dataType() == DataTypes.DateType) {
0332 final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0333 for (int i = rowId; i < rowId + num; ++i) {
0334 if (!column.isNullAt(i)) {
0335 int julianDays = dictionary.decodeToInt(dictionaryIds.getDictId(i));
0336 column.putInt(i, rebaseDays(julianDays, failIfRebase));
0337 }
0338 }
0339 } else {
0340 throw constructConvertNotSupportedException(descriptor, column);
0341 }
0342 break;
0343
0344 case INT64:
0345 if (column.dataType() == DataTypes.LongType ||
0346 DecimalType.is64BitDecimalType(column.dataType()) ||
0347 (originalType == OriginalType.TIMESTAMP_MICROS &&
0348 "CORRECTED".equals(datetimeRebaseMode))) {
0349 for (int i = rowId; i < rowId + num; ++i) {
0350 if (!column.isNullAt(i)) {
0351 column.putLong(i, dictionary.decodeToLong(dictionaryIds.getDictId(i)));
0352 }
0353 }
0354 } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
0355 if ("CORRECTED".equals(datetimeRebaseMode)) {
0356 for (int i = rowId; i < rowId + num; ++i) {
0357 if (!column.isNullAt(i)) {
0358 long gregorianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
0359 column.putLong(i, DateTimeUtils.fromMillis(gregorianMillis));
0360 }
0361 }
0362 } else {
0363 final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0364 for (int i = rowId; i < rowId + num; ++i) {
0365 if (!column.isNullAt(i)) {
0366 long julianMillis = dictionary.decodeToLong(dictionaryIds.getDictId(i));
0367 long julianMicros = DateTimeUtils.fromMillis(julianMillis);
0368 column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
0369 }
0370 }
0371 }
0372 } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
0373 final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0374 for (int i = rowId; i < rowId + num; ++i) {
0375 if (!column.isNullAt(i)) {
0376 long julianMicros = dictionary.decodeToLong(dictionaryIds.getDictId(i));
0377 column.putLong(i, rebaseMicros(julianMicros, failIfRebase));
0378 }
0379 }
0380 } else {
0381 throw constructConvertNotSupportedException(descriptor, column);
0382 }
0383 break;
0384
0385 case FLOAT:
0386 for (int i = rowId; i < rowId + num; ++i) {
0387 if (!column.isNullAt(i)) {
0388 column.putFloat(i, dictionary.decodeToFloat(dictionaryIds.getDictId(i)));
0389 }
0390 }
0391 break;
0392
0393 case DOUBLE:
0394 for (int i = rowId; i < rowId + num; ++i) {
0395 if (!column.isNullAt(i)) {
0396 column.putDouble(i, dictionary.decodeToDouble(dictionaryIds.getDictId(i)));
0397 }
0398 }
0399 break;
0400 case INT96:
0401 if (column.dataType() == DataTypes.TimestampType) {
0402 if (!shouldConvertTimestamps()) {
0403 for (int i = rowId; i < rowId + num; ++i) {
0404 if (!column.isNullAt(i)) {
0405 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0406 column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
0407 }
0408 }
0409 } else {
0410 for (int i = rowId; i < rowId + num; ++i) {
0411 if (!column.isNullAt(i)) {
0412 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0413 long rawTime = ParquetRowConverter.binaryToSQLTimestamp(v);
0414 long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
0415 column.putLong(i, adjTime);
0416 }
0417 }
0418 }
0419 } else {
0420 throw constructConvertNotSupportedException(descriptor, column);
0421 }
0422 break;
0423 case BINARY:
0424
0425
0426
0427
0428 for (int i = rowId; i < rowId + num; ++i) {
0429 if (!column.isNullAt(i)) {
0430 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0431 column.putByteArray(i, v.getBytes());
0432 }
0433 }
0434 break;
0435 case FIXED_LEN_BYTE_ARRAY:
0436
0437 if (DecimalType.is32BitDecimalType(column.dataType())) {
0438 for (int i = rowId; i < rowId + num; ++i) {
0439 if (!column.isNullAt(i)) {
0440 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0441 column.putInt(i, (int) ParquetRowConverter.binaryToUnscaledLong(v));
0442 }
0443 }
0444 } else if (DecimalType.is64BitDecimalType(column.dataType())) {
0445 for (int i = rowId; i < rowId + num; ++i) {
0446 if (!column.isNullAt(i)) {
0447 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0448 column.putLong(i, ParquetRowConverter.binaryToUnscaledLong(v));
0449 }
0450 }
0451 } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
0452 for (int i = rowId; i < rowId + num; ++i) {
0453 if (!column.isNullAt(i)) {
0454 Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
0455 column.putByteArray(i, v.getBytes());
0456 }
0457 }
0458 } else {
0459 throw constructConvertNotSupportedException(descriptor, column);
0460 }
0461 break;
0462
0463 default:
0464 throw new UnsupportedOperationException(
0465 "Unsupported type: " + descriptor.getPrimitiveType().getPrimitiveTypeName());
0466 }
0467 }
0468
0469
0470
0471
0472
0473
0474 private void readBooleanBatch(int rowId, int num, WritableColumnVector column)
0475 throws IOException {
0476 if (column.dataType() != DataTypes.BooleanType) {
0477 throw constructConvertNotSupportedException(descriptor, column);
0478 }
0479 defColumn.readBooleans(
0480 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0481 }
0482
0483 private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
0484
0485
0486 if (column.dataType() == DataTypes.IntegerType ||
0487 DecimalType.is32BitDecimalType(column.dataType())) {
0488 defColumn.readIntegers(
0489 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0490 } else if (column.dataType() == DataTypes.ByteType) {
0491 defColumn.readBytes(
0492 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0493 } else if (column.dataType() == DataTypes.ShortType) {
0494 defColumn.readShorts(
0495 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0496 } else if (column.dataType() == DataTypes.DateType ) {
0497 if ("CORRECTED".equals(datetimeRebaseMode)) {
0498 defColumn.readIntegers(
0499 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0500 } else {
0501 boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0502 defColumn.readIntegersWithRebase(
0503 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
0504 }
0505 } else {
0506 throw constructConvertNotSupportedException(descriptor, column);
0507 }
0508 }
0509
0510 private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
0511
0512 if (column.dataType() == DataTypes.LongType ||
0513 DecimalType.is64BitDecimalType(column.dataType())) {
0514 defColumn.readLongs(
0515 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0516 } else if (originalType == OriginalType.TIMESTAMP_MICROS) {
0517 if ("CORRECTED".equals(datetimeRebaseMode)) {
0518 defColumn.readLongs(
0519 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0520 } else {
0521 boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0522 defColumn.readLongsWithRebase(
0523 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn, failIfRebase);
0524 }
0525 } else if (originalType == OriginalType.TIMESTAMP_MILLIS) {
0526 if ("CORRECTED".equals(datetimeRebaseMode)) {
0527 for (int i = 0; i < num; i++) {
0528 if (defColumn.readInteger() == maxDefLevel) {
0529 column.putLong(rowId + i, DateTimeUtils.fromMillis(dataColumn.readLong()));
0530 } else {
0531 column.putNull(rowId + i);
0532 }
0533 }
0534 } else {
0535 final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
0536 for (int i = 0; i < num; i++) {
0537 if (defColumn.readInteger() == maxDefLevel) {
0538 long julianMicros = DateTimeUtils.fromMillis(dataColumn.readLong());
0539 column.putLong(rowId + i, rebaseMicros(julianMicros, failIfRebase));
0540 } else {
0541 column.putNull(rowId + i);
0542 }
0543 }
0544 }
0545 } else {
0546 throw constructConvertNotSupportedException(descriptor, column);
0547 }
0548 }
0549
0550 private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
0551
0552
0553 if (column.dataType() == DataTypes.FloatType) {
0554 defColumn.readFloats(
0555 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0556 } else {
0557 throw constructConvertNotSupportedException(descriptor, column);
0558 }
0559 }
0560
0561 private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
0562
0563
0564 if (column.dataType() == DataTypes.DoubleType) {
0565 defColumn.readDoubles(
0566 num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
0567 } else {
0568 throw constructConvertNotSupportedException(descriptor, column);
0569 }
0570 }
0571
0572 private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
0573
0574
0575 VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
0576 if (column.dataType() == DataTypes.StringType || column.dataType() == DataTypes.BinaryType
0577 || DecimalType.isByteArrayDecimalType(column.dataType())) {
0578 defColumn.readBinarys(num, column, rowId, maxDefLevel, data);
0579 } else if (column.dataType() == DataTypes.TimestampType) {
0580 if (!shouldConvertTimestamps()) {
0581 for (int i = 0; i < num; i++) {
0582 if (defColumn.readInteger() == maxDefLevel) {
0583
0584 long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
0585 column.putLong(rowId + i, rawTime);
0586 } else {
0587 column.putNull(rowId + i);
0588 }
0589 }
0590 } else {
0591 for (int i = 0; i < num; i++) {
0592 if (defColumn.readInteger() == maxDefLevel) {
0593
0594 long rawTime = ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12));
0595 long adjTime = DateTimeUtils.convertTz(rawTime, convertTz, UTC);
0596 column.putLong(rowId + i, adjTime);
0597 } else {
0598 column.putNull(rowId + i);
0599 }
0600 }
0601 }
0602 } else {
0603 throw constructConvertNotSupportedException(descriptor, column);
0604 }
0605 }
0606
0607 private void readFixedLenByteArrayBatch(
0608 int rowId,
0609 int num,
0610 WritableColumnVector column,
0611 int arrayLen) {
0612 VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
0613
0614
0615 if (DecimalType.is32BitDecimalType(column.dataType())) {
0616 for (int i = 0; i < num; i++) {
0617 if (defColumn.readInteger() == maxDefLevel) {
0618 column.putInt(rowId + i,
0619 (int) ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
0620 } else {
0621 column.putNull(rowId + i);
0622 }
0623 }
0624 } else if (DecimalType.is64BitDecimalType(column.dataType())) {
0625 for (int i = 0; i < num; i++) {
0626 if (defColumn.readInteger() == maxDefLevel) {
0627 column.putLong(rowId + i,
0628 ParquetRowConverter.binaryToUnscaledLong(data.readBinary(arrayLen)));
0629 } else {
0630 column.putNull(rowId + i);
0631 }
0632 }
0633 } else if (DecimalType.isByteArrayDecimalType(column.dataType())) {
0634 for (int i = 0; i < num; i++) {
0635 if (defColumn.readInteger() == maxDefLevel) {
0636 column.putByteArray(rowId + i, data.readBinary(arrayLen).getBytes());
0637 } else {
0638 column.putNull(rowId + i);
0639 }
0640 }
0641 } else {
0642 throw constructConvertNotSupportedException(descriptor, column);
0643 }
0644 }
0645
0646 private void readPage() {
0647 DataPage page = pageReader.readPage();
0648
0649 page.accept(new DataPage.Visitor<Void>() {
0650 @Override
0651 public Void visit(DataPageV1 dataPageV1) {
0652 try {
0653 readPageV1(dataPageV1);
0654 return null;
0655 } catch (IOException e) {
0656 throw new RuntimeException(e);
0657 }
0658 }
0659
0660 @Override
0661 public Void visit(DataPageV2 dataPageV2) {
0662 try {
0663 readPageV2(dataPageV2);
0664 return null;
0665 } catch (IOException e) {
0666 throw new RuntimeException(e);
0667 }
0668 }
0669 });
0670 }
0671
0672 private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in) throws IOException {
0673 this.endOfPageValueCount = valuesRead + pageValueCount;
0674 if (dataEncoding.usesDictionary()) {
0675 this.dataColumn = null;
0676 if (dictionary == null) {
0677 throw new IOException(
0678 "could not read page in col " + descriptor +
0679 " as the dictionary was missing for encoding " + dataEncoding);
0680 }
0681 @SuppressWarnings("deprecation")
0682 Encoding plainDict = Encoding.PLAIN_DICTIONARY;
0683 if (dataEncoding != plainDict && dataEncoding != Encoding.RLE_DICTIONARY) {
0684 throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
0685 }
0686 this.dataColumn = new VectorizedRleValuesReader();
0687 this.isCurrentPageDictionaryEncoded = true;
0688 } else {
0689 if (dataEncoding != Encoding.PLAIN) {
0690 throw new UnsupportedOperationException("Unsupported encoding: " + dataEncoding);
0691 }
0692 this.dataColumn = new VectorizedPlainValuesReader();
0693 this.isCurrentPageDictionaryEncoded = false;
0694 }
0695
0696 try {
0697 dataColumn.initFromPage(pageValueCount, in);
0698 } catch (IOException e) {
0699 throw new IOException("could not read page in col " + descriptor, e);
0700 }
0701 }
0702
0703 private void readPageV1(DataPageV1 page) throws IOException {
0704 this.pageValueCount = page.getValueCount();
0705 ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
0706 ValuesReader dlReader;
0707
0708
0709 if (page.getDlEncoding() != Encoding.RLE && descriptor.getMaxDefinitionLevel() != 0) {
0710 throw new UnsupportedOperationException("Unsupported encoding: " + page.getDlEncoding());
0711 }
0712 int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
0713 this.defColumn = new VectorizedRleValuesReader(bitWidth);
0714 dlReader = this.defColumn;
0715 this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
0716 this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
0717 try {
0718 BytesInput bytes = page.getBytes();
0719 ByteBufferInputStream in = bytes.toInputStream();
0720 rlReader.initFromPage(pageValueCount, in);
0721 dlReader.initFromPage(pageValueCount, in);
0722 initDataReader(page.getValueEncoding(), in);
0723 } catch (IOException e) {
0724 throw new IOException("could not read page " + page + " in col " + descriptor, e);
0725 }
0726 }
0727
0728 private void readPageV2(DataPageV2 page) throws IOException {
0729 this.pageValueCount = page.getValueCount();
0730 this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
0731 page.getRepetitionLevels(), descriptor);
0732
0733 int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
0734
0735 this.defColumn = new VectorizedRleValuesReader(bitWidth, false);
0736 this.definitionLevelColumn = new ValuesReaderIntIterator(this.defColumn);
0737 this.defColumn.initFromPage(
0738 this.pageValueCount, page.getDefinitionLevels().toInputStream());
0739 try {
0740 initDataReader(page.getDataEncoding(), page.getData().toInputStream());
0741 } catch (IOException e) {
0742 throw new IOException("could not read page " + page + " in col " + descriptor, e);
0743 }
0744 }
0745 }