Back to home page

OSCL-LXR

 
 

    


0001 ---
0002 layout: global
0003 title: Apache Avro Data Source Guide
0004 license: |
0005   Licensed to the Apache Software Foundation (ASF) under one or more
0006   contributor license agreements.  See the NOTICE file distributed with
0007   this work for additional information regarding copyright ownership.
0008   The ASF licenses this file to You under the Apache License, Version 2.0
0009   (the "License"); you may not use this file except in compliance with
0010   the License.  You may obtain a copy of the License at
0011  
0012      http://www.apache.org/licenses/LICENSE-2.0
0013  
0014   Unless required by applicable law or agreed to in writing, software
0015   distributed under the License is distributed on an "AS IS" BASIS,
0016   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0017   See the License for the specific language governing permissions and
0018   limitations under the License.
0019 ---
0020 
0021 * This will become a table of contents (this text will be scraped).
0022 {:toc}
0023 
0024 Since Spark 2.4 release, [Spark SQL](https://spark.apache.org/docs/latest/sql-programming-guide.html) provides built-in support for reading and writing Apache Avro data.
0025 
0026 ## Deploying
0027 The `spark-avro` module is external and not included in `spark-submit` or `spark-shell` by default.
0028 
0029 As with any Spark applications, `spark-submit` is used to launch your application. `spark-avro_{{site.SCALA_BINARY_VERSION}}`
0030 and its dependencies can be directly added to `spark-submit` using `--packages`, such as,
0031 
0032     ./bin/spark-submit --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0033 
0034 For experimenting on `spark-shell`, you can also use `--packages` to add `org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}` and its dependencies directly,
0035 
0036     ./bin/spark-shell --packages org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION_SHORT}} ...
0037 
0038 See [Application Submission Guide](submitting-applications.html) for more details about submitting applications with external dependencies.
0039 
0040 ## Load and Save Functions
0041 
0042 Since `spark-avro` module is external, there is no `.avro` API in 
0043 `DataFrameReader` or `DataFrameWriter`.
0044 
0045 To load/save data in Avro format, you need to specify the data source option `format` as `avro`(or `org.apache.spark.sql.avro`).
0046 <div class="codetabs">
0047 <div data-lang="scala" markdown="1">
0048 {% highlight scala %}
0049 
0050 val usersDF = spark.read.format("avro").load("examples/src/main/resources/users.avro")
0051 usersDF.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
0052 
0053 {% endhighlight %}
0054 </div>
0055 <div data-lang="java" markdown="1">
0056 {% highlight java %}
0057 
0058 Dataset<Row> usersDF = spark.read().format("avro").load("examples/src/main/resources/users.avro");
0059 usersDF.select("name", "favorite_color").write().format("avro").save("namesAndFavColors.avro");
0060 
0061 {% endhighlight %}
0062 </div>
0063 <div data-lang="python" markdown="1">
0064 {% highlight python %}
0065 
0066 df = spark.read.format("avro").load("examples/src/main/resources/users.avro")
0067 df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")
0068 
0069 {% endhighlight %}
0070 </div>
0071 <div data-lang="r" markdown="1">
0072 {% highlight r %}
0073 
0074 df <- read.df("examples/src/main/resources/users.avro", "avro")
0075 write.df(select(df, "name", "favorite_color"), "namesAndFavColors.avro", "avro")
0076 
0077 {% endhighlight %}
0078 </div>
0079 </div>
0080 
0081 ## to_avro() and from_avro()
0082 The Avro package provides function `to_avro` to encode a column as binary in Avro 
0083 format, and `from_avro()` to decode Avro binary data into a column. Both functions transform one column to 
0084 another column, and the input/output SQL data type can be a complex type or a primitive type.
0085 
0086 Using Avro record as columns is useful when reading from or writing to a streaming source like Kafka. Each
0087 Kafka key-value record will be augmented with some metadata, such as the ingestion timestamp into Kafka, the offset in Kafka, etc.
0088 * If the "value" field that contains your data is in Avro, you could use `from_avro()` to extract your data, enrich it, clean it, and then push it downstream to Kafka again or write it out to a file.
0089 * `to_avro()` can be used to turn structs into Avro records. This method is particularly useful when you would like to re-encode multiple columns into a single one when writing data out to Kafka.
0090 
0091 Both functions are currently only available in Scala and Java.
0092 
0093 <div class="codetabs">
0094 <div data-lang="scala" markdown="1">
0095 {% highlight scala %}
0096 import org.apache.spark.sql.avro.functions._
0097 
0098 // `from_avro` requires Avro schema in JSON string format.
0099 val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")))
0100 
0101 val df = spark
0102   .readStream
0103   .format("kafka")
0104   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0105   .option("subscribe", "topic1")
0106   .load()
0107 
0108 // 1. Decode the Avro data into a struct;
0109 // 2. Filter by column `favorite_color`;
0110 // 3. Encode the column `name` in Avro format.
0111 val output = df
0112   .select(from_avro('value, jsonFormatSchema) as 'user)
0113   .where("user.favorite_color == \"red\"")
0114   .select(to_avro($"user.name") as 'value)
0115 
0116 val query = output
0117   .writeStream
0118   .format("kafka")
0119   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0120   .option("topic", "topic2")
0121   .start()
0122 
0123 {% endhighlight %}
0124 </div>
0125 <div data-lang="java" markdown="1">
0126 {% highlight java %}
0127 import static org.apache.spark.sql.functions.col;
0128 import static org.apache.spark.sql.avro.functions.*;
0129 
0130 // `from_avro` requires Avro schema in JSON string format.
0131 String jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./examples/src/main/resources/user.avsc")));
0132 
0133 Dataset<Row> df = spark
0134   .readStream()
0135   .format("kafka")
0136   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0137   .option("subscribe", "topic1")
0138   .load();
0139 
0140 // 1. Decode the Avro data into a struct;
0141 // 2. Filter by column `favorite_color`;
0142 // 3. Encode the column `name` in Avro format.
0143 Dataset<Row> output = df
0144   .select(from_avro(col("value"), jsonFormatSchema).as("user"))
0145   .where("user.favorite_color == \"red\"")
0146   .select(to_avro(col("user.name")).as("value"));
0147 
0148 StreamingQuery query = output
0149   .writeStream()
0150   .format("kafka")
0151   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
0152   .option("topic", "topic2")
0153   .start();
0154 
0155 {% endhighlight %}
0156 </div>
0157 <div data-lang="python" markdown="1">
0158 {% highlight python %}
0159 from pyspark.sql.avro.functions import from_avro, to_avro
0160 
0161 # `from_avro` requires Avro schema in JSON string format.
0162 jsonFormatSchema = open("examples/src/main/resources/user.avsc", "r").read()
0163 
0164 df = spark\
0165   .readStream\
0166   .format("kafka")\
0167   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
0168   .option("subscribe", "topic1")\
0169   .load()
0170 
0171 # 1. Decode the Avro data into a struct;
0172 # 2. Filter by column `favorite_color`;
0173 # 3. Encode the column `name` in Avro format.
0174 output = df\
0175   .select(from_avro("value", jsonFormatSchema).alias("user"))\
0176   .where('user.favorite_color == "red"')\
0177   .select(to_avro("user.name").alias("value"))
0178 
0179 query = output\
0180   .writeStream\
0181   .format("kafka")\
0182   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")\
0183   .option("topic", "topic2")\
0184   .start()
0185 
0186 {% endhighlight %}
0187 </div>
0188 </div>
0189 
0190 ## Data Source Option
0191 
0192 Data source options of Avro can be set via:
0193  * the `.option` method on `DataFrameReader` or `DataFrameWriter`.
0194  * the `options` parameter in function `from_avro`.
0195 
0196 <table class="table">
0197   <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Scope</b></th></tr>
0198   <tr>
0199     <td><code>avroSchema</code></td>
0200     <td>None</td>
0201     <td>Optional schema provided by a user in JSON format.
0202       <ul>
0203         <li>
0204           When reading Avro, this option can be set to an evolved schema, which is compatible but different with
0205           the actual Avro schema. The deserialization schema will be consistent with the evolved schema.
0206           For example, if we set an evolved schema containing one additional column with a default value,
0207           the reading result in Spark will contain the new column too.
0208         </li>
0209         <li>
0210           When writing Avro, this option can be set if the expected output Avro schema doesn't match the
0211           schema converted by Spark. For example, the expected schema of one column is of "enum" type,
0212           instead of "string" type in the default converted schema.
0213         </li>
0214       </ul>
0215     </td>
0216     <td> read, write and function <code>from_avro</code></td>
0217   </tr>
0218   <tr>
0219     <td><code>recordName</code></td>
0220     <td>topLevelRecord</td>
0221     <td>Top level record name in write result, which is required in Avro spec.</td>
0222     <td>write</td>
0223   </tr>
0224   <tr>
0225     <td><code>recordNamespace</code></td>
0226     <td>""</td>
0227     <td>Record namespace in write result.</td>
0228     <td>write</td>
0229   </tr>
0230   <tr>
0231     <td><code>ignoreExtension</code></td>
0232     <td>true</td>
0233     <td>The option controls ignoring of files without <code>.avro</code> extensions in read.<br> If the option is enabled, all files (with and without <code>.avro</code> extension) are loaded.<br> The option has been deprecated, and it will be removed in the future releases. Please use the general data source option <a href="./sql-data-sources-generic-options.html#path-global-filter">pathGlobFilter</a> for filtering file names.</td>
0234     <td>read</td>
0235   </tr>
0236   <tr>
0237     <td><code>compression</code></td>
0238     <td>snappy</td>
0239     <td>The <code>compression</code> option allows to specify a compression codec used in write.<br>
0240   Currently supported codecs are <code>uncompressed</code>, <code>snappy</code>, <code>deflate</code>, <code>bzip2</code> and <code>xz</code>.<br> If the option is not set, the configuration <code>spark.sql.avro.compression.codec</code> config is taken into account.</td>
0241     <td>write</td>
0242   </tr>
0243   <tr>
0244     <td><code>mode</code></td>
0245     <td>FAILFAST</td>
0246     <td>The <code>mode</code> option allows to specify parse mode for function <code>from_avro</code>.<br>
0247       Currently supported modes are:
0248       <ul>
0249         <li><code>FAILFAST</code>: Throws an exception on processing corrupted record.</li>
0250         <li><code>PERMISSIVE</code>: Corrupt records are processed as null result. Therefore, the
0251         data schema is forced to be fully nullable, which might be different from the one user provided.</li>
0252       </ul>
0253     </td>
0254     <td>function <code>from_avro</code></td>
0255   </tr>
0256 </table>
0257 
0258 ## Configuration
0259 Configuration of Avro can be done using the `setConf` method on SparkSession or by running `SET key=value` commands using SQL.
0260 <table class="table">
0261   <tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th><th><b>Since Version</b></th></tr>
0262   <tr>
0263     <td>spark.sql.legacy.replaceDatabricksSparkAvro.enabled</td>
0264     <td>true</td>
0265     <td>
0266       If it is set to true, the data source provider <code>com.databricks.spark.avro</code> is mapped
0267       to the built-in but external Avro data source module for backward compatibility.
0268     </td>
0269     <td>2.4.0</td>
0270   </tr>
0271   <tr>
0272     <td>spark.sql.avro.compression.codec</td>
0273     <td>snappy</td>
0274     <td>
0275       Compression codec used in writing of AVRO files. Supported codecs: uncompressed, deflate,
0276       snappy, bzip2 and xz. Default codec is snappy.
0277     </td>
0278     <td>2.4.0</td>
0279   </tr>
0280   <tr>
0281     <td>spark.sql.avro.deflate.level</td>
0282     <td>-1</td>
0283     <td>
0284       Compression level for the deflate codec used in writing of AVRO files. Valid value must be in
0285       the range of from 1 to 9 inclusive or -1. The default value is -1 which corresponds to 6 level
0286       in the current implementation.
0287     </td>
0288     <td>2.4.0</td>
0289   </tr>
0290 </table>
0291 
0292 ## Compatibility with Databricks spark-avro
0293 This Avro data source module is originally from and compatible with Databricks's open source repository 
0294 [spark-avro](https://github.com/databricks/spark-avro).
0295 
0296 By default with the SQL configuration `spark.sql.legacy.replaceDatabricksSparkAvro.enabled` enabled, the data source provider `com.databricks.spark.avro` is 
0297 mapped to this built-in Avro module. For the Spark tables created with `Provider` property as `com.databricks.spark.avro` in 
0298 catalog meta store, the mapping is essential to load these tables if you are using this built-in Avro module. 
0299 
0300 Note in Databricks's [spark-avro](https://github.com/databricks/spark-avro), implicit classes 
0301 `AvroDataFrameWriter` and `AvroDataFrameReader` were created for shortcut function `.avro()`. In this 
0302 built-in but external module, both implicit classes are removed. Please use `.format("avro")` in 
0303 `DataFrameWriter` or `DataFrameReader` instead, which should be clean and good enough.
0304 
0305 If you prefer using your own build of `spark-avro` jar file, you can simply disable the configuration 
0306 `spark.sql.legacy.replaceDatabricksSparkAvro.enabled`, and use the option `--jars` on deploying your 
0307 applications. Read the [Advanced Dependency Management](https://spark.apache
0308 .org/docs/latest/submitting-applications.html#advanced-dependency-management) section in Application 
0309 Submission Guide for more details. 
0310 
0311 ## Supported types for Avro -> Spark SQL conversion
0312 Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.8.2/spec.html#schema_primitive) and [complex types](https://avro.apache.org/docs/1.8.2/spec.html#schema_complex) under records of Avro.
0313 <table class="table">
0314   <tr><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
0315   <tr>
0316     <td>boolean</td>
0317     <td>BooleanType</td>
0318   </tr>
0319   <tr>
0320     <td>int</td>
0321     <td>IntegerType</td>
0322   </tr>
0323   <tr>
0324     <td>long</td>
0325     <td>LongType</td>
0326   </tr>
0327   <tr>
0328     <td>float</td>
0329     <td>FloatType</td>
0330   </tr>
0331   <tr>
0332     <td>double</td>
0333     <td>DoubleType</td>
0334   </tr>
0335   <tr>
0336     <td>string</td>
0337     <td>StringType</td>
0338   </tr>
0339   <tr>
0340     <td>enum</td>
0341     <td>StringType</td>
0342   </tr>
0343   <tr>
0344     <td>fixed</td>
0345     <td>BinaryType</td>
0346   </tr>
0347   <tr>
0348     <td>bytes</td>
0349     <td>BinaryType</td>
0350   </tr>
0351   <tr>
0352     <td>record</td>
0353     <td>StructType</td>
0354   </tr>
0355   <tr>
0356     <td>array</td>
0357     <td>ArrayType</td>
0358   </tr>
0359   <tr>
0360     <td>map</td>
0361     <td>MapType</td>
0362   </tr>
0363   <tr>
0364     <td>union</td>
0365     <td>See below</td>
0366   </tr>
0367 </table>
0368 
0369 In addition to the types listed above, it supports reading `union` types. The following three types are considered basic `union` types:
0370 
0371 1. `union(int, long)` will be mapped to LongType.
0372 2. `union(float, double)` will be mapped to DoubleType.
0373 3. `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.
0374 All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet.
0375 
0376 It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types):
0377 
0378 <table class="table">
0379   <tr><th><b>Avro logical type</b></th><th><b>Avro type</b></th><th><b>Spark SQL type</b></th></tr>
0380   <tr>
0381     <td>date</td>
0382     <td>int</td>
0383     <td>DateType</td>
0384   </tr>
0385   <tr>
0386     <td>timestamp-millis</td>
0387     <td>long</td>
0388     <td>TimestampType</td>
0389   </tr>
0390   <tr>
0391     <td>timestamp-micros</td>
0392     <td>long</td>
0393     <td>TimestampType</td>
0394   </tr>
0395   <tr>
0396     <td>decimal</td>
0397     <td>fixed</td>
0398     <td>DecimalType</td>
0399   </tr>
0400   <tr>
0401     <td>decimal</td>
0402     <td>bytes</td>
0403     <td>DecimalType</td>
0404   </tr>
0405 </table>
0406 At the moment, it ignores docs, aliases and other properties present in the Avro file.
0407 
0408 ## Supported types for Spark SQL -> Avro conversion
0409 Spark supports writing of all Spark SQL types into Avro. For most types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are listed below:
0410 
0411 <table class="table">
0412 <tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr>
0413   <tr>
0414     <td>ByteType</td>
0415     <td>int</td>
0416     <td></td>
0417   </tr>
0418   <tr>
0419     <td>ShortType</td>
0420     <td>int</td>
0421     <td></td>
0422   </tr>
0423   <tr>
0424     <td>BinaryType</td>
0425     <td>bytes</td>
0426     <td></td>
0427   </tr>
0428   <tr>
0429     <td>DateType</td>
0430     <td>int</td>
0431     <td>date</td>
0432   </tr>
0433   <tr>
0434     <td>TimestampType</td>
0435     <td>long</td>
0436     <td>timestamp-micros</td>
0437   </tr>
0438   <tr>
0439     <td>DecimalType</td>
0440     <td>fixed</td>
0441     <td>decimal</td>
0442   </tr>
0443 </table>
0444 
0445 You can also specify the whole output Avro schema with the option `avroSchema`, so that Spark SQL types can be converted into other Avro types. The following conversions are not applied by default and require user specified Avro schema:
0446 
0447 <table class="table">
0448   <tr><th><b>Spark SQL type</b></th><th><b>Avro type</b></th><th><b>Avro logical type</b></th></tr>
0449   <tr>
0450     <td>BinaryType</td>
0451     <td>fixed</td>
0452     <td></td>
0453   </tr>
0454   <tr>
0455     <td>StringType</td>
0456     <td>enum</td>
0457     <td></td>
0458   </tr>
0459   <tr>
0460     <td>TimestampType</td>
0461     <td>long</td>
0462     <td>timestamp-millis</td>
0463   </tr>
0464   <tr>
0465     <td>DecimalType</td>
0466     <td>bytes</td>
0467     <td>decimal</td>
0468   </tr>
0469 </table>