---
layout: global
title: Apache Avro Data Source Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

Since the Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data.

## Deploying
The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default.

As with any Spark application, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
and its dependencies can be added directly to `spark-submit` using `--packages`, for example:

    ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...

For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly:

    ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...

See the [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
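For a standalone PySpark script, the same Maven coordinate can usually be supplied through the `spark.jars.packages` configuration when the session is created. The following is only a minimal sketch: the helper names are hypothetical, and the Scala and Spark versions (`2.12`, `3.0.0`) are placeholder assumptions that must be replaced with the versions of your own deployment.

```python
def avro_package_coordinate(scala_binary_version, spark_version):
    """Build the Maven coordinate of the spark-avro module."""
    return "org.apache.spark:spark-avro_{}:{}".format(
        scala_binary_version, spark_version)


def build_session_with_avro(scala_binary_version="2.12", spark_version="3.0.0"):
    """Create a SparkSession that resolves spark-avro at startup.

    The default versions are placeholders (assumptions), not recommendations.
    """
    # Imported lazily so the helper above is usable without PySpark installed.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName("avro-example")
        # Same effect as passing --packages on the command line.
        .config("spark.jars.packages",
                avro_package_coordinate(scala_binary_version, spark_version))
        .getOrCreate()
    )
```

The configuration has to be set before the session (and its JVM) is started; setting it on an already running session is not expected to load the module.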

## Load and Save Functions

Since the `spark-avro` module is external, there is no `.avro` API in
`DataFrameReader` or `DataFrameWriter`.

To load or save data in Avro format, you need to specify the data source option `format` as `avro` (or `org.apache.spark.sql.avro`).
<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}

val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}

Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}

df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

{% endhighlight %}
</div>
<div data-lang="r" markdown="1">
{% highlight r %}

df <- read.df("examples/src/main/resources/users.avro", "avro")
write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")

{% endhighlight %}
</div>
</div>

## to_avro() and from_avro()
The Avro package provides the function `to_avro()` to encode a column as binary in Avro
format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column to
another column, and the input/output SQL data type can be a complex type or a primitive type.

Using Avro records as columns is useful when reading from or writing to a streaming source like Kafka. Each
Kafka key-value record is augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.

* If the "value" field that contains your data is in Avro, you can use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
* `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.

Both functions are currently only available in Scala, Java, and Python.

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% highlight scala %}
import java.nio.file.{Files, Paths}

import org.apache.spark.sql.avro.functions._
// Enables the $"..." column syntax; already in scope in spark-shell.
import spark.implicits._

// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
val output = df
  .select(from_avro($"value", jsonFormatSchema) as "user")
  .where("user.favorite_color == \"red\"")
  .select(to_avro($"user.name") as "value")

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start()

{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.avro.functions.*;

// `from_avro` requires Avro schema in JSON string format.
String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load();

// 1. Decode the Avro data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Avro format.
Dataset<Row> output = df
  .select(from_avro(col("value"), jsonFormatSchema).as("user"))
  .where("user.favorite_color == \"red\"")
  .select(to_avro(col("user.name")).as("value"));

StreamingQuery query = output
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic2")
  .start();

{% endhighlight %}
</div>
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.avro.functions import from_avro, to_avro

# `from_avro` requires Avro schema in JSON string format.
with open("examples/src/main/resources/user.avsc", "r") as f:
    jsonFormatSchema = f.read()

df = spark\
  .readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("subscribe", "topic1")\
  .load()

# 1. Decode the Avro data into a struct;
# 2. Filter by column `favorite_color`;
# 3. Encode the column `name` in Avro format.
output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

query = output\
  .writeStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
  .option("topic", "topic2")\
  .start()

{% endhighlight %}
</div>
</div>

## Data Source Option

Data source options of Avro can be set via:

* the `.option` method on `DataFrameReader` or `DataFrameWriter`.
* the `options` parameter in function `from_avro`.

<table class="table">
<tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
<tr>
<td><code>avroSchema</code></td>
<td>None</td>
<td>Optional schema provided by a user in JSON format.
<ul>
<li>
When reading Avro, this option can be set to an evolved schema, which is compatible with but different from
the actual Avro schema. The deserialization schema will be consistent with the evolved schema.
For example, if we set an evolved schema containing one additional column with a default value,
the reading result in Spark will contain the new column too.
</li>
<li>
When writing Avro, this option can be set if the expected output Avro schema doesn't match the
schema converted by Spark. For example, the expected schema of one column is of "enum" type,
instead of "string" type in the default converted schema.
</li>
</ul>
</td>
<td>read, write and function <code>from_avro</code></td>
</tr>
<tr>
<td><code>recordName</code></td>
<td>topLevelRecord</td>
<td>Top-level record name in the write result, which is required by the Avro spec.</td>
<td>write</td>
</tr>
<tr>
<td><code>recordNamespace</code></td>
<td>""</td>
<td>Record namespace in the write result.</td>
<td>write</td>
</tr>
<tr>
<td><code>ignoreExtension</code></td>
<td>true</td>
<td>This option controls whether files without the <code>.avro</code> extension are ignored in read.<br> If the option is enabled, all files (with and without the <code>.avro</code> extension) are loaded.<br> The option has been deprecated and will be removed in a future release. Please use the general data source option <a href="./sql-data-sources-generic-options.html#path-global-filter">pathGlobFilter</a> for filtering file names.</td>
<td>read</td>
</tr>
<tr>
<td><code>compression</code></td>
<td>snappy</td>
<td>The <code>compression</code> option specifies the compression codec used in write.<br>
Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> is taken into account.</td>
<td>write</td>
</tr>
<tr>
<td><code>mode</code></td>
<td>FAILFAST</td>
<td>The <code>mode</code> option specifies the parse mode for function <code>from_avro</code>.<br>
Currently supported modes are:
<ul>
<li><code>FAILFAST</code>: Throws an exception when processing a corrupted record.</li>
<li><code>PERMISSIVE</code>: Corrupt records are processed as null results. Therefore, the
data schema is forced to be fully nullable, which might differ from the one the user provided.</li>
</ul>
</td>
<td>function <code>from_avro</code></td>
</tr>
</table>
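The options in the table can be combined as in the PySpark sketch below. It is an illustration under assumptions, not a reference implementation: the function names are hypothetical, and the evolved schema (a `User` record in namespace `example.avro` with an extra `country` field) is invented for the example and may not match your files.

```python
import json

# Hypothetical evolved reader schema: two existing fields plus one new
# field, `country`, that carries a default value.
EVOLVED_USER_SCHEMA = json.dumps({
    "type": "record",
    "name": "User",
    "namespace": "example.avro",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_color", "type": ["string", "null"]},
        {"name": "country", "type": "string", "default": "unknown"},
    ],
})


def read_with_evolved_schema(spark, path):
    """Read Avro files with the evolved schema instead of the file schema."""
    return (spark.read.format("avro")
            .option("avroSchema", EVOLVED_USER_SCHEMA)
            .load(path))


def write_compressed(df, path, codec="deflate"):
    """Write a DataFrame as Avro with an explicit codec and record name."""
    (df.write.format("avro")
       .option("compression", codec)
       .option("recordName", "User")
       .option("recordNamespace", "example.avro")
       .save(path))


def decode_permissive(df, json_format_schema):
    """Decode the binary `value` column; corrupt records become null."""
    # Imported lazily so the module loads without PySpark installed.
    from pyspark.sql.avro.functions import from_avro

    options = {"mode": "PERMISSIVE"}
    return df.select(
        from_avro("value", json_format_schema, options).alias("user"))
```

With `read_with_evolved_schema`, rows read from files written without `country` would be expected to carry the default value `unknown` in that column.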

## Configuration
Configuration of Avro can be done using the `spark.conf.set` method on a SparkSession or by running `SET key=value` commands using SQL.
<table class="table">
<tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Since Version</b></th></tr>
<tr>
<td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td>
<td>true</td>
<td>
If it is set to true, the data source provider <code>com.databricks.spark.avro</code> is mapped
to the built-in but external Avro data source module for backward compatibility.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.compression.codec</td>
<td>snappy</td>
<td>
Compression codec used when writing Avro files. Supported codecs: uncompressed, deflate,
snappy, bzip2 and xz. Default codec is snappy.
</td>
<td>2.4.0</td>
</tr>
<tr>
<td>spark.sql.avro.deflate.level</td>
<td>-1</td>
<td>
Compression level for the deflate codec used when writing Avro files. A valid value must be -1 or in
the range 1 to 9 inclusive. The default value is -1, which corresponds to level 6
in the current implementation.
</td>
<td>2.4.0</td>
</tr>
</table>
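As a rough illustration of the session-level settings, the sketch below validates a deflate level against the range stated in the table and then applies both write-related properties through `spark.conf.set`. The two function names are hypothetical helpers introduced for this example only.

```python
def validate_deflate_level(level):
    """Return `level` if it is -1 or within 1..9, otherwise raise ValueError."""
    if level == -1 or 1 <= level <= 9:
        return level
    raise ValueError(
        "deflate level must be -1 or between 1 and 9, got {}".format(level))


def configure_avro_writes(spark, codec="deflate", deflate_level=-1):
    """Set the session-wide Avro write codec and deflate level.

    `spark` is expected to be a SparkSession (anything exposing `conf.set`).
    """
    spark.conf.set("spark.sql.avro.compression.codec", codec)
    spark.conf.set("spark.sql.avro.deflate.level",
                   str(validate_deflate_level(deflate_level)))
```

The SQL form of the same setting would be `SET spark.sql.avro.compression.codec=deflate`. A per-write `compression` option, when given, takes the place of the session-level codec.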

## Compatibility with Databricks spark-avro
This Avro data source module originated from, and is compatible with, Databricks's open source repository
[spark-avro](https://github.com/databricks/spark-avro).

By default, with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is
mapped to this built-in Avro module. For Spark tables created with the `Provider` property set to `com.databricks.spark.avro` in the
catalog metastore, this mapping is essential for loading these tables with the built-in Avro module.

Note that in Databricks's [spark-avro](https://github.com/databricks/spark-avro), the implicit classes
`AvroDataFrameWriter` and `AvroDataFrameReader` were created for the shortcut function `.avro()`. In this
built-in but external module, both implicit classes are removed. Please use `.format("avro")` in
`DataFrameWriter` or `DataFrameReader` instead, which should be clean and good enough.

If you prefer using your own build of the `spark-avro` jar file, you can simply disable the configuration
`spark.sql.legacy.replaceDatabricksSparkAvro.enabled`, and use the option `--jars` when deploying your
applications. Read the [Advanced Dependency Management](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) section in the Application
Submission Guide for more details.

## Supported types for Avro -> Spark SQL conversion
Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) under records of Avro.
<table class="table">
<tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
<tr>
<td>boolean</td>
<td>BooleanType</td>
</tr>
<tr>
<td>int</td>
<td>IntegerType</td>
</tr>
<tr>
<td>long</td>
<td>LongType</td>
</tr>
<tr>
<td>float</td>
<td>FloatType</td>
</tr>
<tr>
<td>double</td>
<td>DoubleType</td>
</tr>
<tr>
<td>string</td>
<td>StringType</td>
</tr>
<tr>
<td>enum</td>
<td>StringType</td>
</tr>
<tr>
<td>fixed</td>
<td>BinaryType</td>
</tr>
<tr>
<td>bytes</td>
<td>BinaryType</td>
</tr>
<tr>
<td>record</td>
<td>StructType</td>
</tr>
<tr>
<td>array</td>
<td>ArrayType</td>
</tr>
<tr>
<td>map</td>
<td>MapType</td>
</tr>
<tr>
<td>union</td>
<td>See below</td>
</tr>
</table>

In addition to the types listed above, Spark supports reading `union` types. The following three types are considered basic `union` types:

1. `union(int, long)` will be mapped to LongType.
2. `union(float, double)` will be mapped to DoubleType.
3. `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.

All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.
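The union rules above can be pictured with a small, Spark-free Python sketch. It is not the converter Spark uses: the function and table names are hypothetical, only primitive Avro type names are handled, and removing `null` before applying the other rules to larger unions is an assumption of this sketch.

```python
# Avro primitive type name -> Spark SQL type name (from the table above).
SIMPLE_TYPES = {
    "boolean": "BooleanType",
    "int": "IntegerType",
    "long": "LongType",
    "float": "FloatType",
    "double": "DoubleType",
    "string": "StringType",
    "bytes": "BinaryType",
}


def spark_type_for_union(members):
    """Map an Avro union (list of type names) to (Spark type, nullable)."""
    # Assumption: `null` only marks the result as nullable and is removed
    # before the remaining members are inspected.
    nullable = "null" in members
    rest = [m for m in members if m != "null"]
    if not rest:
        raise ValueError("union has no non-null member")

    if len(rest) == 1:
        # union(something, null): same type as `something`, nullable.
        return SIMPLE_TYPES[rest[0]], nullable
    if set(rest) == {"int", "long"}:
        return "LongType", nullable
    if set(rest) == {"float", "double"}:
        return "DoubleType", nullable

    # Complex union: one struct field per member, named member0, member1, ...
    fields = ", ".join(
        "member{}: {}".format(i, SIMPLE_TYPES[m]) for i, m in enumerate(rest))
    return "StructType({})".format(fields), nullable
```

For instance, `spark_type_for_union(["string", "null"])` yields `("StringType", True)`, while `spark_type_for_union(["int", "string"])` yields a struct description with fields `member0` and `member1`.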

Spark also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types):

<table class="table">
<tr><th><b>Avro logical type</b></th><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
<tr>
<td>date</td>
<td>int</td>
<td>DateType</td>
</tr>
<tr>
<td>timestamp-millis</td>
<td>long</td>
<td>TimestampType</td>
</tr>
<tr>
<td>timestamp-micros</td>
<td>long</td>
<td>TimestampType</td>
</tr>
<tr>
<td>decimal</td>
<td>fixed</td>
<td>DecimalType</td>
</tr>
<tr>
<td>decimal</td>
<td>bytes</td>
<td>DecimalType</td>
</tr>
</table>

At the moment, Spark ignores docs, aliases and other properties present in the Avro file.

## Supported types for Spark SQL -> Avro conversion
Spark supports writing all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases, which are listed below:

<table class="table">
<tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr>
<tr>
<td>ByteType</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>ShortType</td>
<td>int</td>
<td></td>
</tr>
<tr>
<td>BinaryType</td>
<td>bytes</td>
<td></td>
</tr>
<tr>
<td>DateType</td>
<td>int</td>
<td>date</td>
</tr>
<tr>
<td>TimestampType</td>
<td>long</td>
<td>timestamp-micros</td>
</tr>
<tr>
<td>DecimalType</td>
<td>fixed</td>
<td>decimal</td>
</tr>
</table>

You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require a user-specified Avro schema:

<table class="table">
<tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr>
<tr>
<td>BinaryType</td>
<td>fixed</td>
<td></td>
</tr>
<tr>
<td>StringType</td>
<td>enum</td>
<td></td>
</tr>
<tr>
<td>TimestampType</td>
<td>long</td>
<td>timestamp-millis</td>
</tr>
<tr>
<td>DecimalType</td>
<td>bytes</td>
<td>decimal</td>
</tr>
</table>
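A user-specified output schema of this kind might look like the PySpark sketch below. Everything in it is illustrative: the `Event` record, its field names, the enum symbols and the function name are hypothetical. The fields are declared as unions with `null` on the assumption that the DataFrame columns are nullable; non-nullable columns may need the plain (non-union) types instead, and the exact behaviour can depend on the Spark version.

```python
import json

# Avro enum used for a Spark StringType column (hypothetical symbols).
STATUS_ENUM = {"type": "enum", "name": "Status", "symbols": ["OPEN", "CLOSED"]}

# Avro long with the timestamp-millis logical type for a TimestampType column.
TIMESTAMP_MILLIS = {"type": "long", "logicalType": "timestamp-millis"}

# Hypothetical output schema covering two of the conversions in the table.
OUTPUT_SCHEMA = json.dumps({
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "status", "type": ["null", STATUS_ENUM]},
        {"name": "updated_at", "type": ["null", TIMESTAMP_MILLIS]},
    ],
})


def write_with_schema(df, path):
    """Write `df` (columns: status STRING, updated_at TIMESTAMP) as Avro,
    using OUTPUT_SCHEMA instead of the schema Spark would derive."""
    df.write.format("avro").option("avroSchema", OUTPUT_SCHEMA).save(path)
```

Without the `avroSchema` option, the same DataFrame would be written with `status` as an Avro `string` and `updated_at` as `timestamp-micros`, following the default mapping table.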