---
layout: global
displayTitle: SparkR (R on Spark)
title: SparkR (R on Spark)
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---

* This will become a table of contents (this text will be scraped).
{:toc}

# Overview
SparkR is an R package that provides a light-weight frontend to use Apache Spark from R.
In Spark {{site.SPARK_VERSION}}, SparkR provides a distributed data frame implementation that
supports operations like selection, filtering, aggregation etc. (similar to R data frames,
[dplyr](https://github.com/hadley/dplyr)) but on large datasets. SparkR also supports distributed
machine learning using MLlib.

# SparkDataFrame

A SparkDataFrame is a distributed collection of data organized into named columns. It is conceptually
equivalent to a table in a relational database or a data frame in R, but with richer
optimizations under the hood. SparkDataFrames can be constructed from a wide array of sources such as:
structured data files, tables in Hive, external databases, or existing local R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the `./bin/sparkR` shell.

## Starting Up: SparkSession

<div data-lang="r" markdown="1">
The entry point into SparkR is the `SparkSession`, which connects your R program to a Spark cluster.
You can create a `SparkSession` using `sparkR.session` and pass in options such as the application name, any Spark packages the application depends on, etc. Further, you can also work with SparkDataFrames via `SparkSession`. If you are working from the `sparkR` shell, the `SparkSession` should already be created for you, and you would not need to call `sparkR.session`.

<div data-lang="r" markdown="1">
{% highlight r %}
sparkR.session()
{% endhighlight %}
</div>

## Starting Up from RStudio

You can also start SparkR from RStudio. You can connect your R program to a Spark cluster from
RStudio, R shell, Rscript or other R IDEs. To start, make sure SPARK_HOME is set in the environment
(you can check it with [Sys.getenv](https://stat.ethz.ch/R-manual/R-devel/library/base/html/Sys.getenv.html)),
load the SparkR package, and call `sparkR.session` as below. It checks for the Spark installation and, if Spark is not found, downloads and caches it automatically. Alternatively, you can also run `install.spark` manually.

In addition to calling `sparkR.session`,
you can also specify certain Spark driver properties. Normally these
[Application properties](configuration.html#application-properties) and
[Runtime Environment](configuration.html#runtime-environment) properties cannot be set programmatically, as the
driver JVM process has already been started; in this case, SparkR takes care of this for you. To set
them, pass them as you would other configuration properties in the `sparkConfig` argument to
`sparkR.session()`.

<div data-lang="r" markdown="1">
{% highlight r %}
if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))
{% endhighlight %}
</div>

The following Spark driver properties can be set in `sparkConfig` with `sparkR.session` from RStudio:

<table class="table">
  <tr><th>Property Name</th><th>Property group</th><th><code>spark-submit</code> equivalent</th></tr>
  <tr>
    <td><code>spark.master</code></td>
    <td>Application Properties</td>
    <td><code>--master</code></td>
  </tr>
  <tr>
    <td><code>spark.kerberos.keytab</code></td>
    <td>Application Properties</td>
    <td><code>--keytab</code></td>
  </tr>
  <tr>
    <td><code>spark.kerberos.principal</code></td>
    <td>Application Properties</td>
    <td><code>--principal</code></td>
  </tr>
  <tr>
    <td><code>spark.driver.memory</code></td>
    <td>Application Properties</td>
    <td><code>--driver-memory</code></td>
  </tr>
  <tr>
    <td><code>spark.driver.extraClassPath</code></td>
    <td>Runtime Environment</td>
    <td><code>--driver-class-path</code></td>
  </tr>
  <tr>
    <td><code>spark.driver.extraJavaOptions</code></td>
    <td>Runtime Environment</td>
    <td><code>--driver-java-options</code></td>
  </tr>
  <tr>
    <td><code>spark.driver.extraLibraryPath</code></td>
    <td>Runtime Environment</td>
    <td><code>--driver-library-path</code></td>
  </tr>
</table>
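
For illustration, a minimal sketch of combining several of these driver properties in one call (the values here are arbitrary examples, not recommendations):

<div data-lang="r" markdown="1">
{% highlight r %}
# Multiple driver properties can be combined in the sparkConfig list;
# names and values are passed through to the Spark configuration as-is.
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.driver.memory = "4g",
                                  spark.driver.extraJavaOptions = "-XX:+UseG1GC"))
{% endhighlight %}
</div>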

</div>

## Creating SparkDataFrames
With a `SparkSession`, applications can create `SparkDataFrame`s from a local R data frame, from a [Hive table](sql-data-sources-hive-tables.html), or from other [data sources](sql-data-sources.html).

### From local data frames
The simplest way to create a data frame is to convert a local R data frame into a SparkDataFrame. Specifically, we can use `as.DataFrame` or `createDataFrame` and pass in the local R data frame to create a SparkDataFrame. As an example, the following creates a `SparkDataFrame` based on the `faithful` dataset from R.

<div data-lang="r" markdown="1">
{% highlight r %}
df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

{% endhighlight %}
</div>

### From Data Sources

SparkR supports operating on a variety of data sources through the `SparkDataFrame` interface. This section describes the general methods for loading and saving data using Data Sources. You can check the Spark SQL programming guide for more [specific options](sql-data-sources-load-save-functions.html#manually-specifying-options) that are available for the built-in data sources.

The general method for creating SparkDataFrames from data sources is `read.df`. This method takes in the path of the file to load and the type of data source; the currently active SparkSession is used automatically.
SparkR supports reading JSON, CSV and Parquet files natively, and through packages available from sources like [Third Party Projects](https://spark.apache.org/third-party-projects.html), you can find data source connectors for popular file formats like Avro. These packages can be added either by specifying `--packages` with the `spark-submit` or `sparkR` commands, or by passing the `sparkPackages` parameter when initializing a SparkSession in an interactive R shell or from RStudio.

<div data-lang="r" markdown="1">
{% highlight r %}
sparkR.session(sparkPackages = "org.apache.spark:spark-avro_{{site.SCALA_BINARY_VERSION}}:{{site.SPARK_VERSION}}")
{% endhighlight %}
</div>

We can see how to use data sources using an example JSON input file. Note that the file that is used here is _not_ a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please see [JSON Lines text format, also called newline-delimited JSON](http://jsonlines.org/). As a consequence, a regular multi-line JSON file will most often fail.

<div data-lang="r" markdown="1">
{% highlight r %}
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

{% endhighlight %}
</div>

The data sources API natively supports CSV formatted input files. For more information, please refer to the SparkR [read.df](api/R/read.df.html) API documentation.

<div data-lang="r" markdown="1">
{% highlight r %}
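# csvPath is assumed here to hold the path to a CSV input file accessible to Spark
# (this variable is illustrative and is not defined elsewhere on this page)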
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

{% endhighlight %}
</div>

The data sources API can also be used to save out SparkDataFrames into multiple file formats. For example, we can save the SparkDataFrame from the previous example
to a Parquet file using `write.df`.

<div data-lang="r" markdown="1">
{% highlight r %}
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
{% endhighlight %}
</div>
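
As a quick sanity check (a sketch assuming the `people.parquet` output written above), the saved data can be read back with `read.df`:

<div data-lang="r" markdown="1">
{% highlight r %}
# Read the Parquet output back into a SparkDataFrame
peopleParquet <- read.df("people.parquet", "parquet")
head(peopleParquet)
{% endhighlight %}
</div>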

### From Hive tables

You can also create SparkDataFrames from Hive tables. To do this we will need to create a SparkSession with Hive support which can access tables in the Hive MetaStore. Note that Spark should have been built with [Hive support](building-spark.html#building-with-hive-and-jdbc-support) and more details can be found in the [SQL programming guide](sql-getting-started.html#starting-point-sparksession). In SparkR, by default it will attempt to create a SparkSession with Hive support enabled (`enableHiveSupport = TRUE`).

<div data-lang="r" markdown="1">
{% highlight r %}
sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

{% endhighlight %}
</div>

## SparkDataFrame Operations

SparkDataFrames support a number of functions to do structured data processing.
Here we include some basic examples; a complete list can be found in the [API](api/R/index.html) docs:

### Selecting rows, columns

<div data-lang="r" markdown="1">
{% highlight r %}
# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column names as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

{% endhighlight %}

</div>

### Grouping, Aggregation

SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the `waiting` time in the `faithful` dataset as shown below.

<div data-lang="r" markdown="1">
{% highlight r %}

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

{% endhighlight %}
</div>

In addition to standard aggregations, SparkR supports the [OLAP cube](https://en.wikipedia.org/wiki/OLAP_cube) operators `cube` and `rollup`. The examples below assume `df` is a SparkDataFrame created from the `mtcars` dataset (e.g. `df <- createDataFrame(mtcars)`). With `cube`:

<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(cube(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1  NA 140.8    4     22.8
##2   4  75.7    4     30.4
##3   8 400.0    3     19.2
##4   8 318.0    3     15.5
##5  NA 351.0   NA     15.8
##6  NA 275.8   NA     16.3
{% endhighlight %}
</div>

and `rollup`:

<div data-lang="r" markdown="1">
{% highlight r %}
head(agg(rollup(df, "cyl", "disp", "gear"), avg(df$mpg)))
##  cyl  disp gear avg(mpg)
##1   4  75.7    4     30.4
##2   8 400.0    3     19.2
##3   8 318.0    3     15.5
##4   4  78.7   NA     32.4
##5   8 304.0    3     15.2
##6   4  79.0   NA     27.3
{% endhighlight %}
</div>

### Operating on Columns

SparkR also provides a number of functions that can be directly applied to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.

<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

{% endhighlight %}
</div>

### Applying User-Defined Function
In SparkR, we support several kinds of User-Defined Functions:

#### Run a given function on a large dataset using `dapply` or `dapplyCollect`

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function should have only one parameter, a `data.frame` corresponding to a partition, and its output should also be a `data.frame`. The schema specifies the row format of the resulting `SparkDataFrame`; it must match the [data types](#data-type-mapping-between-r-and-spark) of the returned value.

<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply a UDF to the SparkDataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300
{% endhighlight %}
</div>

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back to a local R `data.frame`. The output of the function
should be a `data.frame`, but no schema is required. Note that `dapplyCollect` can fail if the output of the UDF run on all partitions cannot be pulled to the driver and fit in driver memory.

<div data-lang="r" markdown="1">
{% highlight r %}

# Convert waiting time from minutes to seconds.
# Note that we can apply a UDF to the SparkDataFrame and return an R data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

{% endhighlight %}
</div>

#### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`

##### gapply
Apply a function to each group of a `SparkDataFrame`. The function is applied to each group of the `SparkDataFrame` and should have only two parameters: the grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
`SparkDataFrame`. It must represent the R function's output schema on the basis of Spark [data types](#data-type-mapping-between-r-and-spark). The column names of the returned `data.frame` are set by the user.

<div data-lang="r" markdown="1">
{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900
{% endhighlight %}
</div>

##### gapplyCollect
Like `gapply`, applies a function to each group of a `SparkDataFrame` and collects the result back to an R `data.frame`. The output of the function should be a `data.frame`, but no schema is required. Note that `gapplyCollect` can fail if the output of the UDF run on all groups cannot be pulled to the driver and fit in driver memory.

<div data-lang="r" markdown="1">
{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

{% endhighlight %}
</div>

#### Run local R functions distributed using `spark.lapply`

##### spark.lapply
Similar to `lapply` in native R, `spark.lapply` runs a function over a list of elements and distributes the computations with Spark.
It applies a function to the elements of a list in a manner similar to `doParallel` or `lapply`. The results of all the computations
should fit on a single machine. If that is not the case, you can do something like `df <- createDataFrame(list)` and then use
`dapply`.

<div data-lang="r" markdown="1">
{% highlight r %}
# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies the family of the generalized linear model.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

{% endhighlight %}
</div>

### Eager execution

If eager execution is enabled, the data is returned to the R client immediately when the `SparkDataFrame` is created. Eager execution is disabled by default; it can be enabled by setting the configuration property `spark.sql.repl.eagerEval.enabled` to `true` when the `SparkSession` is started up.

The maximum number of rows and the maximum number of characters per column to display can be controlled by the `spark.sql.repl.eagerEval.maxNumRows` and `spark.sql.repl.eagerEval.truncate` configuration properties, respectively. These properties are only effective when eager execution is enabled. If they are not set explicitly, up to 20 rows and up to 20 characters per column are shown by default.

<div data-lang="r" markdown="1">
{% highlight r %}

# Start up spark session with eager execution enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.repl.eagerEval.enabled = "true",
                                  spark.sql.repl.eagerEval.maxNumRows = as.integer(10)))

# Create a grouped and sorted SparkDataFrame
df <- createDataFrame(faithful)
df2 <- arrange(summarize(groupBy(df, df$waiting), count = n(df$waiting)), "waiting")

# Similar to R data.frame, displays the data returned, instead of SparkDataFrame class string
df2

##+-------+-----+
##|waiting|count|
##+-------+-----+
##|   43.0|    1|
##|   45.0|    3|
##|   46.0|    5|
##|   47.0|    4|
##|   48.0|    3|
##|   49.0|    5|
##|   50.0|    5|
##|   51.0|    6|
##|   52.0|    5|
##|   53.0|    7|
##+-------+-----+
##only showing top 10 rows

{% endhighlight %}
</div>

Note that to enable eager execution in the `sparkR` shell, pass the `spark.sql.repl.eagerEval.enabled=true` configuration property to the `--conf` option.

## Running SQL Queries from SparkR
A SparkDataFrame can also be registered as a temporary view in Spark SQL, which allows you to run SQL queries over its data.
The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.

<div data-lang="r" markdown="1">
{% highlight r %}
# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

{% endhighlight %}
</div>

# Machine Learning

## Algorithms

SparkR currently supports the following machine learning algorithms:

#### Classification

* [`spark.logit`](api/R/spark.logit.html): [`Logistic Regression`](ml-classification-regression.html#logistic-regression)
* [`spark.mlp`](api/R/spark.mlp.html): [`Multilayer Perceptron (MLP)`](ml-classification-regression.html#multilayer-perceptron-classifier)
* [`spark.naiveBayes`](api/R/spark.naiveBayes.html): [`Naive Bayes`](ml-classification-regression.html#naive-bayes)
* [`spark.svmLinear`](api/R/spark.svmLinear.html): [`Linear Support Vector Machine`](ml-classification-regression.html#linear-support-vector-machine)

#### Regression

* [`spark.survreg`](api/R/spark.survreg.html): [`Accelerated Failure Time (AFT) Survival Model`](ml-classification-regression.html#survival-regression)
* [`spark.glm`](api/R/spark.glm.html) or [`glm`](api/R/glm.html): [`Generalized Linear Model (GLM)`](ml-classification-regression.html#generalized-linear-regression)
* [`spark.isoreg`](api/R/spark.isoreg.html): [`Isotonic Regression`](ml-classification-regression.html#isotonic-regression)

#### Tree

* [`spark.decisionTree`](api/R/spark.decisionTree.html): `Decision Tree for` [`Regression`](ml-classification-regression.html#decision-tree-regression) `and` [`Classification`](ml-classification-regression.html#decision-tree-classifier)
* [`spark.gbt`](api/R/spark.gbt.html): `Gradient Boosted Trees for` [`Regression`](ml-classification-regression.html#gradient-boosted-tree-regression) `and` [`Classification`](ml-classification-regression.html#gradient-boosted-tree-classifier)
* [`spark.randomForest`](api/R/spark.randomForest.html): `Random Forest for` [`Regression`](ml-classification-regression.html#random-forest-regression) `and` [`Classification`](ml-classification-regression.html#random-forest-classifier)

#### Clustering

* [`spark.bisectingKmeans`](api/R/spark.bisectingKmeans.html): [`Bisecting k-means`](ml-clustering.html#bisecting-k-means)
* [`spark.gaussianMixture`](api/R/spark.gaussianMixture.html): [`Gaussian Mixture Model (GMM)`](ml-clustering.html#gaussian-mixture-model-gmm)
* [`spark.kmeans`](api/R/spark.kmeans.html): [`K-Means`](ml-clustering.html#k-means)
* [`spark.lda`](api/R/spark.lda.html): [`Latent Dirichlet Allocation (LDA)`](ml-clustering.html#latent-dirichlet-allocation-lda)
* [`spark.powerIterationClustering (PIC)`](api/R/spark.powerIterationClustering.html): [`Power Iteration Clustering (PIC)`](ml-clustering.html#power-iteration-clustering-pic)

#### Collaborative Filtering

* [`spark.als`](api/R/spark.als.html): [`Alternating Least Squares (ALS)`](ml-collaborative-filtering.html#collaborative-filtering)

#### Frequent Pattern Mining

* [`spark.fpGrowth`](api/R/spark.fpGrowth.html): [`FP-growth`](ml-frequent-pattern-mining.html#fp-growth)
* [`spark.prefixSpan`](api/R/spark.prefixSpan.html): [`PrefixSpan`](ml-frequent-pattern-mining.html#prefixSpan)

#### Statistics

* [`spark.kstest`](api/R/spark.kstest.html): `Kolmogorov-Smirnov Test`

Under the hood, SparkR uses MLlib to train the model. Please refer to the corresponding section of the MLlib user guide for example code.
Users can call `summary` to print a summary of the fitted model, [predict](api/R/predict.html) to make predictions on new data, and [write.ml](api/R/write.ml.html)/[read.ml](api/R/read.ml.html) to save/load fitted models.
SparkR supports a subset of the available R formula operators for model fitting, including `~`, `.`, `:`, `+`, and `-`.

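For illustration, a minimal sketch of this workflow using `spark.glm` (not taken from the MLlib guide; it assumes the built-in `iris` dataset, whose column names SparkR converts to use underscores):

<div data-lang="r" markdown="1">
{% highlight r %}
# Fit a Gaussian GLM on a SparkDataFrame
training <- createDataFrame(iris)
model <- spark.glm(training, Sepal_Length ~ Sepal_Width + Species, family = "gaussian")

# Print a summary of the fitted model
summary(model)

# Make predictions (here, on the training data itself)
predictions <- predict(model, training)
head(select(predictions, "Sepal_Length", "prediction"))
{% endhighlight %}
</div>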

## Model persistence

The following example shows how to save and load an MLlib model with SparkR.
{% include_example read_write r/ml/ml.R %}

# Data type mapping between R and Spark
<table class="table">
<tr><th>R</th><th>Spark</th></tr>
<tr>
  <td>byte</td>
  <td>byte</td>
</tr>
<tr>
  <td>integer</td>
  <td>integer</td>
</tr>
<tr>
  <td>float</td>
  <td>float</td>
</tr>
<tr>
  <td>double</td>
  <td>double</td>
</tr>
<tr>
  <td>numeric</td>
  <td>double</td>
</tr>
<tr>
  <td>character</td>
  <td>string</td>
</tr>
<tr>
  <td>string</td>
  <td>string</td>
</tr>
<tr>
  <td>binary</td>
  <td>binary</td>
</tr>
<tr>
  <td>raw</td>
  <td>binary</td>
</tr>
<tr>
  <td>logical</td>
  <td>boolean</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXct</a></td>
  <td>timestamp</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html">POSIXlt</a></td>
  <td>timestamp</td>
</tr>
<tr>
  <td><a href="https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html">Date</a></td>
  <td>date</td>
</tr>
<tr>
  <td>array</td>
  <td>array</td>
</tr>
<tr>
  <td>list</td>
  <td>array</td>
</tr>
<tr>
  <td>env</td>
  <td>map</td>
</tr>
</table>

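As a quick illustration of this mapping (a sketch; the schema shown is what `createDataFrame` infers for a simple local data frame):

<div data-lang="r" markdown="1">
{% highlight r %}
ldf <- data.frame(n = 1L, x = 1.5, s = "a", b = TRUE,
                  d = Sys.Date(), t = Sys.time(),
                  stringsAsFactors = FALSE)
printSchema(createDataFrame(ldf))
# root
#  |-- n: integer (nullable = true)
#  |-- x: double (nullable = true)
#  |-- s: string (nullable = true)
#  |-- b: boolean (nullable = true)
#  |-- d: date (nullable = true)
#  |-- t: timestamp (nullable = true)
{% endhighlight %}
</div>
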
# Structured Streaming

SparkR supports the Structured Streaming API. Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. For more information, see the R API in the [Structured Streaming Programming Guide](structured-streaming-programming-guide.html).

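As a flavor of the API, a minimal streaming word-count sketch (an illustrative example in the spirit of that guide; it assumes text is being streamed to a socket on localhost:9999):

<div data-lang="r" markdown="1">
{% highlight r %}
# Read a stream of lines from a socket source
lines <- read.stream("socket", host = "localhost", port = 9999)

# Split the lines into words and count each word
words <- selectExpr(lines, "explode(split(value, ' ')) as word")
wordCounts <- count(groupBy(words, "word"))

# Print the running counts to the console
query <- write.stream(wordCounts, "console", outputMode = "complete")
{% endhighlight %}
</div>
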
# Apache Arrow in SparkR

Apache Arrow is an in-memory columnar data format that is used in Spark to efficiently transfer data between JVM and R processes. For the analogous optimization in PySpark, see the [PySpark Usage Guide for Pandas with Apache Arrow](sql-pyspark-pandas-with-arrow.html). This guide explains how to use Arrow optimization in SparkR, along with some key points.

## Ensure Arrow Installed

The Arrow R library is available on CRAN and can be installed as below.

```bash
Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
```
Please refer to the [official documentation of Apache Arrow](https://arrow.apache.org/docs/r/) for more details.

Note that you must ensure that the Arrow R package is installed and available on all cluster nodes.
The currently supported minimum version is 0.15.1; however, this might change between minor releases since Arrow optimization in SparkR is experimental.

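A quick hedged check that can be run on each node (plain base R, not a SparkR API) to confirm the package is present and report its version:

<div data-lang="r" markdown="1">
{% highlight r %}
# Verify that the arrow package is installed and report its version
if (requireNamespace("arrow", quietly = TRUE)) {
  print(packageVersion("arrow"))
} else {
  message("The arrow package is not installed on this node")
}
{% endhighlight %}
</div>
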
## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`

Arrow optimization is available when converting a Spark DataFrame to an R DataFrame using the call `collect(spark_df)`,
when creating a Spark DataFrame from an R DataFrame with `createDataFrame(r_df)`, when applying an R native function to each partition
via `dapply(...)` and when applying an R native function to grouped data via `gapply(...)`.
To use Arrow when executing these calls, users need to first set the Spark configuration `spark.sql.execution.arrow.sparkr.enabled`
to `true`. This is disabled by default.

In addition, optimizations enabled by `spark.sql.execution.arrow.sparkr.enabled` can automatically fall back to the non-Arrow
implementation if an error occurs before the actual computation within Spark while converting a Spark DataFrame to or from an R
DataFrame.

<div data-lang="r" markdown="1">
{% highlight r %}
# Start up spark session with Arrow optimization enabled
sparkR.session(master = "local[*]",
               sparkConfig = list(spark.sql.execution.arrow.sparkr.enabled = "true"))

# Create a Spark DataFrame from an R DataFrame
spark_df <- createDataFrame(mtcars)

# Collect the Spark DataFrame into an R DataFrame
collect(spark_df)

# Apply an R native function to each partition.
collect(dapply(spark_df, function(rdf) { data.frame(rdf$gear + 1) }, structType("gear double")))

# Apply an R native function to grouped data.
collect(gapply(spark_df,
               "gear",
               function(key, group) {
                 data.frame(gear = key[[1]], disp = mean(group$disp) > group$disp)
               },
               structType("gear double, disp boolean")))
{% endhighlight %}
</div>

Using the above optimizations with Arrow will produce the same results as when Arrow is not enabled. Note that even with Arrow,
`collect(spark_df)` results in the collection of all records in the DataFrame to the driver program and should be done on a
small subset of the data.

## Supported SQL Types

Currently, all Spark SQL data types are supported by Arrow-based conversion except `FloatType`, `BinaryType`, `ArrayType`, `StructType` and `MapType`.

# R Function Name Conflicts

When loading and attaching a new package in R, it is possible to have a name [conflict](https://stat.ethz.ch/R-manual/R-devel/library/base/html/library.html), where a
function is masking another function.

The following functions are masked by the SparkR package:

<table class="table">
  <tr><th>Masked function</th><th>How to Access</th></tr>
  <tr>
    <td><code>cov</code> in <code>package:stats</code></td>
    <td><code><pre>stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))</pre></code></td>
  </tr>
  <tr>
    <td><code>filter</code> in <code>package:stats</code></td>
    <td><code><pre>stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)</pre></code></td>
  </tr>
  <tr>
    <td><code>sample</code> in <code>package:base</code></td>
    <td><code>base::sample(x, size, replace = FALSE, prob = NULL)</code></td>
  </tr>
</table>

Since part of SparkR is modeled on the `dplyr` package, certain functions in SparkR share the same names with those in `dplyr`. Depending on the load order of the two packages, some functions from the package loaded first are masked by those in the package loaded after. In such cases, prefix such calls with the package name, for instance, `SparkR::cume_dist(x)` or `dplyr::cume_dist(x)`.

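For example, a short sketch of calling a masked function explicitly through its package prefix (assuming SparkR is attached and the `faithful` dataset is available locally):

<div data-lang="r" markdown="1">
{% highlight r %}
# SparkR's filter operates on a SparkDataFrame
df <- createDataFrame(faithful)
head(SparkR::filter(df, df$waiting < 50))

# stats::filter (masked by SparkR) is still reachable via its package prefix
stats::filter(1:10, rep(1, 3))
{% endhighlight %}
</div>
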
You can inspect the search path in R with [`search()`](https://stat.ethz.ch/R-manual/R-devel/library/base/html/search.html).


# Migration Guide

The migration guide is now archived [on this page](sparkr-migration-guide.html).