---
layout: global
title: Quick Start
description: Quick start tutorial for Spark SPARK_VERSION_SHORT
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

This tutorial provides a quick introduction to using Spark. We will first introduce the API through Spark's
interactive shell (in Python or Scala),
then show how to write applications in Java, Scala, and Python.

To follow along with this guide, first, download a packaged release of Spark from the
[Spark website](https://spark.apache.org/downloads.html). Since we won't be using HDFS,
you can download a package for any version of Hadoop.

Note that, before Spark 2.0, the main programming interface of Spark was the Resilient Distributed Dataset (RDD). After Spark 2.0, RDDs were replaced by the Dataset, which is strongly typed like an RDD but with richer optimizations under the hood. The RDD interface is still supported, and you can find a more detailed reference in the [RDD programming guide](rdd-programming-guide.html). However, we highly recommend that you switch to the Dataset, which has better performance than the RDD. See the [SQL programming guide](sql-programming-guide.html) for more information about the Dataset.
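
For a rough comparison only (run inside the Spark shell introduced below, where a `SparkSession` named `spark` is predefined), here is a minimal Scala sketch of the same line count written against the older RDD API and the Dataset API; it is not needed for the rest of this guide:

{% highlight scala %}
// Illustrative comparison only -- both count the lines of README.md.
val rddCount = spark.sparkContext.textFile("README.md").count() // RDD[String]
val dsCount  = spark.read.textFile("README.md").count()         // Dataset[String]
{% endhighlight %}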

# Security

Security in Spark is OFF by default. This could mean you are vulnerable to attack by default.
Please see [Spark Security](security.html) before running Spark.

# Interactive Analysis with the Spark Shell

## Basics

Spark's shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.
It is available in either Scala (which runs on the Java VM and is thus a good way to use existing Java libraries)
or Python. Start it by running the following in the Spark directory:

<div class="codetabs">
<div data-lang="scala" markdown="1">

    ./bin/spark-shell

Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Let's make a new Dataset from the text of the README file in the Spark source directory:

{% highlight scala %}
scala> val textFile = spark.read.textFile("README.md")
textFile: org.apache.spark.sql.Dataset[String] = [value: string]
{% endhighlight %}

You can get values from the Dataset directly by calling some actions, or transform the Dataset to get a new one. For more details, please read the _[API doc](api/scala/org/apache/spark/sql/Dataset.html)_.

{% highlight scala %}
scala> textFile.count() // Number of items in this Dataset
res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs

scala> textFile.first() // First item in this Dataset
res1: String = # Apache Spark
{% endhighlight %}

Now let's transform this Dataset into a new one. We call `filter` to return a new Dataset with a subset of the items in the file.

{% highlight scala %}
scala> val linesWithSpark = textFile.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string]
{% endhighlight %}

We can chain together transformations and actions:

{% highlight scala %}
scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"?
res3: Long = 15
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

    ./bin/pyspark


Or if PySpark is installed with pip in your current environment:

    pyspark

Spark's primary abstraction is a distributed collection of items called a Dataset. Datasets can be created from Hadoop InputFormats (such as HDFS files) or by transforming other Datasets. Due to Python's dynamic nature, we don't need the Dataset to be strongly-typed in Python. As a result, all Datasets in Python are Dataset[Row], and we call it `DataFrame` to be consistent with the data frame concept in Pandas and R. Let's make a new DataFrame from the text of the README file in the Spark source directory:

{% highlight python %}
>>> textFile = spark.read.text("README.md")
{% endhighlight %}

You can get values from the DataFrame directly by calling some actions, or transform the DataFrame to get a new one. For more details, please read the _[API doc](api/python/index.html#pyspark.sql.DataFrame)_.

{% highlight python %}
>>> textFile.count()  # Number of rows in this DataFrame
126

>>> textFile.first()  # First row in this DataFrame
Row(value=u'# Apache Spark')
{% endhighlight %}

Now let's transform this DataFrame into a new one. We call `filter` to return a new DataFrame with a subset of the lines in the file.

{% highlight python %}
>>> linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
{% endhighlight %}

We can chain together transformations and actions:

{% highlight python %}
>>> textFile.filter(textFile.value.contains("Spark")).count()  # How many lines contain "Spark"?
15
{% endhighlight %}

</div>
</div>


## More on Dataset Operations
Dataset actions and transformations can be used for more complex computations. Let's say we want to find the line with the most words:

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
res4: Int = 15
{% endhighlight %}

This first maps a line to an integer value, creating a new Dataset. `reduce` is called on that Dataset to find the largest word count. The arguments to `map` and `reduce` are Scala function literals (closures), and can use any language feature or Scala/Java library. For example, we can easily call functions declared elsewhere. We'll use the `Math.max()` function to make this code easier to understand:

{% highlight scala %}
scala> import java.lang.Math
import java.lang.Math

scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
res5: Int = 15
{% endhighlight %}

One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

{% highlight scala %}
scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count()
wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint]
{% endhighlight %}

Here, we call `flatMap` to transform a Dataset of lines to a Dataset of words, and then combine `groupByKey` and `count` to compute the per-word counts in the file as a Dataset of (String, Long) pairs. To collect the word counts in our shell, we can call `collect`:

{% highlight scala %}
scala> wordCounts.collect()
res6: Array[(String, Long)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...)
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
>>> from pyspark.sql.functions import *
>>> textFile.select(size(split(textFile.value, r"\s+")).name("numWords")).agg(max(col("numWords"))).collect()
[Row(max(numWords)=15)]
{% endhighlight %}

This first maps a line to an integer value and aliases it as "numWords", creating a new DataFrame. `agg` is called on that DataFrame to find the largest word count. The arguments to `select` and `agg` are both _[Column](api/python/index.html#pyspark.sql.Column)_; we can use `df.colName` to get a column from a DataFrame. We can also import `pyspark.sql.functions`, which provides a lot of convenient functions to build a new Column from an old one.
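
For instance, `df.colName`, indexing with `df["colName"]`, and `col("colName")` are interchangeable ways to reference the same Column; a quick, purely illustrative check (not part of the original walkthrough):

{% highlight python %}
>>> from pyspark.sql.functions import col
>>> # All three expressions select the same single "value" column.
>>> textFile.select(textFile.value).columns
['value']
>>> textFile.select(textFile["value"]).columns
['value']
>>> textFile.select(col("value")).columns
['value']
{% endhighlight %}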

One common data flow pattern is MapReduce, as popularized by Hadoop. Spark can implement MapReduce flows easily:

{% highlight python %}
>>> wordCounts = textFile.select(explode(split(textFile.value, r"\s+")).alias("word")).groupBy("word").count()
{% endhighlight %}

Here, we use the `explode` function in `select` to transform a DataFrame of lines into a DataFrame of words, and then combine `groupBy` and `count` to compute the per-word counts in the file as a DataFrame of 2 columns: "word" and "count". To collect the word counts in our shell, we can call `collect`:

{% highlight python %}
>>> wordCounts.collect()
[Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...]
{% endhighlight %}

</div>
</div>

## Caching
Spark also supports pulling data sets into a cluster-wide in-memory cache. This is very useful when data is accessed repeatedly, such as when querying a small "hot" dataset or when running an iterative algorithm like PageRank. As a simple example, let's mark our `linesWithSpark` dataset to be cached:

<div class="codetabs">
<div data-lang="scala" markdown="1">

{% highlight scala %}
scala> linesWithSpark.cache()
res7: linesWithSpark.type = [value: string]

scala> linesWithSpark.count()
res8: Long = 15

scala> linesWithSpark.count()
res9: Long = 15
{% endhighlight %}

It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is
that these same functions can be used on very large data sets, even when they are striped across
tens or hundreds of nodes. You can also do this interactively by connecting `bin/spark-shell` to
a cluster, as described in the [RDD programming guide](rdd-programming-guide.html#using-the-shell).
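
For example, a minimal sketch of attaching the shell to a standalone cluster (the master URL here is a placeholder, not part of this guide's setup):

{% highlight bash %}
# Hypothetical standalone master URL -- replace with your cluster's own
./bin/spark-shell --master spark://your-master-host:7077
{% endhighlight %}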

</div>
<div data-lang="python" markdown="1">

{% highlight python %}
>>> linesWithSpark.cache()

>>> linesWithSpark.count()
15

>>> linesWithSpark.count()
15
{% endhighlight %}

It may seem silly to use Spark to explore and cache a 100-line text file. The interesting part is
that these same functions can be used on very large data sets, even when they are striped across
tens or hundreds of nodes. You can also do this interactively by connecting `bin/pyspark` to
a cluster, as described in the [RDD programming guide](rdd-programming-guide.html#using-the-shell).
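
Similarly, a minimal sketch for the Python shell (again, the master URL is only a placeholder):

{% highlight bash %}
# Hypothetical standalone master URL -- replace with your cluster's own
./bin/pyspark --master spark://your-master-host:7077
{% endhighlight %}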

</div>
</div>

# Self-Contained Applications
Suppose we wish to write a self-contained application using the Spark API. We will walk through a
simple application in Scala (with sbt), Java (with Maven), and Python (pip).

<div class="codetabs">
<div data-lang="scala" markdown="1">

We'll create a very simple Spark application in Scala--so simple, in fact, that it's
named `SimpleApp.scala`:

{% highlight scala %}
/* SimpleApp.scala */
import org.apache.spark.sql.SparkSession

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val spark = SparkSession.builder.appName("Simple Application").getOrCreate()
    val logData = spark.read.textFile(logFile).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    spark.stop()
  }
}
{% endhighlight %}

Note that applications should define a `main()` method instead of extending `scala.App`.
Subclasses of `scala.App` may not work correctly.

This program just counts the number of lines containing 'a' and the number containing 'b' in the
Spark README. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is
installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession,
we initialize a SparkSession as part of the program.

We call `SparkSession.builder` to construct a `SparkSession`, then set the application name, and finally call `getOrCreate` to get the `SparkSession` instance.
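
When the application is launched through `spark-submit` (as shown below), the master and other settings come from the command line, but the builder can also set them directly. A minimal sketch for, say, local testing; the configuration key shown is just an illustrative choice:

{% highlight scala %}
// Sketch only: spark-submit normally supplies these settings.
val spark = SparkSession.builder
  .appName("Simple Application")
  .master("local[*]")                              // run locally using all available cores
  .config("spark.ui.showConsoleProgress", "false") // any Spark configuration key/value
  .getOrCreate()
{% endhighlight %}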

Our application depends on the Spark API, so we'll also include an sbt configuration file,
`build.sbt`, which declares Spark as a dependency:

{% highlight scala %}
name := "Simple Project"

version := "1.0"

scalaVersion := "{{site.SCALA_VERSION}}"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "{{site.SPARK_VERSION}}"
{% endhighlight %}

For sbt to work correctly, we'll need to lay out `SimpleApp.scala` and `build.sbt`
according to the typical directory structure. Once that is in place, we can create a JAR package
containing the application's code, then use the `spark-submit` script to run our program.

{% highlight bash %}
# Your directory layout should look like this
$ find .
.
./build.sbt
./src
./src/main
./src/main/scala
./src/main/scala/SimpleApp.scala

# Package a jar containing your application
$ sbt package
...
[info] Packaging {..}/{..}/target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/scala-{{site.SCALA_BINARY_VERSION}}/simple-project_{{site.SCALA_BINARY_VERSION}}-1.0.jar
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}

</div>
<div data-lang="java" markdown="1">
This example will use Maven to compile an application JAR, but any similar build system will work.

We'll create a very simple Spark application, `SimpleApp.java`:

{% highlight java %}
/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}
{% endhighlight %}

This program just counts the number of lines containing 'a' and the number containing 'b' in the
Spark README. Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is
installed. Unlike the earlier examples with the Spark shell, which initializes its own SparkSession,
we initialize a SparkSession as part of the program.

To build the program, we also write a Maven `pom.xml` file that lists Spark as a dependency.
Note that Spark artifacts are tagged with a Scala version.

{% highlight xml %}
<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_{{site.SCALA_BINARY_VERSION}}</artifactId>
      <version>{{site.SPARK_VERSION}}</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
{% endhighlight %}

We lay out these files according to the canonical Maven directory structure:
{% highlight bash %}
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
{% endhighlight %}

Now, we can package the application using Maven and execute it with `./bin/spark-submit`.

{% highlight bash %}
# Package a JAR containing your application
$ mvn package
...
[INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar

# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master local[4] \
  target/simple-project-1.0.jar
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}

</div>
<div data-lang="python" markdown="1">

Now we will show how to write an application using the Python API (PySpark).


If you are building a packaged PySpark application or library, you can add it to your `setup.py` file as:

{% highlight python %}
    install_requires=[
        'pyspark=={{site.SPARK_VERSION}}'
    ]
{% endhighlight %}


As an example, we'll create a simple Spark application, `SimpleApp.py`:

{% highlight python %}
"""SimpleApp.py"""
from pyspark.sql import SparkSession

logFile = "YOUR_SPARK_HOME/README.md"  # Should be some file on your system
spark = SparkSession.builder.appName("SimpleApp").getOrCreate()
logData = spark.read.text(logFile).cache()

numAs = logData.filter(logData.value.contains('a')).count()
numBs = logData.filter(logData.value.contains('b')).count()

print("Lines with a: %i, lines with b: %i" % (numAs, numBs))

spark.stop()
{% endhighlight %}


This program just counts the number of lines containing 'a' and the number containing 'b' in a
text file.
Note that you'll need to replace YOUR_SPARK_HOME with the location where Spark is installed.
As with the Scala and Java examples, we use a SparkSession to create Datasets.
For applications that use custom classes or third-party libraries, we can also add code
dependencies to `spark-submit` through its `--py-files` argument by packaging them into a
.zip file (see `spark-submit --help` for details).
`SimpleApp` is simple enough that we do not need to specify any code dependencies.
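
For illustration, a sketch of what that would look like; here `deps.zip` is a hypothetical archive of extra modules, not something this example actually needs:

{% highlight bash %}
# Hypothetical: ship extra Python code in deps.zip alongside the application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  --py-files deps.zip \
  SimpleApp.py
{% endhighlight %}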

We can run this application using the `bin/spark-submit` script:

{% highlight bash %}
# Use spark-submit to run your application
$ YOUR_SPARK_HOME/bin/spark-submit \
  --master local[4] \
  SimpleApp.py
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}

If you have PySpark pip installed into your environment (e.g., `pip install pyspark`), you can run your application with the regular Python interpreter or use the provided `spark-submit` as you prefer.

{% highlight bash %}
# Use the Python interpreter to run your application
$ python SimpleApp.py
...
Lines with a: 46, Lines with b: 23
{% endhighlight %}

</div>
</div>

# Where to Go from Here
Congratulations on running your first Spark application!

* For an in-depth overview of the API, start with the [RDD programming guide](rdd-programming-guide.html) and the [SQL programming guide](sql-programming-guide.html), or see the "Programming Guides" menu for other components.
* For running applications on a cluster, head to the [deployment overview](cluster-overview.html).
* Finally, Spark includes several samples in the `examples` directory
([Scala]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/org/apache/spark/examples),
 [Java]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/java/org/apache/spark/examples),
 [Python]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python),
 [R]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/r)).
You can run them as follows:

{% highlight bash %}
# For Scala and Java, use run-example:
./bin/run-example SparkPi

# For Python examples, use spark-submit directly:
./bin/spark-submit examples/src/main/python/pi.py

# For R examples, use spark-submit directly:
./bin/spark-submit examples/src/main/r/dataframe.R
{% endhighlight %}