---
layout: global
title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.

## Linking
For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:

    groupId = org.apache.spark
    artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION_SHORT}}

Please note that to use the headers functionality, your Kafka client version should be 0.11.0.0 or higher.

For Python applications, you need to add the above library and its dependencies when deploying your
application. See the [Deploying](#deploying) subsection below.

For experimenting on `spark-shell`, you also need to add the above library and its dependencies when invoking `spark-shell`. See the [Deploying](#deploying) subsection below.

## Reading Data from Kafka

### Creating a Kafka Source for Streaming Queries

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Subscribe to 1 topic
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to 1 topic, with headers
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
  .as[(String, String, Array[(String, Array[Byte])])]

// Subscribe to multiple topics
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Subscribe to 1 topic
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to 1 topic, with headers
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .option("includeHeaders", "true")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");

// Subscribe to multiple topics
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern
Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Subscribe to 1 topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to 1 topic, with headers
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .option("includeHeaders", "true") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")

# Subscribe to multiple topics
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
</div>

### Creating a Kafka Source for Batch Queries
If you have a use case that is better suited to batch processing,
you can create a Dataset/DataFrame for a defined range of offsets.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Subscribe to 1 topic, defaulting to the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to multiple topics, specifying explicit Kafka offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

// Subscribe to a pattern, at the earliest and latest offsets
val df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Subscribe to 1 topic, defaulting to the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to multiple topics, specifying explicit Kafka offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2")
  .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
  .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

// Subscribe to a pattern, at the earliest and latest offsets
Dataset<Row> df = spark
  .read()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribePattern", "topic.*")
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load();
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Subscribe to 1 topic, defaulting to the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to multiple topics, specifying explicit Kafka offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribe", "topic1,topic2") \
  .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Subscribe to a pattern, at the earliest and latest offsets
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("subscribePattern", "topic.*") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

{% endhighlight %}
</div>
</div>

Each row in the source has the following schema:
<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
  <td>key</td>
  <td>binary</td>
</tr>
<tr>
  <td>value</td>
  <td>binary</td>
</tr>
<tr>
  <td>topic</td>
  <td>string</td>
</tr>
<tr>
  <td>partition</td>
  <td>int</td>
</tr>
<tr>
  <td>offset</td>
  <td>long</td>
</tr>
<tr>
  <td>timestamp</td>
  <td>timestamp</td>
</tr>
<tr>
  <td>timestampType</td>
  <td>int</td>
</tr>
<tr>
  <td>headers (optional)</td>
  <td>array</td>
</tr>
</table>
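Because the `key` and `value` columns arrive as raw bytes, queries typically cast them before use, which is what the `CAST(... AS STRING)` calls in the examples above do. A plain-Python sketch of what that decoding involves, using a hypothetical record shaped like the schema above:

```python
# A hypothetical record shaped like a row from the Kafka source.
# key and value are binary, which is why the examples above CAST them to STRING.
record = {
    "key": b"user-42",
    "value": b'{"event": "click"}',
    "topic": "topic1",
    "partition": 0,
    "offset": 23,
    "timestampType": 0,
}

decoded_key = record["key"].decode("utf-8")      # analogous to CAST(key AS STRING)
decoded_value = record["value"].decode("utf-8")  # analogous to CAST(value AS STRING)
print(decoded_key, decoded_value)
```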

The following options must be set for the Kafka source
for both batch and streaming queries.

<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
  <td>assign</td>
  <td>json string {"topicA":[0,1],"topicB":[2,4]}</td>
  <td>Specific TopicPartitions to consume.
  Only one of "assign", "subscribe" or "subscribePattern"
  options can be specified for the Kafka source.</td>
</tr>
<tr>
  <td>subscribe</td>
  <td>A comma-separated list of topics</td>
  <td>The topic list to subscribe to.
  Only one of "assign", "subscribe" or "subscribePattern"
  options can be specified for the Kafka source.</td>
</tr>
<tr>
  <td>subscribePattern</td>
  <td>Java regex string</td>
  <td>The pattern used to subscribe to topic(s).
  Only one of "assign", "subscribe" or "subscribePattern"
  options can be specified for the Kafka source.</td>
</tr>
<tr>
  <td>kafka.bootstrap.servers</td>
  <td>A comma-separated list of host:port</td>
  <td>The Kafka "bootstrap.servers" configuration.</td>
</tr>
</table>

The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
  <td>startingOffsetsByTimestamp</td>
  <td>json string
  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
  </td>
  <td>none (the value of <code>startingOffsets</code> will apply)</td>
  <td>streaming and batch</td>
  <td>The start point of the query expressed as a timestamp, a json string specifying a starting timestamp for
  each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
  equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
  the query will fail immediately to prevent unintended reads from such a partition. (This is a current limitation, and it will be addressed in the near future.)<p/>
  <p/>
  Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.<p/>
  For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
  Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
  Note: This option requires Kafka 0.10.1.0 or higher.<p/>
  Note2: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
  Note3: For streaming queries, this only applies when a new query is started; resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  earliest.</td>
</tr>
<tr>
  <td>startingOffsets</td>
  <td>"earliest", "latest" (streaming only), or json string
  """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
  </td>
  <td>"latest" for streaming, "earliest" for batch</td>
  <td>streaming and batch</td>
  <td>The start point when a query is started, either "earliest" which is from the earliest offsets,
  "latest" which is just from the latest offsets, or a json string specifying a starting offset for
  each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
  Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed.
  For streaming queries, this only applies when a new query is started; resuming will
  always pick up from where the query left off. Newly discovered partitions during a query will start at
  earliest.</td>
</tr>
<tr>
  <td>endingOffsetsByTimestamp</td>
  <td>json string
  """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
  </td>
  <td>latest</td>
  <td>batch query</td>
  <td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
  The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
  the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will
  be set to latest.<p/>
  <p/>
  Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value.<p/>
  For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer to the <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a>.<p/>
  Also, the meaning of <code>timestamp</code> here can vary according to the Kafka configuration (<code>log.message.timestamp.type</code>): please refer to the <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
  Note: This option requires Kafka 0.10.1.0 or higher.<p/>
  Note2: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
  </td>
</tr>
<tr>
  <td>endingOffsets</td>
  <td>latest or json string
  {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
  </td>
  <td>latest</td>
  <td>batch query</td>
  <td>The end point when a batch query is ended, either "latest" which refers to the
  latest offsets, or a json string specifying an ending offset for each TopicPartition. In the json, -1
  as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.</td>
</tr>
<tr>
  <td>failOnDataLoss</td>
  <td>true or false</td>
  <td>true</td>
  <td>streaming and batch</td>
  <td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
  offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
  as you expected.</td>
</tr>
<tr>
  <td>kafkaConsumer.pollTimeoutMs</td>
  <td>long</td>
  <td>512</td>
  <td>streaming and batch</td>
  <td>The timeout in milliseconds to poll data from Kafka in executors.</td>
</tr>
<tr>
  <td>fetchOffset.numRetries</td>
  <td>int</td>
  <td>3</td>
  <td>streaming and batch</td>
  <td>Number of times to retry before giving up fetching Kafka offsets.</td>
</tr>
<tr>
  <td>fetchOffset.retryIntervalMs</td>
  <td>long</td>
  <td>10</td>
  <td>streaming and batch</td>
  <td>Milliseconds to wait before retrying to fetch Kafka offsets.</td>
</tr>
<tr>
  <td>maxOffsetsPerTrigger</td>
  <td>long</td>
  <td>none</td>
  <td>streaming and batch</td>
  <td>Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
</tr>
<tr>
  <td>minPartitions</td>
  <td>int</td>
  <td>none</td>
  <td>streaming and batch</td>
  <td>Desired minimum number of partitions to read from Kafka.
  By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
  If you set this option to a value greater than your topicPartitions, Spark will divvy up large
  Kafka partitions to smaller pieces. Please note that this configuration is like a <code>hint</code>: the
  number of Spark tasks will be <strong>approximately</strong> <code>minPartitions</code>. It can be less or more depending on
  rounding errors or Kafka partitions that didn't receive any new data.</td>
</tr>
<tr>
  <td>groupIdPrefix</td>
  <td>string</td>
  <td>spark-kafka-source</td>
  <td>streaming and batch</td>
  <td>Prefix of consumer group identifiers (<code>group.id</code>) that are generated by structured streaming
  queries. If "kafka.group.id" is set, this option will be ignored.</td>
</tr>
<tr>
  <td>kafka.group.id</td>
  <td>string</td>
  <td>none</td>
  <td>streaming and batch</td>
  <td>The Kafka group id to use in the Kafka consumer while reading from Kafka. Use this with caution.
  By default, each query generates a unique group id for reading data. This ensures that each Kafka
  source has its own consumer group that does not face interference from any other consumer, and
  therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
  Kafka group-based authorization), you may want to use a specific authorized group id to read data.
  You can optionally set the group id. However, do this with extreme caution as it can cause
  unexpected behavior. Concurrently running queries (both batch and streaming) or sources with the
  same group id are likely to interfere with each other, causing each query to read only part of the
  data. This may also occur when queries are started/restarted in quick succession. To minimize such
  issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
  be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
</tr>
<tr>
  <td>includeHeaders</td>
  <td>boolean</td>
  <td>false</td>
  <td>streaming and batch</td>
  <td>Whether to include the Kafka headers in the row.</td>
</tr>
</table>
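The JSON offset specifications accepted by `startingOffsets` and `endingOffsets` are easy to mistype by hand, especially once escaped inside Java string literals. One way to build them safely is with a JSON serializer; a sketch using only the Python standard library (topic names and offsets are illustrative):

```python
import json

# In the offset JSON, -2 refers to "earliest" and -1 to "latest",
# as described for startingOffsets/endingOffsets above.
EARLIEST, LATEST = -2, -1

# Partition numbers must appear as JSON object keys, i.e. strings.
starting = {"topic1": {"0": 23, "1": EARLIEST}, "topic2": {"0": EARLIEST}}
ending = {"topic1": {"0": 50, "1": LATEST}, "topic2": {"0": LATEST}}

starting_offsets = json.dumps(starting)
ending_offsets = json.dumps(ending)
# These strings can then be passed to .option("startingOffsets", ...)
# and .option("endingOffsets", ...).
print(starting_offsets)
print(ending_offsets)
```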

### Consumer Caching

It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.

The caching key is built up from the following information:

* Topic name
* Topic partition
* Group ID

The following properties are available to configure the consumer pool:

<table class="table">
<tr><th>Property Name</th><th>Meaning</th><th>Default</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.consumer.cache.capacity</td>
  <td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
  <td>64</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.timeout</td>
  <td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>5m (5 minutes)</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
  <td>The interval of time between runs of the idle evictor thread for the consumer pool. When non-positive, no idle evictor thread will be run.</td>
  <td>1m (1 minute)</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.cache.jmx.enable</td>
  <td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via the JMX instance.
  The prefix of the JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
  </td>
  <td>false</td>
  <td>3.0.0</td>
</tr>
</table>

The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>,
but it works as a "soft limit" so as not to block Spark tasks.

The idle-eviction thread periodically removes consumers that have not been used for longer than the given timeout.
If the capacity is reached when borrowing, it tries to remove the least-used entry that is currently not in use.

If no entry can be removed, then the pool will keep growing. In the worst case, the pool will grow to
the maximum number of concurrent tasks that can run in the executor (that is, the number of task slots).
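The soft-limit behavior described above can be illustrated with a keyed pool that evicts an idle entry when over capacity but never refuses a borrower. This is a simplified sketch, not Spark's actual implementation; the keys mirror the caching key (topic, partition, group id):

```python
class SoftLimitedPool:
    """Keyed object pool with a soft capacity limit (illustrative sketch)."""

    def __init__(self, capacity, factory):
        self.capacity = capacity
        self.factory = factory
        self.idle = {}       # key -> cached object not currently in use
        self.in_use = set()  # keys currently borrowed

    def borrow(self, key):
        if key in self.idle:
            return self._lend(key, self.idle.pop(key))
        # At capacity: try to evict an idle (not in-use) entry first.
        if len(self.idle) + len(self.in_use) >= self.capacity and self.idle:
            self.idle.pop(next(iter(self.idle)))
        # Soft limit: create a new object even if the pool is still full,
        # so the borrower is never blocked.
        return self._lend(key, self.factory(key))

    def _lend(self, key, obj):
        self.in_use.add(key)
        return obj

    def give_back(self, key, obj):
        self.in_use.discard(key)
        self.idle[key] = obj


# Keys mirror the consumer caching key: (topic, partition, group id).
pool = SoftLimitedPool(capacity=2, factory=lambda key: object())
consumer = pool.borrow(("topic1", 0, "spark-kafka-source-example"))
pool.give_back(("topic1", 0, "spark-kafka-source-example"), consumer)
```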

If a task fails for any reason, the new task is executed with a newly created Kafka consumer, for safety reasons.
At the same time, all consumers in the pool that have the same caching key are invalidated, to remove the consumer
that was used in the failed execution. Consumers that other tasks are still using will not be closed, but will be
invalidated as well when they are returned to the pool.

Along with consumers, Spark pools the records fetched from Kafka separately, to keep Kafka consumers stateless from
Spark's point of view and to maximize the efficiency of pooling. The fetched-data pool uses the same caching key as
the Kafka consumer pool. Note that it doesn't leverage Apache Commons Pool due to the difference in characteristics.

The following properties are available to configure the fetched data pool:

<table class="table">
<tr><th>Property Name</th><th>Meaning</th><th>Default</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.consumer.fetchedData.cache.timeout</td>
  <td>The minimum amount of time fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>5m (5 minutes)</td>
  <td>3.0.0</td>
</tr>
<tr>
  <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
  <td>The interval of time between runs of the idle evictor thread for the fetched data pool. When non-positive, no idle evictor thread will be run.</td>
  <td>1m (1 minute)</td>
  <td>3.0.0</td>
</tr>
</table>

## Writing Data to Kafka

Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
Apache Kafka only supports at-least-once write semantics. Consequently, when writing---either Streaming Queries
or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
if writing the query is successful, then you can assume that the query output was written at least once. A possible
solution to remove duplicates when reading the written data is to introduce a primary (unique) key
that can be used to perform de-duplication when reading.
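As suggested above, embedding a primary (unique) key in each record lets a downstream reader drop the duplicates that at-least-once delivery may produce. A minimal sketch in plain Python (the `event_id` field is an illustrative choice of key, not part of any Kafka schema):

```python
def deduplicate(records, key_field="event_id"):
    """Keep only the first occurrence of each unique key (illustrative)."""
    seen = set()
    unique = []
    for rec in records:
        key = rec[key_field]
        if key not in seen:
            seen.add(key)
            unique.append(rec)
    return unique


# A duplicate can appear if a write was retried after a lost acknowledgment.
records = [
    {"event_id": 1, "value": "a"},
    {"event_id": 2, "value": "b"},
    {"event_id": 1, "value": "a"},  # duplicate from a retried write
]
deduped = deduplicate(records)
```

In a real pipeline, the same idea is typically expressed with Spark's own `dropDuplicates` over the key column when reading the written data back.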

The DataFrame being written to Kafka should have the following columns in its schema:
<table class="table">
<tr><th>Column</th><th>Type</th></tr>
<tr>
  <td>key (optional)</td>
  <td>string or binary</td>
</tr>
<tr>
  <td>value (required)</td>
  <td>string or binary</td>
</tr>
<tr>
  <td>headers (optional)</td>
  <td>array</td>
</tr>
<tr>
  <td>topic (*optional)</td>
  <td>string</td>
</tr>
<tr>
  <td>partition (optional)</td>
  <td>int</td>
</tr>
</table>
\* The topic column is required if the "topic" configuration option is not specified.<br>

The value column is the only required column. If a key column is not specified then
a ```null``` valued key column will be automatically added (see Kafka semantics on
how ```null``` valued keys are handled). If a topic column exists then its value
is used as the topic when writing the given row to Kafka, unless the "topic" configuration
option is set, i.e., the "topic" configuration option overrides the topic column.
If a "partition" column is not specified (or its value is ```null```)
then the partition is calculated by the Kafka producer.
A Kafka partitioner can be specified in Spark by setting the
```kafka.partitioner.class``` option. If not provided, the default Kafka partitioner
will be used.
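The topic-resolution rules above fit in a few lines: the "topic" option, when set, overrides any topic column, and one of the two must be present. A sketch (not Spark code; the row is modeled as a plain dict):

```python
def resolve_topic(row, topic_option=None):
    """Decide which topic a row is written to, per the rules above (sketch)."""
    if topic_option is not None:
        return topic_option  # the "topic" option overrides the topic column
    if row.get("topic") is not None:
        return row["topic"]  # otherwise fall back to the row's topic column
    raise ValueError("no topic: set the 'topic' option or add a topic column")


print(resolve_topic({"topic": "t1", "value": b"x"}))            # -> t1
print(resolve_topic({"value": b"x"}, topic_option="override"))  # -> override
```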


The following options must be set for the Kafka sink
for both batch and streaming queries.

<table class="table">
<tr><th>Option</th><th>value</th><th>meaning</th></tr>
<tr>
  <td>kafka.bootstrap.servers</td>
  <td>A comma-separated list of host:port</td>
  <td>The Kafka "bootstrap.servers" configuration.</td>
</tr>
</table>

The following configurations are optional:

<table class="table">
<tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
<tr>
  <td>topic</td>
  <td>string</td>
  <td>none</td>
  <td>streaming and batch</td>
  <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
  topic column that may exist in the data.</td>
</tr>
<tr>
  <td>includeHeaders</td>
  <td>boolean</td>
  <td>false</td>
  <td>streaming and batch</td>
  <td>Whether to include the Kafka headers in the row.</td>
</tr>
</table>

### Creating a Kafka Sink for Streaming Queries

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
val ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start()

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
StreamingQuery ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
StreamingQuery ds = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .start();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
ds = df \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .start()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
ds = df \
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .start()

{% endhighlight %}
</div>
</div>

### Writing the output of Batch Queries to Kafka

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save()

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

// Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save();

// Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .write()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .save();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

# Write key-value data from a DataFrame to a specific Kafka topic specified in an option
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .option("topic", "topic1") \
  .save()

# Write key-value data from a DataFrame to Kafka using a topic specified in the data
df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
  .write \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
  .save()

{% endhighlight %}
</div>
</div>

0811 ### Producer Caching
0812
Since the Kafka producer instance is designed to be thread-safe, Spark initializes a Kafka producer instance and shares it across tasks that use the same caching key.
0814
0815 The caching key is built up from the following information:
0816
0817 * Kafka producer configuration
0818
This includes configuration for authorization, which Spark will automatically include when delegation tokens are in use. Even with authorization taken into account, you can expect the same Kafka producer instance to be reused for the same Kafka producer configuration.
A different Kafka producer will be used when the delegation token is renewed; the Kafka producer instance for the old delegation token will be evicted according to the cache policy.
0821
0822 The following properties are available to configure the producer pool:
0823
0824 <table class="table">
0825 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td>spark.kafka.producer.cache.timeout</td>
  <td>10m (10 minutes)</td>
  <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
  <td>2.2.1</td>
</tr>
<tr>
  <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
  <td>1m (1 minute)</td>
  <td>The interval of time between runs of the idle evictor thread for the producer pool. When non-positive, no idle evictor thread will be run.</td>
  <td>3.0.0</td>
</tr>
0838 </table>
0839
The idle eviction thread periodically removes producers that have not been used for longer than the given timeout. Note that the producer is shared and used concurrently, so the last-used timestamp is set at the moment the producer instance is returned and its reference count reaches 0.
0841
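The pooling behavior described above can be sketched as follows. This is an illustrative, much-simplified Python model of a keyed cache with reference counting and idle eviction, not Spark's actual implementation; all names here are hypothetical:

```python
import time

class ProducerPool:
    """Illustrative keyed cache: one shared producer per configuration key,
    evicted only when unreferenced and idle past a timeout."""

    def __init__(self, timeout_s=600.0, make_producer=object):
        self.timeout_s = timeout_s
        self.make_producer = make_producer
        self.entries = {}  # key -> [producer, ref_count, last_returned]

    def acquire(self, key):
        # Tasks with the same caching key share one producer instance.
        entry = self.entries.setdefault(
            key, [self.make_producer(), 0, time.monotonic()])
        entry[1] += 1
        return entry[0]

    def release(self, key):
        entry = self.entries[key]
        entry[1] -= 1
        if entry[1] == 0:
            # Last-used timestamp is set when the reference count hits 0.
            entry[2] = time.monotonic()

    def evict_idle(self, now=None):
        now = time.monotonic() if now is None else now
        for key, (producer, refs, last) in list(self.entries.items()):
            if refs == 0 and now - last > self.timeout_s:
                del self.entries[key]
```

Note how a producer still referenced by a running task is never evicted, mirroring the behavior described above.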
0842 ## Kafka Specific Configurations
0843
Kafka's own configurations can be set via `DataStreamReader.option` with the `kafka.` prefix, e.g.,
`stream.option("kafka.bootstrap.servers", "host:port")`. For possible Kafka parameters, see
0846 [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
0847 parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
0848 for parameters related to writing data.
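To illustrate how the `kafka.` prefix works conceptually, here is a small, hypothetical Python sketch (not Spark's actual code) that extracts the Kafka client parameters from source options by stripping the prefix:

```python
def extract_kafka_params(options):
    """Keep only options with the 'kafka.' prefix, with the prefix stripped,
    so they can be passed straight through to the Kafka client."""
    prefix = "kafka."
    return {k[len(prefix):]: v
            for k, v in options.items()
            if k.startswith(prefix)}
```

For example, `{"kafka.bootstrap.servers": "host:port", "subscribe": "topic1"}` would yield `{"bootstrap.servers": "host:port"}`; the un-prefixed `subscribe` option is interpreted by Spark itself rather than forwarded to Kafka.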
0849
0850 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
0851
0852 - **group.id**: Kafka source will create a unique group id for each query automatically. The user can
0853 set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`,
default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special
group id; however, please read the warnings for this option and use it with caution.
- **auto.offset.reset**: Set the source option `startingOffsets` to specify
 where to start instead. Structured Streaming manages which offsets are consumed internally, rather
 than relying on the Kafka consumer to do it. This ensures that no data is missed when new
 topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
 streaming query is started; resuming will always pick up from where the query left off.
0861 - **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
0862 DataFrame operations to explicitly deserialize the keys.
0863 - **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
0864 Use DataFrame operations to explicitly deserialize the values.
0865 - **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
0866 DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
- **value.serializer**: Values are always serialized with ByteArraySerializer or StringSerializer. Use
 DataFrame operations to explicitly serialize the values into either strings or byte arrays.
0869 - **enable.auto.commit**: Kafka source doesn't commit any offset.
- **interceptor.classes**: Kafka source always reads keys and values as byte arrays. It's not safe to
 use ConsumerInterceptor as it may break the query.
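The validation behavior described above can be sketched in a few lines of illustrative Python (hypothetical names, not Spark's implementation):

```python
# Parameters Spark manages internally; setting them via the `kafka.` prefix
# makes the real source/sink throw an exception. group.id is left out of this
# sketch because "kafka.group.id" is a special, cautioned exception.
SPARK_MANAGED_PARAMS = {
    "auto.offset.reset", "key.deserializer", "value.deserializer",
    "key.serializer", "value.serializer", "enable.auto.commit",
    "interceptor.classes",
}

def validate_kafka_options(options):
    """Raise on Kafka client parameters that Spark manages internally."""
    for key in options:
        if key.startswith("kafka.") and key[len("kafka."):] in SPARK_MANAGED_PARAMS:
            raise ValueError(
                f"Kafka option '{key}' is not supported; "
                f"Spark manages this parameter internally")
```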
0872
0873 ## Deploying
0874
As with any Spark application, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
and its dependencies can be added directly to `spark-submit` using `--packages`, such as,
0877
0878 ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0879
0880 For experimenting on `spark-shell`, you can also use `--packages` to add `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly,
0881
0882 ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0883
0884 See [Application Submission Guide](submitting-applications.html) for more details about submitting
0885 applications with external dependencies.
0886
0887 ## Security
0888
Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
description of these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
0891
0892 It's worth noting that security is optional and turned off by default.
0893
Spark supports the following ways to authenticate against a Kafka cluster:
0895 - **Delegation token (introduced in Kafka broker 1.1.0)**
0896 - **JAAS login configuration**
0897
0898 ### Delegation token
0899
With this approach the application can be configured via Spark parameters and may not need a JAAS login
configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
0902 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
0903
The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.clusters.${cluster}.auth.bootstrap.servers` is set,
Spark considers the following login options, in order of preference:
0906 - **JAAS login configuration**, please see example below.
0907 - **Keytab file**, such as,
0908
0909 ./bin/spark-submit \
0910 --keytab <KEYTAB_FILE> \
0911 --principal <PRINCIPAL> \
0912 --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
0913 ...
0914
0915 - **Kerberos credential cache**, such as,
0916
0917 ./bin/spark-submit \
0918 --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
0919 ...
0920
0921 The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
0922
Spark can be configured to use the following authentication protocols to obtain tokens (the protocol must
match the Kafka broker configuration):
0925 - **SASL SSL (default)**
0926 - **SSL**
0927 - **SASL PLAINTEXT (for testing)**
0928
After successfully obtaining a delegation token, Spark distributes it across nodes and renews it as needed.
The delegation token uses the `SCRAM` login module for authentication, so the appropriate
`spark.kafka.clusters.${cluster}.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. This parameter
must also match the Kafka broker configuration.
0933
When a delegation token is available on an executor, Spark considers the following login options, in order of preference:
0935 - **JAAS login configuration**, please see example below.
0936 - **Delegation token**, please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> parameter for further details.
0937
When none of the above applies, an unsecured connection is assumed.
0939
0940
0941 #### Configuration
0942
0943 Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations.
0944
0945 <table class="table">
0946 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0947 <tr>
0948 <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
0949 <td>None</td>
0950 <td>
A list of comma-separated host/port pairs to use for establishing the initial connection
0952 to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token.
0953 </td>
0954 <td>3.0.0</td>
0955 </tr>
0956 <tr>
0957 <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
0958 <td>.*</td>
0959 <td>
0960 Regular expression to match against the <code>bootstrap.servers</code> config for sources and sinks in the application.
0961 If a server address matches this regex, the delegation token obtained from the respective bootstrap servers will be used when connecting.
0962 If multiple clusters match the address, an exception will be thrown and the query won't be started.
Kafka's secure and unsecure listeners are bound to different ports. When both are used, the secure listener port has to be part of the regular expression.
0964 </td>
0965 <td>3.0.0</td>
0966 </tr>
0967 <tr>
0968 <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
0969 <td>SASL_SSL</td>
0970 <td>
0971 Protocol used to communicate with brokers. For further details please see Kafka documentation. Protocol is applied on all the sources and sinks as default where
0972 <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
0973 and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
0974 </td>
0975 <td>3.0.0</td>
0976 </tr>
0977 <tr>
0978 <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
0979 <td>kafka</td>
0980 <td>
0981 The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
0982 For further details please see Kafka documentation. Only used to obtain delegation token.
0983 </td>
0984 <td>3.0.0</td>
0985 </tr>
0986 <tr>
0987 <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
0988 <td>None</td>
0989 <td>
0990 The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
0991 </td>
0992 <td>3.0.0</td>
0993 </tr>
0994 <tr>
0995 <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
0996 <td>None</td>
0997 <td>
0998 The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured.
0999 For further details please see Kafka documentation. Only used to obtain delegation token.
1000 </td>
1001 <td>3.0.0</td>
1002 </tr>
1003 <tr>
1004 <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
1005 <td>None</td>
1006 <td>
The location of the key store file. This is optional for the client and can be used for two-way authentication of the client.
1008 For further details please see Kafka documentation. Only used to obtain delegation token.
1009 </td>
1010 <td>3.0.0</td>
1011 </tr>
1012 <tr>
1013 <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
1014 <td>None</td>
1015 <td>
1016 The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured.
1017 For further details please see Kafka documentation. Only used to obtain delegation token.
1018 </td>
1019 <td>3.0.0</td>
1020 </tr>
1021 <tr>
1022 <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
1023 <td>None</td>
1024 <td>
The password of the private key in the key store file. This is optional for the client.
1026 For further details please see Kafka documentation. Only used to obtain delegation token.
1027 </td>
1028 <td>3.0.0</td>
1029 </tr>
1030 <tr>
1031 <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
1032 <td>SCRAM-SHA-512</td>
1033 <td>
SASL mechanism used for client connections with delegation token. Because the SCRAM login module is used for authentication, a compatible mechanism has to be set here.
1035 For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token.
1036 </td>
1037 <td>3.0.0</td>
1038 </tr>
1039 </table>
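The `target.bootstrap.servers.regex` matching described in the table above can be illustrated with a small Python sketch (hypothetical names, not Spark's implementation): given the per-cluster regexes, find the single cluster whose token should be used for a source's `bootstrap.servers`, and fail when the address matches multiple clusters:

```python
import re

def find_token_cluster(cluster_regexes, bootstrap_servers):
    """cluster_regexes maps a ${cluster} identifier to its configured
    target.bootstrap.servers.regex. Returns the matching cluster id,
    None when no cluster matches (no token is used), and raises when
    multiple clusters match (the query would not be started)."""
    matches = [cluster for cluster, regex in cluster_regexes.items()
               if re.match(regex, bootstrap_servers)]
    if len(matches) > 1:
        raise ValueError(
            f"'{bootstrap_servers}' matches multiple clusters: {matches}")
    return matches[0] if matches else None
```

This also shows why the default regex `.*` is only safe with a single configured cluster: with two clusters left at the default, every address would match both and the query could not start.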
1040
1041 #### Kafka Specific Configurations
1042
Kafka's own configurations can be set with the `kafka.` prefix, e.g., `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
1044 For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
1045
1046 #### Caveats
1047
1048 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
1049
1050 ### JAAS login configuration
1051
A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
This makes it possible to apply any custom authentication logic, at a higher maintenance cost.
This can be done in several ways. One possibility is to provide additional JVM parameters, such as,
1055
1056 ./bin/spark-submit \
1057 --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
1058 --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
1059 ...