---
layout: global
title: Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

The Spark Streaming integration for Kafka 0.10 provides simple parallelism, 1:1 correspondence between Kafka
partitions and Spark partitions, and access to offsets and metadata. However, because the newer integration uses
the [new Kafka consumer API](https://kafka.apache.org/documentation.html#newconsumerapi) instead of the simple API,
there are notable differences in usage.

### Linking
For Scala/Java applications using SBT/Maven project definitions, link your streaming application with the following artifact (see [Linking section](streaming-programming-guide.html#linking) in the main programming guide for further information).

        groupId = org.apache.spark
        artifactId = spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
        version = {{site.SPARK_VERSION_SHORT}}

**Do not** manually add dependencies on `org.apache.kafka` artifacts (e.g. `kafka-clients`).  The `spark-streaming-kafka-0-10` artifact has the appropriate transitive dependencies already, and different versions may be incompatible in hard-to-diagnose ways.
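
For example, with SBT the linking might look like the following sketch (the version value is a placeholder; `%%` appends the Scala binary version, and marking Spark core/streaming as `provided` matches the advice in the Deploying section below):

{% highlight scala %}
// build.sbt sketch: versions are placeholders, use the Spark version of your cluster.
val sparkVersion = "x.y.z"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)
{% endhighlight %}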

### Creating a Direct Stream
Note that the namespace for the import includes the version: `org.apache.spark.streaming.kafka010`.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))
{% endhighlight %}
Each item in the stream is a [ConsumerRecord](http://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/ConsumerRecord.html).
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
{% endhighlight %}
</div>
</div>

For possible `kafkaParams`, see [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs). Note that the example above sets `enable.auto.commit` to false; for discussion, see [Storing Offsets](streaming-kafka-0-10-integration.html#storing-offsets) below.

If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase `heartbeat.interval.ms` and `session.timeout.ms` appropriately. For batches larger than 5 minutes, this also requires changing `group.max.session.timeout.ms` on the broker.
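
As an illustration (the values below are placeholders, not recommendations), the extra timeout settings for a batch duration of a couple of minutes might be added to the consumer params like this:

{% highlight scala %}
// Illustrative values only; keep heartbeat.interval.ms well below session.timeout.ms,
// and keep session.timeout.ms below the broker's group.max.session.timeout.ms.
val kafkaParams = Map[String, Object](
  // ... the same params as in the example above, plus:
  "heartbeat.interval.ms" -> (40000: java.lang.Integer),
  "session.timeout.ms" -> (120000: java.lang.Integer)
)
{% endhighlight %}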

### LocationStrategies
The new Kafka consumer API will pre-fetch messages into buffers.  Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

In most cases, you should use `LocationStrategies.PreferConsistent` as shown above.  This will distribute partitions evenly across available executors.  If your executors are on the same hosts as your Kafka brokers, use `PreferBrokers`, which will prefer to schedule partitions on the Kafka leader for that partition.  Finally, if you have a significant skew in load among partitions, use `PreferFixed`. This allows you to specify an explicit mapping of partitions to hosts (any unspecified partitions will use a consistent location).

The cache for consumers has a default maximum size of 64.  If you expect to be handling more than (64 * number of executors) Kafka partitions, you can change this setting via `spark.streaming.kafka.consumer.cache.maxCapacity`.

If you would like to disable the caching for Kafka consumers, you can set `spark.streaming.kafka.consumer.cache.enabled` to `false`.

The cache is keyed by topic partition and `group.id`, so use a **separate** `group.id` for each call to `createDirectStream`.
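
As a sketch, a `PreferFixed` mapping and the cache settings might be wired up as follows (host names, partitions, and sizes are placeholders):

{% highlight scala %}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.LocationStrategies

// Pin two heavily loaded partitions to specific hosts; unspecified partitions
// fall back to consistent placement.
val hostMap = Map(
  new TopicPartition("topicA", 0) -> "host1.example.com",
  new TopicPartition("topicA", 1) -> "host2.example.com"
)
val locationStrategy = LocationStrategies.PreferFixed(hostMap)

// Consumer cache tuning (illustrative values).
val conf = new SparkConf()
  .set("spark.streaming.kafka.consumer.cache.maxCapacity", "128")
  // or disable the cache entirely:
  // .set("spark.streaming.kafka.consumer.cache.enabled", "false")
{% endhighlight %}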


### ConsumerStrategies
The new Kafka consumer API has a number of different ways to specify topics, some of which require considerable post-object-instantiation setup.  `ConsumerStrategies` provides an abstraction that allows Spark to obtain properly configured consumers even after restart from checkpoint.

`ConsumerStrategies.Subscribe`, as shown above, allows you to subscribe to a fixed collection of topics. `SubscribePattern` allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, `Subscribe` and `SubscribePattern` should respond to partitions being added during a running stream. Finally, `Assign` allows you to specify a fixed collection of partitions.  All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.

If you have specific consumer setup needs that are not met by the options above, `ConsumerStrategy` is a public class that you can extend.
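
For illustration, a `SubscribePattern` strategy with an explicit starting offset might look like the following sketch (the topic pattern, partition, and offset are placeholders, and `kafkaParams` is assumed to be defined as in the earlier example):

{% highlight scala %}
import java.util.regex.Pattern
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.ConsumerStrategies

// Subscribe to every topic matching the regex, starting the named partition
// at an explicit offset; other partitions start according to auto.offset.reset.
val startingOffsets = Map(new TopicPartition("events-a", 0) -> 1234L)
val strategy = ConsumerStrategies.SubscribePattern[String, String](
  Pattern.compile("events-.*"),
  kafkaParams,
  startingOffsets
)
{% endhighlight %}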

### Creating an RDD
If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// Import dependencies and create kafka params as in Create Direct Stream above

val offsetRanges = Array(
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange("test", 0, 0, 100),
  OffsetRange("test", 1, 0, 100)
)

val rdd = KafkaUtils.createRDD[String, String](sparkContext, kafkaParams, offsetRanges, PreferConsistent)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// Import dependencies and create kafka params as in Create Direct Stream above

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
{% endhighlight %}
</div>
</div>

Note that you cannot use `PreferBrokers`, because without the stream there is no driver-side consumer to automatically look up broker metadata for you.  Use `PreferFixed` with your own metadata lookups if necessary.

### Obtaining Offsets

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});
{% endhighlight %}
</div>
</div>

Note that the typecast to `HasOffsetRanges` will only succeed if it is done in the first method called on the result of `createDirectStream`, not later down a chain of methods. Be aware that the one-to-one mapping between RDD partition and Kafka partition does not survive any methods that shuffle or repartition, e.g. `reduceByKey()` or `window()`.
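
In other words, capture the offset ranges before doing anything that shuffles, as in this sketch:

{% highlight scala %}
stream.foreachRDD { rdd =>
  // Capture the ranges first, while rdd is still the RDD returned by createDirectStream.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // After the shuffle the partition-to-OffsetRange correspondence is gone,
  // but the ranges captured above remain valid for this batch.
  val counts = rdd.map(record => (record.key, 1L)).reduceByKey(_ + _)
  counts.collect().foreach(println)
}
{% endhighlight %}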

### Storing Offsets
Kafka delivery semantics in the case of failure depend on how and when offsets are stored.  Spark output operations are [at-least-once](streaming-programming-guide.html#semantics-of-output-operations).  So if you want the equivalent of exactly-once semantics, you must either store offsets after an idempotent output, or store offsets in an atomic transaction alongside output. With this integration, you have 3 options, in order of increasing reliability (and code complexity), for how to store offsets.

#### Checkpoints
If you enable Spark [checkpointing](streaming-programming-guide.html#checkpointing), offsets will be stored in the checkpoint.  This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option.  Furthermore, you cannot recover from a checkpoint if your application code has changed.  For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash).  But for unplanned failures that require code changes, you will lose data unless you have another way to identify known good starting offsets.
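
A minimal sketch of enabling checkpoint-based recovery (assuming a `sparkConf` already defined; the checkpoint path and batch interval are placeholders):

{% highlight scala %}
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(sparkConf, Seconds(30))
  ssc.checkpoint("hdfs://namenode:8020/checkpoints/my-app")  // placeholder path
  // ... create the direct stream and set up output operations here ...
  ssc
}

// Recover from the checkpoint if one exists, otherwise build a fresh context.
val streamingContext = StreamingContext.getOrCreate(
  "hdfs://namenode:8020/checkpoints/my-app",
  createContext _
)
{% endhighlight %}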

#### Kafka itself
Kafka has an offset commit API that stores offsets in a special Kafka topic.  By default, the new consumer will periodically auto-commit offsets. This is almost certainly not what you want, because messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics. This is why the stream example above sets `enable.auto.commit` to false.  However, you can commit offsets to Kafka after you know your output has been stored, using the `commitAsync` API. The benefit as compared to checkpoints is that Kafka is a durable store regardless of changes to your application code.  However, Kafka is not transactional, so your outputs must still be idempotent.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
{% endhighlight %}
As with `HasOffsetRanges`, the cast to `CanCommitOffsets` will only succeed if called on the result of `createDirectStream`, not after transformations.  The `commitAsync` call is thread-safe, but must occur after outputs if you want meaningful semantics.
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // some time later, after outputs have completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
{% endhighlight %}
</div>
</div>
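
If you want to be notified of the result of a commit, `commitAsync` also takes a Kafka `OffsetCommitCallback`. A sketch, inside the same `foreachRDD` as above:

{% highlight scala %}
import org.apache.kafka.clients.consumer.{OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges, new OffsetCommitCallback {
  def onComplete(offsets: java.util.Map[TopicPartition, OffsetAndMetadata], e: Exception): Unit = {
    if (e != null) {
      // The commit failed; log or handle it as appropriate for your application.
      println(s"offset commit failed: ${e.getMessage}")
    }
  }
})
{% endhighlight %}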

#### Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations.  If you're careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results.  This gives the equivalent of exactly-once semantics.  It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
  new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
}.toMap

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  val results = yourCalculation(rdd)

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
}
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
// The details depend on your data store, but the general idea looks like this

// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}

JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  Object results = yourCalculation(rdd);

  // begin your transaction

  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly

  // end your transaction
});
{% endhighlight %}
</div>
</div>

### SSL / TLS
The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html#security_ssl).  To enable it, set `kafkaParams` appropriately before passing to `createDirectStream` / `createRDD`.  Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately [securing](security.html) Spark inter-node communication.


<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
val kafkaParams = Map[String, Object](
  // the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
  "security.protocol" -> "SSL",
  "ssl.truststore.location" -> "/some-directory/kafka.client.truststore.jks",
  "ssl.truststore.password" -> "test1234",
  "ssl.keystore.location" -> "/some-directory/kafka.client.keystore.jks",
  "ssl.keystore.password" -> "test1234",
  "ssl.key.password" -> "test1234"
)
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
{% endhighlight %}
</div>
</div>

### Deploying

As with any Spark application, `spark-submit` is used to launch your application.

For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).

### Security

See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).

##### Additional Caveats

- A Kafka native sink is not available, so the delegation token is used only on the consumer side.