package test.org.apache.spark.sql.connector;

import java.io.IOException;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.TestingV2Source;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;


0036 public class JavaColumnarDataSourceV2 implements TestingV2Source {
0037
0038 class MyScanBuilder extends JavaSimpleScanBuilder {
0039
0040 @Override
0041 public InputPartition[] planInputPartitions() {
0042 InputPartition[] partitions = new InputPartition[2];
0043 partitions[0] = new JavaRangeInputPartition(0, 50);
0044 partitions[1] = new JavaRangeInputPartition(50, 90);
0045 return partitions;
0046 }
0047
0048 @Override
0049 public PartitionReaderFactory createReaderFactory() {
0050 return new ColumnarReaderFactory();
0051 }
0052 }
0053
0054 @Override
0055 public Table getTable(CaseInsensitiveStringMap options) {
0056 return new JavaSimpleBatchTable() {
0057 @Override
0058 public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
0059 return new MyScanBuilder();
0060 }
0061 };
0062 }
0063
0064 static class ColumnarReaderFactory implements PartitionReaderFactory {
0065 private static final int BATCH_SIZE = 20;
0066
0067 @Override
0068 public boolean supportColumnarReads(InputPartition partition) {
0069 return true;
0070 }
0071
0072 @Override
0073 public PartitionReader<InternalRow> createReader(InputPartition partition) {
0074 throw new UnsupportedOperationException("");
0075 }
0076
0077 @Override
0078 public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition partition) {
0079 JavaRangeInputPartition p = (JavaRangeInputPartition) partition;
0080 OnHeapColumnVector i = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
0081 OnHeapColumnVector j = new OnHeapColumnVector(BATCH_SIZE, DataTypes.IntegerType);
0082 ColumnVector[] vectors = new ColumnVector[2];
0083 vectors[0] = i;
0084 vectors[1] = j;
0085 ColumnarBatch batch = new ColumnarBatch(vectors);
0086
0087 return new PartitionReader<ColumnarBatch>() {
0088 private int current = p.start;
0089
0090 @Override
0091 public boolean next() throws IOException {
0092 i.reset();
0093 j.reset();
0094 int count = 0;
0095 while (current < p.end && count < BATCH_SIZE) {
0096 i.putInt(count, current);
0097 j.putInt(count, -current);
0098 current += 1;
0099 count += 1;
0100 }
0101
0102 if (count == 0) {
0103 return false;
0104 } else {
0105 batch.setNumRows(count);
0106 return true;
0107 }
0108 }
0109
0110 @Override
0111 public ColumnarBatch get() {
0112 return batch;
0113 }
0114
0115 @Override
0116 public void close() throws IOException {
0117 batch.close();
0118 }
0119 };
0120 }
0121 }
0122 }