0001 ---
0002 layout: global
0003 title: Quick Start
0004 description: Quick start tutorial for Spark SPARK_VERSION_SHORT
0005 license: |
0006 Licensed to the Apache Software Foundation (ASF) under one or more
0007 contributor license agreements. See the NOTICE file distributed with
0008 this work for additional information regarding copyright ownership.
0009 The ASF licenses this file to You under the Apache License, Version 2.0
0010 (the "License"); you may not use this file except in compliance with
0011 the License. You may obtain a copy of the License at
0012
0013 http://www.apache.org/licenses/LICENSE-2.0
0014
0015 Unless required by applicable law or agreed to in writing, software
0016 distributed under the License is distributed on an "AS IS" BASIS,
0017 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018 See the License for the specific language governing permissions and
0019 limitations under the License.
0020 ---
0021
0022 * This will become a table of contents (this text will be scraped).
0023 {:toc}
0024
0025 This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's
0026 interactive shell (in Python or Scala),
0027 then show how to write applications in Java, Scala, and Python.
0028
0029 To follow along with this guide, first, download a packaged release of Spark from the
0030 [Spark website](https://spark.apache.org/downloads.html). Since we won't be using HDFS,
0031 you can download a package for any version of Hadoop.
0032
Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). Since Spark 2.0, the RDD has been superseded by the Dataset, which is strongly typed like an RDD but benefits from richer optimizations under the hood. The RDD interface is still supported, and the [RDD programming guide](rdd-programming-guide.html) provides a more detailed reference. However, we highly recommend switching to Dataset, which has better performance than RDD. See the [SQL programming guide](sql-programming-guide.html) for more information about Dataset.
0034
0035 # Security
0036
0037 Security in Spark is OFF by default. This could mean you are vulnerable to attack by default.
0038 Please see [Spark Security](security.html) before running Spark.
0039
0040 # Interactive Analysis with the Spark Shell
0041
0042 ## Basics
0043
0044 Spark's shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.
0045 It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries)
0046 or Python. Start it by running the following in the Spark directory:
0047
0048 <div class="codetabs">
0049 <div data-lang="scala" markdown="1">
0050
0051 ./bin/spark-shell
0052
0053 Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let's make a new Dataset from the text of the README file in the Spark source directory:
0054
0055 {% highlight scala %}
0056 scala> val textFile = spark.read.textFile("README.md")
0057 textFile: org.apache.spark.sql.Dataset[String] = [value: string]
0058 {% endhighlight %}
0059
You can get values from a Dataset directly by calling actions, or transform the Dataset to get a new one. For more details, please read the _[API doc](api/scala/org/apache/spark/sql/Dataset.html)_.
0061
0062 {% highlight scala %}
0063 scala> textFile.count() // Number of items in this Dataset
0064 res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs
0065
0066 scala> textFile.first() // First item in this Dataset
0067 res1: String = # Apache Spark
0068 {% endhighlight %}
0069
0070 Now let's transform this Dataset into a new one. We call `filter` to return a new Dataset with a subset of the items in the file.
0071
0072 {% highlight scala %}
0073 scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
0074 linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
0075 {% endhighlight %}
0076
0077 We can chain together transformations and actions:
0078
0079 {% highlight scala %}
0080 scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
0081 res3: Long = 15
0082 {% endhighlight %}
0083
0084 </div>
0085 <div data-lang="python" markdown="1">
0086
0087 ./bin/pyspark
0088
0089
0090 Or if PySpark is installed with pip in your current environment:
0091
0092 pyspark
0093
0094 Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python's dynamic nature, we don't need the Dataset to be strongly-typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call it `DataFrame` to be consistent with the data frame concept in Pandas and R. Let's make a new DataFrame from the text of the README file in the Spark source directory:
0095
0096 {% highlight python %}
0097 >>> textFile = spark.read.text("README.md")
0098 {% endhighlight %}
0099
You can get values from a DataFrame directly by calling actions, or transform the DataFrame to get a new one. For more details, please read the _[API doc](api/python/index.html#pyspark.sql.DataFrame)_.
0101
0102 {% highlight python %}
0103 >>> textFile.count() # Number of rows in this DataFrame
0104 126
0105
0106 >>> textFile.first() # First row in this DataFrame
Row(value='# Apache Spark')
0108 {% endhighlight %}
0109
0110 Now let's transform this DataFrame to a new one. We call `filter` to return a new DataFrame with a subset of the lines in the file.
0111
0112 {% highlight python %}
0113 >>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
0114 {% endhighlight %}
0115
0116 We can chain together transformations and actions:
0117
0118 {% highlight python %}
0119 >>> textFile.filter(textFile.value.contains("Spark")).count() # How many lines contain "Spark"?
0120 15
0121 {% endhighlight %}
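
To make the split between transformations and actions more concrete, here is a plain-Python analogy. It is not Spark code: the sample lines and the helper names `lines_containing` and `count_items` are made up for illustration. It assumes the usual Spark behavior that a transformation such as `filter` only describes a computation, while an action such as `count` triggers it.

```python
# Plain-Python analogy only; nothing here uses the Spark API.
# The sample lines below are illustrative, not the real README contents.
sample_lines = [
    "# Apache Spark",
    "",
    "Spark is a unified analytics engine for large-scale data processing.",
    "It provides high-level APIs in Scala, Java, Python, and R.",
]


def lines_containing(lines, needle):
    """'Transformation': returns a lazy generator; no line is inspected yet."""
    return (line for line in lines if needle in line)


def count_items(items):
    """'Action': iterates over the items and returns a plain number."""
    return sum(1 for _ in items)


lines_with_spark = lines_containing(sample_lines, "Spark")  # nothing evaluated yet
num_lines_with_spark = count_items(lines_with_spark)        # evaluation happens here
print(num_lines_with_spark)
```

One difference worth keeping in mind: a Python generator is exhausted after a single pass, whereas a DataFrame can be reused in any number of actions.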
0122
0123 </div>
0124 </div>
0125
0126
0127 ## More on Dataset Operations
0128 Dataset actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words:
0129
0130 <div class="codetabs">
0131 <div data-lang="scala" markdown="1">
0132
0133 {% highlight scala %}
0134 scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
0136 {% endhighlight %}
0137
This first maps a line to an integer value, creating a new Dataset. `reduce` is called on that Dataset to find the largest word count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the `Math.max()` function to make this code easier to understand:
0139
0140 {% highlight scala %}
0141 scala> import java.lang.Math
0142 import java.lang.Math
0143
0144 scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
0145 res5: Int = 15
0146 {% endhighlight %}
0147
0148 One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
0149
0150 {% highlight scala %}
0151 scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
0152 wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
0153 {% endhighlight %}
0154
0155 Here, we call `flatMap` to transform a Dataset of lines to a Dataset of words, and then combine `groupByKey` and `count` to compute the per-word counts in the file as a Dataset of (String, Long) pairs. To collect the word counts in our shell, we can call `collect`:
0156
0157 {% highlight scala %}
0158 scala> wordCounts.collect()
res6: Array[(String, Long)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
0160 {% endhighlight %}
0161
0162 </div>
0163 <div data-lang="python" markdown="1">
0164
0165 {% highlight python %}
0166 >>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, r"\s+")).name("numWords")).agg(max(col("numWords"))).collect()
0168 [Row(max(numWords)=15)]
0169 {% endhighlight %}
0170
This first maps each line to an integer value and aliases it as "numWords", creating a new DataFrame. `agg` is called on that DataFrame to find the largest word count. The arguments to `select` and `agg` are both _[Column](api/python/index.html#pyspark.sql.Column)_ objects; we can use `df.colName` to get a column from a DataFrame. We can also import `pyspark.sql.functions`, which provides many convenient functions for building a new Column from an existing one.
0172
0173 One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:
0174
0175 {% highlight python %}
>>> wordCounts = textFile.select(explode(split(textFile.value, r"\s+")).alias("word")).groupBy("word").count()
0177 {% endhighlight %}
0178
Here, we use the `explode` function in `select` to transform a DataFrame of lines into a DataFrame of words, and then combine `groupBy` and `count` to compute the per-word counts in the file as a DataFrame of 2 columns: "word" and "count". To collect the word counts in our shell, we can call `collect`:
0180
0181 {% highlight python %}
0182 >>> wordCounts.collect()
[Row(word='online', count=1), Row(word='graphs', count=1), ...]
0184 {% endhighlight %}
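
As a rough mental model of what the two queries above compute, the following sketch does the same work on a small in-memory list using only the Python standard library. It is an analogy under stated assumptions, not Spark code: `sample_lines` and `split_words` are hypothetical names, and the data is invented.

```python
import re
from collections import Counter

# Invented sample data standing in for the lines of a text file.
sample_lines = ["to be or", "not to be", ""]


def split_words(line):
    """Split a line on runs of whitespace, like the "\\s+" pattern used above."""
    return re.split(r"\s+", line)


# Analogue of select(size(split(...))) followed by agg(max(...)).
num_words_per_line = [len(split_words(line)) for line in sample_lines]
max_words = max(num_words_per_line)

# Analogue of explode(split(...)) followed by groupBy("word").count().
word_counts = Counter(word for line in sample_lines for word in split_words(line))

print(max_words)
print(word_counts.most_common())
```

In this sketch an empty line produces a single empty-string token, so it counts as one "word" and appears as its own key in `word_counts`.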
0185
0186 </div>
0187 </div>
0188
0189 ## Caching
0190 Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our `linesWithSpark` dataset to be cached:
0191
0192 <div class="codetabs">
0193 <div data-lang="scala" markdown="1">
0194
0195 {% highlight scala %}
0196 scala> linesWithSpark.cache()
0197 res7: linesWithSpark.type = [value: string]
0198
0199 scala> linesWithSpark.count()
0200 res8: Long = 15
0201
0202 scala> linesWithSpark.count()
0203 res9: Long = 15
0204 {% endhighlight %}
0205
0206 It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is
0207 that these same functions can be used on very large data sets, even when they are striped across
0208 tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to
0209 a cluster, as described in the [RDD programming guide](rdd-programming-guide.html#using-the-shell).
0210
0211 </div>
0212 <div data-lang="python" markdown="1">
0213
0214 {% highlight python %}
0215 >>> linesWithSpark.cache()
0216
0217 >>> linesWithSpark.count()
0218 15
0219
0220 >>> linesWithSpark.count()
0221 15
0222 {% endhighlight %}
0223
0224 It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is
0225 that these same functions can be used on very large data sets, even when they are striped across
0226 tens or hundreds of nodes. You can also do this interactively by connecting `bin/pyspark` to
0227 a cluster, as described in the [RDD programming guide](rdd-programming-guide.html#using-the-shell).
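
The effect of `cache()` can be pictured with a toy class in plain Python. This is only a sketch of the idea, assuming that `cache()` merely marks the data for caching and that the first action afterwards fills the cache. `LazyLines` and its `scans` counter are hypothetical and are not part of Spark.

```python
class LazyLines:
    """Toy stand-in for a dataset: re-reads its source on every action unless cached."""

    def __init__(self, source, predicate):
        self._source = source
        self._predicate = predicate
        self._cache_requested = False
        self._cached_rows = None
        self.scans = 0  # how many times the source had to be read

    def cache(self):
        # Only marks the data for caching; nothing is computed yet.
        self._cache_requested = True
        return self

    def _materialize(self):
        if self._cached_rows is not None:
            return self._cached_rows  # served from the in-memory copy
        self.scans += 1
        rows = [line for line in self._source if self._predicate(line)]
        if self._cache_requested:
            self._cached_rows = rows
        return rows

    def count(self):
        return len(self._materialize())


source = ["# Apache Spark", "", "Spark is fast", "See the docs"]

uncached = LazyLines(source, lambda line: "Spark" in line)
uncached.count()
uncached.count()

cached = LazyLines(source, lambda line: "Spark" in line).cache()
cached.count()
cached.count()

print(uncached.scans, cached.scans)
```

The uncached object scans its source once per `count()`, while the cached one scans it only for the first `count()` and answers the second from memory.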
0228
0229 </div>
0230 </div>
0231
0232 # Self-Contained Applications
0233 Suppose we wish to write a self-contained application using the Spark API. We will walk through a
0234 simple application in Scala (with sbt), Java (with Maven), and Python (pip).
0235
0236 <div class="codetabs">
0237 <div data-lang="scala" markdown="1">
0238
0239 We'll create a very simple Spark application in Scala--so simple, in fact, that it's
0240 named `SimpleApp.scala`:
0241
0242 {% highlight scala %}
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    // Cache the file contents because they are scanned twice below
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
0257 {% endhighlight %}
0258
0259 Note that applications should define a `main()` method instead of extending `scala.App`.
0260 Subclasses of `scala.App` may not work correctly.
0261
0262 This program just counts the number of lines containing 'a' and the number containing 'b' in the
0263 Spark README. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is
0264 installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession,
0265 we initialize a SparkSession as part of the program.
0266
0267 We call `SparkSession.builder` to construct a `SparkSession`, then set the application name, and finally call `getOrCreate` to get the `SparkSession` instance.
0268
Our application depends on the Spark API, so we'll also include an sbt configuration file,
`build.sbt`, which declares Spark as a dependency:
0272
0273 {% highlight scala %}
0274 name := "Simple Project"
0275
0276 version := "1.0"
0277
0278 scalaVersion := "{{site.SCALA_VERSION}}"
0279
0280 libraryDependencies += "org.apache.spark" %% "spark-sql" % "{{site.SPARK_VERSION}}"
0281 {% endhighlight %}
0282
For sbt to work correctly, we'll need to lay out `SimpleApp.scala` and `build.sbt`
0284 according to the typical directory structure. Once that is in place, we can create a JAR package
0285 containing the application's code, then use the `spark-submit` script to run our program.
0286
0287 {% highlight bash %}
0288 # Your directory layout should look like this
0289 $ find .
0290 .
0291 ./build.sbt
0292 ./src
0293 ./src/main
0294 ./src/main/scala
0295 ./src/main/scala/SimpleApp.scala
0296
0297 # Package a jar containing your application
0298 $ sbt package
0299 ...
0300 [info] Packaging {..}/{..}/target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar
0301
0302 # Use spark-submit to run your application
0303 $ YOUR_SPARK_HOME/bin/spark-submit \
0304 --class "SimpleApp" \
0305 --master local[4] \
0306 target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar
0307 ...
0308 Lines with a: 46, Lines with b: 23
0309 {% endhighlight %}
0310
0311 </div>
0312 <div data-lang="java" markdown="1">
0313 This example will use Maven to compile an application JAR, but any similar build system will work.
0314
0315 We'll create a very simple Spark application, `SimpleApp.java`:
0316
0317 {% highlight java %}
/* SimpleApp.java */
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    // The cast selects the Java-friendly overload of filter, which a bare lambda can leave ambiguous
    long numAs = logData.filter((FilterFunction<String>) s -> s.contains("a")).count();
    long numBs = logData.filter((FilterFunction<String>) s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}
0336 {% endhighlight %}
0337
0338 This program just counts the number of lines containing 'a' and the number containing 'b' in the
0339 Spark README. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is
0340 installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession,
0341 we initialize a SparkSession as part of the program.
0342
0343 To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency.
0344 Note that Spark artifacts are tagged with a Scala version.
0345
0346 {% highlight xml %}
0347 <project>
0348 <groupId>edu.berkeley</groupId>
0349 <artifactId>simple-project</artifactId>
0350 <modelVersion>4.0.0</modelVersion>
0351 <name>Simple Project</name>
0352 <packaging>jar</packaging>
0353 <version>1.0</version>
0354 <dependencies>
0355 <dependency> <!-- Spark dependency -->
0356 <groupId>org.apache.spark</groupId>
0357 <artifactId>spark-sql_{{site.SCALA_BINARY_VERSION}}</artifactId>
0358 <version>{{site.SPARK_VERSION}}</version>
0359 <scope>provided</scope>
0360 </dependency>
0361 </dependencies>
0362 </project>
0363 {% endhighlight %}
0364
0365 We lay out these files according to the canonical Maven directory structure:
0366 {% highlight bash %}
0367 $ find .
0368 ./pom.xml
0369 ./src
0370 ./src/main
0371 ./src/main/java
0372 ./src/main/java/SimpleApp.java
0373 {% endhighlight %}
0374
0375 Now, we can package the application using Maven and execute it with `./bin/spark-submit`.
0376
0377 {% highlight bash %}
0378 # Package a JAR containing your application
0379 $ mvn package
0380 ...
0381 [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar
0382
0383 # Use spark-submit to run your application
0384 $ YOUR_SPARK_HOME/bin/spark-submit \
0385 --class "SimpleApp" \
0386 --master local[4] \
0387 target/simple-project-1.0.jar
0388 ...
Lines with a: 46, lines with b: 23
0390 {% endhighlight %}
0391
0392 </div>
0393 <div data-lang="python" markdown="1">
0394
0395 Now we will show how to write an application using the Python API (PySpark).
0396
0397
If you are building a packaged PySpark application or library, you can declare PySpark as a dependency in your `setup.py` file:
0399
0400 {% highlight python %}
    install_requires=[
        'pyspark=={{site.SPARK_VERSION}}'
    ]
0404 {% endhighlight %}
0405
0406
0407 As an example, we'll create a simple Spark application, `SimpleApp.py`:
0408
0409 {% highlight python %}
0410 """SimpleApp.py"""
0411 from pyspark.sql import SparkSession
0412
0413 logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system
0414 spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
0415 logData = spark.read.text(logFile).cache()
0416
0417 numAs = logData.filter(logData.value.contains('a')).count()
0418 numBs = logData.filter(logData.value.contains('b')).count()
0419
0420 print("Lines with a: %i, lines with b: %i" % (numAs, numBs))
0421
0422 spark.stop()
0423 {% endhighlight %}
0424
0425
0426 This program just counts the number of lines containing 'a' and the number containing 'b' in a
0427 text file.
0428 Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed.
0429 As with the Scala and Java examples, we use a SparkSession to create Datasets.
0430 For applications that use custom classes or third-party libraries, we can also add code
0431 dependencies to `spark-submit` through its `--py-files` argument by packaging them into a
0432 .zip file (see `spark-submit --help` for details).
0433 `SimpleApp` is simple enough that we do not need to specify any code dependencies.
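
For an application that does have local code dependencies, the .zip file mentioned above can be built with the Python standard library. The sketch below is one possible approach, not an official Spark tool: the `helpers` package, the `text_utils.py` module, the `deps.zip` file name, and the `build_py_files_zip` function are all hypothetical.

```python
import tempfile
import zipfile
from pathlib import Path


def build_py_files_zip(package_dir: Path, zip_path: Path) -> list:
    """Zip every .py file under package_dir, keeping the package name in archive paths."""
    root = package_dir.parent
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for py_file in sorted(package_dir.rglob("*.py")):
            # Store paths relative to the parent so that `import helpers` resolves.
            archive.write(py_file, py_file.relative_to(root).as_posix())
        return archive.namelist()


# Demonstration on a throwaway package created in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    package = Path(tmp) / "helpers"
    package.mkdir()
    (package / "__init__.py").write_text("")
    (package / "text_utils.py").write_text(
        "def normalize(s):\n    return s.strip().lower()\n"
    )
    archived = build_py_files_zip(package, Path(tmp) / "deps.zip")

print(archived)
```

The resulting archive would then be passed along with the application, for example `spark-submit --py-files deps.zip SimpleApp.py`.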
0434
0435 We can run this application using the `bin/spark-submit` script:
0436
0437 {% highlight bash %}
0438 # Use spark-submit to run your application
0439 $ YOUR_SPARK_HOME/bin/spark-submit \
0440 --master local[4] \
0441 SimpleApp.py
0442 ...
Lines with a: 46, lines with b: 23
0444 {% endhighlight %}
0445
If PySpark is installed in your environment with pip (e.g., `pip install pyspark`), you can run your application with the regular Python interpreter or with the provided `spark-submit`, whichever you prefer.
0447
0448 {% highlight bash %}
0449 # Use the Python interpreter to run your application
0450 $ python SimpleApp.py
0451 ...
Lines with a: 46, lines with b: 23
0453 {% endhighlight %}
0454
0455 </div>
0456 </div>
0457
0458 # Where to Go from Here
0459 Congratulations on running your first Spark application!
0460
* For an in-depth overview of the API, start with the [RDD programming guide](rdd-programming-guide.html) and the [SQL programming guide](sql-programming-guide.html), or see the "Programming Guides" menu for other components.
0462 * For running applications on a cluster, head to the [deployment overview](cluster-overview.html).
0463 * Finally, Spark includes several samples in the `examples` directory
0464 ([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
0465 [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
0466 [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python),
0467 [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)).
0468 You can run them as follows:
0469
0470 {% highlight bash %}
0471 # For Scala and Java, use run-example:
0472 ./bin/run-example SparkPi
0473
0474 # For Python examples, use spark-submit directly:
0475 ./bin/spark-submit examples/src/main/python/pi.py
0476
0477 # For R examples, use spark-submit directly:
0478 ./bin/spark-submit examples/src/main/r/dataframe.R
0479 {% endhighlight %}