Back to home page

OSCL-LXR

 
 

    


0001 ---
0002 layout: global
0003 title: Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
0004 license: |
0005   Licensed to the Apache Software Foundation (ASF) under one or more
0006   contributor license agreements.  See the NOTICE file distributed with
0007   this work for additional information regarding copyright ownership.
0008   The ASF licenses this file to You under the Apache License, Version 2.0
0009   (the "License"); you may not use this file except in compliance with
0010   the License.  You may obtain a copy of the License at
0011  
0012      http://www.apache.org/licenses/LICENSE-2.0
0013  
0014   Unless required by applicable law or agreed to in writing, software
0015   distributed under the License is distributed on an "AS IS" BASIS,
0016   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017   See the License for the specific language governing permissions and
0018   limitations under the License.
0019 ---
0020 
0021 Structured Streaming integration for Kafka 0.10 to read data from and write data to Kafka.
0022 
0023 ## Linking
0024 For Scala/Java applications using SBT/Maven project definitions, link your application with the following artifact:
0025 
0026     groupId = org.apache.spark
0027     artifactId = spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}
0028     version = {{site.SPARK_VERSION_SHORT}}
0029 
0030 Please note that to use the headers functionality, your Kafka client version should be version 0.11.0.0 or up.
0031 
0032 For Python applications, you need to add this above library and its dependencies when deploying your
0033 application. See the [Deploying](#deploying) subsection below.
0034 
0035 For experimenting on `spark-shell`, you need to add this above library and its dependencies too when invoking `spark-shell`. Also, see the [Deploying](#deploying) subsection below.
0036 
0037 ## Reading Data from Kafka
0038 
0039 ### Creating a Kafka Source for Streaming Queries
0040 
0041 <div class="codetabs">
0042 <div data-lang="scala" markdown="1">
0043 {% highlight scala %}
0044 
0045 // Subscribe to 1 topic
0046 val df = spark
0047   .readStream
0048   .format("kafka")
0049   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0050   .option("subscribe", "topic1")
0051   .load()
0052 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0053   .as[(String, String)]
0054 
0055 // Subscribe to 1 topic, with headers
0056 val df = spark
0057   .readStream
0058   .format("kafka")
0059   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0060   .option("subscribe", "topic1")
0061   .option("includeHeaders", "true")
0062   .load()
0063 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
0064   .as[(String, String, Map)]
0065 
0066 // Subscribe to multiple topics
0067 val df = spark
0068   .readStream
0069   .format("kafka")
0070   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0071   .option("subscribe", "topic1,topic2")
0072   .load()
0073 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0074   .as[(String, String)]
0075 
0076 // Subscribe to a pattern
0077 val df = spark
0078   .readStream
0079   .format("kafka")
0080   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0081   .option("subscribePattern", "topic.*")
0082   .load()
0083 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0084   .as[(String, String)]
0085 
0086 {% endhighlight %}
0087 </div>
0088 <div data-lang="java" markdown="1">
0089 {% highlight java %}
0090 
0091 // Subscribe to 1 topic
0092 Dataset<Row> df = spark
0093   .readStream()
0094   .format("kafka")
0095   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0096   .option("subscribe", "topic1")
0097   .load();
0098 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0099 
0100 // Subscribe to 1 topic, with headers
0101 Dataset<Row> df = spark
0102   .readStream()
0103   .format("kafka")
0104   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0105   .option("subscribe", "topic1")
0106   .option("includeHeaders", "true")
0107   .load()
0108 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers");
0109 
0110 // Subscribe to multiple topics
0111 Dataset<Row> df = spark
0112   .readStream()
0113   .format("kafka")
0114   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0115   .option("subscribe", "topic1,topic2")
0116   .load();
0117 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0118 
0119 // Subscribe to a pattern
0120 Dataset<Row> df = spark
0121   .readStream()
0122   .format("kafka")
0123   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0124   .option("subscribePattern", "topic.*")
0125   .load();
0126 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0127 
0128 {% endhighlight %}
0129 </div>
0130 <div data-lang="python" markdown="1">
0131 {% highlight python %}
0132 
0133 # Subscribe to 1 topic
0134 df = spark \
0135   .readStream \
0136   .format("kafka") \
0137   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0138   .option("subscribe", "topic1") \
0139   .load()
0140 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0141 
0142 # Subscribe to 1 topic, with headers
0143 val df = spark \
0144   .readStream \
0145   .format("kafka") \
0146   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0147   .option("subscribe", "topic1") \
0148   .option("includeHeaders", "true") \
0149   .load()
0150 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
0151 
0152 # Subscribe to multiple topics
0153 df = spark \
0154   .readStream \
0155   .format("kafka") \
0156   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0157   .option("subscribe", "topic1,topic2") \
0158   .load()
0159 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0160 
0161 # Subscribe to a pattern
0162 df = spark \
0163   .readStream \
0164   .format("kafka") \
0165   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0166   .option("subscribePattern", "topic.*") \
0167   .load()
0168 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0169 
0170 {% endhighlight %}
0171 </div>
0172 </div>
0173 
0174 ### Creating a Kafka Source for Batch Queries
0175 If you have a use case that is better suited to batch processing,
0176 you can create a Dataset/DataFrame for a defined range of offsets.
0177 
0178 <div class="codetabs">
0179 <div data-lang="scala" markdown="1">
0180 {% highlight scala %}
0181 
0182 // Subscribe to 1 topic defaults to the earliest and latest offsets
0183 val df = spark
0184   .read
0185   .format("kafka")
0186   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0187   .option("subscribe", "topic1")
0188   .load()
0189 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0190   .as[(String, String)]
0191 
0192 // Subscribe to multiple topics, specifying explicit Kafka offsets
0193 val df = spark
0194   .read
0195   .format("kafka")
0196   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0197   .option("subscribe", "topic1,topic2")
0198   .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""")
0199   .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""")
0200   .load()
0201 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0202   .as[(String, String)]
0203 
0204 // Subscribe to a pattern, at the earliest and latest offsets
0205 val df = spark
0206   .read
0207   .format("kafka")
0208   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0209   .option("subscribePattern", "topic.*")
0210   .option("startingOffsets", "earliest")
0211   .option("endingOffsets", "latest")
0212   .load()
0213 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0214   .as[(String, String)]
0215 
0216 {% endhighlight %}
0217 </div>
0218 <div data-lang="java" markdown="1">
0219 {% highlight java %}
0220 
0221 // Subscribe to 1 topic defaults to the earliest and latest offsets
0222 Dataset<Row> df = spark
0223   .read()
0224   .format("kafka")
0225   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0226   .option("subscribe", "topic1")
0227   .load();
0228 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0229 
0230 // Subscribe to multiple topics, specifying explicit Kafka offsets
0231 Dataset<Row> df = spark
0232   .read()
0233   .format("kafka")
0234   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0235   .option("subscribe", "topic1,topic2")
0236   .option("startingOffsets", "{\"topic1\":{\"0\":23,\"1\":-2},\"topic2\":{\"0\":-2}}")
0237   .option("endingOffsets", "{\"topic1\":{\"0\":50,\"1\":-1},\"topic2\":{\"0\":-1}}")
0238   .load();
0239 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0240 
0241 // Subscribe to a pattern, at the earliest and latest offsets
0242 Dataset<Row> df = spark
0243   .read()
0244   .format("kafka")
0245   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0246   .option("subscribePattern", "topic.*")
0247   .option("startingOffsets", "earliest")
0248   .option("endingOffsets", "latest")
0249   .load();
0250 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
0251 
0252 {% endhighlight %}
0253 </div>
0254 <div data-lang="python" markdown="1">
0255 {% highlight python %}
0256 
0257 # Subscribe to 1 topic defaults to the earliest and latest offsets
0258 df = spark \
0259   .read \
0260   .format("kafka") \
0261   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0262   .option("subscribe", "topic1") \
0263   .load()
0264 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0265 
0266 # Subscribe to multiple topics, specifying explicit Kafka offsets
0267 df = spark \
0268   .read \
0269   .format("kafka") \
0270   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0271   .option("subscribe", "topic1,topic2") \
0272   .option("startingOffsets", """{"topic1":{"0":23,"1":-2},"topic2":{"0":-2}}""") \
0273   .option("endingOffsets", """{"topic1":{"0":50,"1":-1},"topic2":{"0":-1}}""") \
0274   .load()
0275 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0276 
0277 # Subscribe to a pattern, at the earliest and latest offsets
0278 df = spark \
0279   .read \
0280   .format("kafka") \
0281   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0282   .option("subscribePattern", "topic.*") \
0283   .option("startingOffsets", "earliest") \
0284   .option("endingOffsets", "latest") \
0285   .load()
0286 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0287 {% endhighlight %}
0288 </div>
0289 </div>
0290 
0291 Each row in the source has the following schema:
0292 <table class="table">
0293 <tr><th>Column</th><th>Type</th></tr>
0294 <tr>
0295   <td>key</td>
0296   <td>binary</td>
0297 </tr>
0298 <tr>
0299   <td>value</td>
0300   <td>binary</td>
0301 </tr>
0302 <tr>
0303   <td>topic</td>
0304   <td>string</td>
0305 </tr>
0306 <tr>
0307   <td>partition</td>
0308   <td>int</td>
0309 </tr>
0310 <tr>
0311   <td>offset</td>
0312   <td>long</td>
0313 </tr>
0314 <tr>
0315   <td>timestamp</td>
0316   <td>timestamp</td>
0317 </tr>
0318 <tr>
0319   <td>timestampType</td>
0320   <td>int</td>
0321 </tr>
0322 <tr>
0323   <td>headers (optional)</td>
0324   <td>array</td>
0325 </tr>
0326 </table>
0327 
0328 The following options must be set for the Kafka source
0329 for both batch and streaming queries.
0330 
0331 <table class="table">
0332 <tr><th>Option</th><th>value</th><th>meaning</th></tr>
0333 <tr>
0334   <td>assign</td>
0335   <td>json string {"topicA":[0,1],"topicB":[2,4]}</td>
0336   <td>Specific TopicPartitions to consume.
0337   Only one of "assign", "subscribe" or "subscribePattern"
0338   options can be specified for Kafka source.</td>
0339 </tr>
0340 <tr>
0341   <td>subscribe</td>
0342   <td>A comma-separated list of topics</td>
0343   <td>The topic list to subscribe.
0344   Only one of "assign", "subscribe" or "subscribePattern"
0345   options can be specified for Kafka source.</td>
0346 </tr>
0347 <tr>
0348   <td>subscribePattern</td>
0349   <td>Java regex string</td>
0350   <td>The pattern used to subscribe to topic(s).
0351   Only one of "assign, "subscribe" or "subscribePattern"
0352   options can be specified for Kafka source.</td>
0353 </tr>
0354 <tr>
0355   <td>kafka.bootstrap.servers</td>
0356   <td>A comma-separated list of host:port</td>
0357   <td>The Kafka "bootstrap.servers" configuration.</td>
0358 </tr>
0359 </table>
0360 
0361 The following configurations are optional:
0362 
0363 <table class="table">
0364 <tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
0365 <tr>
0366   <td>startingOffsetsByTimestamp</td>
0367   <td>json string
0368   """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
0369   </td>
0370   <td>none (the value of <code>startingOffsets</code> will apply)</td>
0371   <td>streaming and batch</td>
0372   <td>The start point of timestamp when a query is started, a json string specifying a starting timestamp for
0373   each TopicPartition. The returned offset for each partition is the earliest offset whose timestamp is greater than or
0374   equal to the given timestamp in the corresponding partition. If the matched offset doesn't exist,
0375   the query will fail immediately to prevent unintended read from such partition. (This is a kind of limitation as of now, and will be addressed in near future.)<p/>
0376   <p/>
0377   Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
0378   For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details.<p/>
0379   Also the meaning of <code>timestamp</code> here can be vary according to Kafka configuration (<code>log.message.timestamp.type</code>): please refer <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
0380   Note: This option requires Kafka 0.10.1.0 or higher.<p/>
0381   Note2: <code>startingOffsetsByTimestamp</code> takes precedence over <code>startingOffsets</code>.<p/>
0382   Note3: For streaming queries, this only applies when a new query is started, and that resuming will
0383   always pick up from where the query left off. Newly discovered partitions during a query will start at
0384   earliest.</td>
0385 </tr>
0386 <tr>
0387   <td>startingOffsets</td>
0388   <td>"earliest", "latest" (streaming only), or json string
0389   """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
0390   </td>
0391   <td>"latest" for streaming, "earliest" for batch</td>
0392   <td>streaming and batch</td>
0393   <td>The start point when a query is started, either "earliest" which is from the earliest offsets,
0394   "latest" which is just from the latest offsets, or a json string specifying a starting offset for
0395   each TopicPartition.  In the json, -2 as an offset can be used to refer to earliest, -1 to latest.
0396   Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed.
0397   For streaming queries, this only applies when a new query is started, and that resuming will
0398   always pick up from where the query left off. Newly discovered partitions during a query will start at
0399   earliest.</td>
0400 </tr>
0401 <tr>
0402   <td>endingOffsetsByTimestamp</td>
0403   <td>json string
0404   """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
0405   </td>
0406   <td>latest</td>
0407   <td>batch query</td>
0408   <td>The end point when a batch query is ended, a json string specifying an ending timestamp for each TopicPartition.
0409   The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to
0410   the given timestamp in the corresponding partition. If the matched offset doesn't exist, the offset will
0411   be set to latest.<p/>
0412   <p/>
0413   Spark simply passes the timestamp information to <code>KafkaConsumer.offsetsForTimes</code>, and doesn't interpret or reason about the value. <p/>
0414   For more details on <code>KafkaConsumer.offsetsForTimes</code>, please refer <a href="https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-">javadoc</a> for details.<p/>
0415   Also the meaning of <code>timestamp</code> here can be vary according to Kafka configuration (<code>log.message.timestamp.type</code>): please refer <a href="https://kafka.apache.org/documentation/">Kafka documentation</a> for further details.<p/>
0416   Note: This option requires Kafka 0.10.1.0 or higher.<p/>
0417   Note2: <code>endingOffsetsByTimestamp</code> takes precedence over <code>endingOffsets</code>.
0418   </td>
0419 </tr>
0420 <tr>
0421   <td>endingOffsets</td>
0422   <td>latest or json string
0423   {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
0424   </td>
0425   <td>latest</td>
0426   <td>batch query</td>
0427   <td>The end point when a batch query is ended, either "latest" which is just referred to the
0428   latest, or a json string specifying an ending offset for each TopicPartition.  In the json, -1
0429   as an offset can be used to refer to latest, and -2 (earliest) as an offset is not allowed.</td>
0430 </tr>
0431 <tr>
0432   <td>failOnDataLoss</td>
0433   <td>true or false</td>
0434   <td>true</td>
0435   <td>streaming and batch</td>
0436   <td>Whether to fail the query when it's possible that data is lost (e.g., topics are deleted, or
0437   offsets are out of range). This may be a false alarm. You can disable it when it doesn't work
0438   as you expected.</td>
0439 </tr>
0440 <tr>
0441   <td>kafkaConsumer.pollTimeoutMs</td>
0442   <td>long</td>
0443   <td>512</td>
0444   <td>streaming and batch</td>
0445   <td>The timeout in milliseconds to poll data from Kafka in executors.</td>
0446 </tr>
0447 <tr>
0448   <td>fetchOffset.numRetries</td>
0449   <td>int</td>
0450   <td>3</td>
0451   <td>streaming and batch</td>
0452   <td>Number of times to retry before giving up fetching Kafka offsets.</td>
0453 </tr>
0454 <tr>
0455   <td>fetchOffset.retryIntervalMs</td>
0456   <td>long</td>
0457   <td>10</td>
0458   <td>streaming and batch</td>
0459   <td>milliseconds to wait before retrying to fetch Kafka offsets</td>
0460 </tr>
0461 <tr>
0462   <td>maxOffsetsPerTrigger</td>
0463   <td>long</td>
0464   <td>none</td>
0465   <td>streaming and batch</td>
0466   <td>Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.</td>
0467 </tr>
0468 <tr>
0469   <td>minPartitions</td>
0470   <td>int</td>
0471   <td>none</td>
0472   <td>streaming and batch</td>
0473   <td>Desired minimum number of partitions to read from Kafka.
0474   By default, Spark has a 1-1 mapping of topicPartitions to Spark partitions consuming from Kafka.
0475   If you set this option to a value greater than your topicPartitions, Spark will divvy up large
0476   Kafka partitions to smaller pieces. Please note that this configuration is like a <code>hint</code>: the
0477   number of Spark tasks will be <strong>approximately</strong> <code>minPartitions</code>. It can be less or more depending on
0478   rounding errors or Kafka partitions that didn't receive any new data.</td>
0479 </tr>
0480 <tr>
0481   <td>groupIdPrefix</td>
0482   <td>string</td>
0483   <td>spark-kafka-source</td>
0484   <td>streaming and batch</td>
0485   <td>Prefix of consumer group identifiers (<code>group.id</code>) that are generated by structured streaming
0486   queries. If "kafka.group.id" is set, this option will be ignored.</td>
0487 </tr>
0488 <tr>
0489   <td>kafka.group.id</td>
0490   <td>string</td>
0491   <td>none</td>
0492   <td>streaming and batch</td>
0493   <td>The Kafka group id to use in Kafka consumer while reading from Kafka. Use this with caution.
0494   By default, each query generates a unique group id for reading data. This ensures that each Kafka
0495   source has its own consumer group that does not face interference from any other consumer, and
0496   therefore can read all of the partitions of its subscribed topics. In some scenarios (for example,
0497   Kafka group-based authorization), you may want to use a specific authorized group id to read data.
0498   You can optionally set the group id. However, do this with extreme caution as it can cause
0499   unexpected behavior. Concurrently running queries (both, batch and streaming) or sources with the
0500   same group id are likely interfere with each other causing each query to read only part of the
0501   data. This may also occur when queries are started/restarted in quick succession. To minimize such
0502   issues, set the Kafka consumer session timeout (by setting option "kafka.session.timeout.ms") to
0503   be very small. When this is set, option "groupIdPrefix" will be ignored.</td>
0504 </tr>
0505 <tr>
0506   <td>includeHeaders</td>
0507   <td>boolean</td>
0508   <td>false</td>
0509   <td>streaming and batch</td>
0510   <td>Whether to include the Kafka headers in the row.</td>
0511 </tr>
0512 </table>
0513 
0514 ### Consumer Caching
0515 
0516 It's time-consuming to initialize Kafka consumers, especially in streaming scenarios where processing time is a key factor.
0517 Because of this, Spark pools Kafka consumers on executors, by leveraging Apache Commons Pool.
0518 
0519 The caching key is built up from the following information:
0520 
0521 * Topic name
0522 * Topic partition
0523 * Group ID
0524 
0525 The following properties are available to configure the consumer pool:
0526 
0527 <table class="table">
0528 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0529 <tr>
0530   <td>spark.kafka.consumer.cache.capacity</td>
0531   <td>The maximum number of consumers cached. Please note that it's a soft limit.</td>
0532   <td>64</td>
0533   <td>3.0.0</td>
0534 </tr>
0535 <tr>
0536   <td>spark.kafka.consumer.cache.timeout</td>
0537   <td>The minimum amount of time a consumer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
0538   <td>5m (5 minutes)</td>
0539   <td>3.0.0</td>
0540 </tr>
0541 <tr>
0542   <td>spark.kafka.consumer.cache.evictorThreadRunInterval</td>
0543   <td>The interval of time between runs of the idle evictor thread for consumer pool. When non-positive, no idle evictor thread will be run.</td>
0544   <td>1m (1 minute)</td>
0545   <td>3.0.0</td>
0546 </tr>
0547 <tr>
0548   <td>spark.kafka.consumer.cache.jmx.enable</td>
0549   <td>Enable or disable JMX for pools created with this configuration instance. Statistics of the pool are available via JMX instance.
0550   The prefix of JMX name is set to "kafka010-cached-simple-kafka-consumer-pool".
0551   </td>
0552   <td>false</td>
0553   <td>3.0.0</td>
0554 </tr>
0555 </table>
0556 
0557 The size of the pool is limited by <code>spark.kafka.consumer.cache.capacity</code>,
0558 but it works as "soft-limit" to not block Spark tasks.
0559 
0560 Idle eviction thread periodically removes consumers which are not used longer than given timeout.
0561 If this threshold is reached when borrowing, it tries to remove the least-used entry that is currently not in use.
0562 
0563 If it cannot be removed, then the pool will keep growing. In the worst case, the pool will grow to
0564 the max number of concurrent tasks that can run in the executor (that is, number of task slots).
0565 
0566 If a task fails for any reason, the new task is executed with a newly created Kafka consumer for safety reasons.
0567 At the same time, we invalidate all consumers in pool which have same caching key, to remove consumer which was used
0568 in failed execution. Consumers which any other tasks are using will not be closed, but will be invalidated as well
0569 when they are returned into pool.
0570 
0571 Along with consumers, Spark pools the records fetched from Kafka separately, to let Kafka consumers stateless in point
0572 of Spark's view, and maximize the efficiency of pooling. It leverages same cache key with Kafka consumers pool.
0573 Note that it doesn't leverage Apache Commons Pool due to the difference of characteristics.
0574 
0575 The following properties are available to configure the fetched data pool:
0576 
0577 <table class="table">
0578 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0579 <tr>
0580   <td>spark.kafka.consumer.fetchedData.cache.timeout</td>
0581   <td>The minimum amount of time a fetched data may sit idle in the pool before it is eligible for eviction by the evictor.</td>
0582   <td>5m (5 minutes)</td>
0583   <td>3.0.0</td>
0584 </tr>
0585 <tr>
0586   <td>spark.kafka.consumer.fetchedData.cache.evictorThreadRunInterval</td>
0587   <td>The interval of time between runs of the idle evictor thread for fetched data pool. When non-positive, no idle evictor thread will be run.</td>
0588   <td>1m (1 minute)</td>
0589   <td>3.0.0</td>
0590 </tr>
0591 </table>
0592 
0593 ## Writing Data to Kafka
0594 
0595 Here, we describe the support for writing Streaming Queries and Batch Queries to Apache Kafka. Take note that
0596 Apache Kafka only supports at least once write semantics. Consequently, when writing---either Streaming Queries
0597 or Batch Queries---to Kafka, some records may be duplicated; this can happen, for example, if Kafka needs
0598 to retry a message that was not acknowledged by a Broker, even though that Broker received and wrote the message record.
0599 Structured Streaming cannot prevent such duplicates from occurring due to these Kafka write semantics. However,
0600 if writing the query is successful, then you can assume that the query output was written at least once. A possible
0601 solution to remove duplicates when reading the written data could be to introduce a primary (unique) key
0602 that can be used to perform de-duplication when reading.
0603 
0604 The Dataframe being written to Kafka should have the following columns in schema:
0605 <table class="table">
0606 <tr><th>Column</th><th>Type</th></tr>
0607 <tr>
0608   <td>key (optional)</td>
0609   <td>string or binary</td>
0610 </tr>
0611 <tr>
0612   <td>value (required)</td>
0613   <td>string or binary</td>
0614 </tr>
0615 <tr>
0616   <td>headers (optional)</td>
0617   <td>array</td>
0618 </tr>
0619 <tr>
0620   <td>topic (*optional)</td>
0621   <td>string</td>
0622 </tr>
0623 <tr>
0624   <td>partition (optional)</td>
0625   <td>int</td>
0626 </tr>
0627 </table>
0628 \* The topic column is required if the "topic" configuration option is not specified.<br>
0629 
0630 The value column is the only required option. If a key column is not specified then
0631 a ```null``` valued key column will be automatically added (see Kafka semantics on
0632 how ```null``` valued key values are handled). If a topic column exists then its value
0633 is used as the topic when writing the given row to Kafka, unless the "topic" configuration
0634 option is set i.e., the "topic" configuration option overrides the topic column.
0635 If a "partition" column is not specified (or its value is ```null```) 
0636 then the partition is calculated by the Kafka producer.
0637 A Kafka partitioner can be specified in Spark by setting the
0638 ```kafka.partitioner.class``` option. If not present, Kafka default partitioner
0639 will be used.
0640 
0641 
0642 The following options must be set for the Kafka sink
0643 for both batch and streaming queries.
0644 
0645 <table class="table">
0646 <tr><th>Option</th><th>value</th><th>meaning</th></tr>
0647 <tr>
0648   <td>kafka.bootstrap.servers</td>
0649   <td>A comma-separated list of host:port</td>
0650   <td>The Kafka "bootstrap.servers" configuration.</td>
0651 </tr>
0652 </table>
0653 
0654 The following configurations are optional:
0655 
0656 <table class="table">
0657 <tr><th>Option</th><th>value</th><th>default</th><th>query type</th><th>meaning</th></tr>
0658 <tr>
0659   <td>topic</td>
0660   <td>string</td>
0661   <td>none</td>
0662   <td>streaming and batch</td>
0663   <td>Sets the topic that all rows will be written to in Kafka. This option overrides any
0664   topic column that may exist in the data.</td>
0665 </tr>
0666 <tr>
0667   <td>includeHeaders</td>
0668   <td>boolean</td>
0669   <td>false</td>
0670   <td>streaming and batch</td>
0671   <td>Whether to include the Kafka headers in the row.</td>
0672 </tr>
0673 </table>
0674 
0675 ### Creating a Kafka Sink for Streaming Queries
0676 
0677 <div class="codetabs">
0678 <div data-lang="scala" markdown="1">
0679 {% highlight scala %}
0680 
0681 // Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0682 val ds = df
0683   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0684   .writeStream
0685   .format("kafka")
0686   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0687   .option("topic", "topic1")
0688   .start()
0689 
0690 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
0691 val ds = df
0692   .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
0693   .writeStream
0694   .format("kafka")
0695   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0696   .start()
0697 
0698 {% endhighlight %}
0699 </div>
0700 <div data-lang="java" markdown="1">
0701 {% highlight java %}
0702 
0703 // Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0704 StreamingQuery ds = df
0705   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0706   .writeStream()
0707   .format("kafka")
0708   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0709   .option("topic", "topic1")
0710   .start();
0711 
0712 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
0713 StreamingQuery ds = df
0714   .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
0715   .writeStream()
0716   .format("kafka")
0717   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0718   .start();
0719 
0720 {% endhighlight %}
0721 </div>
0722 <div data-lang="python" markdown="1">
0723 {% highlight python %}
0724 
0725 # Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0726 ds = df \
0727   .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
0728   .writeStream \
0729   .format("kafka") \
0730   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0731   .option("topic", "topic1") \
0732   .start()
0733 
0734 # Write key-value data from a DataFrame to Kafka using a topic specified in the data
0735 ds = df \
0736   .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
0737   .writeStream \
0738   .format("kafka") \
0739   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0740   .start()
0741 
0742 {% endhighlight %}
0743 </div>
0744 </div>
0745 
0746 ### Writing the output of Batch Queries to Kafka
0747 
0748 <div class="codetabs">
0749 <div data-lang="scala" markdown="1">
0750 {% highlight scala %}
0751 
0752 // Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0753 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0754   .write
0755   .format("kafka")
0756   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0757   .option("topic", "topic1")
0758   .save()
0759 
0760 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
0761 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
0762   .write
0763   .format("kafka")
0764   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0765   .save()
0766 
0767 {% endhighlight %}
0768 </div>
0769 <div data-lang="java" markdown="1">
0770 {% highlight java %}
0771 
0772 // Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0773 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
0774   .write()
0775   .format("kafka")
0776   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0777   .option("topic", "topic1")
0778   .save();
0779 
0780 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
0781 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
0782   .write()
0783   .format("kafka")
0784   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0785   .save();
0786 
0787 {% endhighlight %}
0788 </div>
0789 <div data-lang="python" markdown="1">
0790 {% highlight python %}
0791 
0792 # Write key-value data from a DataFrame to a specific Kafka topic specified in an option
0793 df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
0794   .write \
0795   .format("kafka") \
0796   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0797   .option("topic", "topic1") \
0798   .save()
0799 
0800 # Write key-value data from a DataFrame to Kafka using a topic specified in the data
0801 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)") \
0802   .write \
0803   .format("kafka") \
0804   .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
0805   .save()
0806 
0807 {% endhighlight %}
0808 </div>
0809 </div>
0810 
0811 ### Producer Caching
0812 
0813 Given Kafka producer instance is designed to be thread-safe, Spark initializes a Kafka producer instance and co-use across tasks for same caching key.
0814 
0815 The caching key is built up from the following information:
0816 
0817 * Kafka producer configuration
0818 
0819 This includes configuration for authorization, which Spark will automatically include when delegation token is being used. Even we take authorization into account, you can expect same Kafka producer instance will be used among same Kafka producer configuration.
0820 It will use different Kafka producer when delegation token is renewed; Kafka producer instance for old delegation token will be evicted according to the cache policy.
0821 
0822 The following properties are available to configure the producer pool:
0823 
0824 <table class="table">
0825 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0826 <tr>
0827   <td>spark.kafka.producer.cache.timeout</td>
0828   <td>The minimum amount of time a producer may sit idle in the pool before it is eligible for eviction by the evictor.</td>
0829   <td>10m (10 minutes)</td>
0830   <td>2.2.1</td>
0831 </tr>
0832 <tr>
0833   <td>spark.kafka.producer.cache.evictorThreadRunInterval</td>
0834   <td>The interval of time between runs of the idle evictor thread for producer pool. When non-positive, no idle evictor thread will be run.</td>
0835   <td>1m (1 minute)</td>
0836   <td>3.0.0</td>
0837 </tr>
0838 </table>
0839 
0840 Idle eviction thread periodically removes producers which are not used longer than given timeout. Note that the producer is shared and used concurrently, so the last used timestamp is determined by the moment the producer instance is returned and reference count is 0.
0841 
0842 ## Kafka Specific Configurations
0843 
0844 Kafka's own configurations can be set via `DataStreamReader.option` with `kafka.` prefix, e.g,
0845 `stream.option("kafka.bootstrap.servers", "host:port")`. For possible kafka parameters, see
0846 [Kafka consumer config docs](http://kafka.apache.org/documentation.html#newconsumerconfigs) for
0847 parameters related to reading data, and [Kafka producer config docs](http://kafka.apache.org/documentation/#producerconfigs)
0848 for parameters related to writing data.
0849 
0850 Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
0851 
0852 - **group.id**: Kafka source will create a unique group id for each query automatically. The user can
0853 set the prefix of the automatically generated group.id's via the optional source option `groupIdPrefix`,
0854 default value is "spark-kafka-source". You can also set "kafka.group.id" to force Spark to use a special
0855 group id, however, please read warnings for this option and use it with caution.
0856 - **auto.offset.reset**: Set the source option `startingOffsets` to specify
0857  where to start instead. Structured Streaming manages which offsets are consumed internally, rather
0858  than rely on the kafka Consumer to do it. This will ensure that no data is missed when new
0859  topics/partitions are dynamically subscribed. Note that `startingOffsets` only applies when a new
0860  streaming query is started, and that resuming will always pick up from where the query left off.
0861 - **key.deserializer**: Keys are always deserialized as byte arrays with ByteArrayDeserializer. Use
0862  DataFrame operations to explicitly deserialize the keys.
0863 - **value.deserializer**: Values are always deserialized as byte arrays with ByteArrayDeserializer.
0864  Use DataFrame operations to explicitly deserialize the values.
0865 - **key.serializer**: Keys are always serialized with ByteArraySerializer or StringSerializer. Use
0866 DataFrame operations to explicitly serialize the keys into either strings or byte arrays.
0867 - **value.serializer**: values are always serialized with ByteArraySerializer or StringSerializer. Use
0868 DataFrame operations to explicitly serialize the values into either strings or byte arrays.
0869 - **enable.auto.commit**: Kafka source doesn't commit any offset.
0870 - **interceptor.classes**: Kafka source always read keys and values as byte arrays. It's not safe to
0871  use ConsumerInterceptor as it may break the query.
0872 
0873 ## Deploying
0874 
0875 As with any Spark applications, `spark-submit` is used to launch your application. `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}`
0876 and its dependencies can be directly added to `spark-submit` using `--packages`, such as,
0877 
0878     ./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0879 
0880 For experimenting on `spark-shell`, you can also use `--packages` to add `spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly,
0881 
0882     ./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0883 
0884 See [Application Submission Guide](submitting-applications.html) for more details about submitting
0885 applications with external dependencies.
0886 
0887 ## Security
0888 
0889 Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed
0890 description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
0891 
0892 It's worth noting that security is optional and turned off by default.
0893 
0894 Spark supports the following ways to authenticate against Kafka cluster:
0895 - **Delegation token (introduced in Kafka broker 1.1.0)**
0896 - **JAAS login configuration**
0897 
0898 ### Delegation token
0899 
0900 This way the application can be configured via Spark parameters and may not need JAAS login
0901 configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
0902 about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
0903 
0904 The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.clusters.${cluster}.auth.bootstrap.servers` is set,
0905 Spark considers the following log in options, in order of preference:
0906 - **JAAS login configuration**, please see example below.
0907 - **Keytab file**, such as,
0908 
0909       ./bin/spark-submit \
0910           --keytab <KEYTAB_FILE> \
0911           --principal <PRINCIPAL> \
0912           --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
0913           ...
0914 
0915 - **Kerberos credential cache**, such as,
0916 
0917       ./bin/spark-submit \
0918           --conf spark.kafka.clusters.${cluster}.auth.bootstrap.servers=<KAFKA_SERVERS> \
0919           ...
0920 
0921 The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
0922 
0923 Spark can be configured to use the following authentication protocols to obtain token (it must match with
0924 Kafka broker configuration):
0925 - **SASL SSL (default)**
0926 - **SSL**
0927 - **SASL PLAINTEXT (for testing)**
0928 
0929 After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly.
0930 Delegation token uses `SCRAM` login module for authentication and because of that the appropriate
0931 `spark.kafka.clusters.${cluster}.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter
0932 must match with Kafka broker configuration.
0933 
0934 When delegation token is available on an executor Spark considers the following log in options, in order of preference:
0935 - **JAAS login configuration**, please see example below.
0936 - **Delegation token**, please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code> parameter for further details.
0937 
0938 When none of the above applies then unsecure connection assumed.
0939 
0940 
0941 #### Configuration
0942 
0943 Delegation tokens can be obtained from multiple clusters and <code>${cluster}</code> is an arbitrary unique identifier which helps to group different configurations.
0944 
0945 <table class="table">
0946 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0947   <tr>
0948     <td><code>spark.kafka.clusters.${cluster}.auth.bootstrap.servers</code></td>
0949     <td>None</td>
0950     <td>
0951       A list of coma separated host/port pairs to use for establishing the initial connection
0952       to the Kafka cluster. For further details please see Kafka documentation. Only used to obtain delegation token.
0953     </td>
0954     <td>3.0.0</td>
0955   </tr>
0956   <tr>
0957     <td><code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code></td>
0958     <td>.*</td>
0959     <td>
0960       Regular expression to match against the <code>bootstrap.servers</code> config for sources and sinks in the application.
0961       If a server address matches this regex, the delegation token obtained from the respective bootstrap servers will be used when connecting.
0962       If multiple clusters match the address, an exception will be thrown and the query won't be started.
0963       Kafka's secure and unsecure listeners are bound to different ports. When both used the secure listener port has to be part of the regular expression.
0964     </td>
0965     <td>3.0.0</td>
0966   </tr>
0967   <tr>
0968     <td><code>spark.kafka.clusters.${cluster}.security.protocol</code></td>
0969     <td>SASL_SSL</td>
0970     <td>
0971       Protocol used to communicate with brokers. For further details please see Kafka documentation. Protocol is applied on all the sources and sinks as default where
0972       <code>bootstrap.servers</code> config matches (for further details please see <code>spark.kafka.clusters.${cluster}.target.bootstrap.servers.regex</code>),
0973       and can be overridden by setting <code>kafka.security.protocol</code> on the source or sink.
0974     </td>
0975     <td>3.0.0</td>
0976   </tr>
0977   <tr>
0978     <td><code>spark.kafka.clusters.${cluster}.sasl.kerberos.service.name</code></td>
0979     <td>kafka</td>
0980     <td>
0981       The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.
0982       For further details please see Kafka documentation. Only used to obtain delegation token.
0983     </td>
0984     <td>3.0.0</td>
0985   </tr>
0986   <tr>
0987     <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code></td>
0988     <td>None</td>
0989     <td>
0990       The location of the trust store file. For further details please see Kafka documentation. Only used to obtain delegation token.
0991     </td>
0992     <td>3.0.0</td>
0993   </tr>
0994   <tr>
0995     <td><code>spark.kafka.clusters.${cluster}.ssl.truststore.password</code></td>
0996     <td>None</td>
0997     <td>
0998       The store password for the trust store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.truststore.location</code> is configured.
0999       For further details please see Kafka documentation. Only used to obtain delegation token.
1000     </td>
1001     <td>3.0.0</td>
1002   </tr>
1003   <tr>
1004     <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code></td>
1005     <td>None</td>
1006     <td>
1007       The location of the key store file. This is optional for client and can be used for two-way authentication for client.
1008       For further details please see Kafka documentation. Only used to obtain delegation token.
1009     </td>
1010     <td>3.0.0</td>
1011   </tr>
1012   <tr>
1013     <td><code>spark.kafka.clusters.${cluster}.ssl.keystore.password</code></td>
1014     <td>None</td>
1015     <td>
1016       The store password for the key store file. This is optional and only needed if <code>spark.kafka.clusters.${cluster}.ssl.keystore.location</code> is configured.
1017       For further details please see Kafka documentation. Only used to obtain delegation token.
1018     </td>
1019     <td>3.0.0</td>
1020   </tr>
1021   <tr>
1022     <td><code>spark.kafka.clusters.${cluster}.ssl.key.password</code></td>
1023     <td>None</td>
1024     <td>
1025       The password of the private key in the key store file. This is optional for client.
1026       For further details please see Kafka documentation. Only used to obtain delegation token.
1027     </td>
1028     <td>3.0.0</td>
1029   </tr>
1030   <tr>
1031     <td><code>spark.kafka.clusters.${cluster}.sasl.token.mechanism</code></td>
1032     <td>SCRAM-SHA-512</td>
1033     <td>
1034       SASL mechanism used for client connections with delegation token. Because SCRAM login module used for authentication a compatible mechanism has to be set here.
1035       For further details please see Kafka documentation (<code>sasl.mechanism</code>). Only used to authenticate against Kafka broker with delegation token.
1036     </td>
1037     <td>3.0.0</td>
1038   </tr>
1039 </table>
1040 
1041 #### Kafka Specific Configurations
1042 
1043 Kafka's own configurations can be set with `kafka.` prefix, e.g, `--conf spark.kafka.clusters.${cluster}.kafka.retries=1`.
1044 For possible Kafka parameters, see [Kafka adminclient config docs](http://kafka.apache.org/documentation.html#adminclientconfigs).
1045 
1046 #### Caveats
1047 
1048 - Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)).
1049 
1050 ### JAAS login configuration
1051 
1052 JAAS login configuration must placed on all nodes where Spark tries to access Kafka cluster.
1053 This provides the possibility to apply any custom authentication logic with a higher cost to maintain.
1054 This can be done several ways. One possibility is to provide additional JVM parameters, such as,
1055 
1056     ./bin/spark-submit \
1057         --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
1058         --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
1059         ...