Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.connector.read.streaming;
0019 
0020 import org.apache.spark.annotation.Evolving;
0021 import org.apache.spark.sql.connector.read.InputPartition;
0022 import org.apache.spark.sql.connector.read.Scan;
0023 
0024 /**
0025  * A {@link SparkDataStream} for streaming queries with continuous mode.
0026  *
0027  * @since 3.0.0
0028  */
0029 @Evolving
0030 public interface ContinuousStream extends SparkDataStream {
0031 
0032   /**
0033    * Returns a list of {@link InputPartition input partitions} given the start offset. Each
0034    * {@link InputPartition} represents a data split that can be processed by one Spark task. The
0035    * number of input partitions returned here is the same as the number of RDD partitions this scan
0036    * outputs.
0037    * <p>
0038    * If the {@link Scan} supports filter pushdown, this stream is likely configured with a filter
0039    * and is responsible for creating splits for that filter, which is not a full scan.
0040    * </p>
0041    * <p>
0042    * This method will be called to launch one Spark job for reading the data stream. It will be
0043    * called more than once, if {@link #needsReconfiguration()} returns true and Spark needs to
0044    * launch a new job.
0045    * </p>
0046    */
0047   InputPartition[] planInputPartitions(Offset start);
0048 
0049   /**
0050    * Returns a factory to create a {@link ContinuousPartitionReader} for each
0051    * {@link InputPartition}.
0052    */
0053   ContinuousPartitionReaderFactory createContinuousReaderFactory();
0054 
0055   /**
0056    * Merge partitioned offsets coming from {@link ContinuousPartitionReader} instances
0057    * for each partition to a single global offset.
0058    */
0059   Offset mergeOffsets(PartitionOffset[] offsets);
0060 
0061   /**
0062    * The execution engine will call this method in every epoch to determine if new input
0063    * partitions need to be generated, which may be required if for example the underlying
0064    * source system has had partitions added or removed.
0065    *
0066    * If true, the Spark job to scan this continuous data stream will be interrupted and Spark will
0067    * launch it again with a new list of {@link InputPartition input partitions}.
0068    */
0069   default boolean needsReconfiguration() {
0070     return false;
0071   }
0072 }