Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 
0019 package org.apache.spark.sql.execution.datasources.parquet;
0020 
0021 import java.io.File;
0022 import java.io.IOException;
0023 import java.lang.reflect.InvocationTargetException;
0024 import java.util.ArrayList;
0025 import java.util.Arrays;
0026 import java.util.Collections;
0027 import java.util.HashMap;
0028 import java.util.HashSet;
0029 import java.util.List;
0030 import java.util.Map;
0031 import java.util.Set;
0032 
0033 import scala.Option;
0034 
0035 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
0036 import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
0037 import static org.apache.parquet.format.converter.ParquetMetadataConverter.range;
0038 import static org.apache.parquet.hadoop.ParquetFileReader.readFooter;
0039 import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
0040 
0041 import org.apache.hadoop.conf.Configuration;
0042 import org.apache.hadoop.fs.Path;
0043 import org.apache.hadoop.mapreduce.InputSplit;
0044 import org.apache.hadoop.mapreduce.RecordReader;
0045 import org.apache.hadoop.mapreduce.TaskAttemptContext;
0046 import org.apache.parquet.bytes.BytesInput;
0047 import org.apache.parquet.bytes.BytesUtils;
0048 import org.apache.parquet.column.ColumnDescriptor;
0049 import org.apache.parquet.column.values.ValuesReader;
0050 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
0051 import org.apache.parquet.filter2.compat.FilterCompat;
0052 import org.apache.parquet.hadoop.BadConfigurationException;
0053 import org.apache.parquet.hadoop.ParquetFileReader;
0054 import org.apache.parquet.hadoop.ParquetInputFormat;
0055 import org.apache.parquet.hadoop.ParquetInputSplit;
0056 import org.apache.parquet.hadoop.api.InitContext;
0057 import org.apache.parquet.hadoop.api.ReadSupport;
0058 import org.apache.parquet.hadoop.metadata.BlockMetaData;
0059 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
0060 import org.apache.parquet.hadoop.util.ConfigurationUtil;
0061 import org.apache.parquet.schema.MessageType;
0062 import org.apache.parquet.schema.Types;
0063 import org.apache.spark.TaskContext;
0064 import org.apache.spark.TaskContext$;
0065 import org.apache.spark.sql.types.StructType;
0066 import org.apache.spark.sql.types.StructType$;
0067 import org.apache.spark.util.AccumulatorV2;
0068 
0069 /**
0070  * Base class for custom RecordReaders for Parquet that directly materialize to `T`.
0071  * This class handles computing row groups, filtering on them, setting up the column readers,
0072  * etc.
0073  * This is heavily based on parquet-mr's RecordReader.
0074  * TODO: move this to the parquet-mr project. There are performance benefits of doing it
0075  * this way, albeit at a higher cost to implement. This base class is reusable.
0076  */
0077 public abstract class SpecificParquetRecordReaderBase<T> extends RecordReader<Void, T> {
0078   protected Path file;
0079   protected MessageType fileSchema;
0080   protected MessageType requestedSchema;
0081   protected StructType sparkSchema;
0082 
0083   /**
0084    * The total number of rows this RecordReader will eventually read. The sum of the
0085    * rows of all the row groups.
0086    */
0087   protected long totalRowCount;
0088 
0089   protected ParquetFileReader reader;
0090 
0091   @Override
0092   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
0093       throws IOException, InterruptedException {
0094     Configuration configuration = taskAttemptContext.getConfiguration();
0095     ParquetInputSplit split = (ParquetInputSplit)inputSplit;
0096     this.file = split.getPath();
0097     long[] rowGroupOffsets = split.getRowGroupOffsets();
0098 
0099     ParquetMetadata footer;
0100     List<BlockMetaData> blocks;
0101 
0102     // if task.side.metadata is set, rowGroupOffsets is null
0103     if (rowGroupOffsets == null) {
0104       // then we need to apply the predicate push down filter
0105       footer = readFooter(configuration, file, range(split.getStart(), split.getEnd()));
0106       MessageType fileSchema = footer.getFileMetaData().getSchema();
0107       FilterCompat.Filter filter = getFilter(configuration);
0108       blocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
0109     } else {
0110       // otherwise we find the row groups that were selected on the client
0111       footer = readFooter(configuration, file, NO_FILTER);
0112       Set<Long> offsets = new HashSet<>();
0113       for (long offset : rowGroupOffsets) {
0114         offsets.add(offset);
0115       }
0116       blocks = new ArrayList<>();
0117       for (BlockMetaData block : footer.getBlocks()) {
0118         if (offsets.contains(block.getStartingPos())) {
0119           blocks.add(block);
0120         }
0121       }
0122       // verify we found them all
0123       if (blocks.size() != rowGroupOffsets.length) {
0124         long[] foundRowGroupOffsets = new long[footer.getBlocks().size()];
0125         for (int i = 0; i < foundRowGroupOffsets.length; i++) {
0126           foundRowGroupOffsets[i] = footer.getBlocks().get(i).getStartingPos();
0127         }
0128         // this should never happen.
0129         // provide a good error message in case there's a bug
0130         throw new IllegalStateException(
0131             "All the offsets listed in the split should be found in the file."
0132                 + " expected: " + Arrays.toString(rowGroupOffsets)
0133                 + " found: " + blocks
0134                 + " out of: " + Arrays.toString(foundRowGroupOffsets)
0135                 + " in range " + split.getStart() + ", " + split.getEnd());
0136       }
0137     }
0138     this.fileSchema = footer.getFileMetaData().getSchema();
0139     Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
0140     ReadSupport<T> readSupport = getReadSupportInstance(getReadSupportClass(configuration));
0141     ReadSupport.ReadContext readContext = readSupport.init(new InitContext(
0142         taskAttemptContext.getConfiguration(), toSetMultiMap(fileMetadata), fileSchema));
0143     this.requestedSchema = readContext.getRequestedSchema();
0144     String sparkRequestedSchemaString =
0145         configuration.get(ParquetReadSupport$.MODULE$.SPARK_ROW_REQUESTED_SCHEMA());
0146     this.sparkSchema = StructType$.MODULE$.fromString(sparkRequestedSchemaString);
0147     this.reader = new ParquetFileReader(
0148         configuration, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
0149     // use the blocks from the reader in case some do not match filters and will not be read
0150     for (BlockMetaData block : reader.getRowGroups()) {
0151       this.totalRowCount += block.getRowCount();
0152     }
0153 
0154     // For test purpose.
0155     // If the last external accumulator is `NumRowGroupsAccumulator`, the row group number to read
0156     // will be updated to the accumulator. So we can check if the row groups are filtered or not
0157     // in test case.
0158     TaskContext taskContext = TaskContext$.MODULE$.get();
0159     if (taskContext != null) {
0160       Option<AccumulatorV2<?, ?>> accu = taskContext.taskMetrics().externalAccums().lastOption();
0161       if (accu.isDefined() && accu.get().getClass().getSimpleName().equals("NumRowGroupsAcc")) {
0162         @SuppressWarnings("unchecked")
0163         AccumulatorV2<Integer, Integer> intAccum = (AccumulatorV2<Integer, Integer>) accu.get();
0164         intAccum.add(blocks.size());
0165       }
0166     }
0167   }
0168 
0169   /**
0170    * Returns the list of files at 'path' recursively. This skips files that are ignored normally
0171    * by MapReduce.
0172    */
0173   public static List<String> listDirectory(File path) {
0174     List<String> result = new ArrayList<>();
0175     if (path.isDirectory()) {
0176       for (File f: path.listFiles()) {
0177         result.addAll(listDirectory(f));
0178       }
0179     } else {
0180       char c = path.getName().charAt(0);
0181       if (c != '.' && c != '_') {
0182         result.add(path.getAbsolutePath());
0183       }
0184     }
0185     return result;
0186   }
0187 
0188   /**
0189    * Initializes the reader to read the file at `path` with `columns` projected. If columns is
0190    * null, all the columns are projected.
0191    *
0192    * This is exposed for testing to be able to create this reader without the rest of the Hadoop
0193    * split machinery. It is not intended for general use and those not support all the
0194    * configurations.
0195    */
0196   protected void initialize(String path, List<String> columns) throws IOException {
0197     Configuration config = new Configuration();
0198     config.set("spark.sql.parquet.binaryAsString", "false");
0199     config.set("spark.sql.parquet.int96AsTimestamp", "false");
0200 
0201     this.file = new Path(path);
0202     long length = this.file.getFileSystem(config).getFileStatus(this.file).getLen();
0203     ParquetMetadata footer = readFooter(config, file, range(0, length));
0204 
0205     List<BlockMetaData> blocks = footer.getBlocks();
0206     this.fileSchema = footer.getFileMetaData().getSchema();
0207 
0208     if (columns == null) {
0209       this.requestedSchema = fileSchema;
0210     } else {
0211       if (columns.size() > 0) {
0212         Types.MessageTypeBuilder builder = Types.buildMessage();
0213         for (String s: columns) {
0214           if (!fileSchema.containsField(s)) {
0215             throw new IOException("Can only project existing columns. Unknown field: " + s +
0216                     " File schema:\n" + fileSchema);
0217           }
0218           builder.addFields(fileSchema.getType(s));
0219         }
0220         this.requestedSchema = builder.named(ParquetSchemaConverter.SPARK_PARQUET_SCHEMA_NAME());
0221       } else {
0222         this.requestedSchema = ParquetSchemaConverter.EMPTY_MESSAGE();
0223       }
0224     }
0225     this.sparkSchema = new ParquetToSparkSchemaConverter(config).convert(requestedSchema);
0226     this.reader = new ParquetFileReader(
0227         config, footer.getFileMetaData(), file, blocks, requestedSchema.getColumns());
0228     // use the blocks from the reader in case some do not match filters and will not be read
0229     for (BlockMetaData block : reader.getRowGroups()) {
0230       this.totalRowCount += block.getRowCount();
0231     }
0232   }
0233 
0234   @Override
0235   public Void getCurrentKey() {
0236     return null;
0237   }
0238 
0239   @Override
0240   public void close() throws IOException {
0241     if (reader != null) {
0242       reader.close();
0243       reader = null;
0244     }
0245   }
0246 
0247   /**
0248    * Utility classes to abstract over different way to read ints with different encodings.
0249    * TODO: remove this layer of abstraction?
0250    */
0251   abstract static class IntIterator {
0252     abstract int nextInt() throws IOException;
0253   }
0254 
0255   protected static final class ValuesReaderIntIterator extends IntIterator {
0256     ValuesReader delegate;
0257 
0258     public ValuesReaderIntIterator(ValuesReader delegate) {
0259       this.delegate = delegate;
0260     }
0261 
0262     @Override
0263     int nextInt() {
0264       return delegate.readInteger();
0265     }
0266   }
0267 
0268   protected static final class RLEIntIterator extends IntIterator {
0269     RunLengthBitPackingHybridDecoder delegate;
0270 
0271     public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
0272       this.delegate = delegate;
0273     }
0274 
0275     @Override
0276     int nextInt() throws IOException {
0277       return delegate.readInt();
0278     }
0279   }
0280 
0281   protected static final class NullIntIterator extends IntIterator {
0282     @Override
0283     int nextInt() { return 0; }
0284   }
0285 
0286   /**
0287    * Creates a reader for definition and repetition levels, returning an optimized one if
0288    * the levels are not needed.
0289    */
0290   protected static IntIterator createRLEIterator(
0291       int maxLevel, BytesInput bytes, ColumnDescriptor descriptor) throws IOException {
0292     try {
0293       if (maxLevel == 0) return new NullIntIterator();
0294       return new RLEIntIterator(
0295           new RunLengthBitPackingHybridDecoder(
0296               BytesUtils.getWidthFromMaxInt(maxLevel),
0297               bytes.toInputStream()));
0298     } catch (IOException e) {
0299       throw new IOException("could not read levels in page for col " + descriptor, e);
0300     }
0301   }
0302 
0303   private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
0304     Map<K, Set<V>> setMultiMap = new HashMap<>();
0305     for (Map.Entry<K, V> entry : map.entrySet()) {
0306       Set<V> set = new HashSet<>();
0307       set.add(entry.getValue());
0308       setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set));
0309     }
0310     return Collections.unmodifiableMap(setMultiMap);
0311   }
0312 
0313   @SuppressWarnings("unchecked")
0314   private Class<? extends ReadSupport<T>> getReadSupportClass(Configuration configuration) {
0315     return (Class<? extends ReadSupport<T>>) ConfigurationUtil.getClassFromConfig(configuration,
0316         ParquetInputFormat.READ_SUPPORT_CLASS, ReadSupport.class);
0317   }
0318 
0319   /**
0320    * @param readSupportClass to instantiate
0321    * @return the configured read support
0322    */
0323   private static <T> ReadSupport<T> getReadSupportInstance(
0324       Class<? extends ReadSupport<T>> readSupportClass){
0325     try {
0326       return readSupportClass.getConstructor().newInstance();
0327     } catch (InstantiationException | IllegalAccessException |
0328              NoSuchMethodException | InvocationTargetException e) {
0329       throw new BadConfigurationException("could not instantiate read support class", e);
0330     }
0331   }
0332 }