0001 ---
0002 layout: global
0003 displayTitle: SparkR (R on Spark)
0004 title: SparkR (R on Spark)
0005 license: |
0006 Licensed to the Apache Software Foundation (ASF) under one or more
0007 contributor license agreements. See the NOTICE file distributed with
0008 this work for additional information regarding copyright ownership.
0009 The ASF licenses this file to You under the Apache License, Version 2.0
0010 (the "License"); you may not use this file except in compliance with
0011 the License. You may obtain a copy of the License at
0012
0013 http://www.apache.org/licenses/LICENSE-2.0
0014
0015 Unless required by applicable law or agreed to in writing, software
0016 distributed under the License is distributed on an "AS IS" BASIS,
0017 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0018 See the License for the specific language governing permissions and
0019 limitations under the License.
0020 ---
0021
0022 * This will become a table of contents (this text will be scraped).
0023 {:toc}
0024
0025 # Overview
0026 SparkR is an R package that provides a light-weight frontend to use Apache Spark from R.
0027 In Spark {{site.SPARK_VERSION}}, SparkR provides a distributed data frame implementation that
0028 supports operations like selection, filtering, aggregation etc. (similar to R data frames,
0029 [dplyr](https://github.com/hadley/dplyr)) but on large datasets. SparkR also supports distributed
0030 machine learning using MLlib.
0031
0032 # SparkDataFrame
0033
0034 A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually
0035 equivalent to a table in a relational database or a data frame in R, but with richer
0036 optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as:
0037 structured data files, tables in Hive, external databases, or existing local R data frames.
0038
0039 All of the examples on this page use sample data included in R or the Spark distribution and can be run using the `./bin/sparkR` shell.
0040
0041 ## Starting Up: SparkSession
0042
0043 <div data-lang="r" markdown="1">
0044 The entry point into SparkR is the `SparkSession` which connects your R program to a Spark cluster.
You can create a `SparkSession` using `sparkR.session` and pass in options such as the application name, any Spark packages the application depends on, etc. Further, you can also work with SparkDataFrames via `SparkSession`. If you are working from the `sparkR` shell, the `SparkSession` should already be created for you, and you do not need to call `sparkR.session`.
0046
0047 <div data-lang="r" markdown="1">
0048 {% highlight r %}
0049 sparkR.session()
0050 {% endhighlight %}
0051 </div>
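
As a minimal sketch of passing options at startup, the call below sets an application name and one configuration property. The application name and the chosen property value are illustrative assumptions, and the example assumes SparkR is installed with `SPARK_HOME` pointing at a Spark installation.

```r
library(SparkR)

# The application name and the configuration value are illustrative only.
sparkR.session(
  appName = "SparkRGuideExample",
  sparkConfig = list(spark.sql.shuffle.partitions = "8")
)

# sparkR.conf() returns the current value of a configuration key as a named list.
shuffle_conf <- sparkR.conf("spark.sql.shuffle.partitions")
print(shuffle_conf)
```

If a session already exists (for example in the `sparkR` shell), `sparkR.session` returns that session, so startup-only options such as the application name may not take effect.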
0052
0053 ## Starting Up from RStudio
0054
0055 You can also start SparkR from RStudio. You can connect your R program to a Spark cluster from
RStudio, R shell, Rscript or other R IDEs. To start, make sure `SPARK_HOME` is set in the environment
0057 (you can check [Sys.getenv](https://stat.ethz.ch/R-manual/R-devel/library/base/html/Sys.getenv.html)),
0058 load the SparkR package, and call `sparkR.session` as below. It will check for the Spark installation, and, if not found, it will be downloaded and cached automatically. Alternatively, you can also run `install.spark` manually.
0059
In addition to calling `sparkR.session`,
you can also specify certain Spark driver properties. Normally these
[Application properties](configuration.html#application-properties) and
[Runtime Environment](configuration.html#runtime-environment) properties cannot be set programmatically, because the
driver JVM process has already been started by then; in this case SparkR takes care of this for you. To set
them, pass them as you would other configuration properties in the `sparkConfig` argument to
`sparkR.session()`.
0067
0068 <div data-lang="r" markdown="1">
0069 {% highlight r %}
0070 if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
0071 Sys.setenv(SPARK_HOME = "/home/spark")
0072 }
0073 library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
0074 sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
0075 {% endhighlight %}
0076 </div>
0077
0078 The following Spark driver properties can be set in `sparkConfig` with `sparkR.session` from RStudio:
0079
0080 <table class="table">
0081 <tr><th>Property Name</th><th>Property group</th><th><code>spark-submit</code> equivalent</th></tr>
0082 <tr>
0083 <td><code>spark.master</code></td>
0084 <td>Application Properties</td>
0085 <td><code>--master</code></td>
0086 </tr>
0087 <tr>
0088 <td><code>spark.kerberos.keytab</code></td>
0089 <td>Application Properties</td>
0090 <td><code>--keytab</code></td>
0091 </tr>
0092 <tr>
0093 <td><code>spark.kerberos.principal</code></td>
0094 <td>Application Properties</td>
0095 <td><code>--principal</code></td>
0096 </tr>
0097 <tr>
0098 <td><code>spark.driver.memory</code></td>
0099 <td>Application Properties</td>
0100 <td><code>--driver-memory</code></td>
0101 </tr>
0102 <tr>
0103 <td><code>spark.driver.extraClassPath</code></td>
0104 <td>Runtime Environment</td>
0105 <td><code>--driver-class-path</code></td>
0106 </tr>
0107 <tr>
0108 <td><code>spark.driver.extraJavaOptions</code></td>
0109 <td>Runtime Environment</td>
0110 <td><code>--driver-java-options</code></td>
0111 </tr>
0112 <tr>
0113 <td><code>spark.driver.extraLibraryPath</code></td>
0114 <td>Runtime Environment</td>
0115 <td><code>--driver-library-path</code></td>
0116 </tr>
0117 </table>
0118
0119 </div>
0120
0121 ## Creating SparkDataFrames
0122 With a `SparkSession`, applications can create `SparkDataFrame`s from a local R data frame, from a [Hive table](sql-data-sources-hive-tables.html), or from other [data sources](sql-data-sources.html).
0123
0124 ### From local data frames
The simplest way to create a SparkDataFrame is to convert a local R data frame into one. Specifically, we can use `as.DataFrame` or `createDataFrame` and pass in the local R data frame to create a SparkDataFrame. As an example, the following creates a `SparkDataFrame` based on the `faithful` dataset from R.
0126
0127 <div data-lang="r" markdown="1">
0128 {% highlight r %}
0129 df <- as.DataFrame(faithful)
0130
0131 # Displays the first part of the SparkDataFrame
0132 head(df)
0133 ## eruptions waiting
0134 ##1 3.600 79
0135 ##2 1.800 54
0136 ##3 3.333 74
0137
0138 {% endhighlight %}
0139 </div>
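
The conversion also works in the other direction. The sketch below, which assumes a local Spark session can be started, creates a SparkDataFrame from `faithful`, inspects it, and pulls it back into a local R `data.frame` with `collect`. The variable name `local_df` is chosen for illustration.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# nrow() and columns() are evaluated against the distributed SparkDataFrame.
nrow(df)
columns(df)

# collect() brings every row back to the driver as a local R data.frame,
# so it should only be used when the data fits in driver memory.
local_df <- collect(df)
class(local_df)
```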
0140
0141 ### From Data Sources
0142
0143 SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-data-sources-load-save-functions.html#manually-specifying-options) that are available for the built-in data sources.
0144
0145 The general method for creating SparkDataFrames from data sources is `read.df`. This method takes in the path for the file to load and the type of data source, and the currently active SparkSession will be used automatically.
0146 SparkR supports reading JSON, CSV and Parquet files natively, and through packages available from sources like [Third Party Projects](https://spark.apache.org/third-party-projects.html), you can find data source connectors for popular file formats like Avro. These packages can either be added by
specifying `--packages` with `spark-submit` or `sparkR` commands, or by initializing `SparkSession` with the `sparkPackages` parameter when in an interactive R shell or from RStudio.
0148
0149 <div data-lang="r" markdown="1">
0150 {% highlight r %}
0151 sparkR.session(sparkPackages = "org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}")
0152 {% endhighlight %}
0153 </div>
0154
0155 We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please see [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). As a consequence, a regular multi-line JSON file will most often fail.
0156
0157 <div data-lang="r" markdown="1">
0158 {% highlight r %}
0159 people <- read.df("./examples/src/main/resources/people.json", "json")
0160 head(people)
0161 ## age name
0162 ##1 NA Michael
0163 ##2 30 Andy
0164 ##3 19 Justin
0165
0166 # SparkR automatically infers the schema from the JSON file
0167 printSchema(people)
0168 # root
0169 # |-- age: long (nullable = true)
0170 # |-- name: string (nullable = true)
0171
0172 # Similarly, multiple files can be read with read.json
0173 people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
0174
0175 {% endhighlight %}
0176 </div>
0177
0178 The data sources API natively supports CSV formatted input files. For more information please refer to SparkR [read.df](api/R/read.df.html) API documentation.
0179
0180 <div data-lang="r" markdown="1">
0181 {% highlight r %}
0182 df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
0183
0184 {% endhighlight %}
0185 </div>
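
Because `csvPath` is not defined on this page, here is a self-contained sketch of the same call. It assumes Spark runs in local mode so that a file under R's temporary directory is readable by Spark; `csv_path` and `csv_df` are hypothetical names.

```r
library(SparkR)
sparkR.session()

# Write a small CSV file to a temporary location (hypothetical path).
csv_path <- file.path(tempdir(), "faithful.csv")
write.csv(faithful, csv_path, row.names = FALSE)

# header = "true" uses the first line as column names;
# inferSchema = "true" asks Spark to guess column types from the data.
csv_df <- read.df(csv_path, "csv", header = "true", inferSchema = "true")
printSchema(csv_df)
head(csv_df)
```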
0186
0187 The data sources API can also be used to save out SparkDataFrames into multiple file formats. For example, we can save the SparkDataFrame from the previous example
0188 to a Parquet file using `write.df`.
0189
0190 <div data-lang="r" markdown="1">
0191 {% highlight r %}
0192 write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
0193 {% endhighlight %}
0194 </div>
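
To check what was written, the output can be loaded again. The following sketch writes a SparkDataFrame built from `faithful` to a temporary Parquet directory and reads it back with `read.parquet`. It assumes local mode, and `parquet_path` and `restored` are names chosen for illustration.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# Spark writes Parquet output as a directory of part files (hypothetical path).
parquet_path <- file.path(tempdir(), "faithful.parquet")
write.df(df, path = parquet_path, source = "parquet", mode = "overwrite")

# Parquet files store the schema, so no schema options are needed when reading.
restored <- read.parquet(parquet_path)
printSchema(restored)
```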
0195
0196 ### From Hive tables
0197
0198 You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support) and more details can be found in the [SQL programming guide](sql-getting-started.html#starting-point-sparksession). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).
0199
0200 <div data-lang="r" markdown="1">
0201 {% highlight r %}
0202 sparkR.session()
0203
0204 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
0205 sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
0206
0207 # Queries can be expressed in HiveQL.
0208 results <- sql("FROM src SELECT key, value")
0209
0210 # results is now a SparkDataFrame
0211 head(results)
0212 ## key value
0213 ## 1 238 val_238
0214 ## 2 86 val_86
0215 ## 3 311 val_311
0216
0217 {% endhighlight %}
0218 </div>
0219
0220 ## SparkDataFrame Operations
0221
0222 SparkDataFrames support a number of functions to do structured data processing.
Here we include some basic examples; a complete list can be found in the [API](api/R/index.html) docs.
0224
0225 ### Selecting rows, columns
0226
0227 <div data-lang="r" markdown="1">
0228 {% highlight r %}
0229 # Create the SparkDataFrame
0230 df <- as.DataFrame(faithful)
0231
0232 # Get basic information about the SparkDataFrame
0233 df
0234 ## SparkDataFrame[eruptions:double, waiting:double]
0235
0236 # Select only the "eruptions" column
0237 head(select(df, df$eruptions))
0238 ## eruptions
0239 ##1 3.600
0240 ##2 1.800
0241 ##3 3.333
0242
# You can also pass in column names as strings
0244 head(select(df, "eruptions"))
0245
0246 # Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
0247 head(filter(df, df$waiting < 50))
0248 ## eruptions waiting
0249 ##1 1.750 47
0250 ##2 1.750 47
0251 ##3 1.867 48
0252
0253 {% endhighlight %}
0254
0255 </div>
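
Conditions can also be combined. As a sketch under the assumption that a local session is available, the example below filters on two columns at once, first with column expressions and then with an equivalent SQL expression string. The thresholds are arbitrary.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# Combine two column conditions with & and keep both columns.
short_waits <- filter(df, df$waiting < 50 & df$eruptions > 2)
head(select(short_waits, "eruptions", "waiting"))

# The same condition written as a SQL expression string.
short_waits_sql <- filter(df, "waiting < 50 AND eruptions > 2")
head(short_waits_sql)
```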
0256
0257 ### Grouping, Aggregation
0258
SparkDataFrames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below:
0260
0261 <div data-lang="r" markdown="1">
0262 {% highlight r %}
0263
0264 # We use the `n` operator to count the number of times each waiting time appears
0265 head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
0266 ## waiting count
0267 ##1 70 4
0268 ##2 67 1
0269 ##3 69 2
0270
0271 # We can also sort the output from the aggregation to get the most common waiting times
0272 waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
0273 head(arrange(waiting_counts, desc(waiting_counts$count)))
0274 ## waiting count
0275 ##1 78 15
0276 ##2 83 14
0277 ##3 81 13
0278
0279 {% endhighlight %}
0280 </div>
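
Several aggregates can be computed in one pass over each group. The sketch below is an illustration rather than part of the original example set: it computes a count, a mean and a maximum per waiting time, with result column names chosen here.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# Each named argument becomes one output column of the aggregated result.
stats_by_wait <- agg(
  groupBy(df, df$waiting),
  count = n(df$waiting),
  mean_eruptions = avg(df$eruptions),
  max_eruptions = max(df$eruptions)
)

# Sort by the grouping column for a stable display order.
head(arrange(stats_by_wait, "waiting"))
```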
0281
0282 In addition to standard aggregations, SparkR supports [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube`:
0283
0284 <div data-lang="r" markdown="1">
0285 {% highlight r %}
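# cube() is shown on the mtcars dataset, which has the cyl, disp, gear and mpg columns
cars <- createDataFrame(mtcars)
head(agg(cube(cars, "cyl", "disp", "gear"), avg(cars$mpg)))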
0287 ## cyl disp gear avg(mpg)
0288 ##1 NA 140.8 4 22.8
0289 ##2 4 75.7 4 30.4
0290 ##3 8 400.0 3 19.2
0291 ##4 8 318.0 3 15.5
0292 ##5 NA 351.0 NA 15.8
0293 ##6 NA 275.8 NA 16.3
0294 {% endhighlight %}
0295 </div>
0296
0297 and `rollup`:
0298
0299 <div data-lang="r" markdown="1">
0300 {% highlight r %}
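# rollup() is shown on the mtcars dataset, which has the cyl, disp, gear and mpg columns
cars <- createDataFrame(mtcars)
head(agg(rollup(cars, "cyl", "disp", "gear"), avg(cars$mpg)))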
0302 ## cyl disp gear avg(mpg)
0303 ##1 4 75.7 4 30.4
0304 ##2 8 400.0 3 19.2
0305 ##3 8 318.0 3 15.5
0306 ##4 4 78.7 NA 32.4
0307 ##5 8 304.0 3 15.2
0308 ##6 4 79.0 NA 27.3
0309 {% endhighlight %}
0310 </div>
0311
0312 ### Operating on Columns
0313
0314 SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
0315
0316 <div data-lang="r" markdown="1">
0317 {% highlight r %}
0318
# Convert waiting time from minutes to seconds.
0320 # Note that we can assign this to a new column in the same SparkDataFrame
0321 df$waiting_secs <- df$waiting * 60
0322 head(df)
0323 ## eruptions waiting waiting_secs
0324 ##1 3.600 79 4740
0325 ##2 1.800 54 3240
0326 ##3 3.333 74 4440
0327
0328 {% endhighlight %}
0329 </div>
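
An alternative to `$<-` assignment is `withColumn`, which returns a new SparkDataFrame and leaves the original untouched. The sketch below assumes a local session, and the new column names are chosen for illustration.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# withColumn() returns a new SparkDataFrame; df itself is not modified.
df2 <- withColumn(df, "waiting_hours", df$waiting / 60)

# Built-in column functions such as round() can be used in the same way.
df2 <- withColumn(df2, "eruptions_rounded", round(df2$eruptions))
head(df2)
```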
0330
0331 ### Applying User-Defined Function
0332 In SparkR, we support several kinds of User-Defined Functions:
0333
0334 #### Run a given function on a large dataset using `dapply` or `dapplyCollect`
0335
0336 ##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function should have only one parameter, to which the `data.frame` corresponding to each partition is passed. The output of the function should be a `data.frame`. The schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types](#data-type-mapping-between-r-and-spark) of the returned value.
0339
0340 <div data-lang="r" markdown="1">
0341 {% highlight r %}
0342
# Convert waiting time from minutes to seconds.
0344 # Note that we can apply UDF to DataFrame.
0345 schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
0346 structField("waiting_secs", "double"))
0347 df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
0348 head(collect(df1))
0349 ## eruptions waiting waiting_secs
0350 ##1 3.600 79 4740
0351 ##2 1.800 54 3240
0352 ##3 3.333 74 4440
0353 ##4 2.283 62 3720
0354 ##5 4.533 85 5100
0355 ##6 2.883 55 3300
0356 {% endhighlight %}
0357 </div>
0358
0359 ##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of the function
should be a `data.frame`, but no schema needs to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.
0362
0363 <div data-lang="r" markdown="1">
0364 {% highlight r %}
0365
# Convert waiting time from minutes to seconds.
# Note that we can apply a UDF to a SparkDataFrame and get back a local R data.frame
0368 ldf <- dapplyCollect(
0369 df,
0370 function(x) {
0371 x <- cbind(x, "waiting_secs" = x$waiting * 60)
0372 })
0373 head(ldf, 3)
0374 ## eruptions waiting waiting_secs
0375 ##1 3.600 79 4740
0376 ##2 1.800 54 3240
0377 ##3 3.333 74 4440
0378
0379 {% endhighlight %}
0380 </div>
0381
0382 #### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`
0383
0384 ##### gapply
Apply a function to each group of a `SparkDataFrame`. The function should have only two parameters: the grouping key and the R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
`SparkDataFrame`. It must represent the R function's output schema in terms of Spark [data types](#data-type-mapping-between-r-and-spark). The column names of the returned `data.frame` are set by the user.
0389
0390 <div data-lang="r" markdown="1">
0391 {% highlight r %}
0392
0393 # Determine six waiting times with the largest eruption time in minutes.
0394 schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
0395 result <- gapply(
0396 df,
0397 "waiting",
0398 function(key, x) {
0399 y <- data.frame(key, max(x$eruptions))
0400 },
0401 schema)
0402 head(collect(arrange(result, "max_eruption", decreasing = TRUE)))
0403
0404 ## waiting max_eruption
0405 ##1 64 5.100
0406 ##2 69 5.067
0407 ##3 71 5.033
0408 ##4 87 5.000
0409 ##5 63 4.933
0410 ##6 89 4.900
0411 {% endhighlight %}
0412 </div>
0413
0414 ##### gapplyCollect
Like `gapply`, apply a function to each group of a `SparkDataFrame` and collect the result back to an R `data.frame`. The output of the function should be a `data.frame`, but no schema needs to be passed. Note that `gapplyCollect` can fail if the output of the UDF run on all the groups cannot be pulled to the driver and fit in driver memory.
0416
0417 <div data-lang="r" markdown="1">
0418 {% highlight r %}
0419
0420 # Determine six waiting times with the largest eruption time in minutes.
0421 result <- gapplyCollect(
0422 df,
0423 "waiting",
0424 function(key, x) {
0425 y <- data.frame(key, max(x$eruptions))
0426 colnames(y) <- c("waiting", "max_eruption")
0427 y
0428 })
0429 head(result[order(result$max_eruption, decreasing = TRUE), ])
0430
0431 ## waiting max_eruption
0432 ##1 64 5.100
0433 ##2 69 5.067
0434 ##3 71 5.033
0435 ##4 87 5.000
0436 ##5 63 4.933
0437 ##6 89 4.900
0438
0439 {% endhighlight %}
0440 </div>
0441
0442 #### Run local R functions distributed using `spark.lapply`
0443
0444 ##### spark.lapply
Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
It applies the function to the elements of a list in a manner similar to `doParallel` or `lapply`. The results of all the computations
should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use
`dapply`.
0449
0450 <div data-lang="r" markdown="1">
0451 {% highlight r %}
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies the family of each generalized linear model.
0454 families <- c("gaussian", "poisson")
0455 train <- function(family) {
0456 model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
0457 summary(model)
0458 }
0459 # Return a list of model's summaries
0460 model.summaries <- spark.lapply(families, train)
0461
0462 # Print the summary of each model
0463 print(model.summaries)
0464
0465 {% endhighlight %}
0466 </div>
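
For a smaller illustration of the same mechanism, the sketch below squares five numbers. It assumes a local session; the function runs on Spark workers and the results come back to the driver as an ordinary R list.

```r
library(SparkR)
sparkR.session()

# Each element is processed by the function on a Spark worker;
# the return value is a plain R list in the same order as the input.
squares <- spark.lapply(1:5, function(x) { x^2 })
unlist(squares)
```

As with `lapply`, the function should not rely on variables that cannot be serialized and shipped to the workers.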
0467
0468 ### Eager execution
0469
0470 If eager execution is enabled, the data will be returned to R client immediately when the `SparkDataFrame` is created. By default, eager execution is not enabled and can be enabled by setting the configuration property `spark.sql.repl.eagerEval.enabled` to `true` when the `SparkSession` is started up.
0471
The maximum number of rows and the maximum number of characters per column to display can be controlled by the `spark.sql.repl.eagerEval.maxNumRows` and `spark.sql.repl.eagerEval.truncate` configuration properties, respectively. These properties are only effective when eager execution is enabled. If these properties are not set explicitly, up to 20 rows and up to 20 characters per column are shown by default.
0473
0474 <div data-lang="r" markdown="1">
0475 {% highlight r %}
0476
0477 # Start up spark session with eager execution enabled
0478 sparkR.session(master = "local[*]",
0479 sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
0480 spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))
0481
0482 # Create a grouped and sorted SparkDataFrame
0483 df <- createDataFrame(faithful)
0484 df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")
0485
0486 # Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
0487 df2
0488
0489 ##+-------+-----+
0490 ##|waiting|count|
0491 ##+-------+-----+
0492 ##| 43.0| 1|
0493 ##| 45.0| 3|
0494 ##| 46.0| 5|
0495 ##| 47.0| 4|
0496 ##| 48.0| 3|
0497 ##| 49.0| 5|
0498 ##| 50.0| 5|
0499 ##| 51.0| 6|
0500 ##| 52.0| 5|
0501 ##| 53.0| 7|
0502 ##+-------+-----+
0503 ##only showing top 10 rows
0504
0505 {% endhighlight %}
0506 </div>
0507
0508 Note that to enable eager execution in `sparkR` shell, add `spark.sql.repl.eagerEval.enabled=true` configuration property to the `--conf` option.
0509
0510 ## Running SQL Queries from SparkR
0511 A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data.
0512 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
0513
0514 <div data-lang="r" markdown="1">
0515 {% highlight r %}
0516 # Load a JSON file
0517 people <- read.df("./examples/src/main/resources/people.json", "json")
0518
0519 # Register this SparkDataFrame as a temporary view.
0520 createOrReplaceTempView(people, "people")
0521
0522 # SQL statements can be run by using the sql method
0523 teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
0524 head(teenagers)
0525 ## name
0526 ##1 Justin
0527
0528 {% endhighlight %}
0529 </div>
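
The same pattern works for any SparkDataFrame, not only for data loaded from files. As a sketch that assumes a local session, the example below registers the `faithful` data as a temporary view and runs an aggregate query on it. The view name and the threshold of 90 minutes are chosen for illustration.

```r
library(SparkR)
sparkR.session()

df <- createDataFrame(faithful)

# The view name "faithful" is only visible within the current SparkSession.
createOrReplaceTempView(df, "faithful")

# Count how often each long waiting time occurs.
long_waits <- sql(
  "SELECT waiting, COUNT(*) AS n FROM faithful WHERE waiting >= 90 GROUP BY waiting ORDER BY waiting"
)
head(long_waits)
```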
0530
0531 # Machine Learning
0532
0533 ## Algorithms
0534
0535 SparkR supports the following machine learning algorithms currently:
0536
0537 #### Classification
0538
0539 * [`spark.logit`](api/R/spark.logit.html): [`Logistic Regression`](ml-classification-regression.html#logistic-regression)
0540 * [`spark.mlp`](api/R/spark.mlp.html): [`Multilayer Perceptron (MLP)`](ml-classification-regression.html#multilayer-perceptron-classifier)
0541 * [`spark.naiveBayes`](api/R/spark.naiveBayes.html): [`Naive Bayes`](ml-classification-regression.html#naive-bayes)
0542 * [`spark.svmLinear`](api/R/spark.svmLinear.html): [`Linear Support Vector Machine`](ml-classification-regression.html#linear-support-vector-machine)
0543
0544 #### Regression
0545
0546 * [`spark.survreg`](api/R/spark.survreg.html): [`Accelerated Failure Time (AFT) Survival Model`](ml-classification-regression.html#survival-regression)
0547 * [`spark.glm`](api/R/spark.glm.html) or [`glm`](api/R/glm.html): [`Generalized Linear Model (GLM)`](ml-classification-regression.html#generalized-linear-regression)
0548 * [`spark.isoreg`](api/R/spark.isoreg.html): [`Isotonic Regression`](ml-classification-regression.html#isotonic-regression)
0549
0550 #### Tree
0551
0552 * [`spark.decisionTree`](api/R/spark.decisionTree.html): `Decision Tree for` [`Regression`](ml-classification-regression.html#decision-tree-regression) `and` [`Classification`](ml-classification-regression.html#decision-tree-classifier)
0553 * [`spark.gbt`](api/R/spark.gbt.html): `Gradient Boosted Trees for` [`Regression`](ml-classification-regression.html#gradient-boosted-tree-regression) `and` [`Classification`](ml-classification-regression.html#gradient-boosted-tree-classifier)
0554 * [`spark.randomForest`](api/R/spark.randomForest.html): `Random Forest for` [`Regression`](ml-classification-regression.html#random-forest-regression) `and` [`Classification`](ml-classification-regression.html#random-forest-classifier)
0555
0556 #### Clustering
0557
0558 * [`spark.bisectingKmeans`](api/R/spark.bisectingKmeans.html): [`Bisecting k-means`](ml-clustering.html#bisecting-k-means)
0559 * [`spark.gaussianMixture`](api/R/spark.gaussianMixture.html): [`Gaussian Mixture Model (GMM)`](ml-clustering.html#gaussian-mixture-model-gmm)
0560 * [`spark.kmeans`](api/R/spark.kmeans.html): [`K-Means`](ml-clustering.html#k-means)
0561 * [`spark.lda`](api/R/spark.lda.html): [`Latent Dirichlet Allocation (LDA)`](ml-clustering.html#latent-dirichlet-allocation-lda)
0562 * [`spark.powerIterationClustering (PIC)`](api/R/spark.powerIterationClustering.html): [`Power Iteration Clustering (PIC)`](ml-clustering.html#power-iteration-clustering-pic)
0563
0564 #### Collaborative Filtering
0565
0566 * [`spark.als`](api/R/spark.als.html): [`Alternating Least Squares (ALS)`](ml-collaborative-filtering.html#collaborative-filtering)
0567
0568 #### Frequent Pattern Mining
0569
0570 * [`spark.fpGrowth`](api/R/spark.fpGrowth.html) : [`FP-growth`](ml-frequent-pattern-mining.html#fp-growth)
0571 * [`spark.prefixSpan`](api/R/spark.prefixSpan.html) : [`PrefixSpan`](ml-frequent-pattern-mining.html#prefixSpan)
0572
0573 #### Statistics
0574
0575 * [`spark.kstest`](api/R/spark.kstest.html): `Kolmogorov-Smirnov Test`
0576
Under the hood, SparkR uses MLlib to train the model. Please refer to the corresponding section of the MLlib user guide for example code.
Users can call `summary` to print a summary of the fitted model, [predict](api/R/predict.html) to make predictions on new data, and [write.ml](api/R/write.ml.html)/[read.ml](api/R/read.ml.html) to save/load fitted models.
SparkR supports a subset of the available R formula operators for model fitting, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-’.
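
As a short sketch of this workflow, the example below fits a Gaussian GLM on the `iris` dataset, prints its summary and makes predictions on the training data. It assumes a local session; the choice of dataset and formula is illustrative.

```r
library(SparkR)
sparkR.session()

# createDataFrame() replaces "." in column names with "_" and warns about it,
# so Sepal.Length becomes Sepal_Length in the SparkDataFrame.
training <- suppressWarnings(createDataFrame(iris))

# Fit a generalized linear model using the R formula interface.
model <- spark.glm(training, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")
summary(model)

# predict() returns a SparkDataFrame with an added "prediction" column.
predictions <- predict(model, training)
head(select(predictions, "Sepal_Length", "prediction"))
```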
0580
0581
0582 ## Model persistence
0583
The following example shows how to save/load an MLlib model with SparkR.
0585 {% include_example read_write r/ml/ml.R %}
0586
0587 # Data type mapping between R and Spark
0588 <table class="table">
0589 <tr><th>R</th><th>Spark</th></tr>
0590 <tr>
0591 <td>byte</td>
0592 <td>byte</td>
0593 </tr>
0594 <tr>
0595 <td>integer</td>
0596 <td>integer</td>
0597 </tr>
0598 <tr>
0599 <td>float</td>
0600 <td>float</td>
0601 </tr>
0602 <tr>
0603 <td>double</td>
0604 <td>double</td>
0605 </tr>
0606 <tr>
0607 <td>numeric</td>
0608 <td>double</td>
0609 </tr>
0610 <tr>
0611 <td>character</td>
0612 <td>string</td>
0613 </tr>
0614 <tr>
0615 <td>string</td>
0616 <td>string</td>
0617 </tr>
0618 <tr>
0619 <td>binary</td>
0620 <td>binary</td>
0621 </tr>
0622 <tr>
0623 <td>raw</td>
0624 <td>binary</td>
0625 </tr>
0626 <tr>
0627 <td>logical</td>
0628 <td>boolean</td>
0629 </tr>
0630 <tr>
0631 <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXct</a></td>
0632 <td>timestamp</td>
0633 </tr>
0634 <tr>
0635 <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXlt</a></td>
0636 <td>timestamp</td>
0637 </tr>
0638 <tr>
0639 <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html">Date</a></td>
0640 <td>date</td>
0641 </tr>
0642 <tr>
0643 <td>array</td>
0644 <td>array</td>
0645 </tr>
0646 <tr>
0647 <td>list</td>
0648 <td>array</td>
0649 </tr>
0650 <tr>
0651 <td>env</td>
0652 <td>map</td>
0653 </tr>
0654 </table>
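
To see the mapping applied to a concrete data frame, the sketch below builds a local `data.frame` with several R types and prints the schema Spark derives from it. It assumes a local session, and the column names are arbitrary.

```r
library(SparkR)
sparkR.session()

# One column per R type of interest; stringsAsFactors = FALSE keeps `s` as character.
local_df <- data.frame(
  i = 1:2,
  d = c(1.5, 2.5),
  s = c("a", "b"),
  l = c(TRUE, FALSE),
  day = as.Date(c("2020-01-01", "2020-01-02")),
  stringsAsFactors = FALSE
)

typed_df <- createDataFrame(local_df)

# printSchema() shows the Spark type chosen for each column.
printSchema(typed_df)
```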
0655
0656 # Structured Streaming
0657
SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information, see the R API in the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html).
0659
0660 # Apache Arrow in SparkR
0661
Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and R processes. For the equivalent optimization in PySpark, see the [PySpark Usage Guide for Pandas with Apache Arrow](sql-pyspark-pandas-with-arrow.html). This guide explains how to use Arrow optimization in SparkR and highlights some key points.
0663
0664 ## Ensure Arrow Installed
0665
0666 Arrow R library is available on CRAN and it can be installed as below.
0667
0668 ```bash
0669 Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
0670 ```
Please refer to [the official documentation of Apache Arrow](https://arrow.apache.org/docs/r/) for more details.
0672
Note that you must ensure that the Arrow R package is installed and available on all cluster nodes.
The currently supported minimum version is 0.15.1; however, this might change between minor releases since Arrow optimization in SparkR is experimental.
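
A quick way to check the local installation is sketched below using base R only. The variable names are hypothetical, and the check covers only the machine it runs on, not the other cluster nodes.

```r
# Minimum version stated in this guide; it may differ in other releases.
min_arrow <- "0.15.1"

# requireNamespace() returns FALSE instead of failing when arrow is missing.
arrow_ok <- requireNamespace("arrow", quietly = TRUE) &&
  utils::packageVersion("arrow") >= min_arrow

if (arrow_ok) {
  message("arrow ", as.character(utils::packageVersion("arrow")), " is available")
} else {
  message("arrow >= ", min_arrow, " is not installed on this machine")
}
```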
0675
0676 ## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`
0677
0678 Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
0679 when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
0680 via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
To use Arrow when executing these calls, users need to first set the Spark configuration `spark.sql.execution.arrow.sparkr.enabled`
to `true`. This is disabled by default.

In addition, optimizations enabled by `spark.sql.execution.arrow.sparkr.enabled` can fall back automatically to the non-Arrow
implementation if an error occurs before the actual computation within Spark while converting a Spark DataFrame to/from an R
DataFrame.
0687
0688 <div data-lang="r" markdown="1">
0689 {% highlight r %}
0690 # Start up spark session with Arrow optimization enabled
0691 sparkR.session(master = "local[*]",
0692 sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))
0693
# Creates a Spark DataFrame from an R DataFrame
0695 spark_df <- createDataFrame(mtcars)
0696
0697 # Converts Spark DataFrame to an R DataFrame
0698 collect(spark_df)
0699
0700 # Apply an R native function to each partition.
0701 collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))
0702
0703 # Apply an R native function to grouped data.
0704 collect(gapply(spark_df,
0705 "gear",
0706 function(key, group) {
0707 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
0708 },
0709 structType("gear double, disp boolean")))
0710 {% endhighlight %}
0711 </div>
0712
0713 Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
0714 `collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
0715 small subset of the data.
0716
0717 ## Supported SQL Types
0718
0719 Currently, all Spark SQL data types are supported by Arrow-based conversion except `FloatType`, `BinaryType`, `ArrayType`, `StructType` and `MapType`.
0720
0721 # R Function Name Conflicts
0722
0723 When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
0724 function is masking another function.
0725
0726 The following functions are masked by the SparkR package:
0727
0728 <table class="table">
0729 <tr><th>Masked function</th><th>How to Access</th></tr>
0730 <tr>
0731 <td><code>cov</code> in <code>package:stats</code></td>
0732 <td><code><pre>stats::cov(x, y = NULL, use = "everything",
0733 method = c("pearson", "kendall", "spearman"))</pre></code></td>
0734 </tr>
0735 <tr>
0736 <td><code>filter</code> in <code>package:stats</code></td>
0737 <td><code><pre>stats::filter(x, filter, method = c("convolution", "recursive"),
0738 sides = 2, circular = FALSE, init)</pre></code></td>
0739 </tr>
0740 <tr>
0741 <td><code>sample</code> in <code>package:base</code></td>
0742 <td><code>base::sample(x, size, replace = FALSE, prob = NULL)</code></td>
0743 </tr>
0744 </table>
0745
Since part of SparkR is modeled on the `dplyr` package, certain functions in SparkR share the same names with those in `dplyr`. Depending on the load order of the two packages, some functions from the package loaded first are masked by those in the package loaded after. In such cases, prefix the calls with the package name, for instance, `SparkR::cume_dist(x)` or `dplyr::cume_dist(x)`.

You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-manual/R-devel/library/base/html/search.html).
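
The namespace prefix can be tried out with base R alone. The sketch below calls the masked `stats::filter` and `base::sample` explicitly, which works whether or not SparkR is attached. The input vector is arbitrary.

```r
x <- c(1, 2, 3, 4, 5)

# stats::filter applies a linear filter; here a centered 3-point moving average.
# The first and last values are NA because the window runs past the ends.
smoothed <- stats::filter(x, rep(1 / 3, 3))
print(smoothed)

# base::sample draws random elements from a local vector.
drawn <- base::sample(x, size = 2)
print(drawn)

# search() lists attached packages in lookup order; earlier entries mask later ones.
print(search())
```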
0749
0750
0751 # Migration Guide
0752
0753 The migration guide is now archived [on this page](sparkr-migration-guide.html).
0754