0001 ---
0002 layout: global
0003 title: Performance Tuning
0004 displayTitle: Performance Tuning
0005 license: |
0006 Licensed to the Apache Software Foundation (ASF) under one or more
0007 contributor license agreements. See the NOTICE file distributed with
0008 this work for additional information regarding copyright ownership.
0009 The ASF licenses this file to You under the Apache License, Version 2.0
0010 (the "License"); you may not use this file except in compliance with
0011 the License. You may obtain a copy of the License at
0012
0013 http://www.apache.org/licenses/LICENSE-2.0
0014
0015 Unless required by applicable law or agreed to in writing, software
0016 distributed under the License is distributed on an "AS IS" BASIS,
0017 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018 See the License for the specific language governing permissions and
0019 limitations under the License.
0020 ---
0021
0022 * Table of contents
0023 {:toc}
0024
For some workloads, it is possible to improve performance by either caching data in memory or
turning on some experimental options.
0027
0028 ## Caching Data In Memory
0029
0030 Spark SQL can cache tables using an in-memory columnar format by calling `spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`.
0031 Then Spark SQL will scan only required columns and will automatically tune compression to minimize
0032 memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableName")` to remove the table from memory.
0033
Configuration of in-memory caching can be done using `spark.conf.set` on a `SparkSession` or by running
`SET key=value` commands using SQL.
0036
0037 <table class="table">
0038 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0039 <tr>
0040 <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
0041 <td>true</td>
0042 <td>
When set to true, Spark SQL automatically selects a compression codec for each column based
on statistics of the data.
0045 </td>
0046 <td>1.0.1</td>
0047 </tr>
0048 <tr>
0049 <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
0050 <td>10000</td>
0051 <td>
0052 Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
0053 and compression, but risk OOMs when caching data.
0054 </td>
0055 <td>1.1.1</td>
0056 </tr>
0057
0058 </table>
0059
0060 ## Other Configuration Options
0061
The following options can also be used to tune the performance of query execution. It is possible
that these options will be deprecated in a future release as more optimizations are performed automatically.
0064
0065 <table class="table">
0066 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0067 <tr>
0068 <td><code>spark.sql.files.maxPartitionBytes</code></td>
0069 <td>134217728 (128 MB)</td>
0070 <td>
0071 The maximum number of bytes to pack into a single partition when reading files.
0072 This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
0073 </td>
0074 <td>2.0.0</td>
0075 </tr>
0076 <tr>
0077 <td><code>spark.sql.files.openCostInBytes</code></td>
0078 <td>4194304 (4 MB)</td>
0079 <td>
The estimated cost to open a file, measured by the number of bytes that could be scanned in the same
time. This is used when putting multiple files into a partition. It is better to over-estimate;
then the partitions with small files will be faster than partitions with bigger files (which are
scheduled first). This configuration is effective only when using file-based sources such as Parquet,
JSON and ORC.
0085 </td>
0086 <td>2.0.0</td>
0087 </tr>
0088 <tr>
0089 <td><code>spark.sql.broadcastTimeout</code></td>
0090 <td>300</td>
0091 <td>
0092 <p>
Timeout in seconds for the broadcast wait time in broadcast joins.
0094 </p>
0095 </td>
0096 <td>1.3.0</td>
0097 </tr>
0098 <tr>
0099 <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
0100 <td>10485760 (10 MB)</td>
0101 <td>
Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
performing a join. Setting this value to -1 disables broadcasting. Note that currently
statistics are only supported for Hive Metastore tables where the command
<code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
0106 </td>
0107 <td>1.1.0</td>
0108 </tr>
0109 <tr>
0110 <td><code>spark.sql.shuffle.partitions</code></td>
0111 <td>200</td>
0112 <td>
0113 Configures the number of partitions to use when shuffling data for joins or aggregations.
0114 </td>
0115 <td>1.1.0</td>
0116 </tr>
0117 </table>
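
To make the interaction between `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` more concrete, here is a simplified Python model of how a target split size might be derived from the two settings. It is a sketch under stated assumptions, not Spark's implementation: the function name `max_split_bytes`, the `parallelism` parameter, and the exact formula are all chosen for illustration.

```python
MB = 1024 * 1024

def max_split_bytes(file_sizes, parallelism,
                    max_partition_bytes=128 * MB,
                    open_cost_in_bytes=4 * MB):
    """Illustrative model (assumed formula): pick a target split size.

    Each file is charged its length plus an estimated open cost, the total
    is spread over the available parallelism, and the result is clamped
    between the open cost and the configured maximum.
    """
    total_bytes = sum(size + open_cost_in_bytes for size in file_sizes)
    bytes_per_core = total_bytes // parallelism
    return min(max_partition_bytes, max(open_cost_in_bytes, bytes_per_core))

# Many small files: the open cost dominates and the split size stays small.
small = max_split_bytes([1 * MB] * 16, parallelism=8)
# Plenty of data: the split size is capped by max_partition_bytes.
large = max_split_bytes([10 * MB] * 100, parallelism=8)
print(small // MB, large // MB)
```

In this model, raising the open cost makes each small file "weigh" more, so fewer small files end up packed into one partition.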
0118
0119 ## Join Strategy Hints for SQL Queries
0120
0121 The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`,
0122 instruct Spark to use the hinted strategy on each specified relation when joining them with another
0123 relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either
0124 broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
0125 with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested
0126 by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
0127
0128 When different join strategy hints are specified on both sides of a join, Spark prioritizes the
0129 `BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL`
0130 hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will
0131 pick the build side based on the join type and the sizes of the relations.
0132
0133 Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
0134 a specific strategy may not support all join types.
0135
0136 <div class="codetabs">
0137
0138 <div data-lang="scala" markdown="1">
0139
0140 {% highlight scala %}
0141 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
0142 {% endhighlight %}
0143
0144 </div>
0145
0146 <div data-lang="java" markdown="1">
0147
0148 {% highlight java %}
0149 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
0150 {% endhighlight %}
0151
0152 </div>
0153
0154 <div data-lang="python" markdown="1">
0155
0156 {% highlight python %}
0157 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
0158 {% endhighlight %}
0159
0160 </div>
0161
0162 <div data-lang="r" markdown="1">
0163
0164 {% highlight r %}
0165 src <- sql("SELECT * FROM src")
0166 records <- sql("SELECT * FROM records")
0167 head(join(src, hint(records, "broadcast"), src$key == records$key))
0168 {% endhighlight %}
0169
0170 </div>
0171
0172 <div data-lang="SQL" markdown="1">
0173
0174 {% highlight sql %}
0175 -- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
0176 SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
0177 {% endhighlight %}
0178
0179 </div>
0180 </div>
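
The priority order among conflicting hints described above can be sketched in a few lines of Python. This is only an illustration of the stated ordering: the helper `preferred_hint` is hypothetical, and it does not model the join-type restrictions that may cause Spark to ignore a hint.

```python
# Priority order as described in the text: earlier entries win.
HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]

def preferred_hint(left_hint, right_hint):
    """Return the hint that takes priority, or None if neither side is hinted."""
    candidates = [h for h in (left_hint, right_hint) if h is not None]
    if not candidates:
        return None
    # The hint with the smallest index in HINT_PRIORITY wins.
    return min(candidates, key=HINT_PRIORITY.index)

print(preferred_hint("MERGE", "BROADCAST"))
```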
0181
0182 For more details please refer to the documentation of [Join Hints](sql-ref-syntax-qry-select-hints.html#join-hints).
0183
0184 ## Coalesce Hints for SQL Queries
0185
Coalesce hints allow Spark SQL users to control the number of output files, just like
`coalesce`, `repartition` and `repartitionByRange` in the Dataset API. They can be used for performance
tuning and reducing the number of output files. The `COALESCE` hint takes only a partition number as a
parameter. The `REPARTITION` hint takes a partition number, columns, or both as parameters.
The `REPARTITION_BY_RANGE` hint must have column names; a partition number is optional.
0191
    SELECT /*+ COALESCE(3) */ * FROM t
    SELECT /*+ REPARTITION(3) */ * FROM t
    SELECT /*+ REPARTITION(c) */ * FROM t
    SELECT /*+ REPARTITION(3, c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
    SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
0198
0199 For more details please refer to the documentation of [Partitioning Hints](sql-ref-syntax-qry-select-hints.html#partitioning-hints).
0200
0201 ## Adaptive Query Execution
Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. AQE is disabled by default. Use the umbrella configuration `spark.sql.adaptive.enabled` to turn it on or off. As of Spark 3.0, AQE has three major features: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
0203
0204 ### Coalescing Post Shuffle Partitions
This feature coalesces the post-shuffle partitions based on the map output statistics when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` configurations are true. It simplifies tuning the number of shuffle partitions when running queries: you do not need to set a shuffle partition number that fits your dataset. Spark can pick a suitable shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via the `spark.sql.adaptive.coalescePartitions.initialPartitionNum` configuration.
0206 <table class="table">
0207 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0208 <tr>
0209 <td><code>spark.sql.adaptive.coalescePartitions.enabled</code></td>
0210 <td>true</td>
0211 <td>
0212 When true and <code>spark.sql.adaptive.enabled</code> is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>), to avoid too many small tasks.
0213 </td>
0214 <td>3.0.0</td>
0215 </tr>
0216 <tr>
0217 <td><code>spark.sql.adaptive.coalescePartitions.minPartitionNum</code></td>
0218 <td>Default Parallelism</td>
0219 <td>
0220 The minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark cluster. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
0221 </td>
0222 <td>3.0.0</td>
0223 </tr>
0224 <tr>
0225 <td><code>spark.sql.adaptive.coalescePartitions.initialPartitionNum</code></td>
0226 <td>200</td>
0227 <td>
The initial number of shuffle partitions before coalescing. By default it equals <code>spark.sql.shuffle.partitions</code>. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
0229 </td>
0230 <td>3.0.0</td>
0231 </tr>
0232 <tr>
0233 <td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
0234 <td>64 MB</td>
0235 <td>
The advisory size in bytes of the shuffle partition during adaptive optimization (when <code>spark.sql.adaptive.enabled</code> is true). It takes effect when Spark coalesces small shuffle partitions or splits a skewed shuffle partition.
0237 </td>
0238 <td>3.0.0</td>
0239 </tr>
0240 </table>
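
As a rough illustration of coalescing contiguous shuffle partitions toward an advisory size, the following Python sketch groups neighbouring partitions greedily. It is a simplified model, not Spark's algorithm: the function `coalesce_partitions` is hypothetical, and it ignores `spark.sql.adaptive.coalescePartitions.minPartitionNum`.

```python
MB = 1024 * 1024

def coalesce_partitions(partition_sizes, advisory_size=64 * MB):
    """Greedily group contiguous partitions up to the advisory size.

    Returns a list of groups, each a list of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(partition_sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > advisory_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups

sizes = [10 * MB, 20 * MB, 30 * MB, 40 * MB, 5 * MB, 5 * MB, 60 * MB]
print(coalesce_partitions(sizes))  # [[0, 1, 2], [3, 4, 5], [6]]
```

Seven small partitions become three tasks here, which is the effect the feature aims for when the initial partition number is set generously.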
0241
0242 ### Converting sort-merge join to broadcast join
AQE converts sort-merge join to broadcast hash join when the runtime statistics of either join side are smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it is better than continuing with the sort-merge join, because it avoids sorting both join sides and can read shuffle files locally to save network traffic (if `spark.sql.adaptive.localShuffleReader.enabled` is true).
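
The conversion rule can be pictured with a small Python sketch. It assumes, for illustration only, that the threshold in question is the 10 MB default of `spark.sql.autoBroadcastJoinThreshold` and that a negative value disables broadcasting. The helper `side_to_broadcast` is hypothetical and does not model how the join type constrains the build side.

```python
MB = 1024 * 1024

def side_to_broadcast(left_bytes, right_bytes, threshold=10 * MB):
    """Return "left", "right", or None for a sort-merge join candidate.

    A negative threshold is treated as "broadcasting disabled".
    """
    if threshold < 0:
        return None
    # Pick the smaller side based on its runtime size.
    smaller, side = min((left_bytes, "left"), (right_bytes, "right"))
    return side if smaller < threshold else None

print(side_to_broadcast(500 * MB, 2 * MB))
```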
0244
0245 ### Optimizing Skew Join
Data skew can severely degrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled` configurations are enabled.
0247 <table class="table">
0248 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0249 <tr>
0250 <td><code>spark.sql.adaptive.skewJoin.enabled</code></td>
0251 <td>true</td>
0252 <td>
0253 When true and <code>spark.sql.adaptive.enabled</code> is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.
0254 </td>
0255 <td>3.0.0</td>
0256 </tr>
0257 <tr>
0258 <td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
0259 <td>10</td>
0260 <td>
A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
0262 </td>
0263 <td>3.0.0</td>
0264 </tr>
0265 <tr>
0266 <td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
0267 <td>256MB</td>
0268 <td>
A partition is considered skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplied by the median partition size. Ideally this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
0270 </td>
0271 <td>3.0.0</td>
0272 </tr>
0273 </table>
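
The two skew conditions in the table above combine as a logical AND. The following Python sketch shows that rule using the documented defaults. The function `skewed_partitions` is hypothetical, and `statistics.median` stands in for however Spark computes the median internally.

```python
from statistics import median

MB = 1024 * 1024

def skewed_partitions(partition_sizes, factor=10, threshold=256 * MB):
    """Return indices of partitions that satisfy both skew conditions."""
    mid = median(partition_sizes)
    return [
        index
        for index, size in enumerate(partition_sizes)
        # Must exceed both the relative bound and the absolute bound.
        if size > factor * mid and size > threshold
    ]

# One 2000 MB partition among roughly 100 MB partitions is flagged.
print(skewed_partitions([100 * MB, 120 * MB, 110 * MB, 90 * MB, 2000 * MB]))
# A 50 MB partition among 1 MB partitions is 50x the median but under 256 MB.
print(skewed_partitions([1 * MB, 1 * MB, 1 * MB, 1 * MB, 50 * MB]))
```

The second call shows why the absolute threshold matters: a partition can be many times the median and still be too small to be worth splitting.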