Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.sql.connector;
0019 
0020 import java.io.IOException;
0021 
0022 import org.apache.spark.sql.catalyst.InternalRow;
0023 import org.apache.spark.sql.connector.TestingV2Source;
0024 import org.apache.spark.sql.connector.catalog.Table;
0025 import org.apache.spark.sql.connector.read.InputPartition;
0026 import org.apache.spark.sql.connector.read.PartitionReader;
0027 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
0028 import org.apache.spark.sql.connector.read.ScanBuilder;
0029 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
0030 import org.apache.spark.sql.types.DataTypes;
0031 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
0032 import org.apache.spark.sql.vectorized.ColumnVector;
0033 import org.apache.spark.sql.vectorized.ColumnarBatch;
0034 
0035 
0036 public class JavaColumnarDataSourceV2 implements TestingV2Source {
0037 
0038   class MyScanBuilder extends JavaSimpleScanBuilder {
0039 
0040     @Override
0041     public InputPartition[] planInputPartitions() {
0042       InputPartition[] partitions = new InputPartition[2];
0043       partitions[0] = new JavaRangeInputPartition(0, 50);
0044       partitions[1] = new JavaRangeInputPartition(50, 90);
0045       return partitions;
0046     }
0047 
0048     @Override
0049     public PartitionReaderFactory createReaderFactory() {
0050       return new ColumnarReaderFactory();
0051     }
0052   }
0053 
0054   @Override
0055   public Table getTable(CaseInsensitiveStringMap options) {
0056     return new JavaSimpleBatchTable() {
0057       @Override
0058       public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
0059         return new MyScanBuilder();
0060       }
0061     };
0062   }
0063 
0064   static class ColumnarReaderFactory implements PartitionReaderFactory {
0065     private static final int BATCH_SIZE = 20;
0066 
0067     @Override
0068     public boolean supportColumnarReads(InputPartition partition) {
0069       return true;
0070     }
0071 
0072     @Override
0073     public PartitionReader<InternalRow> createReader(InputPartition partition) {
0074       throw new UnsupportedOperationException("");
0075     }
0076 
0077     @Override
0078     public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
0079       JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
0080       OnHeapColumnVector i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
0081       OnHeapColumnVector j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
0082       ColumnVector[] vectors = new ColumnVector[2];
0083       vectors[0] = i;
0084       vectors[1] = j;
0085       ColumnarBatch batch = new ColumnarBatch(vectors);
0086 
0087       return new PartitionReader<ColumnarBatch>() {
0088         private int current = p.start;
0089 
0090         @Override
0091         public boolean next() throws IOException {
0092           i.reset();
0093           j.reset();
0094           int count = 0;
0095           while (current < p.end && count < BATCH_SIZE) {
0096             i.putInt(count, current);
0097             j.putInt(count, -current);
0098             current += 1;
0099             count += 1;
0100           }
0101 
0102           if (count == 0) {
0103             return false;
0104           } else {
0105             batch.setNumRows(count);
0106             return true;
0107           }
0108         }
0109 
0110         @Override
0111         public ColumnarBatch get() {
0112           return batch;
0113         }
0114 
0115         @Override
0116         public void close() throws IOException {
0117           batch.close();
0118         }
0119       };
0120     }
0121   }
0122 }