Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.execution.datasources.orc;
0019 
0020 import java.io.IOException;
0021 
0022 import com.google.common.annotations.VisibleForTesting;
0023 import org.apache.hadoop.conf.Configuration;
0024 import org.apache.hadoop.mapreduce.InputSplit;
0025 import org.apache.hadoop.mapreduce.RecordReader;
0026 import org.apache.hadoop.mapreduce.TaskAttemptContext;
0027 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
0028 import org.apache.orc.OrcConf;
0029 import org.apache.orc.OrcFile;
0030 import org.apache.orc.Reader;
0031 import org.apache.orc.TypeDescription;
0032 import org.apache.orc.mapred.OrcInputFormat;
0033 
0034 import org.apache.spark.sql.catalyst.InternalRow;
0035 import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
0036 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
0037 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
0038 import org.apache.spark.sql.types.*;
0039 import org.apache.spark.sql.vectorized.ColumnarBatch;
0040 
0041 
0042 /**
0043  * To support vectorization in WholeStageCodeGen, this reader returns ColumnarBatch.
0044  * After creating, `initialize` and `initBatch` should be called sequentially.
0045  */
0046 public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {
0047 
0048   // The capacity of vectorized batch.
0049   private int capacity;
0050 
0051   // Vectorized ORC Row Batch wrap.
0052   private VectorizedRowBatchWrap wrap;
0053 
0054   /**
0055    * The column IDs of the physical ORC file schema which are required by this reader.
0056    * -1 means this required column is partition column, or it doesn't exist in the ORC file.
0057    * Ideally partition column should never appear in the physical file, and should only appear
0058    * in the directory name. However, Spark allows partition columns inside physical file,
0059    * but Spark will discard the values from the file, and use the partition value got from
0060    * directory name. The column order will be reserved though.
0061    */
0062   @VisibleForTesting
0063   public int[] requestedDataColIds;
0064 
0065   // Record reader from ORC row batch.
0066   private org.apache.orc.RecordReader recordReader;
0067 
0068   private StructField[] requiredFields;
0069 
0070   // The result columnar batch for vectorized execution by whole-stage codegen.
0071   @VisibleForTesting
0072   public ColumnarBatch columnarBatch;
0073 
0074   // The wrapped ORC column vectors.
0075   private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;
0076 
0077   public OrcColumnarBatchReader(int capacity) {
0078     this.capacity = capacity;
0079   }
0080 
0081 
0082   @Override
0083   public Void getCurrentKey() {
0084     return null;
0085   }
0086 
0087   @Override
0088   public ColumnarBatch getCurrentValue() {
0089     return columnarBatch;
0090   }
0091 
0092   @Override
0093   public float getProgress() throws IOException {
0094     return recordReader.getProgress();
0095   }
0096 
0097   @Override
0098   public boolean nextKeyValue() throws IOException {
0099     return nextBatch();
0100   }
0101 
0102   @Override
0103   public void close() throws IOException {
0104     if (columnarBatch != null) {
0105       columnarBatch.close();
0106       columnarBatch = null;
0107     }
0108     if (recordReader != null) {
0109       recordReader.close();
0110       recordReader = null;
0111     }
0112   }
0113 
0114   /**
0115    * Initialize ORC file reader and batch record reader.
0116    * Please note that `initBatch` is needed to be called after this.
0117    */
0118   @Override
0119   public void initialize(
0120       InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
0121     FileSplit fileSplit = (FileSplit)inputSplit;
0122     Configuration conf = taskAttemptContext.getConfiguration();
0123     Reader reader = OrcFile.createReader(
0124       fileSplit.getPath(),
0125       OrcFile.readerOptions(conf)
0126         .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
0127         .filesystem(fileSplit.getPath().getFileSystem(conf)));
0128     Reader.Options options =
0129       OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
0130     recordReader = reader.rows(options);
0131   }
0132 
0133   /**
0134    * Initialize columnar batch by setting required schema and partition information.
0135    * With this information, this creates ColumnarBatch with the full schema.
0136    *
0137    * @param orcSchema Schema from ORC file reader.
0138    * @param requiredFields All the fields that are required to return, including partition fields.
0139    * @param requestedDataColIds Requested column ids from orcSchema. -1 if not existed.
0140    * @param requestedPartitionColIds Requested column ids from partition schema. -1 if not existed.
0141    * @param partitionValues Values of partition columns.
0142    */
0143   public void initBatch(
0144       TypeDescription orcSchema,
0145       StructField[] requiredFields,
0146       int[] requestedDataColIds,
0147       int[] requestedPartitionColIds,
0148       InternalRow partitionValues) {
0149     wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
0150     assert(!wrap.batch().selectedInUse); // `selectedInUse` should be initialized with `false`.
0151     assert(requiredFields.length == requestedDataColIds.length);
0152     assert(requiredFields.length == requestedPartitionColIds.length);
0153     // If a required column is also partition column, use partition value and don't read from file.
0154     for (int i = 0; i < requiredFields.length; i++) {
0155       if (requestedPartitionColIds[i] != -1) {
0156         requestedDataColIds[i] = -1;
0157       }
0158     }
0159     this.requiredFields = requiredFields;
0160     this.requestedDataColIds = requestedDataColIds;
0161 
0162     StructType resultSchema = new StructType(requiredFields);
0163 
0164     // Just wrap the ORC column vector instead of copying it to Spark column vector.
0165     orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
0166 
0167     for (int i = 0; i < requiredFields.length; i++) {
0168       DataType dt = requiredFields[i].dataType();
0169       if (requestedPartitionColIds[i] != -1) {
0170         OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
0171         ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
0172         partitionCol.setIsConstant();
0173         orcVectorWrappers[i] = partitionCol;
0174       } else {
0175         int colId = requestedDataColIds[i];
0176         // Initialize the missing columns once.
0177         if (colId == -1) {
0178           OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
0179           missingCol.putNulls(0, capacity);
0180           missingCol.setIsConstant();
0181           orcVectorWrappers[i] = missingCol;
0182         } else {
0183           orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
0184         }
0185       }
0186     }
0187 
0188     columnarBatch = new ColumnarBatch(orcVectorWrappers);
0189   }
0190 
0191   /**
0192    * Return true if there exists more data in the next batch. If exists, prepare the next batch
0193    * by copying from ORC VectorizedRowBatch columns to Spark ColumnarBatch columns.
0194    */
0195   private boolean nextBatch() throws IOException {
0196     recordReader.nextBatch(wrap.batch());
0197     int batchSize = wrap.batch().size;
0198     if (batchSize == 0) {
0199       return false;
0200     }
0201     columnarBatch.setNumRows(batchSize);
0202 
0203     for (int i = 0; i < requiredFields.length; i++) {
0204       if (requestedDataColIds[i] != -1) {
0205         ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
0206       }
0207     }
0208     return true;
0209   }
0210 }