0001 ---
0002 layout: global
0003 title: Performance Tuning
0004 displayTitle: Performance Tuning
0005 license: |
0006   Licensed to the Apache Software Foundation (ASF) under one or more
0007   contributor license agreements.  See the NOTICE file distributed with
0008   this work for additional information regarding copyright ownership.
0009   The ASF licenses this file to You under the Apache License, Version 2.0
0010   (the "License"); you may not use this file except in compliance with
0011   the License.  You may obtain a copy of the License at
0012  
0013      http://www.apache.org/licenses/LICENSE-2.0
0014  
0015   Unless required by applicable law or agreed to in writing, software
0016   distributed under the License is distributed on an "AS IS" BASIS,
0017   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018   See the License for the specific language governing permissions and
0019   limitations under the License.
0020 ---
0021 
0022 * Table of contents
0023 {:toc}
0024 
0025 For some workloads, it is possible to improve performance either by caching data in memory or by
0026 turning on some experimental options.
0027 
0028 ## Caching Data In Memory
0029 
0030 Spark SQL can cache tables using an in-memory columnar format by calling `spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`.
0031 Then Spark SQL will scan only required columns and will automatically tune compression to minimize
0032 memory usage and GC pressure. You can call `spark.catalog.uncacheTable("tableName")` to remove the table from memory.
0033 
0034 In-memory caching can be configured by calling `spark.conf.set` on a `SparkSession` or by running
0035 `SET key=value` commands using SQL.
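
As a minimal sketch of the configuration path described above, the helper below applies caching settings through the session's runtime configuration (`spark.conf.set` in PySpark) and then caches a table. The function names `cache_table_with_settings` and `release_table` and the example values are illustrative assumptions, not part of Spark; `spark` is assumed to be an existing `SparkSession`.

```python
# Illustrative settings; the values are examples, not recommendations.
CACHE_SETTINGS = {
    "spark.sql.inMemoryColumnarStorage.compressed": "true",
    "spark.sql.inMemoryColumnarStorage.batchSize": "20000",
}


def cache_table_with_settings(spark, table_name, settings=None):
    """Apply caching settings on an existing SparkSession, then cache a table."""
    for key, value in (settings or CACHE_SETTINGS).items():
        spark.conf.set(key, value)
    spark.catalog.cacheTable(table_name)


def release_table(spark, table_name):
    """Remove a previously cached table from memory."""
    spark.catalog.uncacheTable(table_name)
```

The settings are applied before `cacheTable` so that they are in place when the columnar batches are built.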
0036 
0037 <table class="table">
0038 <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0039 <tr>
0040   <td><code>spark.sql.inMemoryColumnarStorage.compressed</code></td>
0041   <td>true</td>
0042   <td>
0043     When set to true, Spark SQL will automatically select a compression codec for each column based
0044     on statistics of the data.
0045   </td>
0046   <td>1.0.1</td>
0047 </tr>
0048 <tr>
0049   <td><code>spark.sql.inMemoryColumnarStorage.batchSize</code></td>
0050   <td>10000</td>
0051   <td>
0052     Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization
0053     and compression, but risk OOMs when caching data.
0054   </td>
0055   <td>1.1.1</td>
0056 </tr>
0057 
0058 </table>
0059 
0060 ## Other Configuration Options
0061 
0062 The following options can also be used to tune the performance of query execution. It is possible
0063 that these options will be deprecated in a future release as more optimizations are performed automatically.
0064 
0065 <table class="table">
0066   <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0067   <tr>
0068     <td><code>spark.sql.files.maxPartitionBytes</code></td>
0069     <td>134217728 (128 MB)</td>
0070     <td>
0071       The maximum number of bytes to pack into a single partition when reading files.
0072       This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
0073     </td>
0074     <td>2.0.0</td>
0075   </tr>
0076   <tr>
0077     <td><code>spark.sql.files.openCostInBytes</code></td>
0078     <td>4194304 (4 MB)</td>
0079     <td>
0080       The estimated cost to open a file, measured by the number of bytes that could be scanned in the same
0081       time. This is used when putting multiple files into a partition. It is better to over-estimate;
0082       then the partitions with small files will be faster than partitions with bigger files (which are
0083       scheduled first). This configuration is effective only when using file-based sources such as Parquet,
0084       JSON and ORC.
0085     </td>
0086     <td>2.0.0</td>
0087   </tr>
0088   <tr>
0089     <td><code>spark.sql.broadcastTimeout</code></td>
0090     <td>300</td>
0091     <td>
0092       <p>
0093         Timeout in seconds for the broadcast wait time in broadcast joins.
0094       </p>
0095     </td>
0096     <td>1.3.0</td>
0097   </tr>
0098   <tr>
0099     <td><code>spark.sql.autoBroadcastJoinThreshold</code></td>
0100     <td>10485760 (10 MB)</td>
0101     <td>
0102       Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when
0103       performing a join. By setting this value to -1, broadcasting can be disabled. Note that currently
0104       statistics are only supported for Hive Metastore tables where the command
0105       <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been run.
0106     </td>
0107     <td>1.1.0</td>
0108   </tr>
0109   <tr>
0110     <td><code>spark.sql.shuffle.partitions</code></td>
0111     <td>200</td>
0112     <td>
0113       Configures the number of partitions to use when shuffling data for joins or aggregations.
0114     </td>
0115     <td>1.1.0</td>
0116   </tr>
0117 </table>
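
To illustrate how `spark.sql.files.maxPartitionBytes` and `spark.sql.files.openCostInBytes` interact, here is a simplified sketch of packing files into partitions. It assumes files are taken largest first and that each file is charged its size plus the open cost; the function `pack_files` is hypothetical, ignores the splitting of large files, and is not Spark's actual implementation.

```python
def pack_files(file_sizes, max_partition_bytes=128 * 1024 * 1024,
               open_cost_in_bytes=4 * 1024 * 1024):
    """Greedily group file sizes (in bytes) into partitions.

    Each file is charged its own size plus open_cost_in_bytes, so many
    small files fill a partition sooner than their raw sizes suggest.
    """
    partitions, current, current_cost = [], [], 0
    for size in sorted(file_sizes, reverse=True):
        # Close the current partition if this file would overflow it.
        if current and current_cost + size > max_partition_bytes:
            partitions.append(current)
            current, current_cost = [], 0
        current.append(size)
        current_cost += size + open_cost_in_bytes
    if current:
        partitions.append(current)
    return partitions
```

Under this sketch, one hundred 1 MB files land in four partitions with the default values, but in a single partition when the open cost is set to 0, which shows why a higher open cost spreads small files across more tasks.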
0118 
0119 ## Join Strategy Hints for SQL Queries
0120 
0121 The join strategy hints, namely `BROADCAST`, `MERGE`, `SHUFFLE_HASH` and `SHUFFLE_REPLICATE_NL`,
0122 instruct Spark to use the hinted strategy on each specified relation when joining them with another
0123 relation. For example, when the `BROADCAST` hint is used on table 't1', broadcast join (either
0124 broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key)
0125 with 't1' as the build side will be prioritized by Spark even if the size of table 't1' suggested
0126 by the statistics is above the configuration `spark.sql.autoBroadcastJoinThreshold`.
0127 
0128 When different join strategy hints are specified on both sides of a join, Spark prioritizes the
0129 `BROADCAST` hint over the `MERGE` hint over the `SHUFFLE_HASH` hint over the `SHUFFLE_REPLICATE_NL`
0130 hint. When both sides are specified with the `BROADCAST` hint or the `SHUFFLE_HASH` hint, Spark will
0131 pick the build side based on the join type and the sizes of the relations.
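
The ordering above can be sketched as a small lookup. This models only the stated priority between hints, not Spark's planner; the names `HINT_PRIORITY` and `preferred_hint` are hypothetical, and hint aliases such as `BROADCASTJOIN` or `MAPJOIN` are deliberately not handled.

```python
# Priority order stated above, highest first.
HINT_PRIORITY = ["BROADCAST", "MERGE", "SHUFFLE_HASH", "SHUFFLE_REPLICATE_NL"]


def preferred_hint(left_hint, right_hint):
    """Return the hint that wins when the two join sides carry hints.

    Either argument may be None when that side has no hint.
    Raises ValueError for a hint name outside HINT_PRIORITY.
    """
    hints = [h.upper() for h in (left_hint, right_hint) if h is not None]
    if not hints:
        return None
    # A lower index in HINT_PRIORITY means a higher priority.
    return min(hints, key=HINT_PRIORITY.index)
```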
0132 
0133 Note that there is no guarantee that Spark will choose the join strategy specified in the hint since
0134 a specific strategy may not support all join types.
0135 
0136 <div class="codetabs">
0137 
0138 <div data-lang="scala"  markdown="1">
0139 
0140 {% highlight scala %}
0141 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
0142 {% endhighlight %}
0143 
0144 </div>
0145 
0146 <div data-lang="java"  markdown="1">
0147 
0148 {% highlight java %}
0149 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show();
0150 {% endhighlight %}
0151 
0152 </div>
0153 
0154 <div data-lang="python"  markdown="1">
0155 
0156 {% highlight python %}
0157 spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
0158 {% endhighlight %}
0159 
0160 </div>
0161 
0162 <div data-lang="r"  markdown="1">
0163 
0164 {% highlight r %}
0165 src <- sql("SELECT * FROM src")
0166 records <- sql("SELECT * FROM records")
0167 head(join(src, hint(records, "broadcast"), src$key == records$key))
0168 {% endhighlight %}
0169 
0170 </div>
0171 
0172 <div data-lang="SQL"  markdown="1">
0173 
0174 {% highlight sql %}
0175 -- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
0176 SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
0177 {% endhighlight %}
0178 
0179 </div>
0180 </div>
0181 
0182 For more details please refer to the documentation of [Join Hints](sql-ref-syntax-qry-select-hints.html#join-hints).
0183 
0184 ## Coalesce Hints for SQL Queries
0185 
0186 Coalesce hints allow Spark SQL users to control the number of output files, just like
0187 `coalesce`, `repartition` and `repartitionByRange` in the Dataset API. They can be used for performance
0188 tuning and for reducing the number of output files. The `COALESCE` hint takes only a partition number as a
0189 parameter. The `REPARTITION` hint takes a partition number, columns, or both as parameters.
0190 The `REPARTITION_BY_RANGE` hint requires column names; the partition number is optional.
0191 
0192     SELECT /*+ COALESCE(3) */ * FROM t
0193     SELECT /*+ REPARTITION(3) */ * FROM t
0194     SELECT /*+ REPARTITION(c) */ * FROM t
0195     SELECT /*+ REPARTITION(3, c) */ * FROM t
0196     SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
0197     SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
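
For comparison, the sketch below maps each hint to the corresponding DataFrame method in PySpark (`coalesce`, `repartition`, `repartitionByRange`) and enforces the parameter rules described above. The helper `apply_partitioning` is a hypothetical name introduced for illustration; `df` is assumed to be an existing PySpark DataFrame and `columns` a sequence of column names.

```python
def apply_partitioning(df, hint, num_partitions=None, columns=()):
    """Call the DataFrame method matching a partitioning hint."""
    hint = hint.upper()
    if hint == "COALESCE":
        # COALESCE accepts a partition number and nothing else.
        if num_partitions is None or columns:
            raise ValueError("COALESCE takes exactly a partition number")
        return df.coalesce(num_partitions)
    # The optional partition number comes first, followed by the columns.
    args = ([] if num_partitions is None else [num_partitions]) + list(columns)
    if hint == "REPARTITION":
        if not args:
            raise ValueError("REPARTITION needs a partition number, columns, or both")
        return df.repartition(*args)
    if hint == "REPARTITION_BY_RANGE":
        if not columns:
            raise ValueError("REPARTITION_BY_RANGE needs at least one column")
        return df.repartitionByRange(*args)
    raise ValueError(f"unknown partitioning hint: {hint}")
```

For example, `apply_partitioning(df, "REPARTITION", 3, ["c"])` corresponds to `SELECT /*+ REPARTITION(3, c) */ * FROM t`.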
0198 
0199 For more details please refer to the documentation of [Partitioning Hints](sql-ref-syntax-qry-select-hints.html#partitioning-hints).
0200 
0201 ## Adaptive Query Execution
0202 Adaptive Query Execution (AQE) is an optimization technique in Spark SQL that uses runtime statistics to choose the most efficient query execution plan. AQE is disabled by default; the umbrella configuration `spark.sql.adaptive.enabled` controls whether it is turned on or off. As of Spark 3.0, AQE has three major features: coalescing post-shuffle partitions, converting sort-merge join to broadcast join, and skew join optimization.
0203 
0204 ### Coalescing Post Shuffle Partitions
0205 This feature coalesces the post-shuffle partitions based on the map output statistics when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.coalescePartitions.enabled` are true. It simplifies tuning the number of shuffle partitions: you do not need to choose a shuffle partition number that fits your dataset. Spark can pick a suitable number at runtime once you set a large enough initial number of shuffle partitions via the `spark.sql.adaptive.coalescePartitions.initialPartitionNum` configuration.
0206  <table class="table">
0207    <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0208    <tr>
0209      <td><code>spark.sql.adaptive.coalescePartitions.enabled</code></td>
0210      <td>true</td>
0211      <td>
0212        When true and <code>spark.sql.adaptive.enabled</code> is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>), to avoid too many small tasks.
0213      </td>
0214      <td>3.0.0</td>
0215    </tr>
0216    <tr>
0217      <td><code>spark.sql.adaptive.coalescePartitions.minPartitionNum</code></td>
0218      <td>Default Parallelism</td>
0219      <td>
0220        The minimum number of shuffle partitions after coalescing. If not set, the default value is the default parallelism of the Spark cluster. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
0221      </td>
0222      <td>3.0.0</td>
0223    </tr>
0224    <tr>
0225      <td><code>spark.sql.adaptive.coalescePartitions.initialPartitionNum</code></td>
0226      <td>200</td>
0227      <td>
0228        The initial number of shuffle partitions before coalescing. By default it equals <code>spark.sql.shuffle.partitions</code>. This configuration only has an effect when <code>spark.sql.adaptive.enabled</code> and <code>spark.sql.adaptive.coalescePartitions.enabled</code> are both enabled.
0229      </td>
0230      <td>3.0.0</td>
0231    </tr>
0232    <tr>
0233      <td><code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code></td>
0234      <td>64 MB</td>
0235      <td>
0236        The advisory size in bytes of the shuffle partition during adaptive optimization (when <code>spark.sql.adaptive.enabled</code> is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partitions.
0237      </td>
0238      <td>3.0.0</td>
0239    </tr>
0240  </table>
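
The idea of coalescing contiguous shuffle partitions toward an advisory size can be sketched as follows. This is a simplified model under the assumption that neighbouring partitions are merged greedily until the next one would exceed the target; the function `coalesce_partitions` is hypothetical, ignores `spark.sql.adaptive.coalescePartitions.minPartitionNum`, and is not Spark's actual implementation.

```python
def coalesce_partitions(partition_sizes, advisory_size=64 * 1024 * 1024):
    """Merge neighbouring shuffle partitions up to an advisory size.

    Returns a list of (start, end) index ranges, with end exclusive.
    """
    ranges, start, current = [], 0, 0
    for i, size in enumerate(partition_sizes):
        # Start a new group if adding this partition would exceed the target.
        if i > start and current + size > advisory_size:
            ranges.append((start, i))
            start, current = i, 0
        current += size
    if partition_sizes:
        ranges.append((start, len(partition_sizes)))
    return ranges
```

A partition that is already larger than the advisory size stays in a group of its own, since this sketch only merges and never splits.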
0241  
0242 ### Converting sort-merge join to broadcast join
0243 AQE converts sort-merge join to broadcast hash join when the runtime statistics show that either join side is smaller than the broadcast hash join threshold. This is not as efficient as planning a broadcast hash join in the first place, but it is better than continuing with the sort-merge join, because it saves sorting both join sides and reads shuffle files locally to save network traffic (if `spark.sql.adaptive.localShuffleReader.enabled` is true).
0244 
0245 ### Optimizing Skew Join
0246 Data skew can severely degrade the performance of join queries. This feature dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed tasks into roughly evenly sized tasks. It takes effect when both `spark.sql.adaptive.enabled` and `spark.sql.adaptive.skewJoin.enabled` configurations are enabled.
0247   <table class="table">
0248      <tr><th>Property Name</th><th>Default</th><th>Meaning</th><th>Since Version</th></tr>
0249      <tr>
0250        <td><code>spark.sql.adaptive.skewJoin.enabled</code></td>
0251        <td>true</td>
0252        <td>
0253          When true and <code>spark.sql.adaptive.enabled</code> is true, Spark dynamically handles skew in sort-merge join by splitting (and replicating if needed) skewed partitions.
0254        </td>
0255        <td>3.0.0</td>
0256      </tr>
0257      <tr>
0258        <td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
0259        <td>10</td>
0260        <td>
0261          A partition is considered skewed if its size is larger than this factor multiplied by the median partition size and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
0262        </td>
0263        <td>3.0.0</td>
0264      </tr>
0265      <tr>
0266        <td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
0267        <td>256MB</td>
0268        <td>
0269          A partition is considered skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplied by the median partition size. Ideally this config should be set larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
0270        </td>
0271        <td>3.0.0</td>
0272      </tr>
0273    </table>
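
The two skew conditions in the table combine as shown in this minimal sketch. The function `skewed_partitions` is a hypothetical illustration of the rule as documented here (both conditions must hold), using the default factor of 10 and threshold of 256 MB; it is not Spark's actual code.

```python
from statistics import median


def skewed_partitions(partition_sizes, factor=10,
                      threshold_bytes=256 * 1024 * 1024):
    """Return indices of partitions that satisfy both skew conditions."""
    if not partition_sizes:
        return []
    med = median(partition_sizes)
    # A partition is skewed only if it exceeds factor * median
    # AND the absolute byte threshold.
    return [i for i, size in enumerate(partition_sizes)
            if size > factor * med and size > threshold_bytes]
```

With four 20 MB partitions and one 400 MB partition, only the last is flagged; a 250 MB partition in its place would exceed ten times the median but not the 256 MB threshold, so it would not be treated as skewed.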