/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.streaming;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.Duration;

import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
/**
 * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
 *
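 * For example, to produce results every ten seconds of processing time (a usage sketch,
 * assuming a streaming Dataset named `df`, as in the method-level examples below):
 *
 * {{{
 *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
 * }}}
 *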
 * @since 2.0.0
 */
@Evolving
public class Trigger {

  /**
   * A trigger policy that runs a query periodically based on an interval in processing time.
   * If `interval` is 0, the query will run as fast as possible.
   *
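   * A usage sketch with the interval given in milliseconds (assumes a streaming Dataset
   * named `df`, mirroring the examples on the other overloads in this file):
   *
   * {{{
   *    df.writeStream().trigger(Trigger.ProcessingTime(1000))
   * }}}
   *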
   * @since 2.2.0
   */
  public static Trigger ProcessingTime(long intervalMs) {
    return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
  }

  /**
   * (Java-friendly)
   * A trigger policy that runs a query periodically based on an interval in processing time.
   * If `interval` is 0, the query will run as fast as possible.
   *
   * {{{
   *    import java.util.concurrent.TimeUnit
   *    df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
   * }}}
   *
   * @since 2.2.0
   */
  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
    return ProcessingTimeTrigger.create(interval, timeUnit);
  }

  /**
   * (Scala-friendly)
   * A trigger policy that runs a query periodically based on an interval in processing time.
   * If `duration` is 0, the query will run as fast as possible.
   *
   * {{{
   *    import scala.concurrent.duration._
   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
   * }}}
   * @since 2.2.0
   */
  public static Trigger ProcessingTime(Duration interval) {
    return ProcessingTimeTrigger.apply(interval);
  }

  /**
   * A trigger policy that runs a query periodically based on an interval in processing time.
   * If `interval` is effectively 0, the query will run as fast as possible.
   *
   * {{{
   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
   * }}}
   * @since 2.2.0
   */
  public static Trigger ProcessingTime(String interval) {
    return ProcessingTimeTrigger.apply(interval);
  }

  /**
   * A trigger that processes only one batch of data in a streaming query and then terminates
   * the query.
   *
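   * A usage sketch (assumes a streaming Dataset named `df`, mirroring the examples on the
   * other trigger factories in this file):
   *
   * {{{
   *    df.writeStream().trigger(Trigger.Once())
   * }}}
   *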
   * @since 2.2.0
   */
  public static Trigger Once() {
    return OneTimeTrigger$.MODULE$;
  }

  /**
   * A trigger that continuously processes streaming data, asynchronously checkpointing at
   * the specified interval.
   *
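   * A usage sketch with the checkpoint interval given in milliseconds (assumes a streaming
   * Dataset named `df`, mirroring the examples on the other overloads):
   *
   * {{{
   *    df.writeStream().trigger(Trigger.Continuous(1000))
   * }}}
   *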
   * @since 2.3.0
   */
  public static Trigger Continuous(long intervalMs) {
    return ContinuousTrigger.apply(intervalMs);
  }

  /**
   * A trigger that continuously processes streaming data, asynchronously checkpointing at
   * the specified interval.
   *
   * {{{
   *    import java.util.concurrent.TimeUnit
   *    df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
   * }}}
   *
   * @since 2.3.0
   */
  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
    return ContinuousTrigger.create(interval, timeUnit);
  }

  /**
   * (Scala-friendly)
   * A trigger that continuously processes streaming data, asynchronously checkpointing at
   * the specified interval.
   *
   * {{{
   *    import scala.concurrent.duration._
   *    df.writeStream.trigger(Trigger.Continuous(10.seconds))
   * }}}
   * @since 2.3.0
   */
  public static Trigger Continuous(Duration interval) {
    return ContinuousTrigger.apply(interval);
  }

  /**
   * A trigger that continuously processes streaming data, asynchronously checkpointing at
   * the specified interval.
   *
   * {{{
   *    df.writeStream.trigger(Trigger.Continuous("10 seconds"))
   * }}}
   * @since 2.3.0
   */
  public static Trigger Continuous(String interval) {
    return ContinuousTrigger.apply(interval);
  }
}