---
layout: global
title: Parquet Files
displayTitle: Parquet Files
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* Table of contents
{:toc}

[Parquet](http://parquet.io) is a columnar format that is supported by many other data processing systems.
Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema
of the original data. When reading Parquet files, all columns are automatically converted to be nullable for
compatibility reasons.

### Loading Data Programmatically

Using the data from the above example:

<div class="codetabs">

<div data-lang="scala"  markdown="1">
{% include_example basic_parquet_example scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java"  markdown="1">
{% include_example basic_parquet_example java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python"  markdown="1">

{% include_example basic_parquet_example python/sql/datasource.py %}
</div>

<div data-lang="r"  markdown="1">

{% include_example basic_parquet_example r/RSparkSQLExample.R %}

</div>

<div data-lang="SQL"  markdown="1">

{% highlight sql %}

CREATE TEMPORARY VIEW parquetTable
USING org.apache.spark.sql.parquet
OPTIONS (
  path "examples/src/main/resources/people.parquet"
)

SELECT * FROM parquetTable

{% endhighlight %}

</div>

</div>

### Partition Discovery

Table partitioning is a common optimization approach used in systems like Hive. In a partitioned
table, data are usually stored in different directories, with partitioning column values encoded in
the path of each partition directory. All built-in file sources (including Text/CSV/JSON/ORC/Parquet)
are able to discover and infer partitioning information automatically.
For example, we can store all our previously used
population data into a partitioned table using the following directory structure, with two extra
columns, `gender` and `country` as partitioning columns:

{% highlight text %}

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

{% endhighlight %}

By passing `path/to/table` to either `SparkSession.read.parquet` or `SparkSession.read.load`, Spark SQL
will automatically extract the partitioning information from the paths.
Now the schema of the returned DataFrame becomes:

{% highlight text %}

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

{% endhighlight %}

Notice that the data types of the partitioning columns are automatically inferred. Currently,
numeric data types, date, timestamp, and string types are supported. Sometimes users may not want
the data types of the partitioning columns to be inferred automatically. For these use cases,
automatic type inference can be turned off by setting
`spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to `true`, to `false`.
When type inference is disabled, string type is used for the partitioning columns.
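
The effect of the inference switch can be illustrated with a small, self-contained Python sketch.
This is not Spark's implementation: the helper names `infer_partition_value` and
`parse_partition_dir` are hypothetical, and the order in which types are tried is a simplifying
assumption.

```python
from datetime import date, datetime


def infer_partition_value(raw, infer_types=True):
    """Return a typed value for one partition-directory value (simplified sketch)."""
    if not infer_types:
        # Inference disabled: every partition value stays a string.
        return raw
    # Assumption: try numeric types first, then date, then timestamp.
    for parse in (int, float, date.fromisoformat, datetime.fromisoformat):
        try:
            return parse(raw)
        except ValueError:
            continue
    # Nothing matched: fall back to string.
    return raw


def parse_partition_dir(dirname, infer_types=True):
    """Split a directory name such as 'gender=male' into (column, value)."""
    column, _, raw = dirname.partition("=")
    return column, infer_partition_value(raw, infer_types)
```

For a hypothetical directory `year=2024`, the sketch produces an integer value when inference is
enabled and the string `"2024"` when `infer_types=False`, which mirrors the documented fallback to
string type.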

Starting from Spark 1.6.0, partition discovery only finds partitions under the given paths
by default. For the above example, if users pass `path/to/table/gender=male` to either
`SparkSession.read.parquet` or `SparkSession.read.load`, `gender` will not be considered as a
partitioning column. If users need to specify the base path that partition discovery
should start with, they can set `basePath` in the data source options. For example,
when `path/to/table/gender=male` is the path of the data and
users set `basePath` to `path/to/table/`, `gender` will be a partitioning column.
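
The role of `basePath` can be pictured with the following Python sketch. It is an illustration
under stated assumptions rather than Spark's code: the function name `partition_columns` is
hypothetical, and the sketch only looks at `key=value` directories between the base path and the
data path (Spark additionally discovers partition directories below the data path).

```python
from pathlib import PurePosixPath


def partition_columns(data_path, base_path=None):
    """List (column, value) pairs encoded between the base path and the data path."""
    data = PurePosixPath(data_path)
    # Without an explicit base path, discovery starts at the data path itself.
    base = PurePosixPath(base_path) if base_path is not None else data
    relative = data.relative_to(base)
    return [tuple(part.split("=", 1)) for part in relative.parts if "=" in part]
```

Called with only `path/to/table/gender=male`, the sketch returns no partitioning columns; adding
`path/to/table/` as the base path makes `gender` appear as one.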

### Schema Merging

Like Protocol Buffers, Avro, and Thrift, Parquet also supports schema evolution. Users can start with
a simple schema, and gradually add more columns to the schema as needed. In this way, users may end
up with multiple Parquet files with different but mutually compatible schemas. The Parquet data
source is now able to automatically detect this case and merge the schemas of all these files.

Since schema merging is a relatively expensive operation, and is not a necessity in most cases, we
turned it off by default starting from 1.5.0. You may enable it by:

1. setting data source option `mergeSchema` to `true` when reading Parquet files (as shown in the
   examples below), or
2. setting the global SQL option `spark.sql.parquet.mergeSchema` to `true`.

<div class="codetabs">

<div data-lang="scala"  markdown="1">
{% include_example schema_merging scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java"  markdown="1">
{% include_example schema_merging java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python"  markdown="1">

{% include_example schema_merging python/sql/datasource.py %}
</div>

<div data-lang="r"  markdown="1">

{% include_example schema_merging r/RSparkSQLExample.R %}

</div>

</div>
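
Conceptually, merging "different but mutually compatible" schemas amounts to taking the union of
their columns. The Python sketch below shows that idea on plain dictionaries. It is not how Spark
merges Parquet schemas: `merge_schemas` is a hypothetical helper, the column names are made up,
and treating any type difference as an error is a simplifying assumption.

```python
def merge_schemas(*schemas):
    """Merge {column: type} dictionaries, keeping first-seen column order."""
    merged = {}
    for schema in schemas:
        for column, dtype in schema.items():
            if column in merged and merged[column] != dtype:
                # Assumption: any type mismatch makes the schemas incompatible.
                raise ValueError(
                    f"incompatible types for {column!r}: {merged[column]} vs {dtype}"
                )
            merged.setdefault(column, dtype)
    return merged
```

Merging `{"value": "int", "square": "int"}` with `{"value": "int", "cube": "int"}` yields a schema
with the three columns `value`, `square`, and `cube`.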

### Hive metastore Parquet table conversion

When reading from Hive metastore Parquet tables and writing to non-partitioned Hive metastore
Parquet tables, Spark SQL will try to use its own Parquet support instead of Hive SerDe for
better performance. This behavior is controlled by the `spark.sql.hive.convertMetastoreParquet`
configuration, and is turned on by default.

#### Hive/Parquet Schema Reconciliation

There are two key differences between Hive and Parquet from the perspective of table schema
processing.

1. Hive is case-insensitive, while Parquet is not.
1. Hive considers all columns nullable, while nullability in Parquet is significant.

For these reasons, we must reconcile the Hive metastore schema with the Parquet schema when
converting a Hive metastore Parquet table to a Spark SQL Parquet table. The reconciliation rules are:

1. Fields that have the same name in both schemas must have the same data type regardless of
   nullability. The reconciled field should have the data type of the Parquet side, so that
   nullability is respected.

1. The reconciled schema contains exactly those fields defined in the Hive metastore schema.

   - Any fields that only appear in the Parquet schema are dropped in the reconciled schema.
   - Any fields that only appear in the Hive metastore schema are added as nullable fields in the
     reconciled schema.
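
The reconciliation rules above can be sketched in a few lines of Python. This is an illustration
only, not Spark's implementation: `reconcile` is a hypothetical function, schemas are modeled as
lists of `(name, data_type, nullable)` tuples, and keeping the Hive-side spelling of each field
name is an assumption.

```python
def reconcile(hive_schema, parquet_schema):
    """Reconcile two schemas given as lists of (name, data_type, nullable) tuples."""
    # Hive is case-insensitive, so Parquet fields are looked up by lower-cased name.
    parquet_by_name = {
        name.lower(): (dtype, nullable) for name, dtype, nullable in parquet_schema
    }
    reconciled = []
    # Only fields defined in the Hive schema survive; Parquet-only fields are dropped.
    for name, hive_type, _ in hive_schema:
        match = parquet_by_name.get(name.lower())
        if match is None:
            # Field exists only in the Hive schema: add it as nullable.
            reconciled.append((name, hive_type, True))
            continue
        parquet_type, parquet_nullable = match
        if parquet_type != hive_type:
            raise ValueError(f"type mismatch for {name!r}: {hive_type} vs {parquet_type}")
        # Take the Parquet side so that its nullability is respected.
        reconciled.append((name, parquet_type, parquet_nullable))
    return reconciled
```

With a Hive schema of `id`, `name`, and `extra` and a Parquet schema of non-nullable `ID`, `name`,
and `unused`, the result keeps `id` as non-nullable, keeps `name`, adds `extra` as nullable, and
drops `unused`.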

#### Metadata Refreshing

Spark SQL caches Parquet metadata for better performance. When Hive metastore Parquet table
conversion is enabled, the metadata of those converted tables is also cached. If these tables are
updated by Hive or other external tools, you need to refresh them manually to ensure consistent
metadata.

<div class="codetabs">

<div data-lang="scala"  markdown="1">

{% highlight scala %}
// spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="java"  markdown="1">

{% highlight java %}
// spark is an existing SparkSession
spark.catalog().refreshTable("my_table");
{% endhighlight %}

</div>

<div data-lang="python"  markdown="1">

{% highlight python %}
# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="r"  markdown="1">

{% highlight r %}
refreshTable("my_table")
{% endhighlight %}

</div>

<div data-lang="SQL"  markdown="1">

{% highlight sql %}
REFRESH TABLE my_table;
{% endhighlight %}

</div>

</div>

### Configuration

Parquet can be configured by calling `spark.conf.set` on a `SparkSession` (`setConf` on the legacy
`SQLContext`) or by running `SET key=value` commands in SQL.

<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
<tr>
  <td><code>spark.sql.parquet.binaryAsString</code></td>
  <td>false</td>
  <td>
    Some other Parquet-producing systems, in particular Impala, Hive, and older versions of Spark SQL, do
    not differentiate between binary data and strings when writing out the Parquet schema. This
    flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.int96AsTimestamp</code></td>
  <td>true</td>
  <td>
    Some Parquet-producing systems, in particular Impala and Hive, store timestamps as INT96. This
    flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems.
  </td>
  <td>1.3.0</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.compression.codec</code></td>
  <td>snappy</td>
  <td>
    Sets the compression codec used when writing Parquet files. If either <code>compression</code> or
    <code>parquet.compression</code> is specified in the table-specific options/properties, the order of precedence is
    <code>compression</code>, <code>parquet.compression</code>, <code>spark.sql.parquet.compression.codec</code>. Acceptable values include:
    none, uncompressed, snappy, gzip, lzo, brotli, lz4, zstd.
    Note that <code>zstd</code> requires <code>ZStandardCodec</code> to be installed before Hadoop 2.9.0, and <code>brotli</code> requires
    <code>BrotliCodec</code> to be installed.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.filterPushdown</code></td>
  <td>true</td>
  <td>Enables Parquet filter push-down optimization when set to true.</td>
  <td>1.2.0</td>
</tr>
<tr>
  <td><code>spark.sql.hive.convertMetastoreParquet</code></td>
  <td>true</td>
  <td>
    When set to false, Spark SQL will use the Hive SerDe for Parquet tables instead of the built-in
    support.
  </td>
  <td>1.1.1</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.mergeSchema</code></td>
  <td>false</td>
  <td>
    <p>
      When true, the Parquet data source merges schemas collected from all data files; otherwise the
      schema is picked from the summary file or a random data file if no summary file is available.
    </p>
  </td>
  <td>1.5.0</td>
</tr>
<tr>
  <td><code>spark.sql.parquet.writeLegacyFormat</code></td>
  <td>false</td>
  <td>
    If true, data will be written in the format used by Spark 1.4 and earlier. For example, decimal values
    will be written in Apache Parquet's fixed-length byte array format, which other systems such as
    Apache Hive and Apache Impala use. If false, the newer format in Parquet will be used. For
    example, decimals will be written in int-based format. If Parquet output is intended for use
    with systems that do not support this newer format, set to true.
  </td>
  <td>1.6.0</td>
</tr>
</table>
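
The precedence described for `spark.sql.parquet.compression.codec` can be written out as a short
Python sketch. It only models the lookup order stated in the table: `resolve_codec` is a
hypothetical helper, and representing table options and session configuration as plain
dictionaries is an assumption made for illustration.

```python
ACCEPTED_CODECS = {"none", "uncompressed", "snappy", "gzip", "lzo", "brotli", "lz4", "zstd"}
SESSION_KEY = "spark.sql.parquet.compression.codec"


def resolve_codec(table_options, session_conf):
    """Pick the codec using the documented precedence of the three settings."""
    candidates = (
        table_options.get("compression"),          # highest precedence
        table_options.get("parquet.compression"),  # second
        session_conf.get(SESSION_KEY, "snappy"),   # session setting, default snappy
    )
    codec = next(value for value in candidates if value is not None)
    if codec not in ACCEPTED_CODECS:
        raise ValueError(f"unsupported Parquet compression codec: {codec}")
    return codec
```

For example, a table option `parquet.compression` of `zstd` wins over a session-level `gzip`, while
an additional `compression` option of `lz4` wins over both.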