Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package test.org.apache.spark.sql.connector;
0019 
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022 
0023 import org.apache.spark.sql.catalyst.InternalRow;
0024 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
0025 import org.apache.spark.sql.connector.TestingV2Source;
0026 import org.apache.spark.sql.connector.expressions.Expressions;
0027 import org.apache.spark.sql.connector.expressions.Transform;
0028 import org.apache.spark.sql.connector.catalog.Table;
0029 import org.apache.spark.sql.connector.read.*;
0030 import org.apache.spark.sql.connector.read.partitioning.ClusteredDistribution;
0031 import org.apache.spark.sql.connector.read.partitioning.Distribution;
0032 import org.apache.spark.sql.connector.read.partitioning.Partitioning;
0033 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
0034 
0035 public class JavaPartitionAwareDataSource implements TestingV2Source {
0036 
0037   class MyScanBuilder extends JavaSimpleScanBuilder implements SupportsReportPartitioning {
0038 
0039     @Override
0040     public InputPartition[] planInputPartitions() {
0041       InputPartition[] partitions = new InputPartition[2];
0042       partitions[0] = new SpecificInputPartition(new int[]{1, 1, 3}, new int[]{4, 4, 6});
0043       partitions[1] = new SpecificInputPartition(new int[]{2, 4, 4}, new int[]{6, 2, 2});
0044       return partitions;
0045     }
0046 
0047     @Override
0048     public PartitionReaderFactory createReaderFactory() {
0049       return new SpecificReaderFactory();
0050     }
0051 
0052     @Override
0053     public Partitioning outputPartitioning() {
0054       return new MyPartitioning();
0055     }
0056   }
0057 
0058   @Override
0059   public Table getTable(CaseInsensitiveStringMap options) {
0060     return new JavaSimpleBatchTable() {
0061       @Override
0062       public Transform[] partitioning() {
0063         return new Transform[] { Expressions.identity("i") };
0064       }
0065 
0066       @Override
0067       public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
0068         return new MyScanBuilder();
0069       }
0070     };
0071   }
0072 
0073   static class MyPartitioning implements Partitioning {
0074 
0075     @Override
0076     public int numPartitions() {
0077       return 2;
0078     }
0079 
0080     @Override
0081     public boolean satisfy(Distribution distribution) {
0082       if (distribution instanceof ClusteredDistribution) {
0083         String[] clusteredCols = ((ClusteredDistribution) distribution).clusteredColumns;
0084         return Arrays.asList(clusteredCols).contains("i");
0085       }
0086 
0087       return false;
0088     }
0089   }
0090 
0091   static class SpecificInputPartition implements InputPartition {
0092     int[] i;
0093     int[] j;
0094 
0095     SpecificInputPartition(int[] i, int[] j) {
0096       assert i.length == j.length;
0097       this.i = i;
0098       this.j = j;
0099     }
0100   }
0101 
0102   static class SpecificReaderFactory implements PartitionReaderFactory {
0103 
0104     @Override
0105     public PartitionReader<InternalRow> createReader(InputPartition partition) {
0106       SpecificInputPartition p = (SpecificInputPartition) partition;
0107       return new PartitionReader<InternalRow>() {
0108         private int current = -1;
0109 
0110         @Override
0111         public boolean next() throws IOException {
0112           current += 1;
0113           return current < p.i.length;
0114         }
0115 
0116         @Override
0117         public InternalRow get() {
0118           return new GenericInternalRow(new Object[] {p.i[current], p.j[current]});
0119         }
0120 
0121         @Override
0122         public void close() throws IOException {
0123 
0124         }
0125       };
0126     }
0127   }
0128 }