0001 ---
0002 layout: global
0003 title: RDD Programming Guide
0004 description: Spark SPARK_VERSION_SHORT programming guide in Java, Scala and Python
0005 license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements. See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License. You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
0020 ---
0021
0022 * This will become a table of contents (this text will be scraped).
0023 {:toc}
0024
0025
0026 # Overview
0027
0028 At a high level, every Spark application consists of a *driver program* that runs the user's `main` function and executes various *parallel operations* on a cluster. The main abstraction Spark provides is a *resilient distributed dataset* (RDD), which is a collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. RDDs are created by starting with a file in the Hadoop file system (or any other Hadoop-supported file system), or an existing Scala collection in the driver program, and transforming it. Users may also ask Spark to *persist* an RDD in memory, allowing it to be reused efficiently across parallel operations. Finally, RDDs automatically recover from node failures.
0029
0030 A second abstraction in Spark is *shared variables* that can be used in parallel operations. By default, when Spark runs a function in parallel as a set of tasks on different nodes, it ships a copy of each variable used in the function to each task. Sometimes, a variable needs to be shared across tasks, or between tasks and the driver program. Spark supports two types of shared variables: *broadcast variables*, which can be used to cache a value in memory on all nodes, and *accumulators*, which are variables that are only "added" to, such as counters and sums.
0031
This guide shows each of these features in each of Spark's supported languages. It is easiest to follow
along if you launch Spark's interactive shell -- either `bin/spark-shell` for the Scala shell or
`bin/pyspark` for the Python one.
0035
0036 # Linking with Spark
0037
0038 <div class="codetabs">
0039
0040 <div data-lang="scala" markdown="1">
0041
0042 Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}}
0043 by default. (Spark can be built to work with other versions of Scala, too.) To write
0044 applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X).
0045
0046 To write a Spark application, you need to add a Maven dependency on Spark. Spark is available through Maven Central at:
0047
    groupId = org.apache.spark
    artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION}}
0051
0052 In addition, if you wish to access an HDFS cluster, you need to add a dependency on
0053 `hadoop-client` for your version of HDFS.
0054
    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
0058
0059 Finally, you need to import some Spark classes into your program. Add the following lines:
0060
0061 {% highlight scala %}
0062 import org.apache.spark.SparkContext
0063 import org.apache.spark.SparkConf
0064 {% endhighlight %}
0065
(Before Spark 1.3.0, you needed to explicitly `import org.apache.spark.SparkContext._` to enable essential implicit conversions.)
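
If you use sbt instead of Maven, the same coordinates translate directly. Here is a minimal sketch of the corresponding `build.sbt` line (marking Spark as `provided` is a common choice when the application is launched with `spark-submit`):

{% highlight scala %}
// build.sbt -- the sbt equivalent of the Maven coordinates above;
// %% appends the Scala binary version to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "{{site.SPARK_VERSION}}" % "provided"
{% endhighlight %}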
0067
0068 </div>
0069
0070 <div data-lang="java" markdown="1">
0071
0072 Spark {{site.SPARK_VERSION}} supports
0073 [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
0074 for concisely writing functions, otherwise you can use the classes in the
0075 [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
0076
0077 Note that support for Java 7 was removed in Spark 2.2.0.
0078
0079 To write a Spark application in Java, you need to add a dependency on Spark. Spark is available through Maven Central at:
0080
    groupId = org.apache.spark
    artifactId = spark-core_{{site.SCALA_BINARY_VERSION}}
    version = {{site.SPARK_VERSION}}
0084
0085 In addition, if you wish to access an HDFS cluster, you need to add a dependency on
0086 `hadoop-client` for your version of HDFS.
0087
    groupId = org.apache.hadoop
    artifactId = hadoop-client
    version = <your-hdfs-version>
0091
0092 Finally, you need to import some Spark classes into your program. Add the following lines:
0093
0094 {% highlight java %}
0095 import org.apache.spark.api.java.JavaSparkContext;
0096 import org.apache.spark.api.java.JavaRDD;
0097 import org.apache.spark.SparkConf;
0098 {% endhighlight %}
0099
0100 </div>
0101
0102 <div data-lang="python" markdown="1">
0103
0104 Spark {{site.SPARK_VERSION}} works with Python 2.7+ or Python 3.4+. It can use the standard CPython interpreter,
0105 so C libraries like NumPy can be used. It also works with PyPy 2.3+.
0106
0107 Note that Python 2 support is deprecated as of Spark 3.0.0.
0108
Spark applications in Python can either be run with the `bin/spark-submit` script, which includes Spark at runtime, or by including it in your `setup.py` as:
0110
0111 {% highlight python %}
install_requires=[
    'pyspark=={{site.SPARK_VERSION}}'
]
0115 {% endhighlight %}
0116
0117
0118 To run Spark applications in Python without pip installing PySpark, use the `bin/spark-submit` script located in the Spark directory.
0119 This script will load Spark's Java/Scala libraries and allow you to submit applications to a cluster.
0120 You can also use `bin/pyspark` to launch an interactive Python shell.
0121
0122 If you wish to access HDFS data, you need to use a build of PySpark linking
0123 to your version of HDFS.
0124 [Prebuilt packages](https://spark.apache.org/downloads.html) are also available on the Spark homepage
0125 for common HDFS versions.
0126
0127 Finally, you need to import some Spark classes into your program. Add the following line:
0128
0129 {% highlight python %}
0130 from pyspark import SparkContext, SparkConf
0131 {% endhighlight %}
0132
PySpark requires the same minor version of Python in both driver and workers. It uses the default Python version available on `PATH`;
you can specify which version of Python you want to use with `PYSPARK_PYTHON`, for example:
0135
0136 {% highlight bash %}
0137 $ PYSPARK_PYTHON=python3.4 bin/pyspark
0138 $ PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py
0139 {% endhighlight %}
0140
0141 </div>
0142
0143 </div>
0144
0145
0146 # Initializing Spark
0147
0148 <div class="codetabs">
0149
0150 <div data-lang="scala" markdown="1">
0151
0152 The first thing a Spark program must do is to create a [SparkContext](api/scala/org/apache/spark/SparkContext.html) object, which tells Spark
0153 how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/scala/org/apache/spark/SparkConf.html) object
0154 that contains information about your application.
0155
0156 Only one SparkContext should be active per JVM. You must `stop()` the active SparkContext before creating a new one.
0157
0158 {% highlight scala %}
0159 val conf = new SparkConf().setAppName(appName).setMaster(master)
0160 new SparkContext(conf)
0161 {% endhighlight %}
0162
0163 </div>
0164
0165 <div data-lang="java" markdown="1">
0166
0167 The first thing a Spark program must do is to create a [JavaSparkContext](api/java/index.html?org/apache/spark/api/java/JavaSparkContext.html) object, which tells Spark
how to access a cluster. To create a `JavaSparkContext` you first need to build a [SparkConf](api/java/index.html?org/apache/spark/SparkConf.html) object
0169 that contains information about your application.
0170
0171 {% highlight java %}
0172 SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
0173 JavaSparkContext sc = new JavaSparkContext(conf);
0174 {% endhighlight %}
0175
0176 </div>
0177
0178 <div data-lang="python" markdown="1">
0179
0180 The first thing a Spark program must do is to create a [SparkContext](api/python/pyspark.html#pyspark.SparkContext) object, which tells Spark
0181 how to access a cluster. To create a `SparkContext` you first need to build a [SparkConf](api/python/pyspark.html#pyspark.SparkConf) object
0182 that contains information about your application.
0183
0184 {% highlight python %}
0185 conf = SparkConf().setAppName(appName).setMaster(master)
0186 sc = SparkContext(conf=conf)
0187 {% endhighlight %}
0188
0189 </div>
0190
0191 </div>
0192
0193 The `appName` parameter is a name for your application to show on the cluster UI.
0194 `master` is a [Spark, Mesos or YARN cluster URL](submitting-applications.html#master-urls),
0195 or a special "local" string to run in local mode.
0196 In practice, when running on a cluster, you will not want to hardcode `master` in the program,
0197 but rather [launch the application with `spark-submit`](submitting-applications.html) and
0198 receive it there. However, for local testing and unit tests, you can pass "local" to run Spark
0199 in-process.
0200
0201
0202 ## Using the Shell
0203
0204 <div class="codetabs">
0205
0206 <div data-lang="scala" markdown="1">
0207
0208 In the Spark shell, a special interpreter-aware SparkContext is already created for you, in the
0209 variable called `sc`. Making your own SparkContext will not work. You can set which master the
0210 context connects to using the `--master` argument, and you can add JARs to the classpath
0211 by passing a comma-separated list to the `--jars` argument. You can also add dependencies
0212 (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
0213 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
0214 can be passed to the `--repositories` argument. For example, to run `bin/spark-shell` on exactly
0215 four cores, use:
0216
0217 {% highlight bash %}
0218 $ ./bin/spark-shell --master local[4]
0219 {% endhighlight %}
0220
0221 Or, to also add `code.jar` to its classpath, use:
0222
0223 {% highlight bash %}
0224 $ ./bin/spark-shell --master local[4] --jars code.jar
0225 {% endhighlight %}
0226
0227 To include a dependency using Maven coordinates:
0228
0229 {% highlight bash %}
0230 $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
0231 {% endhighlight %}
0232
0233 For a complete list of options, run `spark-shell --help`. Behind the scenes,
0234 `spark-shell` invokes the more general [`spark-submit` script](submitting-applications.html).
0235
0236 </div>
0237
0238 <div data-lang="python" markdown="1">
0239
0240 In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
0241 variable called `sc`. Making your own SparkContext will not work. You can set which master the
0242 context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
0243 to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
0244 (e.g. Spark Packages) to your shell session by supplying a comma-separated list of Maven coordinates
0245 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. Sonatype)
0246 can be passed to the `--repositories` argument. Any Python dependencies a Spark package has (listed in
0247 the requirements.txt of that package) must be manually installed using `pip` when necessary.
0248 For example, to run `bin/pyspark` on exactly four cores, use:
0249
0250 {% highlight bash %}
0251 $ ./bin/pyspark --master local[4]
0252 {% endhighlight %}
0253
0254 Or, to also add `code.py` to the search path (in order to later be able to `import code`), use:
0255
0256 {% highlight bash %}
0257 $ ./bin/pyspark --master local[4] --py-files code.py
0258 {% endhighlight %}
0259
0260 For a complete list of options, run `pyspark --help`. Behind the scenes,
0261 `pyspark` invokes the more general [`spark-submit` script](submitting-applications.html).
0262
0263 It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the
0264 enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To
0265 use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running `bin/pyspark`:
0266
0267 {% highlight bash %}
0268 $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
0269 {% endhighlight %}
0270
0271 To use the Jupyter notebook (previously known as the IPython notebook),
0272
0273 {% highlight bash %}
0274 $ PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS=notebook ./bin/pyspark
0275 {% endhighlight %}
0276
0277 You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
0278
After the Jupyter Notebook server is launched, you can create a new notebook from
the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
your notebook before you start to try Spark from the Jupyter notebook.
0282
0283 </div>
0284
0285 </div>
0286
0287 # Resilient Distributed Datasets (RDDs)
0288
0289 Spark revolves around the concept of a _resilient distributed dataset_ (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: *parallelizing*
0290 an existing collection in your driver program, or referencing a dataset in an external storage system, such as a
0291 shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.
0292
0293 ## Parallelized Collections
0294
0295 <div class="codetabs">
0296
0297 <div data-lang="scala" markdown="1">
0298
0299 Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing collection in your driver program (a Scala `Seq`). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
0300
0301 {% highlight scala %}
0302 val data = Array(1, 2, 3, 4, 5)
0303 val distData = sc.parallelize(data)
0304 {% endhighlight %}
0305
0306 Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) => a + b)` to add up the elements of the array. We describe operations on distributed datasets later on.
0307
0308 </div>
0309
0310 <div data-lang="java" markdown="1">
0311
0312 Parallelized collections are created by calling `JavaSparkContext`'s `parallelize` method on an existing `Collection` in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
0313
0314 {% highlight java %}
0315 List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
0316 JavaRDD<Integer> distData = sc.parallelize(data);
0317 {% endhighlight %}
0318
0319 Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we might call `distData.reduce((a, b) -> a + b)` to add up the elements of the list.
0320 We describe operations on distributed datasets later on.
0321
0322 </div>
0323
0324 <div data-lang="python" markdown="1">
0325
0326 Parallelized collections are created by calling `SparkContext`'s `parallelize` method on an existing iterable or collection in your driver program. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:
0327
0328 {% highlight python %}
0329 data = [1, 2, 3, 4, 5]
0330 distData = sc.parallelize(data)
0331 {% endhighlight %}
0332
0333 Once created, the distributed dataset (`distData`) can be operated on in parallel. For example, we can call `distData.reduce(lambda a, b: a + b)` to add up the elements of the list.
0334 We describe operations on distributed datasets later on.
0335
0336 </div>
0337
0338 </div>
0339
One important parameter for parallel collections is the number of *partitions* to cut the dataset into. Spark will run one task for each partition of the dataset. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to `parallelize` (e.g. `sc.parallelize(data, 10)`). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
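
For example, here is a minimal sketch (in the Scala shell) of setting the partition count by hand and checking the result:

{% highlight scala %}
val data = Array(1, 2, 3, 4, 5)
// override the automatic setting and cut the dataset into 10 partitions
val distData = sc.parallelize(data, 10)
distData.getNumPartitions  // 10
{% endhighlight %}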
0341
0342 ## External Datasets
0343
0344 <div class="codetabs">
0345
0346 <div data-lang="scala" markdown="1">
0347
0348 Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).
0349
0350 Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
0351
0352 {% highlight scala %}
0353 scala> val distFile = sc.textFile("data.txt")
0354 distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26
0355 {% endhighlight %}
0356
0357 Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s => s.length).reduce((a, b) => a + b)`.
0358
0359 Some notes on reading files with Spark:
0360
0361 * If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
0362
0363 * All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.
0364
0365 * The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
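
As a sketch of the last point, assuming a `data.txt` is available, you could ask for more partitions when reading:

{% highlight scala %}
// the second argument is a *minimum* number of partitions for the file
val distFile = sc.textFile("data.txt", 8)
distFile.getNumPartitions  // at least 8
{% endhighlight %}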
0366
0367 Apart from text files, Spark's Scala API also supports several other data formats:
0368
0369 * `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. Partitioning is determined by data locality which, in some cases, may result in too few partitions. For those cases, `wholeTextFiles` provides an optional second argument for controlling the minimal number of partitions.
0370
0371 * For [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/IntWritable.html) and [Text](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Text.html). In addition, Spark allows you to specify native types for a few common Writables; for example, `sequenceFile[Int, String]` will automatically read IntWritables and Texts.
0372
0373 * For other Hadoop InputFormats, you can use the `SparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `SparkContext.newAPIHadoopRDD` for InputFormats based on the "new" MapReduce API (`org.apache.hadoop.mapreduce`).
0374
0375 * `RDD.saveAsObjectFile` and `SparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
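
As a brief, hedged sketch of the SequenceFile and object-file APIs above (the paths are placeholders):

{% highlight scala %}
// save a pair RDD as a SequenceFile and read it back with explicit key/value types
val pairs = sc.parallelize(1 to 3).map(x => (x, "a" * x))
pairs.saveAsSequenceFile("path/to/seqfile")
val loaded = sc.sequenceFile[Int, String]("path/to/seqfile")

// round-trip an RDD through a file of serialized Java objects
sc.parallelize(1 to 5).saveAsObjectFile("path/to/objfile")
val nums = sc.objectFile[Int]("path/to/objfile")
{% endhighlight %}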
0376
0377 </div>
0378
0379 <div data-lang="java" markdown="1">
0380
0381 Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).
0382
0383 Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
0384
0385 {% highlight java %}
0386 JavaRDD<String> distFile = sc.textFile("data.txt");
0387 {% endhighlight %}
0388
0389 Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(s -> s.length()).reduce((a, b) -> a + b)`.
0390
0391 Some notes on reading files with Spark:
0392
0393 * If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
0394
0395 * All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.
0396
0397 * The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
0398
0399 Apart from text files, Spark's Java API also supports several other data formats:
0400
0401 * `JavaSparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
0402
0403 * For [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), use SparkContext's `sequenceFile[K, V]` method where `K` and `V` are the types of key and values in the file. These should be subclasses of Hadoop's [Writable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Writable.html) interface, like [IntWritable](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/IntWritable.html) and [Text](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/io/Text.html).
0404
0405 * For other Hadoop InputFormats, you can use the `JavaSparkContext.hadoopRDD` method, which takes an arbitrary `JobConf` and input format class, key class and value class. Set these the same way you would for a Hadoop job with your input source. You can also use `JavaSparkContext.newAPIHadoopRDD` for InputFormats based on the "new" MapReduce API (`org.apache.hadoop.mapreduce`).
0406
0407 * `JavaRDD.saveAsObjectFile` and `JavaSparkContext.objectFile` support saving an RDD in a simple format consisting of serialized Java objects. While this is not as efficient as specialized formats like Avro, it offers an easy way to save any RDD.
0408
0409 </div>
0410
0411 <div data-lang="python" markdown="1">
0412
0413 PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](https://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html).
0414
0415 Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes a URI for the file (either a local path on the machine, or a `hdfs://`, `s3a://`, etc URI) and reads it as a collection of lines. Here is an example invocation:
0416
0417 {% highlight python %}
0418 >>> distFile = sc.textFile("data.txt")
0419 {% endhighlight %}
0420
0421 Once created, `distFile` can be acted on by dataset operations. For example, we can add up the sizes of all the lines using the `map` and `reduce` operations as follows: `distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)`.
0422
0423 Some notes on reading files with Spark:
0424
0425 * If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.
0426
0427 * All of Spark's file-based input methods, including `textFile`, support running on directories, compressed files, and wildcards as well. For example, you can use `textFile("/my/directory")`, `textFile("/my/directory/*.txt")`, and `textFile("/my/directory/*.gz")`.
0428
0429 * The `textFile` method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
0430
0431 Apart from text files, Spark's Python API also supports several other data formats:
0432
0433 * `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
0434
0435 * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.
0436
0437 * SequenceFile and Hadoop Input/Output Formats
0438
**Note:** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in the future by read/write support based on Spark SQL, in which case Spark SQL would be the preferred approach.
0440
0441 **Writable Support**
0442
0443 PySpark SequenceFile support loads an RDD of key-value pairs within Java, converts Writables to base Java types, and pickles the
0444 resulting Java objects using [Pyrolite](https://github.com/irmen/Pyrolite/). When saving an RDD of key-value pairs to SequenceFile,
0445 PySpark does the reverse. It unpickles Python objects into Java objects and then converts them to Writables. The following
0446 Writables are automatically converted:
0447
0448 <table class="table">
0449 <tr><th>Writable Type</th><th>Python Type</th></tr>
0450 <tr><td>Text</td><td>unicode str</td></tr>
0451 <tr><td>IntWritable</td><td>int</td></tr>
0452 <tr><td>FloatWritable</td><td>float</td></tr>
0453 <tr><td>DoubleWritable</td><td>float</td></tr>
0454 <tr><td>BooleanWritable</td><td>bool</td></tr>
0455 <tr><td>BytesWritable</td><td>bytearray</td></tr>
0456 <tr><td>NullWritable</td><td>None</td></tr>
0457 <tr><td>MapWritable</td><td>dict</td></tr>
0458 </table>
0459
0460 Arrays are not handled out-of-the-box. Users need to specify custom `ArrayWritable` subtypes when reading or writing. When writing,
0461 users also need to specify custom converters that convert arrays to custom `ArrayWritable` subtypes. When reading, the default
0462 converter will convert custom `ArrayWritable` subtypes to Java `Object[]`, which then get pickled to Python tuples. To get
0463 Python `array.array` for arrays of primitive types, users need to specify custom converters.
0464
0465 **Saving and Loading SequenceFiles**
0466
0467 Similarly to text files, SequenceFiles can be saved and loaded by specifying the path. The key and value
0468 classes can be specified, but for standard Writables this is not required.
0469
0470 {% highlight python %}
0471 >>> rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x))
0472 >>> rdd.saveAsSequenceFile("path/to/file")
0473 >>> sorted(sc.sequenceFile("path/to/file").collect())
0474 [(1, u'a'), (2, u'aa'), (3, u'aaa')]
0475 {% endhighlight %}
0476
0477 **Saving and Loading Other Hadoop Input/Output Formats**
0478
0479 PySpark can also read any Hadoop InputFormat or write any Hadoop OutputFormat, for both 'new' and 'old' Hadoop MapReduce APIs.
0480 If required, a Hadoop configuration can be passed in as a Python dict. Here is an example using the
0481 Elasticsearch ESInputFormat:
0482
0483 {% highlight python %}
0484 $ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
0485 >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
                             "org.apache.hadoop.io.NullWritable",
                             "org.elasticsearch.hadoop.mr.LinkedMapWritable",
                             conf=conf)
>>> rdd.first()  # the result is a MapWritable that is converted to a Python dict
(u'Elasticsearch ID',
 {u'field1': True,
  u'field2': u'Some Text',
  u'field3': 12345})
0495 {% endhighlight %}
0496
Note that if the InputFormat simply depends on a Hadoop configuration and/or input path, and
the key and value classes can easily be converted according to the above table,
then this approach should work well.
0500
0501 If you have custom serialized binary data (such as loading data from Cassandra / HBase), then you will first need to
0502 transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler.
0503 A [Converter](api/scala/org/apache/spark/api/python/Converter.html) trait is provided
0504 for this. Simply extend this trait and implement your transformation code in the ```convert```
method. Remember to ensure that this class, along with any dependencies required to access your ```InputFormat```, is packaged into your Spark job jar and included on the PySpark
classpath.
0507
0508 See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and
0509 the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples/pythonconverters)
0510 for examples of using Cassandra / HBase ```InputFormat``` and ```OutputFormat``` with custom converters.
0511
0512 </div>
0513 </div>
0514
0515 ## RDD Operations
0516
0517 RDDs support two types of operations: *transformations*, which create a new dataset from an existing one, and *actions*, which return a value to the driver program after running a computation on the dataset. For example, `map` is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, `reduce` is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel `reduceByKey` that returns a distributed dataset).
0518
0519 All transformations in Spark are <i>lazy</i>, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through `map` will be used in a `reduce` and return only the result of the `reduce` to the driver, rather than the larger mapped dataset.
0520
0521 By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also *persist* an RDD in memory using the `persist` (or `cache`) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
0522
0523 ### Basics
0524
0525 <div class="codetabs">
0526
0527 <div data-lang="scala" markdown="1">
0528
0529 To illustrate RDD basics, consider the simple program below:
0530
0531 {% highlight scala %}
0532 val lines = sc.textFile("data.txt")
0533 val lineLengths = lines.map(s => s.length)
0534 val totalLength = lineLengths.reduce((a, b) => a + b)
0535 {% endhighlight %}
0536
0537 The first line defines a base RDD from an external file. This dataset is not loaded in memory or
0538 otherwise acted on: `lines` is merely a pointer to the file.
0539 The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
0540 is *not* immediately computed, due to laziness.
0541 Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
0542 to run on separate machines, and each machine runs both its part of the map and a local reduction,
0543 returning only its answer to the driver program.
0544
0545 If we also wanted to use `lineLengths` again later, we could add:
0546
0547 {% highlight scala %}
0548 lineLengths.persist()
0549 {% endhighlight %}
0550
0551 before the `reduce`, which would cause `lineLengths` to be saved in memory after the first time it is computed.
0552
0553 </div>
0554
0555 <div data-lang="java" markdown="1">
0556
0557 To illustrate RDD basics, consider the simple program below:
0558
0559 {% highlight java %}
0560 JavaRDD<String> lines = sc.textFile("data.txt");
0561 JavaRDD<Integer> lineLengths = lines.map(s -> s.length());
0562 int totalLength = lineLengths.reduce((a, b) -> a + b);
0563 {% endhighlight %}
0564
0565 The first line defines a base RDD from an external file. This dataset is not loaded in memory or
0566 otherwise acted on: `lines` is merely a pointer to the file.
0567 The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
0568 is *not* immediately computed, due to laziness.
0569 Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
0570 to run on separate machines, and each machine runs both its part of the map and a local reduction,
0571 returning only its answer to the driver program.
0572
0573 If we also wanted to use `lineLengths` again later, we could add:
0574
0575 {% highlight java %}
0576 lineLengths.persist(StorageLevel.MEMORY_ONLY());
0577 {% endhighlight %}
0578
0579 before the `reduce`, which would cause `lineLengths` to be saved in memory after the first time it is computed.
0580
0581 </div>
0582
0583 <div data-lang="python" markdown="1">
0584
0585 To illustrate RDD basics, consider the simple program below:
0586
0587 {% highlight python %}
0588 lines = sc.textFile("data.txt")
0589 lineLengths = lines.map(lambda s: len(s))
0590 totalLength = lineLengths.reduce(lambda a, b: a + b)
0591 {% endhighlight %}
0592
0593 The first line defines a base RDD from an external file. This dataset is not loaded in memory or
0594 otherwise acted on: `lines` is merely a pointer to the file.
0595 The second line defines `lineLengths` as the result of a `map` transformation. Again, `lineLengths`
0596 is *not* immediately computed, due to laziness.
0597 Finally, we run `reduce`, which is an action. At this point Spark breaks the computation into tasks
0598 to run on separate machines, and each machine runs both its part of the map and a local reduction,
0599 returning only its answer to the driver program.
0600
0601 If we also wanted to use `lineLengths` again later, we could add:
0602
0603 {% highlight python %}
0604 lineLengths.persist()
0605 {% endhighlight %}
0606
0607 before the `reduce`, which would cause `lineLengths` to be saved in memory after the first time it is computed.
0608
0609 </div>
0610
0611 </div>
0612
0613 ### Passing Functions to Spark
0614
0615 <div class="codetabs">
0616
0617 <div data-lang="scala" markdown="1">
0618
0619 Spark's API relies heavily on passing functions in the driver program to run on the cluster.
0620 There are two recommended ways to do this:
0621
0622 * [Anonymous function syntax](http://docs.scala-lang.org/tour/basics.html#functions),
0623 which can be used for short pieces of code.
0624 * Static methods in a global singleton object. For example, you can define `object MyFunctions` and then
0625 pass `MyFunctions.func1`, as follows:
0626
0627 {% highlight scala %}
object MyFunctions {
  def func1(s: String): String = { ... }
}
0631
0632 myRdd.map(MyFunctions.func1)
0633 {% endhighlight %}
0634
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the object of that class along with the method.
For example, consider:
0638
0639 {% highlight scala %}
class MyClass {
  def func1(s: String): String = { ... }
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
}
0644 {% endhighlight %}
0645
0646 Here, if we create a new `MyClass` instance and call `doStuff` on it, the `map` inside there references the
0647 `func1` method *of that `MyClass` instance*, so the whole object needs to be sent to the cluster. It is
0648 similar to writing `rdd.map(x => this.func1(x))`.
0649
0650 In a similar way, accessing fields of the outer object will reference the whole object:
0651
0652 {% highlight scala %}
class MyClass {
  val field = "Hello"
  def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
}
0657 {% endhighlight %}
0658
0659 is equivalent to writing `rdd.map(x => this.field + x)`, which references all of `this`. To avoid this
0660 issue, the simplest way is to copy `field` into a local variable instead of accessing it externally:
0661
0662 {% highlight scala %}
def doStuff(rdd: RDD[String]): RDD[String] = {
  val field_ = this.field
  rdd.map(x => field_ + x)
}
0667 {% endhighlight %}
0668
0669 </div>
0670
0671 <div data-lang="java" markdown="1">
0672
0673 Spark's API relies heavily on passing functions in the driver program to run on the cluster.
0674 In Java, functions are represented by classes implementing the interfaces in the
0675 [org.apache.spark.api.java.function](api/java/index.html?org/apache/spark/api/java/function/package-summary.html) package.
0676 There are two ways to create such functions:
0677
0678 * Implement the Function interfaces in your own class, either as an anonymous inner class or a named one,
0679 and pass an instance of it to Spark.
0680 * Use [lambda expressions](http://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html)
0681 to concisely define an implementation.
0682
0683 While much of this guide uses lambda syntax for conciseness, it is easy to use all the same APIs
0684 in long-form. For example, we could have written our code above as follows:
0685
0686 {% highlight java %}
0687 JavaRDD<String> lines = sc.textFile("data.txt");
JavaRDD<Integer> lineLengths = lines.map(new Function<String, Integer>() {
  public Integer call(String s) { return s.length(); }
});
int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
  public Integer call(Integer a, Integer b) { return a + b; }
});
0694 {% endhighlight %}
0695
0696 Or, if writing the functions inline is unwieldy:
0697
0698 {% highlight java %}
class GetLength implements Function<String, Integer> {
  public Integer call(String s) { return s.length(); }
}
class Sum implements Function2<Integer, Integer, Integer> {
  public Integer call(Integer a, Integer b) { return a + b; }
}
0705
0706 JavaRDD<String> lines = sc.textFile("data.txt");
0707 JavaRDD<Integer> lineLengths = lines.map(new GetLength());
0708 int totalLength = lineLengths.reduce(new Sum());
0709 {% endhighlight %}
0710
0711 Note that anonymous inner classes in Java can also access variables in the enclosing scope as long
0712 as they are marked `final`. Spark will ship copies of these variables to each worker node as it does
0713 for other languages.
0714
0715 </div>
0716
0717 <div data-lang="python" markdown="1">
0718
0719 Spark's API relies heavily on passing functions in the driver program to run on the cluster.
0720 There are three recommended ways to do this:
0721
0722 * [Lambda expressions](https://docs.python.org/2/tutorial/controlflow.html#lambda-expressions),
0723 for simple functions that can be written as an expression. (Lambdas do not support multi-statement
0724 functions or statements that do not return a value.)
0725 * Local `def`s inside the function calling into Spark, for longer code.
0726 * Top-level functions in a module.
0727
For example, to pass a longer function than can be expressed in a `lambda`, consider
the code below:
0730
0731 {% highlight python %}
0732 """MyScript.py"""
if __name__ == "__main__":
    def myFunc(s):
        words = s.split(" ")
        return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
0740 {% endhighlight %}
0741
Note that while it is also possible to pass a reference to a method in a class instance (as opposed to
a singleton object), this requires sending the object of that class along with the method.
For example, consider:
0745
0746 {% highlight python %}
class MyClass(object):
    def func(self, s):
        return s
    def doStuff(self, rdd):
        return rdd.map(self.func)
0752 {% endhighlight %}
0753
Here, if we create a new `MyClass` instance and call `doStuff` on it, the `map` inside there references the
`func` method *of that `MyClass` instance*, so the whole object needs to be sent to the cluster.
0756
0757 In a similar way, accessing fields of the outer object will reference the whole object:
0758
0759 {% highlight python %}
class MyClass(object):
    def __init__(self):
        self.field = "Hello"
    def doStuff(self, rdd):
        return rdd.map(lambda s: self.field + s)
0765 {% endhighlight %}
0766
0767 To avoid this issue, the simplest way is to copy `field` into a local variable instead
0768 of accessing it externally:
0769
0770 {% highlight python %}
def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)
0774 {% endhighlight %}
0775
0776 </div>
0777
0778 </div>
0779
0780 ### Understanding closures <a name="ClosuresLink"></a>
0781 One of the harder things about Spark is understanding the scope and life cycle of variables and methods when executing code across a cluster. RDD operations that modify variables outside of their scope can be a frequent source of confusion. In the example below we'll look at code that uses `foreach()` to increment a counter, but similar issues can occur for other operations as well.
0782
0783 #### Example
0784
0785 Consider the naive RDD element sum below, which may behave differently depending on whether execution is happening within the same JVM. A common example of this is when running Spark in `local` mode (`--master = local[n]`) versus deploying a Spark application to a cluster (e.g. via spark-submit to YARN):
0786
0787 <div class="codetabs">
0788
0789 <div data-lang="scala" markdown="1">
0790 {% highlight scala %}
0791 var counter = 0
0792 var rdd = sc.parallelize(data)
0793
0794 // Wrong: Don't do this!!
0795 rdd.foreach(x => counter += x)
0796
0797 println("Counter value: " + counter)
0798 {% endhighlight %}
0799 </div>
0800
0801 <div data-lang="java" markdown="1">
0802 {% highlight java %}
0803 int counter = 0;
0804 JavaRDD<Integer> rdd = sc.parallelize(data);
0805
0806 // Wrong: Don't do this!!
0807 rdd.foreach(x -> counter += x);
0808
System.out.println("Counter value: " + counter);
0810 {% endhighlight %}
0811 </div>
0812
0813 <div data-lang="python" markdown="1">
0814 {% highlight python %}
0815 counter = 0
0816 rdd = sc.parallelize(data)
0817
0818 # Wrong: Don't do this!!
def increment_counter(x):
    global counter
    counter += x
0822 rdd.foreach(increment_counter)
0823
0824 print("Counter value: ", counter)
0825 {% endhighlight %}
0826 </div>
0827
0828 </div>
0829
0830 #### Local vs. cluster modes
0831
0832 The behavior of the above code is undefined, and may not work as intended. To execute jobs, Spark breaks up the processing of RDD operations into tasks, each of which is executed by an executor. Prior to execution, Spark computes the task's **closure**. The closure is those variables and methods which must be visible for the executor to perform its computations on the RDD (in this case `foreach()`). This closure is serialized and sent to each executor.
0833
0834 The variables within the closure sent to each executor are now copies and thus, when **counter** is referenced within the `foreach` function, it's no longer the **counter** on the driver node. There is still a **counter** in the memory of the driver node but this is no longer visible to the executors! The executors only see the copy from the serialized closure. Thus, the final value of **counter** will still be zero since all operations on **counter** were referencing the value within the serialized closure.
0835
0836 In local mode, in some circumstances, the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it.
0837
0838 To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
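
For instance, here is a minimal sketch of the counter example rewritten with a built-in long accumulator (Scala shown; the Java and Python APIs are analogous):

{% highlight scala %}
val rdd = sc.parallelize(1 to 10)
val accum = sc.longAccumulator("counter")
rdd.foreach(x => accum.add(x))  // task-side updates are merged back on the driver
println("Counter value: " + accum.value)  // 55
{% endhighlight %}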
0839
In general, closures (constructs like loops or locally defined methods) should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident, and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
0841
0842 #### Printing elements of an RDD
Another common idiom is attempting to print out the elements of an RDD using `rdd.foreach(println)` or `rdd.map(println)`. On a single machine, this will generate the expected output and print all the RDD's elements. In `cluster` mode, however, the `stdout` the executors write to is the executors' own `stdout`, not the driver's, so `stdout` on the driver won't show these! To print all elements on the driver, one can use the `collect()` method to first bring the RDD to the driver node: `rdd.collect().foreach(println)`. This can cause the driver to run out of memory, though, because `collect()` fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use `take()`: `rdd.take(100).foreach(println)`.
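
A quick sketch of the two driver-side patterns just described:

{% highlight scala %}
rdd.collect().foreach(println)  // fetches the entire RDD to the driver; may exhaust its memory
rdd.take(100).foreach(println)  // safer: brings back only the first 100 elements
{% endhighlight %}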
0844
0845 ### Working with Key-Value Pairs
0846
0847 <div class="codetabs">
0848
0849 <div data-lang="scala" markdown="1">
0850
0851 While most Spark operations work on RDDs containing any type of objects, a few special operations are
0852 only available on RDDs of key-value pairs.
0853 The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements
0854 by a key.
0855
0856 In Scala, these operations are automatically available on RDDs containing
0857 [Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) objects
0858 (the built-in tuples in the language, created by simply writing `(a, b)`). The key-value pair operations are available in the
0859 [PairRDDFunctions](api/scala/org/apache/spark/rdd/PairRDDFunctions.html) class,
0860 which automatically wraps around an RDD of tuples.
0861
0862 For example, the following code uses the `reduceByKey` operation on key-value pairs to count how
0863 many times each line of text occurs in a file:
0864
0865 {% highlight scala %}
0866 val lines = sc.textFile("data.txt")
0867 val pairs = lines.map(s => (s, 1))
0868 val counts = pairs.reduceByKey((a, b) => a + b)
0869 {% endhighlight %}
0870
0871 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
0872 `counts.collect()` to bring them back to the driver program as an array of objects.
0873
0874 **Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
0875 custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
0876 the contract outlined in the [Object.hashCode()
0877 documentation](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#hashCode--).
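
One way to satisfy that contract without writing either method by hand is a Scala case class, which derives consistent `equals()` and `hashCode()` automatically. A sketch with a hypothetical `Page` key:

{% highlight scala %}
// the compiler generates structural equals()/hashCode() for case classes
case class Page(site: String, path: String)

val hits = sc.parallelize(Seq(Page("a.com", "/"), Page("a.com", "/"), Page("b.com", "/x")))
val counts = hits.map(p => (p, 1)).reduceByKey(_ + _)
{% endhighlight %}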
0878
0879 </div>
0880
0881 <div data-lang="java" markdown="1">
0882
0883 While most Spark operations work on RDDs containing any type of objects, a few special operations are
0884 only available on RDDs of key-value pairs.
0885 The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements
0886 by a key.
0887
0888 In Java, key-value pairs are represented using the
0889 [scala.Tuple2](http://www.scala-lang.org/api/{{site.SCALA_VERSION}}/index.html#scala.Tuple2) class
0890 from the Scala standard library. You can simply call `new Tuple2(a, b)` to create a tuple, and access
0891 its fields later with `tuple._1()` and `tuple._2()`.
0892
0893 RDDs of key-value pairs are represented by the
0894 [JavaPairRDD](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html) class. You can construct
0895 JavaPairRDDs from JavaRDDs using special versions of the `map` operations, like
0896 `mapToPair` and `flatMapToPair`. The JavaPairRDD will have both standard RDD functions and special
0897 key-value ones.
0898
0899 For example, the following code uses the `reduceByKey` operation on key-value pairs to count how
0900 many times each line of text occurs in a file:
0901
{% highlight java %}
JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
{% endhighlight %}
0907
0908 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
0909 `counts.collect()` to bring them back to the driver program as an array of objects.
0910
0911 **Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
0912 custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
0913 the contract outlined in the [Object.hashCode()
0914 documentation](https://docs.oracle.com/javase/8/docs/api/java/lang/Object.html#hashCode--).
0915
0916 </div>
0917
0918 <div data-lang="python" markdown="1">
0919
0920 While most Spark operations work on RDDs containing any type of objects, a few special operations are
0921 only available on RDDs of key-value pairs.
0922 The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements
0923 by a key.
0924
0925 In Python, these operations work on RDDs containing built-in Python tuples such as `(1, 2)`.
0926 Simply create such tuples and then call your desired operation.
0927
0928 For example, the following code uses the `reduceByKey` operation on key-value pairs to count how
0929 many times each line of text occurs in a file:
0930
0931 {% highlight python %}
0932 lines = sc.textFile("data.txt")
0933 pairs = lines.map(lambda s: (s, 1))
0934 counts = pairs.reduceByKey(lambda a, b: a + b)
0935 {% endhighlight %}
0936
0937 We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
0938 `counts.collect()` to bring them back to the driver program as a list of objects.
0939
0940 </div>
0941
0942 </div>
0943
0944
0945 ### Transformations
0946
0947 The following table lists some of the common transformations supported by Spark. Refer to the
0948 RDD API doc
0949 ([Scala](api/scala/org/apache/spark/rdd/RDD.html),
0950 [Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html),
0951 [Python](api/python/pyspark.html#pyspark.RDD),
0952 [R](api/R/index.html))
0953 and pair RDD functions doc
0954 ([Scala](api/scala/org/apache/spark/rdd/PairRDDFunctions.html),
0955 [Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))
0956 for details.
0957
0958 <table class="table">
0959 <tr><th style="width:25%">Transformation</th><th>Meaning</th></tr>
0960 <tr>
0961 <td> <b>map</b>(<i>func</i>) </td>
0962 <td> Return a new distributed dataset formed by passing each element of the source through a function <i>func</i>. </td>
0963 </tr>
0964 <tr>
0965 <td> <b>filter</b>(<i>func</i>) </td>
0966 <td> Return a new dataset formed by selecting those elements of the source on which <i>func</i> returns true. </td>
0967 </tr>
0968 <tr>
0969 <td> <b>flatMap</b>(<i>func</i>) </td>
0970 <td> Similar to map, but each input item can be mapped to 0 or more output items (so <i>func</i> should return a Seq rather than a single item). </td>
0971 </tr>
0972 <tr>
0973 <td> <b>mapPartitions</b>(<i>func</i>) <a name="MapPartLink"></a> </td>
0974 <td> Similar to map, but runs separately on each partition (block) of the RDD, so <i>func</i> must be of type
Iterator&lt;T&gt; => Iterator&lt;U&gt; when running on an RDD of type T. </td>
0976 </tr>
0977 <tr>
0978 <td> <b>mapPartitionsWithIndex</b>(<i>func</i>) </td>
0979 <td> Similar to mapPartitions, but also provides <i>func</i> with an integer value representing the index of
the partition, so <i>func</i> must be of type (Int, Iterator&lt;T&gt;) => Iterator&lt;U&gt; when running on an RDD of type T.
0981 </td>
0982 </tr>
0983 <tr>
0984 <td> <b>sample</b>(<i>withReplacement</i>, <i>fraction</i>, <i>seed</i>) </td>
0985 <td> Sample a fraction <i>fraction</i> of the data, with or without replacement, using a given random number generator seed. </td>
0986 </tr>
0987 <tr>
0988 <td> <b>union</b>(<i>otherDataset</i>) </td>
0989 <td> Return a new dataset that contains the union of the elements in the source dataset and the argument. </td>
0990 </tr>
0991 <tr>
0992 <td> <b>intersection</b>(<i>otherDataset</i>) </td>
0993 <td> Return a new RDD that contains the intersection of elements in the source dataset and the argument. </td>
0994 </tr>
0995 <tr>
0996 <td> <b>distinct</b>([<i>numPartitions</i>])) </td>
0997 <td> Return a new dataset that contains the distinct elements of the source dataset.</td>
0998 </tr>
0999 <tr>
1000 <td> <b>groupByKey</b>([<i>numPartitions</i>]) <a name="GroupByLink"></a> </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
1002 <b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
1003 average) over each key, using <code>reduceByKey</code> or <code>aggregateByKey</code> will yield much better
1004 performance.
1005 <br />
1006 <b>Note:</b> By default, the level of parallelism in the output depends on the number of partitions of the parent RDD.
1007 You can pass an optional <code>numPartitions</code> argument to set a different number of tasks.
1008 </td>
1009 </tr>
1010 <tr>
1011 <td> <b>reduceByKey</b>(<i>func</i>, [<i>numPartitions</i>]) <a name="ReduceByLink"></a> </td>
1012 <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
1013 </tr>
1014 <tr>
1015 <td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numPartitions</i>]) <a name="AggregateByLink"></a> </td>
1016 <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
1017 </tr>
1018 <tr>
1019 <td> <b>sortByKey</b>([<i>ascending</i>], [<i>numPartitions</i>]) <a name="SortByLink"></a> </td>
1020 <td> When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean <code>ascending</code> argument.</td>
1021 </tr>
1022 <tr>
1023 <td> <b>join</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="JoinLink"></a> </td>
1024 <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key.
1025 Outer joins are supported through <code>leftOuterJoin</code>, <code>rightOuterJoin</code>, and <code>fullOuterJoin</code>.
1026 </td>
1027 </tr>
1028 <tr>
1029 <td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numPartitions</i>]) <a name="CogroupLink"></a> </td>
1030 <td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called <code>groupWith</code>. </td>
1031 </tr>
1032 <tr>
1033 <td> <b>cartesian</b>(<i>otherDataset</i>) </td>
1034 <td> When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements). </td>
1035 </tr>
1036 <tr>
1037 <td> <b>pipe</b>(<i>command</i>, <i>[envVars]</i>) </td>
1038 <td> Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the
1039 process's stdin and lines output to its stdout are returned as an RDD of strings. </td>
1040 </tr>
1041 <tr>
1042 <td> <b>coalesce</b>(<i>numPartitions</i>) <a name="CoalesceLink"></a> </td>
1043 <td> Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently
1044 after filtering down a large dataset. </td>
1045 </tr>
1046 <tr>
1047 <td> <b>repartition</b>(<i>numPartitions</i>) </td>
1048 <td> Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
1049 This always shuffles all data over the network. <a name="RepartitionLink"></a></td>
1050 </tr>
1051 <tr>
1052 <td> <b>repartitionAndSortWithinPartitions</b>(<i>partitioner</i>) <a name="Repartition2Link"></a></td>
1053 <td> Repartition the RDD according to the given partitioner and, within each resulting partition,
1054 sort records by their keys. This is more efficient than calling <code>repartition</code> and then sorting within
1055 each partition because it can push the sorting down into the shuffle machinery. </td>
1056 </tr>
1057 </table>
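
As a quick illustration, here is a minimal sketch chaining several of the transformations above on a small pair RDD (the data and variable names are made up for this example; `sc` is an existing `SparkContext`):

{% highlight scala %}
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("c", 4)))

val summed = pairs.reduceByKey(_ + _)                  // ("a", 4), ("b", 2), ("c", 4)
val sorted = summed.sortByKey()                        // ordered by key
val filtered = sorted.filter { case (_, v) => v > 2 }  // keep keys whose summed value exceeds 2
{% endhighlight %}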
1058
1059 ### Actions
1060
1061 The following table lists some of the common actions supported by Spark. Refer to the
1062 RDD API doc
1063 ([Scala](api/scala/org/apache/spark/rdd/RDD.html),
1064 [Java](api/java/index.html?org/apache/spark/api/java/JavaRDD.html),
1065 [Python](api/python/pyspark.html#pyspark.RDD),
[R](api/R/index.html))
and pair RDD functions doc
1069 ([Scala](api/scala/org/apache/spark/rdd/PairRDDFunctions.html),
1070 [Java](api/java/index.html?org/apache/spark/api/java/JavaPairRDD.html))
1071 for details.
1072
1073 <table class="table">
1074 <tr><th>Action</th><th>Meaning</th></tr>
1075 <tr>
1076 <td> <b>reduce</b>(<i>func</i>) </td>
1077 <td> Aggregate the elements of the dataset using a function <i>func</i> (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel. </td>
1078 </tr>
1079 <tr>
1080 <td> <b>collect</b>() </td>
1081 <td> Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data. </td>
1082 </tr>
1083 <tr>
1084 <td> <b>count</b>() </td>
1085 <td> Return the number of elements in the dataset. </td>
1086 </tr>
1087 <tr>
1088 <td> <b>first</b>() </td>
1089 <td> Return the first element of the dataset (similar to take(1)). </td>
1090 </tr>
1091 <tr>
1092 <td> <b>take</b>(<i>n</i>) </td>
1093 <td> Return an array with the first <i>n</i> elements of the dataset. </td>
1094 </tr>
1095 <tr>
1096 <td> <b>takeSample</b>(<i>withReplacement</i>, <i>num</i>, [<i>seed</i>]) </td>
1097 <td> Return an array with a random sample of <i>num</i> elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.</td>
1098 </tr>
1099 <tr>
1100 <td> <b>takeOrdered</b>(<i>n</i>, <i>[ordering]</i>) </td>
1101 <td> Return the first <i>n</i> elements of the RDD using either their natural order or a custom comparator. </td>
1102 </tr>
1103 <tr>
1104 <td> <b>saveAsTextFile</b>(<i>path</i>) </td>
1105 <td> Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file. </td>
1106 </tr>
1107 <tr>
1108 <td> <b>saveAsSequenceFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
1109 <td> Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also
1110 available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc). </td>
1111 </tr>
1112 <tr>
1113 <td> <b>saveAsObjectFile</b>(<i>path</i>) <br /> (Java and Scala) </td>
1114 <td> Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using
1115 <code>SparkContext.objectFile()</code>. </td>
1116 </tr>
1117 <tr>
1118 <td> <b>countByKey</b>() <a name="CountByLink"></a> </td>
1119 <td> Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key. </td>
1120 </tr>
1121 <tr>
1122 <td> <b>foreach</b>(<i>func</i>) </td>
1123 <td> Run a function <i>func</i> on each element of the dataset. This is usually done for side effects such as updating an <a href="#accumulators">Accumulator</a> or interacting with external storage systems.
1124 <br /><b>Note</b>: modifying variables other than Accumulators outside of the <code>foreach()</code> may result in undefined behavior. See <a href="#understanding-closures-a-nameclosureslinka">Understanding closures </a> for more details.</td>
1125 </tr>
1126 </table>
1127
1128 The Spark RDD API also exposes asynchronous versions of some actions, like `foreachAsync` for `foreach`, which immediately return a `FutureAction` to the caller instead of blocking on completion of the action. This can be used to manage or wait for the asynchronous execution of the action.
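
For example, a minimal sketch of waiting on an asynchronous action (`sc` is an existing `SparkContext`; `Await` is standard Scala, not a Spark API):

{% highlight scala %}
import scala.concurrent.Await
import scala.concurrent.duration.Duration

// foreachAsync returns a FutureAction immediately instead of blocking.
val future = sc.parallelize(1 to 100).foreachAsync(x => println(x))

// A FutureAction is a scala.concurrent.Future, so it can be awaited or given callbacks.
Await.result(future, Duration.Inf)
{% endhighlight %}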
1129
1130
1131 ### Shuffle operations
1132
1133 Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark's
1134 mechanism for re-distributing data so that it's grouped differently across partitions. This typically
1135 involves copying data across executors and machines, making the shuffle a complex and
1136 costly operation.
1137
1138 #### Background
1139
1140 To understand what happens during the shuffle, we can consider the example of the
1141 [`reduceByKey`](#ReduceByLink) operation. The `reduceByKey` operation generates a new RDD where all
1142 values for a single key are combined into a tuple - the key and the result of executing a reduce
1143 function against all values associated with that key. The challenge is that not all values for a
1144 single key necessarily reside on the same partition, or even the same machine, but they must be
1145 co-located to compute the result.
1146
1147 In Spark, data is generally not distributed across partitions to be in the necessary place for a
1148 specific operation. During computations, a single task will operate on a single partition - thus, to
1149 organize all the data for a single `reduceByKey` reduce task to execute, Spark needs to perform an
1150 all-to-all operation. It must read from all partitions to find all the values for all keys,
1151 and then bring together values across partitions to compute the final result for each key -
1152 this is called the **shuffle**.
1153
Although the set of elements in each partition of newly shuffled data will be deterministic, and so
is the ordering of partitions themselves, the ordering of these elements is not. If you need predictably
ordered data following a shuffle, it's possible to use the options below (a short sketch follows the list):
1157
1158 * `mapPartitions` to sort each partition using, for example, `.sorted`
1159 * `repartitionAndSortWithinPartitions` to efficiently sort partitions while simultaneously repartitioning
1160 * `sortBy` to make a globally ordered RDD
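
For instance, a hedged sketch of the first two options on a small pair RDD (the data is illustrative; `sc` is an existing `SparkContext`):

{% highlight scala %}
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((3, "c"), (1, "a"), (2, "b")))

// Sort each partition in place after the data has been shuffled.
val sortedPerPartition = pairs.mapPartitions(iter => iter.toSeq.sorted.iterator)

// Or repartition and sort in one pass, pushing the sort into the shuffle machinery.
val repartitionedAndSorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
{% endhighlight %}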
1161
1162 Operations which can cause a shuffle include **repartition** operations like
1163 [`repartition`](#RepartitionLink) and [`coalesce`](#CoalesceLink), **'ByKey** operations
1164 (except for counting) like [`groupByKey`](#GroupByLink) and [`reduceByKey`](#ReduceByLink), and
1165 **join** operations like [`cogroup`](#CogroupLink) and [`join`](#JoinLink).
1166
1167 #### Performance Impact
The **shuffle** is an expensive operation since it involves disk I/O, data serialization, and
1169 network I/O. To organize data for the shuffle, Spark generates sets of tasks - *map* tasks to
1170 organize the data, and a set of *reduce* tasks to aggregate it. This nomenclature comes from
1171 MapReduce and does not directly relate to Spark's `map` and `reduce` operations.
1172
1173 Internally, results from individual map tasks are kept in memory until they can't fit. Then, these
1174 are sorted based on the target partition and written to a single file. On the reduce side, tasks
1175 read the relevant sorted blocks.
1176
1177 Certain shuffle operations can consume significant amounts of heap memory since they employ
1178 in-memory data structures to organize records before or after transferring them. Specifically,
1179 `reduceByKey` and `aggregateByKey` create these structures on the map side, and `'ByKey` operations
1180 generate these on the reduce side. When data does not fit in memory Spark will spill these tables
1181 to disk, incurring the additional overhead of disk I/O and increased garbage collection.
1182
1183 Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files
1184 are preserved until the corresponding RDDs are no longer used and are garbage collected.
1185 This is done so the shuffle files don't need to be re-created if the lineage is re-computed.
1186 Garbage collection may happen only after a long period of time, if the application retains references
1187 to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may
1188 consume a large amount of disk space. The temporary storage directory is specified by the
1189 `spark.local.dir` configuration parameter when configuring the Spark context.
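
For example, a hedged sketch of pointing `spark.local.dir` at a larger scratch disk when constructing the context (the path is a placeholder; note that some cluster managers override this setting):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Shuffle intermediate files will be written under this directory.
val conf = new SparkConf()
  .setAppName("MyApp")
  .set("spark.local.dir", "/mnt/scratch")
val sc = new SparkContext(conf)
{% endhighlight %}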
1190
1191 Shuffle behavior can be tuned by adjusting a variety of configuration parameters. See the
1192 'Shuffle Behavior' section within the [Spark Configuration Guide](configuration.html).
1193
1194 ## RDD Persistence
1195
1196 One of the most important capabilities in Spark is *persisting* (or *caching*) a dataset in memory
1197 across operations. When you persist an RDD, each node stores any partitions of it that it computes in
1198 memory and reuses them in other actions on that dataset (or datasets derived from it). This allows
1199 future actions to be much faster (often by more than 10x). Caching is a key tool for
1200 iterative algorithms and fast interactive use.
1201
1202 You can mark an RDD to be persisted using the `persist()` or `cache()` methods on it. The first time
1203 it is computed in an action, it will be kept in memory on the nodes. Spark's cache is fault-tolerant --
1204 if any partition of an RDD is lost, it will automatically be recomputed using the transformations
1205 that originally created it.
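
For example, a minimal sketch (the file path and names are illustrative):

{% highlight scala %}
val lines = sc.textFile("data.txt")
val lengths = lines.map(_.length)

lengths.cache()  // mark for persistence at the default MEMORY_ONLY level

println(lengths.count())        // the first action computes the RDD and caches its partitions
println(lengths.reduce(_ + _))  // later actions reuse the cached partitions
{% endhighlight %}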
1206
In addition, each persisted RDD can be stored using a different *storage level*, allowing you, for example,
to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space),
or replicate it across nodes.
1210 These levels are set by passing a
1211 `StorageLevel` object ([Scala](api/scala/org/apache/spark/storage/StorageLevel.html),
1212 [Java](api/java/index.html?org/apache/spark/storage/StorageLevel.html),
1213 [Python](api/python/pyspark.html#pyspark.StorageLevel))
1214 to `persist()`. The `cache()` method is a shorthand for using the default storage level,
1215 which is `StorageLevel.MEMORY_ONLY` (store deserialized objects in memory). The full set of
1216 storage levels is:
1217
1218 <table class="table">
1219 <tr><th style="width:23%">Storage Level</th><th>Meaning</th></tr>
1220 <tr>
1221 <td> MEMORY_ONLY </td>
1222 <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will
1223 not be cached and will be recomputed on the fly each time they're needed. This is the default level. </td>
1224 </tr>
1225 <tr>
1226 <td> MEMORY_AND_DISK </td>
1227 <td> Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the
1228 partitions that don't fit on disk, and read them from there when they're needed. </td>
1229 </tr>
1230 <tr>
1231 <td> MEMORY_ONLY_SER <br /> (Java and Scala) </td>
1232 <td> Store RDD as <i>serialized</i> Java objects (one byte array per partition).
1233 This is generally more space-efficient than deserialized objects, especially when using a
1234 <a href="tuning.html">fast serializer</a>, but more CPU-intensive to read.
1235 </td>
1236 </tr>
1237 <tr>
1238 <td> MEMORY_AND_DISK_SER <br /> (Java and Scala) </td>
1239 <td> Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of
1240 recomputing them on the fly each time they're needed. </td>
1241 </tr>
1242 <tr>
1243 <td> DISK_ONLY </td>
1244 <td> Store the RDD partitions only on disk. </td>
1245 </tr>
1246 <tr>
1247 <td> MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. </td>
1248 <td> Same as the levels above, but replicate each partition on two cluster nodes. </td>
1249 </tr>
1250 <tr>
1251 <td> OFF_HEAP (experimental) </td>
1252 <td> Similar to MEMORY_ONLY_SER, but store the data in
1253 <a href="configuration.html#memory-management">off-heap memory</a>. This requires off-heap memory to be enabled. </td>
1254 </tr>
1255 </table>
1256
1257 **Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
1258 so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
1259 `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
1260
1261 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
1262
1263 ### Which Storage Level to Choose?
1264
1265 Spark's storage levels are meant to provide different trade-offs between memory usage and CPU
1266 efficiency. We recommend going through the following process to select one:
1267
1268 * If your RDDs fit comfortably with the default storage level (`MEMORY_ONLY`), leave them that way.
1269 This is the most CPU-efficient option, allowing operations on the RDDs to run as fast as possible.
1270
1271 * If not, try using `MEMORY_ONLY_SER` and [selecting a fast serialization library](tuning.html) to
1272 make the objects much more space-efficient, but still reasonably fast to access. (Java and Scala)
1273
1274 * Don't spill to disk unless the functions that computed your datasets are expensive, or they filter
1275 a large amount of the data. Otherwise, recomputing a partition may be as fast as reading it from
1276 disk.
1277
1278 * Use the replicated storage levels if you want fast fault recovery (e.g. if using Spark to serve
1279 requests from a web application). *All* the storage levels provide full fault tolerance by
1280 recomputing lost data, but the replicated ones let you continue running tasks on the RDD without
1281 waiting to recompute a lost partition.
1282
1283
1284 ### Removing Data
1285
1286 Spark automatically monitors cache usage on each node and drops out old data partitions in a
1287 least-recently-used (LRU) fashion. If you would like to manually remove an RDD instead of waiting for
1288 it to fall out of the cache, use the `RDD.unpersist()` method. Note that this method does not
1289 block by default. To block until resources are freed, specify `blocking=true` when calling this method.
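
For example (continuing the hypothetical `lengths` RDD from the persistence sketch above):

{% highlight scala %}
// Remove the cached partitions; this returns immediately by default.
lengths.unpersist()
// Pass blocking = true to wait until the resources are actually freed.
lengths.unpersist(blocking = true)
{% endhighlight %}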
1290
1291 # Shared Variables
1292
1293 Normally, when a function passed to a Spark operation (such as `map` or `reduce`) is executed on a
1294 remote cluster node, it works on separate copies of all the variables used in the function. These
1295 variables are copied to each machine, and no updates to the variables on the remote machine are
1296 propagated back to the driver program. Supporting general, read-write shared variables across tasks
1297 would be inefficient. However, Spark does provide two limited types of *shared variables* for two
1298 common usage patterns: broadcast variables and accumulators.
1299
1300 ## Broadcast Variables
1301
1302 Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather
1303 than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a
1304 large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables
1305 using efficient broadcast algorithms to reduce communication cost.
1306
1307 Spark actions are executed through a set of stages, separated by distributed "shuffle" operations.
1308 Spark automatically broadcasts the common data needed by tasks within each stage. The data
1309 broadcasted this way is cached in serialized form and deserialized before running each task. This
1310 means that explicitly creating broadcast variables is only useful when tasks across multiple stages
1311 need the same data or when caching the data in deserialized form is important.
1312
1313 Broadcast variables are created from a variable `v` by calling `SparkContext.broadcast(v)`. The
1314 broadcast variable is a wrapper around `v`, and its value can be accessed by calling the `value`
1315 method. The code below shows this:
1316
1317 <div class="codetabs">
1318
1319 <div data-lang="scala" markdown="1">
1320
1321 {% highlight scala %}
1322 scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
1323 broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
1324
1325 scala> broadcastVar.value
1326 res0: Array[Int] = Array(1, 2, 3)
1327 {% endhighlight %}
1328
1329 </div>
1330
1331 <div data-lang="java" markdown="1">
1332
1333 {% highlight java %}
1334 Broadcast<int[]> broadcastVar = sc.broadcast(new int[] {1, 2, 3});
1335
1336 broadcastVar.value();
1337 // returns [1, 2, 3]
1338 {% endhighlight %}
1339
1340 </div>
1341
1342 <div data-lang="python" markdown="1">
1343
1344 {% highlight python %}
1345 >>> broadcastVar = sc.broadcast([1, 2, 3])
1346 <pyspark.broadcast.Broadcast object at 0x102789f10>
1347
1348 >>> broadcastVar.value
1349 [1, 2, 3]
1350 {% endhighlight %}
1351
1352 </div>
1353
1354 </div>
1355
1356 After the broadcast variable is created, it should be used instead of the value `v` in any functions
1357 run on the cluster so that `v` is not shipped to the nodes more than once. In addition, the object
1358 `v` should not be modified after it is broadcast in order to ensure that all nodes get the same
1359 value of the broadcast variable (e.g. if the variable is shipped to a new node later).
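
For example, a hedged sketch of using a broadcast variable inside a closure rather than capturing a large local map directly (the data is illustrative):

{% highlight scala %}
val lookupTable = sc.broadcast(Map(1 -> "apple", 2 -> "banana", 3 -> "cherry"))

val ids = sc.parallelize(Seq(1, 2, 3, 2))
// Each task reads lookupTable.value; the map itself is shipped to each node only once.
val names = ids.map(id => lookupTable.value.getOrElse(id, "unknown")).collect()
{% endhighlight %}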
1360
1361 To release the resources that the broadcast variable copied onto executors, call `.unpersist()`.
1362 If the broadcast is used again afterwards, it will be re-broadcast. To permanently release all
1363 resources used by the broadcast variable, call `.destroy()`. The broadcast variable can't be used
1364 after that. Note that these methods do not block by default. To block until resources are freed,
1365 specify `blocking=true` when calling them.
1366
1367 ## Accumulators
1368
1369 Accumulators are variables that are only "added" to through an associative and commutative operation and can
1370 therefore be efficiently supported in parallel. They can be used to implement counters (as in
1371 MapReduce) or sums. Spark natively supports accumulators of numeric types, and programmers
1372 can add support for new types.
1373
1374 As a user, you can create named or unnamed accumulators. As seen in the image below, a named accumulator (in this instance `counter`) will display in the web UI for the stage that modifies that accumulator. Spark displays the value for each accumulator modified by a task in the "Tasks" table.
1375
1376 <p style="text-align: center;">
1377 <img src="img/spark-webui-accumulators.png" title="Accumulators in the Spark UI" alt="Accumulators in the Spark UI" />
1378 </p>
1379
1380 Tracking accumulators in the UI can be useful for understanding the progress of
1381 running stages (NOTE: this is not yet supported in Python).
1382
1383 <div class="codetabs">
1384
1385 <div data-lang="scala" markdown="1">
1386
1387 A numeric accumulator can be created by calling `SparkContext.longAccumulator()` or `SparkContext.doubleAccumulator()`
1388 to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using
1389 the `add` method. However, they cannot read its value. Only the driver program can read the accumulator's value,
1390 using its `value` method.
1391
1392 The code below shows an accumulator being used to add up the elements of an array:
1393
1394 {% highlight scala %}
1395 scala> val accum = sc.longAccumulator("My Accumulator")
1396 accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
1397
1398 scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
1399 ...
1400 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
1401
1402 scala> accum.value
1403 res2: Long = 10
1404 {% endhighlight %}
1405
1406 While this code used the built-in support for accumulators of type Long, programmers can also
1407 create their own types by subclassing [AccumulatorV2](api/scala/org/apache/spark/util/AccumulatorV2.html).
The AccumulatorV2 abstract class has several methods that one has to override: `reset` for resetting
the accumulator to zero, `add` for adding another value into the accumulator, and
`merge` for merging another same-type accumulator into this one. Other methods that must be overridden
1411 are contained in the [API documentation](api/scala/org/apache/spark/util/AccumulatorV2.html). For example, supposing we had a `MyVector` class
1412 representing mathematical vectors, we could write:
1413
1414 {% highlight scala %}
class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {

  private val myVector: MyVector = MyVector.createZeroVector

  def reset(): Unit = {
    myVector.reset()
  }

  def add(v: MyVector): Unit = {
    myVector.add(v)
  }
  ...
}
1428
1429 // Then, create an Accumulator of this type:
1430 val myVectorAcc = new VectorAccumulatorV2
1431 // Then, register it into spark context:
1432 sc.register(myVectorAcc, "MyVectorAcc1")
1433 {% endhighlight %}
1434
Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different from that of the elements added.
1436
1437 </div>
1438
1439 <div data-lang="java" markdown="1">
1440
1441 A numeric accumulator can be created by calling `SparkContext.longAccumulator()` or `SparkContext.doubleAccumulator()`
1442 to accumulate values of type Long or Double, respectively. Tasks running on a cluster can then add to it using
1443 the `add` method. However, they cannot read its value. Only the driver program can read the accumulator's value,
1444 using its `value` method.
1445
1446 The code below shows an accumulator being used to add up the elements of an array:
1447
1448 {% highlight java %}
1449 LongAccumulator accum = jsc.sc().longAccumulator();
1450
1451 sc.parallelize(Arrays.asList(1, 2, 3, 4)).foreach(x -> accum.add(x));
1452 // ...
1453 // 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
1454
1455 accum.value();
1456 // returns 10
1457 {% endhighlight %}
1458
1459 While this code used the built-in support for accumulators of type Long, programmers can also
1460 create their own types by subclassing [AccumulatorV2](api/scala/org/apache/spark/util/AccumulatorV2.html).
The AccumulatorV2 abstract class has several methods that one has to override: `reset` for resetting
the accumulator to zero, `add` for adding another value into the accumulator, and
`merge` for merging another same-type accumulator into this one. Other methods that must be overridden
1464 are contained in the [API documentation](api/scala/org/apache/spark/util/AccumulatorV2.html). For example, supposing we had a `MyVector` class
1465 representing mathematical vectors, we could write:
1466
1467 {% highlight java %}
class VectorAccumulatorV2 implements AccumulatorV2<MyVector, MyVector> {

  private MyVector myVector = MyVector.createZeroVector();

  public void reset() {
    myVector.reset();
  }

  public void add(MyVector v) {
    myVector.add(v);
  }
  ...
}
1481
1482 // Then, create an Accumulator of this type:
1483 VectorAccumulatorV2 myVectorAcc = new VectorAccumulatorV2();
1484 // Then, register it into spark context:
1485 jsc.sc().register(myVectorAcc, "MyVectorAcc1");
1486 {% endhighlight %}
1487
Note that, when programmers define their own type of AccumulatorV2, the resulting type can be different from that of the elements added.
1489
*Warning*: When a Spark task finishes, Spark will try to merge the accumulated updates in this task to an accumulator.
If it fails, Spark will ignore the failure and still mark the task successful and continue to run other tasks. Hence,
a buggy accumulator will not cause a Spark job to fail, but it may not get updated correctly even though the Spark job is successful.
1493
1494 </div>
1495
1496 <div data-lang="python" markdown="1">
1497
1498 An accumulator is created from an initial value `v` by calling `SparkContext.accumulator(v)`. Tasks
1499 running on a cluster can then add to it using the `add` method or the `+=` operator. However, they cannot read its value.
1500 Only the driver program can read the accumulator's value, using its `value` method.
1501
1502 The code below shows an accumulator being used to add up the elements of an array:
1503
1504 {% highlight python %}
1505 >>> accum = sc.accumulator(0)
1506 >>> accum
1507 Accumulator<id=0, value=0>
1508
1509 >>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
1510 ...
1511 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
1512
1513 >>> accum.value
1514 10
1515 {% endhighlight %}
1516
1517 While this code used the built-in support for accumulators of type Int, programmers can also
1518 create their own types by subclassing [AccumulatorParam](api/python/pyspark.html#pyspark.AccumulatorParam).
1519 The AccumulatorParam interface has two methods: `zero` for providing a "zero value" for your data
1520 type, and `addInPlace` for adding two values together. For example, supposing we had a `Vector` class
1521 representing mathematical vectors, we could write:
1522
1523 {% highlight python %}
class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1
1531
1532 # Then, create an Accumulator of this type:
1533 vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())
1534 {% endhighlight %}
1535
1536 </div>
1537
1538 </div>
1539
For accumulator updates performed inside <b>actions only</b>, Spark guarantees that each task's update to the accumulator
will only be applied once, i.e. restarted tasks will not update the value. In transformations, users should be aware
that each task's update may be applied more than once if tasks or job stages are re-executed.
1543
1544 Accumulators do not change the lazy evaluation model of Spark. If they are being updated within an operation on an RDD, their value is only updated once that RDD is computed as part of an action. Consequently, accumulator updates are not guaranteed to be executed when made within a lazy transformation like `map()`. The below code fragment demonstrates this property:
1545
1546 <div class="codetabs">
1547
1548 <div data-lang="scala" markdown="1">
1549 {% highlight scala %}
1550 val accum = sc.longAccumulator
1551 data.map { x => accum.add(x); x }
1552 // Here, accum is still 0 because no actions have caused the map operation to be computed.
1553 {% endhighlight %}
1554 </div>
1555
1556 <div data-lang="java" markdown="1">
1557 {% highlight java %}
1558 LongAccumulator accum = jsc.sc().longAccumulator();
1559 data.map(x -> { accum.add(x); return f(x); });
1560 // Here, accum is still 0 because no actions have caused the `map` to be computed.
1561 {% endhighlight %}
1562 </div>
1563
1564 <div data-lang="python" markdown="1">
1565 {% highlight python %}
1566 accum = sc.accumulator(0)
def g(x):
    accum.add(x)
    return f(x)
1570 data.map(g)
1571 # Here, accum is still 0 because no actions have caused the `map` to be computed.
1572 {% endhighlight %}
1573 </div>
1574
1575 </div>
1576
1577 # Deploying to a Cluster
1578
1579 The [application submission guide](submitting-applications.html) describes how to submit applications to a cluster.
1580 In short, once you package your application into a JAR (for Java/Scala) or a set of `.py` or `.zip` files (for Python),
1581 the `bin/spark-submit` script lets you submit it to any supported cluster manager.
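
For instance, a typical invocation might look like the following (the class name, master URL and JAR path are placeholders):

    ./bin/spark-submit \
      --class com.example.MyApp \
      --master spark://host:7077 \
      path/to/my-app.jar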
1582
1583 # Launching Spark jobs from Java / Scala
1584
1585 The [org.apache.spark.launcher](api/java/index.html?org/apache/spark/launcher/package-summary.html)
1586 package provides classes for launching Spark jobs as child processes using a simple Java API.
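
As a hedged sketch (the JAR path and class name are placeholders), an application could be launched as a child process like this:

{% highlight scala %}
import org.apache.spark.launcher.SparkLauncher

val process = new SparkLauncher()
  .setAppResource("/path/to/my-app.jar")  // placeholder application JAR
  .setMainClass("com.example.MyApp")      // placeholder main class
  .setMaster("local[2]")
  .launch()                               // returns a java.lang.Process

process.waitFor()  // wait for the child process to exit
{% endhighlight %}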
1587
1588 # Unit Testing
1589
1590 Spark is friendly to unit testing with any popular unit test framework.
1591 Simply create a `SparkContext` in your test with the master URL set to `local`, run your operations,
1592 and then call `SparkContext.stop()` to tear it down.
1593 Make sure you stop the context within a `finally` block or the test framework's `tearDown` method,
1594 as Spark does not support two contexts running concurrently in the same program.
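
For example, a minimal sketch of such a test body (the assertion style depends on your test framework):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
try {
  val doubled = sc.parallelize(Seq(1, 2, 3)).map(_ * 2).collect()
  assert(doubled.sameElements(Array(2, 4, 6)))
} finally {
  sc.stop()  // always stop the context so later tests can create their own
}
{% endhighlight %}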
1595
1596 # Where to Go from Here
1597
1598 You can see some [example Spark programs](https://spark.apache.org/examples.html) on the Spark website.
1599 In addition, Spark includes several samples in the `examples` directory
1600 ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
1601 [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
1602 [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python),
1603 [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)).
1604 You can run Java and Scala examples by passing the class name to Spark's `bin/run-example` script; for instance:
1605
1606 ./bin/run-example SparkPi
1607
1608 For Python examples, use `spark-submit` instead:
1609
1610 ./bin/spark-submit examples/src/main/python/pi.py
1611
For R examples, likewise use `spark-submit`:
1613
1614 ./bin/spark-submit examples/src/main/r/dataframe.R
1615
1616 For help on optimizing your programs, the [configuration](configuration.html) and
1617 [tuning](tuning.html) guides provide information on best practices. They are especially important for
1618 making sure that your data is stored in memory in an efficient format.
1619 For help on deploying, the [cluster mode overview](cluster-overview.html) describes the components involved
1620 in distributed operation and supported cluster managers.
1621
1622 Finally, full API documentation is available in
1623 [Scala](api/scala/org/apache/spark/), [Java](api/java/), [Python](api/python/) and [R](api/R/).