0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.sql.execution.datasources.parquet;
0019
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.bitpacking.BytePacker;
import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;

import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044 public final class VectorizedRleValuesReader extends ValuesReader
0045 implements VectorizedValuesReader {
0046
0047
0048
0049
0050 private enum MODE {
0051 RLE,
0052 PACKED
0053 }
0054
0055
0056 private ByteBufferInputStream in;
0057
0058
0059 private int bitWidth;
0060 private int bytesWidth;
0061 private BytePacker packer;
0062
0063
0064 private MODE mode;
0065 private int currentCount;
0066 private int currentValue;
0067
0068
0069 private int[] currentBuffer = new int[16];
0070 private int currentBufferIdx = 0;
0071
0072
0073
0074 private final boolean fixedWidth;
0075 private final boolean readLength;
0076
0077 public VectorizedRleValuesReader() {
0078 this.fixedWidth = false;
0079 this.readLength = false;
0080 }
0081
0082 public VectorizedRleValuesReader(int bitWidth) {
0083 this.fixedWidth = true;
0084 this.readLength = bitWidth != 0;
0085 init(bitWidth);
0086 }
0087
0088 public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
0089 this.fixedWidth = true;
0090 this.readLength = readLength;
0091 init(bitWidth);
0092 }
0093
0094 @Override
0095 public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
0096 this.in = in;
0097 if (fixedWidth) {
0098
0099 if (readLength) {
0100 int length = readIntLittleEndian();
0101 this.in = in.sliceStream(length);
0102 }
0103 } else {
0104
0105 if (in.available() > 0) {
0106 init(in.read());
0107 }
0108 }
0109 if (bitWidth == 0) {
0110
0111 this.mode = MODE.RLE;
0112 this.currentCount = valueCount;
0113 this.currentValue = 0;
0114 } else {
0115 this.currentCount = 0;
0116 }
0117 }
0118
0119
0120
0121
0122 private void init(int bitWidth) {
0123 Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
0124 this.bitWidth = bitWidth;
0125 this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
0126 this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
0127 }
0128
0129 @Override
0130 public boolean readBoolean() {
0131 return this.readInteger() != 0;
0132 }
0133
0134 @Override
0135 public void skip() {
0136 this.readInteger();
0137 }
0138
0139 @Override
0140 public int readValueDictionaryId() {
0141 return readInteger();
0142 }
0143
0144 @Override
0145 public int readInteger() {
0146 if (this.currentCount == 0) { this.readNextGroup(); }
0147
0148 this.currentCount--;
0149 switch (mode) {
0150 case RLE:
0151 return this.currentValue;
0152 case PACKED:
0153 return this.currentBuffer[currentBufferIdx++];
0154 }
0155 throw new RuntimeException("Unreachable");
0156 }
0157
0158
0159
0160
0161
0162
0163
0164
0165
0166
0167
0168
0169
0170
0171
0172 public void readIntegers(
0173 int total,
0174 WritableColumnVector c,
0175 int rowId,
0176 int level,
0177 VectorizedValuesReader data) throws IOException {
0178 int left = total;
0179 while (left > 0) {
0180 if (this.currentCount == 0) this.readNextGroup();
0181 int n = Math.min(left, this.currentCount);
0182 switch (mode) {
0183 case RLE:
0184 if (currentValue == level) {
0185 data.readIntegers(n, c, rowId);
0186 } else {
0187 c.putNulls(rowId, n);
0188 }
0189 break;
0190 case PACKED:
0191 for (int i = 0; i < n; ++i) {
0192 if (currentBuffer[currentBufferIdx++] == level) {
0193 c.putInt(rowId + i, data.readInteger());
0194 } else {
0195 c.putNull(rowId + i);
0196 }
0197 }
0198 break;
0199 }
0200 rowId += n;
0201 left -= n;
0202 currentCount -= n;
0203 }
0204 }
0205
0206
0207
0208 public void readIntegersWithRebase(
0209 int total,
0210 WritableColumnVector c,
0211 int rowId,
0212 int level,
0213 VectorizedValuesReader data,
0214 final boolean failIfRebase) throws IOException {
0215 int left = total;
0216 while (left > 0) {
0217 if (this.currentCount == 0) this.readNextGroup();
0218 int n = Math.min(left, this.currentCount);
0219 switch (mode) {
0220 case RLE:
0221 if (currentValue == level) {
0222 data.readIntegersWithRebase(n, c, rowId, failIfRebase);
0223 } else {
0224 c.putNulls(rowId, n);
0225 }
0226 break;
0227 case PACKED:
0228 for (int i = 0; i < n; ++i) {
0229 if (currentBuffer[currentBufferIdx++] == level) {
0230 int julianDays = data.readInteger();
0231 c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
0232 } else {
0233 c.putNull(rowId + i);
0234 }
0235 }
0236 break;
0237 }
0238 rowId += n;
0239 left -= n;
0240 currentCount -= n;
0241 }
0242 }
0243
0244
0245 public void readBooleans(
0246 int total,
0247 WritableColumnVector c,
0248 int rowId,
0249 int level,
0250 VectorizedValuesReader data) throws IOException {
0251 int left = total;
0252 while (left > 0) {
0253 if (this.currentCount == 0) this.readNextGroup();
0254 int n = Math.min(left, this.currentCount);
0255 switch (mode) {
0256 case RLE:
0257 if (currentValue == level) {
0258 data.readBooleans(n, c, rowId);
0259 } else {
0260 c.putNulls(rowId, n);
0261 }
0262 break;
0263 case PACKED:
0264 for (int i = 0; i < n; ++i) {
0265 if (currentBuffer[currentBufferIdx++] == level) {
0266 c.putBoolean(rowId + i, data.readBoolean());
0267 } else {
0268 c.putNull(rowId + i);
0269 }
0270 }
0271 break;
0272 }
0273 rowId += n;
0274 left -= n;
0275 currentCount -= n;
0276 }
0277 }
0278
0279 public void readBytes(
0280 int total,
0281 WritableColumnVector c,
0282 int rowId,
0283 int level,
0284 VectorizedValuesReader data) throws IOException {
0285 int left = total;
0286 while (left > 0) {
0287 if (this.currentCount == 0) this.readNextGroup();
0288 int n = Math.min(left, this.currentCount);
0289 switch (mode) {
0290 case RLE:
0291 if (currentValue == level) {
0292 data.readBytes(n, c, rowId);
0293 } else {
0294 c.putNulls(rowId, n);
0295 }
0296 break;
0297 case PACKED:
0298 for (int i = 0; i < n; ++i) {
0299 if (currentBuffer[currentBufferIdx++] == level) {
0300 c.putByte(rowId + i, data.readByte());
0301 } else {
0302 c.putNull(rowId + i);
0303 }
0304 }
0305 break;
0306 }
0307 rowId += n;
0308 left -= n;
0309 currentCount -= n;
0310 }
0311 }
0312
0313 public void readShorts(
0314 int total,
0315 WritableColumnVector c,
0316 int rowId,
0317 int level,
0318 VectorizedValuesReader data) throws IOException {
0319 int left = total;
0320 while (left > 0) {
0321 if (this.currentCount == 0) this.readNextGroup();
0322 int n = Math.min(left, this.currentCount);
0323 switch (mode) {
0324 case RLE:
0325 if (currentValue == level) {
0326 for (int i = 0; i < n; i++) {
0327 c.putShort(rowId + i, (short)data.readInteger());
0328 }
0329 } else {
0330 c.putNulls(rowId, n);
0331 }
0332 break;
0333 case PACKED:
0334 for (int i = 0; i < n; ++i) {
0335 if (currentBuffer[currentBufferIdx++] == level) {
0336 c.putShort(rowId + i, (short)data.readInteger());
0337 } else {
0338 c.putNull(rowId + i);
0339 }
0340 }
0341 break;
0342 }
0343 rowId += n;
0344 left -= n;
0345 currentCount -= n;
0346 }
0347 }
0348
0349 public void readLongs(
0350 int total,
0351 WritableColumnVector c,
0352 int rowId,
0353 int level,
0354 VectorizedValuesReader data) throws IOException {
0355 int left = total;
0356 while (left > 0) {
0357 if (this.currentCount == 0) this.readNextGroup();
0358 int n = Math.min(left, this.currentCount);
0359 switch (mode) {
0360 case RLE:
0361 if (currentValue == level) {
0362 data.readLongs(n, c, rowId);
0363 } else {
0364 c.putNulls(rowId, n);
0365 }
0366 break;
0367 case PACKED:
0368 for (int i = 0; i < n; ++i) {
0369 if (currentBuffer[currentBufferIdx++] == level) {
0370 c.putLong(rowId + i, data.readLong());
0371 } else {
0372 c.putNull(rowId + i);
0373 }
0374 }
0375 break;
0376 }
0377 rowId += n;
0378 left -= n;
0379 currentCount -= n;
0380 }
0381 }
0382
0383
0384
0385 public void readLongsWithRebase(
0386 int total,
0387 WritableColumnVector c,
0388 int rowId,
0389 int level,
0390 VectorizedValuesReader data,
0391 final boolean failIfRebase) throws IOException {
0392 int left = total;
0393 while (left > 0) {
0394 if (this.currentCount == 0) this.readNextGroup();
0395 int n = Math.min(left, this.currentCount);
0396 switch (mode) {
0397 case RLE:
0398 if (currentValue == level) {
0399 data.readLongsWithRebase(n, c, rowId, failIfRebase);
0400 } else {
0401 c.putNulls(rowId, n);
0402 }
0403 break;
0404 case PACKED:
0405 for (int i = 0; i < n; ++i) {
0406 if (currentBuffer[currentBufferIdx++] == level) {
0407 long julianMicros = data.readLong();
0408 c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
0409 } else {
0410 c.putNull(rowId + i);
0411 }
0412 }
0413 break;
0414 }
0415 rowId += n;
0416 left -= n;
0417 currentCount -= n;
0418 }
0419 }
0420
0421 public void readFloats(
0422 int total,
0423 WritableColumnVector c,
0424 int rowId,
0425 int level,
0426 VectorizedValuesReader data) throws IOException {
0427 int left = total;
0428 while (left > 0) {
0429 if (this.currentCount == 0) this.readNextGroup();
0430 int n = Math.min(left, this.currentCount);
0431 switch (mode) {
0432 case RLE:
0433 if (currentValue == level) {
0434 data.readFloats(n, c, rowId);
0435 } else {
0436 c.putNulls(rowId, n);
0437 }
0438 break;
0439 case PACKED:
0440 for (int i = 0; i < n; ++i) {
0441 if (currentBuffer[currentBufferIdx++] == level) {
0442 c.putFloat(rowId + i, data.readFloat());
0443 } else {
0444 c.putNull(rowId + i);
0445 }
0446 }
0447 break;
0448 }
0449 rowId += n;
0450 left -= n;
0451 currentCount -= n;
0452 }
0453 }
0454
0455 public void readDoubles(
0456 int total,
0457 WritableColumnVector c,
0458 int rowId,
0459 int level,
0460 VectorizedValuesReader data) throws IOException {
0461 int left = total;
0462 while (left > 0) {
0463 if (this.currentCount == 0) this.readNextGroup();
0464 int n = Math.min(left, this.currentCount);
0465 switch (mode) {
0466 case RLE:
0467 if (currentValue == level) {
0468 data.readDoubles(n, c, rowId);
0469 } else {
0470 c.putNulls(rowId, n);
0471 }
0472 break;
0473 case PACKED:
0474 for (int i = 0; i < n; ++i) {
0475 if (currentBuffer[currentBufferIdx++] == level) {
0476 c.putDouble(rowId + i, data.readDouble());
0477 } else {
0478 c.putNull(rowId + i);
0479 }
0480 }
0481 break;
0482 }
0483 rowId += n;
0484 left -= n;
0485 currentCount -= n;
0486 }
0487 }
0488
0489 public void readBinarys(
0490 int total,
0491 WritableColumnVector c,
0492 int rowId,
0493 int level,
0494 VectorizedValuesReader data) throws IOException {
0495 int left = total;
0496 while (left > 0) {
0497 if (this.currentCount == 0) this.readNextGroup();
0498 int n = Math.min(left, this.currentCount);
0499 switch (mode) {
0500 case RLE:
0501 if (currentValue == level) {
0502 data.readBinary(n, c, rowId);
0503 } else {
0504 c.putNulls(rowId, n);
0505 }
0506 break;
0507 case PACKED:
0508 for (int i = 0; i < n; ++i) {
0509 if (currentBuffer[currentBufferIdx++] == level) {
0510 data.readBinary(1, c, rowId + i);
0511 } else {
0512 c.putNull(rowId + i);
0513 }
0514 }
0515 break;
0516 }
0517 rowId += n;
0518 left -= n;
0519 currentCount -= n;
0520 }
0521 }
0522
0523
0524
0525
0526
0527 public void readIntegers(
0528 int total,
0529 WritableColumnVector values,
0530 WritableColumnVector nulls,
0531 int rowId,
0532 int level,
0533 VectorizedValuesReader data) throws IOException {
0534 int left = total;
0535 while (left > 0) {
0536 if (this.currentCount == 0) this.readNextGroup();
0537 int n = Math.min(left, this.currentCount);
0538 switch (mode) {
0539 case RLE:
0540 if (currentValue == level) {
0541 data.readIntegers(n, values, rowId);
0542 } else {
0543 nulls.putNulls(rowId, n);
0544 }
0545 break;
0546 case PACKED:
0547 for (int i = 0; i < n; ++i) {
0548 if (currentBuffer[currentBufferIdx++] == level) {
0549 values.putInt(rowId + i, data.readInteger());
0550 } else {
0551 nulls.putNull(rowId + i);
0552 }
0553 }
0554 break;
0555 }
0556 rowId += n;
0557 left -= n;
0558 currentCount -= n;
0559 }
0560 }
0561
0562
0563
0564
0565
0566 @Override
0567 public void readIntegers(int total, WritableColumnVector c, int rowId) {
0568 int left = total;
0569 while (left > 0) {
0570 if (this.currentCount == 0) this.readNextGroup();
0571 int n = Math.min(left, this.currentCount);
0572 switch (mode) {
0573 case RLE:
0574 c.putInts(rowId, n, currentValue);
0575 break;
0576 case PACKED:
0577 c.putInts(rowId, n, currentBuffer, currentBufferIdx);
0578 currentBufferIdx += n;
0579 break;
0580 }
0581 rowId += n;
0582 left -= n;
0583 currentCount -= n;
0584 }
0585 }
0586
0587 @Override
0588 public void readIntegersWithRebase(
0589 int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0590 throw new UnsupportedOperationException("only readInts is valid.");
0591 }
0592
0593 @Override
0594 public byte readByte() {
0595 throw new UnsupportedOperationException("only readInts is valid.");
0596 }
0597
0598 @Override
0599 public void readBytes(int total, WritableColumnVector c, int rowId) {
0600 throw new UnsupportedOperationException("only readInts is valid.");
0601 }
0602
0603 @Override
0604 public void readLongs(int total, WritableColumnVector c, int rowId) {
0605 throw new UnsupportedOperationException("only readInts is valid.");
0606 }
0607
0608 @Override
0609 public void readLongsWithRebase(
0610 int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0611 throw new UnsupportedOperationException("only readInts is valid.");
0612 }
0613
0614 @Override
0615 public void readBinary(int total, WritableColumnVector c, int rowId) {
0616 throw new UnsupportedOperationException("only readInts is valid.");
0617 }
0618
0619 @Override
0620 public void readBooleans(int total, WritableColumnVector c, int rowId) {
0621 throw new UnsupportedOperationException("only readInts is valid.");
0622 }
0623
0624 @Override
0625 public void readFloats(int total, WritableColumnVector c, int rowId) {
0626 throw new UnsupportedOperationException("only readInts is valid.");
0627 }
0628
0629 @Override
0630 public void readDoubles(int total, WritableColumnVector c, int rowId) {
0631 throw new UnsupportedOperationException("only readInts is valid.");
0632 }
0633
0634 @Override
0635 public Binary readBinary(int len) {
0636 throw new UnsupportedOperationException("only readInts is valid.");
0637 }
0638
0639
0640
0641
0642 private int readUnsignedVarInt() throws IOException {
0643 int value = 0;
0644 int shift = 0;
0645 int b;
0646 do {
0647 b = in.read();
0648 value |= (b & 0x7F) << shift;
0649 shift += 7;
0650 } while ((b & 0x80) != 0);
0651 return value;
0652 }
0653
0654
0655
0656
0657 private int readIntLittleEndian() throws IOException {
0658 int ch4 = in.read();
0659 int ch3 = in.read();
0660 int ch2 = in.read();
0661 int ch1 = in.read();
0662 return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
0663 }
0664
0665
0666
0667
0668 private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
0669 switch (bytesWidth) {
0670 case 0:
0671 return 0;
0672 case 1:
0673 return in.read();
0674 case 2: {
0675 int ch2 = in.read();
0676 int ch1 = in.read();
0677 return (ch1 << 8) + ch2;
0678 }
0679 case 3: {
0680 int ch3 = in.read();
0681 int ch2 = in.read();
0682 int ch1 = in.read();
0683 return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
0684 }
0685 case 4: {
0686 return readIntLittleEndian();
0687 }
0688 }
0689 throw new RuntimeException("Unreachable");
0690 }
0691
0692 private int ceil8(int value) {
0693 return (value + 7) / 8;
0694 }
0695
0696
0697
0698
0699 private void readNextGroup() {
0700 try {
0701 int header = readUnsignedVarInt();
0702 this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
0703 switch (mode) {
0704 case RLE:
0705 this.currentCount = header >>> 1;
0706 this.currentValue = readIntLittleEndianPaddedOnBitWidth();
0707 return;
0708 case PACKED:
0709 int numGroups = header >>> 1;
0710 this.currentCount = numGroups * 8;
0711
0712 if (this.currentBuffer.length < this.currentCount) {
0713 this.currentBuffer = new int[this.currentCount];
0714 }
0715 currentBufferIdx = 0;
0716 int valueIndex = 0;
0717 while (valueIndex < this.currentCount) {
0718
0719 ByteBuffer buffer = in.slice(bitWidth);
0720 this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
0721 valueIndex += 8;
0722 }
0723 return;
0724 default:
0725 throw new ParquetDecodingException("not a valid mode " + this.mode);
0726 }
0727 } catch (IOException e) {
0728 throw new ParquetDecodingException("Failed to read from input stream", e);
0729 }
0730 }
0731 }