0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package test.org.apache.spark.sql.connector;
0019
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022
0023 import org.apache.spark.sql.catalyst.InternalRow;
0024 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
0025 import org.apache.spark.sql.connector.TestingV2Source;
0026 import org.apache.spark.sql.connector.expressions.Expressions;
0027 import org.apache.spark.sql.connector.expressions.Transform;
0028 import org.apache.spark.sql.connector.catalog.Table;
0029 import org.apache.spark.sql.connector.read.*;
0030 import org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution;
0031 import org.apache.spark.sql.connector.read.partitioning.Distribution;
0032 import org.apache.spark.sql.connector.read.partitioning.Partitioning;
0033 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
0034
0035 public class JavaPartitionAwareDataSource implements TestingV2Source {
0036
0037 class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportPartitioning {
0038
0039 @Override
0040 public InputPartition[] planInputPartitions() {
0041 InputPartition[] partitions = new InputPartition[2];
0042 partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6});
0043 partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2});
0044 return partitions;
0045 }
0046
0047 @Override
0048 public PartitionReaderFactory createReaderFactory() {
0049 return new SpecificReaderFactory();
0050 }
0051
0052 @Override
0053 public Partitioning outputPartitioning() {
0054 return new MyPartitioning();
0055 }
0056 }
0057
0058 @Override
0059 public Table getTable(CaseInsensitiveStringMap options) {
0060 return new JavaSimpleBatchTable() {
0061 @Override
0062 public Transform[] partitioning() {
0063 return new Transform[] { Expressions.identity("i") };
0064 }
0065
0066 @Override
0067 public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
0068 return new MyScanBuilder();
0069 }
0070 };
0071 }
0072
0073 static class MyPartitioning implements Partitioning {
0074
0075 @Override
0076 public int numPartitions() {
0077 return 2;
0078 }
0079
0080 @Override
0081 public boolean satisfy(Distribution distribution) {
0082 if (distribution instanceof ClusteredDistribution) {
0083 String[] clusteredCols = ((ClusteredDistribution) distribution).clusteredColumns;
0084 return Arrays.asList(clusteredCols).contains("i");
0085 }
0086
0087 return false;
0088 }
0089 }
0090
0091 static class SpecificInputPartition implements InputPartition {
0092 int[] i;
0093 int[] j;
0094
0095 SpecificInputPartition(int[] i, int[] j) {
0096 assert i.length == j.length;
0097 this.i = i;
0098 this.j = j;
0099 }
0100 }
0101
0102 static class SpecificReaderFactory implements PartitionReaderFactory {
0103
0104 @Override
0105 public PartitionReader<InternalRow> createReader(InputPartition partition) {
0106 SpecificInputPartition p = (SpecificInputPartition) partition;
0107 return new PartitionReader<InternalRow>() {
0108 private int current = -1;
0109
0110 @Override
0111 public boolean next() throws IOException {
0112 current += 1;
0113 return current < p.i.length;
0114 }
0115
0116 @Override
0117 public InternalRow get() {
0118 return new GenericInternalRow(new Object[] {p.i[current], p.j[current]});
0119 }
0120
0121 @Override
0122 public void close() throws IOException {
0123
0124 }
0125 };
0126 }
0127 }
0128 }