0001 ---
0002 layout: global
0003 displayTitle: Spark Streaming Programming Guide
0004 title: Spark Streaming
0005 description: Spark Streaming programming guide and tutorial for Spark SPARK_VERSION_SHORT
0006 license: |
0007 Licensed to the Apache Software Foundation (ASF) under one or more
0008 contributor license agreements. See the NOTICE file distributed with
0009 this work for additional information regarding copyright ownership.
0010 The ASF licenses this file to You under the Apache License, Version 2.0
0011 (the "License"); you may not use this file except in compliance with
0012 the License. You may obtain a copy of the License at
0013
0014 http://www.apache.org/licenses/LICENSE-2.0
0015
0016 Unless required by applicable law or agreed to in writing, software
0017 distributed under the License is distributed on an "AS IS" BASIS,
0018 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0019 See the License for the specific language governing permissions and
0020 limitations under the License.
0021 ---
0022
0023 * This will become a table of contents (this text will be scraped).
0024 {:toc}
0025
0026 # Overview
0027 Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput,
0028 fault-tolerant stream processing of live data streams. Data can be ingested from many sources
0029 like Kafka, Kinesis, or TCP sockets, and can be processed using complex
0030 algorithms expressed with high-level functions like `map`, `reduce`, `join` and `window`.
0031 Finally, processed data can be pushed out to filesystems, databases,
0032 and live dashboards. In fact, you can apply Spark's
0033 [machine learning](ml-guide.html) and
0034 [graph processing](graphx-programming-guide.html) algorithms on data streams.
0035
0036 <p style="text-align: center;">
0037 <img
0038 src="img/streaming-arch.png"
0039 title="Spark Streaming architecture"
0040 alt="Spark Streaming"
0041 width="70%"
0042 />
0043 </p>
0044
0045 Internally, it works as follows. Spark Streaming receives live input data streams and divides
0046 the data into batches, which are then processed by the Spark engine to generate the final
0047 stream of results in batches.
0048
0049 <p style="text-align: center;">
0050 <img src="img/streaming-flow.png"
0051 title="Spark Streaming data flow"
0052 alt="Spark Streaming"
0053 width="70%" />
0054 </p>
0055
0056 Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*,
0057 which represents a continuous stream of data. DStreams can be created either from input data
streams from sources such as Kafka and Kinesis, or by applying high-level
0059 operations on other DStreams. Internally, a DStream is represented as a sequence of
0060 [RDDs](api/scala/org/apache/spark/rdd/RDD.html).
0061
0062 This guide shows you how to start writing Spark Streaming programs with DStreams. You can
0063 write Spark Streaming programs in Scala, Java or Python (introduced in Spark 1.2),
0064 all of which are presented in this guide.
0065 You will find tabs throughout this guide that let you choose between code snippets of
0066 different languages.
0067
0068 **Note:** There are a few APIs that are either different or not available in Python. Throughout this guide, you will find the tag <span class="badge" style="background-color: grey">Python API</span> highlighting these differences.
0069
0070 ***************************************************************************************************
0071
0072 # A Quick Example
0073 Before we go into the details of how to write your own Spark Streaming program,
0074 let's take a quick look at what a simple Spark Streaming program looks like. Let's say we want to
0075 count the number of words in text data received from a data server listening on a TCP
0076 socket. All you need to
0077 do is as follows.
0078
0079 <div class="codetabs">
0080 <div data-lang="scala" markdown="1" >
0081 First, we import the names of the Spark Streaming classes and some implicit
0082 conversions from StreamingContext into our environment in order to add useful methods to
0083 other classes we need (like DStream). [StreamingContext](api/scala/org/apache/spark/streaming/StreamingContext.html) is the
0084 main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
0085
0086 {% highlight scala %}
0087 import org.apache.spark._
0088 import org.apache.spark.streaming._
0089 import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
0090
// Create a local StreamingContext with two working threads and a batch interval of 1 second.
0092 // The master requires 2 cores to prevent a starvation scenario.
0093
0094 val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
0095 val ssc = new StreamingContext(conf, Seconds(1))
0096 {% endhighlight %}
0097
0098 Using this context, we can create a DStream that represents streaming data from a TCP
0099 source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
0100
0101 {% highlight scala %}
0102 // Create a DStream that will connect to hostname:port, like localhost:9999
0103 val lines = ssc.socketTextStream("localhost", 9999)
0104 {% endhighlight %}
0105
0106 This `lines` DStream represents the stream of data that will be received from the data
0107 server. Each record in this DStream is a line of text. Next, we want to split the lines by
0108 space characters into words.
0109
0110 {% highlight scala %}
0111 // Split each line into words
0112 val words = lines.flatMap(_.split(" "))
0113 {% endhighlight %}
0114
0115 `flatMap` is a one-to-many DStream operation that creates a new DStream by
0116 generating multiple new records from each record in the source DStream. In this case,
0117 each line will be split into multiple words and the stream of words is represented as the
0118 `words` DStream. Next, we want to count these words.
0119
0120 {% highlight scala %}
0121 import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
0122 // Count each word in each batch
0123 val pairs = words.map(word => (word, 1))
0124 val wordCounts = pairs.reduceByKey(_ + _)
0125
0126 // Print the first ten elements of each RDD generated in this DStream to the console
0127 wordCounts.print()
0128 {% endhighlight %}
0129
0130 The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
0131 1)` pairs, which is then reduced to get the frequency of words in each batch of data.
0132 Finally, `wordCounts.print()` will print a few of the counts generated every second.
0133
0134 Note that when these lines are executed, Spark Streaming only sets up the computation it
0135 will perform when it is started, and no real processing has started yet. To start the processing
after all the transformations have been set up, we finally call
0137
0138 {% highlight scala %}
0139 ssc.start() // Start the computation
0140 ssc.awaitTermination() // Wait for the computation to terminate
0141 {% endhighlight %}
0142
0143 The complete code can be found in the Spark Streaming example
0144 [NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/NetworkWordCount.scala).
0145 <br>
0146
0147 </div>
0148 <div data-lang="java" markdown="1">
0149
0150 First, we create a
0151 [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object,
0152 which is the main entry point for all streaming
0153 functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
0154
0155 {% highlight java %}
0156 import org.apache.spark.*;
0157 import org.apache.spark.api.java.function.*;
0158 import org.apache.spark.streaming.*;
0159 import org.apache.spark.streaming.api.java.*;
0160 import scala.Tuple2;
0161
// Create a local StreamingContext with two working threads and a batch interval of 1 second
0163 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
0164 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
0165 {% endhighlight %}
0166
0167 Using this context, we can create a DStream that represents streaming data from a TCP
0168 source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
0169
0170 {% highlight java %}
0171 // Create a DStream that will connect to hostname:port, like localhost:9999
0172 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
0173 {% endhighlight %}
0174
0175 This `lines` DStream represents the stream of data that will be received from the data
0176 server. Each record in this stream is a line of text. Then, we want to split the lines by
0177 space into words.
0178
0179 {% highlight java %}
0180 // Split each line into words
0181 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
0182 {% endhighlight %}
0183
0184 `flatMap` is a DStream operation that creates a new DStream by
0185 generating multiple new records from each record in the source DStream. In this case,
0186 each line will be split into multiple words and the stream of words is represented as the
0187 `words` DStream. Note that we defined the transformation using a
0188 [FlatMapFunction](api/scala/org/apache/spark/api/java/function/FlatMapFunction.html) object.
0189 As we will discover along the way, there are a number of such convenience classes in the Java API
that help define DStream transformations.
0191
0192 Next, we want to count these words.
0193
0194 {% highlight java %}
0195 // Count each word in each batch
0196 JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
0197 JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
0198
0199 // Print the first ten elements of each RDD generated in this DStream to the console
0200 wordCounts.print();
0201 {% endhighlight %}
0202
0203 The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
0204 1)` pairs, using a [PairFunction](api/scala/org/apache/spark/api/java/function/PairFunction.html)
0205 object. Then, it is reduced to get the frequency of words in each batch of data,
0206 using a [Function2](api/scala/org/apache/spark/api/java/function/Function2.html) object.
0207 Finally, `wordCounts.print()` will print a few of the counts generated every second.
0208
0209 Note that when these lines are executed, Spark Streaming only sets up the computation it
0210 will perform after it is started, and no real processing has started yet. To start the processing
after all the transformations have been set up, we finally call the `start` method.
0212
0213 {% highlight java %}
0214 jssc.start(); // Start the computation
0215 jssc.awaitTermination(); // Wait for the computation to terminate
0216 {% endhighlight %}
0217
0218 The complete code can be found in the Spark Streaming example
0219 [JavaNetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java).
0220 <br>
0221
0222 </div>
0223 <div data-lang="python" markdown="1" >
First, we import [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext), which is the main entry point for all streaming functionality. We create a local StreamingContext with two execution threads, and a batch interval of 1 second.
0225
0226 {% highlight python %}
0227 from pyspark import SparkContext
0228 from pyspark.streaming import StreamingContext
0229
# Create a local StreamingContext with two working threads and a batch interval of 1 second
0231 sc = SparkContext("local[2]", "NetworkWordCount")
0232 ssc = StreamingContext(sc, 1)
0233 {% endhighlight %}
0234
0235 Using this context, we can create a DStream that represents streaming data from a TCP
0236 source, specified as hostname (e.g. `localhost`) and port (e.g. `9999`).
0237
0238 {% highlight python %}
0239 # Create a DStream that will connect to hostname:port, like localhost:9999
0240 lines = ssc.socketTextStream("localhost", 9999)
0241 {% endhighlight %}
0242
0243 This `lines` DStream represents the stream of data that will be received from the data
0244 server. Each record in this DStream is a line of text. Next, we want to split the lines by
0245 space into words.
0246
0247 {% highlight python %}
0248 # Split each line into words
0249 words = lines.flatMap(lambda line: line.split(" "))
0250 {% endhighlight %}
0251
0252 `flatMap` is a one-to-many DStream operation that creates a new DStream by
0253 generating multiple new records from each record in the source DStream. In this case,
0254 each line will be split into multiple words and the stream of words is represented as the
0255 `words` DStream. Next, we want to count these words.
0256
0257 {% highlight python %}
0258 # Count each word in each batch
0259 pairs = words.map(lambda word: (word, 1))
0260 wordCounts = pairs.reduceByKey(lambda x, y: x + y)
0261
0262 # Print the first ten elements of each RDD generated in this DStream to the console
0263 wordCounts.pprint()
0264 {% endhighlight %}
0265
0266 The `words` DStream is further mapped (one-to-one transformation) to a DStream of `(word,
0267 1)` pairs, which is then reduced to get the frequency of words in each batch of data.
0268 Finally, `wordCounts.pprint()` will print a few of the counts generated every second.
0269
0270 Note that when these lines are executed, Spark Streaming only sets up the computation it
0271 will perform when it is started, and no real processing has started yet. To start the processing
after all the transformations have been set up, we finally call
0273
0274 {% highlight python %}
0275 ssc.start() # Start the computation
0276 ssc.awaitTermination() # Wait for the computation to terminate
0277 {% endhighlight %}
0278
0279 The complete code can be found in the Spark Streaming example
0280 [NetworkWordCount]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/network_wordcount.py).
0281 <br>
0282
0283 </div>
0284 </div>
0285
0286 If you have already [downloaded](index.html#downloading) and [built](index.html#building) Spark,
0287 you can run this example as follows. You will first need to run Netcat
0288 (a small utility found in most Unix-like systems) as a data server by using
0289
0290 {% highlight bash %}
0291 $ nc -lk 9999
0292 {% endhighlight %}
0293
0294 Then, in a different terminal, you can start the example by using
0295
0296 <div class="codetabs">
0297 <div data-lang="scala" markdown="1">
0298 {% highlight bash %}
0299 $ ./bin/run-example streaming.NetworkWordCount localhost 9999
0300 {% endhighlight %}
0301 </div>
0302 <div data-lang="java" markdown="1">
0303 {% highlight bash %}
0304 $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
0305 {% endhighlight %}
0306 </div>
0307 <div data-lang="python" markdown="1">
0308 {% highlight bash %}
0309 $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
0310 {% endhighlight %}
0311 </div>
0312 </div>
0313
0314
0315 Then, any lines typed in the terminal running the netcat server will be counted and printed on
0316 screen every second. It will look something like the following.
0317
<table width="100%">
    <tr>
    <td>
0320 {% highlight bash %}
0321 # TERMINAL 1:
0322 # Running Netcat
0323
0324 $ nc -lk 9999
0325
0326 hello world
0327
0328
0329
0330 ...
0331 {% endhighlight %}
0332 </td>
0333 <td width="2%"></td>
0334 <td>
0335 <div class="codetabs">
0336
0337 <div data-lang="scala" markdown="1">
0338 {% highlight bash %}
0339 # TERMINAL 2: RUNNING NetworkWordCount
0340
0341 $ ./bin/run-example streaming.NetworkWordCount localhost 9999
0342 ...
0343 -------------------------------------------
0344 Time: 1357008430000 ms
0345 -------------------------------------------
0346 (hello,1)
0347 (world,1)
0348 ...
0349 {% endhighlight %}
0350 </div>
0351
0352 <div data-lang="java" markdown="1">
0353 {% highlight bash %}
0354 # TERMINAL 2: RUNNING JavaNetworkWordCount
0355
0356 $ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999
0357 ...
0358 -------------------------------------------
0359 Time: 1357008430000 ms
0360 -------------------------------------------
0361 (hello,1)
0362 (world,1)
0363 ...
0364 {% endhighlight %}
0365 </div>
0366 <div data-lang="python" markdown="1">
0367 {% highlight bash %}
0368 # TERMINAL 2: RUNNING network_wordcount.py
0369
0370 $ ./bin/spark-submit examples/src/main/python/streaming/network_wordcount.py localhost 9999
0371 ...
0372 -------------------------------------------
0373 Time: 2014-10-14 15:25:21
0374 -------------------------------------------
0375 (hello,1)
0376 (world,1)
0377 ...
0378 {% endhighlight %}
0379 </div>
0380 </div>
</td>
    </tr>
</table>
0383
0384
0385 ***************************************************************************************************
0386 ***************************************************************************************************
0387
0388 # Basic Concepts
0389
0390 Next, we move beyond the simple example and elaborate on the basics of Spark Streaming.
0391
0392 ## Linking
0393
0394 Similar to Spark, Spark Streaming is available through Maven Central. To write your own Spark Streaming program, you will have to add the following dependency to your SBT or Maven project.
0395
0396 <div class="codetabs">
0397 <div data-lang="Maven" markdown="1">
0398
0399 <dependency>
0400 <groupId>org.apache.spark</groupId>
0401 <artifactId>spark-streaming_{{site.SCALA_BINARY_VERSION}}</artifactId>
0402 <version>{{site.SPARK_VERSION}}</version>
0403 <scope>provided</scope>
0404 </dependency>
0405 </div>
0406 <div data-lang="SBT" markdown="1">
0407
0408 libraryDependencies += "org.apache.spark" % "spark-streaming_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION}}" % "provided"
0409 </div>
0410 </div>
0411
0412 For ingesting data from sources like Kafka and Kinesis that are not present in the Spark
0413 Streaming core
0414 API, you will have to add the corresponding
0415 artifact `spark-streaming-xyz_{{site.SCALA_BINARY_VERSION}}` to the dependencies. For example,
0416 some of the common ones are as follows.
0417
0418 <table class="table">
0419 <tr><th>Source</th><th>Artifact</th></tr>
0420 <tr><td> Kafka </td><td> spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}} </td></tr>
0421 <tr><td> Kinesis<br/></td><td>spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}} [Amazon Software License] </td></tr>
0422 <tr><td></td><td></td></tr>
0423 </table>
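
For example, adding the Kafka connector to an SBT build follows the same pattern as the core dependency above (a sketch; connector artifacts are not bundled with the Spark distribution, so they typically should not be marked as `provided`):

    libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}" % "{{site.SPARK_VERSION}}"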
0424
Please refer to the
[Maven repository](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22)
for the full, up-to-date list of supported sources and artifacts.
0428
0429 ***
0430
0431 ## Initializing StreamingContext
0432
To initialize a Spark Streaming program, a **StreamingContext** object, which is the main entry point of all Spark Streaming functionality, has to be created.
0434
0435 <div class="codetabs">
0436 <div data-lang="scala" markdown="1">
0437
0438 A [StreamingContext](api/scala/org/apache/spark/streaming/StreamingContext.html) object can be created from a [SparkConf](api/scala/org/apache/spark/SparkConf.html) object.
0439
0440 {% highlight scala %}
0441 import org.apache.spark._
0442 import org.apache.spark.streaming._
0443
0444 val conf = new SparkConf().setAppName(appName).setMaster(master)
0445 val ssc = new StreamingContext(conf, Seconds(1))
0446 {% endhighlight %}
0447
0448 The `appName` parameter is a name for your application to show on the cluster UI.
0449 `master` is a [Spark, Mesos, Kubernetes or YARN cluster URL](submitting-applications.html#master-urls),
0450 or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
0451 you will not want to hardcode `master` in the program,
0452 but rather [launch the application with `spark-submit`](submitting-applications.html) and
0453 receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
0454 in-process (detects the number of cores in the local system). Note that this internally creates a [SparkContext](api/scala/org/apache/spark/SparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
0455
0456 The batch interval must be set based on the latency requirements of your application
0457 and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-interval)
0458 section for more details.
0459
0460 A `StreamingContext` object can also be created from an existing `SparkContext` object.
0461
0462 {% highlight scala %}
0463 import org.apache.spark.streaming._
0464
0465 val sc = ... // existing SparkContext
0466 val ssc = new StreamingContext(sc, Seconds(1))
0467 {% endhighlight %}
0468
0469
0470 </div>
0471 <div data-lang="java" markdown="1">
0472
0473 A [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html) object can be created from a [SparkConf](api/java/index.html?org/apache/spark/SparkConf.html) object.
0474
0475 {% highlight java %}
0476 import org.apache.spark.*;
0477 import org.apache.spark.streaming.api.java.*;
0478
0479 SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
0480 JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
0481 {% endhighlight %}
0482
0483 The `appName` parameter is a name for your application to show on the cluster UI.
`master` is a [Spark, Mesos, Kubernetes or YARN cluster URL](submitting-applications.html#master-urls),
0485 or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
0486 you will not want to hardcode `master` in the program,
0487 but rather [launch the application with `spark-submit`](submitting-applications.html) and
receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
in-process (detects the number of cores in the local system). Note that this internally creates a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) (starting point of all Spark functionality) which can be accessed as `ssc.sparkContext`.
0490
0491 The batch interval must be set based on the latency requirements of your application
0492 and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-interval)
0493 section for more details.
0494
0495 A `JavaStreamingContext` object can also be created from an existing `JavaSparkContext`.
0496
0497 {% highlight java %}
0498 import org.apache.spark.streaming.api.java.*;
0499
0500 JavaSparkContext sc = ... //existing JavaSparkContext
0501 JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
0502 {% endhighlight %}
0503 </div>
0504 <div data-lang="python" markdown="1">
0505
0506 A [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) object can be created from a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object.
0507
0508 {% highlight python %}
0509 from pyspark import SparkContext
0510 from pyspark.streaming import StreamingContext
0511
0512 sc = SparkContext(master, appName)
0513 ssc = StreamingContext(sc, 1)
0514 {% endhighlight %}
0515
0516 The `appName` parameter is a name for your application to show on the cluster UI.
`master` is a [Spark, Mesos, Kubernetes or YARN cluster URL](submitting-applications.html#master-urls),
0518 or a special __"local[\*]"__ string to run in local mode. In practice, when running on a cluster,
0519 you will not want to hardcode `master` in the program,
0520 but rather [launch the application with `spark-submit`](submitting-applications.html) and
0521 receive it there. However, for local testing and unit tests, you can pass "local[\*]" to run Spark Streaming
0522 in-process (detects the number of cores in the local system).
0523
0524 The batch interval must be set based on the latency requirements of your application
0525 and available cluster resources. See the [Performance Tuning](#setting-the-right-batch-interval)
0526 section for more details.
0527 </div>
0528 </div>
0529
After a context is defined, you have to do the following (a minimal sketch follows this list).
0531
0532 1. Define the input sources by creating input DStreams.
0533 1. Define the streaming computations by applying transformation and output operations to DStreams.
0534 1. Start receiving data and processing it using `streamingContext.start()`.
0535 1. Wait for the processing to be stopped (manually or due to any error) using `streamingContext.awaitTermination()`.
0536 1. The processing can be manually stopped using `streamingContext.stop()`.
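
Putting these steps together, a minimal Scala skeleton (a sketch, reusing the socket source from the [quick example](#a-quick-example)) looks like this:

{% highlight scala %}
import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("LifecycleSketch")
val ssc = new StreamingContext(conf, Seconds(1))

// 1. Define the input sources by creating input DStreams
val lines = ssc.socketTextStream("localhost", 9999)

// 2. Define the streaming computations (transformations and output operations)
lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()

// 3. Start receiving data and processing it
ssc.start()

// 4. Wait for the processing to be stopped (manually or due to any error)
ssc.awaitTermination()
{% endhighlight %}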
0537
0538 ##### Points to remember:
0539 {:.no_toc}
0540 - Once a context has been started, no new streaming computations can be set up or added to it.
0541 - Once a context has been stopped, it cannot be restarted.
0542 - Only one StreamingContext can be active in a JVM at the same time.
- `stop()` on a StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of `stop()` called `stopSparkContext` to `false`.
0544 - A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
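
For example, stopping only the StreamingContext while keeping the SparkContext alive for a later StreamingContext could look like this (a sketch):

{% highlight scala %}
val sc = ssc.sparkContext

// Stop the StreamingContext but keep the underlying SparkContext running
ssc.stop(stopSparkContext = false)

// The same SparkContext can back a new StreamingContext
val newSsc = new StreamingContext(sc, Seconds(5))
{% endhighlight %}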
0545
0546 ***
0547
0548 ## Discretized Streams (DStreams)
0549 **Discretized Stream** or **DStream** is the basic abstraction provided by Spark Streaming.
It represents a continuous stream of data, either the input data stream received from a source,
or the processed data stream generated by transforming the input stream. Internally,
a DStream is represented by a continuous series of RDDs, Spark's abstraction of an immutable,
0553 distributed dataset (see [Spark Programming Guide](rdd-programming-guide.html#resilient-distributed-datasets-rdds) for more details). Each RDD in a DStream contains data from a certain interval,
0554 as shown in the following figure.
0555
0556 <p style="text-align: center;">
0557 <img src="img/streaming-dstream.png"
0558 title="Spark Streaming data flow"
0559 alt="Spark Streaming"
0560 width="70%" />
0561 </p>
0562
0563 Any operation applied on a DStream translates to operations on the underlying RDDs. For example,
0564 in the [earlier example](#a-quick-example) of converting a stream of lines to words,
0565 the `flatMap` operation is applied on each RDD in the `lines` DStream to generate the RDDs of the
0566 `words` DStream. This is shown in the following figure.
0567
0568 <p style="text-align: center;">
0569 <img src="img/streaming-dstream-ops.png"
0570 title="Spark Streaming data flow"
0571 alt="Spark Streaming"
0572 width="70%" />
0573 </p>
0574
0575
0576 These underlying RDD transformations are computed by the Spark engine. The DStream operations
0577 hide most of these details and provide the developer with a higher-level API for convenience.
0578 These operations are discussed in detail in later sections.
0579
0580 ***
0581
0582 ## Input DStreams and Receivers
0583 Input DStreams are DStreams representing the stream of input data received from streaming
0584 sources. In the [quick example](#a-quick-example), `lines` was an input DStream as it represented
0585 the stream of data received from the netcat server. Every input DStream
0586 (except file stream, discussed later in this section) is associated with a **Receiver**
0587 ([Scala doc](api/scala/org/apache/spark/streaming/receiver/Receiver.html),
0588 [Java doc](api/java/org/apache/spark/streaming/receiver/Receiver.html)) object which receives the
0589 data from a source and stores it in Spark's memory for processing.
0590
0591 Spark Streaming provides two categories of built-in streaming sources.
0592
0593 - *Basic sources*: Sources directly available in the StreamingContext API.
  Examples: file systems and socket connections.
0595 - *Advanced sources*: Sources like Kafka, Kinesis, etc. are available through
0596 extra utility classes. These require linking against extra dependencies as discussed in the
0597 [linking](#linking) section.
0598
0599 We are going to discuss some of the sources present in each category later in this section.
0600
0601 Note that, if you want to receive multiple streams of data in parallel in your streaming
0602 application, you can create multiple input DStreams (discussed
0603 further in the [Performance Tuning](#level-of-parallelism-in-data-receiving) section). This will
create multiple receivers which will simultaneously receive multiple data streams. But note that each
receiver runs as a long-running task within a Spark worker/executor, and hence occupies one of the cores allocated to the
Spark Streaming application. Therefore, it is important to remember that a Spark Streaming application
0607 needs to be allocated enough cores (or threads, if running locally) to process the received data,
0608 as well as to run the receiver(s).
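
For instance, two socket streams can be received in parallel and combined into a single DStream with `union` (a sketch; running this locally needs at least three threads, one per receiver plus at least one for processing):

{% highlight scala %}
// Each receiver occupies one core (or thread, in local mode)
val stream1 = ssc.socketTextStream("localhost", 9999)
val stream2 = ssc.socketTextStream("localhost", 9998)

// A single DStream containing the data from both receivers
val unifiedStream = stream1.union(stream2)
{% endhighlight %}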
0609
0610 ##### Points to remember
0611 {:.no_toc}
0612
0613 - When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL.
0614 Either of these means that only one thread will be used for running tasks locally. If you are using
0615 an input DStream based on a receiver (e.g. sockets, Kafka, etc.), then the single thread will
0616 be used to run the receiver, leaving no thread for processing the received data. Hence, when
0617 running locally, always use "local[*n*]" as the master URL, where *n* > number of receivers to run
0618 (see [Spark Properties](configuration.html#spark-properties) for information on how to set
0619 the master).
0620
0621 - Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming
0622 application must be more than the number of receivers. Otherwise the system will receive data, but
0623 not be able to process it.
0624
0625 ### Basic Sources
0626 {:.no_toc}
0627
0628 We have already taken a look at the `ssc.socketTextStream(...)` in the [quick example](#a-quick-example)
0629 which creates a DStream from text
0630 data received over a TCP socket connection. Besides sockets, the StreamingContext API provides
0631 methods for creating DStreams from files as input sources.
0632
0633 #### File Streams
0634 {:.no_toc}
0635
For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created
via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
0638
File streams do not require running a receiver, so there is no need to allocate any cores for receiving file data.
0640
0641 For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.
0642
0643 <div class="codetabs">
0644 <div data-lang="scala" markdown="1">
0645
0646 {% highlight scala %}
0647 streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
0648 {% endhighlight %}
0649 For text files
0650
0651 {% highlight scala %}
0652 streamingContext.textFileStream(dataDirectory)
0653 {% endhighlight %}
0654 </div>
0655
0656 <div data-lang="java" markdown="1">
0657 {% highlight java %}
0658 streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
0659 {% endhighlight %}
0660 For text files
0661
0662 {% highlight java %}
0663 streamingContext.textFileStream(dataDirectory);
0664 {% endhighlight %}
0665 </div>
0666
0667 <div data-lang="python" markdown="1">
0668 `fileStream` is not available in the Python API; only `textFileStream` is available.
0669 {% highlight python %}
0670 streamingContext.textFileStream(dataDirectory)
0671 {% endhighlight %}
0672 </div>
0673
0674 </div>
0675
0676 ##### How Directories are Monitored
0677 {:.no_toc}
0678
0679 Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.
0680
0681 * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
0682 All files directly under such a path will be processed as they are discovered.
0683 + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02) can be supplied, such as
0684 `"hdfs://namenode:8040/logs/2017/*"`.
0685 Here, the DStream will consist of all files in the directories
0686 matching the pattern.
0687 That is: it is a pattern of directories, not of files in directories.
0688 + All files must be in the same data format.
0689 * A file is considered part of a time period based on its modification time,
0690 not its creation time.
0691 + Once processed, changes to a file within the current window will not cause the file to be reread.
0692 That is: *updates are ignored*.
0693 + The more files under a directory, the longer it will take to
0694 scan for changes — even if no files have been modified.
0695 * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
0696 renaming an entire directory to match the path will add the directory to the list of
0697 monitored directories. Only the files in the directory whose modification time is
0698 within the current window will be included in the stream.
0699 + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
0700 to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
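
For example, monitoring all directories that match a dated glob pattern (a sketch; the path is illustrative):

{% highlight scala %}
// Files discovered under any directory matching the pattern become part of the stream
val logLines = ssc.textFileStream("hdfs://namenode:8040/logs/2017/*")
{% endhighlight %}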
0701
0702
0703 ##### Using Object Stores as a source of data
0704 {:.no_toc}
0705
0706 "Full" Filesystems such as HDFS tend to set the modification time on their files as soon
0707 as the output stream is created.
0708 When a file is opened, even before data has been completely written,
0709 it may be included in the `DStream` - after which updates to the file within the same window
0710 will be ignored. That is: changes may be missed, and data omitted from the stream.
0711
0712 To guarantee that changes are picked up in a window, write the file
0713 to an unmonitored directory, then, immediately after the output stream is closed,
0714 rename it into the destination directory.
0715 Provided the renamed file appears in the scanned destination directory during the window
0716 of its creation, the new data will be picked up.
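
A sketch of this write-then-rename pattern using the Hadoop `FileSystem` API (the paths and contents are illustrative):

{% highlight scala %}
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(ssc.sparkContext.hadoopConfiguration)
val tmpFile = new Path("hdfs://namenode:8040/staging/events-001.txt")  // unmonitored directory
val dstFile = new Path("hdfs://namenode:8040/logs/events-001.txt")     // monitored directory

// Write the complete file outside the monitored directory
val out = fs.create(tmpFile)
try {
  out.write("event data\n".getBytes("UTF-8"))
} finally {
  out.close()
}

// Rename it into the monitored directory only once it is complete
fs.rename(tmpFile, dstFile)
{% endhighlight %}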
0717
0718 In contrast, Object Stores such as Amazon S3 and Azure Storage usually have slow rename operations, as the
0719 data is actually copied.
Furthermore, a renamed object may have the time of the `rename()` operation as its modification time, so it
may not be considered part of the window that its original creation time implied.
0722
0723 Careful testing is needed against the target object store to verify that the timestamp behavior
0724 of the store is consistent with that expected by Spark Streaming. It may be
0725 that writing directly into a destination directory is the appropriate strategy for
0726 streaming data via the chosen object store.
0727
0728 For more details on this topic, consult the [Hadoop Filesystem Specification](https://hadoop.apache.org/docs/stable2/hadoop-project-dist/hadoop-common/filesystem/introduction.html).
0729
0730 #### Streams based on Custom Receivers
0731 {:.no_toc}
0732
0733 DStreams can be created with data streams received through custom receivers. See the [Custom Receiver
0734 Guide](streaming-custom-receivers.html) for more details.
0735
0736 #### Queue of RDDs as a Stream
0737 {:.no_toc}
0738
0739 For testing a Spark Streaming application with test data, one can also create a DStream based on a queue of RDDs, using `streamingContext.queueStream(queueOfRDDs)`. Each RDD pushed into the queue will be treated as a batch of data in the DStream, and processed like a stream.
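
A minimal Scala sketch of this, assuming an existing `ssc`:

{% highlight scala %}
import scala.collection.mutable
import org.apache.spark.rdd.RDD

val rddQueue = new mutable.Queue[RDD[Int]]()

// Each RDD pushed into the queue is treated as one batch of the DStream
val inputStream = ssc.queueStream(rddQueue)
inputStream.map(x => (x % 10, 1)).reduceByKey(_ + _).print()

ssc.start()

// Push a few test RDDs into the queue, roughly one per batch interval
for (_ <- 1 to 3) {
  rddQueue.synchronized {
    rddQueue += ssc.sparkContext.makeRDD(1 to 100, 2)
  }
  Thread.sleep(1000)
}
ssc.stop()
{% endhighlight %}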
0740
0741 For more details on streams from sockets and files, see the API documentations of the relevant functions in
0742 [StreamingContext](api/scala/org/apache/spark/streaming/StreamingContext.html) for
0743 Scala, [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html)
0744 for Java, and [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) for Python.
0745
0746 ### Advanced Sources
0747 {:.no_toc}
0748
0749 <span class="badge" style="background-color: grey">Python API</span> As of Spark {{site.SPARK_VERSION_SHORT}},
0750 out of these sources, Kafka and Kinesis are available in the Python API.
0751
0752 This category of sources requires interfacing with external non-Spark libraries, some of them with
0753 complex dependencies (e.g., Kafka). Hence, to minimize issues related to version conflicts
0754 of dependencies, the functionality to create DStreams from these sources has been moved to separate
0755 libraries that can be [linked](#linking) to explicitly when necessary.
0756
0757 Note that these advanced sources are not available in the Spark shell, hence applications based on
0758 these advanced sources cannot be tested in the shell. If you really want to use them in the Spark
0759 shell you will have to download the corresponding Maven artifact's JAR along with its dependencies
0760 and add it to the classpath.
0761
0762 Some of these advanced sources are as follows.
0763
0764 - **Kafka:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kafka broker versions 0.10 or higher. See the [Kafka Integration Guide](streaming-kafka-0-10-integration.html) for more details.
0765
0766 - **Kinesis:** Spark Streaming {{site.SPARK_VERSION_SHORT}} is compatible with Kinesis Client Library 1.2.1. See the [Kinesis Integration Guide](streaming-kinesis-integration.html) for more details.
0767
0768 ### Custom Sources
0769 {:.no_toc}
0770
0771 <span class="badge" style="background-color: grey">Python API</span> This is not yet supported in Python.
0772
0773 Input DStreams can also be created out of custom data sources. All you have to do is implement a
0774 user-defined **receiver** (see next section to understand what that is) that can receive data from
0775 the custom sources and push it into Spark. See the [Custom Receiver
0776 Guide](streaming-custom-receivers.html) for details.
0777
0778 ### Receiver Reliability
0779 {:.no_toc}
0780
0781 There can be two kinds of data sources based on their *reliability*. Sources
0782 (like Kafka) allow the transferred data to be acknowledged. If the system receiving
0783 data from these *reliable* sources acknowledges the received data correctly, it can be ensured
0784 that no data will be lost due to any kind of failure. This leads to two kinds of receivers:
0785
0786 1. *Reliable Receiver* - A *reliable receiver* correctly sends acknowledgment to a reliable
0787 source when the data has been received and stored in Spark with replication.
0788 1. *Unreliable Receiver* - An *unreliable receiver* does *not* send acknowledgment to a source. This can be used for sources that do not support acknowledgment, or even for reliable sources when one does not want or need to go into the complexity of acknowledgment.
0789
0790 The details of how to write a reliable receiver are discussed in the
0791 [Custom Receiver Guide](streaming-custom-receivers.html).
0792
0793 ***
0794
0795 ## Transformations on DStreams
Similar to RDDs, transformations allow the data from the input DStream to be modified.
DStreams support many of the transformations available on normal Spark RDDs.
0798 Some of the common ones are as follows.
0799
0800 <table class="table">
0801 <tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
0802 <tr>
0803 <td> <b>map</b>(<i>func</i>) </td>
0804 <td> Return a new DStream by passing each element of the source DStream through a
0805 function <i>func</i>. </td>
0806 </tr>
0807 <tr>
0808 <td> <b>flatMap</b>(<i>func</i>) </td>
0809 <td> Similar to map, but each input item can be mapped to 0 or more output items. </td>
0810 </tr>
0811 <tr>
0812 <td> <b>filter</b>(<i>func</i>) </td>
0813 <td> Return a new DStream by selecting only the records of the source DStream on which
0814 <i>func</i> returns true. </td>
0815 </tr>
0816 <tr>
0817 <td> <b>repartition</b>(<i>numPartitions</i>) </td>
0818 <td> Changes the level of parallelism in this DStream by creating more or fewer partitions. </td>
0819 </tr>
0820 <tr>
0821 <td> <b>union</b>(<i>otherStream</i>) </td>
0822 <td> Return a new DStream that contains the union of the elements in the source DStream and
0823 <i>otherDStream</i>. </td>
0824 </tr>
0825 <tr>
0826 <td> <b>count</b>() </td>
0827 <td> Return a new DStream of single-element RDDs by counting the number of elements in each RDD
0828 of the source DStream. </td>
0829 </tr>
0830 <tr>
0831 <td> <b>reduce</b>(<i>func</i>) </td>
0832 <td> Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the
0833 source DStream using a function <i>func</i> (which takes two arguments and returns one).
0834 The function should be associative and commutative so that it can be computed in parallel. </td>
0835 </tr>
0836 <tr>
0837 <td> <b>countByValue</b>() </td>
0838 <td> When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs
0839 where the value of each key is its frequency in each RDD of the source DStream. </td>
0840 </tr>
0841 <tr>
0842 <td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
0843 <td> When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the
0844 values for each key are aggregated using the given reduce function. <b>Note:</b> By default,
0845 this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number
0846 is determined by the config property <code>spark.default.parallelism</code>) to do the grouping.
0847 You can pass an optional <code>numTasks</code> argument to set a different number of tasks.</td>
0848 </tr>
0849 <tr>
0850 <td> <b>join</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
0851 <td> When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W))
0852 pairs with all pairs of elements for each key. </td>
0853 </tr>
0854 <tr>
0855 <td> <b>cogroup</b>(<i>otherStream</i>, [<i>numTasks</i>]) </td>
0856 <td> When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of
0857 (K, Seq[V], Seq[W]) tuples.</td>
0858 </tr>
0859 <tr>
0860 <td> <b>transform</b>(<i>func</i>) </td>
  <td> Return a new DStream by applying an RDD-to-RDD function to every RDD of the source DStream.
0862 This can be used to do arbitrary RDD operations on the DStream. </td>
0863 </tr>
0864 <tr>
0865 <td> <b>updateStateByKey</b>(<i>func</i>) </td>
0866 <td> Return a new "state" DStream where the state for each key is updated by applying the
0867 given function on the previous state of the key and the new values for the key. This can be
0868 used to maintain arbitrary state data for each key.</td>
0869 </tr>
0870 <tr><td></td><td></td></tr>
0871 </table>
0872
0873 A few of these transformations are worth discussing in more detail.
0874
0875 #### UpdateStateByKey Operation
0876 {:.no_toc}
0877 The `updateStateByKey` operation allows you to maintain arbitrary state while continuously updating
0878 it with new information. To use this, you will have to do two steps.
0879
0880 1. Define the state - The state can be an arbitrary data type.
0881 1. Define the state update function - Specify with a function how to update the state using the
0882 previous state and the new values from an input stream.
0883
0884 In every batch, Spark will apply the state update function for all existing keys, regardless of whether they have new data in a batch or not. If the update function returns `None` then the key-value pair will be eliminated.
0885
0886 Let's illustrate this with an example. Say you want to maintain a running count of each word
0887 seen in a text data stream. Here, the running count is the state and it is an integer. We
0888 define the update function as:
0889
0890 <div class="codetabs">
0891 <div data-lang="scala" markdown="1">
0892
0893 {% highlight scala %}
0894 def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
    val newCount = newValues.sum + runningCount.getOrElse(0) // add the new values to the previous running count to get the new count
0896 Some(newCount)
0897 }
0898 {% endhighlight %}
0899
0900 This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
0901 1)` pairs in the [earlier example](#a-quick-example)).
0902
0903 {% highlight scala %}
0904 val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
0905 {% endhighlight %}
0906
0907 The update function will be called for each word, with `newValues` having a sequence of 1's (from
0908 the `(word, 1)` pairs) and the `runningCount` having the previous count.
0909
0910 </div>
0911 <div data-lang="java" markdown="1">
0912
0913 {% highlight java %}
0914 Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
0915 (values, state) -> {
    Integer newSum = values.stream().mapToInt(i -> i).sum() + state.orElse(0); // add the new values to the previous running count to get the new count
0917 return Optional.of(newSum);
0918 };
0919 {% endhighlight %}
0920
0921 This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
0922 1)` pairs in the [quick example](#a-quick-example)).
0923
0924 {% highlight java %}
0925 JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
0926 {% endhighlight %}
0927
0928 The update function will be called for each word, with `newValues` having a sequence of 1's (from
0929 the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
0930 Java code, take a look at the example
0931 [JavaStatefulNetworkWordCount.java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java).
0932
0933 </div>
0934 <div data-lang="python" markdown="1">
0935
0936 {% highlight python %}
0937 def updateFunction(newValues, runningCount):
0938 if runningCount is None:
0939 runningCount = 0
0940 return sum(newValues, runningCount) # add the new values with the previous running count to get the new count
0941 {% endhighlight %}
0942
0943 This is applied on a DStream containing words (say, the `pairs` DStream containing `(word,
0944 1)` pairs in the [earlier example](#a-quick-example)).
0945
0946 {% highlight python %}
0947 runningCounts = pairs.updateStateByKey(updateFunction)
0948 {% endhighlight %}
0949
0950 The update function will be called for each word, with `newValues` having a sequence of 1's (from
0951 the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
0952 Python code, take a look at the example
0953 [stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/stateful_network_wordcount.py).
0954
0955 </div>
0956 </div>
0957
0958 Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is
0959 discussed in detail in the [checkpointing](#checkpointing) section.
0960
0961
0962 #### Transform Operation
0963 {:.no_toc}
0964 The `transform` operation (along with its variations like `transformWith`) allows
0965 arbitrary RDD-to-RDD functions to be applied on a DStream. It can be used to apply any RDD
0966 operation that is not exposed in the DStream API.
0967 For example, the functionality of joining every batch in a data stream
0968 with another dataset is not directly exposed in the DStream API. However,
0969 you can easily use `transform` to do this. This enables very powerful possibilities. For example,
0970 one can do real-time data cleaning by joining the input data stream with precomputed
0971 spam information (maybe generated with Spark as well) and then filtering based on it.
0972
0973 <div class="codetabs">
0974 <div data-lang="scala" markdown="1">
0975
0976 {% highlight scala %}
0977 val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
0978
0979 val cleanedDStream = wordCounts.transform { rdd =>
0980 rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
0981 ...
0982 }
0983 {% endhighlight %}
0984
0985 </div>
0986 <div data-lang="java" markdown="1">
0987
0988 {% highlight java %}
0989 import org.apache.spark.streaming.api.java.*;
0990 // RDD containing spam information
0991 JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);
0992
0993 JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
0994 rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
0995 ...
0996 });
0997 {% endhighlight %}
0998
0999 </div>
1000 <div data-lang="python" markdown="1">
1001
1002 {% highlight python %}
1003 spamInfoRDD = sc.pickleFile(...) # RDD containing spam information
1004
1005 # join data stream with spam information to do data cleaning
1006 cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(...))
1007 {% endhighlight %}
1008 </div>
1009 </div>
1010
Note that the supplied function gets called in every batch interval. This allows you to do
time-varying RDD operations; that is, the RDD operations, number of partitions, broadcast variables,
etc. can be changed between batches.
1014
1015 #### Window Operations
1016 {:.no_toc}
1017 Spark Streaming also provides *windowed computations*, which allow you to apply
1018 transformations over a sliding window of data. The following figure illustrates this sliding
1019 window.
1020
1021 <p style="text-align: center;">
1022 <img src="img/streaming-dstream-window.png"
1023 title="Spark Streaming data flow"
1024 alt="Spark Streaming"
1025 width="60%" />
1026 </p>
1027
1028 As shown in the figure, every time the window *slides* over a source DStream,
1029 the source RDDs that fall within the window are combined and operated upon to produce the
1030 RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time
1031 units of data, and slides by 2 time units. This shows that any window operation needs to
1032 specify two parameters.
1033
1034 * <i>window length</i> - The duration of the window (3 in the figure).
1035 * <i>sliding interval</i> - The interval at which the window operation is performed (2 in
1036 the figure).
1037
1038 These two parameters must be multiples of the batch interval of the source DStream (1 in the
1039 figure).
1040
1041 Let's illustrate the window operations with an example. Say, you want to extend the
1042 [earlier example](#a-quick-example) by generating word counts over the last 30 seconds of data,
1043 every 10 seconds. To do this, we have to apply the `reduceByKey` operation on the `pairs` DStream of
1044 `(word, 1)` pairs over the last 30 seconds of data. This is done using the
1045 operation `reduceByKeyAndWindow`.
1046
1047 <div class="codetabs">
1048 <div data-lang="scala" markdown="1">
1049
1050 {% highlight scala %}
1051 // Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
1053 {% endhighlight %}
1054
1055 </div>
1056 <div data-lang="java" markdown="1">
1057
1058 {% highlight java %}
1059 // Reduce last 30 seconds of data, every 10 seconds
1060 JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
1061 {% endhighlight %}
1062
1063 </div>
1064 <div data-lang="python" markdown="1">
1065
1066 {% highlight python %}
1067 # Reduce last 30 seconds of data, every 10 seconds
1068 windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 30, 10)
1069 {% endhighlight %}
1070
1071 </div>
1072 </div>
1073
1074 Some of the common window operations are as follows. All of these operations take the
two parameters mentioned above - <i>windowLength</i> and <i>slideInterval</i>.
1076
1077 <table class="table">
1078 <tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
1079 <tr>
1080 <td> <b>window</b>(<i>windowLength</i>, <i>slideInterval</i>) </td>
1081 <td> Return a new DStream which is computed based on windowed batches of the source DStream.
1082 </td>
1083 </tr>
1084 <tr>
1085 <td> <b>countByWindow</b>(<i>windowLength</i>, <i>slideInterval</i>) </td>
1086 <td> Return a sliding window count of elements in the stream.
1087 </td>
1088 </tr>
1089 <tr>
1090 <td> <b>reduceByWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>) </td>
1091 <td> Return a new single-element stream, created by aggregating elements in the stream over a
1092 sliding interval using <i>func</i>. The function should be associative and commutative so that it can be computed
1093 correctly in parallel.
1094 </td>
1095 </tr>
1096 <tr>
1097 <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>windowLength</i>, <i>slideInterval</i>,
1098 [<i>numTasks</i>]) </td>
1099 <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, V)
1100 pairs where the values for each key are aggregated using the given reduce function <i>func</i>
1101 over batches in a sliding window. <b>Note:</b> By default, this uses Spark's default number of
1102 parallel tasks (2 for local mode, and in cluster mode the number is determined by the config
1103 property <code>spark.default.parallelism</code>) to do the grouping. You can pass an optional
1104 <code>numTasks</code> argument to set a different number of tasks.
1105 </td>
1106 </tr>
1107 <tr>
1108 <td> <b>reduceByKeyAndWindow</b>(<i>func</i>, <i>invFunc</i>, <i>windowLength</i>,
1109 <i>slideInterval</i>, [<i>numTasks</i>]) </td>
1110 <td markdown="1"> A more efficient version of the above <code>reduceByKeyAndWindow()</code> where the reduce
1111 value of each window is calculated incrementally using the reduce values of the previous window.
1112 This is done by reducing the new data that enters the sliding window, and "inverse reducing" the
1113 old data that leaves the window. An example would be that of "adding" and "subtracting" counts
1114 of keys as the window slides. However, it is applicable only to "invertible reduce functions",
1115 that is, those reduce functions which have a corresponding "inverse reduce" function (taken as
1116 parameter <i>invFunc</i>). Like in <code>reduceByKeyAndWindow</code>, the number of reduce tasks
1117 is configurable through an optional argument. Note that [checkpointing](#checkpointing) must be
1118 enabled for using this operation.
1119 </td>
1120 </tr>
1121 <tr>
1122 <td> <b>countByValueAndWindow</b>(<i>windowLength</i>,
1123 <i>slideInterval</i>, [<i>numTasks</i>]) </td>
1124 <td> When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the
1125 value of each key is its frequency within a sliding window. Like in
1126 <code>reduceByKeyAndWindow</code>, the number of reduce tasks is configurable through an
1127 optional argument.
1128 </td>
1129 </tr>
1130 <tr><td></td><td></td></tr>
1131 </table>
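
For example, the incremental variant of `reduceByKeyAndWindow` can compute the same windowed word counts as before by adding the counts that enter the window and subtracting those that leave it (a sketch; the checkpoint directory is illustrative):

{% highlight scala %}
ssc.checkpoint("hdfs://namenode:8040/checkpoint")  // required for the inverse-reduce variant

val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,  // add counts of new data entering the window
  (a: Int, b: Int) => a - b,  // "inverse reduce" counts of old data leaving the window
  Seconds(30), Seconds(10))
{% endhighlight %}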
1132
1133 #### Join Operations
1134 {:.no_toc}
Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.
1136
1137
1138 ##### Stream-stream joins
1139 {:.no_toc}
1140 Streams can be very easily joined with other streams.
1141
1142 <div class="codetabs">
1143 <div data-lang="scala" markdown="1">
1144 {% highlight scala %}
val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
1147 val joinedStream = stream1.join(stream2)
1148 {% endhighlight %}
1149 </div>
1150 <div data-lang="java" markdown="1">
1151 {% highlight java %}
1152 JavaPairDStream<String, String> stream1 = ...
1153 JavaPairDStream<String, String> stream2 = ...
1154 JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
1155 {% endhighlight %}
1156 </div>
1157 <div data-lang="python" markdown="1">
1158 {% highlight python %}
1159 stream1 = ...
1160 stream2 = ...
1161 joinedStream = stream1.join(stream2)
1162 {% endhighlight %}
1163 </div>
1164 </div>
1165 Here, in each batch interval, the RDD generated by `stream1` will be joined with the RDD generated by `stream2`. You can also do `leftOuterJoin`, `rightOuterJoin`, `fullOuterJoin`. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
1166
1167 <div class="codetabs">
1168 <div data-lang="scala" markdown="1">
1169 {% highlight scala %}
1170 val windowedStream1 = stream1.window(Seconds(20))
1171 val windowedStream2 = stream2.window(Minutes(1))
1172 val joinedStream = windowedStream1.join(windowedStream2)
1173 {% endhighlight %}
1174 </div>
1175 <div data-lang="java" markdown="1">
1176 {% highlight java %}
1177 JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
1178 JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
1179 JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
1180 {% endhighlight %}
1181 </div>
1182 <div data-lang="python" markdown="1">
1183 {% highlight python %}
1184 windowedStream1 = stream1.window(20)
1185 windowedStream2 = stream2.window(60)
1186 joinedStream = windowedStream1.join(windowedStream2)
1187 {% endhighlight %}
1188 </div>
1189 </div>
1190
1191 ##### Stream-dataset joins
1192 {:.no_toc}
This was already shown earlier while explaining the `DStream.transform` operation. Here is yet another example of joining a windowed stream with a dataset.
1194
1195 <div class="codetabs">
1196 <div data-lang="scala" markdown="1">
1197 {% highlight scala %}
val dataset: RDD[(String, String)] = ...
1199 val windowedStream = stream.window(Seconds(20))...
1200 val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
1201 {% endhighlight %}
1202 </div>
1203 <div data-lang="java" markdown="1">
1204 {% highlight java %}
1205 JavaPairRDD<String, String> dataset = ...
1206 JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
1207 JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));
1208 {% endhighlight %}
1209 </div>
1210 <div data-lang="python" markdown="1">
1211 {% highlight python %}
1212 dataset = ... # some RDD
1213 windowedStream = stream.window(20)
1214 joinedStream = windowedStream.transform(lambda rdd: rdd.join(dataset))
1215 {% endhighlight %}
1216 </div>
1217 </div>
1218
In fact, you can also dynamically change the dataset you want to join against. The function provided to `transform` is evaluated at every batch interval and therefore will use whatever dataset the `dataset` reference currently points to.
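
For instance, a sketch of this pattern in Scala might look like the following. `loadLatestDataset()` is a hypothetical helper that (re)loads the reference data; it is not part of Spark.

{% highlight scala %}
// `dataset` is a mutable reference that other driver-side code may refresh periodically.
var dataset: RDD[(String, String)] = loadLatestDataset()

// transform is evaluated at every batch interval, so each batch joins
// against whatever RDD `dataset` points to at that moment.
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
{% endhighlight %}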
1220
1221 The complete list of DStream transformations is available in the API documentation. For the Scala API,
1222 see [DStream](api/scala/org/apache/spark/streaming/dstream/DStream.html)
1223 and [PairDStreamFunctions](api/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.html).
1224 For the Java API, see [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html)
1225 and [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html).
1226 For the Python API, see [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream).
1227
1228 ***
1229
1230 ## Output Operations on DStreams
Output operations allow a DStream's data to be pushed out to external systems like a database or a file system.
1232 Since the output operations actually allow the transformed data to be consumed by external systems,
1233 they trigger the actual execution of all the DStream transformations (similar to actions for RDDs).
1234 Currently, the following output operations are defined:
1235
1236 <table class="table">
1237 <tr><th style="width:30%">Output Operation</th><th>Meaning</th></tr>
1238 <tr>
1239 <td> <b>print</b>()</td>
1240 <td> Prints the first ten elements of every batch of data in a DStream on the driver node running
1241 the streaming application. This is useful for development and debugging.
1242 <br/>
1243 <span class="badge" style="background-color: grey">Python API</span> This is called
1244 <b>pprint()</b> in the Python API.
1245 </td>
1246 </tr>
1247 <tr>
1248 <td> <b>saveAsTextFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
1249 <td> Save this DStream's contents as text files. The file name at each batch interval is
1250 generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>. </td>
1251 </tr>
1252 <tr>
1253 <td> <b>saveAsObjectFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
1254 <td> Save this DStream's contents as <code>SequenceFiles</code> of serialized Java objects. The file
1255 name at each batch interval is generated based on <i>prefix</i> and
1256 <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
1257 <br/>
1258 <span class="badge" style="background-color: grey">Python API</span> This is not available in
1259 the Python API.
1260 </td>
1261 </tr>
1262 <tr>
1263 <td> <b>saveAsHadoopFiles</b>(<i>prefix</i>, [<i>suffix</i>]) </td>
1264 <td> Save this DStream's contents as Hadoop files. The file name at each batch interval is
1265 generated based on <i>prefix</i> and <i>suffix</i>: <i>"prefix-TIME_IN_MS[.suffix]"</i>.
1266 <br>
1267 <span class="badge" style="background-color: grey">Python API</span> This is not available in
1268 the Python API.
1269 </td>
1270 </tr>
1271 <tr>
1272 <td> <b>foreachRDD</b>(<i>func</i>) </td>
1273 <td> The most generic output operator that applies a function, <i>func</i>, to each RDD generated from
1274 the stream. This function should push the data in each RDD to an external system, such as saving the RDD to
1275 files, or writing it over the network to a database. Note that the function <i>func</i> is executed
1276 in the driver process running the streaming application, and will usually have RDD actions in it
1277 that will force the computation of the streaming RDDs.</td>
1278 </tr>
1279 <tr><td></td><td></td></tr>
1280 </table>
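
For example (a brief sketch in Scala), assuming `wordCounts` is the DStream of (word, count) pairs from the earlier word count example, and using an illustrative HDFS path:

{% highlight scala %}
// Print the first ten elements of every batch on the driver (useful for debugging).
wordCounts.print()

// Write each batch as a set of text files; one directory per batch named
// "hdfs:///streaming/counts-<TIME_IN_MS>.txt".
wordCounts.saveAsTextFiles("hdfs:///streaming/counts", "txt")
{% endhighlight %}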
1281
1282 ### Design Patterns for using foreachRDD
1283 {:.no_toc}
1284 `dstream.foreachRDD` is a powerful primitive that allows data to be sent out to external systems.
1285 However, it is important to understand how to use this primitive correctly and efficiently.
1286 Some of the common mistakes to avoid are as follows.
1287
Often, writing data to an external system requires creating a connection object
1289 (e.g. TCP connection to a remote server) and using it to send data to a remote system.
1290 For this purpose, a developer may inadvertently try creating a connection object at
1291 the Spark driver, and then try to use it in a Spark worker to save records in the RDDs.
1292 For example (in Scala),
1293
1294 <div class="codetabs">
1295 <div data-lang="scala" markdown="1">
1296 {% highlight scala %}
1297 dstream.foreachRDD { rdd =>
1298 val connection = createNewConnection() // executed at the driver
1299 rdd.foreach { record =>
1300 connection.send(record) // executed at the worker
1301 }
1302 }
1303 {% endhighlight %}
1304 </div>
1305 <div data-lang="java" markdown="1">
1306 {% highlight java %}
1307 dstream.foreachRDD(rdd -> {
1308 Connection connection = createNewConnection(); // executed at the driver
1309 rdd.foreach(record -> {
1310 connection.send(record); // executed at the worker
1311 });
1312 });
1313 {% endhighlight %}
1314 </div>
1315 <div data-lang="python" markdown="1">
1316 {% highlight python %}
def sendRecord(rdd):
    connection = createNewConnection()  # executed at the driver
    rdd.foreach(lambda record: connection.send(record))
    connection.close()

dstream.foreachRDD(sendRecord)
1323 {% endhighlight %}
1324 </div>
1325 </div>
1326
1327 This is incorrect as this requires the connection object to be serialized and sent from the
1328 driver to the worker. Such connection objects are rarely transferable across machines. This
1329 error may manifest as serialization errors (connection object not serializable), initialization
1330 errors (connection object needs to be initialized at the workers), etc. The correct solution is
1331 to create the connection object at the worker.
1332
1333 However, this can lead to another common mistake - creating a new connection for every record.
1334 For example,
1335
1336 <div class="codetabs">
1337 <div data-lang="scala" markdown="1">
1338 {% highlight scala %}
1339 dstream.foreachRDD { rdd =>
1340 rdd.foreach { record =>
1341 val connection = createNewConnection()
1342 connection.send(record)
1343 connection.close()
1344 }
1345 }
1346 {% endhighlight %}
1347 </div>
1348 <div data-lang="java" markdown="1">
1349 {% highlight java %}
1350 dstream.foreachRDD(rdd -> {
1351 rdd.foreach(record -> {
1352 Connection connection = createNewConnection();
1353 connection.send(record);
1354 connection.close();
1355 });
1356 });
1357 {% endhighlight %}
1358 </div>
1359 <div data-lang="python" markdown="1">
1360 {% highlight python %}
def sendRecord(record):
    connection = createNewConnection()
    connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreach(sendRecord))
1367 {% endhighlight %}
1368 </div>
1369 </div>
1370
1371 Typically, creating a connection object has time and resource overheads. Therefore, creating and
1372 destroying a connection object for each record can incur unnecessarily high overheads and can
1373 significantly reduce the overall throughput of the system. A better solution is to use
`rdd.foreachPartition` - create a single connection object and send all the records in an RDD
1375 partition using that connection.
1376
1377 <div class="codetabs">
1378 <div data-lang="scala" markdown="1">
1379 {% highlight scala %}
1380 dstream.foreachRDD { rdd =>
1381 rdd.foreachPartition { partitionOfRecords =>
1382 val connection = createNewConnection()
1383 partitionOfRecords.foreach(record => connection.send(record))
1384 connection.close()
1385 }
1386 }
1387 {% endhighlight %}
1388 </div>
1389 <div data-lang="java" markdown="1">
1390 {% highlight java %}
1391 dstream.foreachRDD(rdd -> {
1392 rdd.foreachPartition(partitionOfRecords -> {
1393 Connection connection = createNewConnection();
1394 while (partitionOfRecords.hasNext()) {
1395 connection.send(partitionOfRecords.next());
1396 }
1397 connection.close();
1398 });
1399 });
1400 {% endhighlight %}
1401 </div>
1402 <div data-lang="python" markdown="1">
1403 {% highlight python %}
def sendPartition(iter):
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
1411 {% endhighlight %}
1412 </div>
1413 </div>
1414
1415 This amortizes the connection creation overheads over many records.
1416
1417 Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches.
One can maintain a static pool of connection objects that can be reused as
1419 RDDs of multiple batches are pushed to the external system, thus further reducing the overheads.
1420
1421 <div class="codetabs">
1422 <div data-lang="scala" markdown="1">
1423 {% highlight scala %}
1424 dstream.foreachRDD { rdd =>
1425 rdd.foreachPartition { partitionOfRecords =>
1426 // ConnectionPool is a static, lazily initialized pool of connections
1427 val connection = ConnectionPool.getConnection()
1428 partitionOfRecords.foreach(record => connection.send(record))
1429 ConnectionPool.returnConnection(connection) // return to the pool for future reuse
1430 }
1431 }
1432 {% endhighlight %}
1433 </div>
1434
1435 <div data-lang="java" markdown="1">
1436 {% highlight java %}
1437 dstream.foreachRDD(rdd -> {
1438 rdd.foreachPartition(partitionOfRecords -> {
1439 // ConnectionPool is a static, lazily initialized pool of connections
1440 Connection connection = ConnectionPool.getConnection();
1441 while (partitionOfRecords.hasNext()) {
1442 connection.send(partitionOfRecords.next());
1443 }
1444 ConnectionPool.returnConnection(connection); // return to the pool for future reuse
1445 });
1446 });
1447 {% endhighlight %}
1448 </div>
1449 <div data-lang="python" markdown="1">
1450 {% highlight python %}
def sendPartition(iter):
    # ConnectionPool is a static, lazily initialized pool of connections
    connection = ConnectionPool.getConnection()
    for record in iter:
        connection.send(record)
    # return to the pool for future reuse
    ConnectionPool.returnConnection(connection)

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
1460 {% endhighlight %}
1461 </div>
1462 </div>
1463
1464 Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient sending of data to external systems.
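
`ConnectionPool` in the snippets above is not part of Spark. One possible minimal sketch of such a pool (in Scala) is shown below; it reuses the hypothetical `Connection` and `createNewConnection()` placeholders from the earlier snippets and omits the idle-timeout and size-bounding logic that a production pool should have.

{% highlight scala %}
object ConnectionPool {
  // Idle connections available for reuse; connections are created lazily on demand.
  private val pool = new java.util.concurrent.ConcurrentLinkedQueue[Connection]()

  def getConnection(): Connection = {
    val conn = pool.poll()                       // reuse an idle connection if one is available
    if (conn != null) conn else createNewConnection()
  }

  def returnConnection(conn: Connection): Unit = {
    pool.offer(conn)                             // make it available for future reuse
  }
}
{% endhighlight %}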
1465
1466
1467 ##### Other points to remember:
1468 {:.no_toc}
1469 - DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application does not have any output operation, or has output operations like `dstream.foreachRDD()` without any RDD action inside them, then nothing will get executed. The system will simply receive the data and discard it.
1470
1471 - By default, output operations are executed one-at-a-time. And they are executed in the order they are defined in the application.
1472
1473 ***
1474
1475 ## DataFrame and SQL Operations
You can easily use [DataFrames and SQL](sql-programming-guide.html) operations on streaming data. You have to create a SparkSession using the SparkContext that the StreamingContext is using. Furthermore, this has to be done such that it can be restarted on driver failures. This is done by creating a lazily instantiated singleton instance of SparkSession. This is shown in the following example. It modifies the earlier [word count example](#a-quick-example) to generate word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary view and then queried using SQL.
1477
1478 <div class="codetabs">
1479 <div data-lang="scala" markdown="1">
1480 {% highlight scala %}
1481
1482 /** DataFrame operations inside your streaming program */
1483
1484 val words: DStream[String] = ...
1485
1486 words.foreachRDD { rdd =>
1487
1488 // Get the singleton instance of SparkSession
1489 val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
1490 import spark.implicits._
1491
1492 // Convert RDD[String] to DataFrame
1493 val wordsDataFrame = rdd.toDF("word")
1494
1495 // Create a temporary view
1496 wordsDataFrame.createOrReplaceTempView("words")
1497
1498 // Do word count on DataFrame using SQL and print it
1499 val wordCountsDataFrame =
1500 spark.sql("select word, count(*) as total from words group by word")
1501 wordCountsDataFrame.show()
1502 }
1503
1504 {% endhighlight %}
1505
1506 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala).
1507 </div>
1508 <div data-lang="java" markdown="1">
1509 {% highlight java %}
1510
1511 /** Java Bean class for converting RDD to DataFrame */
1512 public class JavaRow implements java.io.Serializable {
1513 private String word;
1514
1515 public String getWord() {
1516 return word;
1517 }
1518
1519 public void setWord(String word) {
1520 this.word = word;
1521 }
1522 }
1523
1524 ...
1525
1526 /** DataFrame operations inside your streaming program */
1527
1528 JavaDStream<String> words = ...
1529
1530 words.foreachRDD((rdd, time) -> {
1531 // Get the singleton instance of SparkSession
1532 SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();
1533
// Convert JavaRDD[String] to JavaRDD[JavaRow] to DataFrame
1535 JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
1536 JavaRow record = new JavaRow();
1537 record.setWord(word);
1538 return record;
1539 });
Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);
1541
1542 // Creates a temporary view using the DataFrame
1543 wordsDataFrame.createOrReplaceTempView("words");
1544
1545 // Do word count on table using SQL and print it
Dataset<Row> wordCountsDataFrame =
1547 spark.sql("select word, count(*) as total from words group by word");
1548 wordCountsDataFrame.show();
1549 });
1550 {% endhighlight %}
1551
1552 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java).
1553 </div>
1554 <div data-lang="python" markdown="1">
1555 {% highlight python %}
1556
# Lazily instantiated global instance of SparkSession
def getSparkSessionInstance(sparkConf):
    if ("sparkSessionSingletonInstance" not in globals()):
        globals()["sparkSessionSingletonInstance"] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()["sparkSessionSingletonInstance"]

...

# DataFrame operations inside your streaming program

words = ... # DStream of strings

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass

words.foreachRDD(process)
1592 {% endhighlight %}
1593
1594 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/sql_network_wordcount.py).
1595
1596 </div>
1597 </div>
1598
You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call `streamingContext.remember(Minutes(5))` (in Scala, or equivalent in other languages).
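
A rough sketch of this (in Scala), assuming the "words" temporary view is registered as in the example above and that `ssc` and `spark` are the running StreamingContext and SparkSession:

{% highlight scala %}
// Keep the RDDs of the last 5 minutes around so that asynchronous queries can still see them.
ssc.remember(Minutes(5))

// From a different thread, asynchronous to the running StreamingContext:
new Thread(() => {
  val recentWordCounts = spark.sql("select word, count(*) as total from words group by word")
  recentWordCounts.show()
}).start()
{% endhighlight %}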
1600
1601 See the [DataFrames and SQL](sql-programming-guide.html) guide to learn more about DataFrames.
1602
1603 ***
1604
1605 ## MLlib Operations
1606 You can also easily use machine learning algorithms provided by [MLlib](ml-guide.html). First of all, there are streaming machine learning algorithms (e.g. [Streaming Linear Regression](mllib-linear-methods.html#streaming-linear-regression), [Streaming KMeans](mllib-clustering.html#streaming-k-means), etc.) which can simultaneously learn from the streaming data as well as apply the model on the streaming data. Beyond these, for a much larger class of machine learning algorithms, you can learn a learning model offline (i.e. using historical data) and then apply the model online on streaming data. See the [MLlib](ml-guide.html) guide for more details.
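
As a rough sketch (in Scala), a streaming algorithm can be trained and queried on DStreams directly. Here `trainingData` and `testData` are assumed to be `DStream[LabeledPoint]`s parsed from input streams, and `numFeatures` is the dimensionality of the feature vectors.

{% highlight scala %}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

val model = new StreamingLinearRegressionWithSGD()
  .setInitialWeights(Vectors.zeros(numFeatures))

model.trainOn(trainingData)   // keep updating the model as new batches arrive
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
{% endhighlight %}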
1607
1608 ***
1609
1610 ## Caching / Persistence
1611 Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. That is,
1612 using the `persist()` method on a DStream will automatically persist every RDD of that DStream in
1613 memory. This is useful if the data in the DStream will be computed multiple times (e.g., multiple
1614 operations on the same data). For window-based operations like `reduceByWindow` and
1615 `reduceByKeyAndWindow` and state-based operations like `updateStateByKey`, this is implicitly true.
1616 Hence, DStreams generated by window-based operations are automatically persisted in memory, without
1617 the developer calling `persist()`.
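
For other DStreams that are reused by multiple downstream operations, persistence can be requested explicitly. A brief sketch in Scala:

{% highlight scala %}
val words: DStream[String] = ...
words.persist()   // persist each generated RDD (serialized in memory by default for DStreams)

// Both of these operate on the same underlying data, so persisting avoids recomputation.
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
val wordLengths = words.map(_.length)
{% endhighlight %}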
1618
For input streams that receive data over the network (such as Kafka, sockets, etc.), the
1620 default persistence level is set to replicate the data to two nodes for fault-tolerance.
1621
1622 Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in
1623 memory. This is further discussed in the [Performance Tuning](#memory-tuning) section. More
1624 information on different persistence levels can be found in the [Spark Programming Guide](rdd-programming-guide.html#rdd-persistence).
1625
1626 ***
1627
1628 ## Checkpointing
1629 A streaming application must operate 24/7 and hence must be resilient to failures unrelated
1630 to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible,
Spark Streaming needs to *checkpoint* enough information to a fault-tolerant storage system
such that it can recover from failures. There are two types of data
1633 that are checkpointed.
1634
1635 - *Metadata checkpointing* - Saving of the information defining the streaming computation to
1636 fault-tolerant storage like HDFS. This is used to recover from failure of the node running the
1637 driver of the streaming application (discussed in detail later). Metadata includes:
1638 + *Configuration* - The configuration that was used to create the streaming application.
1639 + *DStream operations* - The set of DStream operations that define the streaming application.
1640 + *Incomplete batches* - Batches whose jobs are queued but have not completed yet.
1641 - *Data checkpointing* - Saving of the generated RDDs to reliable storage. This is necessary
1642 in some *stateful* transformations that combine data across multiple batches. In such
1643 transformations, the generated RDDs depend on RDDs of previous batches, which causes the length
1644 of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery
1645 time (proportional to dependency chain), intermediate RDDs of stateful transformations are periodically
1646 *checkpointed* to reliable storage (e.g. HDFS) to cut off the dependency chains.
1647
1648 To summarize, metadata checkpointing is primarily needed for recovery from driver failures,
1649 whereas data or RDD checkpointing is necessary even for basic functioning if stateful
1650 transformations are used.
1651
1652 #### When to enable Checkpointing
1653 {:.no_toc}
1654
1655 Checkpointing must be enabled for applications with any of the following requirements:
1656
1657 - *Usage of stateful transformations* - If either `updateStateByKey` or `reduceByKeyAndWindow` (with
1658 inverse function) is used in the application, then the checkpoint directory must be provided to
1659 allow for periodic RDD checkpointing.
1660 - *Recovering from failures of the driver running the application* - Metadata checkpoints are used
1661 to recover with progress information.
1662
1663 Note that simple streaming applications without the aforementioned stateful transformations can be
1664 run without enabling checkpointing. The recovery from driver failures will also be partial in
1665 that case (some received but unprocessed data may be lost). This is often acceptable and many run
1666 Spark Streaming applications in this way. Support for non-Hadoop environments is expected
1667 to improve in the future.
1668
1669 #### How to configure Checkpointing
1670 {:.no_toc}
1671
1672 Checkpointing can be enabled by setting a directory in a fault-tolerant,
1673 reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved.
1674 This is done by using `streamingContext.checkpoint(checkpointDirectory)`. This will allow you to
1675 use the aforementioned stateful transformations. Additionally,
1676 if you want to make the application recover from driver failures, you should rewrite your
1677 streaming application to have the following behavior.
1678
1679 + When the program is being started for the first time, it will create a new StreamingContext,
1680 set up all the streams and then call start().
1681 + When the program is being restarted after failure, it will re-create a StreamingContext
1682 from the checkpoint data in the checkpoint directory.
1683
1684 <div class="codetabs">
1685 <div data-lang="scala" markdown="1">
1686
1687 This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
1688
1689 {% highlight scala %}
1690 // Function to create and setup a new StreamingContext
1691 def functionToCreateContext(): StreamingContext = {
1692 val ssc = new StreamingContext(...) // new context
1693 val lines = ssc.socketTextStream(...) // create DStreams
1694 ...
1695 ssc.checkpoint(checkpointDirectory) // set checkpoint directory
1696 ssc
1697 }
1698
1699 // Get StreamingContext from checkpoint data or create a new one
1700 val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
1701
1702 // Do additional setup on context that needs to be done,
1703 // irrespective of whether it is being started or restarted
1704 context. ...
1705
1706 // Start the context
1707 context.start()
1708 context.awaitTermination()
1709 {% endhighlight %}
1710
1711 If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
1712 If the directory does not exist (i.e., running for the first time),
1713 then the function `functionToCreateContext` will be called to create a new
1714 context and set up the DStreams. See the Scala example
1715 [RecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
1716 This example appends the word counts of network data into a file.
1717
1718 </div>
1719 <div data-lang="java" markdown="1">
1720
1721 This behavior is made simple by using `JavaStreamingContext.getOrCreate`. This is used as follows.
1722
1723 {% highlight java %}
1724 // Create a factory object that can create and setup a new JavaStreamingContext
1725 JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
1726 @Override public JavaStreamingContext create() {
1727 JavaStreamingContext jssc = new JavaStreamingContext(...); // new context
1728 JavaDStream<String> lines = jssc.socketTextStream(...); // create DStreams
1729 ...
1730 jssc.checkpoint(checkpointDirectory); // set checkpoint directory
1731 return jssc;
1732 }
1733 };
1734
1735 // Get JavaStreamingContext from checkpoint data or create a new one
1736 JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
1737
1738 // Do additional setup on context that needs to be done,
1739 // irrespective of whether it is being started or restarted
1740 context. ...
1741
1742 // Start the context
1743 context.start();
1744 context.awaitTermination();
1745 {% endhighlight %}
1746
1747 If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
1748 If the directory does not exist (i.e., running for the first time),
1749 then the function `contextFactory` will be called to create a new
1750 context and set up the DStreams. See the Java example
1751 [JavaRecoverableNetworkWordCount]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
1752 This example appends the word counts of network data into a file.
1753
1754 </div>
1755 <div data-lang="python" markdown="1">
1756
1757 This behavior is made simple by using `StreamingContext.getOrCreate`. This is used as follows.
1758
1759 {% highlight python %}
# Function to create and setup a new StreamingContext
def functionToCreateContext():
    sc = SparkContext(...)  # new context
    ssc = StreamingContext(...)
    lines = ssc.socketTextStream(...)  # create DStreams
    ...
    ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
    return ssc

# Get StreamingContext from checkpoint data or create a new one
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

# Do additional setup on context that needs to be done,
# irrespective of whether it is being started or restarted
context. ...

# Start the context
context.start()
context.awaitTermination()
1779 {% endhighlight %}
1780
1781 If the `checkpointDirectory` exists, then the context will be recreated from the checkpoint data.
1782 If the directory does not exist (i.e., running for the first time),
1783 then the function `functionToCreateContext` will be called to create a new
1784 context and set up the DStreams. See the Python example
1785 [recoverable_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming/recoverable_network_wordcount.py).
1786 This example appends the word counts of network data into a file.
1787
1788 You can also explicitly create a `StreamingContext` from the checkpoint data and start the
1789 computation by using `StreamingContext.getOrCreate(checkpointDirectory, None)`.
1790
1791 </div>
1792 </div>
1793
In addition to using `getOrCreate`, one also needs to ensure that the driver process gets
1795 restarted automatically on failure. This can only be done by the deployment infrastructure that is
1796 used to run the application. This is further discussed in the
1797 [Deployment](#deploying-applications) section.
1798
1799 Note that checkpointing of RDDs incurs the cost of saving to reliable storage.
1800 This may cause an increase in the processing time of those batches where RDDs get checkpointed.
1801 Hence, the interval of
1802 checkpointing needs to be set carefully. At small batch sizes (say 1 second), checkpointing every
1803 batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently
1804 causes the lineage and task sizes to grow, which may have detrimental effects. For stateful
1805 transformations that require RDD checkpointing, the default interval is a multiple of the
1806 batch interval that is at least 10 seconds. It can be set by using
1807 `dstream.checkpoint(checkpointInterval)`. Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
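
For example (a sketch in Scala), with a batch interval of 10 seconds, a stateful DStream could be checkpointed every 5 sliding intervals:

{% highlight scala %}
// `stateDStream` stands for any DStream produced by a stateful transformation
// such as updateStateByKey; the name is only illustrative.
stateDStream.checkpoint(Seconds(50))   // 5 x the 10 second batch interval
{% endhighlight %}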
1808
1809 ***
1810
1811 ## Accumulators, Broadcast Variables, and Checkpoints
1812
1813 [Accumulators](rdd-programming-guide.html#accumulators) and [Broadcast variables](rdd-programming-guide.html#broadcast-variables)
1814 cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use
1815 [Accumulators](rdd-programming-guide.html#accumulators) or [Broadcast variables](rdd-programming-guide.html#broadcast-variables)
1816 as well, you'll have to create lazily instantiated singleton instances for
1817 [Accumulators](rdd-programming-guide.html#accumulators) and [Broadcast variables](rdd-programming-guide.html#broadcast-variables)
1818 so that they can be re-instantiated after the driver restarts on failure.
1819 This is shown in the following example.
1820
1821 <div class="codetabs">
1822 <div data-lang="scala" markdown="1">
1823 {% highlight scala %}
1824
1825 object WordBlacklist {
1826
1827 @volatile private var instance: Broadcast[Seq[String]] = null
1828
1829 def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
1830 if (instance == null) {
1831 synchronized {
1832 if (instance == null) {
1833 val wordBlacklist = Seq("a", "b", "c")
1834 instance = sc.broadcast(wordBlacklist)
1835 }
1836 }
1837 }
1838 instance
1839 }
1840 }
1841
1842 object DroppedWordsCounter {
1843
1844 @volatile private var instance: LongAccumulator = null
1845
1846 def getInstance(sc: SparkContext): LongAccumulator = {
1847 if (instance == null) {
1848 synchronized {
1849 if (instance == null) {
1850 instance = sc.longAccumulator("WordsInBlacklistCounter")
1851 }
1852 }
1853 }
1854 instance
1855 }
1856 }
1857
1858 wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
1859 // Get or register the blacklist Broadcast
1860 val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
1861 // Get or register the droppedWordsCounter Accumulator
1862 val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
1863 // Use blacklist to drop words and use droppedWordsCounter to count them
1864 val counts = rdd.filter { case (word, count) =>
1865 if (blacklist.value.contains(word)) {
1866 droppedWordsCounter.add(count)
1867 false
1868 } else {
1869 true
1870 }
1871 }.collect().mkString("[", ", ", "]")
1872 val output = "Counts at time " + time + " " + counts
}
1874
1875 {% endhighlight %}
1876
1877 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).
1878 </div>
1879 <div data-lang="java" markdown="1">
1880 {% highlight java %}
1881
1882 class JavaWordBlacklist {
1883
1884 private static volatile Broadcast<List<String>> instance = null;
1885
1886 public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
1887 if (instance == null) {
1888 synchronized (JavaWordBlacklist.class) {
1889 if (instance == null) {
1890 List<String> wordBlacklist = Arrays.asList("a", "b", "c");
1891 instance = jsc.broadcast(wordBlacklist);
1892 }
1893 }
1894 }
1895 return instance;
1896 }
1897 }
1898
1899 class JavaDroppedWordsCounter {
1900
1901 private static volatile LongAccumulator instance = null;
1902
1903 public static LongAccumulator getInstance(JavaSparkContext jsc) {
1904 if (instance == null) {
1905 synchronized (JavaDroppedWordsCounter.class) {
1906 if (instance == null) {
1907 instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
1908 }
1909 }
1910 }
1911 return instance;
1912 }
1913 }
1914
1915 wordCounts.foreachRDD((rdd, time) -> {
1916 // Get or register the blacklist Broadcast
1917 Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
1918 // Get or register the droppedWordsCounter Accumulator
1919 LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
1920 // Use blacklist to drop words and use droppedWordsCounter to count them
1921 String counts = rdd.filter(wordCount -> {
1922 if (blacklist.value().contains(wordCount._1())) {
1923 droppedWordsCounter.add(wordCount._2());
1924 return false;
1925 } else {
1926 return true;
1927 }
1928 }).collect().toString();
1929 String output = "Counts at time " + time + " " + counts;
});
1931
1932 {% endhighlight %}
1933
1934 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java).
1935 </div>
1936 <div data-lang="python" markdown="1">
1937 {% highlight python %}
def getWordBlacklist(sparkContext):
    if ("wordBlacklist" not in globals()):
        globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
    return globals()["wordBlacklist"]

def getDroppedWordsCounter(sparkContext):
    if ("droppedWordsCounter" not in globals()):
        globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
    return globals()["droppedWordsCounter"]

def echo(time, rdd):
    # Get or register the blacklist Broadcast
    blacklist = getWordBlacklist(rdd.context)
    # Get or register the droppedWordsCounter Accumulator
    droppedWordsCounter = getDroppedWordsCounter(rdd.context)

    # Use blacklist to drop words and use droppedWordsCounter to count them
    def filterFunc(wordCount):
        if wordCount[0] in blacklist.value:
            droppedWordsCounter.add(wordCount[1])
            return False
        else:
            return True

    counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())

wordCounts.foreachRDD(echo)
1965
1966 {% endhighlight %}
1967
1968 See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/streaming/recoverable_network_wordcount.py).
1969
1970 </div>
1971 </div>
1972
1973 ***
1974
1975 ## Deploying Applications
1976 This section discusses the steps to deploy a Spark Streaming application.
1977
1978 ### Requirements
1979 {:.no_toc}
1980
To run a Spark Streaming application, you need to have the following.
1982
1983 - *Cluster with a cluster manager* - This is the general requirement of any Spark application,
1984 and discussed in detail in the [deployment guide](cluster-overview.html).
1985
1986 - *Package the application JAR* - You have to compile your streaming application into a JAR.
1987 If you are using [`spark-submit`](submitting-applications.html) to start the
1988 application, then you will not need to provide Spark and Spark Streaming in the JAR. However,
1989 if your application uses [advanced sources](#advanced-sources) (e.g. Kafka),
1990 then you will have to package the extra artifact they link to, along with their dependencies,
1991 in the JAR that is used to deploy the application. For example, an application using `KafkaUtils`
1992 will have to include `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and all its
1993 transitive dependencies in the application JAR.
1994
1995 - *Configuring sufficient memory for the executors* - Since the received data must be stored in
1996 memory, the executors must be configured with sufficient memory to hold the received data. Note
that if you are doing 10 minute window operations, the system has to keep at least the last 10 minutes
of data in memory. So the memory requirements for the application depend on the operations
1999 used in it.
2000
2001 - *Configuring checkpointing* - If the stream application requires it, then a directory in the
2002 Hadoop API compatible fault-tolerant storage (e.g. HDFS, S3, etc.) must be configured as the
2003 checkpoint directory and the streaming application written in a way that checkpoint
2004 information can be used for failure recovery. See the [checkpointing](#checkpointing) section
2005 for more details.
2006
2007 - *Configuring automatic restart of the application driver* - To automatically recover from a
2008 driver failure, the deployment infrastructure that is
2009 used to run the streaming application must monitor the driver process and relaunch the driver
2010 if it fails. Different [cluster managers](cluster-overview.html#cluster-manager-types)
2011 have different tools to achieve this.
2012 + *Spark Standalone* - A Spark application driver can be submitted to run within the Spark
2013 Standalone cluster (see
2014 [cluster deploy mode](spark-standalone.html#launching-spark-applications)), that is, the
2015 application driver itself runs on one of the worker nodes. Furthermore, the
2016 Standalone cluster manager can be instructed to *supervise* the driver,
2017 and relaunch it if the driver fails either due to non-zero exit code,
2018 or due to failure of the node running the driver. See *cluster mode* and *supervise* in the
2019 [Spark Standalone guide](spark-standalone.html) for more details.
2020 + *YARN* - Yarn supports a similar mechanism for automatically restarting an application.
2021 Please refer to YARN documentation for more details.
2022 + *Mesos* - [Marathon](https://github.com/mesosphere/marathon) has been used to achieve this
2023 with Mesos.
2024
2025 - *Configuring write-ahead logs* - Since Spark 1.2,
2026 we have introduced _write-ahead logs_ for achieving strong
2027 fault-tolerance guarantees. If enabled, all the data received from a receiver gets written into
a write-ahead log in the configured checkpoint directory. This prevents data loss on driver
2029 recovery, thus ensuring zero data loss (discussed in detail in the
2030 [Fault-tolerance Semantics](#fault-tolerance-semantics) section). This can be enabled by setting
2031 the [configuration parameter](configuration.html#spark-streaming)
2032 `spark.streaming.receiver.writeAheadLog.enable` to `true`. However, these stronger semantics may
2033 come at the cost of the receiving throughput of individual receivers. This can be corrected by
2034 running [more receivers in parallel](#level-of-parallelism-in-data-receiving)
2035 to increase aggregate throughput. Additionally, it is recommended that the replication of the
2036 received data within Spark be disabled when the write-ahead log is enabled as the log is already
2037 stored in a replicated storage system. This can be done by setting the storage level for the
2038 input stream to `StorageLevel.MEMORY_AND_DISK_SER`. While using S3 (or any file system that
2039 does not support flushing) for _write-ahead logs_, please remember to enable
2040 `spark.streaming.driver.writeAheadLog.closeFileAfterWrite` and
2041 `spark.streaming.receiver.writeAheadLog.closeFileAfterWrite`. See
2042 [Spark Streaming Configuration](configuration.html#spark-streaming) for more details.
2043 Note that Spark will not encrypt data written to the write-ahead log when I/O encryption is
2044 enabled. If encryption of the write-ahead log data is desired, it should be stored in a file
2045 system that supports encryption natively.
2046
- *Setting the max receiving rate* - If the cluster resources are not large enough for the streaming
2048 application to process data as fast as it is being received, the receivers can be rate limited
2049 by setting a maximum rate limit in terms of records / sec.
2050 See the [configuration parameters](configuration.html#spark-streaming)
2051 `spark.streaming.receiver.maxRate` for receivers and `spark.streaming.kafka.maxRatePerPartition`
for the Direct Kafka approach. In Spark 1.5, we introduced a feature called *backpressure* that
eliminates the need to set this rate limit, as Spark Streaming automatically figures out the
2054 rate limits and dynamically adjusts them if the processing conditions change. This backpressure
2055 can be enabled by setting the [configuration parameter](configuration.html#spark-streaming)
2056 `spark.streaming.backpressure.enabled` to `true`.
2057
2058 ### Upgrading Application Code
2059 {:.no_toc}
2060
2061 If a running Spark Streaming application needs to be upgraded with new
2062 application code, then there are two possible mechanisms.
2063
2064 - The upgraded Spark Streaming application is started and run in parallel to the existing application.
2065 Once the new one (receiving the same data as the old one) has been warmed up and is ready
for prime time, the old one can be brought down. Note that this can be done for data sources that support
sending the data to two destinations (i.e., the earlier and upgraded applications).
2068
- The existing application is shut down gracefully (see
2070 [`StreamingContext.stop(...)`](api/scala/org/apache/spark/streaming/StreamingContext.html)
2071 or [`JavaStreamingContext.stop(...)`](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html)
for graceful shutdown options) which ensure that data that has been received is completely
2073 processed before shutdown. Then the
2074 upgraded application can be started, which will start processing from the same point where the earlier
2075 application left off. Note that this can be done only with input sources that support source-side buffering
(like Kafka) as data needs to be buffered while the previous application is down and
the upgraded application is not yet up. Also, restarting from the earlier checkpoint
2078 information of pre-upgrade code cannot be done. The checkpoint information essentially
2079 contains serialized Scala/Java/Python objects and trying to deserialize objects with new,
2080 modified classes may lead to errors. In this case, either start the upgraded app with a different
2081 checkpoint directory, or delete the previous checkpoint directory.
2082
2083 ***
2084
2085 ## Monitoring Applications
2086 Beyond Spark's [monitoring capabilities](monitoring.html), there are additional capabilities
2087 specific to Spark Streaming. When a StreamingContext is used, the
2088 [Spark web UI](monitoring.html#web-interfaces) shows
2089 an additional `Streaming` tab which shows statistics about running receivers (whether
2090 receivers are active, number of records received, receiver error, etc.)
2091 and completed batches (batch processing times, queueing delays, etc.). This can be used to
2092 monitor the progress of the streaming application.
2093
2094 The following two metrics in web UI are particularly important:
2095
2096 - *Processing Time* - The time to process each batch of data.
2097 - *Scheduling Delay* - the time a batch waits in a queue for the processing of previous batches
2098 to finish.
2099
2100 If the batch processing time is consistently more than the batch interval and/or the queueing
2101 delay keeps increasing, then it indicates that the system is
not able to process the batches as fast as they are being generated and is falling behind.
2103 In that case, consider
2104 [reducing](#reducing-the-batch-processing-times) the batch processing time.
2105
2106 The progress of a Spark Streaming program can also be monitored using the
2107 [StreamingListener](api/scala/org/apache/spark/streaming/scheduler/StreamingListener.html) interface,
2108 which allows you to get receiver status and processing times. Note that this is a developer API
2109 and it is likely to be improved upon (i.e., more information reported) in the future.
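
For example, a minimal listener (in Scala) that logs batch processing times and scheduling delays might look like the following sketch, where `ssc` is the running StreamingContext:

{% highlight scala %}
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

class BatchStatsListener extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    println(s"Batch ${info.batchTime}: processing time = ${info.processingDelay.getOrElse(-1L)} ms, " +
      s"scheduling delay = ${info.schedulingDelay.getOrElse(-1L)} ms")
  }
}

ssc.addStreamingListener(new BatchStatsListener)
{% endhighlight %}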
2110
2111 ***************************************************************************************************
2112 ***************************************************************************************************
2113
2114 # Performance Tuning
2115 Getting the best performance out of a Spark Streaming application on a cluster requires a bit of
2116 tuning. This section explains a number of the parameters and configurations that can be tuned to
improve the performance of your application. At a high level, you need to consider two things:
2118
2119 1. Reducing the processing time of each batch of data by efficiently using cluster resources.
2120
2121 2. Setting the right batch size such that the batches of data can be processed as fast as they
2122 are received (that is, data processing keeps up with the data ingestion).
2123
2124 ## Reducing the Batch Processing Times
2125 There are a number of optimizations that can be done in Spark to minimize the processing time of
2126 each batch. These have been discussed in detail in the [Tuning Guide](tuning.html). This section
2127 highlights some of the most important ones.
2128
2129 ### Level of Parallelism in Data Receiving
2130 {:.no_toc}
2131 Receiving data over the network (like Kafka, socket, etc.) requires the data to be deserialized
2132 and stored in Spark. If the data receiving becomes a bottleneck in the system, then consider
2133 parallelizing the data receiving. Note that each input DStream
2134 creates a single receiver (running on a worker machine) that receives a single stream of data.
2135 Receiving multiple data streams can therefore be achieved by creating multiple input DStreams
2136 and configuring them to receive different partitions of the data stream from the source(s).
2137 For example, a single Kafka input DStream receiving two topics of data can be split into two
2138 Kafka input streams, each receiving only one topic. This would run two receivers,
2139 allowing data to be received in parallel, thus increasing overall throughput. These multiple
2140 DStreams can be unioned together to create a single DStream. Then the transformations that were
2141 being applied on a single input DStream can be applied on the unified stream. This is done as follows.
2142
2143 <div class="codetabs">
2144 <div data-lang="scala" markdown="1">
2145 {% highlight scala %}
2146 val numStreams = 5
2147 val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
2148 val unifiedStream = streamingContext.union(kafkaStreams)
2149 unifiedStream.print()
2150 {% endhighlight %}
2151 </div>
2152 <div data-lang="java" markdown="1">
2153 {% highlight java %}
2154 int numStreams = 5;
2155 List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
2156 for (int i = 0; i < numStreams; i++) {
2157 kafkaStreams.add(KafkaUtils.createStream(...));
2158 }
2159 JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
2160 unifiedStream.print();
2161 {% endhighlight %}
2162 </div>
2163 <div data-lang="python" markdown="1">
2164 {% highlight python %}
2165 numStreams = 5
2166 kafkaStreams = [KafkaUtils.createStream(...) for _ in range (numStreams)]
2167 unifiedStream = streamingContext.union(*kafkaStreams)
2168 unifiedStream.pprint()
2169 {% endhighlight %}
2170 </div>
2171 </div>
2172
2173 Another parameter that should be considered is the receiver's block interval,
2174 which is determined by the [configuration parameter](configuration.html#spark-streaming)
2175 `spark.streaming.blockInterval`. For most receivers, the received data is coalesced together into
2176 blocks of data before storing inside Spark's memory. The number of blocks in each batch
2177 determines the number of tasks that will be used to process
2178 the received data in a map-like transformation. The number of tasks per receiver per batch will be
2179 approximately (batch interval / block interval). For example, block interval of 200 ms will
create 10 tasks per 2-second batch. If the number of tasks is too low (that is, less than the number
2181 of cores per machine), then it will be inefficient as all available cores will not be used to
2182 process the data. To increase the number of tasks for a given batch interval, reduce the
2183 block interval. However, the recommended minimum value of block interval is about 50 ms,
2184 below which the task launching overheads may be a problem.
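
For example, with a 2 second batch interval, lowering the block interval from the default 200 ms to 100 ms would roughly double the number of tasks per receiver per batch (2000 ms / 100 ms = 20 tasks). A sketch of the configuration in Scala:

{% highlight scala %}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("NetworkWordCount")
  .set("spark.streaming.blockInterval", "100ms")   // default is 200ms
// ... then create the StreamingContext with this conf as usual
{% endhighlight %}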
2185
2186 An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
2187 the input data stream (using `inputStream.repartition(<number of partitions>)`).
2188 This distributes the received batches of data across the specified number of machines in the cluster
2189 before further processing.
2190
2191 For direct stream, please refer to [Spark Streaming + Kafka Integration Guide](streaming-kafka-0-10-integration.html)
2192
2193 ### Level of Parallelism in Data Processing
2194 {:.no_toc}
2195 Cluster resources can be under-utilized if the number of parallel tasks used in any stage of the
2196 computation is not high enough. For example, for distributed reduce operations like `reduceByKey`
2197 and `reduceByKeyAndWindow`, the default number of parallel tasks is controlled by
2198 the `spark.default.parallelism` [configuration property](configuration.html#spark-properties). You
2199 can pass the level of parallelism as an argument (see
2200 [`PairDStreamFunctions`](api/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.html)
2201 documentation), or set the `spark.default.parallelism`
2202 [configuration property](configuration.html#spark-properties) to change the default.
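
For instance (a sketch in Scala), the number of reduce tasks for a particular operation can be raised by passing it explicitly, here assuming `pairs` is a DStream of (String, Int) pairs:

{% highlight scala %}
// Use 32 reduce tasks for this operation instead of spark.default.parallelism.
val wordCounts = pairs.reduceByKey((a, b) => a + b, 32)
{% endhighlight %}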
2203
2204 ### Data Serialization
2205 {:.no_toc}
2206 The overheads of data serialization can be reduced by tuning the serialization formats. In the case of streaming, there are two types of data that are being serialized.
2207
2208 * **Input data**: By default, the input data received through Receivers is stored in the executors' memory with [StorageLevel.MEMORY_AND_DISK_SER_2](api/scala/org/apache/spark/storage/StorageLevel$.html). That is, the data is serialized into bytes to reduce GC overheads, and replicated for tolerating executor failures. Also, the data is kept first in memory, and spilled over to disk only if the memory is insufficient to hold all of the input data necessary for the streaming computation. This serialization obviously has overheads -- the receiver must deserialize the received data and re-serialize it using Spark's serialization format.
2209
* **Persisted RDDs generated by Streaming Operations**: RDDs generated by streaming computations may be persisted in memory. For example, window operations persist data in memory as they will be processed multiple times. However, unlike the Spark Core default of [StorageLevel.MEMORY_ONLY](api/scala/org/apache/spark/storage/StorageLevel$.html), persisted RDDs generated by streaming computations are persisted with [StorageLevel.MEMORY_ONLY_SER](api/scala/org/apache/spark/storage/StorageLevel$.html) (i.e. serialized) by default to minimize GC overheads.
2211
2212 In both cases, using Kryo serialization can reduce both CPU and memory overheads. See the [Spark Tuning Guide](tuning.html#data-serialization) for more details. For Kryo, consider registering custom classes, and disabling object reference tracking (see Kryo-related configurations in the [Configuration Guide](configuration.html#compression-and-serialization)).
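
A brief sketch (in Scala) of a Kryo configuration along these lines; `MyRecord` stands for an application-specific class carried through the stream and is only illustrative:

{% highlight scala %}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("StreamingWithKryo")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.referenceTracking", "false")        // disable object reference tracking
  .registerKryoClasses(Array(classOf[MyRecord]))       // register custom classes with Kryo
{% endhighlight %}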
2213
2214 In specific cases where the amount of data that needs to be retained for the streaming application is not large, it may be feasible to persist data (both types) as deserialized objects without incurring excessive GC overheads. For example, if you are using batch intervals of a few seconds and no window operations, then you can try disabling serialization in persisted data by explicitly setting the storage level accordingly. This would reduce the CPU overheads due to serialization, potentially improving performance without too much GC overheads.
2215
2216 ### Task Launching Overheads
2217 {:.no_toc}
If the number of tasks launched per second is high (say, 50 or more per second), then the overhead
of sending out tasks to the executors may be significant and will make it hard to achieve sub-second
2220 latencies. The overhead can be reduced by the following changes:
2221
2222 * **Execution mode**: Running Spark in Standalone mode or coarse-grained Mesos mode leads to
2223 better task launch times than the fine-grained Mesos mode. Please refer to the
2224 [Running on Mesos guide](running-on-mesos.html) for more details.
2225
2226 These changes may reduce batch processing time by 100s of milliseconds,
2227 thus allowing sub-second batch size to be viable.
2228
2229 ***
2230
2231 ## Setting the Right Batch Interval
2232 For a Spark Streaming application running on a cluster to be stable, the system should be able to
2233 process data as fast as it is being received. In other words, batches of data should be processed
2234 as fast as they are being generated. Whether this is true for an application can be found by
2235 [monitoring](#monitoring-applications) the processing times in the streaming web UI, where the batch
2236 processing time should be less than the batch interval.
2237
2238 Depending on the nature of the streaming
2239 computation, the batch interval used may have significant impact on the data rates that can be
2240 sustained by the application on a fixed set of cluster resources. For example, let us
consider the earlier NetworkWordCount example. For a particular data rate, the system may be able
2242 to keep up with reporting word counts every 2 seconds (i.e., batch interval of 2 seconds), but not
2243 every 500 milliseconds. So the batch interval needs to be set such that the expected data rate in
2244 production can be sustained.
2245
2246 A good approach to figure out the right batch size for your application is to test it with a
2247 conservative batch interval (say, 5-10 seconds) and a low data rate. To verify whether the system
2248 is able to keep up with the data rate, you can check the value of the end-to-end delay experienced
2249 by each processed batch (either look for "Total delay" in Spark driver log4j logs, or use the
2250 [StreamingListener](api/scala/org/apache/spark/streaming/scheduler/StreamingListener.html)
2251 interface).
If the delay is maintained to be comparable to the batch size, then the system is stable. Otherwise,
if the delay is continuously increasing, it means that the system is unable to keep up and is
therefore unstable. Once you have an idea of a stable configuration, you can try increasing the
2255 data rate and/or reducing the batch size. Note that a momentary increase in the delay due to
2256 temporary data rate increases may be fine as long as the delay reduces back to a low value
2257 (i.e., less than batch size).
2258
2259 ***
2260
2261 ## Memory Tuning
2262 Tuning the memory usage and GC behavior of Spark applications has been discussed in great detail
2263 in the [Tuning Guide](tuning.html#memory-tuning). It is strongly recommended that you read that. In this section, we discuss a few tuning parameters specifically in the context of Spark Streaming applications.
2264
2265 The amount of cluster memory required by a Spark Streaming application depends heavily on the type of transformations used. For example, if you want to use a window operation on the last 10 minutes of data, then your cluster should have sufficient memory to hold 10 minutes worth of data in memory. Or if you want to use `updateStateByKey` with a large number of keys, then the necessary memory will be high. On the contrary, if you want to do a simple map-filter-store operation, then the necessary memory will be low.
2266
In general, since the data received through receivers is stored with `StorageLevel.MEMORY_AND_DISK_SER_2`, the data that does not fit in memory will spill over to disk. This may reduce the performance of the streaming application, and hence it is advised to provide sufficient memory as required by your streaming application. It's best to try and observe the memory usage on a small scale and estimate accordingly.
2268
2269 Another aspect of memory tuning is garbage collection. For a streaming application that requires low latency, it is undesirable to have large pauses caused by JVM Garbage Collection.
2270
2271 There are a few parameters that can help you tune the memory usage and GC overheads:
2272
2273 * **Persistence Level of DStreams**: As mentioned earlier in the [Data Serialization](#data-serialization) section, the input data and RDDs are by default persisted as serialized bytes. This reduces both the memory usage and GC overheads, compared to deserialized persistence. Enabling Kryo serialization further reduces serialized sizes and memory usage. Further reduction in memory usage can be achieved with compression (see the Spark configuration `spark.rdd.compress`), at the cost of CPU time.
2274
2275 * **Clearing old data**: By default, all input data and persisted RDDs generated by DStream transformations are automatically cleared. Spark Streaming decides when to clear the data based on the transformations that are used. For example, if you are using a window operation of 10 minutes, then Spark Streaming will keep around the last 10 minutes of data, and actively throw away older data.
2276 Data can be retained for a longer duration (e.g. interactively querying older data) by setting `streamingContext.remember`.
2277
2278 * **CMS Garbage Collector**: Use of the concurrent mark-and-sweep GC is strongly recommended for keeping GC-related pauses consistently low. Even though concurrent GC is known to reduce the
2279 overall processing throughput of the system, its use is still recommended to achieve more
consistent batch processing times. Make sure you set the CMS GC on both the driver (using `--driver-java-options` in `spark-submit`) and the executors (using [Spark configuration](configuration.html#runtime-environment) `spark.executor.extraJavaOptions`). A combined configuration sketch is shown after this list.
2281
2282 * **Other tips**: To further reduce GC overheads, here are some more tips to try.
2283 - Persist RDDs using the `OFF_HEAP` storage level. See more detail in the [Spark Programming Guide](rdd-programming-guide.html#rdd-persistence).
2284 - Use more executors with smaller heap sizes. This will reduce the GC pressure within each JVM heap.
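
Putting the serialization and GC suggestions above together, a minimal configuration sketch might look like the following (the application name is illustrative; remember to pass the CMS flag to the driver separately via `--driver-java-options` in `spark-submit`):

{% highlight scala %}
import org.apache.spark.SparkConf

// Sketch of a memory-tuned configuration: Kryo serialization, RDD compression,
// and the CMS collector on the executors. The driver needs
// "-XX:+UseConcMarkSweepGC" passed via --driver-java-options when submitting.
val conf = new SparkConf()
  .setAppName("MemoryTunedStreamingApp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.rdd.compress", "true")
  .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")
{% endhighlight %}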
2285
2286 ***
2287
2288 ##### Important points to remember:
2289 {:.no_toc}
- A DStream is associated with a single receiver. To attain read parallelism, multiple receivers (i.e. multiple DStreams) need to be created. A receiver runs within an executor and occupies one core. Ensure that there are enough cores for processing after the receiver slots are booked, i.e. `spark.cores.max` should take the receiver slots into account. Receivers are allocated to executors in a round-robin fashion.
2291
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every `blockInterval` milliseconds. N blocks of data are created during the `batchInterval`, where N = `batchInterval`/`blockInterval`. These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
2293
- An RDD is created on the driver for the blocks created during the `batchInterval`. The blocks generated during the `batchInterval` are partitions of the RDD. Each partition is a task in Spark. `blockInterval == batchInterval` would mean that a single partition is created and probably it is processed locally.
2295
- The map tasks on the blocks are processed in the executors (the one that received the block, and the one where the block was replicated) that have the blocks, irrespective of the block interval, unless non-local scheduling kicks in.
A bigger `blockInterval` means bigger blocks. A high value of `spark.locality.wait` increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
2298
- Instead of relying on `batchInterval` and `blockInterval`, you can define the number of partitions by calling `inputDstream.repartition(n)`. This reshuffles the data in the RDD randomly to create n partitions, giving greater parallelism at the cost of a shuffle (see the sketch after this list). An RDD's processing is scheduled by the driver's JobScheduler as a job. At a given point of time only one job is active, so if one job is executing, the other jobs are queued.
2300
- If you have two DStreams, there will be two RDDs formed and two jobs created, which will be scheduled one after the other. To avoid this, you can union the two DStreams (as shown below). This ensures that a single UnionRDD is formed for the two RDDs of the DStreams. This UnionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted.
2302
- If the batch processing time is more than the `batchInterval`, then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently, there is no way to pause the receiver. The rate of the receiver can be limited using the SparkConf configuration `spark.streaming.receiver.maxRate`.
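
To illustrate the last two points, here is a minimal sketch (host names, port, and partition count are illustrative) that unions two input DStreams so that each batch produces a single job and then repartitions the result explicitly:

{% highlight scala %}
// Assumes an existing StreamingContext `ssc`.
val stream1 = ssc.socketTextStream("host1", 9999)
val stream2 = ssc.socketTextStream("host2", 9999)

// A single UnionRDD (and hence a single job) is formed per batch for both sources.
val unioned = stream1.union(stream2)

// Explicitly control the number of partitions instead of relying on
// batchInterval / blockInterval; this incurs a shuffle.
val repartitioned = unioned.repartition(10)

repartitioned.flatMap(_.split(" ")).countByValue().print()
{% endhighlight %}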
2304
2305
2306 ***************************************************************************************************
2307 ***************************************************************************************************
2308
2309 # Fault-tolerance Semantics
2310 In this section, we will discuss the behavior of Spark Streaming applications in the event
2311 of failures.
2312
2313 ## Background
2314 {:.no_toc}
2315 To understand the semantics provided by Spark Streaming, let us remember the basic fault-tolerance semantics of Spark's RDDs.
2316
2317 1. An RDD is an immutable, deterministically re-computable, distributed dataset. Each RDD
2318 remembers the lineage of deterministic operations that were used on a fault-tolerant input
2319 dataset to create it.
2320 1. If any partition of an RDD is lost due to a worker node failure, then that partition can be
2321 re-computed from the original fault-tolerant dataset using the lineage of operations.
2322 1. Assuming that all of the RDD transformations are deterministic, the data in the final transformed
2323 RDD will always be the same irrespective of failures in the Spark cluster.
2324
2325 Spark operates on data in fault-tolerant file systems like HDFS or S3. Hence,
2326 all of the RDDs generated from the fault-tolerant data are also fault-tolerant. However, this is not
2327 the case for Spark Streaming as the data in most cases is received over the network (except when
2328 `fileStream` is used). To achieve the same fault-tolerance properties for all of the generated RDDs,
2329 the received data is replicated among multiple Spark executors in worker nodes in the cluster
2330 (default replication factor is 2). This leads to two kinds of data in the
system that need to be recovered in the event of failures:
2332
2333 1. *Data received and replicated* - This data survives failure of a single worker node as a copy
2334 of it exists on one of the other nodes.
2335 1. *Data received but buffered for replication* - Since this is not replicated,
2336 the only way to recover this data is to get it again from the source.
2337
2338 Furthermore, there are two kinds of failures that we should be concerned about:
2339
2340 1. *Failure of a Worker Node* - Any of the worker nodes running executors can fail,
2341 and all in-memory data on those nodes will be lost. If any receivers were running on failed
2342 nodes, then their buffered data will be lost.
2343 1. *Failure of the Driver Node* - If the driver node running the Spark Streaming application
2344 fails, then obviously the SparkContext is lost, and all executors with their in-memory
2345 data are lost.
2346
2347 With this basic knowledge, let us understand the fault-tolerance semantics of Spark Streaming.
2348
2349 ## Definitions
2350 {:.no_toc}
The semantics of streaming systems are often captured in terms of how many times each record can be processed by the system. There are three types of guarantees that a system can provide under all possible operating conditions (despite failures, etc.):
2352
2353 1. *At most once*: Each record will be either processed once or not processed at all.
2. *At least once*: Each record will be processed one or more times. This is stronger than *at-most once* as it ensures that no data will be lost. But there may be duplicates.
2355 3. *Exactly once*: Each record will be processed exactly once - no data will be lost and no data will be processed multiple times. This is obviously the strongest guarantee of the three.
2356
2357 ## Basic Semantics
2358 {:.no_toc}
2359 In any stream processing system, broadly speaking, there are three steps in processing the data.
2360
2361 1. *Receiving the data*: The data is received from sources using Receivers or otherwise.
2362
2363 1. *Transforming the data*: The received data is transformed using DStream and RDD transformations.
2364
2365 1. *Pushing out the data*: The final transformed data is pushed out to external systems like file systems, databases, dashboards, etc.
2366
2367 If a streaming application has to achieve end-to-end exactly-once guarantees, then each step has to provide an exactly-once guarantee. That is, each record must be received exactly once, transformed exactly once, and pushed to downstream systems exactly once. Let's understand the semantics of these steps in the context of Spark Streaming.
2368
2369 1. *Receiving the data*: Different input sources provide different guarantees. This is discussed in detail in the next subsection.
2370
2371 1. *Transforming the data*: All data that has been received will be processed _exactly once_, thanks to the guarantees that RDDs provide. Even if there are failures, as long as the received input data is accessible, the final transformed RDDs will always have the same contents.
2372
1. *Pushing out the data*: Output operations by default provide _at-least once_ semantics; whether _exactly-once_ is achieved depends on the type of output operation (idempotent or not) and the semantics of the downstream system (whether it supports transactions). However, users can implement their own transaction mechanisms to achieve _exactly-once_ semantics. This is discussed in more detail later in this section.
2374
2375 ## Semantics of Received Data
2376 {:.no_toc}
Different input sources provide different guarantees, ranging from _at-least once_ to _exactly once_. Read on for more details.
2378
2379 ### With Files
2380 {:.no_toc}
2381 If all of the input data is already present in a fault-tolerant file system like
2382 HDFS, Spark Streaming can always recover from any failure and process all of the data. This gives
2383 *exactly-once* semantics, meaning all of the data will be processed exactly once no matter what fails.
2384
2385 ### With Receiver-based Sources
2386 {:.no_toc}
2387 For input sources based on receivers, the fault-tolerance semantics depend on both the failure
2388 scenario and the type of receiver.
2389 As we discussed [earlier](#receiver-reliability), there are two types of receivers:
2390
2391 1. *Reliable Receiver* - These receivers acknowledge reliable sources only after ensuring that
2392 the received data has been replicated. If such a receiver fails, the source will not receive
2393 acknowledgment for the buffered (unreplicated) data. Therefore, if the receiver is
2394 restarted, the source will resend the data, and no data will be lost due to the failure.
2395 1. *Unreliable Receiver* - Such receivers do *not* send acknowledgment and therefore *can* lose
2396 data when they fail due to worker or driver failures.
2397
Depending on what type of receivers are used, we achieve the following semantics.
2399 If a worker node fails, then there is no data loss with reliable receivers. With unreliable
2400 receivers, data received but not replicated can get lost. If the driver node fails,
2401 then besides these losses, all of the past data that was received and replicated in memory will be
2402 lost. This will affect the results of the stateful transformations.
2403
To avoid this loss of past received data, Spark 1.2 introduced _write-ahead logs_, which save the received data to fault-tolerant storage. With the [write-ahead logs
2406 enabled](#deploying-applications) and reliable receivers, there is zero data loss. In terms of semantics, it provides an at-least once guarantee.
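
Enabling the write-ahead log is a configuration change on the receiving side; here is a minimal sketch, assuming checkpointing to HDFS (the checkpoint path and application name are illustrative):

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable the receiver write-ahead log so received data is saved to
// fault-tolerant storage before being acknowledged to reliable sources.
// A checkpoint directory on fault-tolerant storage is required.
val conf = new SparkConf()
  .setAppName("WALEnabledApp")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs://namenode:8020/checkpoint")  // illustrative path
{% endhighlight %}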
2407
2408 The following table summarizes the semantics under failures:
2409
2410 <table class="table">
2411 <tr>
2412 <th style="width:30%">Deployment Scenario</th>
2413 <th>Worker Failure</th>
2414 <th>Driver Failure</th>
2415 </tr>
2416 <tr>
2417 <td>
2418 <i>Spark 1.1 or earlier,</i> OR<br/>
2419 <i>Spark 1.2 or later without write-ahead logs</i>
2420 </td>
2421 <td>
2422 Buffered data lost with unreliable receivers<br/>
2423 Zero data loss with reliable receivers<br/>
2424 At-least once semantics
2425 </td>
2426 <td>
2427 Buffered data lost with unreliable receivers<br/>
2428 Past data lost with all receivers<br/>
2429 Undefined semantics
2430 </td>
2431 </tr>
2432 <tr>
2433 <td><i>Spark 1.2 or later with write-ahead logs</i></td>
2434 <td>
2435 Zero data loss with reliable receivers<br/>
2436 At-least once semantics
2437 </td>
2438 <td>
2439 Zero data loss with reliable receivers and files<br/>
2440 At-least once semantics
2441 </td>
2442 </tr>
2448 </table>
2449
2450 ### With Kafka Direct API
2451 {:.no_toc}
2452 In Spark 1.3, we have introduced a new Kafka Direct API, which can ensure that all the Kafka data is received by Spark Streaming exactly once. Along with this, if you implement exactly-once output operation, you can achieve end-to-end exactly-once guarantees. This approach is further discussed in the [Kafka Integration Guide](streaming-kafka-0-10-integration.html).
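
For reference, a minimal sketch of creating a direct stream with the Kafka 0.10 integration is shown below (the broker address, group id, and topic name are illustrative; see the Kafka Integration Guide for the authoritative version):

{% highlight scala %}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// Assumes an existing StreamingContext `ssc`.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "example-group",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// The direct stream tracks Kafka offsets itself, so each record is received exactly once.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  PreferConsistent,
  Subscribe[String, String](Seq("topicA"), kafkaParams)
)
stream.map(record => (record.key, record.value)).print()
{% endhighlight %}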
2453
2454 ## Semantics of output operations
2455 {:.no_toc}
2456 Output operations (like `foreachRDD`) have _at-least once_ semantics, that is,
2457 the transformed data may get written to an external entity more than once in
2458 the event of a worker failure. While this is acceptable for saving to file systems using the
2459 `saveAs***Files` operations (as the file will simply get overwritten with the same data),
2460 additional effort may be necessary to achieve exactly-once semantics. There are two approaches.
2461
2462 - *Idempotent updates*: Multiple attempts always write the same data. For example, `saveAs***Files` always writes the same data to the generated files.
2463
2464 - *Transactional updates*: All updates are made transactionally so that updates are made exactly once atomically. One way to do this would be the following.
2465
    - Use the batch time (available in `foreachRDD`) and the partition index of the RDD to create an identifier. This identifier uniquely identifies a blob of data in the streaming application.
    - Update the external system with this blob transactionally (that is, exactly once, atomically) using the identifier. That is, if the identifier is not already committed, commit the partition data and the identifier atomically. Else, if this was already committed, skip the update.
2468
          // Requires: import org.apache.spark.TaskContext
          dstream.foreachRDD { (rdd, time) =>
            rdd.foreachPartition { partitionIterator =>
              val partitionId = TaskContext.get.partitionId()
              // generateUniqueId is a user-defined helper that combines the batch time
              // and the partition id into an identifier for this blob of data
              val uniqueId = generateUniqueId(time.milliseconds, partitionId)
              // use this uniqueId to transactionally commit the data in partitionIterator
            }
          }
2476
2477 ***************************************************************************************************
2478 ***************************************************************************************************
2479
2480 # Where to Go from Here
2481 * Additional guides
2482 - [Kafka Integration Guide](streaming-kafka-0-10-integration.html)
2483 - [Kinesis Integration Guide](streaming-kinesis-integration.html)
2484 - [Custom Receiver Guide](streaming-custom-receivers.html)
2485 * Third-party DStream data sources can be found in [Third Party Projects](https://spark.apache.org/third-party-projects.html)
2486 * API documentation
2487 - Scala docs
2488 * [StreamingContext](api/scala/org/apache/spark/streaming/StreamingContext.html) and
2489 [DStream](api/scala/org/apache/spark/streaming/dstream/DStream.html)
2490 * [KafkaUtils](api/scala/org/apache/spark/streaming/kafka/KafkaUtils$.html),
[KinesisUtils](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
2492 - Java docs
2493 * [JavaStreamingContext](api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html),
2494 [JavaDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaDStream.html) and
2495 [JavaPairDStream](api/java/index.html?org/apache/spark/streaming/api/java/JavaPairDStream.html)
2496 * [KafkaUtils](api/java/index.html?org/apache/spark/streaming/kafka/KafkaUtils.html),
2497 [KinesisUtils](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
2498 - Python docs
2499 * [StreamingContext](api/python/pyspark.streaming.html#pyspark.streaming.StreamingContext) and [DStream](api/python/pyspark.streaming.html#pyspark.streaming.DStream)
2500 * [KafkaUtils](api/python/pyspark.streaming.html#pyspark.streaming.kafka.KafkaUtils)
2501
2502 * More examples in [Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming)
2503 and [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples/streaming)
2504 and [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python/streaming)
2505 * [Paper](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) and [video](http://youtu.be/g171ndOHgJ0) describing Spark Streaming.