|
||||
0001 /* 0002 * Licensed to the Apache Software Foundation (ASF) under one or more 0003 * contributor license agreements. See the NOTICE file distributed with 0004 * this work for additional information regarding copyright ownership. 0005 * The ASF licenses this file to You under the Apache License, Version 2.0 0006 * (the "License"); you may not use this file except in compliance with 0007 * the License. You may obtain a copy of the License at 0008 * 0009 * http://www.apache.org/licenses/LICENSE-2.0 0010 * 0011 * Unless required by applicable law or agreed to in writing, software 0012 * distributed under the License is distributed on an "AS IS" BASIS, 0013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 0014 * See the License for the specific language governing permissions and 0015 * limitations under the License. 0016 */ 0017 0018 package org.apache.spark.sql.streaming; 0019 0020 import java.util.concurrent.TimeUnit; 0021 0022 import org.apache.spark.annotation.Evolving; 0023 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; 0024 import scala.concurrent.duration.Duration; 0025 0026 import org.apache.spark.sql.execution.streaming.ContinuousTrigger; 0027 import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; 0028 0029 /** 0030 * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. 0031 * 0032 * @since 2.0.0 0033 */ 0034 @Evolving 0035 public class Trigger { 0036 0037 /** 0038 * A trigger policy that runs a query periodically based on an interval in processing time. 0039 * If `interval` is 0, the query will run as fast as possible. 0040 * 0041 * @since 2.2.0 0042 */ 0043 public static Trigger ProcessingTime(long intervalMs) { 0044 return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); 0045 } 0046 0047 /** 0048 * (Java-friendly) 0049 * A trigger policy that runs a query periodically based on an interval in processing time. 0050 * If `interval` is 0, the query will run as fast as possible. 0051 * 0052 * {{{ 0053 * import java.util.concurrent.TimeUnit 0054 * df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) 0055 * }}} 0056 * 0057 * @since 2.2.0 0058 */ 0059 public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { 0060 return ProcessingTimeTrigger.create(interval, timeUnit); 0061 } 0062 0063 /** 0064 * (Scala-friendly) 0065 * A trigger policy that runs a query periodically based on an interval in processing time. 0066 * If `duration` is 0, the query will run as fast as possible. 0067 * 0068 * {{{ 0069 * import scala.concurrent.duration._ 0070 * df.writeStream.trigger(Trigger.ProcessingTime(10.seconds)) 0071 * }}} 0072 * @since 2.2.0 0073 */ 0074 public static Trigger ProcessingTime(Duration interval) { 0075 return ProcessingTimeTrigger.apply(interval); 0076 } 0077 0078 /** 0079 * A trigger policy that runs a query periodically based on an interval in processing time. 0080 * If `interval` is effectively 0, the query will run as fast as possible. 0081 * 0082 * {{{ 0083 * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) 0084 * }}} 0085 * @since 2.2.0 0086 */ 0087 public static Trigger ProcessingTime(String interval) { 0088 return ProcessingTimeTrigger.apply(interval); 0089 } 0090 0091 /** 0092 * A trigger that process only one batch of data in a streaming query then terminates 0093 * the query. 0094 * 0095 * @since 2.2.0 0096 */ 0097 public static Trigger Once() { 0098 return OneTimeTrigger$.MODULE$; 0099 } 0100 0101 /** 0102 * A trigger that continuously processes streaming data, asynchronously checkpointing at 0103 * the specified interval. 0104 * 0105 * @since 2.3.0 0106 */ 0107 public static Trigger Continuous(long intervalMs) { 0108 return ContinuousTrigger.apply(intervalMs); 0109 } 0110 0111 /** 0112 * A trigger that continuously processes streaming data, asynchronously checkpointing at 0113 * the specified interval. 0114 * 0115 * {{{ 0116 * import java.util.concurrent.TimeUnit 0117 * df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS)) 0118 * }}} 0119 * 0120 * @since 2.3.0 0121 */ 0122 public static Trigger Continuous(long interval, TimeUnit timeUnit) { 0123 return ContinuousTrigger.create(interval, timeUnit); 0124 } 0125 0126 /** 0127 * (Scala-friendly) 0128 * A trigger that continuously processes streaming data, asynchronously checkpointing at 0129 * the specified interval. 0130 * 0131 * {{{ 0132 * import scala.concurrent.duration._ 0133 * df.writeStream.trigger(Trigger.Continuous(10.seconds)) 0134 * }}} 0135 * @since 2.3.0 0136 */ 0137 public static Trigger Continuous(Duration interval) { 0138 return ContinuousTrigger.apply(interval); 0139 } 0140 0141 /** 0142 * A trigger that continuously processes streaming data, asynchronously checkpointing at 0143 * the specified interval. 0144 * 0145 * {{{ 0146 * df.writeStream.trigger(Trigger.Continuous("10 seconds")) 0147 * }}} 0148 * @since 2.3.0 0149 */ 0150 public static Trigger Continuous(String interval) { 0151 return ContinuousTrigger.apply(interval); 0152 } 0153 }
[ Source navigation ] | [ Diff markup ] | [ Identifier search ] | [ general search ] |
This page was automatically generated by the 2.1.0 LXR engine. The LXR team |