Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.execution.datasources.parquet;
0018 
0019 import java.io.IOException;
0020 import java.nio.ByteBuffer;
0021 import java.nio.ByteOrder;
0022 
0023 import org.apache.parquet.bytes.ByteBufferInputStream;
0024 import org.apache.parquet.column.values.ValuesReader;
0025 import org.apache.parquet.io.api.Binary;
0026 import org.apache.parquet.io.ParquetDecodingException;
0027 
0028 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
0029 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
0030 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
0031 
0032 /**
0033  * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
0034  */
0035 public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
0036   private ByteBufferInputStream in = null;
0037 
0038   // Only used for booleans.
0039   private int bitOffset;
0040   private byte currentByte = 0;
0041 
0042   public VectorizedPlainValuesReader() {
0043   }
0044 
0045   @Override
0046   public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
0047     this.in = in;
0048   }
0049 
0050   @Override
0051   public void skip() {
0052     throw new UnsupportedOperationException();
0053   }
0054 
0055   @Override
0056   public final void readBooleans(int total, WritableColumnVector c, int rowId) {
0057     // TODO: properly vectorize this
0058     for (int i = 0; i < total; i++) {
0059       c.putBoolean(rowId + i, readBoolean());
0060     }
0061   }
0062 
0063   private ByteBuffer getBuffer(int length) {
0064     try {
0065       return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
0066     } catch (IOException e) {
0067       throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
0068     }
0069   }
0070 
0071   @Override
0072   public final void readIntegers(int total, WritableColumnVector c, int rowId) {
0073     int requiredBytes = total * 4;
0074     ByteBuffer buffer = getBuffer(requiredBytes);
0075 
0076     if (buffer.hasArray()) {
0077       int offset = buffer.arrayOffset() + buffer.position();
0078       c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
0079     } else {
0080       for (int i = 0; i < total; i += 1) {
0081         c.putInt(rowId + i, buffer.getInt());
0082       }
0083     }
0084   }
0085 
0086   // A fork of `readIntegers` to rebase the date values. For performance reasons, this method
0087   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
0088   // if rebase is not needed.
0089   @Override
0090   public final void readIntegersWithRebase(
0091       int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0092     int requiredBytes = total * 4;
0093     ByteBuffer buffer = getBuffer(requiredBytes);
0094     boolean rebase = false;
0095     for (int i = 0; i < total; i += 1) {
0096       rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
0097     }
0098     if (rebase) {
0099       if (failIfRebase) {
0100         throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0101       } else {
0102         for (int i = 0; i < total; i += 1) {
0103           c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
0104         }
0105       }
0106     } else {
0107       if (buffer.hasArray()) {
0108         int offset = buffer.arrayOffset() + buffer.position();
0109         c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
0110       } else {
0111         for (int i = 0; i < total; i += 1) {
0112           c.putInt(rowId + i, buffer.getInt());
0113         }
0114       }
0115     }
0116   }
0117 
0118   @Override
0119   public final void readLongs(int total, WritableColumnVector c, int rowId) {
0120     int requiredBytes = total * 8;
0121     ByteBuffer buffer = getBuffer(requiredBytes);
0122 
0123     if (buffer.hasArray()) {
0124       int offset = buffer.arrayOffset() + buffer.position();
0125       c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
0126     } else {
0127       for (int i = 0; i < total; i += 1) {
0128         c.putLong(rowId + i, buffer.getLong());
0129       }
0130     }
0131   }
0132 
0133   // A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method
0134   // iterates the values twice: check if we need to rebase first, then go to the optimized branch
0135   // if rebase is not needed.
0136   @Override
0137   public final void readLongsWithRebase(
0138       int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0139     int requiredBytes = total * 8;
0140     ByteBuffer buffer = getBuffer(requiredBytes);
0141     boolean rebase = false;
0142     for (int i = 0; i < total; i += 1) {
0143       rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs();
0144     }
0145     if (rebase) {
0146       if (failIfRebase) {
0147         throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0148       } else {
0149         for (int i = 0; i < total; i += 1) {
0150           c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
0151         }
0152       }
0153     } else {
0154       if (buffer.hasArray()) {
0155         int offset = buffer.arrayOffset() + buffer.position();
0156         c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
0157       } else {
0158         for (int i = 0; i < total; i += 1) {
0159           c.putLong(rowId + i, buffer.getLong());
0160         }
0161       }
0162     }
0163   }
0164 
0165   @Override
0166   public final void readFloats(int total, WritableColumnVector c, int rowId) {
0167     int requiredBytes = total * 4;
0168     ByteBuffer buffer = getBuffer(requiredBytes);
0169 
0170     if (buffer.hasArray()) {
0171       int offset = buffer.arrayOffset() + buffer.position();
0172       c.putFloats(rowId, total, buffer.array(), offset);
0173     } else {
0174       for (int i = 0; i < total; i += 1) {
0175         c.putFloat(rowId + i, buffer.getFloat());
0176       }
0177     }
0178   }
0179 
0180   @Override
0181   public final void readDoubles(int total, WritableColumnVector c, int rowId) {
0182     int requiredBytes = total * 8;
0183     ByteBuffer buffer = getBuffer(requiredBytes);
0184 
0185     if (buffer.hasArray()) {
0186       int offset = buffer.arrayOffset() + buffer.position();
0187       c.putDoubles(rowId, total, buffer.array(), offset);
0188     } else {
0189       for (int i = 0; i < total; i += 1) {
0190         c.putDouble(rowId + i, buffer.getDouble());
0191       }
0192     }
0193   }
0194 
0195   @Override
0196   public final void readBytes(int total, WritableColumnVector c, int rowId) {
0197     // Bytes are stored as a 4-byte little endian int. Just read the first byte.
0198     // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
0199     int requiredBytes = total * 4;
0200     ByteBuffer buffer = getBuffer(requiredBytes);
0201 
0202     for (int i = 0; i < total; i += 1) {
0203       c.putByte(rowId + i, buffer.get());
0204       // skip the next 3 bytes
0205       buffer.position(buffer.position() + 3);
0206     }
0207   }
0208 
0209   @Override
0210   public final boolean readBoolean() {
0211     // TODO: vectorize decoding and keep boolean[] instead of currentByte
0212     if (bitOffset == 0) {
0213       try {
0214         currentByte = (byte) in.read();
0215       } catch (IOException e) {
0216         throw new ParquetDecodingException("Failed to read a byte", e);
0217       }
0218     }
0219 
0220     boolean v = (currentByte & (1 << bitOffset)) != 0;
0221     bitOffset += 1;
0222     if (bitOffset == 8) {
0223       bitOffset = 0;
0224     }
0225     return v;
0226   }
0227 
0228   @Override
0229   public final int readInteger() {
0230     return getBuffer(4).getInt();
0231   }
0232 
0233   @Override
0234   public final long readLong() {
0235     return getBuffer(8).getLong();
0236   }
0237 
0238   @Override
0239   public final byte readByte() {
0240     return (byte) readInteger();
0241   }
0242 
0243   @Override
0244   public final float readFloat() {
0245     return getBuffer(4).getFloat();
0246   }
0247 
0248   @Override
0249   public final double readDouble() {
0250     return getBuffer(8).getDouble();
0251   }
0252 
0253   @Override
0254   public final void readBinary(int total, WritableColumnVector v, int rowId) {
0255     for (int i = 0; i < total; i++) {
0256       int len = readInteger();
0257       ByteBuffer buffer = getBuffer(len);
0258       if (buffer.hasArray()) {
0259         v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
0260       } else {
0261         byte[] bytes = new byte[len];
0262         buffer.get(bytes);
0263         v.putByteArray(rowId + i, bytes);
0264       }
0265     }
0266   }
0267 
0268   @Override
0269   public final Binary readBinary(int len) {
0270     ByteBuffer buffer = getBuffer(len);
0271     if (buffer.hasArray()) {
0272       return Binary.fromConstantByteArray(
0273           buffer.array(), buffer.arrayOffset() + buffer.position(), len);
0274     } else {
0275       byte[] bytes = new byte[len];
0276       buffer.get(bytes);
0277       return Binary.fromConstantByteArray(bytes);
0278     }
0279   }
0280 }