/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.execution.datasources.orc;

import java.io.IOException;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcInputFormat;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.datasources.orc.OrcShimUtils.VectorizedRowBatchWrap;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.*;
import org.apache.spark.sql.vectorized.ColumnarBatch;

/**
 * To support vectorization in whole-stage codegen, this reader returns ColumnarBatch.
 * After creating, `initialize` and `initBatch` should be called sequentially.
 */
public class OrcColumnarBatchReader extends RecordReader<Void, ColumnarBatch> {

  // The capacity of the vectorized batch.
  private int capacity;

  // Wrapper over the vectorized ORC row batch.
  private VectorizedRowBatchWrap wrap;

  /**
   * The column IDs of the physical ORC file schema that are required by this reader.
   * -1 means the required column is a partition column or does not exist in the ORC file.
   * Ideally, partition columns should only be encoded in the directory name, but Spark
   * also allows them inside the physical file; in that case the values read from the file
   * are discarded and the partition value parsed from the directory name is used instead,
   * while the column order is preserved.
   */
  @VisibleForTesting
  public int[] requestedDataColIds;

  // Record reader over the ORC row batches.
  private org.apache.orc.RecordReader recordReader;

  // All the fields that are required to return, including partition fields.
  private StructField[] requiredFields;

  // The result columnar batch for vectorized execution by whole-stage codegen.
  @VisibleForTesting
  public ColumnarBatch columnarBatch;

  // The Spark column vectors wrapping the ORC batch columns, plus constant partition
  // and missing columns.
  private org.apache.spark.sql.vectorized.ColumnVector[] orcVectorWrappers;

  public OrcColumnarBatchReader(int capacity) {
    this.capacity = capacity;
  }

  @Override
  public Void getCurrentKey() {
    return null;
  }

  @Override
  public ColumnarBatch getCurrentValue() {
    return columnarBatch;
  }

  @Override
  public float getProgress() throws IOException {
    return recordReader.getProgress();
  }

  @Override
  public boolean nextKeyValue() throws IOException {
    return nextBatch();
  }

  @Override
  public void close() throws IOException {
    if (columnarBatch != null) {
      columnarBatch.close();
      columnarBatch = null;
    }
    if (recordReader != null) {
      recordReader.close();
      recordReader = null;
    }
  }

  /**
   * Initialize the ORC file reader and the batch record reader.
   * Note that `initBatch` must be called after this.
   */
  @Override
  public void initialize(
      InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException {
    FileSplit fileSplit = (FileSplit)inputSplit;
    Configuration conf = taskAttemptContext.getConfiguration();
    Reader reader = OrcFile.createReader(
      fileSplit.getPath(),
      OrcFile.readerOptions(conf)
        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
        .filesystem(fileSplit.getPath().getFileSystem(conf)));
    // Build the read options (split range, included columns, etc.) from the Hadoop conf.
    Reader.Options options =
      OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart(), fileSplit.getLength());
    recordReader = reader.rows(options);
  }

  /**
   * Initialize the columnar batch by setting the required schema and partition information.
   * With this information, this creates a ColumnarBatch with the full schema.
   *
   * @param orcSchema Schema from the ORC file reader.
   * @param requiredFields All the fields that are required to return, including partition fields.
   * @param requestedDataColIds Requested column ids from orcSchema. -1 if the column is absent.
   * @param requestedPartitionColIds Requested column ids from the partition schema. -1 if the
   *                                 column is absent.
   * @param partitionValues Values of partition columns.
   */
  public void initBatch(
      TypeDescription orcSchema,
      StructField[] requiredFields,
      int[] requestedDataColIds,
      int[] requestedPartitionColIds,
      InternalRow partitionValues) {
    wrap = new VectorizedRowBatchWrap(orcSchema.createRowBatch(capacity));
    assert(!wrap.batch().selectedInUse);
    assert(requiredFields.length == requestedDataColIds.length);
    assert(requiredFields.length == requestedPartitionColIds.length);

    // If a required column is also a partition column, use the partition value and
    // don't read it from the file.
    for (int i = 0; i < requiredFields.length; i++) {
      if (requestedPartitionColIds[i] != -1) {
        requestedDataColIds[i] = -1;
      }
    }
    this.requiredFields = requiredFields;
    this.requestedDataColIds = requestedDataColIds;

    StructType resultSchema = new StructType(requiredFields);

    // Just wrap the ORC column vectors instead of copying them into Spark column vectors.
    orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];

    for (int i = 0; i < requiredFields.length; i++) {
      DataType dt = requiredFields[i].dataType();
      if (requestedPartitionColIds[i] != -1) {
        // Partition columns are constant per split: populate them once from partitionValues.
        OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
        ColumnVectorUtils.populate(partitionCol, partitionValues, requestedPartitionColIds[i]);
        partitionCol.setIsConstant();
        orcVectorWrappers[i] = partitionCol;
      } else {
        int colId = requestedDataColIds[i];
        // Initialize the missing columns once, as constant all-null vectors.
        if (colId == -1) {
          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
          missingCol.putNulls(0, capacity);
          missingCol.setIsConstant();
          orcVectorWrappers[i] = missingCol;
        } else {
          orcVectorWrappers[i] = new OrcColumnVector(dt, wrap.batch().cols[colId]);
        }
      }
    }

    columnarBatch = new ColumnarBatch(orcVectorWrappers);
  }
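
  // For illustration only (hypothetical values, not from the original source): given a
  // file schema (a: int, b: string), a partition column p, and
  //   requiredFields           = [a, b, p]
  //   requestedDataColIds      = [0, 1, -1]
  //   requestedPartitionColIds = [-1, -1, 0]
  // `initBatch` wraps ORC batch columns 0 and 1 for `a` and `b`, and materializes `p` as a
  // constant OnHeapColumnVector populated from partitionValues. A field whose ids are both
  // -1 would instead become a constant all-null vector.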

  /**
   * Return true if more rows are available. If so, advance the wrapped ORC
   * VectorizedRowBatch and propagate the new batch size to the Spark column vectors.
   */
  private boolean nextBatch() throws IOException {
    recordReader.nextBatch(wrap.batch());
    int batchSize = wrap.batch().size;
    if (batchSize == 0) {
      return false;
    }
    columnarBatch.setNumRows(batchSize);

    for (int i = 0; i < requiredFields.length; i++) {
      if (requestedDataColIds[i] != -1) {
        // Only the wrapped ORC vectors track the batch size; partition and missing
        // columns are constant vectors and need no update.
        ((OrcColumnVector) orcVectorWrappers[i]).setBatchSize(batchSize);
      }
    }
    return true;
  }
}
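
// A minimal usage sketch, not part of the original file. It assumes the caller already has
// a planned FileSplit, a TaskAttemptContext, the file's TypeDescription, and the
// precomputed column-id mappings that Spark's ORC data source normally derives during
// planning; `OrcColumnarBatchReaderUsageSketch`, `countRows`, and `DEFAULT_CAPACITY` are
// hypothetical names introduced here for illustration.
class OrcColumnarBatchReaderUsageSketch {
  private static final int DEFAULT_CAPACITY = 4096;

  static long countRows(
      FileSplit split,
      TaskAttemptContext context,
      TypeDescription orcSchema,
      StructField[] requiredFields,
      int[] requestedDataColIds,
      int[] requestedPartitionColIds,
      InternalRow partitionValues) throws IOException {
    OrcColumnarBatchReader reader = new OrcColumnarBatchReader(DEFAULT_CAPACITY);
    try {
      // `initialize` opens the ORC reader; `initBatch` must be called before reading.
      reader.initialize(split, context);
      reader.initBatch(
        orcSchema, requiredFields, requestedDataColIds, requestedPartitionColIds,
        partitionValues);
      long rows = 0;
      while (reader.nextKeyValue()) {
        rows += reader.getCurrentValue().numRows();
      }
      return rows;
    } finally {
      reader.close();
    }
  }
}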