Back to home page

OSCL-LXR

 
 

    


0001 ---
0002 layout: global
0003 displayTitle: Structured Streaming Programming Guide
0004 title: Structured Streaming Programming Guide
0005 license: |
0006   Licensed to the Apache Software Foundation (ASF) under one or more
0007   contributor license agreements.  See the NOTICE file distributed with
0008   this work for additional information regarding copyright ownership.
0009   The ASF licenses this file to You under the Apache License, Version 2.0
0010   (the "License"); you may not use this file except in compliance with
0011   the License.  You may obtain a copy of the License at
0012  
0013      http://www.apache.org/licenses/LICENSE-2.0
0014  
0015   Unless required by applicable law or agreed to in writing, software
0016   distributed under the License is distributed on an "AS IS" BASIS,
0017   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018   See the License for the specific language governing permissions and
0019   limitations under the License.
0020 ---
0021 
0022 * This will become a table of contents (this text will be scraped).
0023 {:toc}
0024 
0025 # Overview
0026 Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the [Dataset/DataFrame API](sql-programming-guide.html) in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, *Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.*
0027 
0028 Internally, by default, Structured Streaming queries are processed using a *micro-batch processing* engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called **Continuous Processing**, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements. 
0029 
0030 In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then [later](#continuous-processing) discuss Continuous Processing model. First, let's start with a simple example of a Structured Streaming query - a streaming word count.
0031 
0032 # Quick Example
0033 Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in
0034 [Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py)/[R]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming/structured_network_wordcount.R).
0035 And if you [download Spark](https://spark.apache.org/downloads.html), you can directly [run the example](index.html#running-the-examples-and-shell). In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
0036 
0037 <div class="codetabs">
0038 <div data-lang="scala"  markdown="1">
0039 
0040 {% highlight scala %}
0041 import org.apache.spark.sql.functions._
0042 import org.apache.spark.sql.SparkSession
0043 
0044 val spark = SparkSession
0045   .builder
0046   .appName("StructuredNetworkWordCount")
0047   .getOrCreate()
0048   
0049 import spark.implicits._
0050 {% endhighlight %}
0051 
0052 </div>
0053 <div data-lang="java"  markdown="1">
0054 
0055 {% highlight java %}
0056 import org.apache.spark.api.java.function.FlatMapFunction;
0057 import org.apache.spark.sql.*;
0058 import org.apache.spark.sql.streaming.StreamingQuery;
0059 
0060 import java.util.Arrays;
0061 import java.util.Iterator;
0062 
0063 SparkSession spark = SparkSession
0064   .builder()
0065   .appName("JavaStructuredNetworkWordCount")
0066   .getOrCreate();
0067 {% endhighlight %}
0068 
0069 </div>
0070 <div data-lang="python"  markdown="1">
0071 
0072 {% highlight python %}
0073 from pyspark.sql import SparkSession
0074 from pyspark.sql.functions import explode
0075 from pyspark.sql.functions import split
0076 
0077 spark = SparkSession \
0078     .builder \
0079     .appName("StructuredNetworkWordCount") \
0080     .getOrCreate()
0081 {% endhighlight %}
0082 
0083 </div>
0084 <div data-lang="r"  markdown="1">
0085 
0086 {% highlight r %}
0087 sparkR.session(appName = "StructuredNetworkWordCount")
0088 {% endhighlight %}
0089 
0090 </div>
0091 </div>
0092 
0093 Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.
0094 
0095 <div class="codetabs">
0096 <div data-lang="scala"  markdown="1">
0097 
0098 {% highlight scala %}
0099 // Create DataFrame representing the stream of input lines from connection to localhost:9999
0100 val lines = spark.readStream
0101   .format("socket")
0102   .option("host", "localhost")
0103   .option("port", 9999)
0104   .load()
0105 
0106 // Split the lines into words
0107 val words = lines.as[String].flatMap(_.split(" "))
0108 
0109 // Generate running word count
0110 val wordCounts = words.groupBy("value").count()
0111 {% endhighlight %}
0112 
0113 This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a  Dataset of String using `.as[String]`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
0114 
0115 </div>
0116 <div data-lang="java"  markdown="1">
0117 
0118 {% highlight java %}
0119 // Create DataFrame representing the stream of input lines from connection to localhost:9999
0120 Dataset<Row> lines = spark
0121   .readStream()
0122   .format("socket")
0123   .option("host", "localhost")
0124   .option("port", 9999)
0125   .load();
0126 
0127 // Split the lines into words
0128 Dataset<String> words = lines
0129   .as(Encoders.STRING())
0130   .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());
0131 
0132 // Generate running word count
0133 Dataset<Row> wordCounts = words.groupBy("value").count();
0134 {% endhighlight %}
0135 
0136 This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a  Dataset of String using `.as(Encoders.STRING())`, so that we can apply the `flatMap` operation to split each line into multiple words. The resultant `words` Dataset contains all the words. Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
0137 
0138 </div>
0139 <div data-lang="python"  markdown="1">
0140 
0141 {% highlight python %}
0142 # Create DataFrame representing the stream of input lines from connection to localhost:9999
0143 lines = spark \
0144     .readStream \
0145     .format("socket") \
0146     .option("host", "localhost") \
0147     .option("port", 9999) \
0148     .load()
0149 
0150 # Split the lines into words
0151 words = lines.select(
0152    explode(
0153        split(lines.value, " ")
0154    ).alias("word")
0155 )
0156 
0157 # Generate running word count
0158 wordCounts = words.groupBy("word").count()
0159 {% endhighlight %}
0160 
0161 This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
0162 
0163 </div>
0164 <div data-lang="r"  markdown="1">
0165 
0166 {% highlight r %}
0167 # Create DataFrame representing the stream of input lines from connection to localhost:9999
0168 lines <- read.stream("socket", host = "localhost", port = 9999)
0169 
0170 # Split the lines into words
0171 words <- selectExpr(lines, "explode(split(value, ' ')) as word")
0172 
0173 # Generate running word count
0174 wordCounts <- count(group_by(words, "word"))
0175 {% endhighlight %}
0176 
0177 This `lines` SparkDataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have a SQL expression with two SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we name the new column as "word". Finally, we have defined the `wordCounts` SparkDataFrame by grouping by the unique values in the SparkDataFrame and counting them. Note that this is a streaming SparkDataFrame which represents the running word counts of the stream.
0178 
0179 </div>
0180 </div>
0181 
0182 We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by `outputMode("complete")`) to the console every time they are updated. And then start the streaming computation using `start()`.
0183 
0184 <div class="codetabs">
0185 <div data-lang="scala"  markdown="1">
0186 
0187 {% highlight scala %}
0188 // Start running the query that prints the running counts to the console
0189 val query = wordCounts.writeStream
0190   .outputMode("complete")
0191   .format("console")
0192   .start()
0193 
0194 query.awaitTermination()
0195 {% endhighlight %}
0196 
0197 </div>
0198 <div data-lang="java"  markdown="1">
0199 
0200 {% highlight java %}
0201 // Start running the query that prints the running counts to the console
0202 StreamingQuery query = wordCounts.writeStream()
0203   .outputMode("complete")
0204   .format("console")
0205   .start();
0206 
0207 query.awaitTermination();
0208 {% endhighlight %}
0209 
0210 </div>
0211 <div data-lang="python"  markdown="1">
0212 
0213 {% highlight python %}
0214  # Start running the query that prints the running counts to the console
0215 query = wordCounts \
0216     .writeStream \
0217     .outputMode("complete") \
0218     .format("console") \
0219     .start()
0220 
0221 query.awaitTermination()
0222 {% endhighlight %}
0223 
0224 </div>
0225 <div data-lang="r"  markdown="1">
0226 
0227 {% highlight r %}
0228 # Start running the query that prints the running counts to the console
0229 query <- write.stream(wordCounts, "console", outputMode = "complete")
0230 
0231 awaitTermination(query)
0232 {% endhighlight %}
0233 
0234 </div>
0235 </div>
0236 
0237 After this code is executed, the streaming computation will have started in the background. The `query` object is a handle to that active streaming query, and we have decided to wait for the termination of the query using `awaitTermination()` to prevent the process from exiting while the query is active.
0238 
0239 To actually execute this example code, you can either compile the code in your own 
0240 [Spark application](quick-start.html#self-contained-applications), or simply 
0241 [run the example](index.html#running-the-examples-and-shell) once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
0242 
0243 
0244     $ nc -lk 9999
0245 
0246 Then, in a different terminal, you can start the example by using
0247 
0248 <div class="codetabs">
0249 <div data-lang="scala"  markdown="1">
0250 {% highlight bash %}
0251 $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
0252 {% endhighlight %}
0253 </div>
0254 <div data-lang="java"  markdown="1">
0255 {% highlight bash %}
0256 $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
0257 {% endhighlight %}
0258 </div>
0259 <div data-lang="python"  markdown="1">
0260 {% highlight bash %}
0261 $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
0262 {% endhighlight %}
0263 </div>
0264 <div data-lang="r"  markdown="1">
0265 {% highlight bash %}
0266 $ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
0267 {% endhighlight %}
0268 </div>
0269 </div>
0270 
0271 Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
0272 
0273 <table width="100%">
0274     <td>
0275 {% highlight bash %}
0276 # TERMINAL 1:
0277 # Running Netcat
0278 
0279 $ nc -lk 9999
0280 apache spark
0281 apache hadoop
0282 
0283 
0284 
0285 
0286 
0287 
0288 
0289 
0290 
0291 
0292 
0293 
0294 
0295 
0296 
0297 
0298 
0299 
0300 
0301 ...
0302 {% endhighlight %}
0303     </td>
0304     <td width="2%"></td>
0305     <td>
0306 <div class="codetabs">
0307 
0308 <div data-lang="scala" markdown="1">
0309 {% highlight bash %}
0310 # TERMINAL 2: RUNNING StructuredNetworkWordCount
0311 
0312 $ ./bin/run-example org.apache.spark.examples.sql.streaming.StructuredNetworkWordCount localhost 9999
0313 
0314 -------------------------------------------
0315 Batch: 0
0316 -------------------------------------------
0317 +------+-----+
0318 | value|count|
0319 +------+-----+
0320 |apache|    1|
0321 | spark|    1|
0322 +------+-----+
0323 
0324 -------------------------------------------
0325 Batch: 1
0326 -------------------------------------------
0327 +------+-----+
0328 | value|count|
0329 +------+-----+
0330 |apache|    2|
0331 | spark|    1|
0332 |hadoop|    1|
0333 +------+-----+
0334 ...
0335 {% endhighlight %}
0336 </div>
0337 
0338 <div data-lang="java" markdown="1">
0339 {% highlight bash %}
0340 # TERMINAL 2: RUNNING JavaStructuredNetworkWordCount
0341 
0342 $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
0343 
0344 -------------------------------------------
0345 Batch: 0
0346 -------------------------------------------
0347 +------+-----+
0348 | value|count|
0349 +------+-----+
0350 |apache|    1|
0351 | spark|    1|
0352 +------+-----+
0353 
0354 -------------------------------------------
0355 Batch: 1
0356 -------------------------------------------
0357 +------+-----+
0358 | value|count|
0359 +------+-----+
0360 |apache|    2|
0361 | spark|    1|
0362 |hadoop|    1|
0363 +------+-----+
0364 ...
0365 {% endhighlight %}
0366 </div>
0367 <div data-lang="python" markdown="1">
0368 {% highlight bash %}
0369 # TERMINAL 2: RUNNING structured_network_wordcount.py
0370 
0371 $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
0372 
0373 -------------------------------------------
0374 Batch: 0
0375 -------------------------------------------
0376 +------+-----+
0377 | value|count|
0378 +------+-----+
0379 |apache|    1|
0380 | spark|    1|
0381 +------+-----+
0382 
0383 -------------------------------------------
0384 Batch: 1
0385 -------------------------------------------
0386 +------+-----+
0387 | value|count|
0388 +------+-----+
0389 |apache|    2|
0390 | spark|    1|
0391 |hadoop|    1|
0392 +------+-----+
0393 ...
0394 {% endhighlight %}
0395 </div>
0396 <div data-lang="r" markdown="1">
0397 {% highlight bash %}
0398 # TERMINAL 2: RUNNING structured_network_wordcount.R
0399 
0400 $ ./bin/spark-submit examples/src/main/r/streaming/structured_network_wordcount.R localhost 9999
0401 
0402 -------------------------------------------
0403 Batch: 0
0404 -------------------------------------------
0405 +------+-----+
0406 | value|count|
0407 +------+-----+
0408 |apache|    1|
0409 | spark|    1|
0410 +------+-----+
0411 
0412 -------------------------------------------
0413 Batch: 1
0414 -------------------------------------------
0415 +------+-----+
0416 | value|count|
0417 +------+-----+
0418 |apache|    2|
0419 | spark|    1|
0420 |hadoop|    1|
0421 +------+-----+
0422 ...
0423 {% endhighlight %}
0424 </div>
0425 </div>
0426     </td>
0427 </table>
0428 
0429 
0430 # Programming Model
0431 
0432 The key idea in Structured Streaming is to treat a live data stream as a 
0433 table that is being continuously appended. This leads to a new stream 
0434 processing model that is very similar to a batch processing model. You will 
0435 express your streaming computation as standard batch-like query as on a static 
0436 table, and Spark runs it as an *incremental* query on the *unbounded* input 
0437 table. Let’s understand this model in more detail.
0438 
0439 ## Basic Concepts
0440 Consider the input data stream as the "Input Table". Every data item that is 
0441 arriving on the stream is like a new row being appended to the Input Table.
0442 
0443 ![Stream as a Table](img/structured-streaming-stream-as-a-table.png "Stream as a Table")
0444 
0445 A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink. 
0446 
0447 ![Model](img/structured-streaming-model.png)
0448 
0449 The "Output" is defined as what gets written out to the external storage. The output can be defined in a different mode:
0450 
0451   - *Complete Mode* - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table. 
0452 
0453   - *Append Mode* - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
0454   
0455   - *Update Mode* - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn't contain aggregations, it will be equivalent to Append mode.
0456 
0457 Note that each mode is applicable on certain types of queries. This is discussed in detail [later](#output-modes).
0458 
0459 To illustrate the use of this model, let’s understand the model in context of 
0460 the [Quick Example](#quick-example) above. The first `lines` DataFrame is the input table, and 
0461 the final `wordCounts` DataFrame is the result table. Note that the query on 
0462 streaming `lines` DataFrame to generate `wordCounts` is *exactly the same* as 
0463 it would be a static DataFrame. However, when this query is started, Spark 
0464 will continuously check for new data from the socket connection. If there is 
0465 new data, Spark will run an "incremental" query that combines the previous 
0466 running counts with the new data to compute updated counts, as shown below.
0467 
0468 ![Model](img/structured-streaming-example-model.png)
0469 
0470 **Note that Structured Streaming does not materialize the entire table**. It reads the latest
0471 available data from the streaming data source, processes it incrementally to update the result,
0472 and then discards the source data. It only keeps around the minimal intermediate *state* data as
0473 required to update the result (e.g. intermediate counts in the earlier example).
0474 
0475 This model is significantly different from many other stream processing
0476 engines. Many streaming systems require the user to maintain running 
0477 aggregations themselves, thus having to reason about fault-tolerance, and 
0478 data consistency (at-least-once, or at-most-once, or exactly-once). In this 
0479 model, Spark is responsible for updating the Result Table when there is new 
0480 data, thus relieving the users from reasoning about it. As an example, let’s 
0481 see how this model handles event-time based processing and late arriving data.
0482 
0483 ## Handling Event-time and Late Data
0484 Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model -- each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column -- each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
0485 
0486 Furthermore, this model naturally handles data that has arrived later than 
0487 expected based on its event-time. Since Spark is updating the Result Table, 
0488 it has full control over updating old aggregates when there is late data, 
0489 as well as cleaning up old aggregates to limit the size of intermediate
0490 state data. Since Spark 2.1, we have support for watermarking which 
0491 allows the user to specify the threshold of late data, and allows the engine
0492 to accordingly clean up old state. These are explained later in more 
0493 detail in the [Window Operations](#window-operations-on-event-time) section.
0494 
0495 ## Fault Tolerance Semantics
0496 Delivering end-to-end exactly-once semantics was one of key goals behind the design of Structured Streaming. To achieve that, we have designed the Structured Streaming sources, the sinks and the execution engine to reliably track the exact progress of the processing so that it can handle any kind of failure by restarting and/or reprocessing. Every streaming source is assumed to have offsets (similar to Kafka offsets, or Kinesis sequence numbers)
0497 to track the read position in the stream. The engine uses checkpointing and write-ahead logs to record the offset range of the data being processed in each trigger. The streaming sinks are designed to be idempotent for handling reprocessing. Together, using replayable sources and idempotent sinks, Structured Streaming can ensure **end-to-end exactly-once semantics** under any failure.
0498 
0499 # API using Datasets and DataFrames
0500 Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, as well as streaming, unbounded data. Similar to static Datasets/DataFrames, you can use the common entry point `SparkSession`
0501 ([Scala](api/scala/org/apache/spark/sql/SparkSession.html)/[Java](api/java/org/apache/spark/sql/SparkSession.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession)/[R](api/R/sparkR.session.html) docs)
0502 to create streaming DataFrames/Datasets from streaming sources, and apply the same operations on them as static DataFrames/Datasets. If you are not familiar with Datasets/DataFrames, you are strongly advised to familiarize yourself with them using the
0503 [DataFrame/Dataset Programming Guide](sql-programming-guide.html).
0504 
0505 ## Creating streaming DataFrames and streaming Datasets
0506 Streaming DataFrames can be created through the `DataStreamReader` interface
0507 ([Scala](api/scala/org/apache/spark/sql/streaming/DataStreamReader.html)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamReader.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader) docs)
0508 returned by `SparkSession.readStream()`. In [R](api/R/read.stream.html), with the `read.stream()` method. Similar to the read interface for creating static DataFrame, you can specify the details of the source – data format, schema, options, etc.
0509 
0510 #### Input Sources
0511 There are a few built-in sources.
0512 
0513   - **File source** - Reads files written in a directory as a stream of data. Files will be processed in the order of file modification time. If `latestFirst` is set, order will be reversed. Supported file formats are text, CSV, JSON, ORC, Parquet. See the docs of the DataStreamReader interface for a more up-to-date list, and supported options for each file format. Note that the files must be atomically placed in the given directory, which in most file systems, can be achieved by file move operations.
0514   - **Kafka source** - Reads data from Kafka. It's compatible with Kafka broker versions 0.10.0 or higher. See the [Kafka Integration Guide](structured-streaming-kafka-integration.html) for more details.
0515 
0516   - **Socket source (for testing)** - Reads UTF8 text data from a socket connection. The listening server socket is at the driver. Note that this should be used only for testing as this does not provide end-to-end fault-tolerance guarantees. 
0517 
0518   - **Rate source (for testing)** - Generates data at the specified number of rows per second, each output row contains a `timestamp` and `value`. Where `timestamp` is a `Timestamp` type containing the time of message dispatch, and `value` is of `Long` type containing the message count, starting from 0 as the first row. This source is intended for testing and benchmarking.
0519 
0520 Some sources are not fault-tolerant because they do not guarantee that data can be replayed using 
0521 checkpointed offsets after a failure. See the earlier section on 
0522 [fault-tolerance semantics](#fault-tolerance-semantics).
0523 Here are the details of all the sources in Spark.
0524 
0525 <table class="table">
0526   <tr>
0527     <th>Source</th>
0528     <th>Options</th>
0529     <th>Fault-tolerant</th>
0530     <th>Notes</th>
0531   </tr>
0532   <tr>
0533     <td><b>File source</b></td>
0534     <td>
0535         <code>path</code>: path to the input directory, and common to all file formats.
0536         <br/>
0537         <code>maxFilesPerTrigger</code>: maximum number of new files to be considered in every trigger (default: no max)
0538         <br/>
0539         <code>latestFirst</code>: whether to process the latest new files first, useful when there is a large backlog of files (default: false)
0540         <br/>
0541         <code>fileNameOnly</code>: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same:
0542         <br/>
0543         <code>maxFileAge</code>: Maximum age of a file that can be found in this directory, before it is ignored. For the first batch all files will be considered valid. If <code>latestFirst</code> is set to `true` and <code>maxFilesPerTrigger</code> is set, then this parameter will be ignored, because old files that are valid, and should be processed, may be ignored. The max age is specified with respect to the timestamp of the latest file, and not the timestamp of the current system.(default: 1 week)
0544         <br/>
0545         "file:///dataset.txt"<br/>
0546         "s3://a/dataset.txt"<br/>
0547         "s3n://a/b/dataset.txt"<br/>
0548         "s3a://a/b/c/dataset.txt"<br/>
0549         <code>cleanSource</code>: option to clean up completed files after processing.<br/>
0550         Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".<br/>
0551         When "archive" is provided, additional option <code>sourceArchiveDir</code> must be provided as well. The value of "sourceArchiveDir" must not match with source pattern in depth (the number of directories from the root directory), where the depth is minimum of depth on both paths. This will ensure archived files are never included as new source files.<br/>
0552         For example, suppose you provide '/hello?/spark/*' as source pattern, '/hello1/spark/archive/dir' cannot be used as the value of "sourceArchiveDir", as '/hello?/spark/*' and '/hello1/spark/archive' will be matched. '/hello1/spark' cannot be also used as the value of "sourceArchiveDir", as '/hello?/spark' and '/hello1/spark' will be matched. '/archived/here' would be OK as it doesn't match.<br/>
0553         Spark will move source files respecting their own path. For example, if the path of source file is <code>/a/b/dataset.txt</code> and the path of archive directory is <code>/archived/here</code>, file will be moved to <code>/archived/here/a/b/dataset.txt</code>.<br/>
0554         NOTE: Both archiving (via moving) or deleting completed files will introduce overhead (slow down, even if it's happening in separate thread) in each micro-batch, so you need to understand the cost for each operation in your file system before enabling this option. On the other hand, enabling this option will reduce the cost to list source files which can be an expensive operation.<br/>
0555         Number of threads used in completed file cleaner can be configured with<code>spark.sql.streaming.fileSource.cleaner.numThreads</code> (default: 1).<br/>
0556         NOTE 2: The source path should not be used from multiple sources or queries when enabling this option. Similarly, you must ensure the source path doesn't match to any files in output directory of file stream sink.<br/>
0557         NOTE 3: Both delete and move actions are best effort. Failing to delete or move files will not fail the streaming query. Spark may not clean up some source files in some circumstances - e.g. the application doesn't shut down gracefully, too many files are queued to clean up.
0558         <br/><br/>
0559         For file-format-specific options, see the related methods in <code>DataStreamReader</code>
0560         (<a href="api/scala/org/apache/spark/sql/streaming/DataStreamReader.html">Scala</a>/<a href="api/java/org/apache/spark/sql/streaming/DataStreamReader.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamReader">Python</a>/<a
0561         href="api/R/read.stream.html">R</a>).
0562         E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code>.
0563         <br/><br/>
0564         In addition, there are session configurations that affect certain file-formats. See the <a href="sql-programming-guide.html">SQL Programming Guide</a> for more details. E.g., for "parquet", see <a href="sql-data-sources-parquet.html#configuration">Parquet configuration</a> section.
0565         </td>
0566     <td>Yes</td>
0567     <td>Supports glob paths, but does not support multiple comma-separated paths/globs.</td>
0568   </tr>
0569   <tr>
0570     <td><b>Socket Source</b></td>
0571     <td>
0572         <code>host</code>: host to connect to, must be specified<br/>
0573         <code>port</code>: port to connect to, must be specified
0574     </td>
0575     <td>No</td>
0576     <td></td>
0577   </tr>
0578   <tr>
0579     <td><b>Rate Source</b></td>
0580     <td>
0581         <code>rowsPerSecond</code> (e.g. 100, default: 1): How many rows should be generated per second.<br/><br/>
0582         <code>rampUpTime</code> (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes <code>rowsPerSecond</code>. Using finer granularities than seconds will be truncated to integer seconds. <br/><br/>
0583         <code>numPartitions</code> (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. <br/><br/>
0584         
0585         The source will try its best to reach <code>rowsPerSecond</code>, but the query may be resource constrained, and <code>numPartitions</code> can be tweaked to help reach the desired speed.
0586     </td>
0587     <td>Yes</td>
0588     <td></td>
0589   </tr>
0590 
0591   <tr>
0592     <td><b>Kafka Source</b></td>
0593     <td>
0594         See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a>.
0595     </td>
0596     <td>Yes</td>
0597     <td></td>
0598   </tr>
0599   <tr>
0600     <td></td>
0601     <td></td>
0602     <td></td>
0603     <td></td>
0604   </tr>
0605 </table>
0606 
0607 Here are some examples.
0608 
0609 <div class="codetabs">
0610 <div data-lang="scala"  markdown="1">
0611 
0612 {% highlight scala %}
0613 val spark: SparkSession = ...
0614 
0615 // Read text from socket
0616 val socketDF = spark
0617   .readStream
0618   .format("socket")
0619   .option("host", "localhost")
0620   .option("port", 9999)
0621   .load()
0622 
0623 socketDF.isStreaming    // Returns True for DataFrames that have streaming sources
0624 
0625 socketDF.printSchema
0626 
0627 // Read all the csv files written atomically in a directory
0628 val userSchema = new StructType().add("name", "string").add("age", "integer")
0629 val csvDF = spark
0630   .readStream
0631   .option("sep", ";")
0632   .schema(userSchema)      // Specify schema of the csv files
0633   .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
0634 {% endhighlight %}
0635 
0636 </div>
0637 <div data-lang="java"  markdown="1">
0638 
0639 {% highlight java %}
0640 SparkSession spark = ...
0641 
0642 // Read text from socket
0643 Dataset<Row> socketDF = spark
0644   .readStream()
0645   .format("socket")
0646   .option("host", "localhost")
0647   .option("port", 9999)
0648   .load();
0649 
0650 socketDF.isStreaming();    // Returns True for DataFrames that have streaming sources
0651 
0652 socketDF.printSchema();
0653 
0654 // Read all the csv files written atomically in a directory
0655 StructType userSchema = new StructType().add("name", "string").add("age", "integer");
0656 Dataset<Row> csvDF = spark
0657   .readStream()
0658   .option("sep", ";")
0659   .schema(userSchema)      // Specify schema of the csv files
0660   .csv("/path/to/directory");    // Equivalent to format("csv").load("/path/to/directory")
0661 {% endhighlight %}
0662 
0663 </div>
0664 <div data-lang="python"  markdown="1">
0665 
0666 {% highlight python %}
0667 spark = SparkSession. ...
0668 
0669 # Read text from socket
0670 socketDF = spark \
0671     .readStream \
0672     .format("socket") \
0673     .option("host", "localhost") \
0674     .option("port", 9999) \
0675     .load()
0676 
0677 socketDF.isStreaming()    # Returns True for DataFrames that have streaming sources
0678 
0679 socketDF.printSchema()
0680 
0681 # Read all the csv files written atomically in a directory
0682 userSchema = StructType().add("name", "string").add("age", "integer")
0683 csvDF = spark \
0684     .readStream \
0685     .option("sep", ";") \
0686     .schema(userSchema) \
0687     .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
0688 {% endhighlight %}
0689 
0690 </div>
0691 <div data-lang="r"  markdown="1">
0692 
0693 {% highlight r %}
0694 sparkR.session(...)
0695 
0696 # Read text from socket
0697 socketDF <- read.stream("socket", host = hostname, port = port)
0698 
0699 isStreaming(socketDF)    # Returns TRUE for SparkDataFrames that have streaming sources
0700 
0701 printSchema(socketDF)
0702 
0703 # Read all the csv files written atomically in a directory
0704 schema <- structType(structField("name", "string"),
0705                      structField("age", "integer"))
0706 csvDF <- read.stream("csv", path = "/path/to/directory", schema = schema, sep = ";")
0707 {% endhighlight %}
0708 
0709 </div>
0710 </div>
0711 
0712 These examples generate streaming DataFrames that are untyped, meaning that the schema of the DataFrame is not checked at compile time, only checked at runtime when the query is submitted. Some operations like `map`, `flatMap`, etc. need the type to be known at compile time. To do those, you can convert these untyped streaming DataFrames to typed streaming Datasets using the same methods as static DataFrame. See the [SQL Programming Guide](sql-programming-guide.html) for more details. Additionally, more details on the supported streaming sources are discussed later in the document.
0713 
0714 ### Schema inference and partition of streaming DataFrames/Datasets
0715 
0716 By default, Structured Streaming from file based sources requires you to specify the schema, rather than rely on Spark to infer it automatically. This restriction ensures a consistent schema will be used for the streaming query, even in the case of failures. For ad-hoc use cases, you can reenable schema inference by setting `spark.sql.streaming.schemaInference` to `true`.
0717 
0718 Partition discovery does occur when subdirectories that are named `/key=value/` are present and listing will automatically recurse into these directories. If these columns appear in the user-provided schema, they will be filled in by Spark based on the path of the file being read. The directories that make up the partitioning scheme must be present when the query starts and must remain static. For example, it is okay to add `/data/year=2016/` when `/data/year=2015/` was present, but it is invalid to change the partitioning column (i.e. by creating the directory `/data/date=2016-04-17/`).
0719 
0720 ## Operations on streaming DataFrames/Datasets
0721 You can apply all kinds of operations on streaming DataFrames/Datasets – ranging from untyped, SQL-like operations (e.g. `select`, `where`, `groupBy`), to typed RDD-like operations (e.g. `map`, `filter`, `flatMap`). See the [SQL programming guide](sql-programming-guide.html) for more details. Let’s take a look at a few example operations that you can use.
0722 
0723 ### Basic Operations - Selection, Projection, Aggregation
0724 Most of the common operations on DataFrame/Dataset are supported for streaming. The few operations that are not supported are [discussed later](#unsupported-operations) in this section.
0725 
0726 <div class="codetabs">
0727 <div data-lang="scala"  markdown="1">
0728 
0729 {% highlight scala %}
0730 case class DeviceData(device: String, deviceType: String, signal: Double, time: DateTime)
0731 
0732 val df: DataFrame = ... // streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: string }
0733 val ds: Dataset[DeviceData] = df.as[DeviceData]    // streaming Dataset with IOT device data
0734 
0735 // Select the devices which have signal more than 10
0736 df.select("device").where("signal > 10")      // using untyped APIs   
0737 ds.filter(_.signal > 10).map(_.device)         // using typed APIs
0738 
0739 // Running count of the number of updates for each device type
0740 df.groupBy("deviceType").count()                          // using untyped API
0741 
0742 // Running average signal for each device type
0743 import org.apache.spark.sql.expressions.scalalang.typed
0744 ds.groupByKey(_.deviceType).agg(typed.avg(_.signal))    // using typed API
0745 {% endhighlight %}
0746 
0747 </div>
0748 <div data-lang="java"  markdown="1">
0749 
0750 {% highlight java %}
0751 import org.apache.spark.api.java.function.*;
0752 import org.apache.spark.sql.*;
0753 import org.apache.spark.sql.expressions.javalang.typed;
0754 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
0755 
0756 public class DeviceData {
0757   private String device;
0758   private String deviceType;
0759   private Double signal;
0760   private java.sql.Date time;
0761   ...
0762   // Getter and setter methods for each field
0763 }
0764 
0765 Dataset<Row> df = ...;    // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
0766 Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data
0767 
0768 // Select the devices which have signal more than 10
0769 df.select("device").where("signal > 10"); // using untyped APIs
0770 ds.filter((FilterFunction<DeviceData>) value -> value.getSignal() > 10)
0771   .map((MapFunction<DeviceData, String>) value -> value.getDevice(), Encoders.STRING());
0772 
0773 // Running count of the number of updates for each device type
0774 df.groupBy("deviceType").count(); // using untyped API
0775 
0776 // Running average signal for each device type
0777 ds.groupByKey((MapFunction<DeviceData, String>) value -> value.getDeviceType(), Encoders.STRING())
0778   .agg(typed.avg((MapFunction<DeviceData, Double>) value -> value.getSignal()));
0779 {% endhighlight %}
0780 
0781 
0782 </div>
0783 <div data-lang="python"  markdown="1">
0784 
0785 {% highlight python %}
0786 df = ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
0787 
0788 # Select the devices which have signal more than 10
0789 df.select("device").where("signal > 10")
0790 
0791 # Running count of the number of updates for each device type
0792 df.groupBy("deviceType").count()
0793 {% endhighlight %}
0794 </div>
0795 <div data-lang="r"  markdown="1">
0796 
0797 {% highlight r %}
0798 df <- ...  # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType }
0799 
0800 # Select the devices which have signal more than 10
0801 select(where(df, "signal > 10"), "device")
0802 
0803 # Running count of the number of updates for each device type
0804 count(groupBy(df, "deviceType"))
0805 {% endhighlight %}
0806 </div>
0807 </div>
0808 
0809 You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.
0810 
0811 <div class="codetabs">
0812 <div data-lang="scala"  markdown="1">
0813 {% highlight scala %}
0814 df.createOrReplaceTempView("updates")
0815 spark.sql("select count(*) from updates")  // returns another streaming DF
0816 {% endhighlight %}
0817 </div>
0818 <div data-lang="java"  markdown="1">  
0819 {% highlight java %}
0820 df.createOrReplaceTempView("updates");
0821 spark.sql("select count(*) from updates");  // returns another streaming DF
0822 {% endhighlight %}
0823 </div>
0824 <div data-lang="python"  markdown="1">  
0825 {% highlight python %}
0826 df.createOrReplaceTempView("updates")
0827 spark.sql("select count(*) from updates")  # returns another streaming DF
0828 {% endhighlight %}
0829 </div>
0830 <div data-lang="r"  markdown="1">
0831 {% highlight r %}
0832 createOrReplaceTempView(df, "updates")
0833 sql("select count(*) from updates")
0834 {% endhighlight %}
0835 </div>
0836 </div>
0837 
0838 Note, you can identify whether a DataFrame/Dataset has streaming data or not by using `df.isStreaming`.
0839 
0840 <div class="codetabs">
0841 <div data-lang="scala"  markdown="1">
0842 {% highlight scala %}
0843 df.isStreaming
0844 {% endhighlight %}
0845 </div>
0846 <div data-lang="java"  markdown="1">
0847 {% highlight java %}
0848 df.isStreaming()
0849 {% endhighlight %}
0850 </div>
0851 <div data-lang="python"  markdown="1">
0852 {% highlight python %}
0853 df.isStreaming()
0854 {% endhighlight %}
0855 </div>
0856 <div data-lang="r"  markdown="1">
0857 {% highlight r %}
0858 isStreaming(df)
0859 {% endhighlight %}
0860 </div>
0861 </div>
0862 
0863 ### Window Operations on Event Time
0864 Aggregations over a sliding event-time window are straightforward with Structured Streaming and are very similar to grouped aggregations. In a grouped aggregation, aggregate values (e.g. counts) are maintained for each unique value in the user-specified grouping column. In case of window-based aggregations, aggregate values are maintained for each window the event-time of a row falls into. Let's understand this with an illustration. 
0865 
0866 Imagine our [quick example](#quick-example) is modified and the stream now contains lines along with the time when the line was generated. Instead of running word counts, we want to count words within 10 minute windows, updating every 5 minutes. That is, word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived after 12:00 but before 12:10. Now, consider a word that was received at 12:07. This word should increment the counts corresponding to two windows 12:00 - 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping key (i.e. the word) and the window (can be calculated from the event-time).
0867 
0868 The result tables would look something like the following.
0869 
0870 ![Window Operations](img/structured-streaming-window.png)
0871 
0872 Since this windowing is similar to grouping, in code, you can use `groupBy()` and `window()` operations to express windowed aggregations. You can see the full code for the below examples in
0873 [Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCountWindowed.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCountWindowed.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount_windowed.py).
0874 
0875 <div class="codetabs">
0876 <div data-lang="scala"  markdown="1">
0877 
0878 {% highlight scala %}
0879 import spark.implicits._
0880 
0881 val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
0882 
0883 // Group the data by window and word and compute the count of each group
0884 val windowedCounts = words.groupBy(
0885   window($"timestamp", "10 minutes", "5 minutes"),
0886   $"word"
0887 ).count()
0888 {% endhighlight %}
0889 
0890 </div>
0891 <div data-lang="java"  markdown="1">
0892 
0893 {% highlight java %}
0894 Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
0895 
0896 // Group the data by window and word and compute the count of each group
0897 Dataset<Row> windowedCounts = words.groupBy(
0898   functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
0899   words.col("word")
0900 ).count();
0901 {% endhighlight %}
0902 
0903 </div>
0904 <div data-lang="python"  markdown="1">
0905 {% highlight python %}
0906 words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
0907 
0908 # Group the data by window and word and compute the count of each group
0909 windowedCounts = words.groupBy(
0910     window(words.timestamp, "10 minutes", "5 minutes"),
0911     words.word
0912 ).count()
0913 {% endhighlight %}
0914 
0915 </div>
0916 <div data-lang="r"  markdown="1">
0917 {% highlight r %}
0918 words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
0919 
0920 # Group the data by window and word and compute the count of each group
0921 windowedCounts <- count(
0922                     groupBy(
0923                       words,
0924                       window(words$timestamp, "10 minutes", "5 minutes"),
0925                       words$word))
0926 {% endhighlight %}
0927 
0928 </div>
0929 </div>
0930 
0931 
0932 #### Handling Late Data and Watermarking
0933 Now consider what happens if one of the events arrives late to the application.
0934 For example, say, a word generated at 12:04 (i.e. event time) could be received by 
0935 the application at 12:11. The application should use the time 12:04 instead of 12:11
0936 to update the older counts for the window `12:00 - 12:10`. This occurs 
0937 naturally in our window-based grouping – Structured Streaming can maintain the intermediate state 
0938 for partial aggregates for a long period of time such that late data can update aggregates of 
0939 old windows correctly, as illustrated below.
0940 
0941 ![Handling Late Data](img/structured-streaming-late-data.png)
0942 
0943 However, to run this query for days, it's necessary for the system to bound the amount of 
0944 intermediate in-memory state it accumulates. This means the system needs to know when an old 
0945 aggregate can be dropped from the in-memory state because the application is not going to receive 
0946 late data for that aggregate any more. To enable this, in Spark 2.1, we have introduced 
0947 **watermarking**, which lets the engine automatically track the current event time in the data
0948 and attempt to clean up old state accordingly. You can define the watermark of a query by 
0949 specifying the event time column and the threshold on how late the data is expected to be in terms of 
0950 event time. For a specific window ending at time `T`, the engine will maintain state and allow late
0951 data to update the state until `(max event time seen by the engine - late threshold > T)`. 
0952 In other words, late data within the threshold will be aggregated, 
0953 but data later than the threshold will start getting dropped
0954 (see [later](#semantic-guarantees-of-aggregation-with-watermarking)
0955 in the section for the exact guarantees). Let's understand this with an example. We can
0956 easily define watermarking on the previous example using `withWatermark()` as shown below.
0957 
0958 <div class="codetabs">
0959 <div data-lang="scala"  markdown="1">
0960 
0961 {% highlight scala %}
0962 import spark.implicits._
0963 
0964 val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
0965 
0966 // Group the data by window and word and compute the count of each group
0967 val windowedCounts = words
0968     .withWatermark("timestamp", "10 minutes")
0969     .groupBy(
0970         window($"timestamp", "10 minutes", "5 minutes"),
0971         $"word")
0972     .count()
0973 {% endhighlight %}
0974 
0975 </div>
0976 <div data-lang="java"  markdown="1">
0977 
0978 {% highlight java %}
0979 Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
0980 
0981 // Group the data by window and word and compute the count of each group
0982 Dataset<Row> windowedCounts = words
0983     .withWatermark("timestamp", "10 minutes")
0984     .groupBy(
0985         window(col("timestamp"), "10 minutes", "5 minutes"),
0986         col("word"))
0987     .count();
0988 {% endhighlight %}
0989 
0990 </div>
0991 <div data-lang="python"  markdown="1">
0992 {% highlight python %}
0993 words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
0994 
0995 # Group the data by window and word and compute the count of each group
0996 windowedCounts = words \
0997     .withWatermark("timestamp", "10 minutes") \
0998     .groupBy(
0999         window(words.timestamp, "10 minutes", "5 minutes"),
1000         words.word) \
1001     .count()
1002 {% endhighlight %}
1003 
1004 </div>
1005 <div data-lang="r"  markdown="1">
1006 {% highlight r %}
1007 words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
1008 
1009 # Group the data by window and word and compute the count of each group
1010 
1011 words <- withWatermark(words, "timestamp", "10 minutes")
1012 windowedCounts <- count(
1013                     groupBy(
1014                       words,
1015                       window(words$timestamp, "10 minutes", "5 minutes"),
1016                       words$word))
1017 {% endhighlight %}
1018 
1019 </div>
1020 </div>
1021 
1022 In this example, we are defining the watermark of the query on the value of the column "timestamp", 
1023 and also defining "10 minutes" as the threshold of how late is the data allowed to be. If this query 
1024 is run in Update output mode (discussed later in [Output Modes](#output-modes) section), 
1025 the engine will keep updating counts of a window in the Result Table until the window is older
1026 than the watermark, which lags behind the current event time in column "timestamp" by 10 minutes.
1027 Here is an illustration. 
1028 
1029 ![Watermarking in Update Mode](img/structured-streaming-watermark-update-mode.png)
1030 
1031 As shown in the illustration, the maximum event time tracked by the engine is the 
1032 *blue dashed line*, and the watermark set as `(max event time - '10 mins')`
1033 at the beginning of every trigger is the red line. For example, when the engine observes the data 
1034 `(12:14, dog)`, it sets the watermark for the next trigger as `12:04`.
1035 This watermark lets the engine maintain intermediate state for additional 10 minutes to allow late
1036 data to be counted. For example, the data `(12:09, cat)` is out of order and late, and it falls in
1037 windows `12:00 - 12:10` and `12:05 - 12:15`. Since, it is still ahead of the watermark `12:04` in 
1038 the trigger, the engine still maintains the intermediate counts as state and correctly updates the 
1039 counts of the related windows. However, when the watermark is updated to `12:11`, the intermediate 
1040 state for window `(12:00 - 12:10)` is cleared, and all subsequent data (e.g. `(12:04, donkey)`) 
1041 is considered "too late" and therefore ignored. Note that after every trigger, 
1042 the updated counts (i.e. purple rows) are written to sink as the trigger output, as dictated by 
1043 the Update mode.
1044 
1045 Some sinks (e.g. files) may not supported fine-grained updates that Update Mode requires. To work
1046 with them, we have also support Append Mode, where only the *final counts* are written to sink.
1047 This is illustrated below.
1048 
1049 Note that using `withWatermark` on a non-streaming Dataset is no-op. As the watermark should not affect 
1050 any batch query in any way, we will ignore it directly.
1051 
1052 ![Watermarking in Append Mode](img/structured-streaming-watermark-append-mode.png)
1053 
1054 Similar to the Update Mode earlier, the engine maintains intermediate counts for each window. 
1055 However, the partial counts are not updated to the Result Table and not written to sink. The engine
1056 waits for "10 mins" for late date to be counted, 
1057 then drops intermediate state of a window < watermark, and appends the final
1058 counts to the Result Table/sink. For example, the final counts of window `12:00 - 12:10` is 
1059 appended to the Result Table only after the watermark is updated to `12:11`. 
1060 
1061 ##### Conditions for watermarking to clean aggregation state
1062 {:.no_toc}
1063 
1064 It is important to note that the following conditions must be satisfied for the watermarking to 
1065 clean the state in aggregation queries *(as of Spark 2.1.1, subject to change in the future)*.
1066 
1067 - **Output mode must be Append or Update.** Complete mode requires all aggregate data to be preserved, 
1068 and hence cannot use watermarking to drop intermediate state. See the [Output Modes](#output-modes) 
1069 section for detailed explanation of the semantics of each output mode.
1070 
1071 - The aggregation must have either the event-time column, or a `window` on the event-time column. 
1072 
1073 - `withWatermark` must be called on the 
1074 same column as the timestamp column used in the aggregate. For example, 
1075 `df.withWatermark("time", "1 min").groupBy("time2").count()` is invalid 
1076 in Append output mode, as watermark is defined on a different column
1077 from the aggregation column.
1078 
1079 - `withWatermark` must be called before the aggregation for the watermark details to be used. 
1080 For example, `df.groupBy("time").count().withWatermark("time", "1 min")` is invalid in Append 
1081 output mode.
1082 
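For reference, here is a minimal sketch (in Scala; the column names `timestamp` and `word` are assumptions) of a windowed aggregation that satisfies all of the above conditions: the watermark is defined before the aggregation, on the same event-time column used in the window, and the query is started in Update (or Append) mode.

{% highlight scala %}
import org.apache.spark.sql.functions.{col, window}

val events = ...  // streaming DataFrame with an event-time column "timestamp" and a column "word"

val windowedCounts = events
  .withWatermark("timestamp", "10 minutes")                // watermark defined before the aggregation
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),   // window on the same event-time column
    col("word"))
  .count()

windowedCounts.writeStream
  .outputMode("update")   // must be Update or Append for state cleanup; Complete keeps all state
  .format("console")
  .start()
{% endhighlight %}
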
1083 ##### Semantic Guarantees of Aggregation with Watermarking
1084 {:.no_toc}
1085 
1086 - A watermark delay (set with `withWatermark`) of "2 hours" guarantees that the engine will never
1087 drop any data that is less than 2 hours delayed. In other words, any data less than 2 hours behind
1088 (in terms of event-time) the latest data processed till then is guaranteed to be aggregated.
1089 
1090 - However, the guarantee is strict only in one direction. Data delayed by more than 2 hours is
1091 not guaranteed to be dropped; it may or may not get aggregated. The more delayed the data is, the less
1092 likely the engine is to process it.
1093 
1094 ### Join Operations
1095 Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame
1096 as well as another streaming Dataset/DataFrame. The result of the streaming join is generated
1097 incrementally, similar to the results of streaming aggregations in the previous section. In this
1098 section we will explore what type of joins (i.e. inner, outer, etc.) are supported in the above
1099 cases. Note that in all the supported join types, the result of the join with a streaming
1100 Dataset/DataFrame will be exactly the same as if it were with a static Dataset/DataFrame
1101 containing the same data in the stream.
1102 
1103 
1104 #### Stream-static Joins
1105 
1106 Since its introduction in Spark 2.0, Structured Streaming has supported joins (inner join and some
1107 types of outer joins) between a streaming and a static DataFrame/Dataset. Here is a simple example.
1108 
1109 <div class="codetabs">
1110 <div data-lang="scala"  markdown="1">
1111 
1112 {% highlight scala %}
1113 val staticDf = spark.read. ...
1114 val streamingDf = spark.readStream. ...
1115 
1116 streamingDf.join(staticDf, "type")          // inner equi-join with a static DF
1117 streamingDf.join(staticDf, "type", "left_outer")  // left outer join with a static DF
1118 
1119 {% endhighlight %}
1120 
1121 </div>
1122 <div data-lang="java"  markdown="1">
1123 
1124 {% highlight java %}
1125 Dataset<Row> staticDf = spark.read(). ...;
1126 Dataset<Row> streamingDf = spark.readStream(). ...;
1127 streamingDf.join(staticDf, "type");         // inner equi-join with a static DF
1128 streamingDf.join(staticDf, "type", "left_outer");  // left outer join with a static DF
1129 {% endhighlight %}
1130 
1131 
1132 </div>
1133 <div data-lang="python"  markdown="1">
1134 
1135 {% highlight python %}
1136 staticDf = spark.read. ...
1137 streamingDf = spark.readStream. ...
1138 streamingDf.join(staticDf, "type")  # inner equi-join with a static DF
1139 streamingDf.join(staticDf, "type", "left_outer")  # left outer join with a static DF
1140 {% endhighlight %}
1141 
1142 </div>
1143 
1144 <div data-lang="r"  markdown="1">
1145 
1146 {% highlight r %}
1147 staticDf <- read.df(...)
1148 streamingDf <- read.stream(...)
1149 joined <- merge(streamingDf, staticDf, sort = FALSE)  # inner equi-join with a static DF
1150 joined <- join(
1151             staticDf,
1152             streamingDf, 
1153             streamingDf$value == staticDf$value,
1154             "right_outer")  # right outer join with a static DF
1155 {% endhighlight %}
1156 
1157 </div>
1158 </div>
1159 
1160 Note that stream-static joins are not stateful, so no state management is necessary.
1161 However, a few types of stream-static outer joins are not yet supported.
1162 These are listed at the [end of this Join section](#support-matrix-for-joins-in-streaming-queries).
1163 
1164 #### Stream-stream Joins
1165 In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming
1166 Datasets/DataFrames. The challenge of generating join results between two data streams is that,
1167 at any point of time, the view of the dataset is incomplete for both sides of the join, making
1168 it much harder to find matches between inputs. Any row received from one input stream can match
1169 with any future, yet-to-be-received row from the other input stream. Hence, for both the input
1170 streams, we buffer past input as streaming state, so that we can match every future input with
1171 past input and accordingly generate joined results. Furthermore, similar to streaming aggregations,
1172 we automatically handle late, out-of-order data and can limit the state using watermarks.
1173 Let’s discuss the different types of supported stream-stream joins and how to use them.
1174 
1175 ##### Inner Joins with optional Watermarking
1176 Inner joins on any kind of columns along with any kind of join conditions are supported.
1177 However, as the stream runs, the size of the streaming state will keep growing indefinitely, as
1178 *all* past input must be saved since any new input can match with any input from the past.
1179 To avoid unbounded state, you have to define additional join conditions such that indefinitely
1180 old inputs cannot match with future inputs and therefore can be cleared from the state.
1181 In other words, you will have to do the following additional steps in the join.
1182 
1183 1. Define watermark delays on both inputs such that the engine knows how delayed the input can be
1184 (similar to streaming aggregations)
1185 
1186 1. Define a constraint on event-time across the two inputs such that the engine can figure out when
1187 old rows of one input are not going to be required (i.e. will not satisfy the time constraint) for
1188 matches with the other input. This constraint can be defined in one of two ways.
1189 
1190     1. Time range join conditions (e.g. `...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR`),
1191 
1192     1. Join on event-time windows (e.g. `...JOIN ON leftTimeWindow = rightTimeWindow`).
1193 
1194 Let’s understand this with an example.
1195 
1196 Let’s say we want to join a stream of advertisement impressions (when an ad was shown) with
1197 another stream of user clicks on advertisements to correlate when impressions led to
1198 monetizable clicks. To allow the state cleanup in this stream-stream join, you will have to
1199 specify the watermarking delays and the time constraints as follows.
1200 
1201 1. Watermark delays: Say, the impressions and the corresponding clicks can be late/out-of-order
1202 in event-time by at most 2 and 3 hours, respectively.
1203 
1204 1. Event-time range condition: Say, a click can occur within a time range of 0 seconds to 1 hour
1205 after the corresponding impression.
1206 
1207 The code would look like this.
1208 
1209 <div class="codetabs">
1210 <div data-lang="scala"  markdown="1">
1211 
1212 {% highlight scala %}
1213 import org.apache.spark.sql.functions.expr
1214 
1215 val impressions = spark.readStream. ...
1216 val clicks = spark.readStream. ...
1217 
1218 // Apply watermarks on event-time columns
1219 val impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
1220 val clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
1221 
1222 // Join with event-time constraints
1223 impressionsWithWatermark.join(
1224   clicksWithWatermark,
1225   expr("""
1226     clickAdId = impressionAdId AND
1227     clickTime >= impressionTime AND
1228     clickTime <= impressionTime + interval 1 hour
1229     """)
1230 )
1231 
1232 {% endhighlight %}
1233 
1234 </div>
1235 <div data-lang="java"  markdown="1">
1236 
1237 {% highlight java %}
1238 import static org.apache.spark.sql.functions.expr;
1239 
1240 Dataset<Row> impressions = spark.readStream(). ...;
1241 Dataset<Row> clicks = spark.readStream(). ...;
1242 
1243 // Apply watermarks on event-time columns
1244 Dataset<Row> impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours");
1245 Dataset<Row> clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours");
1246 
1247 // Join with event-time constraints
1248 impressionsWithWatermark.join(
1249   clicksWithWatermark,
1250   expr(
1251     "clickAdId = impressionAdId AND " +
1252     "clickTime >= impressionTime AND " +
1253     "clickTime <= impressionTime + interval 1 hour ")
1254 );
1255 
1256 {% endhighlight %}
1257 
1258 
1259 </div>
1260 <div data-lang="python"  markdown="1">
1261 
1262 {% highlight python %}
1263 from pyspark.sql.functions import expr
1264 
1265 impressions = spark.readStream. ...
1266 clicks = spark.readStream. ...
1267 
1268 # Apply watermarks on event-time columns
1269 impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
1270 clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")
1271 
1272 # Join with event-time constraints
1273 impressionsWithWatermark.join(
1274   clicksWithWatermark,
1275   expr("""
1276     clickAdId = impressionAdId AND
1277     clickTime >= impressionTime AND
1278     clickTime <= impressionTime + interval 1 hour
1279     """)
1280 )
1281 
1282 {% endhighlight %}
1283 
1284 </div>
1285 <div data-lang="r"  markdown="1">
1286 
1287 {% highlight r %}
1288 impressions <- read.stream(...)
1289 clicks <- read.stream(...)
1290 
1291 # Apply watermarks on event-time columns
1292 impressionsWithWatermark <- withWatermark(impressions, "impressionTime", "2 hours")
1293 clicksWithWatermark <- withWatermark(clicks, "clickTime", "3 hours")
1294 
1295 # Join with event-time constraints
1296 joined <- join(
1297   impressionsWithWatermark,
1298   clicksWithWatermark,
1299   expr(
1300     paste(
1301       "clickAdId = impressionAdId AND",
1302       "clickTime >= impressionTime AND",
1303       "clickTime <= impressionTime + interval 1 hour"
1304 )))
1305 
1306 {% endhighlight %}
1307 
1308 </div>
1309 </div>
1310 
1311 ###### Semantic Guarantees of Stream-stream Inner Joins with Watermarking
1312 {:.no_toc}
1313 This is similar to the [guarantees provided by watermarking on aggregations](#semantic-guarantees-of-aggregation-with-watermarking).
1314 A watermark delay of "2 hours" guarantees that the engine will never drop any data that is less than
1315  2 hours delayed. But data delayed by more than 2 hours may or may not get processed.
1316 
1317 ##### Outer Joins with Watermarking
1318 While the watermark + event-time constraints are optional for inner joins, for left and right outer
1319 joins they must be specified. This is because, for generating the NULL results in an outer join, the
1320 engine must know when an input row is not going to match with anything in the future. Hence, the
1321 watermark + event-time constraints must be specified for generating correct results. Therefore,
1322 a query with an outer join will look quite like the ad-monetization example earlier, except that
1323 there will be an additional parameter specifying it to be an outer join.
1324 
1325 <div class="codetabs">
1326 <div data-lang="scala"  markdown="1">
1327 
1328 {% highlight scala %}
1329 
1330 impressionsWithWatermark.join(
1331   clicksWithWatermark,
1332   expr("""
1333     clickAdId = impressionAdId AND
1334     clickTime >= impressionTime AND
1335     clickTime <= impressionTime + interval 1 hour
1336     """),
1337   joinType = "leftOuter"      // can be "inner", "leftOuter", "rightOuter"
1338  )
1339 
1340 {% endhighlight %}
1341 
1342 </div>
1343 <div data-lang="java"  markdown="1">
1344 
1345 {% highlight java %}
1346 impressionsWithWatermark.join(
1347   clicksWithWatermark,
1348   expr(
1349     "clickAdId = impressionAdId AND " +
1350     "clickTime >= impressionTime AND " +
1351     "clickTime <= impressionTime + interval 1 hour "),
1352   "leftOuter"                 // can be "inner", "leftOuter", "rightOuter"
1353 );
1354 
1355 {% endhighlight %}
1356 
1357 
1358 </div>
1359 <div data-lang="python"  markdown="1">
1360 
1361 {% highlight python %}
1362 impressionsWithWatermark.join(
1363   clicksWithWatermark,
1364   expr("""
1365     clickAdId = impressionAdId AND
1366     clickTime >= impressionTime AND
1367     clickTime <= impressionTime + interval 1 hour
1368     """),
1369   "leftOuter"                 # can be "inner", "leftOuter", "rightOuter"
1370 )
1371 
1372 {% endhighlight %}
1373 
1374 </div>
1375 <div data-lang="r"  markdown="1">
1376 
1377 {% highlight r %}
1378 joined <- join(
1379   impressionsWithWatermark,
1380   clicksWithWatermark,
1381   expr(
1382     paste(
1383       "clickAdId = impressionAdId AND",
1384       "clickTime >= impressionTime AND",
1385       "clickTime <= impressionTime + interval 1 hour")),
1386   "left_outer"                 # can be "inner", "left_outer", "right_outer"
1387 )
1388 
1389 {% endhighlight %}
1390 
1391 </div>
1392 </div>
1393 
1394 
1395 ###### Semantic Guarantees of Stream-stream Outer Joins with Watermarking
1396 {:.no_toc}
1397 Outer joins have the same guarantees as [inner joins](#semantic-guarantees-of-stream-stream-inner-joins-with-watermarking)
1398 regarding watermark delays and whether data will be dropped or not.
1399 
1400 ###### Caveats
1401 {:.no_toc}
1402 There are a few important characteristics to note regarding how the outer results are generated.
1403 
1404 - *The outer NULL results will be generated with a delay that depends on the specified watermark
1405 delay and the time range condition.* This is because the engine has to wait for that long to ensure
1406 there were no matches and there will be no more matches in the future.
1407 
1408 - In the current implementation in the micro-batch engine, watermarks are advanced at the end of a
1409 micro-batch, and the next micro-batch uses the updated watermark to clean up state and output
1410 outer results. Since we trigger a micro-batch only when there is new data to be processed, the
1411 generation of the outer result may get delayed if there is no new data being received in the stream.
1412 *In short, if either of the two input streams being joined does not receive data for a while, the
1413 outer (in both cases, left or right) output may get delayed.*
1414 
1415 ##### Support matrix for joins in streaming queries
1416 
1417 <table class ="table">
1418   <tr>
1419     <th>Left Input</th>
1420     <th>Right Input</th>
1421     <th>Join Type</th>
1422     <th></th>
1423   </tr>
1424   <tr>
1425       <td style="vertical-align: middle;">Static</td>
1426       <td style="vertical-align: middle;">Static</td>
1427       <td style="vertical-align: middle;">All types</td>
1428       <td style="vertical-align: middle;">
1429         Supported, since it's not on streaming data even though it
1430         can be present in a streaming query
1431       </td>
1432   </tr>
1433   <tr>
1434     <td rowspan="4" style="vertical-align: middle;">Stream</td>
1435     <td rowspan="4" style="vertical-align: middle;">Static</td>
1436     <td style="vertical-align: middle;">Inner</td>
1437     <td style="vertical-align: middle;">Supported, not stateful</td>
1438   </tr>
1439   <tr>
1440     <td style="vertical-align: middle;">Left Outer</td>
1441     <td style="vertical-align: middle;">Supported, not stateful</td>
1442   </tr>
1443   <tr>
1444     <td style="vertical-align: middle;">Right Outer</td>
1445     <td style="vertical-align: middle;">Not supported</td>
1446   </tr>
1447   <tr>
1448     <td style="vertical-align: middle;">Full Outer</td>
1449     <td style="vertical-align: middle;">Not supported</td>
1450   </tr>
1451   <tr>
1452     <td rowspan="4" style="vertical-align: middle;">Static</td>
1453     <td rowspan="4" style="vertical-align: middle;">Stream</td>
1454     <td style="vertical-align: middle;">Inner</td>
1455     <td style="vertical-align: middle;">Supported, not stateful</td>
1456   </tr>
1457   <tr>
1458     <td style="vertical-align: middle;">Left Outer</td>
1459     <td style="vertical-align: middle;">Not supported</td>
1460   </tr>
1461   <tr>
1462     <td style="vertical-align: middle;">Right Outer</td>
1463     <td style="vertical-align: middle;">Supported, not stateful</td>
1464   </tr>
1465   <tr>
1466     <td style="vertical-align: middle;">Full Outer</td>
1467     <td style="vertical-align: middle;">Not supported</td>
1468   </tr>
1469   <tr>
1470     <td rowspan="4" style="vertical-align: middle;">Stream</td>
1471     <td rowspan="4" style="vertical-align: middle;">Stream</td>
1472     <td style="vertical-align: middle;">Inner</td>
1473     <td style="vertical-align: middle;">
1474       Supported, optionally specify watermark on both sides +
1475       time constraints for state cleanup
1476     </td>
1477   </tr>
1478   <tr>
1479     <td style="vertical-align: middle;">Left Outer</td>
1480     <td style="vertical-align: middle;">
1481       Conditionally supported, must specify watermark on right + time constraints for correct
1482       results, optionally specify watermark on left for all state cleanup
1483     </td>
1484   </tr>
1485   <tr>
1486     <td style="vertical-align: middle;">Right Outer</td>
1487     <td style="vertical-align: middle;">
1488       Conditionally supported, must specify watermark on left + time constraints for correct
1489       results, optionally specify watermark on right for all state cleanup
1490     </td>
1491   </tr>
1492   <tr>
1493     <td style="vertical-align: middle;">Full Outer</td>
1494     <td style="vertical-align: middle;">Not supported</td>
1495   </tr>
1496  <tr>
1497     <td></td>
1498     <td></td>
1499     <td></td>
1500     <td></td>
1501   </tr>
1502 </table>
1503 
1504 Additional details on supported joins:
1505 
1506 - Joins can be cascaded, that is, you can do `df1.join(df2, ...).join(df3, ...).join(df4, ....)`.
1507 
1508 - As of Spark 2.4, you can use joins only when the query is in Append output mode. Other output modes are not yet supported.
1509 
1510 - As of Spark 2.4, you cannot use non-map-like operations before joins. Here are a few examples of
1511   what cannot be used.
1512 
1513   - Cannot use streaming aggregations before joins.
1514 
1515   - Cannot use mapGroupsWithState and flatMapGroupsWithState in Update mode before joins.
1516 
1517 ### Streaming Deduplication
1518 You can deduplicate records in data streams using a unique identifier in the events. This is exactly the same as deduplication on a static Dataset/DataFrame using a unique identifier column. The query will store the necessary amount of data from previous records such that it can filter duplicate records. Similar to aggregations, you can use deduplication with or without watermarking.
1519 
1520 - *With watermark* - If there is an upper bound on how late a duplicate record may arrive, then you can define a watermark on an event time column and deduplicate using both the guid and the event time columns. The query will use the watermark to remove old state data from past records that are not expected to get any duplicates any more. This bounds the amount of the state the query has to maintain.
1521 
1522 - *Without watermark* - Since there are no bounds on when a duplicate record may arrive, the query stores the data from all the past records as state.
1523 
1524 <div class="codetabs">
1525 <div data-lang="scala"  markdown="1">
1526 
1527 {% highlight scala %}
1528 val streamingDf = spark.readStream. ...  // columns: guid, eventTime, ...
1529 
1530 // Without watermark using guid column
1531 streamingDf.dropDuplicates("guid")
1532 
1533 // With watermark using guid and eventTime columns
1534 streamingDf
1535   .withWatermark("eventTime", "10 seconds")
1536   .dropDuplicates("guid", "eventTime")
1537 {% endhighlight %}
1538 
1539 </div>
1540 <div data-lang="java"  markdown="1">
1541 
1542 {% highlight java %}
1543 Dataset<Row> streamingDf = spark.readStream(). ...;  // columns: guid, eventTime, ...
1544 
1545 // Without watermark using guid column
1546 streamingDf.dropDuplicates("guid");
1547 
1548 // With watermark using guid and eventTime columns
1549 streamingDf
1550   .withWatermark("eventTime", "10 seconds")
1551   .dropDuplicates("guid", "eventTime");
1552 {% endhighlight %}
1553 
1554 
1555 </div>
1556 <div data-lang="python"  markdown="1">
1557 
1558 {% highlight python %}
1559 streamingDf = spark.readStream. ...
1560 
1561 # Without watermark using guid column
1562 streamingDf.dropDuplicates("guid")
1563 
1564 # With watermark using guid and eventTime columns
1565 streamingDf \
1566   .withWatermark("eventTime", "10 seconds") \
1567   .dropDuplicates("guid", "eventTime")
1568 {% endhighlight %}
1569 
1570 </div>
1571 <div data-lang="r"  markdown="1">
1572 
1573 {% highlight r %}
1574 streamingDf <- read.stream(...)
1575 
1576 # Without watermark using guid column
1577 streamingDf <- dropDuplicates(streamingDf, "guid")
1578 
1579 # With watermark using guid and eventTime columns
1580 streamingDf <- withWatermark(streamingDf, "eventTime", "10 seconds")
1581 streamingDf <- dropDuplicates(streamingDf, "guid", "eventTime")
1582 {% endhighlight %}
1583 
1584 </div>
1585 </div>
1586 
1587 ### Policy for handling multiple watermarks
1588 A streaming query can have multiple input streams that are unioned or joined together.
1589 Each of the input streams can have a different threshold of late data that needs to
1590 be tolerated for stateful operations. You specify these thresholds using
1591 ``withWatermark("eventTime", delay)`` on each of the input streams. For example, consider
1592 a query with stream-stream joins between `inputStream1` and `inputStream2`.
1593     
1594 <div class="codetabs">
1595 <div data-lang="scala"  markdown="1">
1596 
1597 {% highlight scala %}
1598 inputStream1.withWatermark("eventTime1", "1 hour")
1599   .join(
1600     inputStream2.withWatermark("eventTime2", "2 hours"),
1601     joinCondition)
1602 {% endhighlight %}
1603 
1604 </div>
1605 </div>
1606 
1607 While executing the query, Structured Streaming individually tracks the maximum
1608 event time seen in each input stream, calculates watermarks based on the corresponding delay,
1609 and chooses a single global watermark with them to be used for stateful operations. By default,
1610 the minimum is chosen as the global watermark because it ensures that no data is
1611 accidentally dropped as too late if one of the streams falls behind the others
1612 (for example, one of the streams stops receiving data due to upstream failures). In other words,
1613 the global watermark will safely move at the pace of the slowest stream and the query output will
1614 be delayed accordingly.
1615 
1616 However, in some cases, you may want to get faster results even if it means dropping data from the
1617 slowest stream. Since Spark 2.4, you can set the multiple watermark policy to choose
1618 the maximum value as the global watermark by setting the SQL configuration
1619 ``spark.sql.streaming.multipleWatermarkPolicy`` to ``max`` (default is ``min``). 
1620 This lets the global watermark move at the pace of the fastest stream.
1621 However, as a side effect, data from the slower streams will be aggressively dropped. Hence, use
1622 this configuration judiciously.
1623 
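For example, a minimal sketch of opting into the `max` policy (this is a session-level SQL configuration, so set it before starting the affected query):

{% highlight scala %}
// Let the fastest stream drive the global watermark (the default policy is "min")
spark.conf.set("spark.sql.streaming.multipleWatermarkPolicy", "max")
{% endhighlight %}
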
1624 ### Arbitrary Stateful Operations
1625 Many use cases require more advanced stateful operations than aggregations. For example, in many use cases, you have to track sessions from data streams of events. To do such sessionization, you will have to save arbitrary types of data as state, and perform arbitrary operations on the state using the data stream events in every trigger. Since Spark 2.2, this can be done using the operation `mapGroupsWithState` and the more powerful operation `flatMapGroupsWithState`. Both operations allow you to apply user-defined code on grouped Datasets to update user-defined state. For more concrete details, take a look at the API documentation ([Scala](api/scala/org/apache/spark/sql/streaming/GroupState.html)/[Java](api/java/org/apache/spark/sql/streaming/GroupState.html)) and the examples ([Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredSessionization.java)).
1626 
1627 Though Spark cannot check and enforce it, the state function should be implemented with respect to the semantics of the output mode. For example, in Update mode Spark doesn't expect that the state function will emit rows which are older than the current watermark plus the allowed late record delay, whereas in Append mode the state function can emit these rows.
1628 
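As an illustration, here is a rough sketch (in Scala) of session tracking with `mapGroupsWithState`. The event and state case classes, the column names, and the timeout duration are assumptions made for this sketch; see the linked StructuredSessionization example for a complete program.

{% highlight scala %}
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

// Hypothetical input and state types (requires `import spark.implicits._` for the encoders)
case class Event(sessionId: String, timestamp: java.sql.Timestamp)
case class SessionInfo(numEvents: Int)
case class SessionUpdate(sessionId: String, numEvents: Int, expired: Boolean)

val events: Dataset[Event] = ...  // streaming Dataset of events

val sessionUpdates = events
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.ProcessingTimeTimeout) {
    case (sessionId: String, eventsIter: Iterator[Event], state: GroupState[SessionInfo]) =>
      if (state.hasTimedOut) {
        // No new data arrived before the timeout; emit a final update and drop the state
        val finalUpdate = SessionUpdate(sessionId, state.get.numEvents, expired = true)
        state.remove()
        finalUpdate
      } else {
        // Fold the new events into the state and reset the timeout
        val updated = SessionInfo(state.getOption.map(_.numEvents).getOrElse(0) + eventsIter.size)
        state.update(updated)
        state.setTimeoutDuration("10 minutes")
        SessionUpdate(sessionId, updated.numEvents, expired = false)
      }
  }

sessionUpdates.writeStream
  .outputMode("update")   // queries with mapGroupsWithState use Update output mode
  .format("console")
  .start()
{% endhighlight %}
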
1629 ### Unsupported Operations
1630 There are a few DataFrame/Dataset operations that are not supported with streaming DataFrames/Datasets. 
1631 Some of them are as follows.
1632  
1633 - Multiple streaming aggregations (i.e. a chain of aggregations on a streaming DF) are not yet supported on streaming Datasets.
1634 
1635 - Limit and taking the first N rows are not supported on streaming Datasets.
1636 
1637 - Distinct operations on streaming Datasets are not supported.
1638 
1639 - Sorting operations are supported on streaming Datasets only after an aggregation and in Complete Output Mode.
1640 
1641 - A few types of outer joins on streaming Datasets are not supported. See the
1642   <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
1643   for more details.
1644 
1645 In addition, there are some Dataset methods that will not work on streaming Datasets. They are actions that will immediately run queries and return results, which does not make sense on a streaming Dataset. Rather, those functionalities can be done by explicitly starting a streaming query (see the next section regarding that).
1646 
1647 - `count()` - Cannot return a single count from a streaming Dataset. Instead, use `ds.groupBy().count()` which returns a streaming Dataset containing a running count. 
1648 
1649 - `foreach()` - Instead use `ds.writeStream.foreach(...)` (see next section).
1650 
1651 - `show()` - Instead use the console sink (see next section).
1652 
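For instance, here is a short sketch of the `count()` alternative mentioned above (the console sink and Complete mode are just illustrative choices):

{% highlight scala %}
// Instead of ds.count(), maintain a running count as a streaming aggregation
val runningCount = ds.groupBy().count()

runningCount.writeStream
  .outputMode("complete")   // the single running count is re-emitted on every trigger
  .format("console")
  .start()
{% endhighlight %}
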
1653 If you try any of these operations, you will see an `AnalysisException` like "operation XYZ is not supported with streaming DataFrames/Datasets".
1654 While some of them may be supported in future releases of Spark, 
1655 there are others which are fundamentally hard to implement on streaming data efficiently. 
1656 For example, sorting on the input stream is not supported, as it requires keeping 
1657 track of all the data received in the stream. This is therefore fundamentally hard to execute 
1658 efficiently.
1659 
1660 ### Limitation of global watermark
1661 
1662 In Append mode, if a stateful operation emits rows older than the current watermark plus the allowed late record delay,
1663 they will be "late rows" in downstream stateful operations (as Spark uses a global watermark). Note that these rows may be discarded.
1664 This is a limitation of the global watermark, and it could potentially cause a correctness issue.
1665 
1666 Spark will check the logical plan of the query and log a warning when it detects such a pattern.
1667 
1668 Any stateful operation after any of the below stateful operations can have this issue:
1669 
1670 * streaming aggregation in Append mode
1671 * stream-stream outer join
1672 * `mapGroupsWithState` and `flatMapGroupsWithState` in Append mode (depending on the implementation of the state function)
1673 
1674 As Spark cannot check the state function of `mapGroupsWithState`/`flatMapGroupsWithState`, Spark assumes that the state function
1675 emits late rows if the operator uses Append mode.
1676 
1677 There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
1678 end-to-end exactly-once per query. Ensuring end-to-end exactly-once for the last query is optional. A sketch of this workaround is shown below.
1679 
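Here is a rough sketch of that workaround, assuming a Kafka topic is used as the intermediate, replayable storage between the two queries (the DataFrame name, topic, servers, and checkpoint paths are placeholders):

{% highlight scala %}
// Query 1: run the first stateful operator and write its output to an intermediate topic.
// Note that the output must be serialized into a "value" column for the Kafka sink.
firstStatefulDF.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "intermediate-topic")
  .option("checkpointLocation", "path/to/checkpoint/dir1")
  .start()

// Query 2: read the intermediate topic back and apply the next stateful operator
// as a separate query with its own checkpoint location.
val intermediate = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "intermediate-topic")
  .load()
{% endhighlight %}
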
1680 ## Starting Streaming Queries
1681 Once you have defined the final result DataFrame/Dataset, all that is left is for you to start the streaming computation. To do that, you have to use the `DataStreamWriter`
1682 ([Scala](api/scala/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Java](api/java/org/apache/spark/sql/streaming/DataStreamWriter.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.DataStreamWriter) docs)
1683 returned through `Dataset.writeStream()`. You will have to specify one or more of the following in this interface.
1684 
1685 - *Details of the output sink:* Data format, location, etc.
1686 
1687 - *Output mode:* Specify what gets written to the output sink.
1688 
1689 - *Query name:* Optionally, specify a unique name of the query for identification.
1690 
1691 - *Trigger interval:* Optionally, specify the trigger interval. If it is not specified, the system will check for availability of new data as soon as the previous processing has been completed. If a trigger time is missed because the previous processing has not been completed, then the system will trigger processing immediately.
1692 
1693 - *Checkpoint location:* For some output sinks where the end-to-end fault-tolerance can be guaranteed, specify the location where the system will write all the checkpoint information. This should be a directory in an HDFS-compatible fault-tolerant file system. The semantics of checkpointing is discussed in more detail in the next section.
1694 
1695 #### Output Modes
1696 There are a few types of output modes.
1697 
1698 - **Append mode (default)** - This is the default mode, where only the 
1699 new rows added to the Result Table since the last trigger will be 
1700 outputted to the sink. This is supported for only those queries where 
1701 rows added to the Result Table are never going to change. Hence, this mode 
1702 guarantees that each row will be output only once (assuming a 
1703 fault-tolerant sink). For example, queries with only `select`, 
1704 `where`, `map`, `flatMap`, `filter`, `join`, etc. will support Append mode.
1705 
1706 - **Complete mode** - The whole Result Table will be outputted to the sink after every trigger.
1707  This is supported for aggregation queries.
1708 
1709 - **Update mode** - (*Available since Spark 2.1.1*) Only the rows in the Result Table that were 
1710 updated since the last trigger will be outputted to the sink. 
1711 More information to be added in future releases.
1712 
1713 Different types of streaming queries support different output modes.
1714 Here is the compatibility matrix.
1715 
1716 <table class="table">
1717   <tr>
1718     <th>Query Type</th>
1719     <th></th>
1720     <th>Supported Output Modes</th>
1721     <th>Notes</th>        
1722   </tr>
1723   <tr>
1724     <td rowspan="2" style="vertical-align: middle;">Queries with aggregation</td>
1725     <td style="vertical-align: middle;">Aggregation on event-time with watermark</td>
1726     <td style="vertical-align: middle;">Append, Update, Complete</td>
1727     <td>
1728         Append mode uses the watermark to drop old aggregation state. But the output of a 
1729         windowed aggregation is delayed by the late threshold specified in <code>withWatermark()</code>, as by
1730         the mode's semantics, rows can be added to the Result Table only once, after they are 
1731         finalized (i.e. after the watermark is crossed). See the
1732         <a href="#handling-late-data-and-watermarking">Late Data</a> section for more details.
1733         <br/><br/>
1734         Update mode uses watermark to drop old aggregation state.
1735         <br/><br/>
1736         Complete mode does not drop old aggregation state since by definition this mode
1737         preserves all data in the Result Table.
1738     </td>    
1739   </tr>
1740   <tr>
1741     <td style="vertical-align: middle;">Other aggregations</td>
1742     <td style="vertical-align: middle;">Complete, Update</td>
1743     <td>
1744         Since no watermark is defined (it is only defined in the other category), 
1745         old aggregation state is not dropped.
1746         <br/><br/>
1747         Append mode is not supported as aggregates can update thus violating the semantics of 
1748         this mode.
1749     </td>  
1750   </tr>
1751   <tr>
1752     <td colspan="2" style="vertical-align: middle;">Queries with <code>mapGroupsWithState</code></td>
1753     <td style="vertical-align: middle;">Update</td>
1754     <td style="vertical-align: middle;"></td>
1755   </tr>
1756   <tr>
1757     <td rowspan="2" style="vertical-align: middle;">Queries with <code>flatMapGroupsWithState</code></td>
1758     <td style="vertical-align: middle;">Append operation mode</td>
1759     <td style="vertical-align: middle;">Append</td>
1760     <td style="vertical-align: middle;">
1761       Aggregations are allowed after <code>flatMapGroupsWithState</code>.
1762     </td>
1763   </tr>
1764   <tr>
1765     <td style="vertical-align: middle;">Update operation mode</td>
1766     <td style="vertical-align: middle;">Update</td>
1767     <td style="vertical-align: middle;">
1768       Aggregations not allowed after <code>flatMapGroupsWithState</code>.
1769     </td>
1770   </tr>
1771   <tr>
1772       <td colspan="2" style="vertical-align: middle;">Queries with <code>joins</code></td>
1773       <td style="vertical-align: middle;">Append</td>
1774       <td style="vertical-align: middle;">
1775         Update and Complete mode not supported yet. See the
1776         <a href="#support-matrix-for-joins-in-streaming-queries">support matrix in the Join Operations section</a>
1777          for more details on what types of joins are supported.
1778       </td>
1779     </tr>
1780   <tr>
1781     <td colspan="2" style="vertical-align: middle;">Other queries</td>
1782     <td style="vertical-align: middle;">Append, Update</td>
1783     <td style="vertical-align: middle;">
1784       Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
1785     </td>
1786   </tr>
1787   <tr>
1788     <td></td>
1789     <td></td>
1790     <td></td>
1791     <td></td>
1792   </tr>
1793 </table>
1794 
1795 
1796 #### Output Sinks
1797 There are a few types of built-in output sinks.
1798 
1799 - **File sink** - Stores the output to a directory.
1800 
1801 {% highlight scala %}
1802 writeStream
1803     .format("parquet")        // can be "orc", "json", "csv", etc.
1804     .option("path", "path/to/destination/dir")
1805     .start()
1806 {% endhighlight %}
1807 
1808 - **Kafka sink** - Stores the output to one or more topics in Kafka.
1809 
1810 {% highlight scala %}
1811 writeStream
1812     .format("kafka")
1813     .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
1814     .option("topic", "updates")
1815     .start()
1816 {% endhighlight %}
1817 
1818 - **Foreach sink** - Runs arbitrary computation on the records in the output. See later in the section for more details.
1819 
1820 {% highlight scala %}
1821 writeStream
1822     .foreach(...)
1823     .start()
1824 {% endhighlight %}
1825 
1826 - **Console sink (for debugging)** - Prints the output to the console/stdout every time there is a trigger. Both Append and Complete output modes are supported. This should be used for debugging purposes on low data volumes, as the entire output is collected and stored in the driver's memory after every trigger.
1827 
1828 {% highlight scala %}
1829 writeStream
1830     .format("console")
1831     .start()
1832 {% endhighlight %}
1833 
1834 - **Memory sink (for debugging)** - The output is stored in memory as an in-memory table.
1835 Both Append and Complete output modes are supported. This should be used for debugging purposes
1836 on low data volumes, as the entire output is collected and stored in the driver's memory.
1837 Hence, use it with caution.
1838 
1839 {% highlight scala %}
1840 writeStream
1841     .format("memory")
1842     .queryName("tableName")
1843     .start()
1844 {% endhighlight %}
1845 
1846 Some sinks are not fault-tolerant because they do not guarantee persistence of the output and are 
1847 meant for debugging purposes only. See the earlier section on 
1848 [fault-tolerance semantics](#fault-tolerance-semantics). 
1849 Here are the details of all the sinks in Spark.
1850 
1851 <table class="table">
1852   <tr>
1853     <th>Sink</th>
1854     <th>Supported Output Modes</th>
1855     <th>Options</th>
1856     <th>Fault-tolerant</th>
1857     <th>Notes</th>
1858   </tr>
1859   <tr>
1860     <td><b>File Sink</b></td>
1861     <td>Append</td>
1862     <td>
1863         <code>path</code>: path to the output directory, must be specified.
1864         <br/><br/>
1865         For file-format-specific options, see the related methods in DataFrameWriter
1866         (<a href="api/scala/org/apache/spark/sql/DataFrameWriter.html">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
1867         href="api/R/write.stream.html">R</a>).
1868         E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
1869     </td>
1870     <td>Yes (exactly-once)</td>
1871     <td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
1872   </tr>
1873   <tr>
1874     <td><b>Kafka Sink</b></td>
1875     <td>Append, Update, Complete</td>
1876     <td>See the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td>
1877     <td>Yes (at-least-once)</td>
1878     <td>More details in the <a href="structured-streaming-kafka-integration.html">Kafka Integration Guide</a></td>
1879   </tr>
1880   <tr>
1881     <td><b>Foreach Sink</b></td>
1882     <td>Append, Update, Complete</td>
1883     <td>None</td>
1884     <td>Yes (at-least-once)</td>
1885     <td>More details in the <a href="#using-foreach-and-foreachbatch">next section</a></td>
1886   </tr>
1887   <tr>
1888       <td><b>ForeachBatch Sink</b></td>
1889       <td>Append, Update, Complete</td>
1890       <td>None</td>
1891       <td>Depends on the implementation</td>
1892       <td>More details in the <a href="#using-foreach-and-foreachbatch">next section</a></td>
1893     </tr>
1894     
1895   <tr>
1896     <td><b>Console Sink</b></td>
1897     <td>Append, Update, Complete</td>
1898     <td>
1899         <code>numRows</code>: Number of rows to print every trigger (default: 20)
1900         <br/>
1901         <code>truncate</code>: Whether to truncate the output if too long (default: true)
1902     </td>
1903     <td>No</td>
1904     <td></td>
1905   </tr>
1906   <tr>
1907     <td><b>Memory Sink</b></td>
1908     <td>Append, Complete</td>
1909     <td>None</td>
1910     <td>No. But in Complete Mode, restarted query will recreate the full table.</td>
1911     <td>Table name is the query name.</td>
1912   </tr>
1913   <tr>
1914     <td></td>
1915     <td></td>
1916     <td></td>
1917     <td></td>
1918     <td></td>
1919   </tr>
1920 </table>
1921 
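For example, here is a hypothetical sketch of the partitioned writes mentioned for the file sink above, deriving a date partition column from an event-time column (the column names are assumptions):

{% highlight scala %}
import org.apache.spark.sql.functions.{col, to_date}

inputDF
  .withColumn("date", to_date(col("timestamp")))     // derive a partition column from event time
  .writeStream
  .format("parquet")
  .option("path", "path/to/destination/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .partitionBy("date")                                // one output directory per date
  .start()
{% endhighlight %}
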
1922 Note that you have to call `start()` to actually start the execution of the query. This returns a StreamingQuery object which is a handle to the continuously running execution. You can use this object to manage the query, which we will discuss in the next subsection. For now, let’s understand all this with a few examples.
1923 
1924 
1925 <div class="codetabs">
1926 <div data-lang="scala"  markdown="1">
1927 
1928 {% highlight scala %}
1929 // ========== DF with no aggregations ==========
1930 val noAggDF = deviceDataDf.select("device").where("signal > 10")   
1931 
1932 // Print new data to console
1933 noAggDF
1934   .writeStream
1935   .format("console")
1936   .start()
1937 
1938 // Write new data to Parquet files
1939 noAggDF
1940   .writeStream
1941   .format("parquet")
1942   .option("checkpointLocation", "path/to/checkpoint/dir")
1943   .option("path", "path/to/destination/dir")
1944   .start()
1945 
1946 // ========== DF with aggregation ==========
1947 val aggDF = df.groupBy("device").count()
1948 
1949 // Print updated aggregations to console
1950 aggDF
1951   .writeStream
1952   .outputMode("complete")
1953   .format("console")
1954   .start()
1955 
1956 // Have all the aggregates in an in-memory table
1957 aggDF
1958   .writeStream
1959   .queryName("aggregates")    // this query name will be the table name
1960   .outputMode("complete")
1961   .format("memory")
1962   .start()
1963 
1964 spark.sql("select * from aggregates").show()   // interactively query in-memory table
1965 {% endhighlight %}
1966 
1967 </div>
1968 <div data-lang="java"  markdown="1">
1969 
1970 {% highlight java %}
1971 // ========== DF with no aggregations ==========
1972 Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10");
1973 
1974 // Print new data to console
1975 noAggDF
1976   .writeStream()
1977   .format("console")
1978   .start();
1979 
1980 // Write new data to Parquet files
1981 noAggDF
1982   .writeStream()
1983   .format("parquet")
1984   .option("checkpointLocation", "path/to/checkpoint/dir")
1985   .option("path", "path/to/destination/dir")
1986   .start();
1987 
1988 // ========== DF with aggregation ==========
1989 Dataset<Row> aggDF = df.groupBy("device").count();
1990 
1991 // Print updated aggregations to console
1992 aggDF
1993   .writeStream()
1994   .outputMode("complete")
1995   .format("console")
1996   .start();
1997 
1998 // Have all the aggregates in an in-memory table
1999 aggDF
2000   .writeStream()
2001   .queryName("aggregates")    // this query name will be the table name
2002   .outputMode("complete")
2003   .format("memory")
2004   .start();
2005 
2006 spark.sql("select * from aggregates").show();   // interactively query in-memory table
2007 {% endhighlight %}
2008 
2009 </div>
2010 <div data-lang="python"  markdown="1">
2011 
2012 {% highlight python %}
2013 # ========== DF with no aggregations ==========
2014 noAggDF = deviceDataDf.select("device").where("signal > 10")   
2015 
2016 # Print new data to console
2017 noAggDF \
2018     .writeStream \
2019     .format("console") \
2020     .start()
2021 
2022 # Write new data to Parquet files
2023 noAggDF \
2024     .writeStream \
2025     .format("parquet") \
2026     .option("checkpointLocation", "path/to/checkpoint/dir") \
2027     .option("path", "path/to/destination/dir") \
2028     .start()
2029 
2030 # ========== DF with aggregation ==========
2031 aggDF = df.groupBy("device").count()
2032 
2033 # Print updated aggregations to console
2034 aggDF \
2035     .writeStream \
2036     .outputMode("complete") \
2037     .format("console") \
2038     .start()
2039 
2040 # Have all the aggregates in an in-memory table. The query name will be the table name
2041 aggDF \
2042     .writeStream \
2043     .queryName("aggregates") \
2044     .outputMode("complete") \
2045     .format("memory") \
2046     .start()
2047 
2048 spark.sql("select * from aggregates").show()   # interactively query in-memory table
2049 {% endhighlight %}
2050 
2051 </div>
2052 <div data-lang="r"  markdown="1">
2053 
2054 {% highlight r %}
2055 # ========== DF with no aggregations ==========
2056 noAggDF <- select(where(deviceDataDf, "signal > 10"), "device")
2057 
2058 # Print new data to console
2059 write.stream(noAggDF, "console")
2060 
2061 # Write new data to Parquet files
2062 write.stream(noAggDF,
2063              "parquet",
2064              path = "path/to/destination/dir",
2065              checkpointLocation = "path/to/checkpoint/dir")
2066 
2067 # ========== DF with aggregation ==========
2068 aggDF <- count(groupBy(df, "device"))
2069 
2070 # Print updated aggregations to console
2071 write.stream(aggDF, "console", outputMode = "complete")
2072 
2073 # Have all the aggregates in an in-memory table. The query name will be the table name
2074 write.stream(aggDF, "memory", queryName = "aggregates", outputMode = "complete")
2075 
2076 # Interactively query in-memory table
2077 head(sql("select * from aggregates"))
2078 {% endhighlight %}
2079 
2080 </div>
2081 </div>
2082 
2083 ##### Using Foreach and ForeachBatch
2084 The `foreach` and `foreachBatch` operations allow you to apply arbitrary operations and write 
2085 logic on the output of a streaming query. They have slightly different use cases - while `foreach` 
2086 allows custom write logic on every row, `foreachBatch` allows arbitrary operations 
2087 and custom logic on the output of each micro-batch. Let's understand their usages in more detail.  
2088 
2089 ###### ForeachBatch
2090 `foreachBatch(...)` allows you to specify a function that is executed on 
2091 the output data of every micro-batch of a streaming query. Since Spark 2.4, this is supported in Scala, Java and Python. 
2092 It takes two parameters: a DataFrame or Dataset that has the output data of a micro-batch and the unique ID of the micro-batch.
2093 
2094 <div class="codetabs">
2095 <div data-lang="scala"  markdown="1">
2096 
2097 {% highlight scala %}
2098 streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
2099   // Transform and write batchDF 
2100 }.start()
2101 {% endhighlight %}
2102 
2103 </div>
2104 <div data-lang="java"  markdown="1">
2105 
2106 {% highlight java %}
2107 streamingDatasetOfString.writeStream().foreachBatch(
2108   new VoidFunction2<Dataset<String>, Long>() {
2109     public void call(Dataset<String> dataset, Long batchId) {
2110       // Transform and write batchDF
2111     }    
2112   }
2113 ).start();
2114 {% endhighlight %}
2115 
2116 </div>
2117 <div data-lang="python"  markdown="1">
2118 
2119 {% highlight python %}
2120 def foreach_batch_function(df, epoch_id):
2121     # Transform and write batchDF
2122     pass
2123   
2124 streamingDF.writeStream.foreachBatch(foreach_batch_function).start()   
2125 {% endhighlight %}
2126 
2127 </div>
2128 <div data-lang="r"  markdown="1">
2129 R is not yet supported.
2130 </div>
2131 </div>
2132 
2133 With `foreachBatch`, you can do the following.
2134 
2135 - **Reuse existing batch data sources** - For many storage systems, there may not be a streaming sink available yet, 
2136   but there may already exist a data writer for batch queries. Using `foreachBatch`, you can use the batch
2137   data writers on the output of each micro-batch.
2138 - **Write to multiple locations** - If you want to write the output of a streaming query to multiple locations, 
2139   then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can 
2140   cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations,
2141   you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.  
2142 
2143 <div class="codetabs">
2144 <div data-lang="scala"  markdown="1">
2145 
2146 {% highlight scala %}
2147 streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
2148   batchDF.persist()
2149   batchDF.write.format(...).save(...)  // location 1
2150   batchDF.write.format(...).save(...)  // location 2
2151   batchDF.unpersist()
2152 }
2153 {% endhighlight %}
2154 
2155 </div>
2156 </div>
2157 
2158 - **Apply additional DataFrame operations** - Many DataFrame and Dataset operations are not supported 
2159   in streaming DataFrames because Spark does not support generating incremental plans in those cases. 
2160   Using `foreachBatch`, you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
2161 
2162 **Note:**
2163 - By default, `foreachBatch` provides only at-least-once write guarantees. However, you can use the 
2164   batchId provided to the function as way to deduplicate the output and get an exactly-once guarantee.  
2165 - `foreachBatch` does not work with the continuous processing mode as it fundamentally relies on the
2166   micro-batch execution of a streaming query. If you write data in the continuous mode, use `foreach` instead.
2167 
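For example, here is a rough sketch of using the batchId to make the writes idempotent (the batch-specific output path is just one possible approach and is an assumption of this sketch):

{% highlight scala %}
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  // Writing each micro-batch to a batch-specific location means that re-executing the
  // same batch after a failure overwrites the previous, possibly partial, attempt.
  batchDF.write
    .mode("overwrite")
    .parquet(s"path/to/output/batch_id=$batchId")
}.start()
{% endhighlight %}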
2168 
2169 ###### Foreach
2170 If `foreachBatch` is not an option (for example, corresponding batch data writer does not exist, or 
2171 continuous processing mode), then you can express your custom writer logic using `foreach`. 
2172 Specifically, you can express the data writing logic by dividing it into three methods: `open`, `process`, and `close`.
2173 Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
2174 
2175 <div class="codetabs">
2176 <div data-lang="scala"  markdown="1">
2177 
2178 In Scala, you have to extend the class `ForeachWriter` ([docs](api/scala/org/apache/spark/sql/ForeachWriter.html)).
2179 
2180 {% highlight scala %}
2181 streamingDatasetOfString.writeStream.foreach(
2182   new ForeachWriter[String] {
2183 
2184     def open(partitionId: Long, version: Long): Boolean = {
2185       // Open connection
2186     }
2187 
2188     def process(record: String): Unit = {
2189       // Write string to connection
2190     }
2191 
2192     def close(errorOrNull: Throwable): Unit = {
2193       // Close the connection
2194     }
2195   }
2196 ).start()
2197 {% endhighlight %}
2198 
2199 </div>
2200 <div data-lang="java"  markdown="1">
2201 
2202 In Java, you have to extend the class `ForeachWriter` ([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
2203 {% highlight java %}
2204 streamingDatasetOfString.writeStream().foreach(
2205   new ForeachWriter<String>() {
2206 
2207     @Override public boolean open(long partitionId, long version) {
2208       // Open connection
2209     }
2210 
2211     @Override public void process(String record) {
2212       // Write string to connection
2213     }
2214 
2215     @Override public void close(Throwable errorOrNull) {
2216       // Close the connection
2217     }
2218   }
2219 ).start();
2220 
2221 {% endhighlight %}
2222 
2223 </div>
2224 <div data-lang="python"  markdown="1">
2225 
2226 In Python, you can invoke foreach in two ways: in a function or in an object. 
2227 The function offers a simple way to express your processing logic but does not allow you to 
2228 deduplicate generated data when failures cause reprocessing of some input data. 
2229 For that situation you must specify the processing logic in an object.
2230 
2231 - First, the function takes a row as input.
2232 
2233 {% highlight python %}
2234 def process_row(row):
2235     # Write row to storage
2236     pass
2237 
2238 query = streamingDF.writeStream.foreach(process_row).start()  
2239 {% endhighlight %}
2240 
2241 - Second, the object has a process method and optional open and close methods:
2242 
2243 {% highlight python %}
2244 class ForeachWriter:
2245     def open(self, partition_id, epoch_id):
2246         # Open connection. This method is optional in Python.
2247         pass
2248 
2249     def process(self, row):
2250         # Write row to connection. This method is NOT optional in Python.
2251         pass
2252 
2253     def close(self, error):
2254         # Close the connection. This method is optional in Python.
2255         pass
2256       
2257 query = streamingDF.writeStream.foreach(ForeachWriter()).start()
2258 {% endhighlight %}
2259 
2260 </div>
2261 <div data-lang="r"  markdown="1">
2262 R is not yet supported.
2263 </div>
2264 </div>
2265 
2266 
2267 **Execution semantics**
2268 When the streaming query is started, Spark calls the function or the object’s methods in the following way:
2269 
2270 - A single copy of this object is responsible for all the data generated by a single task in a query. 
2271   In other words, one instance is responsible for processing one partition of the data generated in a distributed manner.
2272 
2273 - This object must be serializable, because each task will get a fresh serialized-deserialized copy 
2274   of the provided object. Hence, it is strongly recommended that any initialization for writing data 
2275   (for example, opening a connection or starting a transaction) is done after the open() method has 
2276   been called, which signifies that the task is ready to generate data.
2277 
2278 - The lifecycle of the methods is as follows:
2279 
2280   - For each partition with partition_id:
2281 
2282     - For each batch/epoch of streaming data with epoch_id:
2283 
2284       - Method open(partitionId, epochId) is called.
2285 
2286       - If open(...) returns true, for each row in the partition and batch/epoch, method process(row) is called.
2287 
2288       - Method close(error) is called with error (if any) seen while processing rows.
2289 
2290 - The close() method (if it exists) is called if an open() method exists and returns successfully (irrespective of the return value), except if the JVM or Python process crashes in the middle.
2291 
2292 - **Note:** Spark does not guarantee the same output for (partitionId, epochId), so deduplication
2293   cannot be achieved with (partitionId, epochId). For example, the source may provide a different number of
2294   partitions for some reason, a Spark optimization may change the number of partitions, etc.
2295   See [SPARK-28650](https://issues.apache.org/jira/browse/SPARK-28650) for more details.
2296   If you need deduplication on output, try out `foreachBatch` instead.
2297 
2298 #### Triggers
2299 The trigger settings of a streaming query define the timing of streaming data processing, i.e. whether
2300 the query is going to be executed as a micro-batch query with a fixed batch interval or as a continuous processing query.
2301 Here are the different kinds of triggers that are supported.
2302 
2303 <table class="table">
2304   <tr>
2305     <th>Trigger Type</th>
2306     <th>Description</th>
2307   </tr>
2308   <tr>
2309     <td><i>unspecified (default)</i></td>
2310     <td>
2311         If no trigger setting is explicitly specified, then by default, the query will be
2312         executed in micro-batch mode, where micro-batches will be generated as soon as
2313         the previous micro-batch has completed processing.
2314     </td>
2315   </tr>
2316   <tr>
2317     <td><b>Fixed interval micro-batches</b></td>
2318     <td>
2319         The query will be executed in micro-batch mode, where micro-batches will be kicked off
2320         at the user-specified intervals.
2321         <ul>
2322           <li>If the previous micro-batch completes within the interval, then the engine will wait until
2323           the interval is over before kicking off the next micro-batch.</li>
2324 
2325           <li>If the previous micro-batch takes longer than the interval to complete (i.e. if an
2326           interval boundary is missed), then the next micro-batch will start as soon as the
2327           previous one completes (i.e., it will not wait for the next interval boundary).</li>
2328 
2329           <li>If no new data is available, then no micro-batch will be kicked off.</li>
2330         </ul>
2331     </td>
2332   </tr>
2333   <tr>
2334     <td><b>One-time micro-batch</b></td>
2335     <td>
2336         The query will execute <strong>only one</strong> micro-batch to process all the available data and then
2337         stop on its own. This is useful in scenarios where you want to periodically spin up a cluster,
2338         process everything that is available since the last period, and then shut down the
2339         cluster. In some cases, this may lead to significant cost savings.
2340     </td>
2341   </tr>
2342   <tr>
2343     <td><b>Continuous with fixed checkpoint interval</b><br/><i>(experimental)</i></td>
2344     <td>
2345         The query will be executed in the new low-latency, continuous processing mode. Read more
2346         about this in the <a href="#continuous-processing">Continuous Processing section</a> below.
2347     </td>
2348   </tr>
2349 </table>
2350 
2351 Here are a few code examples.
2352 
2353 <div class="codetabs">
2354 <div data-lang="scala"  markdown="1">
2355 
2356 {% highlight scala %}
2357 import org.apache.spark.sql.streaming.Trigger
2358 
2359 // Default trigger (runs micro-batch as soon as it can)
2360 df.writeStream
2361   .format("console")
2362   .start()
2363 
2364 // ProcessingTime trigger with two-seconds micro-batch interval
2365 df.writeStream
2366   .format("console")
2367   .trigger(Trigger.ProcessingTime("2 seconds"))
2368   .start()
2369 
2370 // One-time trigger
2371 df.writeStream
2372   .format("console")
2373   .trigger(Trigger.Once())
2374   .start()
2375 
2376 // Continuous trigger with one-second checkpointing interval
2377 df.writeStream
2378   .format("console")
2379   .trigger(Trigger.Continuous("1 second"))
2380   .start()
2381 
2382 {% endhighlight %}
2383 
2384 
2385 </div>
2386 <div data-lang="java"  markdown="1">
2387 
2388 {% highlight java %}
2389 import org.apache.spark.sql.streaming.Trigger;
2390 
2391 // Default trigger (runs micro-batch as soon as it can)
2392 df.writeStream
2393   .format("console")
2394   .start();
2395 
2396 // ProcessingTime trigger with two-seconds micro-batch interval
2397 df.writeStream
2398   .format("console")
2399   .trigger(Trigger.ProcessingTime("2 seconds"))
2400   .start();
2401 
2402 // One-time trigger
2403 df.writeStream
2404   .format("console")
2405   .trigger(Trigger.Once())
2406   .start();
2407 
2408 // Continuous trigger with one-second checkpointing interval
2409 df.writeStream
2410   .format("console")
2411   .trigger(Trigger.Continuous("1 second"))
2412   .start();
2413 
2414 {% endhighlight %}
2415 
2416 </div>
2417 <div data-lang="python"  markdown="1">
2418 
2419 {% highlight python %}
2420 
2421 # Default trigger (runs micro-batch as soon as it can)
2422 df.writeStream \
2423   .format("console") \
2424   .start()
2425 
2426 # ProcessingTime trigger with two-seconds micro-batch interval
2427 df.writeStream \
2428   .format("console") \
2429   .trigger(processingTime='2 seconds') \
2430   .start()
2431 
2432 # One-time trigger
2433 df.writeStream \
2434   .format("console") \
2435   .trigger(once=True) \
2436   .start()
2437 
2438 # Continuous trigger with one-second checkpointing interval
2439 df.writeStream \
2440   .format("console") \
2441   .trigger(continuous='1 second') \
2442   .start()
2443 
2444 {% endhighlight %}
2445 </div>
2446 <div data-lang="r"  markdown="1">
2447 
2448 {% highlight r %}
2449 # Default trigger (runs micro-batch as soon as it can)
2450 write.stream(df, "console")
2451 
2452 # ProcessingTime trigger with two-seconds micro-batch interval
2453 write.stream(df, "console", trigger.processingTime = "2 seconds")
2454 
2455 # One-time trigger
2456 write.stream(df, "console", trigger.once = TRUE)
2457 
2458 # Continuous trigger is not yet supported
2459 {% endhighlight %}
2460 </div>
2461 </div>
2462 
2463 
2464 ## Managing Streaming Queries
2465 The `StreamingQuery` object created when a query is started can be used to monitor and manage the query. 
2466 
2467 <div class="codetabs">
2468 <div data-lang="scala"  markdown="1">
2469 
2470 {% highlight scala %}
2471 val query = df.writeStream.format("console").start()   // get the query object
2472 
2473 query.id          // get the unique identifier of the running query that persists across restarts from checkpoint data
2474 
2475 query.runId       // get the unique id of this run of the query, which will be generated at every start/restart
2476 
2477 query.name        // get the auto-generated or user-specified name of the query
2478 
2479 query.explain()   // print detailed explanations of the query
2480 
2481 query.stop()      // stop the query
2482 
2483 query.awaitTermination()   // block until query is terminated, with stop() or with error
2484 
2485 query.exception       // the exception if the query has been terminated with error
2486 
2487 query.recentProgress  // an array of the most recent progress updates for this query
2488 
2489 query.lastProgress    // the most recent progress update of this streaming query
2490 {% endhighlight %}
2491 
2492 
2493 </div>
2494 <div data-lang="java"  markdown="1">
2495 
2496 {% highlight java %}
2497 StreamingQuery query = df.writeStream().format("console").start();   // get the query object
2498 
2499 query.id();          // get the unique identifier of the running query that persists across restarts from checkpoint data
2500 
2501 query.runId();       // get the unique id of this run of the query, which will be generated at every start/restart
2502 
2503 query.name();        // get the auto-generated or user-specified name of the query
2504 
2505 query.explain();   // print detailed explanations of the query
2506 
2507 query.stop();      // stop the query
2508 
2509 query.awaitTermination();   // block until query is terminated, with stop() or with error
2510 
2511 query.exception();       // the exception if the query has been terminated with error
2512 
2513 query.recentProgress();  // an array of the most recent progress updates for this query
2514 
2515 query.lastProgress();    // the most recent progress update of this streaming query
2516 
2517 {% endhighlight %}
2518 
2519 </div>
2520 <div data-lang="python"  markdown="1">
2521 
2522 {% highlight python %}
2523 query = df.writeStream.format("console").start()   # get the query object
2524 
2525 query.id          # get the unique identifier of the running query that persists across restarts from checkpoint data
2526 
2527 query.runId       # get the unique id of this run of the query, which will be generated at every start/restart
2528 
2529 query.name        # get the auto-generated or user-specified name of the query
2530 
2531 query.explain()   # print detailed explanations of the query
2532 
2533 query.stop()      # stop the query
2534 
2535 query.awaitTermination()   # block until query is terminated, with stop() or with error
2536 
2537 query.exception()       # the exception if the query has been terminated with error
2538 
2539 query.recentProgress  # a list of the most recent progress updates for this query
2540 
2541 query.lastProgress    # the most recent progress update of this streaming query
2542 
2543 {% endhighlight %}
2544 
2545 </div>
2546 <div data-lang="r"  markdown="1">
2547 
2548 {% highlight r %}
2549 query <- write.stream(df, "console")  # get the query object
2550 
2551 queryName(query)          # get the auto-generated or user-specified name of the query
2552 
2553 explain(query)            # print detailed explanations of the query
2554 
2555 stopQuery(query)          # stop the query
2556 
2557 awaitTermination(query)   # block until query is terminated, with stop() or with error
2558 
2559 lastProgress(query)       # the most recent progress update of this streaming query
2560 
2561 {% endhighlight %}
2562 
2563 </div>
2564 </div>
2565 
2566 You can start any number of queries in a single SparkSession. They will all run concurrently, sharing the cluster resources. You can use `sparkSession.streams()` to get the `StreamingQueryManager`
2567 ([Scala](api/scala/org/apache/spark/sql/streaming/StreamingQueryManager.html)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryManager.html)/[Python](api/python/pyspark.sql.html#pyspark.sql.streaming.StreamingQueryManager) docs)
2568 that can be used to manage the currently active queries.
2569 
2570 <div class="codetabs">
2571 <div data-lang="scala"  markdown="1">
2572 
2573 {% highlight scala %}
2574 val spark: SparkSession = ...
2575 
2576 spark.streams.active    // get the list of currently active streaming queries
2577 
2578 spark.streams.get(id)   // get a query object by its unique id
2579 
2580 spark.streams.awaitAnyTermination()   // block until any one of them terminates
2581 {% endhighlight %}
2582 
2583 </div>
2584 <div data-lang="java"  markdown="1">
2585 
2586 {% highlight java %}
2587 SparkSession spark = ...
2588 
2589 spark.streams().active();    // get the list of currently active streaming queries
2590 
2591 spark.streams().get(id);   // get a query object by its unique id
2592 
2593 spark.streams().awaitAnyTermination();   // block until any one of them terminates
2594 {% endhighlight %}
2595 
2596 </div>
2597 <div data-lang="python"  markdown="1">
2598 
2599 {% highlight python %}
2600 spark = ...  # spark session
2601 
2602 spark.streams.active  # get the list of currently active streaming queries
2603 
2604 spark.streams.get(id)  # get a query object by its unique id
2605 
2606 spark.streams.awaitAnyTermination()  # block until any one of them terminates
2607 {% endhighlight %}
2608 
2609 </div>
2610 <div data-lang="r"  markdown="1">
2611 {% highlight bash %}
2612 Not available in R.
2613 {% endhighlight %}
2614 
2615 </div>
2616 </div>
2617 
2618 
2619 ## Monitoring Streaming Queries
2620 There are multiple ways to monitor active streaming queries. You can either push metrics to external systems using Spark's Dropwizard Metrics support, or access them programmatically.
2621 
2622 ### Reading Metrics Interactively
2623 
2624 You can directly get the current status and metrics of an active query using 
2625 `streamingQuery.lastProgress()` and `streamingQuery.status()`. 
2626 `lastProgress()` returns a `StreamingQueryProgress` object 
2627 in [Scala](api/scala/org/apache/spark/sql/streaming/StreamingQueryProgress.html) 
2628 and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryProgress.html)
2629 and a dictionary with the same fields in Python. It has all the information about
2630 the progress made in the last trigger of the stream - what data was processed,
2631 what the processing rates and latencies were, etc. There is also
2632 `streamingQuery.recentProgress`, which returns an array of the last few progress updates.
2633 
2634 In addition, `streamingQuery.status()` returns a `StreamingQueryStatus` object 
2635 in [Scala](api/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.html) 
2636 and [Java](api/java/org/apache/spark/sql/streaming/StreamingQueryStatus.html)
2637 and a dictionary with the same fields in Python. It gives information about
2638 what the query is immediately doing - is a trigger active, is data being processed, etc.
2639 
2640 Here are a few examples.
2641 
2642 <div class="codetabs">
2643 <div data-lang="scala"  markdown="1">
2644 
2645 {% highlight scala %}
2646 val query: StreamingQuery = ...
2647 
2648 println(query.lastProgress)
2649 
2650 /* Will print something like the following.
2651 
2652 {
2653   "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
2654   "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
2655   "name" : "MyQuery",
2656   "timestamp" : "2016-12-14T18:45:24.873Z",
2657   "numInputRows" : 10,
2658   "inputRowsPerSecond" : 120.0,
2659   "processedRowsPerSecond" : 200.0,
2660   "durationMs" : {
2661     "triggerExecution" : 3,
2662     "getOffset" : 2
2663   },
2664   "eventTime" : {
2665     "watermark" : "2016-12-14T18:45:24.873Z"
2666   },
2667   "stateOperators" : [ ],
2668   "sources" : [ {
2669     "description" : "KafkaSource[Subscribe[topic-0]]",
2670     "startOffset" : {
2671       "topic-0" : {
2672         "2" : 0,
2673         "4" : 1,
2674         "1" : 1,
2675         "3" : 1,
2676         "0" : 1
2677       }
2678     },
2679     "endOffset" : {
2680       "topic-0" : {
2681         "2" : 0,
2682         "4" : 115,
2683         "1" : 134,
2684         "3" : 21,
2685         "0" : 534
2686       }
2687     },
2688     "numInputRows" : 10,
2689     "inputRowsPerSecond" : 120.0,
2690     "processedRowsPerSecond" : 200.0
2691   } ],
2692   "sink" : {
2693     "description" : "MemorySink"
2694   }
2695 }
2696 */
2697 
2698 
2699 println(query.status)
2700 
2701 /*  Will print something like the following.
2702 {
2703   "message" : "Waiting for data to arrive",
2704   "isDataAvailable" : false,
2705   "isTriggerActive" : false
2706 }
2707 */
2708 {% endhighlight %}
2709 
2710 </div>
2711 <div data-lang="java"  markdown="1">
2712 
2713 {% highlight java %}
2714 StreamingQuery query = ...
2715 
2716 System.out.println(query.lastProgress());
2717 /* Will print something like the following.
2718 
2719 {
2720   "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109",
2721   "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a",
2722   "name" : "MyQuery",
2723   "timestamp" : "2016-12-14T18:45:24.873Z",
2724   "numInputRows" : 10,
2725   "inputRowsPerSecond" : 120.0,
2726   "processedRowsPerSecond" : 200.0,
2727   "durationMs" : {
2728     "triggerExecution" : 3,
2729     "getOffset" : 2
2730   },
2731   "eventTime" : {
2732     "watermark" : "2016-12-14T18:45:24.873Z"
2733   },
2734   "stateOperators" : [ ],
2735   "sources" : [ {
2736     "description" : "KafkaSource[Subscribe[topic-0]]",
2737     "startOffset" : {
2738       "topic-0" : {
2739         "2" : 0,
2740         "4" : 1,
2741         "1" : 1,
2742         "3" : 1,
2743         "0" : 1
2744       }
2745     },
2746     "endOffset" : {
2747       "topic-0" : {
2748         "2" : 0,
2749         "4" : 115,
2750         "1" : 134,
2751         "3" : 21,
2752         "0" : 534
2753       }
2754     },
2755     "numInputRows" : 10,
2756     "inputRowsPerSecond" : 120.0,
2757     "processedRowsPerSecond" : 200.0
2758   } ],
2759   "sink" : {
2760     "description" : "MemorySink"
2761   }
2762 }
2763 */
2764 
2765 
2766 System.out.println(query.status());
2767 /*  Will print something like the following.
2768 {
2769   "message" : "Waiting for data to arrive",
2770   "isDataAvailable" : false,
2771   "isTriggerActive" : false
2772 }
2773 */
2774 {% endhighlight %}
2775 
2776 </div>
2777 <div data-lang="python"  markdown="1">
2778 
2779 {% highlight python %}
2780 query = ...  # a StreamingQuery
2781 print(query.lastProgress)
2782 
2783 '''
2784 Will print something like the following.
2785 
2786 {u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
2787 '''
2788 
2789 print(query.status)
2790 ''' 
2791 Will print something like the following.
2792 
2793 {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
2794 '''
2795 {% endhighlight %}
2796 
2797 </div>
2798 <div data-lang="r"  markdown="1">
2799 
2800 {% highlight r %}
2801 query <- ...  # a StreamingQuery
2802 lastProgress(query)
2803 
2804 '''
2805 Will print something like the following.
2806 
2807 {
2808   "id" : "8c57e1ec-94b5-4c99-b100-f694162df0b9",
2809   "runId" : "ae505c5a-a64e-4896-8c28-c7cbaf926f16",
2810   "name" : null,
2811   "timestamp" : "2017-04-26T08:27:28.835Z",
2812   "numInputRows" : 0,
2813   "inputRowsPerSecond" : 0.0,
2814   "processedRowsPerSecond" : 0.0,
2815   "durationMs" : {
2816     "getOffset" : 0,
2817     "triggerExecution" : 1
2818   },
2819   "stateOperators" : [ {
2820     "numRowsTotal" : 4,
2821     "numRowsUpdated" : 0
2822   } ],
2823   "sources" : [ {
2824     "description" : "TextSocketSource[host: localhost, port: 9999]",
2825     "startOffset" : 1,
2826     "endOffset" : 1,
2827     "numInputRows" : 0,
2828     "inputRowsPerSecond" : 0.0,
2829     "processedRowsPerSecond" : 0.0
2830   } ],
2831   "sink" : {
2832     "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@76b37531"
2833   }
2834 }
2835 '''
2836 
2837 status(query)
2838 '''
2839 Will print something like the following.
2840 
2841 {
2842   "message" : "Waiting for data to arrive",
2843   "isDataAvailable" : false,
2844   "isTriggerActive" : false
2845 }
2846 '''
2847 {% endhighlight %}
2848 
2849 </div>
2850 </div>
2851 
2852 ### Reporting Metrics programmatically using Asynchronous APIs
2853 
2854 You can also asynchronously monitor all queries associated with a
2855 `SparkSession` by attaching a `StreamingQueryListener`
2856 ([Scala](api/scala/org/apache/spark/sql/streaming/StreamingQueryListener.html)/[Java](api/java/org/apache/spark/sql/streaming/StreamingQueryListener.html) docs).
2857 Once you attach your custom `StreamingQueryListener` object with
2858 `sparkSession.streams.addListener()`, you will get callbacks when a query is started and
2859 stopped and when there is progress made in an active query. Here is an example:
2860 
2861 <div class="codetabs">
2862 <div data-lang="scala"  markdown="1">
2863 
2864 {% highlight scala %}
2865 val spark: SparkSession = ...
2866 
2867 spark.streams.addListener(new StreamingQueryListener() {
2868     override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
2869         println("Query started: " + queryStarted.id)
2870     }
2871     override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
2872         println("Query terminated: " + queryTerminated.id)
2873     }
2874     override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
2875         println("Query made progress: " + queryProgress.progress)
2876     }
2877 })
2878 {% endhighlight %}
2879 
2880 </div>
2881 <div data-lang="java"  markdown="1">
2882 
2883 {% highlight java %}
2884 SparkSession spark = ...
2885 
2886 spark.streams().addListener(new StreamingQueryListener() {
2887     @Override
2888     public void onQueryStarted(QueryStartedEvent queryStarted) {
2889         System.out.println("Query started: " + queryStarted.id());
2890     }
2891     @Override
2892     public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
2893         System.out.println("Query terminated: " + queryTerminated.id());
2894     }
2895     @Override
2896     public void onQueryProgress(QueryProgressEvent queryProgress) {
2897         System.out.println("Query made progress: " + queryProgress.progress());
2898     }
2899 });
2900 {% endhighlight %}
2901 
2902 </div>
2903 <div data-lang="python"  markdown="1">
2904 {% highlight bash %}
2905 Not available in Python.
2906 {% endhighlight %}
2907 
2908 </div>
2909 <div data-lang="r"  markdown="1">
2910 {% highlight bash %}
2911 Not available in R.
2912 {% endhighlight %}
2913 
2914 </div>
2915 </div>
2916 
2917 ### Reporting Metrics using Dropwizard 
2918 Spark supports reporting metrics using the [Dropwizard Library](monitoring.html#metrics). To enable metrics of Structured Streaming queries to be reported as well, you have to explicitly enable the configuration `spark.sql.streaming.metricsEnabled` in the SparkSession. 
2919 
2920 <div class="codetabs">
2921 <div data-lang="scala"  markdown="1">
2922 {% highlight scala %}
2923 spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
2924 // or
2925 spark.sql("SET spark.sql.streaming.metricsEnabled=true")
2926 {% endhighlight %}
2927 </div>
2928 <div data-lang="java"  markdown="1">  
2929 {% highlight java %}
2930 spark.conf().set("spark.sql.streaming.metricsEnabled", "true");
2931 // or
2932 spark.sql("SET spark.sql.streaming.metricsEnabled=true");
2933 {% endhighlight %}
2934 </div>
2935 <div data-lang="python"  markdown="1">  
2936 {% highlight python %}
2937 spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
2938 # or
2939 spark.sql("SET spark.sql.streaming.metricsEnabled=true")
2940 {% endhighlight %}
2941 </div>
2942 <div data-lang="r"  markdown="1">
2943 {% highlight r %}
2944 sql("SET spark.sql.streaming.metricsEnabled=true")
2945 {% endhighlight %}
2946 </div>
2947 </div>
2948 
2949 
2950 All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever [sinks](monitoring.html#metrics) have been configured (e.g. Ganglia, Graphite, JMX, etc.).
2951 
2952 ## Recovering from Failures with Checkpointing 
2953 In case of a failure or intentional shutdown, you can recover the progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. the range of offsets processed in each trigger) and the running aggregates (e.g. the word counts in the [quick example](#quick-example)) to the checkpoint location. This checkpoint location has to be a path in an HDFS-compatible file system, and can be set as an option in the DataStreamWriter when [starting a query](#starting-streaming-queries).
2954 
2955 <div class="codetabs">
2956 <div data-lang="scala"  markdown="1">
2957 
2958 {% highlight scala %}
2959 aggDF
2960   .writeStream
2961   .outputMode("complete")
2962   .option("checkpointLocation", "path/to/HDFS/dir")
2963   .format("memory")
2964   .start()
2965 {% endhighlight %}
2966 
2967 </div>
2968 <div data-lang="java"  markdown="1">
2969 
2970 {% highlight java %}
2971 aggDF
2972   .writeStream()
2973   .outputMode("complete")
2974   .option("checkpointLocation", "path/to/HDFS/dir")
2975   .format("memory")
2976   .start();
2977 {% endhighlight %}
2978 
2979 </div>
2980 <div data-lang="python"  markdown="1">
2981 
2982 {% highlight python %}
2983 aggDF \
2984     .writeStream \
2985     .outputMode("complete") \
2986     .option("checkpointLocation", "path/to/HDFS/dir") \
2987     .format("memory") \
2988     .start()
2989 {% endhighlight %}
2990 
2991 </div>
2992 <div data-lang="r"  markdown="1">
2993 
2994 {% highlight r %}
2995 write.stream(aggDF, "memory", outputMode = "complete", checkpointLocation = "path/to/HDFS/dir")
2996 {% endhighlight %}
2997 
2998 </div>
2999 </div>
3000 
3001 
3002 ## Recovery Semantics after Changes in a Streaming Query
3003 There are limitations on what changes in a streaming query are allowed between restarts from the 
3004 same checkpoint location. Here are a few kinds of changes that are either not allowed, or for which
3005 the effect of the change is not well-defined. For all of them:
3006 
3007 - The term *allowed* means you can do the specified change but whether the semantics of its effect 
3008   is well-defined depends on the query and the change.
3009 
3010 - The term *not allowed* means you should not do the specified change as the restarted query is likely
3011   to fail with unpredictable errors. `sdf` represents a streaming DataFrame/Dataset
3012   generated with `sparkSession.readStream`.
3013   
3014 **Types of changes**
3015 
3016 - *Changes in the number or type (i.e. different source) of input sources*: This is not allowed.
3017 
3018 - *Changes in the parameters of input sources*: Whether this is allowed and whether the semantics 
3019   of the change are well-defined depends on the source and the query. Here are a few examples.
3020 
3021   - Addition/deletion/modification of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
3022 
3023   - Changes to subscribed topics/files are generally not allowed as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
3024 
3025 - *Changes in the type of output sink*: Changes between a few specific combinations of sinks 
3026   are allowed. This needs to be verified on a case-by-case basis. Here are a few examples.
3027 
3028   - File sink to Kafka sink is allowed. Kafka will see only the new data.
3029 
3030   - Kafka sink to file sink is not allowed.
3031 
3032   - Changing from a Kafka sink to a foreach sink, or vice versa, is allowed.
3033 
3034 - *Changes in the parameters of output sink*: Whether this is allowed and whether the semantics of 
3035   the change are well-defined depends on the sink and the query. Here are a few examples.
3036 
3037   - Changes to output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
3038 
3039   - Changes to output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
3040 
3041   - Changes to the user-defined foreach sink (that is, the `ForeachWriter` code) are allowed, but the semantics of the change depends on the code.
3042 
3043 - *Changes in projection / filter / map-like operations*: Some cases are allowed. For example:
3044 
3045   - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
3046 
3047   - Changes in projections with same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.selectExpr("anotherStringColumn AS json").writeStream`
3048 
3049   - Changes in projections with different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
3050 
3051 - *Changes in stateful operations*: Some operations in streaming queries need to maintain
3052   state data in order to continuously update the result. Structured Streaming automatically checkpoints
3053   the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart.
3054   However, this assumes that the schema of the state data remains the same across restarts. This means that
3055   *any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts*.
3056   Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:
3057 
3058   - *Streaming aggregation*: For example, `sdf.groupBy("a").agg(...)`. Any change in number or type of grouping keys or aggregates is not allowed.
3059 
3060   - *Streaming deduplication*: For example, `sdf.dropDuplicates("a")`. Any change in the number or type of the deduplication columns is not allowed.
3061 
3062   - *Stream-stream join*: For example, `sdf1.join(sdf2, ...)` (i.e. both inputs are generated with `sparkSession.readStream`). Changes
3063     in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.
3064 
3065   - *Arbitrary stateful operation*: For example, `sdf.groupByKey(...).mapGroupsWithState(...)` or `sdf.groupByKey(...).flatMapGroupsWithState(...)`.
3066     Any change to the schema of the user-defined state or the type of timeout is not allowed.
3067     Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic.
3068     If you really want to support state schema changes, then you can explicitly encode/decode your complex state data
3069     structures into bytes using an encoding/decoding scheme that supports schema migration. For example,
3070     if you save your state as Avro-encoded bytes, then you are free to change the Avro state schema between query
3071     restarts, as the binary state will always be restored successfully (see the sketch after this list).
3072 
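To make the byte-encoding idea above concrete, here is a minimal Scala sketch rather than a definitive implementation: the checkpointed state is just `Array[Byte]`, while `encodeProfile`/`decodeProfile` are hypothetical helpers that you would back with a schema-migration-friendly serializer such as Avro. The `Visit`/`UserProfile` classes and the `visits` Dataset are also hypothetical.

{% highlight scala %}
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

val spark: SparkSession = ...
import spark.implicits._

case class Visit(userId: String, page: String)
case class UserProfile(visits: Long)  // the "real" state, free to evolve over time

// Hypothetical helpers; in practice back them with Avro (or a similar format)
// so a new reader schema can still decode bytes written by an earlier query run.
def encodeProfile(profile: UserProfile): Array[Byte] = ...
def decodeProfile(bytes: Array[Byte]): UserProfile = ...

val visits: Dataset[Visit] = ...  // streaming Dataset built with spark.readStream

val visitCounts = visits
  .groupByKey(_.userId)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (userId: String, events: Iterator[Visit], state: GroupState[Array[Byte]]) =>
      // Spark only ever checkpoints Array[Byte]; the state schema lives in your codec.
      val profile = state.getOption.map(decodeProfile).getOrElse(UserProfile(0L))
      val updated = profile.copy(visits = profile.visits + events.size)
      state.update(encodeProfile(updated))
      (userId, updated.visits)
  }
{% endhighlight %}
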
3073 # Continuous Processing
3074 ## [Experimental]
3075 {:.no_toc}
3076 
3077 **Continuous processing** is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default *micro-batch processing* engine, which can achieve exactly-once guarantees but achieves latencies of ~100 ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).
3078 
3079 To run a supported query in continuous processing mode, all you need to do is specify a **continuous trigger** with the desired checkpoint interval as a parameter. For example, 
3080 
3081 <div class="codetabs">
3082 <div data-lang="scala"  markdown="1">
3083 {% highlight scala %}
3084 import org.apache.spark.sql.streaming.Trigger
3085 
3086 spark
3087   .readStream
3088   .format("kafka")
3089   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
3090   .option("subscribe", "topic1")
3091   .load()
3092   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
3093   .writeStream
3094   .format("kafka")
3095   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
3096   .option("topic", "topic1")
3097   .trigger(Trigger.Continuous("1 second"))  // only change in query
3098   .start()
3099 {% endhighlight %}
3100 </div>
3101 <div data-lang="java"  markdown="1">  
3102 {% highlight java %}
3103 import org.apache.spark.sql.streaming.Trigger;
3104 
3105 spark
3106   .readStream
3107   .format("kafka")
3108   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
3109   .option("subscribe", "topic1")
3110   .load()
3111   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
3112   .writeStream
3113   .format("kafka")
3114   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
3115   .option("topic", "topic1")
3116   .trigger(Trigger.Continuous("1 second"))  // only change in query
3117   .start();
3118 {% endhighlight %}
3119 </div>
3120 <div data-lang="python"  markdown="1">  
3121 {% highlight python %}
3122 spark \
3123   .readStream \
3124   .format("kafka") \
3125   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
3126   .option("subscribe", "topic1") \
3127   .load() \
3128   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
3129   .writeStream \
3130   .format("kafka") \
3131   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
3132   .option("topic", "topic1") \
3133   .trigger(continuous="1 second") \
3134   .start()  # the continuous trigger is the only change in the query
3135 
3136 {% endhighlight %}
3137 </div>
3138 </div>
3139 
3140 A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
3141 
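As a rough illustration of this interchangeability, here is a minimal Scala sketch of stopping a micro-batch run and restarting the same query in continuous mode from the same checkpoint; the checkpoint path, servers, and topic name are hypothetical placeholders.

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

// First run: micro-batch mode with a 2-second trigger interval
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/hypothetical/checkpoints/myQuery")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// Later run, after stopping the first: continuous mode from the same checkpoint.
// Note that this switches the guarantee from exactly-once to at-least-once.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/hypothetical/checkpoints/myQuery")
  .trigger(Trigger.Continuous("1 second"))
  .start()
{% endhighlight %}
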
3142 ## Supported Queries
3143 {:.no_toc}
3144 
3145 As of Spark 2.4, only the following types of queries are supported in the continuous processing mode.
3146 
3147 - *Operations*: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
3148   + All SQL functions are supported except aggregation functions (since aggregations are not yet supported), `current_timestamp()` and `current_date()` (deterministic computation using time is challenging).
3149 
3150 - *Sources*:
3151   + Kafka source: All options are supported.
3152   + Rate source: Good for testing. The only options supported in continuous mode are `numPartitions` and `rowsPerSecond`.
3153 
3154 - *Sinks*: 
3155   + Kafka sink: All options are supported.
3156   + Memory sink: Good for debugging.
3157   + Console sink: Good for debugging. All options are supported. Note that the console will print output at every checkpoint interval that you have specified in the continuous trigger.
3158 
3159 See [Input Sources](#input-sources) and [Output Sinks](#output-sinks) sections for more details on them. While the console sink is good for testing, the end-to-end low-latency processing can be best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.
3160 
3161 ## Caveats
3162 {:.no_toc}
3163 
3164 - The continuous processing engine launches multiple long-running tasks that continuously read data from sources, process it, and continuously write to sinks. The number of tasks required by the query depends on how many partitions the query can read from the sources in parallel. Therefore, before starting a continuous processing query, you must ensure there are enough cores in the cluster to run all the tasks in parallel. For example, if you are reading from a Kafka topic that has 10 partitions, then the cluster must have at least 10 cores for the query to make progress.
3165 - Stopping a continuous processing stream may produce spurious task termination warnings. These can be safely ignored.
3166 - There are currently no automatic retries of failed tasks. Any failure will lead to the query being stopped, and it needs to be manually restarted from the checkpoint.
3167 
3168 # Additional Information
3169 
3170 **Notes**
3171 
3172 - Several configurations are not modifiable after the query has run. To change them, discard the checkpoint and start a new query. These configurations include:
3173   - `spark.sql.shuffle.partitions` (see the sketch after these notes)
3174     - This is due to the physical partitioning of state: state is partitioned by applying a hash function to the key, hence the number of partitions for state should remain unchanged.
3175     - If you want to run fewer tasks for stateful operations, `coalesce` helps avoid unnecessary repartitioning.
3176       - After `coalesce`, the number of (reduced) tasks will be kept unless another shuffle happens.
3177   - `spark.sql.streaming.stateStore.providerClass`: To read the previous state of the query properly, the class of the state store provider should be unchanged.
3178   - `spark.sql.streaming.multipleWatermarkPolicy`: Modifying this would lead to an inconsistent watermark value when the query contains multiple watermarks, hence the policy should be unchanged.
3179 
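As a rough sketch (not an official recommendation), the snippet below pins `spark.sql.shuffle.partitions` before a query is first started, since that value also fixes the number of state partitions for the lifetime of the checkpoint. The `streamingCounts` DataFrame and the checkpoint path are hypothetical.

{% highlight scala %}
// Fix the number of shuffle (and therefore state) partitions before the first start;
// it cannot be changed on later restarts from the same checkpoint.
spark.conf.set("spark.sql.shuffle.partitions", "50")

val query = streamingCounts.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/hypothetical/checkpoints/wordCounts")
  .format("console")
  .start()
{% endhighlight %}
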
3180 **Further Reading**
3181 
3182 - See and run the
3183   [Scala]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming)/[Java]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming)/[Python]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming)/[R]({{site.SPARK_GITHUB_URL}}/tree/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/r/streaming)
3184   examples.
3185     - [Instructions](index.html#running-the-examples-and-shell) on how to run Spark examples
3186 - Read about integrating with Kafka in the [Structured Streaming Kafka Integration Guide](structured-streaming-kafka-integration.html)
3187 - Read more details about using DataFrames/Datasets in the [Spark SQL Programming Guide](sql-programming-guide.html)
3188 - Third-party Blog Posts
3189     - [Real-time Streaming ETL with Structured Streaming in Apache Spark 2.1 (Databricks Blog)](https://databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html)
3190     - [Real-Time End-to-End Integration with Apache Kafka in Apache Spark’s Structured Streaming (Databricks Blog)](https://databricks.com/blog/2017/04/04/real-time-end-to-end-integration-with-apache-kafka-in-apache-sparks-structured-streaming.html)
3191     - [Event-time Aggregation and Watermarking in Apache Spark’s Structured Streaming (Databricks Blog)](https://databricks.com/blog/2017/05/08/event-time-aggregation-watermarking-apache-sparks-structured-streaming.html)
3192 
3193 **Talks**
3194 
3195 - Spark Summit Europe 2017
3196   - Easy, Scalable, Fault-tolerant Stream Processing with Structured Streaming in Apache Spark -
3197     [Part 1 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark), [Part 2 slides/video](https://databricks.com/session/easy-scalable-fault-tolerant-stream-processing-with-structured-streaming-in-apache-spark-continues)
3198   - Deep Dive into Stateful Stream Processing in Structured Streaming - [slides/video](https://databricks.com/session/deep-dive-into-stateful-stream-processing-in-structured-streaming)
3199 - Spark Summit 2016
3200   - A Deep Dive into Structured Streaming - [slides/video](https://spark-summit.org/2016/events/a-deep-dive-into-structured-streaming/)
3201 
3202