0001 ---
0002 layout: global
0003 title: Parquet Files
0004 displayTitle: Parquet Files
0005 license: |
0006 Licensed to the Apache Software Foundation (ASF) under one or more
0007 contributor license agreements. See the NOTICE file distributed with
0008 this work for additional information regarding copyright ownership.
0009 The ASF licenses this file to You under the Apache License, Version 2.0
0010 (the "License"); you may not use this file except in compliance with
0011 the License. You may obtain a copy of the License at
0012
0013 http://www.apache.org/licenses/LICENSE-2.0
0014
0015 Unless required by applicable law or agreed to in writing, software
0016 distributed under the License is distributed on an "AS IS" BASIS,
0017 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018 See the License for the specific language governing permissions and
0019 limitations under the License.
0020 ---
0021
0022 * Table of contents
0023 {:toc}
0024
[Parquet](http://parquet.io) is a columnar storage format supported by many data processing systems.
Spark SQL supports both reading and writing Parquet files, and automatically preserves the schema
of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.
0029
0030 ### Loading Data Programmatically
0031
0032 Using the data from the above example:
0033
0034 <div class="codetabs">
0035
0036 <div data-lang="scala" markdown="1">
0037 {% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
0038 </div>
0039
0040 <div data-lang="java" markdown="1">
0041 {% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
0042 </div>
0043
0044 <div data-lang="python" markdown="1">
0045
0046 {% include_example basic_parquet_example python/sql/datasource.py %}
0047 </div>
0048
0049 <div data-lang="r" markdown="1">
0050
0051 {% include_example basic_parquet_example r/RSparkSQLExample.R %}
0052
0053 </div>
0054
0055 <div data-lang="SQL" markdown="1">
0056
0057 {% highlight sql %}
0058
CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
);

SELECT * FROM parquetTable;
0066
0067 {% endhighlight %}
0068
0069 </div>
0070
0071 </div>
0072
0073 ### Partition Discovery
0074
0075 Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
0076 table, data are usually stored in different directories, with partitioning column values encoded in
0077 the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
0078 are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data in a partitioned table using the following directory structure, with two extra
columns, `gender` and `country`, as partitioning columns:
0082
0083 {% highlight text %}
0084
path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...
0104
0105 {% endhighlight %}
0106
0107 By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
0108 will automatically extract the partitioning information from the paths.
0109 Now the schema of the returned DataFrame becomes:
0110
0111 {% highlight text %}
0112
0113 root
0114 |-- name: string (nullable = true)
0115 |-- age: long (nullable = true)
0116 |-- gender: string (nullable = true)
0117 |-- country: string (nullable = true)
0118
0119 {% endhighlight %}
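
To make the mapping from directory names to columns concrete, here is a minimal pure-Python sketch of how `key=value` path segments can be read as partition columns. It is an illustration only, not Spark's implementation, and the helper name `partition_values` is hypothetical.

```python
from pathlib import PurePosixPath


def partition_values(file_path, table_root):
    """Collect key=value directory names found between table_root and the file.

    Hypothetical helper for illustration; Spark performs this discovery internally.
    """
    relative = PurePosixPath(file_path).relative_to(PurePosixPath(table_root))
    values = {}
    # The last component is the data file itself, so only its parent directories are inspected.
    for segment in relative.parts[:-1]:
        key, separator, value = segment.partition("=")
        if separator and key:
            values[key] = value
    return values


print(partition_values("path/to/table/gender=male/country=US/data.parquet", "path/to/table"))
# prints {'gender': 'male', 'country': 'US'}
```

All values come back as strings here; the sketch does not attempt the data type inference that Spark applies to partitioning columns.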
0120
Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp, and string types are supported. If you do not want the data
types of the partitioning columns to be inferred automatically, you can turn inference off through
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`. When type
inference is disabled, string type is used for the partitioning columns.
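
As a minimal sketch, the setting can be switched off before reading, assuming `spark` is an existing `SparkSession`. The function name `read_with_string_partitions` is hypothetical, and note that the setting stays changed for the rest of the session.

```python
def read_with_string_partitions(spark, table_path):
    """Read a partitioned Parquet table with partition columns kept as strings.

    `spark` is assumed to be an existing SparkSession; this helper is illustrative only.
    """
    # Disable automatic type inference for partitioning columns (session-wide).
    spark.conf.set("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    return spark.read.parquet(table_path)
```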
0127
0128 Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
0129 by default. For the above example, if users pass `path/to/table/gender=male` to either
0130 `SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a
0131 partitioning column. If users need to specify the base path that partition discovery
0132 should start with, they can set `basePath` in the data source options. For example,
0133 when `path/to/table/gender=male` is the path of the data and
0134 users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.
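
The `basePath` case above can be sketched as follows, assuming `spark` is an existing `SparkSession`. The function name `read_single_gender` is hypothetical; `option` and `parquet` are the usual `DataFrameReader` calls.

```python
def read_single_gender(spark, table_root, gender):
    """Read one gender=... directory while keeping `gender` as a partitioning column.

    `spark` is assumed to be an existing SparkSession; this helper is illustrative only.
    """
    data_path = f"{table_root}/gender={gender}"
    return (
        spark.read
        # Partition discovery starts at basePath instead of at data_path.
        .option("basePath", table_root)
        .parquet(data_path)
    )
```

Calling `read_single_gender(spark, "path/to/table", "male")` reads only the `gender=male` directory, and the resulting DataFrame should still expose `gender` as a column because discovery begins at `path/to/table`.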
0135
0136 ### Schema Merging
0137
Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema and gradually add more columns as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source can automatically detect this case and merge the schemas of all these files.
0142
Because schema merging is a relatively expensive operation and is not necessary in most cases, it
has been turned off by default since Spark 1.5.0. You can enable it by
0145
0146 1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
0147 examples below), or
0148 2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.
0149
0150 <div class="codetabs">
0151
0152 <div data-lang="scala" markdown="1">
0153 {% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
0154 </div>
0155
0156 <div data-lang="java" markdown="1">
0157 {% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
0158 </div>
0159
0160 <div data-lang="python" markdown="1">
0161
0162 {% include_example schema_merging python/sql/datasource.py %}
0163 </div>
0164
0165 <div data-lang="r" markdown="1">
0166
0167 {% include_example schema_merging r/RSparkSQLExample.R %}
0168
0169 </div>
0170
0171 </div>
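
To illustrate what "mutually compatible" means, here is a simplified pure-Python sketch that unions column definitions and rejects conflicting types. It is not Spark's merging logic: schemas are modeled as plain dictionaries, any type mismatch is treated as a conflict, and the name `merge_schemas` is hypothetical.

```python
def merge_schemas(*schemas):
    """Union column definitions from several files; fail if a column's type conflicts.

    Each schema is a dict mapping column name to a type name (an assumption of this sketch).
    """
    merged = {}
    for schema in schemas:
        for column, data_type in schema.items():
            existing = merged.setdefault(column, data_type)
            if existing != data_type:
                raise ValueError(
                    f"Column {column!r} has conflicting types: {existing} vs {data_type}"
                )
    return merged


first = {"value": "int", "square": "int"}
second = {"value": "int", "cube": "int"}
print(merge_schemas(first, second))
# prints {'value': 'int', 'square': 'int', 'cube': 'int'}
```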
0172
0173 ### Hive metastore Parquet table conversion
0174
0175 When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore
0176 Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for
0177 better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet`
0178 configuration, and is turned on by default.
0179
0180 #### Hive/Parquet Schema Reconciliation
0181
0182 There are two key differences between Hive and Parquet from the perspective of table schema
0183 processing.
0184
1. Hive is case insensitive, while Parquet is not.
1. Hive considers all columns nullable, while nullability in Parquet is significant.

For these reasons, we must reconcile the Hive metastore schema with the Parquet schema when converting a
Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:
0190
1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.
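
The rules above can be sketched in pure Python. This is a simplified illustration, not Spark's implementation: schemas are modeled as lists of tuples, types are compared as plain strings, names are matched case-insensitively, and the Hive spelling of each name is kept. All of these are assumptions of the sketch, and `reconcile` is a hypothetical name.

```python
def reconcile(hive_schema, parquet_schema):
    """Reconcile a Hive metastore schema with a Parquet schema.

    hive_schema: list of (name, data_type)
    parquet_schema: list of (name, data_type, nullable)
    Returns a list of (name, data_type, nullable).
    """
    # Hive is case insensitive, so index the Parquet fields by lower-cased name.
    parquet_by_name = {
        name.lower(): (data_type, nullable)
        for name, data_type, nullable in parquet_schema
    }
    reconciled = []
    for name, hive_type in hive_schema:
        match = parquet_by_name.get(name.lower())
        if match is None:
            # A field that only appears in the Hive metastore schema is added as nullable.
            reconciled.append((name, hive_type, True))
            continue
        parquet_type, nullable = match
        if parquet_type != hive_type:
            raise ValueError(f"Field {name!r}: {hive_type} (Hive) vs {parquet_type} (Parquet)")
        # Take the Parquet side so that its nullability is respected.
        reconciled.append((name, parquet_type, nullable))
    # Fields that only appear in the Parquet schema are never visited, so they are dropped.
    return reconciled


hive = [("id", "bigint"), ("name", "string"), ("note", "string")]
parquet = [("ID", "bigint", False), ("name", "string", True), ("extra", "int", True)]
print(reconcile(hive, parquet))
# prints [('id', 'bigint', False), ('name', 'string', True), ('note', 'string', True)]
```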
0200
0201 #### Metadata Refreshing
0202
Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, the metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.
0207
0208 <div class="codetabs">
0209
0210 <div data-lang="scala" markdown="1">
0211
0212 {% highlight scala %}
0213 // spark is an existing SparkSession
0214 spark.catalog.refreshTable("my_table")
0215 {% endhighlight %}
0216
0217 </div>
0218
0219 <div data-lang="java" markdown="1">
0220
0221 {% highlight java %}
0222 // spark is an existing SparkSession
0223 spark.catalog().refreshTable("my_table");
0224 {% endhighlight %}
0225
0226 </div>
0227
0228 <div data-lang="python" markdown="1">
0229
0230 {% highlight python %}
0231 # spark is an existing SparkSession
0232 spark.catalog.refreshTable("my_table")
0233 {% endhighlight %}
0234
0235 </div>
0236
0237 <div data-lang="r" markdown="1">
0238
0239 {% highlight r %}
0240 refreshTable("my_table")
0241 {% endhighlight %}
0242
0243 </div>
0244
0245 <div data-lang="SQL" markdown="1">
0246
0247 {% highlight sql %}
0248 REFRESH TABLE my_table;
0249 {% endhighlight %}
0250
0251 </div>
0252
0253 </div>
0254
0255 ### Configuration
0256
Parquet can be configured by calling `spark.conf.set` on a `SparkSession` or by running
`SET key=value` commands using SQL.
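
Both routes can be sketched as follows, assuming `spark` is an existing `SparkSession` and using `spark.sql.parquet.mergeSchema` from the table below as the example key. The function name `enable_schema_merging` is hypothetical.

```python
def enable_schema_merging(spark, use_sql=False):
    """Turn on Parquet schema merging for the current session.

    `spark` is assumed to be an existing SparkSession; this helper is illustrative only.
    """
    key = "spark.sql.parquet.mergeSchema"
    if use_sql:
        # SQL route: run a SET key=value command.
        spark.sql(f"SET {key}=true")
    else:
        # Programmatic route: set the value on the session's runtime configuration.
        spark.conf.set(key, "true")
```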
0259
0260 <table class="table">
0261 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0262 <tr>
0263 <td><code>spark.sql.parquet.binaryAsString</code></td>
0264 <td>false</td>
0265 <td>
0266 Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
0267 not differentiate between binary data and strings when writing out the Parquet schema. This
0268 flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
0269 </td>
0270 <td>1.1.1</td>
0271 </tr>
0272 <tr>
0273 <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
0274 <td>true</td>
0275 <td>
Some Parquet-producing systems, in particular Impala and Hive, store timestamps as INT96. This
flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
0278 </td>
0279 <td>1.3.0</td>
0280 </tr>
0281 <tr>
0282 <td><code>spark.sql.parquet.compression.codec</code></td>
0283 <td>snappy</td>
0284 <td>
Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
<code>parquet.compression</code> is specified in the table-specific options/properties, the order of precedence is
<code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed when using Hadoop versions earlier than 2.9.0, and <code>brotli</code> requires
<code>BrotliCodec</code> to be installed.
0291 </td>
0292 <td>1.1.1</td>
0293 </tr>
0294 <tr>
0295 <td><code>spark.sql.parquet.filterPushdown</code></td>
0296 <td>true</td>
0297 <td>Enables Parquet filter push-down optimization when set to true.</td>
0298 <td>1.2.0</td>
0299 </tr>
0300 <tr>
0301 <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
0302 <td>true</td>
0303 <td>
When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
support.
0306 </td>
0307 <td>1.1.1</td>
0308 </tr>
0309 <tr>
0310 <td><code>spark.sql.parquet.mergeSchema</code></td>
0311 <td>false</td>
0312 <td>
0313 <p>
0314 When true, the Parquet data source merges schemas collected from all data files, otherwise the
0315 schema is picked from the summary file or a random data file if no summary file is available.
0316 </p>
0317 </td>
0318 <td>1.5.0</td>
0319 </tr>
0320 <tr>
0321 <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
0322 <td>false</td>
0323 <td>
If true, data will be written in the format used by Spark 1.4 and earlier. For example, decimal values
will be written in Apache Parquet's fixed-length byte array format, which other systems such as
Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
example, decimals will be written in int-based format. If Parquet output is intended for use
with systems that do not support this newer format, set to true.
0329 </td>
0330 <td>1.6.0</td>
0331 </tr>
0332 </table>
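
The precedence described for `spark.sql.parquet.compression.codec` can be sketched in pure Python. This only illustrates the lookup order and is not Spark's code: options and configuration are modeled as plain dictionaries, and `resolve_parquet_codec` is a hypothetical name.

```python
def resolve_parquet_codec(table_options, session_conf):
    """Pick the compression codec following the documented order of precedence.

    Both arguments are plain dicts in this sketch (an assumption for illustration).
    """
    lookup_order = (
        (table_options, "compression"),
        (table_options, "parquet.compression"),
        (session_conf, "spark.sql.parquet.compression.codec"),
    )
    for source, key in lookup_order:
        if key in source:
            return source[key]
    # Documented default of spark.sql.parquet.compression.codec.
    return "snappy"


print(resolve_parquet_codec(
    {"parquet.compression": "gzip"},
    {"spark.sql.parquet.compression.codec": "zstd"},
))
# prints gzip
```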