Back to home page

OSCL-LXR

 
 

    


0001 ---
0002 layout: global
0003 title: Spark Streaming + Kinesis Integration
0004 license: |
0005   Licensed to the Apache Software Foundation (ASF) under one or more
0006   contributor license agreements.  See the NOTICE file distributed with
0007   this work for additional information regarding copyright ownership.
0008   The ASF licenses this file to You under the Apache License, Version 2.0
0009   (the "License"); you may not use this file except in compliance with
0010   the License.  You may obtain a copy of the License at
0011  
0012      http://www.apache.org/licenses/LICENSE-2.0
0013  
0014   Unless required by applicable law or agreed to in writing, software
0015   distributed under the License is distributed on an "AS IS" BASIS,
0016   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017   See the License for the specific language governing permissions and
0018   limitations under the License.
0019 ---
0020 [Amazon Kinesis](http://aws.amazon.com/kinesis/) is a fully managed service for real-time processing of streaming data at massive scale.
0021 The Kinesis receiver creates an input DStream using the Kinesis Client Library (KCL) provided by Amazon under the Amazon Software License (ASL).
0022 The KCL builds on top of the Apache 2.0 licensed AWS Java SDK and provides load-balancing, fault-tolerance, checkpointing through the concepts of Workers, Checkpoints, and Shard Leases.
0023 Here we explain how to configure Spark Streaming to receive data from Kinesis.
0024 
0025 #### Configuring Kinesis
0026 
0027 A Kinesis stream can be set up at one of the valid Kinesis endpoints with 1 or more shards per the following
0028 [guide](http://docs.aws.amazon.com/kinesis/latest/dev/step-one-create-stream.html).
0029 
0030 
0031 #### Configuring Spark Streaming Application
0032 
0033 1. **Linking:** For Scala/Java applications using SBT/Maven project definitions, link your streaming application against the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).
0034 
0035                 groupId = org.apache.spark
0036                 artifactId = spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}
0037                 version = {{site.SPARK_VERSION_SHORT}}
0038 
0039         For Python applications, you will have to add this above library and its dependencies when deploying your application. See the *Deploying* subsection below.
0040         **Note that by linking to this library, you will include [ASL](https://aws.amazon.com/asl/)-licensed code in your application.**
0041 
0042 2. **Programming:** In the streaming application code, import `KinesisInputDStream` and create the input DStream of byte array as follows:
0043 
0044         <div class="codetabs">
0045         <div data-lang="scala" markdown="1">
0046             import org.apache.spark.storage.StorageLevel
0047             import org.apache.spark.streaming.kinesis.KinesisInputDStream
0048             import org.apache.spark.streaming.{Seconds, StreamingContext}
0049             import org.apache.spark.streaming.kinesis.KinesisInitialPositions
0050 
0051             val kinesisStream = KinesisInputDStream.builder
0052                 .streamingContext(streamingContext)
0053                 .endpointUrl([endpoint URL])
0054                 .regionName([region name])
0055                 .streamName([streamName])
0056                 .initialPosition([initial position])
0057                 .checkpointAppName([Kinesis app name])
0058                 .checkpointInterval([checkpoint interval])
0059                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
0060                 .build()
0061 
0062         See the [API docs](api/scala/org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
0063         and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala). Refer to the [Running the Example](#running-the-example) subsection for instructions on how to run the example.
0064 
0065         </div>
0066         <div data-lang="java" markdown="1">
0067             import org.apache.spark.storage.StorageLevel;
0068             import org.apache.spark.streaming.kinesis.KinesisInputDStream;
0069             import org.apache.spark.streaming.Seconds;
0070             import org.apache.spark.streaming.StreamingContext;
0071             import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
0072 
0073             KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
0074                 .streamingContext(streamingContext)
0075                 .endpointUrl([endpoint URL])
0076                 .regionName([region name])
0077                 .streamName([streamName])
0078                 .initialPosition([initial position])
0079                 .checkpointAppName([Kinesis app name])
0080                 .checkpointInterval([checkpoint interval])
0081                 .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
0082                 .build();
0083 
0084         See the [API docs](api/java/index.html?org/apache/spark/streaming/kinesis/KinesisInputDStream.html)
0085         and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
0086 
0087         </div>
0088         <div data-lang="python" markdown="1">
0089             from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
0090 
0091             kinesisStream = KinesisUtils.createStream(
0092                 streamingContext, [Kinesis app name], [Kinesis stream name], [endpoint URL],
0093                 [region name], [initial position], [checkpoint interval], StorageLevel.MEMORY_AND_DISK_2)
0094 
0095         See the [API docs](api/python/pyspark.streaming.html#pyspark.streaming.kinesis.KinesisUtils)
0096         and the [example]({{site.SPARK_GITHUB_URL}}/tree/master/external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py). Refer to the [Running the Example](#running-the-example) subsection for instructions to run the example.
0097 
0098         </div>
0099         </div>
0100 
0101         You may also provide the following settings. These are currently only supported in Scala and Java.
0102 
0103         - A "message handler function" that takes a Kinesis `Record` and returns a generic object `T`, in case you would like to use other data included in a `Record` such as partition key.
0104 
0105         - CloudWatch metrics level and dimensions. See [the AWS documentation about monitoring KCL](https://docs.aws.amazon.com/streams/latest/dev/monitoring-with-kcl.html) for details.
0106 
0107         <div class="codetabs">
0108         <div data-lang="scala" markdown="1">
0109                 import collection.JavaConverters._
0110                 import org.apache.spark.storage.StorageLevel
0111                 import org.apache.spark.streaming.kinesis.KinesisInputDStream
0112                 import org.apache.spark.streaming.{Seconds, StreamingContext}
0113                 import org.apache.spark.streaming.kinesis.KinesisInitialPositions
0114                 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration
0115                 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel
0116 
0117                 val kinesisStream = KinesisInputDStream.builder
0118                     .streamingContext(streamingContext)
0119                     .endpointUrl([endpoint URL])
0120                     .regionName([region name])
0121                     .streamName([streamName])
0122                     .initialPosition([initial position])
0123                     .checkpointAppName([Kinesis app name])
0124                     .checkpointInterval([checkpoint interval])
0125                     .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
0126                     .metricsLevel(MetricsLevel.DETAILED)
0127                     .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet)
0128                     .buildWithMessageHandler([message handler])
0129 
0130         </div>
0131         <div data-lang="java" markdown="1">
0132                 import org.apache.spark.storage.StorageLevel;
0133                 import org.apache.spark.streaming.kinesis.KinesisInputDStream;
0134                 import org.apache.spark.streaming.Seconds;
0135                 import org.apache.spark.streaming.StreamingContext;
0136                 import org.apache.spark.streaming.kinesis.KinesisInitialPositions;
0137                 import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
0138                 import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
0139                 import scala.collection.JavaConverters;
0140 
0141                 KinesisInputDStream<byte[]> kinesisStream = KinesisInputDStream.builder()
0142                     .streamingContext(streamingContext)
0143                     .endpointUrl([endpoint URL])
0144                     .regionName([region name])
0145                     .streamName([streamName])
0146                     .initialPosition([initial position])
0147                     .checkpointAppName([Kinesis app name])
0148                     .checkpointInterval([checkpoint interval])
0149                     .storageLevel(StorageLevel.MEMORY_AND_DISK_2)
0150                     .metricsLevel(MetricsLevel.DETAILED)
0151                     .metricsEnabledDimensions(JavaConverters.asScalaSetConverter(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS).asScala().toSet())
0152                     .buildWithMessageHandler([message handler]);
0153 
0154         </div>
0155         </div>
0156 
0157         - `streamingContext`: StreamingContext containing an application name used by Kinesis to tie this Kinesis application to the Kinesis stream
0158 
0159         - `[Kinesis app name]`: The application name that will be used to checkpoint the Kinesis
0160                 sequence numbers in DynamoDB table.
0161                 - The application name must be unique for a given account and region.
0162                 - If the table exists but has incorrect checkpoint information (for a different stream, or
0163                         old expired sequenced numbers), then there may be temporary errors.
0164 
0165         - `[Kinesis stream name]`: The Kinesis stream that this streaming application will pull data from.
0166 
0167         - `[endpoint URL]`: Valid Kinesis endpoints URL can be found [here](http://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region).
0168 
0169         - `[region name]`: Valid Kinesis region names can be found [here](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html).
0170 
0171         - `[checkpoint interval]`: The interval (e.g., Duration(2000) = 2 seconds) at which the Kinesis Client Library saves its position in the stream.  For starters, set it to the same as the batch interval of the streaming application.
0172 
0173         - `[initial position]`: Can be either `KinesisInitialPositions.TrimHorizon` or `KinesisInitialPositions.Latest` or `KinesisInitialPositions.AtTimestamp` (see [`Kinesis Checkpointing`](#kinesis-checkpointing) section and [`Amazon Kinesis API documentation`](http://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-sdk.html) for more details).
0174 
0175         - `[message handler]`: A function that takes a Kinesis `Record` and outputs generic `T`.
0176 
0177         In other versions of the API, you can also specify the AWS access key and secret key directly.
0178 
0179 3. **Deploying:** As with any Spark applications, `spark-submit` is used to launch your application. However, the details are slightly different for Scala/Java applications and Python applications.
0180 
0181         For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
0182 
0183         For Python applications which lack SBT/Maven project management, `spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}` and its dependencies can be directly added to `spark-submit` using `--packages` (see [Application Submission Guide](submitting-applications.html)). That is,
0184 
0185             ./bin/spark-submit --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0186 
0187         Alternatively, you can also download the JAR of the Maven artifact `spark-streaming-kinesis-asl-assembly` from the
0188         [Maven repository](http://search.maven.org/#search|ga|1|a%3A%22spark-streaming-kinesis-asl-assembly_{{site.SCALA_BINARY_VERSION}}%22%20AND%20v%3A%22{{site.SPARK_VERSION_SHORT}}%22) and add it to `spark-submit` with `--jars`.
0189 
0190         <p style="text-align: center;">
0191                 <img src="img/streaming-kinesis-arch.png"
0192                 title="Spark Streaming Kinesis Architecture"
0193                 alt="Spark Streaming Kinesis Architecture"
0194                width="60%"
0195         />
0196                 <!-- Images are downsized intentionally to improve quality on retina displays -->
0197         </p>
0198 
0199         *Points to remember at runtime:*
0200 
0201         - Kinesis data processing is ordered per partition and occurs at-least once per message.
0202 
0203         - Multiple applications can read from the same Kinesis stream.  Kinesis will maintain the application-specific shard and checkpoint info in DynamoDB.
0204 
0205         - A single Kinesis stream shard is processed by one input DStream at a time.
0206 
0207         - A single Kinesis input DStream can read from multiple shards of a Kinesis stream by creating multiple KinesisRecordProcessor threads.
0208 
0209         - Multiple input DStreams running in separate processes/instances can read from a Kinesis stream.
0210 
0211         - You never need more Kinesis input DStreams than the number of Kinesis stream shards as each input DStream will create at least one KinesisRecordProcessor thread that handles a single shard.
0212 
0213         - Horizontal scaling is achieved by adding/removing  Kinesis input DStreams (within a single process or across multiple processes/instances) - up to the total number of Kinesis stream shards per the previous point.
0214 
0215         - The Kinesis input DStream will balance the load between all DStreams - even across processes/instances.
0216 
0217         - The Kinesis input DStream will balance the load during re-shard events (merging and splitting) due to changes in load.
0218 
0219         - As a best practice, it's recommended that you avoid re-shard jitter by over-provisioning when possible.
0220 
0221         - Each Kinesis input DStream maintains its own checkpoint info.  See the Kinesis Checkpointing section for more details.
0222 
0223         - There is no correlation between the number of Kinesis stream shards and the number of RDD partitions/shards created across the Spark cluster during input DStream processing.  These are 2 independent partitioning schemes.
0224 
0225 #### Running the Example
0226 To run the example,
0227 
0228 - Download a Spark binary from the [download site](https://spark.apache.org/downloads.html).
0229 
0230 - Set up Kinesis stream (see earlier section) within AWS. Note the name of the Kinesis stream and the endpoint URL corresponding to the region where the stream was created.
0231 
0232 - Set up the environment variables `AWS_ACCESS_KEY_ID` and `AWS_SECRET_KEY` with your AWS credentials.
0233 
0234 - In the Spark root directory, run the example as
0235 
0236         <div class="codetabs">
0237         <div data-lang="scala" markdown="1">
0238 
0239         ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.KinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
0240 
0241         </div>
0242         <div data-lang="java" markdown="1">
0243 
0244         ./bin/run-example --packages org.apache.spark:spark-streaming-kinesis-asl_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} streaming.JavaKinesisWordCountASL [Kinesis app name] [Kinesis stream name] [endpoint URL]
0245 
0246         </div>
0247         <div data-lang="python" markdown="1">
0248 
0249         ./bin/spark-submit --jars 'external/kinesis-asl-assembly/target/spark-streaming-kinesis-asl-assembly_*.jar' \
0250             external/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py \
0251             [Kinesis app name] [Kinesis stream name] [endpoint URL] [region name]
0252 
0253         </div>
0254         </div>
0255 
0256     This will wait for data to be received from the Kinesis stream.
0257 
0258 - To generate random string data to put onto the Kinesis stream, in another terminal, run the associated Kinesis data producer.
0259 
0260                 ./bin/run-example streaming.KinesisWordProducerASL [Kinesis stream name] [endpoint URL] 1000 10
0261 
0262         This will push 1000 lines per second of 10 random numbers per line to the Kinesis stream.  This data should then be received and processed by the running example.
0263 
0264 #### Record De-aggregation
0265 
0266 When data is generated using the [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html), messages may be aggregated for cost savings. Spark Streaming will automatically
0267 de-aggregate records during consumption.
0268 
0269 #### Kinesis Checkpointing
0270 - Each Kinesis input DStream periodically stores the current position of the stream in the backing DynamoDB table.  This allows the system to recover from failures and continue processing where the DStream left off.
0271 
0272 - Checkpointing too frequently will cause excess load on the AWS checkpoint storage layer and may lead to AWS throttling.  The provided example handles this throttling with a random-backoff-retry strategy.
0273 
0274 - If no Kinesis checkpoint info exists when the input DStream starts, it will start either from the oldest record available (`KinesisInitialPositions.TrimHorizon`), or from the latest tip (`KinesisInitialPositions.Latest`), or (except Python) from the position denoted by the provided UTC timestamp (`KinesisInitialPositions.AtTimestamp(Date timestamp)`).  This is configurable.
0275   - `KinesisInitialPositions.Latest` could lead to missed records if data is added to the stream while no input DStreams are running (and no checkpoint info is being stored).
0276   - `KinesisInitialPositions.TrimHorizon` may lead to duplicate processing of records where the impact is dependent on checkpoint frequency and processing idempotency.
0277 
0278 #### Kinesis retry configuration
0279  - `spark.streaming.kinesis.retry.waitTime` : Wait time between Kinesis retries as a duration string. When reading from Amazon Kinesis, users may hit `ProvisionedThroughputExceededException`'s, when consuming faster than 5 transactions/second or, exceeding the maximum read rate of 2 MiB/second. This configuration can be tweaked to increase the sleep between fetches when a fetch fails to reduce these exceptions. Default is "100ms".
0280  - `spark.streaming.kinesis.retry.maxAttempts` : Max number of retries for Kinesis fetches. This config can also be used to tackle the Kinesis `ProvisionedThroughputExceededException`'s in scenarios mentioned above. It can be increased to have more number of retries for Kinesis reads. Default is 3.