---
layout: global
title: Job Scheduling
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview

Spark has several facilities for scheduling resources between computations. First, recall that, as described
in the [cluster mode overview](cluster-overview.html), each Spark application (instance of SparkContext)
runs an independent set of executor processes. The cluster managers that Spark runs on provide
facilities for [scheduling across applications](#scheduling-across-applications). Second,
_within_ each Spark application, multiple "jobs" (Spark actions) may be running concurrently
if they were submitted by different threads. This is common if your application is serving requests
over the network. Spark includes a [fair scheduler](#scheduling-within-an-application) to schedule resources within each SparkContext.

# Scheduling Across Applications

When running on a cluster, each Spark application gets an independent set of executor JVMs that only
run tasks and store data for that application. If multiple users need to share your cluster, there are
different options to manage allocation, depending on the cluster manager.

The simplest option, available on all cluster managers, is _static partitioning_ of resources. With
this approach, each application is given a maximum amount of resources it can use and holds onto them
for its whole duration. This is the approach used in Spark's [standalone](spark-standalone.html)
and [YARN](running-on-yarn.html) modes, as well as the
[coarse-grained Mesos mode](running-on-mesos.html#mesos-run-modes).
Resource allocation can be configured as follows, based on the cluster type:

* **Standalone mode:** By default, applications submitted to the standalone mode cluster will run in
  FIFO (first-in-first-out) order, and each application will try to use all available nodes. You can limit
  the number of cores an application uses by setting the `spark.cores.max` configuration property in it,
  or change the default for applications that don't set this property through `spark.deploy.defaultCores`.
  Finally, in addition to controlling cores, each application's `spark.executor.memory` setting controls
  its memory use.
* **Mesos:** To use static partitioning on Mesos, set the `spark.mesos.coarse` configuration property to `true`,
  and optionally set `spark.cores.max` to limit each application's resource share as in the standalone mode.
  You should also set `spark.executor.memory` to control the executor memory.
* **YARN:** The `--num-executors` option to the Spark YARN client controls how many executors it will allocate
  on the cluster (`spark.executor.instances` as configuration property), while `--executor-memory`
  (`spark.executor.memory` configuration property) and `--executor-cores` (`spark.executor.cores` configuration
  property) control the resources per executor. For more information, see the
  [YARN Spark Properties](running-on-yarn.html).
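
As a minimal sketch of static partitioning, the properties named above can also be set programmatically on a `SparkConf`. The object name, the application name, and every value below are hypothetical and chosen only for illustration; `spark.cores.max` is the cap described for the standalone and coarse-grained Mesos modes.

```scala
import org.apache.spark.SparkConf

object StaticPartitioningExample {
  // Builds a configuration with fixed resource caps (illustrative values only).
  def buildConf(): SparkConf =
    new SparkConf(false)                        // false: skip loading spark.* system properties
      .setAppName("static-partitioning-sketch") // hypothetical application name
      .set("spark.cores.max", "8")              // total cores the application may hold
      .set("spark.executor.memory", "4g")       // memory per executor
      .set("spark.executor.cores", "2")         // cores per executor
}
```

The resulting `SparkConf` would then be passed to `new SparkContext(conf)` together with a master URL for your cluster.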

A second option available on Mesos is _dynamic sharing_ of CPU cores. In this mode, each Spark application
still has a fixed and independent memory allocation (set by `spark.executor.memory`), but when the
application is not running tasks on a machine, other applications may run tasks on those cores. This mode
is useful when you expect a large number of mostly inactive applications, such as shell sessions from
separate users. However, it comes with a risk of less predictable latency, because it may take a while for
an application to gain back cores on one node when it has work to do. To use this mode, simply use a
`mesos://` URL and set `spark.mesos.coarse` to `false`.

Note that none of the modes currently provide memory sharing across applications. If you would like to share
data this way, we recommend running a single server application that can serve multiple requests by querying
the same RDDs.

## Dynamic Resource Allocation

Spark provides a mechanism to dynamically adjust the resources your application occupies based
on the workload. This means that your application may give resources back to the cluster if they
are no longer used and request them again later when there is demand. This feature is particularly
useful if multiple applications share resources in your Spark cluster.

This feature is disabled by default and available on all coarse-grained cluster managers, i.e.
[standalone mode](spark-standalone.html), [YARN mode](running-on-yarn.html), and
[Mesos coarse-grained mode](running-on-mesos.html#mesos-run-modes).

### Configuration and Setup

There are two requirements for using this feature. First, your application must set
`spark.dynamicAllocation.enabled` to `true`. Second, you must set up an *external shuffle service*
on each worker node in the same cluster and set `spark.shuffle.service.enabled` to `true` in your
application. The purpose of the external shuffle service is to allow executors to be removed
without deleting shuffle files written by them (described in more detail
[below](job-scheduling.html#graceful-decommission-of-executors)). The way to set up this service
varies across cluster managers:

In standalone mode, simply start your workers with `spark.shuffle.service.enabled` set to `true`.

In Mesos coarse-grained mode, run `$SPARK_HOME/sbin/start-mesos-shuffle-service.sh` on all
slave nodes with `spark.shuffle.service.enabled` set to `true`. For instance, you may do so
through Marathon.

In YARN mode, follow the instructions [here](running-on-yarn.html#configuring-the-external-shuffle-service).

All other relevant configurations are optional and under the `spark.dynamicAllocation.*` and
`spark.shuffle.service.*` namespaces. For more detail, see the
[configurations page](configuration.html#dynamic-allocation).
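
On the application side, the two required properties could be set as in the sketch below. The object and application names are hypothetical, and the executor bounds are illustrative optional settings from the `spark.dynamicAllocation.*` namespace. The external shuffle service itself still has to be started on the worker nodes as described above.

```scala
import org.apache.spark.SparkConf

object DynamicAllocationExample {
  // Application-side settings for dynamic allocation (illustrative values only).
  def buildConf(): SparkConf =
    new SparkConf(false)                                 // false: skip loading spark.* system properties
      .setAppName("dynamic-allocation-sketch")           // hypothetical application name
      .set("spark.dynamicAllocation.enabled", "true")    // requirement 1
      .set("spark.shuffle.service.enabled", "true")      // requirement 2
      .set("spark.dynamicAllocation.minExecutors", "1")  // optional lower bound
      .set("spark.dynamicAllocation.maxExecutors", "20") // optional upper bound
}
```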

### Resource Allocation Policy

At a high level, Spark should relinquish executors when they are no longer used and acquire
executors when they are needed. Since there is no definitive way to predict whether an executor
that is about to be removed will run a task in the near future, or whether a new executor that is
about to be added will actually be idle, we need a set of heuristics to determine when to remove
and request executors.

#### Request Policy

A Spark application with dynamic allocation enabled requests additional executors when it has
pending tasks waiting to be scheduled. This condition necessarily implies that the existing set
of executors is insufficient to run all submitted but unfinished tasks at the same time.

Spark requests executors in rounds. The actual request is triggered when there have been pending
tasks for `spark.dynamicAllocation.schedulerBacklogTimeout` seconds, and then triggered again
every `spark.dynamicAllocation.sustainedSchedulerBacklogTimeout` seconds thereafter if the queue
of pending tasks persists. Additionally, the number of executors requested in each round increases
exponentially from the previous round. For instance, an application will add 1 executor in the
first round, and then 2, 4, 8 and so on executors in the subsequent rounds.
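
The round-based schedule can be sketched in a few lines of plain Scala. This is a toy model of the description above rather than Spark's implementation: the object and method names are hypothetical, the two timeouts are passed in as plain seconds, and any upper bound such as `spark.dynamicAllocation.maxExecutors` is ignored.

```scala
object ExecutorRampUp {
  /** Executors requested in each of the first `rounds` rounds: 1, 2, 4, 8, ... */
  def requestsPerRound(rounds: Int): Seq[Int] =
    Seq.iterate(1, rounds)(_ * 2)

  /** Seconds after the backlog first appears at which each round is triggered. */
  def requestTimes(rounds: Int, backlogTimeout: Int, sustainedTimeout: Int): Seq[Int] =
    Seq.tabulate(rounds)(i => backlogTimeout + i * sustainedTimeout)
}
```

For example, `ExecutorRampUp.requestsPerRound(4)` yields `Seq(1, 2, 4, 8)`, so a backlog that persists for four rounds has asked for 15 additional executors in total.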

The motivation for an exponential increase policy is twofold. First, an application should request
executors cautiously in the beginning in case it turns out that only a few additional executors are
sufficient. This echoes the justification for TCP slow start. Second, the application should be
able to ramp up its resource usage in a timely manner in case it turns out that many executors are
actually needed.

#### Remove Policy

The policy for removing executors is much simpler. A Spark application removes an executor when
it has been idle for more than `spark.dynamicAllocation.executorIdleTimeout` seconds. Note that,
under most circumstances, this condition is mutually exclusive with the request condition, in that
an executor should not be idle if there are still pending tasks to be scheduled.
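
The idle check amounts to a single comparison per executor. The sketch below is a hypothetical helper, not Spark code: it assumes a map from executor ID to the time (in seconds) at which that executor last finished a task.

```scala
object IdleExecutors {
  /**
   * Returns the IDs of executors that have been idle for more than `idleTimeoutSec`.
   * `lastBusySec` maps a (hypothetical) executor ID to the time, in seconds,
   * at which it last finished running a task.
   */
  def toRemove(lastBusySec: Map[String, Long], nowSec: Long, idleTimeoutSec: Long): Set[String] =
    lastBusySec.collect { case (id, last) if nowSec - last > idleTimeoutSec => id }.toSet
}
```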

### Graceful Decommission of Executors

Before dynamic allocation, a Spark executor exited either on failure or when the associated
application had also exited. In both scenarios, all state associated with the executor is no
longer needed and can be safely discarded. With dynamic allocation, however, the application
is still running when an executor is explicitly removed. If the application attempts to access
state stored in or written by the executor, it will have to recompute that state. Thus,
Spark needs a mechanism to decommission an executor gracefully by preserving its state before
removing it.

This requirement is especially important for shuffles. During a shuffle, the Spark executor first
writes its own map outputs locally to disk, and then acts as the server for those files when other
executors attempt to fetch them. In the event of stragglers, which are tasks that run for much
longer than their peers, dynamic allocation may remove an executor before the shuffle completes,
in which case the shuffle files written by that executor must be recomputed unnecessarily.

The solution for preserving shuffle files is to use an external shuffle service, also introduced
in Spark 1.2. This service is a long-running process that runs on each node of your cluster
independently of your Spark applications and their executors. If the service is enabled, Spark
executors will fetch shuffle files from the service instead of from each other. This means any
shuffle state written by an executor may continue to be served beyond the executor's lifetime.

In addition to writing shuffle files, executors also cache data either on disk or in memory.
When an executor is removed, however, all cached data will no longer be accessible. To mitigate this,
by default executors containing cached data are never removed. You can configure this behavior with
`spark.dynamicAllocation.cachedExecutorIdleTimeout`. In future releases, the cached data may be
preserved through off-heap storage, similar in spirit to how shuffle files are preserved through
the external shuffle service.
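
A sketch of how the two idle timeouts might be set side by side is shown below. The object name, the application name, and both durations are hypothetical; choose values that fit how expensive your cached data is to rebuild.

```scala
import org.apache.spark.SparkConf

object IdleTimeoutExample {
  // Allows executors holding cached data to be removed after a longer idle period.
  def buildConf(): SparkConf =
    new SparkConf(false)                                              // false: skip loading spark.* system properties
      .setAppName("idle-timeout-sketch")                              // hypothetical application name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.dynamicAllocation.executorIdleTimeout", "60s")        // executors without cached data
      .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "1800s") // executors with cached data
}
```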

# Scheduling Within an Application

Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if
they were submitted from separate threads. By "job", in this section, we mean a Spark action (e.g. `save`,
`collect`) and any tasks that need to run to evaluate that action. Spark's scheduler is fully thread-safe
and supports this use case to enable applications that serve multiple requests (e.g. queries for
multiple users).

By default, Spark's scheduler runs jobs in FIFO fashion. Each job is divided into "stages" (e.g. map and
reduce phases), and the first job gets priority on all available resources while its stages have tasks to
launch, then the second job gets priority, etc. If the jobs at the head of the queue don't need to use
the whole cluster, later jobs can start to run right away, but if the jobs at the head of the queue are
large, then later jobs may be delayed significantly.

Starting in Spark 0.8, it is also possible to configure fair sharing between jobs. Under fair sharing,
Spark assigns tasks between jobs in a "round robin" fashion, so that all jobs get a roughly equal share
of cluster resources. This means that short jobs submitted while a long job is running can start receiving
resources right away and still get good response times, without waiting for the long job to finish. This
mode is best for multi-user settings.
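
The difference between the two modes can be illustrated with a toy model. It is a sketch under strong assumptions (a single task slot, equally sized tasks, hypothetical names) and does not reproduce Spark's scheduler; it only shows the order in which tasks would launch.

```scala
object LaunchOrder {
  // A job is modelled as a name plus a number of equally sized tasks.
  final case class Job(name: String, tasks: Int)

  /** FIFO: every task of an earlier job launches before any task of a later job. */
  def fifo(jobs: Seq[Job]): Seq[String] =
    jobs.flatMap(j => Seq.fill(j.tasks)(j.name))

  /** Round robin: cycle over the jobs that still have tasks left, one task each. */
  def roundRobin(jobs: Seq[Job]): Seq[String] = {
    val maxTasks = jobs.map(_.tasks).foldLeft(0)((a, b) => math.max(a, b))
    (0 until maxTasks).flatMap(round => jobs.filter(_.tasks > round).map(_.name))
  }
}
```

With a four-task job `long` submitted before a two-task job `short`, FIFO launches all four `long` tasks first, whereas round robin finishes `short` after the fourth launch.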

To enable the fair scheduler, simply set the `spark.scheduler.mode` property to `FAIR` when configuring
a SparkContext:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}
// Replace each (...) placeholder with your own master URL and application name
val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)
{% endhighlight %}

## Fair Scheduler Pools

The fair scheduler also supports grouping jobs into _pools_, and setting different scheduling options
(e.g. weight) for each pool. This can be useful to create a "high-priority" pool for more important jobs,
for example, or to group the jobs of each user together and give _users_ equal shares regardless of how
many concurrent jobs they have instead of giving _jobs_ equal shares. This approach is modeled after the
[Hadoop Fair Scheduler](http://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-site/FairScheduler.html).

Without any intervention, newly submitted jobs go into a _default pool_, but jobs' pools can be set by
adding the `spark.scheduler.pool` "local property" to the SparkContext in the thread that's submitting them.
This is done as follows:

{% highlight scala %}
// Assuming sc is your SparkContext variable
sc.setLocalProperty("spark.scheduler.pool", "pool1")
{% endhighlight %}

After setting this local property, _all_ jobs submitted within this thread (by calls in this thread
to `RDD.save`, `count`, `collect`, etc.) will use this pool name. The setting is per-thread to make
it easy to have a thread run multiple jobs on behalf of the same user. If you'd like to clear the
pool that a thread is associated with, simply call:

{% highlight scala %}
sc.setLocalProperty("spark.scheduler.pool", null)
{% endhighlight %}
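
Putting the two calls together, a minimal sketch of submitting one job per pool from separate threads might look like the following. The object name, the pool names, the `local[4]` master, and the toy `count` job are assumptions made for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.concurrent.TrieMap

object PoolPerThread {
  /** Runs one `count` job per pool name, each submitted from its own thread. */
  def run(sc: SparkContext, pools: Seq[String]): Map[String, Long] = {
    val results = TrieMap.empty[String, Long] // thread-safe result collection
    val threads = pools.map { pool =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // The local property only affects jobs submitted from this thread.
          sc.setLocalProperty("spark.scheduler.pool", pool)
          try {
            val n = sc.parallelize(1 to 1000).count()
            results.update(pool, n)
          } finally {
            // Clear the pool association before the thread finishes.
            sc.setLocalProperty("spark.scheduler.pool", null)
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    results.toMap
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")                // assumption: local mode for the demo
      .setAppName("pool-per-thread-sketch") // hypothetical application name
      .set("spark.scheduler.mode", "FAIR")
    val sc = new SparkContext(conf)
    try println(run(sc, Seq("pool1", "pool2")))
    finally sc.stop()
  }
}
```

Clearing the property in a `finally` block matters mostly when threads are reused, for example in a thread pool, so that a later job does not silently inherit the previous pool.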

## Default Behavior of Pools

By default, each pool gets an equal share of the cluster (also equal in share to each job in the default
pool), but inside each pool, jobs run in FIFO order. For example, if you create one pool per user, this
means that each user will get an equal share of the cluster, and that each user's queries will run in
order instead of later queries taking resources from that user's earlier ones.

## Configuring Pool Properties

Specific pools' properties can also be modified through a configuration file. Each pool supports three
properties:

* `schedulingMode`: This can be FIFO or FAIR, to control whether jobs within the pool queue up behind
  each other (the default) or share the pool's resources fairly.
* `weight`: This controls the pool's share of the cluster relative to other pools. By default, all pools
  have a weight of 1. If you give a specific pool a weight of 2, for example, it will get twice as many
  resources as other active pools. Setting a high weight such as 1000 also makes it possible to implement
  _priority_ between pools: in essence, the weight-1000 pool will always get to launch tasks first
  whenever it has jobs active.
* `minShare`: Apart from an overall weight, each pool can be given a _minimum share_ (as a number of
  CPU cores) that the administrator would like it to have. The fair scheduler always attempts to meet
  all active pools' minimum shares before redistributing extra resources according to the weights.
  The `minShare` property can, therefore, be another way to ensure that a pool can always get up to a
  certain number of resources (e.g. 10 cores) quickly without giving it a high priority for the rest
  of the cluster. By default, each pool's `minShare` is 0.
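
To make the interplay of `weight` and `minShare` concrete, the sketch below hands out cores one at a time using the ordering described in the list: pools below their minimum share go first, and the remaining pools are ordered by running tasks per unit of weight. It is a simplified model with hypothetical names, not Spark's scheduler code. It assumes every pool always has pending tasks and ignores `schedulingMode` within a pool.

```scala
object FairPoolModel {
  final case class Pool(name: String, weight: Int, minShare: Int, running: Int = 0)

  /** True if pool `a` should be offered the next free core before pool `b`. */
  def before(a: Pool, b: Pool): Boolean = {
    val aNeedy = a.running < a.minShare
    val bNeedy = b.running < b.minShare
    if (aNeedy != bNeedy) aNeedy // a pool below its minShare goes first
    else {
      // Both below minShare: compare progress toward minShare.
      // Both at or above it: compare running tasks per unit of weight.
      def ratio(p: Pool): Double =
        if (aNeedy) p.running.toDouble / math.max(p.minShare, 1)
        else p.running.toDouble / p.weight
      val (ra, rb) = (ratio(a), ratio(b))
      if (ra != rb) ra < rb else a.name < b.name // break ties by pool name
    }
  }

  /** Hands out `cores` one at a time and returns the cores held by each pool. */
  def allocate(pools: Seq[Pool], cores: Int): Map[String, Int] = {
    require(pools.nonEmpty, "at least one pool is required")
    val finalPools = (1 to cores).foldLeft(pools) { (current, _) =>
      val next = current.reduce((x, y) => if (before(y, x)) y else x)
      current.map(p => if (p.name == next.name) p.copy(running = p.running + 1) else p)
    }
    finalPools.map(p => p.name -> p.running).toMap
  }
}
```

For two pools `production` (weight 1, `minShare` 2) and `test` (weight 2, `minShare` 3), this model gives 2 and 3 cores when only 5 are available, and 4 and 8 cores when 12 are available, so the minimum shares are met first and the weights set the 1:2 split once capacity allows.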

The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
and either putting a file named `fairscheduler.xml` on the classpath, or setting the
`spark.scheduler.allocation.file` property in your
[SparkConf](configuration.html#spark-properties).

{% highlight scala %}
conf.set("spark.scheduler.allocation.file", "/path/to/file")
{% endhighlight %}

The format of the XML file is simply a `<pool>` element for each pool, with different elements
within it for the various settings. For example:

{% highlight xml %}
<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>
{% endhighlight %}

A full example is also available in `conf/fairscheduler.xml.template`. Note that any pools not
configured in the XML file will simply get default values for all settings (scheduling mode FIFO,
weight 1, and minShare 0).

## Scheduling using JDBC Connections

To set a [Fair Scheduler](job-scheduling.html#fair-scheduler-pools) pool for a JDBC client session,
users can set the `spark.sql.thriftserver.scheduler.pool` variable:

{% highlight SQL %}
SET spark.sql.thriftserver.scheduler.pool=accounting;
{% endhighlight %}

## Concurrent Jobs in PySpark

By default, PySpark does not synchronize PVM (Python VM) threads with JVM threads, so launching
multiple jobs from multiple PVM threads does not guarantee that each job runs in its own
corresponding JVM thread. Because of this limitation, you cannot set a different job group
via `sc.setJobGroup` in a separate PVM thread, which in turn prevents you from cancelling that job
via `sc.cancelJobGroup` later.

To synchronize PVM threads with JVM threads, set the `PYSPARK_PIN_THREAD` environment variable
to `true`. In this pinned thread mode, each PVM thread has one corresponding JVM thread.

However, although this mode isolates each thread with its own local properties, a child thread does
not currently inherit the local properties of its parent thread. To work around this, manually copy
the local properties from the parent thread to the child thread when you create another thread in the PVM.

Note that `PYSPARK_PIN_THREAD` is currently experimental and not recommended for use in production.