---
layout: global
title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka
partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses
the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API,
there are notable differences in usage.

### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see the [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

    groupId = org.apache.spark
    artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION_SHORT}}

**Do not** manually add dependencies on `org.apache.kafka` artifacts (e.g. `kafka-clients`). The `spark-streaming-kafka-0-10` artifact already has the appropriate transitive dependencies, and different versions may be incompatible in hard-to-diagnose ways.

### Creating a Direct Stream
Note that the namespace for the import includes the version: `org.apache.spark.streaming.kafka010`.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
{% endhighlight %}
Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
{% endhighlight %}
</div>
</div>

For possible kafkaParams, see the [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs).
If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase `heartbeat.interval.ms` and `session.timeout.ms` appropriately. For batches larger than 5 minutes, this will require changing `group.max.session.timeout.ms` on the broker.
Note that the example sets `enable.auto.commit` to false; for discussion, see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below.
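
The timeout guidance above can be turned into a small helper. The following is a minimal sketch, not part of the Spark or Kafka API: the class `KafkaTimeoutSketch`, its method names, and the 50% headroom over the batch duration are assumptions made for illustration. The heartbeat is set to one third of the session timeout, which follows Kafka's general recommendation, and the broker limit is passed in as a parameter because it depends on your broker configuration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Spark or Kafka): derives consumer timeouts from a batch duration.
final class KafkaTimeoutSketch {

    // Assumption: allow 50% headroom over the batch duration before the session expires.
    static int sessionTimeoutMs(int batchDurationMs) {
        return batchDurationMs + batchDurationMs / 2;
    }

    // Keep the heartbeat interval at one third of the session timeout.
    static int heartbeatIntervalMs(int batchDurationMs) {
        return sessionTimeoutMs(batchDurationMs) / 3;
    }

    // True when the derived session timeout exceeds the broker's group.max.session.timeout.ms.
    static boolean needsBrokerChange(int batchDurationMs, int brokerMaxSessionTimeoutMs) {
        return sessionTimeoutMs(batchDurationMs) > brokerMaxSessionTimeoutMs;
    }

    // Returns only the timeout entries; merge them into your existing kafkaParams.
    static Map<String, Object> timeoutParams(int batchDurationMs) {
        Map<String, Object> params = new HashMap<>();
        params.put("session.timeout.ms", sessionTimeoutMs(batchDurationMs));
        params.put("heartbeat.interval.ms", heartbeatIntervalMs(batchDurationMs));
        return params;
    }

    public static void main(String[] args) {
        int batchDurationMs = 60_000; // one-minute batches
        System.out.println(timeoutParams(batchDurationMs));
        // Compare against a broker limit of 5 minutes, the threshold mentioned in the text.
        System.out.println(needsBrokerChange(batchDurationMs, 300_000));
    }
}
```

The values are stored as `Integer` because these consumer settings are integer-typed; check the resulting timeouts against the other consumer settings of your Kafka version before relying on them.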

### LocationStrategies
The new Kafka consumer API will pre-fetch messages into buffers. Therefore, it is important for performance that the Spark integration keeps cached consumers on executors (rather than recreating them for each batch) and prefers to schedule partitions on the host locations that have the appropriate consumers.

In most cases, you should use `LocationStrategies.PreferConsistent` as shown above. This will distribute partitions evenly across available executors. If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition. Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

The cache for consumers has a default maximum size of 64. If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.
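
As a rough sizing aid for this setting, the arithmetic can be sketched as follows. The sketch assumes that partitions are spread evenly across executors (as `PreferConsistent` aims to do) and that a single stream with one `group.id` is involved; the class and method names are hypothetical.

```java
// Hypothetical sizing helper (not part of Spark): estimates a consumer cache capacity.
final class ConsumerCacheSizingSketch {

    // Default maximum cache size stated in the text above.
    static final int DEFAULT_MAX_CAPACITY = 64;

    // Partitions each executor handles under an even spread (ceiling division).
    static int partitionsPerExecutor(int totalPartitions, int executors) {
        return (totalPartitions + executors - 1) / executors;
    }

    // Keep the default unless the per-executor share exceeds it.
    static int suggestedMaxCapacity(int totalPartitions, int executors) {
        return Math.max(DEFAULT_MAX_CAPACITY, partitionsPerExecutor(totalPartitions, executors));
    }

    public static void main(String[] args) {
        // 1000 partitions on 10 executors exceeds 64 * 10, so the capacity is raised.
        System.out.println("spark.streaming.kafka.consumer.cache.maxCapacity="
            + suggestedMaxCapacity(1000, 10)); // prints spark.streaming.kafka.consumer.cache.maxCapacity=100
    }
}
```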

If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`.

The cache is keyed by topic partition and `group.id`, so use a **separate** `group.id` for each call to `createDirectStream`.


### ConsumerStrategies
The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup. `ConsumerStrategies` provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.

`ConsumerStrategies.Subscribe`, as shown above, allows you to subscribe to a fixed collection of topics. `SubscribePattern` allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using `Subscribe` or `SubscribePattern` should respond to adding partitions during a running stream. Finally, `Assign` allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.

If you have specific consumer setup needs that are not met by the options above, `ConsumerStrategy` is a public class that you can extend.
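
Before passing a regex to `SubscribePattern`, it can help to check which topic names it would select. The sketch below does this with plain `java.util.regex.Pattern`, the type that `SubscribePattern` accepts. It assumes that a topic is selected only when the whole name matches the pattern; the class name, method name, and topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: previews which topic names a subscription pattern would select.
final class TopicPatternSketch {

    // Assumption: a topic is selected only if its entire name matches the pattern.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("topic[A-Z]");
        List<String> topics = Arrays.asList("topicA", "topicB", "topicAB", "other");
        System.out.println(matchingTopics(pattern, topics)); // prints [topicA, topicB]
    }
}
```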

### Creating an RDD
If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
{% endhighlight %}
</div>
</div>

Note that you cannot use `PreferBrokers` here, because without the stream there is no driver-side consumer to automatically look up broker metadata for you. Use `PreferFixed` with your own metadata lookups if necessary.

### Obtaining Offsets

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// TaskContext is needed in addition to the imports in Creating a Direct Stream above
import org.apache.spark.TaskContext

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});
{% endhighlight %}
</div>
</div>

Note that the typecast to `HasOffsetRanges` will only succeed if it is done in the first method called on the result of `createDirectStream`, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. `reduceByKey()` or `window()`.

### Storing Offsets
Kafka delivery semantics in the case of failure depend on how and when offsets are stored. Spark output operations are [at-least-once](streaming-programming-guide.html#semantics-of-output-operations). So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have three options, in order of increasing reliability (and code complexity), for how to store offsets.

#### Checkpoints
If you enable Spark [checkpointing](streaming-programming-guide.html#checkpointing), offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.
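
One common way to make an output idempotent is to key each written row by where it came from in Kafka, so that replaying a batch overwrites rows instead of appending duplicates. The sketch below illustrates the idea with an in-memory map standing in for a real data store; the class name, the key format, and the store itself are assumptions for illustration only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory sink: a stand-in for a data store that supports upserts by key.
final class IdempotentSinkSketch {
    private final Map<String, String> rows = new HashMap<>();

    // Upsert keyed by topic, partition and offset: writing the same record twice leaves one row.
    void write(String topic, int partition, long offset, String value) {
        rows.put(topic + "/" + partition + "/" + offset, value);
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentSinkSketch sink = new IdempotentSinkSketch();
        // The second pass simulates the same batch being replayed after a failure.
        for (int attempt = 0; attempt < 2; attempt++) {
            for (long offset = 0; offset < 3; offset++) {
                sink.write("topicA", 0, offset, "value-" + offset);
            }
        }
        System.out.println(sink.rowCount()); // prints 3
    }
}
```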

#### Kafka itself
Kafka has an offset commit API that stores offsets in a special Kafka topic. By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets `enable.auto.commit` to false. However, you can commit offsets to Kafka after you know your output has been stored, using the `commitAsync` API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code. However, Kafka is not transactional, so your outputs must still be idempotent.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{% endhighlight %}
As with `HasOffsetRanges`, the cast to `CanCommitOffsets` will only succeed if called on the result of `createDirectStream`, not after transformations. The `commitAsync` call is thread-safe, but must occur after outputs if you want meaningful semantics.
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}
</div>
</div>

#### Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you're careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// The details depend on your data store, but the general idea looks like this
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Assign

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
// (pseudocode: the result set type and its accessors depend on your database client)
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});
{% endhighlight %}
</div>
</div>
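
The step "update offsets where the end of existing offsets matches the beginning of this batch" is what detects repeated or skipped ranges. The following is a minimal in-memory sketch of that check, not a real data store integration: a `synchronized` method stands in for a database transaction, the "result" is a running record count (an aggregation that is not idempotent), unknown partitions are assumed to start at offset 0, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory store: results and offsets change together or not at all.
final class TransactionalOffsetStoreSketch {
    private final Map<String, Long> untilOffsets = new HashMap<>();
    private long runningCount = 0L;

    // Applies one batch; returns false and changes nothing if the range is repeated or skips ahead.
    synchronized boolean commitBatch(String topicPartition, long fromOffset, long untilOffset) {
        // Assumption: a partition with no stored offset starts at 0.
        long expectedFrom = untilOffsets.getOrDefault(topicPartition, 0L);
        if (fromOffset != expectedFrom || untilOffset < fromOffset) {
            return false; // equivalent to rolling back the transaction
        }
        runningCount += untilOffset - fromOffset;      // update results
        untilOffsets.put(topicPartition, untilOffset); // update offsets in the same step
        return true;
    }

    synchronized long runningCount() {
        return runningCount;
    }

    public static void main(String[] args) {
        TransactionalOffsetStoreSketch store = new TransactionalOffsetStoreSketch();
        System.out.println(store.commitBatch("test-0", 0, 100));   // prints true
        System.out.println(store.commitBatch("test-0", 0, 100));   // prints false (replayed batch)
        System.out.println(store.commitBatch("test-0", 150, 200)); // prints false (skipped offsets)
        System.out.println(store.commitBatch("test-0", 100, 200)); // prints true
        System.out.println(store.runningCount());                  // prints 200
    }
}
```

With a real database, the same check is typically expressed as a conditional update on the stored offset inside the transaction, followed by a rollback when no row was updated.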

### SSL / TLS
The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html#security_ssl). To enable it, set kafkaParams appropriately before passing to `createDirectStream` / `createRDD`. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately [securing](security.html) Spark inter-node communication.


<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
{% endhighlight %}
</div>
</div>

### Deploying

As with any Spark application, `spark-submit` is used to launch your application.

For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies, as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see the [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

### Security

See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).

##### Additional Caveats

- The Kafka native sink is not available, so the delegation token is used only on the consumer side.