/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import scala.Option;

import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.hadoop.BadConfigurationException;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetInputFormat;
import org.apache.parquet.hadoop.ParquetInputSplit;
import org.apache.parquet.hadoop.api.InitContext;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.ConfigurationUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.apache.spark.util.AccumulatorV2;
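
/**
 * Base class for custom RecordReaders for Parquet that directly materialize records of type
 * `T`. It resolves the row groups to read for a split, applies row-group filtering, sets up
 * the underlying ParquetFileReader, and provides the helpers shared by concrete readers.
 */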
public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> {
  protected Path file;
  protected MessageType fileSchema;
  protected MessageType requestedSchema;
  protected StructType sparkSchema;
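
  /**
   * The total number of rows this RecordReader will eventually read: the sum of the row
   * counts of all the row groups selected for this reader.
   */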
  protected long totalRowCount;

  protected ParquetFileReader reader;

  @Override
  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
      throws IOException, InterruptedException {
    Configuration configuration = taskAttemptContext.getConfiguration();
    ParquetInputSplit split = (ParquetInputSplit) inputSplit;
    this.file = split.getPath();
    long[] rowGroupOffsets = split.getRowGroupOffsets();

    ParquetMetadata footer;
    List<BlockMetaData> blocks;

    // The split either lists the row-group offsets selected by the client, or leaves them
    // null, in which case the row groups are resolved and filtered here.
    if (rowGroupOffsets == null) {
      // Read the footer for this split's byte range and apply the push-down filter.
      footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
      MessageType fileSchema = footer.getFileMetaData().getSchema();
      FilterCompat.Filter filter = getFilter(configuration);
      blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
    } else {
      // Find the row groups that were selected on the client by their starting offsets.
      footer = readFooter(configuration, file, NO_FILTER);
      Set<Long> offsets = new HashSet<>();
      for (long offset : rowGroupOffsets) {
        offsets.add(offset);
      }
      blocks = new ArrayList<>();
      for (BlockMetaData block : footer.getBlocks()) {
        if (offsets.contains(block.getStartingPos())) {
          blocks.add(block);
        }
      }
      // Every offset listed in the split must match a row group in the footer; if not,
      // fail with enough context to diagnose the mismatch.
      if (blocks.size() != rowGroupOffsets.length) {
        long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
        for (int i = 0; i < foundRowGroupOffsets.length; i++) {
          foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
        }
        throw new IllegalStateException(
            "All the offsets listed in the split should be found in the file."
            + " expected: " + Arrays.toString(rowGroupOffsets)
            + " found: " + blocks
            + " out of: " + Arrays.toString(foundRowGroupOffsets)
            + " in range " + split.getStart() + ", " + split.getEnd());
      }
    }
    this.fileSchema = footer.getFileMetaData().getSchema();
    Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
    ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
    ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
        taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
    this.requestedSchema = readContext.getRequestedSchema();
    String sparkRequestedSchemaString =
        configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
    this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
    this.reader = new ParquetFileReader(
        configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());

    for (BlockMetaData block : reader.getRowGroups()) {
      this.totalRowCount += block.getRowCount();
    }

    // For tests only: if the last registered external accumulator is the test-side
    // NumRowGroupsAcc, record how many row groups this reader will read so tests can check
    // that row-group filtering took effect.
    TaskContext taskContext = TaskContext$.MODULE$.get();
    if (taskContext != null) {
      Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
      if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
        @SuppressWarnings("unchecked")
        AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
        intAccum.add(blocks.size());
      }
    }
  }
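
  /**
   * Returns the paths of all files under `path`, recursively, skipping files whose names
   * start with '.' or '_' (hidden and metadata files that MapReduce normally ignores).
   */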
  public static List<String> listDirectory(File path) {
    List<String> result = new ArrayList<>();
    if (path.isDirectory()) {
      for (File f : path.listFiles()) {
        result.addAll(listDirectory(f));
      }
    } else {
      char c = path.getName().charAt(0);
      if (c != '.' && c != '_') {
        result.add(path.getAbsolutePath());
      }
    }
    return result;
  }
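
  /**
   * Initializes the reader to read the file at `path`, projecting the given `columns`.
   * A null column list projects the full file schema; an empty list projects no columns.
   * This entry point builds its own Configuration and bypasses the Hadoop split machinery,
   * which is mainly useful for tests and benchmarks.
   */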
  protected void initialize(String path, List<String> columns) throws IOException {
    Configuration config = new Configuration();
    config.set("spark.sql.parquet.binaryAsString", "false");
    config.set("spark.sql.parquet.int96AsTimestamp", "false");

    this.file = new Path(path);
    long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
    ParquetMetadata footer = readFooter(config, file, range(0, length));

    List<BlockMetaData> blocks = footer.getBlocks();
    this.fileSchema = footer.getFileMetaData().getSchema();

    if (columns == null) {
      this.requestedSchema = fileSchema;
    } else {
      if (columns.size() > 0) {
        Types.MessageTypeBuilder builder = Types.buildMessage();
        for (String s : columns) {
          if (!fileSchema.containsField(s)) {
            throw new IOException("Can only project existing columns. Unknown field: " + s +
                " File schema:\n" + fileSchema);
          }
          builder.addFields(fileSchema.getType(s));
        }
        this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
      } else {
        this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
      }
    }
    this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
    this.reader = new ParquetFileReader(
        config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());

    for (BlockMetaData block : reader.getRowGroups()) {
      this.totalRowCount += block.getRowCount();
    }
  }

  @Override
  public Void getCurrentKey() {
    return null;
  }

  @Override
  public void close() throws IOException {
    if (reader != null) {
      reader.close();
      reader = null;
    }
  }
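
  /**
   * Utility classes that abstract over the different encodings used for repetition and
   * definition levels, exposing each as a simple stream of ints.
   */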
  abstract static class IntIterator {
    abstract int nextInt() throws IOException;
  }

  protected static final class ValuesReaderIntIterator extends IntIterator {
    ValuesReader delegate;

    public ValuesReaderIntIterator(ValuesReader delegate) {
      this.delegate = delegate;
    }

    @Override
    int nextInt() {
      return delegate.readInteger();
    }
  }

  protected static final class RLEIntIterator extends IntIterator {
    RunLengthBitPackingHybridDecoder delegate;

    public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
      this.delegate = delegate;
    }

    @Override
    int nextInt() throws IOException {
      return delegate.readInt();
    }
  }

  protected static final class NullIntIterator extends IntIterator {
    @Override
    int nextInt() { return 0; }
  }
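
  /**
   * Creates an iterator over RLE/bit-packed encoded levels. When `maxLevel` is 0 the levels
   * are not stored in the page, so a constant-zero iterator is returned instead.
   */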
  protected static IntIterator createRLEIterator(
      int maxLevel, BytesInput bytes, ColumnDescriptor descriptor) throws IOException {
    try {
      if (maxLevel == 0) return new NullIntIterator();
      return new RLEIntIterator(
          new RunLengthBitPackingHybridDecoder(
              BytesUtils.getWidthFromMaxInt(maxLevel),
              bytes.toInputStream()));
    } catch (IOException e) {
      throw new IOException("could not read levels in page for col " + descriptor, e);
    }
  }

  private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
    Map<K, Set<V>> setMultiMap = new HashMap<>();
    for (Map.Entry<K, V> entry : map.entrySet()) {
      Set<V> set = new HashSet<>();
      set.add(entry.getValue());
      setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
    }
    return Collections.unmodifiableMap(setMultiMap);
  }

  @SuppressWarnings("unchecked")
  private Class<? extends ReadSupport<T>> getReadSupportClass(Configuration configuration) {
    return (Class<? extends ReadSupport<T>>) ConfigurationUtil.getClassFromConfig(configuration,
        ParquetInputFormat.READ_SUPPORT_CLASS, ReadSupport.class);
  }
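
  /**
   * Instantiates the configured ReadSupport class via its no-arg constructor, wrapping any
   * reflection failure in a BadConfigurationException.
   */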
  private static <T> ReadSupport<T> getReadSupportInstance(
      Class<? extends ReadSupport<T>> readSupportClass) {
    try {
      return readSupportClass.getConstructor().newInstance();
    } catch (InstantiationException | IllegalAccessException |
        NoSuchMethodException | InvocationTargetException e) {
      throw new BadConfigurationException("could not instantiate read support class", e);
    }
  }
}