---
layout: global
displayTitle: Tuning Spark
title: Tuning
description: Tuning and performance optimization guide for Spark SPARK_VERSION_SHORT
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

Because of the in-memory nature of most Spark computations, Spark programs can be bottlenecked
by any resource in the cluster: CPU, network bandwidth, or memory.
Most often, if the data fits in memory, the bottleneck is network bandwidth, but sometimes, you
also need to do some tuning, such as
[storing RDDs in serialized form](rdd-programming-guide.html#rdd-persistence), to
decrease memory usage.
This guide will cover two main topics: data serialization, which is crucial for good network
performance and can also reduce memory use, and memory tuning. We also sketch several smaller topics.

# Data Serialization

Serialization plays an important role in the performance of any distributed application.
Formats that are slow to serialize objects into, or consume a large number of
bytes, will greatly slow down the computation.
Often, this will be the first thing you should tune to optimize a Spark application.
Spark aims to strike a balance between convenience (allowing you to work with any Java type
in your operations) and performance. It provides two serialization libraries:

* [Java serialization](https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html):
By default, Spark serializes objects using Java's `ObjectOutputStream` framework, and can work
with any class you create that implements
[`java.io.Serializable`](https://docs.oracle.com/javase/8/docs/api/java/io/Serializable.html).
You can also control the performance of your serialization more closely by extending
[`java.io.Externalizable`](https://docs.oracle.com/javase/8/docs/api/java/io/Externalizable.html).
Java serialization is flexible but often quite slow, and leads to large
serialized formats for many classes.
* [Kryo serialization](https://github.com/EsotericSoftware/kryo): Spark can also use
the Kryo library (version 4) to serialize objects more quickly. Kryo is significantly
faster and more compact than Java serialization (often as much as 10x), but does not support all
`Serializable` types and requires you to *register* the classes you'll use in the program in advance
for best performance.

You can switch to using Kryo by initializing your job with a [SparkConf](configuration.html#spark-properties)
and calling `conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")`.
This setting configures the serializer used not only for shuffling data between worker
nodes but also for serializing RDDs to disk. The only reason Kryo is not the default is the custom
registration requirement, but we recommend trying it in any network-intensive application.
Since Spark 2.0.0, we internally use the Kryo serializer when shuffling RDDs with simple types, arrays of simple types, or string type.
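
For example, a minimal sketch of enabling Kryo when constructing the application's `SparkConf`
(the master URL and application name below are placeholders; the property should be set before the
`SparkContext` is created):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder master URL and app name; set the serializer before creating the SparkContext.
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("KryoExample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
{% endhighlight %}
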

Spark automatically includes Kryo serializers for the many commonly-used core Scala classes covered
in the AllScalaRegistrar from the [Twitter chill](https://github.com/twitter/chill) library.

To register your own custom classes with Kryo, use the `registerKryoClasses` method.

{% highlight scala %}
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
{% endhighlight %}

The [Kryo documentation](https://github.com/EsotericSoftware/kryo) describes more advanced
registration options, such as adding custom serialization code.

If your objects are large, you may also need to increase the `spark.kryoserializer.buffer`
[config](configuration.html#compression-and-serialization). This value needs to be large enough
to hold the *largest* object you will serialize.

Finally, if you don't register your custom classes, Kryo will still work, but it will have to store
the full class name with each object, which is wasteful.
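
For instance, a hedged sketch of the related Kryo settings (the values shown are illustrative, not
recommendations; `MyClass1` and `MyClass2` are the same placeholder classes as in the example above):

{% highlight scala %}
import org.apache.spark.SparkConf

// Illustrative values only; size the buffers for your own largest objects.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryoserializer.buffer", "64k")       // initial per-core serialization buffer
  .set("spark.kryoserializer.buffer.max", "64m")   // must hold the largest object you serialize
  .set("spark.kryo.registrationRequired", "true")  // fail fast instead of storing full class names
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
{% endhighlight %}
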

# Memory Tuning

There are three considerations in tuning memory usage: the *amount* of memory used by your objects
(you may want your entire dataset to fit in memory), the *cost* of accessing those objects, and the
overhead of *garbage collection* (if you have high turnover in terms of objects).

By default, Java objects are fast to access, but can easily consume a factor of 2-5x more space
than the "raw" data inside their fields. This is due to several reasons:

* Each distinct Java object has an "object header", which is about 16 bytes and contains information
such as a pointer to its class. For an object with very little data in it (say one `Int` field), this
can be bigger than the data.
* Java `String`s have about 40 bytes of overhead over the raw string data (since they store it in an
array of `Char`s and keep extra data such as the length), and store each character
as *two* bytes due to `String`'s internal usage of UTF-16 encoding. Thus a 10-character string can
easily consume 60 bytes.
* Common collection classes, such as `HashMap` and `LinkedList`, use linked data structures, where
there is a "wrapper" object for each entry (e.g. `Map.Entry`). This object not only has a header,
but also pointers (typically 8 bytes each) to the next object in the list.
* Collections of primitive types often store them as "boxed" objects such as `java.lang.Integer`.

This section will start with an overview of memory management in Spark, then discuss specific
strategies the user can take to make more efficient use of memory in their application. In
particular, we will describe how to determine the memory usage of your objects, and how to
improve it -- either by changing your data structures, or by storing data in a serialized
format. We will then cover tuning Spark's cache size and the Java garbage collector.

## Memory Management Overview

Memory usage in Spark largely falls under one of two categories: execution and storage.
Execution memory refers to that used for computation in shuffles, joins, sorts and aggregations,
while storage memory refers to that used for caching and propagating internal data across the
cluster. In Spark, execution and storage share a unified region (M). When no execution memory is
used, storage can acquire all the available memory and vice versa. Execution may evict storage
if necessary, but only until total storage memory usage falls under a certain threshold (R).
In other words, `R` describes a subregion within `M` where cached blocks are never evicted.
Storage may not evict execution due to complexities in implementation.

This design ensures several desirable properties. First, applications that do not use caching
can use the entire space for execution, obviating unnecessary disk spills. Second, applications
that do use caching can reserve a minimum storage space (R) where their data blocks are immune
to being evicted. Lastly, this approach provides reasonable out-of-the-box performance for a
variety of workloads without requiring user expertise of how memory is divided internally.

Although there are two relevant configurations, the typical user should not need to adjust them
as the default values are applicable to most workloads:

* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MiB)
(default 0.6). The rest of the space (40%) is reserved for user data structures, internal
metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
large records.
* `spark.memory.storageFraction` expresses the size of `R` as a fraction of `M` (default 0.5).
`R` is the storage space within `M` where cached blocks are immune to being evicted by execution.

The value of `spark.memory.fraction` should be set in order to fit this amount of heap space
comfortably within the JVM's old or "tenured" generation. See the discussion of advanced GC
tuning below for details.
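
If you do need to change them, a minimal sketch (the values shown are simply the defaults, repeated
here for illustration):

{% highlight scala %}
import org.apache.spark.SparkConf

// Defaults shown explicitly for illustration; most workloads should leave them unchanged.
val conf = new SparkConf()
  .set("spark.memory.fraction", "0.6")         // size of M as a fraction of (heap - 300MiB)
  .set("spark.memory.storageFraction", "0.5")  // size of R as a fraction of M
{% endhighlight %}
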

## Determining Memory Consumption

The best way to gauge how much memory a dataset will require is to create an RDD, put it
into cache, and look at the "Storage" page in the web UI. The page will tell you how much memory the RDD
is occupying.

To estimate the memory consumption of a particular object, use `SizeEstimator`'s `estimate` method.
This is useful for experimenting with different data layouts to trim memory usage, as well as
determining the amount of space a broadcast variable will occupy on each executor heap.
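
For example, a small sketch of both approaches (`sc` is an existing `SparkContext`, and the dataset
is a placeholder):

{% highlight scala %}
import org.apache.spark.util.SizeEstimator

// Cache an RDD and materialize it, then check its size on the "Storage" page of the web UI.
val rdd = sc.parallelize(1 to 1000000).map(i => (i, i.toString))
rdd.cache()
rdd.count()

// Estimate the in-memory size (in bytes) of a single object.
println(SizeEstimator.estimate(Map("a" -> 1, "b" -> 2)))
{% endhighlight %}
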

## Tuning Data Structures

The first way to reduce memory consumption is to avoid the Java features that add overhead, such as
pointer-based data structures and wrapper objects. There are several ways to do this:

1. Design your data structures to prefer arrays of objects and primitive types over the
standard Java or Scala collection classes (e.g. `HashMap`). The [fastutil](http://fastutil.di.unimi.it)
library provides convenient collection classes for primitive types that are compatible with the
Java standard library (see the sketch after this list).
2. Avoid nested structures with a lot of small objects and pointers when possible.
3. Consider using numeric IDs or enumeration objects instead of strings for keys.
4. If you have less than 32 GiB of RAM, set the JVM flag `-XX:+UseCompressedOops` to make pointers be
four bytes instead of eight. You can add these options in
[`spark-env.sh`](configuration.html#environment-variables).
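
As an illustration of the first point, a minimal sketch using a primitive-specialized map (this
assumes the fastutil dependency has been added to your application; the key and value types are
arbitrary):

{% highlight scala %}
// Assumes fastutil is on the application classpath.
import it.unimi.dsi.fastutil.ints.Int2LongOpenHashMap

// One flat table of primitive ints and longs: no boxed java.lang.Integer/Long values
// and no per-entry wrapper objects, unlike scala.collection.mutable.HashMap[Int, Long].
val counts = new Int2LongOpenHashMap()
counts.addTo(42, 1L)          // increment without boxing
val c: Long = counts.get(42)
{% endhighlight %}
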

## Serialized RDD Storage

When your objects are still too large to efficiently store despite this tuning, a much simpler way
to reduce memory usage is to store them in *serialized* form, using the serialized StorageLevels in
the [RDD persistence API](rdd-programming-guide.html#rdd-persistence), such as `MEMORY_ONLY_SER`.
Spark will then store each RDD partition as one large byte array.
The only downside of storing data in serialized form is slower access times, due to having to
deserialize each object on the fly.
We highly recommend [using Kryo](#data-serialization) if you want to cache data in serialized form, as
it leads to much smaller sizes than Java serialization (and certainly than raw Java objects).
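
For example (a sketch; the dataset and `sc` are placeholders for your own RDD and `SparkContext`):

{% highlight scala %}
import org.apache.spark.storage.StorageLevel

// Store each partition as a single serialized byte array instead of deserialized objects.
val pairs = sc.parallelize(1 to 1000000).map(i => ("key" + i, i))
pairs.persist(StorageLevel.MEMORY_ONLY_SER)
pairs.count()
{% endhighlight %}
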

## Garbage Collection Tuning

JVM garbage collection can be a problem when you have large "churn" in terms of the RDDs
stored by your program. (It is usually not a problem in programs that just read an RDD once
and then run many operations on it.) When Java needs to evict old objects to make room for new ones, it will
need to trace through all your Java objects and find the unused ones. The main point to remember here is
that *the cost of garbage collection is proportional to the number of Java objects*, so using data
structures with fewer objects (e.g. an array of `Int`s instead of a `LinkedList`) greatly lowers
this cost. An even better method is to persist objects in serialized form, as described above: now
there will be only *one* object (a byte array) per RDD partition. Before trying other
techniques, the first thing to try if GC is a problem is to use [serialized caching](#serialized-rdd-storage).

GC can also be a problem due to interference between your tasks' working memory (the
amount of space needed to run the task) and the RDDs cached on your nodes. We will discuss how to control
the space allocated to the RDD cache to mitigate this.

**Measuring the Impact of GC**

The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of
time spent on GC. This can be done by adding `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` to the Java options. (See the [configuration guide](configuration.html#Dynamically-Loading-Spark-Properties) for info on passing Java options to Spark jobs.) Next time your Spark job is run, you will see messages printed in the worker's logs
each time a garbage collection occurs. Note these logs will be on your cluster's worker nodes (in the `stdout` files in
their work directories), *not* on your driver program.
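
One way to pass these options to the executors is through the executor Java options property (a
sketch; the flags shown are the JDK 8 forms, and on JDK 9+ the unified `-Xlog:gc*` flag replaces
them):

{% highlight scala %}
import org.apache.spark.SparkConf

// JDK 8-style GC logging flags; must be set before the executors are launched.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
{% endhighlight %}
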

**Advanced GC Tuning**

To further tune garbage collection, we first need to understand some basic information about memory management in the JVM:

* Java heap space is divided into two regions: Young and Old. The Young generation is meant to hold short-lived objects
while the Old generation is intended for objects with longer lifetimes.

* The Young generation is further divided into three regions \[Eden, Survivor1, Survivor2\].

* A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects
that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old
enough or Survivor2 is full, it is moved to Old. Finally, when Old is close to full, a full GC is invoked.

The goal of GC tuning in Spark is to ensure that only long-lived RDDs are stored in the Old generation and that
the Young generation is sufficiently sized to store short-lived objects. This will help avoid full GCs to collect
temporary objects created during task execution. Some steps which may be useful are:

* Check if there are too many garbage collections by collecting GC stats. If a full GC is invoked multiple times
before a task completes, it means that there isn't enough memory available for executing tasks.

* If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You
can set the size of Eden to be an over-estimate of how much memory each task will need. If the size of Eden
is determined to be `E`, then you can set the size of the Young generation using the option `-Xmn=4/3*E`. (The scaling
up by 4/3 is to account for space used by survivor regions as well.)

* In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of
memory used for caching by lowering `spark.memory.fraction`; it is better to cache fewer
objects than to slow down task execution. Alternatively, consider decreasing the size of
the Young generation. This means lowering `-Xmn` if you've set it as above. If not, try changing the
value of the JVM's `NewRatio` parameter. Many JVMs default this to 2, meaning that the Old generation
occupies 2/3 of the heap. It should be large enough such that this fraction exceeds `spark.memory.fraction`.

* Try the G1GC garbage collector with `-XX:+UseG1GC`. It can improve performance in some situations where
garbage collection is a bottleneck. Note that with large executor heap sizes, it may be important to
increase the [G1 region size](http://www.oracle.com/technetwork/articles/java/g1gc-1984535.html)
with `-XX:G1HeapRegionSize`.

* As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using
the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the
size of the block on disk. So if we wish to have 3 or 4 tasks' worth of working space, and the HDFS block size is 128 MiB,
we can estimate the size of Eden to be `4*3*128MiB` (a small worked calculation follows this list).

* Monitor how the frequency and time taken by garbage collection change with the new settings.
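
As a tiny worked version of the HDFS example above (every figure is illustrative):

{% highlight scala %}
// Rule-of-thumb arithmetic from the HDFS example above; every figure is illustrative.
val hdfsBlockMiB     = 128  // HDFS block size
val decompressFactor = 3    // a decompressed block is often 2-3x its on-disk size
val concurrentTasks  = 4    // tasks sharing one executor's Eden space
val edenMiB  = concurrentTasks * decompressFactor * hdfsBlockMiB  // 1536 MiB
val youngMiB = edenMiB * 4 / 3                                    // 2048 MiB, e.g. -Xmn2048m
{% endhighlight %}
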

Our experience suggests that the effect of GC tuning depends on your application and the amount of memory available.
There are [many more tuning options](https://docs.oracle.com/javase/8/docs/technotes/guides/vm/gctuning/index.html) described online,
but at a high level, managing how frequently full GC takes place can help in reducing the overhead.

GC tuning flags for executors can be specified by setting `spark.executor.defaultJavaOptions` or `spark.executor.extraJavaOptions` in
a job's configuration.
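
For instance, a hedged sketch enabling G1GC on the executors along the lines discussed above (the
region size is an illustrative value, not a recommendation):

{% highlight scala %}
import org.apache.spark.SparkConf

// Illustrative flags only; size the region for your own heap and workload.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
       "-XX:+UseG1GC -XX:G1HeapRegionSize=16m")
{% endhighlight %}
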

# Other Considerations

## Level of Parallelism

Clusters will not be fully utilized unless you set the level of parallelism for each operation high
enough. Spark automatically sets the number of "map" tasks to run on each file according to its size
(though you can control it through optional parameters to `SparkContext.textFile`, etc), and for
distributed "reduce" operations, such as `groupByKey` and `reduceByKey`, it uses the largest
parent RDD's number of partitions. You can pass the level of parallelism as a second argument
(see the [`spark.PairRDDFunctions`](api/scala/org/apache/spark/rdd/PairRDDFunctions.html) documentation),
or set the config property `spark.default.parallelism` to change the default.
In general, we recommend 2-3 tasks per CPU core in your cluster.
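
For example, both ways of raising the parallelism of a reduce (a sketch; `words` is a placeholder
pair RDD source and `200` is an illustrative partition count):

{% highlight scala %}
import org.apache.spark.SparkConf

// Pass the number of partitions directly to the shuffle operation...
val counts = words.map(w => (w, 1L)).reduceByKey(_ + _, 200)

// ...or raise the default used by shuffles that don't specify one.
val conf = new SparkConf().set("spark.default.parallelism", "200")
{% endhighlight %}
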

## Memory Usage of Reduce Tasks

Sometimes, you will get an OutOfMemoryError not because your RDDs don't fit in memory, but because the
working set of one of your tasks, such as one of the reduce tasks in `groupByKey`, was too large.
Spark's shuffle operations (`sortByKey`, `groupByKey`, `reduceByKey`, `join`, etc) build a hash table
within each task to perform the grouping, which can often be large. The simplest fix here is to
*increase the level of parallelism*, so that each task's input set is smaller. Spark can efficiently
support tasks as short as 200 ms, because it reuses one executor JVM across many tasks and it has
a low task launching cost, so you can safely increase the level of parallelism to more than the
number of cores in your cluster.

## Broadcasting Large Variables

Using the [broadcast functionality](rdd-programming-guide.html#broadcast-variables)
available in `SparkContext` can greatly reduce the size of each serialized task, and the cost
of launching a job over a cluster. If your tasks use any large object from the driver program
inside of them (e.g. a static lookup table), consider turning it into a broadcast variable.
Spark prints the serialized size of each task on the master, so you can look at that to
decide whether your tasks are too large; in general tasks larger than about 20 KiB are probably
worth optimizing.
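
For example, a minimal sketch of replacing a captured driver-side table with a broadcast variable
(`lookupTable`, `records`, and the values are placeholders; `records` stands for an existing
`RDD[String]`):

{% highlight scala %}
// Ship the lookup table to each executor once, instead of once per task.
val lookupTable: Map[String, Int] = Map("a" -> 1, "b" -> 2)   // placeholder driver-side data
val bcTable = sc.broadcast(lookupTable)

val resolved = records.map(r => bcTable.value.getOrElse(r, -1))
{% endhighlight %}
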

## Data Locality

Data locality can have a major impact on the performance of Spark jobs. If data and the code that
operates on it are together then computation tends to be fast. But if code and data are separated,
one must move to the other. Typically it is faster to ship serialized code from place to place than
a chunk of data because code size is much smaller than data. Spark builds its scheduling around
this general principle of data locality.

Data locality is how close data is to the code processing it. There are several levels of
locality based on the data's current location. In order from closest to farthest:

- `PROCESS_LOCAL` data is in the same JVM as the running code. This is the best locality
possible
- `NODE_LOCAL` data is on the same node. Examples might be in HDFS on the same node, or in
another executor on the same node. This is a little slower than `PROCESS_LOCAL` because the data
has to travel between processes
- `NO_PREF` data is accessed equally quickly from anywhere and has no locality preference
- `RACK_LOCAL` data is on the same rack of servers. Data is on a different server on the same rack
so needs to be sent over the network, typically through a single switch
- `ANY` data is elsewhere on the network and not in the same rack

Spark prefers to schedule all tasks at the best locality level, but this is not always possible. In
situations where there is no unprocessed data on any idle executor, Spark switches to lower locality
levels. There are two options: a) wait until a busy CPU frees up to start a task on data on the same
server, or b) immediately start a new task in a farther away place that requires moving data there.

What Spark typically does is wait a bit in the hopes that a busy CPU frees up. Once that timeout
expires, it starts moving the data from far away to the free CPU. The wait timeout for fallback
between each level can be configured individually or all together in one parameter; see the
`spark.locality` parameters on the [configuration page](configuration.html#scheduling) for details.
You should increase these settings if your tasks are long and you see poor locality, but the default
usually works well.
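
For example, a hedged sketch of raising the shared locality wait (the value is illustrative, not a
recommendation; the per-level `spark.locality.wait.process`, `.node`, and `.rack` properties can be
set the same way):

{% highlight scala %}
import org.apache.spark.SparkConf

// Illustrative value; longer waits favor locality at the cost of scheduling delay.
val conf = new SparkConf()
  .set("spark.locality.wait", "10s")
{% endhighlight %}
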

# Summary

This has been a short guide to point out the main concerns you should know about when tuning a
Spark application -- most importantly, data serialization and memory tuning. For most programs,
switching to Kryo serialization and persisting data in serialized form will solve most common
performance issues. Feel free to ask on the
[Spark mailing list](https://spark.apache.org/community.html) about other tuning best practices.