Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.execution.datasources.parquet;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 
0023 import org.apache.parquet.Preconditions;
0024 import org.apache.parquet.bytes.ByteBufferInputStream;
0025 import org.apache.parquet.bytes.BytesUtils;
0026 import org.apache.parquet.column.values.ValuesReader;
0027 import org.apache.parquet.column.values.bitpacking.BytePacker;
0028 import org.apache.parquet.column.values.bitpacking.Packer;
0029 import org.apache.parquet.io.ParquetDecodingException;
0030 import org.apache.parquet.io.api.Binary;
0031 
0032 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
0033 
0034 /**
0035  * A values reader for Parquet's run-length encoded data. This is based off of the version in
0036  * parquet-mr with these changes:
0037  *  - Supports the vectorized interface.
0038  *  - Works on byte arrays(byte[]) instead of making byte streams.
0039  *
0040  * This encoding is used in multiple places:
0041  *  - Definition/Repetition levels
0042  *  - Dictionary ids.
0043  */
0044 public final class VectorizedRleValuesReader extends ValuesReader
0045     implements VectorizedValuesReader {
0046   // Current decoding mode. The encoded data contains groups of either run length encoded data
0047   // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
0048   // the number of values in the group.
0049   // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
0050   private enum MODE {
0051     RLE,
0052     PACKED
0053   }
0054 
0055   // Encoded data.
0056   private ByteBufferInputStream in;
0057 
0058   // bit/byte width of decoded data and utility to batch unpack them.
0059   private int bitWidth;
0060   private int bytesWidth;
0061   private BytePacker packer;
0062 
0063   // Current decoding mode and values
0064   private MODE mode;
0065   private int currentCount;
0066   private int currentValue;
0067 
0068   // Buffer of decoded values if the values are PACKED.
0069   private int[] currentBuffer = new int[16];
0070   private int currentBufferIdx = 0;
0071 
0072   // If true, the bit width is fixed. This decoder is used in different places and this also
0073   // controls if we need to read the bitwidth from the beginning of the data stream.
0074   private final boolean fixedWidth;
0075   private final boolean readLength;
0076 
0077   public VectorizedRleValuesReader() {
0078     this.fixedWidth = false;
0079     this.readLength = false;
0080   }
0081 
0082   public VectorizedRleValuesReader(int bitWidth) {
0083     this.fixedWidth = true;
0084     this.readLength = bitWidth != 0;
0085     init(bitWidth);
0086   }
0087 
0088   public VectorizedRleValuesReader(int bitWidth, boolean readLength) {
0089     this.fixedWidth = true;
0090     this.readLength = readLength;
0091     init(bitWidth);
0092   }
0093 
0094   @Override
0095   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
0096     this.in = in;
0097     if (fixedWidth) {
0098       // initialize for repetition and definition levels
0099       if (readLength) {
0100         int length = readIntLittleEndian();
0101         this.in = in.sliceStream(length);
0102       }
0103     } else {
0104       // initialize for values
0105       if (in.available() > 0) {
0106         init(in.read());
0107       }
0108     }
0109     if (bitWidth == 0) {
0110       // 0 bit width, treat this as an RLE run of valueCount number of 0's.
0111       this.mode = MODE.RLE;
0112       this.currentCount = valueCount;
0113       this.currentValue = 0;
0114     } else {
0115       this.currentCount = 0;
0116     }
0117   }
0118 
0119   /**
0120    * Initializes the internal state for decoding ints of `bitWidth`.
0121    */
0122   private void init(int bitWidth) {
0123     Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
0124     this.bitWidth = bitWidth;
0125     this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
0126     this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
0127   }
0128 
0129   @Override
0130   public boolean readBoolean() {
0131     return this.readInteger() != 0;
0132   }
0133 
0134   @Override
0135   public void skip() {
0136     this.readInteger();
0137   }
0138 
0139   @Override
0140   public int readValueDictionaryId() {
0141     return readInteger();
0142   }
0143 
0144   @Override
0145   public int readInteger() {
0146     if (this.currentCount == 0) { this.readNextGroup(); }
0147 
0148     this.currentCount--;
0149     switch (mode) {
0150       case RLE:
0151         return this.currentValue;
0152       case PACKED:
0153         return this.currentBuffer[currentBufferIdx++];
0154     }
0155     throw new RuntimeException("Unreachable");
0156   }
0157 
0158   /**
0159    * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader
0160    * reads the definition levels and then will read from `data` for the non-null values.
0161    * If the value is null, c will be populated with `nullValue`. Note that `nullValue` is only
0162    * necessary for readIntegers because we also use it to decode dictionaryIds and want to make
0163    * sure it always has a value in range.
0164    *
0165    * This is a batched version of this logic:
0166    *  if (this.readInt() == level) {
0167    *    c[rowId] = data.readInteger();
0168    *  } else {
0169    *    c[rowId] = null;
0170    *  }
0171    */
0172   public void readIntegers(
0173       int total,
0174       WritableColumnVector c,
0175       int rowId,
0176       int level,
0177       VectorizedValuesReader data) throws IOException {
0178     int left = total;
0179     while (left > 0) {
0180       if (this.currentCount == 0) this.readNextGroup();
0181       int n = Math.min(left, this.currentCount);
0182       switch (mode) {
0183         case RLE:
0184           if (currentValue == level) {
0185             data.readIntegers(n, c, rowId);
0186           } else {
0187             c.putNulls(rowId, n);
0188           }
0189           break;
0190         case PACKED:
0191           for (int i = 0; i < n; ++i) {
0192             if (currentBuffer[currentBufferIdx++] == level) {
0193               c.putInt(rowId + i, data.readInteger());
0194             } else {
0195               c.putNull(rowId + i);
0196             }
0197           }
0198           break;
0199       }
0200       rowId += n;
0201       left -= n;
0202       currentCount -= n;
0203     }
0204   }
0205 
0206   // A fork of `readIntegers`, which rebases the date int value (days) before filling
0207   // the Spark column vector.
0208   public void readIntegersWithRebase(
0209       int total,
0210       WritableColumnVector c,
0211       int rowId,
0212       int level,
0213       VectorizedValuesReader data,
0214       final boolean failIfRebase) throws IOException {
0215     int left = total;
0216     while (left > 0) {
0217       if (this.currentCount == 0) this.readNextGroup();
0218       int n = Math.min(left, this.currentCount);
0219       switch (mode) {
0220         case RLE:
0221           if (currentValue == level) {
0222             data.readIntegersWithRebase(n, c, rowId, failIfRebase);
0223           } else {
0224             c.putNulls(rowId, n);
0225           }
0226           break;
0227         case PACKED:
0228           for (int i = 0; i < n; ++i) {
0229             if (currentBuffer[currentBufferIdx++] == level) {
0230               int julianDays = data.readInteger();
0231               c.putInt(rowId + i, VectorizedColumnReader.rebaseDays(julianDays, failIfRebase));
0232             } else {
0233               c.putNull(rowId + i);
0234             }
0235           }
0236           break;
0237       }
0238       rowId += n;
0239       left -= n;
0240       currentCount -= n;
0241     }
0242   }
0243 
0244   // TODO: can this code duplication be removed without a perf penalty?
0245   public void readBooleans(
0246       int total,
0247       WritableColumnVector c,
0248       int rowId,
0249       int level,
0250       VectorizedValuesReader data) throws IOException {
0251     int left = total;
0252     while (left > 0) {
0253       if (this.currentCount == 0) this.readNextGroup();
0254       int n = Math.min(left, this.currentCount);
0255       switch (mode) {
0256         case RLE:
0257           if (currentValue == level) {
0258             data.readBooleans(n, c, rowId);
0259           } else {
0260             c.putNulls(rowId, n);
0261           }
0262           break;
0263         case PACKED:
0264           for (int i = 0; i < n; ++i) {
0265             if (currentBuffer[currentBufferIdx++] == level) {
0266               c.putBoolean(rowId + i, data.readBoolean());
0267             } else {
0268               c.putNull(rowId + i);
0269             }
0270           }
0271           break;
0272       }
0273       rowId += n;
0274       left -= n;
0275       currentCount -= n;
0276     }
0277   }
0278 
0279   public void readBytes(
0280       int total,
0281       WritableColumnVector c,
0282       int rowId,
0283       int level,
0284       VectorizedValuesReader data) throws IOException {
0285     int left = total;
0286     while (left > 0) {
0287       if (this.currentCount == 0) this.readNextGroup();
0288       int n = Math.min(left, this.currentCount);
0289       switch (mode) {
0290         case RLE:
0291           if (currentValue == level) {
0292             data.readBytes(n, c, rowId);
0293           } else {
0294             c.putNulls(rowId, n);
0295           }
0296           break;
0297         case PACKED:
0298           for (int i = 0; i < n; ++i) {
0299             if (currentBuffer[currentBufferIdx++] == level) {
0300               c.putByte(rowId + i, data.readByte());
0301             } else {
0302               c.putNull(rowId + i);
0303             }
0304           }
0305           break;
0306       }
0307       rowId += n;
0308       left -= n;
0309       currentCount -= n;
0310     }
0311   }
0312 
0313   public void readShorts(
0314       int total,
0315       WritableColumnVector c,
0316       int rowId,
0317       int level,
0318       VectorizedValuesReader data) throws IOException {
0319     int left = total;
0320     while (left > 0) {
0321       if (this.currentCount == 0) this.readNextGroup();
0322       int n = Math.min(left, this.currentCount);
0323       switch (mode) {
0324         case RLE:
0325           if (currentValue == level) {
0326             for (int i = 0; i < n; i++) {
0327               c.putShort(rowId + i, (short)data.readInteger());
0328             }
0329           } else {
0330             c.putNulls(rowId, n);
0331           }
0332           break;
0333         case PACKED:
0334           for (int i = 0; i < n; ++i) {
0335             if (currentBuffer[currentBufferIdx++] == level) {
0336               c.putShort(rowId + i, (short)data.readInteger());
0337             } else {
0338               c.putNull(rowId + i);
0339             }
0340           }
0341           break;
0342       }
0343       rowId += n;
0344       left -= n;
0345       currentCount -= n;
0346     }
0347   }
0348 
0349   public void readLongs(
0350       int total,
0351       WritableColumnVector c,
0352       int rowId,
0353       int level,
0354       VectorizedValuesReader data) throws IOException {
0355     int left = total;
0356     while (left > 0) {
0357       if (this.currentCount == 0) this.readNextGroup();
0358       int n = Math.min(left, this.currentCount);
0359       switch (mode) {
0360         case RLE:
0361           if (currentValue == level) {
0362             data.readLongs(n, c, rowId);
0363           } else {
0364             c.putNulls(rowId, n);
0365           }
0366           break;
0367         case PACKED:
0368           for (int i = 0; i < n; ++i) {
0369             if (currentBuffer[currentBufferIdx++] == level) {
0370               c.putLong(rowId + i, data.readLong());
0371             } else {
0372               c.putNull(rowId + i);
0373             }
0374           }
0375           break;
0376       }
0377       rowId += n;
0378       left -= n;
0379       currentCount -= n;
0380     }
0381   }
0382 
0383   // A fork of `readLongs`, which rebases the timestamp long value (microseconds) before filling
0384   // the Spark column vector.
0385   public void readLongsWithRebase(
0386       int total,
0387       WritableColumnVector c,
0388       int rowId,
0389       int level,
0390       VectorizedValuesReader data,
0391       final boolean failIfRebase) throws IOException {
0392     int left = total;
0393     while (left > 0) {
0394       if (this.currentCount == 0) this.readNextGroup();
0395       int n = Math.min(left, this.currentCount);
0396       switch (mode) {
0397         case RLE:
0398           if (currentValue == level) {
0399             data.readLongsWithRebase(n, c, rowId, failIfRebase);
0400           } else {
0401             c.putNulls(rowId, n);
0402           }
0403           break;
0404         case PACKED:
0405           for (int i = 0; i < n; ++i) {
0406             if (currentBuffer[currentBufferIdx++] == level) {
0407               long julianMicros = data.readLong();
0408               c.putLong(rowId + i, VectorizedColumnReader.rebaseMicros(julianMicros, failIfRebase));
0409             } else {
0410               c.putNull(rowId + i);
0411             }
0412           }
0413           break;
0414       }
0415       rowId += n;
0416       left -= n;
0417       currentCount -= n;
0418     }
0419   }
0420 
0421   public void readFloats(
0422       int total,
0423       WritableColumnVector c,
0424       int rowId,
0425       int level,
0426       VectorizedValuesReader data) throws IOException {
0427     int left = total;
0428     while (left > 0) {
0429       if (this.currentCount == 0) this.readNextGroup();
0430       int n = Math.min(left, this.currentCount);
0431       switch (mode) {
0432         case RLE:
0433           if (currentValue == level) {
0434             data.readFloats(n, c, rowId);
0435           } else {
0436             c.putNulls(rowId, n);
0437           }
0438           break;
0439         case PACKED:
0440           for (int i = 0; i < n; ++i) {
0441             if (currentBuffer[currentBufferIdx++] == level) {
0442               c.putFloat(rowId + i, data.readFloat());
0443             } else {
0444               c.putNull(rowId + i);
0445             }
0446           }
0447           break;
0448       }
0449       rowId += n;
0450       left -= n;
0451       currentCount -= n;
0452     }
0453   }
0454 
0455   public void readDoubles(
0456       int total,
0457       WritableColumnVector c,
0458       int rowId,
0459       int level,
0460       VectorizedValuesReader data) throws IOException {
0461     int left = total;
0462     while (left > 0) {
0463       if (this.currentCount == 0) this.readNextGroup();
0464       int n = Math.min(left, this.currentCount);
0465       switch (mode) {
0466         case RLE:
0467           if (currentValue == level) {
0468             data.readDoubles(n, c, rowId);
0469           } else {
0470             c.putNulls(rowId, n);
0471           }
0472           break;
0473         case PACKED:
0474           for (int i = 0; i < n; ++i) {
0475             if (currentBuffer[currentBufferIdx++] == level) {
0476               c.putDouble(rowId + i, data.readDouble());
0477             } else {
0478               c.putNull(rowId + i);
0479             }
0480           }
0481           break;
0482       }
0483       rowId += n;
0484       left -= n;
0485       currentCount -= n;
0486     }
0487   }
0488 
0489   public void readBinarys(
0490       int total,
0491       WritableColumnVector c,
0492       int rowId,
0493       int level,
0494       VectorizedValuesReader data) throws IOException {
0495     int left = total;
0496     while (left > 0) {
0497       if (this.currentCount == 0) this.readNextGroup();
0498       int n = Math.min(left, this.currentCount);
0499       switch (mode) {
0500         case RLE:
0501           if (currentValue == level) {
0502             data.readBinary(n, c, rowId);
0503           } else {
0504             c.putNulls(rowId, n);
0505           }
0506           break;
0507         case PACKED:
0508           for (int i = 0; i < n; ++i) {
0509             if (currentBuffer[currentBufferIdx++] == level) {
0510               data.readBinary(1, c, rowId + i);
0511             } else {
0512               c.putNull(rowId + i);
0513             }
0514           }
0515           break;
0516       }
0517       rowId += n;
0518       left -= n;
0519       currentCount -= n;
0520     }
0521   }
0522 
0523   /**
0524    * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
0525    * populated into `nulls`.
0526    */
0527   public void readIntegers(
0528       int total,
0529       WritableColumnVector values,
0530       WritableColumnVector nulls,
0531       int rowId,
0532       int level,
0533       VectorizedValuesReader data) throws IOException {
0534     int left = total;
0535     while (left > 0) {
0536       if (this.currentCount == 0) this.readNextGroup();
0537       int n = Math.min(left, this.currentCount);
0538       switch (mode) {
0539         case RLE:
0540           if (currentValue == level) {
0541             data.readIntegers(n, values, rowId);
0542           } else {
0543             nulls.putNulls(rowId, n);
0544           }
0545           break;
0546         case PACKED:
0547           for (int i = 0; i < n; ++i) {
0548             if (currentBuffer[currentBufferIdx++] == level) {
0549               values.putInt(rowId + i, data.readInteger());
0550             } else {
0551               nulls.putNull(rowId + i);
0552             }
0553           }
0554           break;
0555       }
0556       rowId += n;
0557       left -= n;
0558       currentCount -= n;
0559     }
0560   }
0561 
0562 
0563   // The RLE reader implements the vectorized decoding interface when used to decode dictionary
0564   // IDs. This is different than the above APIs that decodes definitions levels along with values.
0565   // Since this is only used to decode dictionary IDs, only decoding integers is supported.
0566   @Override
0567   public void readIntegers(int total, WritableColumnVector c, int rowId) {
0568     int left = total;
0569     while (left > 0) {
0570       if (this.currentCount == 0) this.readNextGroup();
0571       int n = Math.min(left, this.currentCount);
0572       switch (mode) {
0573         case RLE:
0574           c.putInts(rowId, n, currentValue);
0575           break;
0576         case PACKED:
0577           c.putInts(rowId, n, currentBuffer, currentBufferIdx);
0578           currentBufferIdx += n;
0579           break;
0580       }
0581       rowId += n;
0582       left -= n;
0583       currentCount -= n;
0584     }
0585   }
0586 
0587   @Override
0588   public void readIntegersWithRebase(
0589       int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0590     throw new UnsupportedOperationException("only readInts is valid.");
0591   }
0592 
0593   @Override
0594   public byte readByte() {
0595     throw new UnsupportedOperationException("only readInts is valid.");
0596   }
0597 
0598   @Override
0599   public void readBytes(int total, WritableColumnVector c, int rowId) {
0600     throw new UnsupportedOperationException("only readInts is valid.");
0601   }
0602 
0603   @Override
0604   public void readLongs(int total, WritableColumnVector c, int rowId) {
0605     throw new UnsupportedOperationException("only readInts is valid.");
0606   }
0607 
0608   @Override
0609   public void readLongsWithRebase(
0610       int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0611     throw new UnsupportedOperationException("only readInts is valid.");
0612   }
0613 
0614   @Override
0615   public void readBinary(int total, WritableColumnVector c, int rowId) {
0616     throw new UnsupportedOperationException("only readInts is valid.");
0617   }
0618 
0619   @Override
0620   public void readBooleans(int total, WritableColumnVector c, int rowId) {
0621     throw new UnsupportedOperationException("only readInts is valid.");
0622   }
0623 
0624   @Override
0625   public void readFloats(int total, WritableColumnVector c, int rowId) {
0626     throw new UnsupportedOperationException("only readInts is valid.");
0627   }
0628 
0629   @Override
0630   public void readDoubles(int total, WritableColumnVector c, int rowId) {
0631     throw new UnsupportedOperationException("only readInts is valid.");
0632   }
0633 
0634   @Override
0635   public Binary readBinary(int len) {
0636     throw new UnsupportedOperationException("only readInts is valid.");
0637   }
0638 
0639   /**
0640    * Reads the next varint encoded int.
0641    */
0642   private int readUnsignedVarInt() throws IOException {
0643     int value = 0;
0644     int shift = 0;
0645     int b;
0646     do {
0647       b = in.read();
0648       value |= (b & 0x7F) << shift;
0649       shift += 7;
0650     } while ((b & 0x80) != 0);
0651     return value;
0652   }
0653 
0654   /**
0655    * Reads the next 4 byte little endian int.
0656    */
0657   private int readIntLittleEndian() throws IOException {
0658     int ch4 = in.read();
0659     int ch3 = in.read();
0660     int ch2 = in.read();
0661     int ch1 = in.read();
0662     return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
0663   }
0664 
0665   /**
0666    * Reads the next byteWidth little endian int.
0667    */
0668   private int readIntLittleEndianPaddedOnBitWidth() throws IOException {
0669     switch (bytesWidth) {
0670       case 0:
0671         return 0;
0672       case 1:
0673         return in.read();
0674       case 2: {
0675         int ch2 = in.read();
0676         int ch1 = in.read();
0677         return (ch1 << 8) + ch2;
0678       }
0679       case 3: {
0680         int ch3 = in.read();
0681         int ch2 = in.read();
0682         int ch1 = in.read();
0683         return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
0684       }
0685       case 4: {
0686         return readIntLittleEndian();
0687       }
0688     }
0689     throw new RuntimeException("Unreachable");
0690   }
0691 
0692   private int ceil8(int value) {
0693     return (value + 7) / 8;
0694   }
0695 
0696   /**
0697    * Reads the next group.
0698    */
0699   private void readNextGroup() {
0700     try {
0701       int header = readUnsignedVarInt();
0702       this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
0703       switch (mode) {
0704         case RLE:
0705           this.currentCount = header >>> 1;
0706           this.currentValue = readIntLittleEndianPaddedOnBitWidth();
0707           return;
0708         case PACKED:
0709           int numGroups = header >>> 1;
0710           this.currentCount = numGroups * 8;
0711 
0712           if (this.currentBuffer.length < this.currentCount) {
0713             this.currentBuffer = new int[this.currentCount];
0714           }
0715           currentBufferIdx = 0;
0716           int valueIndex = 0;
0717           while (valueIndex < this.currentCount) {
0718             // values are bit packed 8 at a time, so reading bitWidth will always work
0719             ByteBuffer buffer = in.slice(bitWidth);
0720             this.packer.unpack8Values(buffer, buffer.position(), this.currentBuffer, valueIndex);
0721             valueIndex += 8;
0722           }
0723           return;
0724         default:
0725           throw new ParquetDecodingException("not a valid mode " + this.mode);
0726       }
0727     } catch (IOException e) {
0728       throw new ParquetDecodingException("Failed to read from input stream", e);
0729     }
0730   }
0731 }