0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.execution.datasources.parquet;
0018
0019 import java.io.IOException;
0020 import java.nio.ByteBuffer;
0021 import java.nio.ByteOrder;
0022
0023 import org.apache.parquet.bytes.ByteBufferInputStream;
0024 import org.apache.parquet.column.values.ValuesReader;
0025 import org.apache.parquet.io.api.Binary;
0026 import org.apache.parquet.io.ParquetDecodingException;
0027
0028 import org.apache.spark.sql.catalyst.util.RebaseDateTime;
0029 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
0030 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
0031
0032
0033
0034
0035 public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
0036 private ByteBufferInputStream in = null;
0037
0038
0039 private int bitOffset;
0040 private byte currentByte = 0;
0041
0042 public VectorizedPlainValuesReader() {
0043 }
0044
0045 @Override
0046 public void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException {
0047 this.in = in;
0048 }
0049
0050 @Override
0051 public void skip() {
0052 throw new UnsupportedOperationException();
0053 }
0054
0055 @Override
0056 public final void readBooleans(int total, WritableColumnVector c, int rowId) {
0057
0058 for (int i = 0; i < total; i++) {
0059 c.putBoolean(rowId + i, readBoolean());
0060 }
0061 }
0062
0063 private ByteBuffer getBuffer(int length) {
0064 try {
0065 return in.slice(length).order(ByteOrder.LITTLE_ENDIAN);
0066 } catch (IOException e) {
0067 throw new ParquetDecodingException("Failed to read " + length + " bytes", e);
0068 }
0069 }
0070
0071 @Override
0072 public final void readIntegers(int total, WritableColumnVector c, int rowId) {
0073 int requiredBytes = total * 4;
0074 ByteBuffer buffer = getBuffer(requiredBytes);
0075
0076 if (buffer.hasArray()) {
0077 int offset = buffer.arrayOffset() + buffer.position();
0078 c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
0079 } else {
0080 for (int i = 0; i < total; i += 1) {
0081 c.putInt(rowId + i, buffer.getInt());
0082 }
0083 }
0084 }
0085
0086
0087
0088
0089 @Override
0090 public final void readIntegersWithRebase(
0091 int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0092 int requiredBytes = total * 4;
0093 ByteBuffer buffer = getBuffer(requiredBytes);
0094 boolean rebase = false;
0095 for (int i = 0; i < total; i += 1) {
0096 rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
0097 }
0098 if (rebase) {
0099 if (failIfRebase) {
0100 throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0101 } else {
0102 for (int i = 0; i < total; i += 1) {
0103 c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
0104 }
0105 }
0106 } else {
0107 if (buffer.hasArray()) {
0108 int offset = buffer.arrayOffset() + buffer.position();
0109 c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
0110 } else {
0111 for (int i = 0; i < total; i += 1) {
0112 c.putInt(rowId + i, buffer.getInt());
0113 }
0114 }
0115 }
0116 }
0117
0118 @Override
0119 public final void readLongs(int total, WritableColumnVector c, int rowId) {
0120 int requiredBytes = total * 8;
0121 ByteBuffer buffer = getBuffer(requiredBytes);
0122
0123 if (buffer.hasArray()) {
0124 int offset = buffer.arrayOffset() + buffer.position();
0125 c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
0126 } else {
0127 for (int i = 0; i < total; i += 1) {
0128 c.putLong(rowId + i, buffer.getLong());
0129 }
0130 }
0131 }
0132
0133
0134
0135
0136 @Override
0137 public final void readLongsWithRebase(
0138 int total, WritableColumnVector c, int rowId, boolean failIfRebase) {
0139 int requiredBytes = total * 8;
0140 ByteBuffer buffer = getBuffer(requiredBytes);
0141 boolean rebase = false;
0142 for (int i = 0; i < total; i += 1) {
0143 rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs();
0144 }
0145 if (rebase) {
0146 if (failIfRebase) {
0147 throw DataSourceUtils.newRebaseExceptionInRead("Parquet");
0148 } else {
0149 for (int i = 0; i < total; i += 1) {
0150 c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
0151 }
0152 }
0153 } else {
0154 if (buffer.hasArray()) {
0155 int offset = buffer.arrayOffset() + buffer.position();
0156 c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
0157 } else {
0158 for (int i = 0; i < total; i += 1) {
0159 c.putLong(rowId + i, buffer.getLong());
0160 }
0161 }
0162 }
0163 }
0164
0165 @Override
0166 public final void readFloats(int total, WritableColumnVector c, int rowId) {
0167 int requiredBytes = total * 4;
0168 ByteBuffer buffer = getBuffer(requiredBytes);
0169
0170 if (buffer.hasArray()) {
0171 int offset = buffer.arrayOffset() + buffer.position();
0172 c.putFloats(rowId, total, buffer.array(), offset);
0173 } else {
0174 for (int i = 0; i < total; i += 1) {
0175 c.putFloat(rowId + i, buffer.getFloat());
0176 }
0177 }
0178 }
0179
0180 @Override
0181 public final void readDoubles(int total, WritableColumnVector c, int rowId) {
0182 int requiredBytes = total * 8;
0183 ByteBuffer buffer = getBuffer(requiredBytes);
0184
0185 if (buffer.hasArray()) {
0186 int offset = buffer.arrayOffset() + buffer.position();
0187 c.putDoubles(rowId, total, buffer.array(), offset);
0188 } else {
0189 for (int i = 0; i < total; i += 1) {
0190 c.putDouble(rowId + i, buffer.getDouble());
0191 }
0192 }
0193 }
0194
0195 @Override
0196 public final void readBytes(int total, WritableColumnVector c, int rowId) {
0197
0198
0199 int requiredBytes = total * 4;
0200 ByteBuffer buffer = getBuffer(requiredBytes);
0201
0202 for (int i = 0; i < total; i += 1) {
0203 c.putByte(rowId + i, buffer.get());
0204
0205 buffer.position(buffer.position() + 3);
0206 }
0207 }
0208
0209 @Override
0210 public final boolean readBoolean() {
0211
0212 if (bitOffset == 0) {
0213 try {
0214 currentByte = (byte) in.read();
0215 } catch (IOException e) {
0216 throw new ParquetDecodingException("Failed to read a byte", e);
0217 }
0218 }
0219
0220 boolean v = (currentByte & (1 << bitOffset)) != 0;
0221 bitOffset += 1;
0222 if (bitOffset == 8) {
0223 bitOffset = 0;
0224 }
0225 return v;
0226 }
0227
0228 @Override
0229 public final int readInteger() {
0230 return getBuffer(4).getInt();
0231 }
0232
0233 @Override
0234 public final long readLong() {
0235 return getBuffer(8).getLong();
0236 }
0237
0238 @Override
0239 public final byte readByte() {
0240 return (byte) readInteger();
0241 }
0242
0243 @Override
0244 public final float readFloat() {
0245 return getBuffer(4).getFloat();
0246 }
0247
0248 @Override
0249 public final double readDouble() {
0250 return getBuffer(8).getDouble();
0251 }
0252
0253 @Override
0254 public final void readBinary(int total, WritableColumnVector v, int rowId) {
0255 for (int i = 0; i < total; i++) {
0256 int len = readInteger();
0257 ByteBuffer buffer = getBuffer(len);
0258 if (buffer.hasArray()) {
0259 v.putByteArray(rowId + i, buffer.array(), buffer.arrayOffset() + buffer.position(), len);
0260 } else {
0261 byte[] bytes = new byte[len];
0262 buffer.get(bytes);
0263 v.putByteArray(rowId + i, bytes);
0264 }
0265 }
0266 }
0267
0268 @Override
0269 public final Binary readBinary(int len) {
0270 ByteBuffer buffer = getBuffer(len);
0271 if (buffer.hasArray()) {
0272 return Binary.fromConstantByteArray(
0273 buffer.array(), buffer.arrayOffset() + buffer.position(), len);
0274 } else {
0275 byte[] bytes = new byte[len];
0276 buffer.get(bytes);
0277 return Binary.fromConstantByteArray(bytes);
0278 }
0279 }
0280 }