0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # DataFrame.R - SparkDataFrame class and methods implemented in S4 OO classes
0019
0020 #' @include generics.R jobj.R schema.R RDD.R pairRDD.R column.R group.R
0021 NULL
0022
0023 setOldClass("jobj")
0024 setOldClass("structType")
0025
0026 #' S4 class that represents a SparkDataFrame
0027 #'
0028 #' SparkDataFrames can be created using functions like \link{createDataFrame},
0029 #' \link{read.json}, \link{table} etc.
0030 #'
0031 #' @family SparkDataFrame functions
0032 #' @rdname SparkDataFrame
0033 #' @docType class
0034 #'
0035 #' @slot env An R environment that stores bookkeeping states of the SparkDataFrame
0036 #' @slot sdf A Java object reference to the backing Scala DataFrame
0037 #' @seealso \link{createDataFrame}, \link{read.json}, \link{table}
0038 #' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes}
0039 #' @examples
0040 #'\dontrun{
0041 #' sparkR.session()
0042 #' df <- createDataFrame(faithful)
0043 #'}
0044 #' @note SparkDataFrame since 2.0.0
# Formal S4 class: wraps a JVM DataFrame reference (sdf) together with an R
# environment (env) used as mutable bookkeeping storage (e.g. the isCached
# flag set by the initialize method below).
setClass("SparkDataFrame",
         slots = list(env = "environment",
                      sdf = "jobj"))
0048
# Initializer for SparkDataFrame: records the JVM handle and seeds the
# bookkeeping environment with the cache flag.
setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
  # A fresh environment gives reference semantics: copies of this object
  # share the same cache-state bookkeeping.
  bookkeeping <- new.env()
  bookkeeping$isCached <- isCached

  .Object@env <- bookkeeping
  .Object@sdf <- sdf
  .Object
})
0056
0057 #' Set options/mode and then return the write object
0058 #' @noRd
#' Set options/mode and then return the write object
#' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
  # Gather string options from ..., then merge in the target path when given.
  opts <- varargsToStrEnv(...)
  if (!is.null(path)) {
    opts[["path"]] <- path
  }
  withMode <- setWriteMode(write, mode)
  callJMethod(withMode, "options", opts)
}
0068
0069 #' Set mode and then return the write object
0070 #' @noRd
#' Set mode and then return the write object
#' @noRd
setWriteMode <- function(write, mode) {
  # Guard: mode must be a character save mode understood by the JVM writer.
  if (!is.character(mode)) {
    stop("mode should be character or omitted. It is 'error' by default.")
  }
  handledCallJMethod(write, "mode", mode)
}
0078
0079 #' @param sdf A Java object reference to the backing Scala DataFrame
0080 #' @param isCached TRUE if the SparkDataFrame is cached
0081 #' @noRd
#' @param sdf A Java object reference to the backing Scala DataFrame
#' @param isCached TRUE if the SparkDataFrame is cached
#' @noRd
# Thin internal constructor: wraps the JVM DataFrame handle in the
# SparkDataFrame S4 class (cache flag is stored by the initialize method).
dataFrame <- function(sdf, isCached = FALSE) {
  new("SparkDataFrame", sdf, isCached)
}
0085
0086 ############################ SparkDataFrame Methods ##############################################
0087
0088 #' Print Schema of a SparkDataFrame
0089 #'
0090 #' Prints out the schema in tree format
0091 #'
0092 #' @param x A SparkDataFrame
0093 #'
0094 #' @family SparkDataFrame functions
0095 #' @rdname printSchema
0096 #' @name printSchema
0097 #' @aliases printSchema,SparkDataFrame-method
0098 #' @examples
0099 #'\dontrun{
0100 #' sparkR.session()
0101 #' path <- "path/to/file.json"
0102 #' df <- read.json(path)
0103 #' printSchema(df)
0104 #'}
0105 #' @note printSchema since 1.4.0
setMethod("printSchema",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Ask the JVM schema object for its tree rendering and echo it.
            treeString <- callJMethod(schema(x)$jobj, "treeString")
            cat(treeString)
          })
0112
0113 #' Get schema object
0114 #'
0115 #' Returns the schema of this SparkDataFrame as a structType object.
0116 #'
0117 #' @param x A SparkDataFrame
0118 #'
0119 #' @family SparkDataFrame functions
0120 #' @rdname schema
0121 #' @name schema
0122 #' @aliases schema,SparkDataFrame-method
0123 #' @examples
0124 #'\dontrun{
0125 #' sparkR.session()
0126 #' path <- "path/to/file.json"
0127 #' df <- read.json(path)
0128 #' dfSchema <- schema(df)
0129 #'}
0130 #' @note schema since 1.4.0
setMethod("schema",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Fetch the JVM StructType and wrap it in the R structType class.
            jschema <- callJMethod(x@sdf, "schema")
            structType(jschema)
          })
0136
0137 #' Explain
0138 #'
0139 #' Print the logical and physical Catalyst plans to the console for debugging.
0140 #'
0141 #' @family SparkDataFrame functions
0142 #' @aliases explain,SparkDataFrame-method
0143 #' @rdname explain
0144 #' @name explain
0145 #' @examples
0146 #'\dontrun{
0147 #' sparkR.session()
0148 #' path <- "path/to/file.json"
0149 #' df <- read.json(path)
0150 #' explain(df, TRUE)
0151 #'}
0152 #' @note explain since 1.4.0
setMethod("explain",
          signature(x = "SparkDataFrame"),
          function(x, extended = FALSE) {
            # extended = TRUE prints the whole query execution (logical and
            # physical plans); otherwise only the executed physical plan.
            queryExecution <- callJMethod(x@sdf, "queryExecution")
            target <- if (extended) {
              queryExecution
            } else {
              callJMethod(queryExecution, "executedPlan")
            }
            cat(callJMethod(target, "toString"))
          })
0164
0165 #' isLocal
0166 #'
#' Returns TRUE if the \code{collect} and \code{take} methods can be run locally
#' (without any Spark executors).
0169 #'
0170 #' @param x A SparkDataFrame
0171 #'
0172 #' @family SparkDataFrame functions
0173 #' @rdname isLocal
0174 #' @name isLocal
0175 #' @aliases isLocal,SparkDataFrame-method
0176 #' @examples
0177 #'\dontrun{
0178 #' sparkR.session()
0179 #' path <- "path/to/file.json"
0180 #' df <- read.json(path)
0181 #' isLocal(df)
0182 #'}
0183 #' @note isLocal since 1.4.0
setMethod("isLocal",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Delegates directly to Dataset.isLocal on the JVM side.
            callJMethod(x@sdf, "isLocal")
          })
0189
0190 #' showDF
0191 #'
0192 #' Print the first numRows rows of a SparkDataFrame
0193 #'
0194 #' @param x a SparkDataFrame.
0195 #' @param numRows the number of rows to print. Defaults to 20.
0196 #' @param truncate whether truncate long strings. If \code{TRUE}, strings more than
0197 #' 20 characters will be truncated. However, if set greater than zero,
0198 #' truncates strings longer than \code{truncate} characters and all cells
0199 #' will be aligned right.
0200 #' @param vertical whether print output rows vertically (one line per column value).
0201 #' @param ... further arguments to be passed to or from other methods.
0202 #' @family SparkDataFrame functions
0203 #' @aliases showDF,SparkDataFrame-method
0204 #' @rdname showDF
0205 #' @name showDF
0206 #' @examples
0207 #'\dontrun{
0208 #' sparkR.session()
0209 #' path <- "path/to/file.json"
0210 #' df <- read.json(path)
0211 #' showDF(df)
0212 #'}
0213 #' @note showDF since 1.4.0
setMethod("showDF",
          signature(x = "SparkDataFrame"),
          function(x, numRows = 20, truncate = TRUE, vertical = FALSE) {
            # truncate == TRUE means the JVM default of 20 characters; any
            # other value (FALSE or a number) is coerced and used verbatim
            # (FALSE coerces to 0, i.e. no truncation).
            truncChars <- if (is.logical(truncate) && truncate) {
              20
            } else {
              as.numeric(truncate)
            }
            rendered <- callJMethod(x@sdf, "showString", numToInt(numRows),
                                    numToInt(truncChars), vertical)
            cat(rendered)
          })
0226
0227 #' show
0228 #'
0229 #' If eager evaluation is enabled and the Spark object is a SparkDataFrame, evaluate the
0230 #' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
0231 #' and type information of the Spark object.
0232 #'
0233 #' @param object a Spark object. Can be a SparkDataFrame, Column, GroupedData, WindowSpec.
0234 #'
0235 #' @family SparkDataFrame functions
0236 #' @rdname show
0237 #' @aliases show,SparkDataFrame-method
0238 #' @name show
0239 #' @examples
0240 #'\dontrun{
0241 #' sparkR.session()
0242 #' path <- "path/to/file.json"
0243 #' df <- read.json(path)
0244 #' show(df)
0245 #'}
0246 #' @note show(SparkDataFrame) since 1.4.0
setMethod("show", "SparkDataFrame",
          function(object) {
            conf <- sparkR.conf()
            eagerEval <- conf[["spark.sql.repl.eagerEval.enabled"]]
            if (!is.null(eagerEval) && identical(eagerEval, "true")) {
              # Eager evaluation: print top rows, honoring the configured
              # row-count and truncation limits when they are positive.
              showArgs <- list(x = object)
              maxRowsConf <- conf[["spark.sql.repl.eagerEval.maxNumRows"]]
              if (!is.null(maxRowsConf)) {
                maxRows <- as.integer(maxRowsConf)
                if (maxRows > 0) {
                  showArgs$numRows <- maxRows
                }
              }
              truncateConf <- conf[["spark.sql.repl.eagerEval.truncate"]]
              if (!is.null(truncateConf)) {
                truncateChars <- as.integer(truncateConf)
                if (truncateChars > 0) {
                  showArgs$truncate <- truncateChars
                }
              }
              do.call(showDF, showArgs)
            } else {
              # Lazy default: print the class plus a "name:type" summary.
              colSummaries <- lapply(dtypes(object), paste, collapse = ":")
              cat(paste0(class(object), "[",
                         paste(colSummaries, collapse = ", "), "]\n"))
            }
          })
0277
0278 #' DataTypes
0279 #'
0280 #' Return all column names and their data types as a list
0281 #'
0282 #' @param x A SparkDataFrame
0283 #'
0284 #' @family SparkDataFrame functions
0285 #' @rdname dtypes
0286 #' @name dtypes
0287 #' @aliases dtypes,SparkDataFrame-method
0288 #' @examples
0289 #'\dontrun{
0290 #' sparkR.session()
0291 #' path <- "path/to/file.json"
0292 #' df <- read.json(path)
0293 #' dtypes(df)
0294 #'}
0295 #' @note dtypes since 1.4.0
setMethod("dtypes",
          signature(x = "SparkDataFrame"),
          function(x) {
            # One c(name, simpleTypeString) pair per field of the schema.
            lapply(schema(x)$fields(), function(field) {
              c(field$name(), field$dataType.simpleString())
            })
          })
0303
0304 #' Column Names of SparkDataFrame
0305 #'
0306 #' Return a vector of column names.
0307 #'
0308 #' @param x a SparkDataFrame.
0309 #'
0310 #' @family SparkDataFrame functions
0311 #' @rdname columns
0312 #' @name columns
0313 #' @aliases columns,SparkDataFrame-method
0314 #' @examples
0315 #'\dontrun{
0316 #' sparkR.session()
0317 #' path <- "path/to/file.json"
0318 #' df <- read.json(path)
0319 #' columns(df)
0320 #' colnames(df)
0321 #'}
0322 #' @note columns since 1.4.0
setMethod("columns",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Extract each field's name from the schema; sapply simplifies
            # the result to a character vector.
            sapply(schema(x)$fields(), function(field) field$name())
          })
0330
0331 #' @rdname columns
0332 #' @name names
0333 #' @aliases names,SparkDataFrame-method
0334 #' @note names since 1.5.0
setMethod("names",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Alias for columns(): returns the column names.
            columns(x)
          })
0340
0341 #' @rdname columns
0342 #' @aliases names<-,SparkDataFrame-method
0343 #' @name names<-
0344 #' @note names<- since 1.5.0
setMethod("names<-",
          signature(x = "SparkDataFrame"),
          function(x, value) {
            # Delegates to the colnames<- method, which validates the new
            # names and returns a renamed SparkDataFrame.
            colnames(x) <- value
            x
          })
0351
0352 #' @rdname columns
0353 #' @aliases colnames,SparkDataFrame-method
0354 #' @name colnames
0355 #' @note colnames since 1.6.0
setMethod("colnames",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Alias for columns(): returns the column names.
            columns(x)
          })
0361
0362 #' @param value a character vector. Must have the same length as the number
0363 #' of columns to be renamed.
0364 #' @rdname columns
0365 #' @aliases colnames<-,SparkDataFrame-method
0366 #' @name colnames<-
0367 #' @note colnames<- since 1.6.0
setMethod("colnames<-",
          signature(x = "SparkDataFrame"),
          function(x, value) {

            # Validate the replacement names before touching the JVM object.
            # is.character() is used instead of comparing class(value), which
            # is fragile when class() returns a vector.
            if (!is.character(value)) {
              stop("Invalid column names.")
            }

            if (length(value) != ncol(x)) {
              stop(
                "Column names must have the same length as the number of columns in the dataset.")
            }

            if (any(is.na(value))) {
              stop("Column names cannot be NA.")
            }

            # Reject '.' anywhere in ANY of the new names. The previous check,
            # regexec(...)[[1]][1], inspected only the first name, so a dot in
            # any later column name slipped through.
            if (any(grepl(".", value, fixed = TRUE))) {
              stop("Column names cannot contain the '.' symbol.")
            }

            sdf <- callJMethod(x@sdf, "toDF", as.list(value))
            dataFrame(sdf)
          })
0394
0395 #' coltypes
0396 #'
0397 #' Get column types of a SparkDataFrame
0398 #'
0399 #' @param x A SparkDataFrame
#' @return A character vector with the column types of the given SparkDataFrame
0401 #' @rdname coltypes
0402 #' @aliases coltypes,SparkDataFrame-method
0403 #' @name coltypes
0404 #' @family SparkDataFrame functions
0405 #' @examples
0406 #'\dontrun{
0407 #' irisDF <- createDataFrame(iris)
0408 #' coltypes(irisDF) # get column types
0409 #'}
0410 #' @note coltypes since 1.6.0
setMethod("coltypes",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Spark "simple string" type of every column (e.g. "int",
            # "array<int>"), taken from the second element of each dtypes pair.
            # The lambda parameter is named to avoid shadowing the outer x.
            types <- sapply(dtypes(x), function(dtype) dtype[[2]])

            # Map Spark data types into R's data types using the lookup
            # environments. USE.NAMES = FALSE (never the reassignable F).
            rTypes <- sapply(types, USE.NAMES = FALSE, FUN = function(sparkType) {
              # Check for primitive types first; they map directly.
              type <- PRIMITIVE_TYPES[[sparkType]]

              if (is.null(type)) {
                # Complex types (array<...>, map<...>, struct<...>) are
                # matched by prefix.
                for (t in names(COMPLEX_TYPES)) {
                  if (substring(sparkType, 1, nchar(t)) == t) {
                    type <- COMPLEX_TYPES[[t]]
                    break
                  }
                }

                if (is.null(type)) {
                  # Fall back to special type handling; anything still
                  # unresolved is unsupported.
                  specialtype <- specialtypeshandle(sparkType)
                  if (is.null(specialtype)) {
                    stop("Unsupported data type: ", sparkType)
                  }
                  type <- PRIMITIVE_TYPES[[specialtype]]
                }
              }
              type[[1]]
            })

            # Types with no R mapping (NA) keep their original Spark type name.
            naIndices <- which(is.na(rTypes))
            rTypes[naIndices] <- types[naIndices]

            rTypes
          })
0450
0451 #' coltypes
0452 #'
0453 #' Set the column types of a SparkDataFrame.
0454 #'
0455 #' @param value A character vector with the target column types for the given
0456 #' SparkDataFrame. Column types can be one of integer, numeric/double, character, logical, or NA
0457 #' to keep that column as-is.
0458 #' @rdname coltypes
0459 #' @name coltypes<-
0460 #' @aliases coltypes<-,SparkDataFrame,character-method
0461 #' @examples
0462 #'\dontrun{
0463 #' sparkR.session()
0464 #' path <- "path/to/file.json"
0465 #' df <- read.json(path)
0466 #' coltypes(df) <- c("character", "integer") # set column types
0467 #' coltypes(df) <- c(NA, "numeric") # set column types
0468 #'}
0469 #' @note coltypes<- since 1.6.0
setMethod("coltypes<-",
          signature(x = "SparkDataFrame"),
          function(x, value) {
            # Cast each column to its requested R-equivalent SQL type; an NA
            # entry leaves the corresponding column unchanged.
            colNames <- columns(x)
            nCols <- length(colNames)
            if (length(value) == 0) {
              stop("Cannot set types of an empty SparkDataFrame with no Column")
            }
            if (length(value) != nCols) {
              stop("Length of type vector should match the number of columns for SparkDataFrame")
            }
            castedCols <- lapply(seq_len(nCols), function(i) {
              column <- getColumn(x, colNames[i])
              if (is.na(value[i])) {
                column
              } else {
                targetType <- rToSQLTypes[[value[i]]]
                if (is.null(targetType)) {
                  stop("Only atomic type is supported for column types")
                }
                cast(column, targetType)
              }
            })
            selected <- select(x, castedCols)
            dataFrame(selected@sdf)
          })
0496
0497 #' Creates a temporary view using the given name.
0498 #'
0499 #' Creates a new temporary view using a SparkDataFrame in the Spark Session. If a
0500 #' temporary view with the same name already exists, replaces it.
0501 #'
0502 #' @param x A SparkDataFrame
0503 #' @param viewName A character vector containing the name of the table
0504 #'
0505 #' @family SparkDataFrame functions
0506 #' @rdname createOrReplaceTempView
0507 #' @name createOrReplaceTempView
0508 #' @aliases createOrReplaceTempView,SparkDataFrame,character-method
0509 #' @examples
0510 #'\dontrun{
0511 #' sparkR.session()
0512 #' path <- "path/to/file.json"
0513 #' df <- read.json(path)
0514 #' createOrReplaceTempView(df, "json_df")
0515 #' new_df <- sql("SELECT * FROM json_df")
0516 #'}
0517 #' @note createOrReplaceTempView since 2.0.0
setMethod("createOrReplaceTempView",
          signature(x = "SparkDataFrame", viewName = "character"),
          function(x, viewName) {
            # Side-effect only: register (or replace) the temp view on the JVM
            # side; invisible() keeps the REPL quiet.
            invisible(callJMethod(x@sdf, "createOrReplaceTempView", viewName))
          })
0523
0524 #' (Deprecated) Register Temporary Table
0525 #'
0526 #' Registers a SparkDataFrame as a Temporary Table in the SparkSession
0527 #' @param x A SparkDataFrame
0528 #' @param tableName A character vector containing the name of the table
0529 #'
0530 #' @seealso \link{createOrReplaceTempView}
0531 #' @rdname registerTempTable-deprecated
0532 #' @name registerTempTable
0533 #' @aliases registerTempTable,SparkDataFrame,character-method
0534 #' @examples
0535 #'\dontrun{
0536 #' sparkR.session()
0537 #' path <- "path/to/file.json"
0538 #' df <- read.json(path)
0539 #' registerTempTable(df, "json_df")
0540 #' new_df <- sql("SELECT * FROM json_df")
0541 #'}
0542 #' @note registerTempTable since 1.4.0
setMethod("registerTempTable",
          signature(x = "SparkDataFrame", tableName = "character"),
          function(x, tableName) {
            # Deprecated alias: warn, then behave exactly like
            # createOrReplaceTempView.
            .Deprecated("createOrReplaceTempView")
            invisible(callJMethod(x@sdf, "createOrReplaceTempView", tableName))
          })
0549
0550 #' insertInto
0551 #'
0552 #' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
0553 #'
0554 #' @param x a SparkDataFrame.
0555 #' @param tableName a character vector containing the name of the table.
#' @param overwrite a logical argument indicating whether or not to overwrite
#'                  the existing rows in the table.
#' @param ... further arguments to be passed to or from other methods.
0559 #'
0560 #' @family SparkDataFrame functions
0561 #' @rdname insertInto
0562 #' @name insertInto
0563 #' @aliases insertInto,SparkDataFrame,character-method
0564 #' @examples
0565 #'\dontrun{
0566 #' sparkR.session()
0567 #' df <- read.df(path, "parquet")
0568 #' df2 <- read.df(path2, "parquet")
0569 #' saveAsTable(df, "table1")
0570 #' insertInto(df2, "table1", overwrite = TRUE)
0571 #'}
0572 #' @note insertInto since 1.4.0
setMethod("insertInto",
          signature(x = "SparkDataFrame", tableName = "character"),
          function(x, tableName, overwrite = FALSE) {
            write <- callJMethod(x@sdf, "write")
            # Plain if/else rather than ifelse(): the condition is a scalar
            # flag, and vectorized ifelse() on scalars is both slower and
            # silently accepts vector input.
            mode <- if (overwrite) "overwrite" else "append"
            write <- setWriteMode(write, mode)
            invisible(callJMethod(write, "insertInto", tableName))
          })
0580
0581 #' Cache
0582 #'
0583 #' Persist with the default storage level (MEMORY_ONLY).
0584 #'
0585 #' @param x A SparkDataFrame
0586 #'
0587 #' @family SparkDataFrame functions
0588 #' @aliases cache,SparkDataFrame-method
0589 #' @rdname cache
0590 #' @name cache
0591 #' @examples
0592 #'\dontrun{
0593 #' sparkR.session()
0594 #' path <- "path/to/file.json"
0595 #' df <- read.json(path)
0596 #' cache(df)
0597 #'}
0598 #' @note cache since 1.4.0
setMethod("cache",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Invoke cache() on the JVM side purely for its side effect; the
            # returned Dataset reference was previously bound to an unused
            # local, now dropped.
            callJMethod(x@sdf, "cache")
            x@env$isCached <- TRUE
            x
          })
0606
0607 #' Persist
0608 #'
0609 #' Persist this SparkDataFrame with the specified storage level. For details of the
0610 #' supported storage levels, refer to
#' \url{https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
0612 #'
0613 #' @param x the SparkDataFrame to persist.
0614 #' @param newLevel storage level chosen for the persistence. See available options in
0615 #' the description.
0616 #'
0617 #' @family SparkDataFrame functions
0618 #' @rdname persist
0619 #' @name persist
0620 #' @aliases persist,SparkDataFrame,character-method
0621 #' @examples
0622 #'\dontrun{
0623 #' sparkR.session()
0624 #' path <- "path/to/file.json"
0625 #' df <- read.json(path)
0626 #' persist(df, "MEMORY_AND_DISK")
0627 #'}
0628 #' @note persist since 1.4.0
setMethod("persist",
          signature(x = "SparkDataFrame", newLevel = "character"),
          function(x, newLevel) {
            # Translate the level name to a JVM StorageLevel, persist, and
            # record the cached state locally.
            level <- getStorageLevel(newLevel)
            callJMethod(x@sdf, "persist", level)
            x@env$isCached <- TRUE
            x
          })
0636
0637 #' Unpersist
0638 #'
0639 #' Mark this SparkDataFrame as non-persistent, and remove all blocks for it from memory and
0640 #' disk.
0641 #'
0642 #' @param x the SparkDataFrame to unpersist.
0643 #' @param blocking whether to block until all blocks are deleted.
0644 #' @param ... further arguments to be passed to or from other methods.
0645 #'
0646 #' @family SparkDataFrame functions
0647 #' @rdname unpersist
0648 #' @aliases unpersist,SparkDataFrame-method
0649 #' @name unpersist
0650 #' @examples
0651 #'\dontrun{
0652 #' sparkR.session()
0653 #' path <- "path/to/file.json"
0654 #' df <- read.json(path)
0655 #' persist(df, "MEMORY_AND_DISK")
0656 #' unpersist(df)
0657 #'}
0658 #' @note unpersist since 1.4.0
setMethod("unpersist",
          signature(x = "SparkDataFrame"),
          function(x, blocking = TRUE) {
            # Drop the JVM-side blocks first; only clear the local cached
            # flag once that call has succeeded.
            callJMethod(x@sdf, "unpersist", blocking)
            x@env$isCached <- FALSE
            x
          })
0666
0667 #' StorageLevel
0668 #'
0669 #' Get storagelevel of this SparkDataFrame.
0670 #'
0671 #' @param x the SparkDataFrame to get the storageLevel.
0672 #'
0673 #' @family SparkDataFrame functions
0674 #' @rdname storageLevel
0675 #' @aliases storageLevel,SparkDataFrame-method
0676 #' @name storageLevel
0677 #' @examples
0678 #'\dontrun{
0679 #' sparkR.session()
0680 #' path <- "path/to/file.json"
0681 #' df <- read.json(path)
0682 #' persist(df, "MEMORY_AND_DISK")
0683 #' storageLevel(df)
0684 #'}
0685 #' @note storageLevel since 2.1.0
setMethod("storageLevel",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Fetch the JVM StorageLevel and render it as a readable string.
            storageLevelToString(callJMethod(x@sdf, "storageLevel"))
          })
0691
0692 #' Coalesce
0693 #'
0694 #' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
0695 #' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
0696 #' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
0697 #' the current partitions. If a larger number of partitions is requested, it will stay at the
0698 #' current number of partitions.
0699 #'
0700 #' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
0701 #' this may result in your computation taking place on fewer nodes than
0702 #' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
0703 #' call \code{repartition}. This will add a shuffle step, but means the
0704 #' current upstream partitions will be executed in parallel (per whatever
0705 #' the current partitioning is).
0706 #'
0707 #' @param numPartitions the number of partitions to use.
0708 #'
0709 #' @family SparkDataFrame functions
0710 #' @rdname coalesce
0711 #' @name coalesce
0712 #' @aliases coalesce,SparkDataFrame-method
0713 #' @seealso \link{repartition}, \link{repartitionByRange}
0714 #' @examples
0715 #'\dontrun{
0716 #' sparkR.session()
0717 #' path <- "path/to/file.json"
0718 #' df <- read.json(path)
0719 #' newDF <- coalesce(df, 1L)
0720 #'}
0721 #' @note coalesce(SparkDataFrame) since 2.1.1
setMethod("coalesce",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions) {
            # Narrow repartitioning: no shuffle; requested count must be numeric.
            stopifnot(is.numeric(numPartitions))
            dataFrame(callJMethod(x@sdf, "coalesce", numToInt(numPartitions)))
          })
0729
0730 #' Repartition
0731 #'
0732 #' The following options for repartition are possible:
0733 #' \itemize{
0734 #' \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
0735 #' \item{2.} {Return a new SparkDataFrame hash partitioned by
0736 #' the given columns into \code{numPartitions}.}
0737 #' \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
0738 #' using \code{spark.sql.shuffle.partitions} as number of partitions.}
0739 #'}
0740 #' @param x a SparkDataFrame.
0741 #' @param numPartitions the number of partitions to use.
0742 #' @param col the column by which the partitioning will be performed.
0743 #' @param ... additional column(s) to be used in the partitioning.
0744 #'
0745 #' @family SparkDataFrame functions
0746 #' @rdname repartition
0747 #' @name repartition
0748 #' @aliases repartition,SparkDataFrame-method
0749 #' @seealso \link{coalesce}, \link{repartitionByRange}
0750 #' @examples
0751 #'\dontrun{
0752 #' sparkR.session()
0753 #' path <- "path/to/file.json"
0754 #' df <- read.json(path)
0755 #' newDF <- repartition(df, 2L)
0756 #' newDF <- repartition(df, numPartitions = 2L)
0757 #' newDF <- repartition(df, col = df$"col1", df$"col2")
0758 #' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
0759 #'}
0760 #' @note repartition since 1.4.0
setMethod("repartition",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            # inherits() replaces class(col) == "Column": class() can return a
            # vector, which makes `==` produce warnings/wrong answers for
            # objects with more than one class.
            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
              # number of partitions and columns both are specified
              if (!is.null(col) && inherits(col, "Column")) {
                cols <- list(col, ...)
                jcol <- lapply(cols, function(c) { c@jc })
                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
              } else {
                # only number of partitions is specified
                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
              }
            } else if (!is.null(col) && inherits(col, "Column")) {
              # only columns are specified
              cols <- list(col, ...)
              jcol <- lapply(cols, function(c) { c@jc })
              sdf <- callJMethod(x@sdf, "repartition", jcol)
            } else {
              stop("Please, specify the number of partitions and/or a column(s)")
            }
            dataFrame(sdf)
          })
0784
0785
0786 #' Repartition by range
0787 #'
0788 #' The following options for repartition by range are possible:
0789 #' \itemize{
0790 #' \item{1.} {Return a new SparkDataFrame range partitioned by
0791 #' the given columns into \code{numPartitions}.}
0792 #' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
0793 #' using \code{spark.sql.shuffle.partitions} as number of partitions.}
0794 #'}
0795 #' At least one partition-by expression must be specified.
0796 #' When no explicit sort order is specified, "ascending nulls first" is assumed.
0797 #'
0798 #' Note that due to performance reasons this method uses sampling to estimate the ranges.
0799 #' Hence, the output may not be consistent, since sampling can return different values.
0800 #' The sample size can be controlled by the config
0801 #' \code{spark.sql.execution.rangeExchange.sampleSizePerPartition}.
0802 #'
0803 #' @param x a SparkDataFrame.
0804 #' @param numPartitions the number of partitions to use.
0805 #' @param col the column by which the range partitioning will be performed.
0806 #' @param ... additional column(s) to be used in the range partitioning.
0807 #'
0808 #' @family SparkDataFrame functions
0809 #' @rdname repartitionByRange
0810 #' @name repartitionByRange
0811 #' @aliases repartitionByRange,SparkDataFrame-method
0812 #' @seealso \link{repartition}, \link{coalesce}
0813 #' @examples
0814 #'\dontrun{
0815 #' sparkR.session()
0816 #' path <- "path/to/file.json"
0817 #' df <- read.json(path)
0818 #' newDF <- repartitionByRange(df, col = df$col1, df$col2)
0819 #' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
0820 #'}
0821 #' @note repartitionByRange since 2.4.0
setMethod("repartitionByRange",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            # inherits() replaces class(col) == "Column" (fragile when class()
            # returns a vector for S4 objects).
            if (!is.null(numPartitions) && !is.null(col)) {
              # number of partitions and columns both are specified
              if (is.numeric(numPartitions) && inherits(col, "Column")) {
                cols <- list(col, ...)
                jcol <- lapply(cols, function(c) { c@jc })
                sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
              } else {
                stop("numPartitions and col must be numeric and Column; however, got ",
                     class(numPartitions), " and ", class(col))
              }
            } else if (!is.null(col)) {
              # only columns are specified
              if (inherits(col, "Column")) {
                cols <- list(col, ...)
                jcol <- lapply(cols, function(c) { c@jc })
                sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
              } else {
                stop("col must be Column; however, got ", class(col))
              }
            } else if (!is.null(numPartitions)) {
              # only numPartitions is specified; the range API needs a column
              stop("At least one partition-by column must be specified.")
            } else {
              stop("Please, specify a column(s) or the number of partitions with a column(s)")
            }
            dataFrame(sdf)
          })
0852
0853 #' toJSON
0854 #'
0855 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
0856 #' Each row is turned into a JSON document with columns as different fields.
0857 #' The returned SparkDataFrame has a single character column with the name \code{value}
0858 #'
0859 #' @param x a SparkDataFrame
0860 #' @return a SparkDataFrame
0861 #' @family SparkDataFrame functions
0862 #' @rdname toJSON
0863 #' @name toJSON
0864 #' @aliases toJSON,SparkDataFrame-method
0865 #' @examples
0866 #'\dontrun{
0867 #' sparkR.session()
0868 #' path <- "path/to/file.parquet"
0869 #' df <- read.parquet(path)
0870 #' df_json <- toJSON(df)
0871 #'}
0872 #' @note toJSON since 2.2.0
setMethod("toJSON",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Dataset.toJSON yields a Dataset[String]; toDF wraps it back into
            # a DataFrame with a single "value" column.
            jsonDataset <- callJMethod(x@sdf, "toJSON")
            dataFrame(callJMethod(jsonDataset, "toDF"))
          })
0880
0881 #' Save the contents of SparkDataFrame as a JSON file
0882 #'
0883 #' Save the contents of a SparkDataFrame as a JSON file (\href{http://jsonlines.org/}{
0884 #' JSON Lines text format or newline-delimited JSON}). Files written out
0885 #' with this method can be read back in as a SparkDataFrame using read.json().
0886 #'
0887 #' @param x A SparkDataFrame
0888 #' @param path The directory where the file is saved
0889 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0890 #' save mode (it is 'error' by default)
0891 #' @param ... additional argument(s) passed to the method.
0892 #'
0893 #' @family SparkDataFrame functions
0894 #' @rdname write.json
0895 #' @name write.json
0896 #' @aliases write.json,SparkDataFrame,character-method
0897 #' @examples
0898 #'\dontrun{
0899 #' sparkR.session()
0900 #' path <- "path/to/file.json"
0901 #' df <- read.json(path)
0902 #' write.json(df, "/tmp/sparkr-tmp/")
0903 #'}
0904 #' @note write.json since 1.6.0
setMethod("write.json",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Configure the writer (save mode plus any extra options), then
            # emit JSON Lines output at path.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"),
                                      mode = mode, ...)
            invisible(handledCallJMethod(writer, "json", path))
          })
0912
0913 #' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
0914 #'
0915 #' Save the contents of a SparkDataFrame as an ORC file, preserving the schema. Files written out
0916 #' with this method can be read back in as a SparkDataFrame using read.orc().
0917 #'
0918 #' @param x A SparkDataFrame
0919 #' @param path The directory where the file is saved
0920 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0921 #' save mode (it is 'error' by default)
0922 #' @param ... additional argument(s) passed to the method.
0923 #'
0924 #' @family SparkDataFrame functions
0925 #' @aliases write.orc,SparkDataFrame,character-method
0926 #' @rdname write.orc
0927 #' @name write.orc
0928 #' @examples
0929 #'\dontrun{
0930 #' sparkR.session()
0931 #' path <- "path/to/file.json"
0932 #' df <- read.json(path)
0933 #' write.orc(df, "/tmp/sparkr-tmp1/")
0934 #' }
0935 #' @note write.orc since 2.0.0
setMethod("write.orc",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Configure the writer (save mode plus any extra options), then
            # emit ORC output at path.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"),
                                      mode = mode, ...)
            invisible(handledCallJMethod(writer, "orc", path))
          })
0943
0944 #' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
0945 #'
0946 #' Save the contents of a SparkDataFrame as a Parquet file, preserving the schema. Files written out
0947 #' with this method can be read back in as a SparkDataFrame using read.parquet().
0948 #'
0949 #' @param x A SparkDataFrame
0950 #' @param path The directory where the file is saved
0951 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0952 #' save mode (it is 'error' by default)
0953 #' @param ... additional argument(s) passed to the method.
0954 #'
0955 #' @family SparkDataFrame functions
0956 #' @rdname write.parquet
0957 #' @name write.parquet
0958 #' @aliases write.parquet,SparkDataFrame,character-method
0959 #' @examples
0960 #'\dontrun{
0961 #' sparkR.session()
0962 #' path <- "path/to/file.json"
0963 #' df <- read.json(path)
0964 #' write.parquet(df, "/tmp/sparkr-tmp1/")
0965 #'}
0966 #' @note write.parquet since 1.6.0
setMethod("write.parquet",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Configure the JVM DataFrameWriter with the save mode and any
            # extra data-source options, then emit the frame as Parquet files.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "parquet", path))
          })
0974
0975 #' Save the content of SparkDataFrame in a text file at the specified path.
0976 #'
0977 #' Save the content of the SparkDataFrame in a text file at the specified path.
0978 #' The SparkDataFrame must have only one column of string type with the name "value".
0979 #' Each row becomes a new line in the output file. The text files will be encoded as UTF-8.
0980 #'
0981 #' @param x A SparkDataFrame
0982 #' @param path The directory where the file is saved
0983 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0984 #' save mode (it is 'error' by default)
0985 #' @param ... additional argument(s) passed to the method.
0986 #'
0987 #' @family SparkDataFrame functions
0988 #' @aliases write.text,SparkDataFrame,character-method
0989 #' @rdname write.text
0990 #' @name write.text
0991 #' @examples
0992 #'\dontrun{
0993 #' sparkR.session()
0994 #' path <- "path/to/file.txt"
0995 #' df <- read.text(path)
0996 #' write.text(df, "/tmp/sparkr-tmp/")
0997 #'}
0998 #' @note write.text since 2.0.0
setMethod("write.text",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Configure the JVM DataFrameWriter with the save mode and any
            # extra data-source options, then emit the frame as text files.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "text", path))
          })
1006
1007 #' Distinct
1008 #'
1009 #' Return a new SparkDataFrame containing the distinct rows in this SparkDataFrame.
1010 #'
1011 #' @param x A SparkDataFrame
1012 #'
1013 #' @family SparkDataFrame functions
1014 #' @aliases distinct,SparkDataFrame-method
1015 #' @rdname distinct
1016 #' @name distinct
1017 #' @examples
1018 #'\dontrun{
1019 #' sparkR.session()
1020 #' path <- "path/to/file.json"
1021 #' df <- read.json(path)
1022 #' distinctDF <- distinct(df)
1023 #'}
1024 #' @note distinct since 1.4.0
setMethod("distinct",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Delegate to Dataset.distinct() on the JVM and wrap the handle
            # back into a SparkDataFrame.
            dataFrame(callJMethod(x@sdf, "distinct"))
          })
1031
1032 #' @rdname distinct
1033 #' @name unique
1034 #' @aliases unique,SparkDataFrame-method
1035 #' @note unique since 1.5.0
setMethod("unique",
          signature(x = "SparkDataFrame"),
          function(x) {
            # base::unique analogue for SparkDataFrame; identical to distinct().
            distinct(x)
          })
1041
1042 #' Sample
1043 #'
1044 #' Return a sampled subset of this SparkDataFrame using a random seed.
1045 #' Note: this is not guaranteed to provide exactly the fraction specified
#' of the total count of the given SparkDataFrame.
1047 #'
1048 #' @param x A SparkDataFrame
1049 #' @param withReplacement Sampling with replacement or not
1050 #' @param fraction The (rough) sample target fraction
1051 #' @param seed Randomness seed value. Default is a random seed.
1052 #'
1053 #' @family SparkDataFrame functions
1054 #' @aliases sample,SparkDataFrame-method
1055 #' @rdname sample
1056 #' @name sample
1057 #' @examples
1058 #'\dontrun{
1059 #' sparkR.session()
1060 #' path <- "path/to/file.json"
1061 #' df <- read.json(path)
1062 #' collect(sample(df, fraction = 0.5))
1063 #' collect(sample(df, FALSE, 0.5))
1064 #' collect(sample(df, TRUE, 0.5, seed = 3))
1065 #'}
1066 #' @note sample since 1.4.0
setMethod("sample",
          signature(x = "SparkDataFrame"),
          function(x, withReplacement = FALSE, fraction, seed) {
            # Validate the two mandatory-typed arguments up front.
            if (!is.numeric(fraction)) {
              stop("fraction must be numeric; however, got ", class(fraction))
            }
            if (!is.logical(withReplacement)) {
              stop("withReplacement must be logical; however, got ", class(withReplacement))
            }

            if (missing(seed)) {
              # No seed supplied: let the JVM pick a random one.
              sdf <- handledCallJMethod(x@sdf, "sample",
                                        as.logical(withReplacement), as.numeric(fraction))
            } else {
              if (is.null(seed)) {
                stop("seed must not be NULL or NA; however, got NULL")
              }
              if (is.na(seed)) {
                stop("seed must not be NULL or NA; however, got NA")
              }

              # TODO : Figure out how to send integer as java.lang.Long to JVM so
              # we can send seed as an argument through callJMethod
              sdf <- handledCallJMethod(x@sdf, "sample", as.logical(withReplacement),
                                        as.numeric(fraction), as.integer(seed))
            }
            dataFrame(sdf)
          })
1095
1096 #' @rdname sample
1097 #' @aliases sample_frac,SparkDataFrame-method
1098 #' @name sample_frac
1099 #' @note sample_frac since 1.4.0
setMethod("sample_frac",
          signature(x = "SparkDataFrame"),
          function(x, withReplacement = FALSE, fraction, seed) {
            # Thin alias for sample(). `seed` is forwarded lazily: when the
            # caller omits it, missing(seed) stays TRUE inside sample(), which
            # then lets the JVM choose a random seed. Do not eagerly evaluate
            # `seed` here or that propagation breaks.
            sample(x, withReplacement, fraction, seed)
          })
1105
1106 #' Returns the number of rows in a SparkDataFrame
1107 #'
1108 #' @param x a SparkDataFrame.
1109 #' @family SparkDataFrame functions
1110 #' @rdname nrow
1111 #' @name nrow
1112 #' @aliases count,SparkDataFrame-method
1113 #' @examples
1114 #'\dontrun{
1115 #' sparkR.session()
1116 #' path <- "path/to/file.json"
1117 #' df <- read.json(path)
1118 #' count(df)
1119 #' }
1120 #' @note count since 1.4.0
setMethod("count",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Invokes Dataset.count() on the JVM; returns the row count.
            rowCount <- callJMethod(x@sdf, "count")
            rowCount
          })
1126
1127 #' @name nrow
1128 #' @rdname nrow
1129 #' @aliases nrow,SparkDataFrame-method
1130 #' @note nrow since 1.5.0
setMethod("nrow",
          signature(x = "SparkDataFrame"),
          function(x) {
            # base::nrow analogue; identical to count().
            count(x)
          })
1136
1137 #' Returns the number of columns in a SparkDataFrame
1138 #'
1139 #' @param x a SparkDataFrame
1140 #'
1141 #' @family SparkDataFrame functions
1142 #' @rdname ncol
1143 #' @name ncol
1144 #' @aliases ncol,SparkDataFrame-method
1145 #' @examples
1146 #'\dontrun{
1147 #' sparkR.session()
1148 #' path <- "path/to/file.json"
1149 #' df <- read.json(path)
1150 #' ncol(df)
1151 #' }
1152 #' @note ncol since 1.5.0
setMethod("ncol",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The number of columns equals the length of the column-name vector.
            colNames <- columns(x)
            length(colNames)
          })
1158
1159 #' Returns the dimensions of SparkDataFrame
1160 #'
1161 #' Returns the dimensions (number of rows and columns) of a SparkDataFrame
1162 #' @param x a SparkDataFrame
1163 #'
1164 #' @family SparkDataFrame functions
1165 #' @rdname dim
1166 #' @aliases dim,SparkDataFrame-method
1167 #' @name dim
1168 #' @examples
1169 #'\dontrun{
1170 #' sparkR.session()
1171 #' path <- "path/to/file.json"
1172 #' df <- read.json(path)
1173 #' dim(df)
1174 #' }
1175 #' @note dim since 1.5.0
setMethod("dim",
          signature(x = "SparkDataFrame"),
          function(x) {
            # base::dim convention: rows first, then columns. Note that the
            # row count triggers a Spark job.
            nRows <- count(x)
            nCols <- ncol(x)
            c(nRows, nCols)
          })
1181
1182 #' Collects all the elements of a SparkDataFrame and coerces them into an R data.frame.
1183 #'
1184 #' @param x a SparkDataFrame.
1185 #' @param stringsAsFactors (Optional) a logical indicating whether or not string columns
1186 #' should be converted to factors. FALSE by default.
1187 #' @param ... further arguments to be passed to or from other methods.
1188 #'
1189 #' @family SparkDataFrame functions
1190 #' @rdname collect
1191 #' @aliases collect,SparkDataFrame-method
1192 #' @name collect
1193 #' @examples
1194 #'\dontrun{
1195 #' sparkR.session()
1196 #' path <- "path/to/file.json"
1197 #' df <- read.json(path)
1198 #' collected <- collect(df)
1199 #' class(collected)
1200 #' firstName <- names(collected)[1]
1201 #' }
1202 #' @note collect since 1.4.0
setMethod("collect",
          signature(x = "SparkDataFrame"),
          function(x, stringsAsFactors = FALSE) {
            # Socket timeout (used by the Arrow path below), overridable via env var.
            connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
            useArrow <- FALSE
            arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
            if (arrowEnabled) {
              # Arrow is only used when the whole schema passes the Arrow
              # compatibility check; otherwise warn and fall back to the
              # non-optimized path.
              useArrow <- tryCatch({
                checkSchemaInArrow(schema(x))
                TRUE
              }, error = function(e) {
                warning("The conversion from Spark DataFrame to R DataFrame was attempted ",
                        "with Arrow optimization because ",
                        "'spark.sql.execution.arrow.sparkr.enabled' is set to true; ",
                        "however, failed, attempting non-optimization. Reason: ", e)
                FALSE
              })
            }

            dtypes <- dtypes(x)
            ncol <- length(dtypes)
            if (ncol <= 0) {
              # empty data.frame with 0 columns and 0 rows
              data.frame()
            } else if (useArrow) {
              if (requireNamespace("arrow", quietly = TRUE)) {
                # collectAsArrowToR serves Arrow data over a local socket and
                # returns the port plus an authentication secret.
                portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
                port <- portAuth[[1]]
                authSecret <- portAuth[[2]]
                conn <- socketConnection(
                  port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
                output <- tryCatch({
                  doServerAuth(conn, authSecret)
                  arrowTable <- arrow::read_arrow(readRaw(conn))
                  # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
                  if (exists("as_tibble", envir = asNamespace("arrow"))) {
                    as.data.frame(arrow::as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
                  } else {
                    as.data.frame(arrowTable, stringsAsFactors = stringsAsFactors)
                  }
                }, finally = {
                  # Always release the socket, even when deserialization fails.
                  close(conn)
                })
                return(output)
              } else {
                stop("'arrow' package should be installed.")
              }
            } else {
              # listCols is a list of columns
              listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
              stopifnot(length(listCols) == ncol)

              # An empty data.frame with 0 columns and number of rows as collected
              nrow <- length(listCols[[1]])
              if (nrow <= 0) {
                df <- data.frame()
              } else {
                df <- data.frame(row.names = 1 : nrow)
              }

              # Append columns one by one
              for (colIndex in 1 : ncol) {
                # Note: appending a column of list type into a data.frame so that
                # data of complex type can be held. But getting a cell from a column
                # of list type returns a list instead of a vector. So for columns of
                # non-complex type, append them as vector.
                #
                # For columns of complex type, be careful to access them.
                # Get a column of complex type returns a list.
                # Get a cell from a column of complex type returns a list instead of a vector.
                col <- listCols[[colIndex]]
                if (length(col) <= 0) {
                  df[[colIndex]] <- col
                } else {
                  colType <- dtypes[[colIndex]][[2]]
                  if (is.null(PRIMITIVE_TYPES[[colType]])) {
                    # Unknown primitive; presumably date/timestamp-like types
                    # remapped by specialtypeshandle — confirm against schema.R.
                    specialtype <- specialtypeshandle(colType)
                    if (!is.null(specialtype)) {
                      colType <- specialtype
                    }
                  }

                  # Note that "binary" columns behave like complex types.
                  if (!is.null(PRIMITIVE_TYPES[[colType]]) && colType != "binary") {
                    # Flatten the list of cells into an atomic vector and tag
                    # it with the matching R class.
                    vec <- do.call(c, col)
                    stopifnot(class(vec) != "list")
                    class(vec) <- PRIMITIVE_TYPES[[colType]]
                    if (is.character(vec) && stringsAsFactors) {
                      vec <- as.factor(vec)
                    }
                    df[[colIndex]] <- vec
                  } else {
                    df[[colIndex]] <- col
                  }
                }
              }
              names(df) <- names(x)
              df
            }
          })
1303
1304 #' Limit
1305 #'
1306 #' Limit the resulting SparkDataFrame to the number of rows specified.
1307 #'
1308 #' @param x A SparkDataFrame
1309 #' @param num The number of rows to return
1310 #' @return A new SparkDataFrame containing the number of rows specified.
1311 #'
1312 #' @family SparkDataFrame functions
1313 #' @rdname limit
1314 #' @name limit
1315 #' @aliases limit,SparkDataFrame,numeric-method
1316 #' @examples
1317 #' \dontrun{
1318 #' sparkR.session()
1319 #' path <- "path/to/file.json"
1320 #' df <- read.json(path)
1321 #' limitedDF <- limit(df, 10)
1322 #' }
1323 #' @note limit since 1.4.0
setMethod("limit",
          signature(x = "SparkDataFrame"),
          function(x, num) {
            # Dataset.limit expects a Java int, hence the as.integer coercion.
            limited <- callJMethod(x@sdf, "limit", as.integer(num))
            dataFrame(limited)
          })
1330
1331 #' Take the first NUM rows of a SparkDataFrame and return the results as a R data.frame
1332 #'
1333 #' @param x a SparkDataFrame.
1334 #' @param num number of rows to take.
1335 #' @family SparkDataFrame functions
1336 #' @rdname take
1337 #' @name take
1338 #' @aliases take,SparkDataFrame,numeric-method
1339 #' @examples
1340 #'\dontrun{
1341 #' sparkR.session()
1342 #' path <- "path/to/file.json"
1343 #' df <- read.json(path)
1344 #' take(df, 2)
1345 #' }
1346 #' @note take since 1.4.0
setMethod("take",
          signature(x = "SparkDataFrame", num = "numeric"),
          function(x, num) {
            # Cap the frame at num rows, then materialize locally.
            collect(limit(x, num))
          })
1353
1354 #' Head
1355 #'
1356 #' Return the first \code{num} rows of a SparkDataFrame as a R data.frame. If \code{num} is not
1357 #' specified, then head() returns the first 6 rows as with R data.frame.
1358 #'
1359 #' @param x a SparkDataFrame.
1360 #' @param num the number of rows to return. Default is 6.
1361 #' @return A data.frame.
1362 #'
1363 #' @family SparkDataFrame functions
1364 #' @aliases head,SparkDataFrame-method
1365 #' @rdname head
1366 #' @name head
1367 #' @examples
1368 #'\dontrun{
1369 #' sparkR.session()
1370 #' path <- "path/to/file.json"
1371 #' df <- read.json(path)
1372 #' head(df)
1373 #' }
1374 #' @note head since 1.4.0
setMethod("head",
          signature(x = "SparkDataFrame"),
          function(x, num = 6L) {
            # Default num is 6L in keeping with R's data.frame convention
            take(x, num)
          })
1381
1382 #' Return the first row of a SparkDataFrame
1383 #'
1384 #' @param x a SparkDataFrame or a column used in aggregation function.
1385 #' @param ... further arguments to be passed to or from other methods.
1386 #'
1387 #' @family SparkDataFrame functions
1388 #' @aliases first,SparkDataFrame-method
1389 #' @rdname first
1390 #' @name first
1391 #' @examples
1392 #'\dontrun{
1393 #' sparkR.session()
1394 #' path <- "path/to/file.json"
1395 #' df <- read.json(path)
1396 #' first(df)
1397 #' }
1398 #' @note first(SparkDataFrame) since 1.4.0
setMethod("first",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The first row is simply a take() of size one.
            take(x, 1)
          })
1404
1405 #' toRDD
1406 #'
1407 #' Converts a SparkDataFrame to an RDD while preserving column names.
1408 #'
1409 #' @param x A SparkDataFrame
1410 #'
1411 #' @noRd
1412 #' @examples
1413 #'\dontrun{
1414 #' sparkR.session()
1415 #' path <- "path/to/file.json"
1416 #' df <- read.json(path)
1417 #' rdd <- toRDD(df)
1418 #'}
setMethod("toRDD",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Ship the rows over as a row-serialized RDD, then re-attach the
            # frame's column names to each row on the R side.
            colNames <- callJMethod(x@sdf, "columns")
            rowJrdd <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToRowRDD", x@sdf)
            rowRdd <- RDD(rowJrdd, serializedMode = "row")
            lapply(rowRdd, function(row) {
              names(row) <- colNames
              row
            })
          })
1430
1431 #' GroupBy
1432 #'
1433 #' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
1434 #'
1435 #' @param x a SparkDataFrame.
1436 #' @param ... character name(s) or Column(s) to group on.
1437 #' @return A GroupedData.
1438 #' @family SparkDataFrame functions
1439 #' @aliases groupBy,SparkDataFrame-method
1440 #' @rdname groupBy
1441 #' @name groupBy
1442 #' @examples
1443 #' \dontrun{
1444 #' # Compute the average for all numeric columns grouped by department.
1445 #' avg(groupBy(df, "department"))
1446 #'
1447 #' # Compute the max age and average salary, grouped by department and gender.
1448 #' agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
1449 #' }
1450 #' @note groupBy since 1.4.0
1451 #' @seealso \link{agg}, \link{cube}, \link{rollup}
setMethod("groupBy",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Use is.character() rather than `class(...) == "character"`:
            # class() may return a vector of length > 1 for classed character
            # objects, which would make this `if` condition malformed, and
            # is.character() also accepts character subclasses.
            if (length(cols) >= 1 && is.character(cols[[1]])) {
              # Column names given as strings: first name plus the rest.
              sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1])
            } else {
              # Column objects given: unwrap each to its JVM Column handle.
              jcol <- lapply(cols, function(c) { c@jc })
              sgd <- callJMethod(x@sdf, "groupBy", jcol)
            }
            groupedData(sgd)
          })
1464
1465 #' @rdname groupBy
1466 #' @name group_by
1467 #' @aliases group_by,SparkDataFrame-method
1468 #' @note group_by since 1.4.0
setMethod("group_by",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # dplyr-style alias; forwards everything to groupBy().
            groupBy(x, ...)
          })
1474
1475 #' Summarize data across columns
1476 #'
1477 #' Compute aggregates by specifying a list of columns
1478 #'
1479 #' @family SparkDataFrame functions
1480 #' @aliases agg,SparkDataFrame-method
1481 #' @rdname summarize
1482 #' @name agg
1483 #' @note agg since 1.4.0
setMethod("agg",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # Aggregating an ungrouped frame == aggregating a single global group.
            agg(groupBy(x), ...)
          })
1489
1490 #' @rdname summarize
1491 #' @name summarize
1492 #' @aliases summarize,SparkDataFrame-method
1493 #' @note summarize since 1.4.0
setMethod("summarize",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # dplyr-style alias; forwards everything to agg().
            agg(x, ...)
          })
1499
# Shared implementation behind dapply() and dapplyCollect().
#
# Args:
#   x: a SparkDataFrame.
#   func: an R function applied to each partition (receives a data.frame).
#   schema: a structType, a DDL-formatted string, or NULL (dapplyCollect path).
# Returns a SparkDataFrame wrapping the JVM-side result.
dapplyInternal <- function(x, func, schema) {
  # A DDL-formatted string is coerced to a structType first.
  if (is.character(schema)) {
    schema <- structType(schema)
  }

  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]] == "true"
  if (arrowEnabled) {
    # With Arrow on, the schema must be Arrow-compatible; a NULL schema
    # (dapplyCollect) is unsupported under Arrow optimization.
    if (inherits(schema, "structType")) {
      checkSchemaInArrow(schema)
    } else if (is.null(schema)) {
      stop("Arrow optimization does not support 'dapplyCollect' yet. Please disable ",
           "Arrow optimization or use 'collect' and 'dapply' APIs instead.")
    } else {
      stop("'schema' should be DDL-formatted string or structType.")
    }
  }

  # Serialize the set of attached R packages so workers can load them.
  packageNamesArr <- serialize(.sparkREnv[[".packages"]],
                               connection = NULL)

  # Collect all registered broadcast variables for shipping to workers.
  broadcastArr <- lapply(ls(.broadcastNames),
                         function(name) { get(name, .broadcastNames) })

  # cleanClosure() captures func's free variables before serialization;
  # a NULL schema is passed through as-is (dapplyCollect).
  sdf <- callJStatic(
           "org.apache.spark.sql.api.r.SQLUtils",
           "dapply",
           x@sdf,
           serialize(cleanClosure(func), connection = NULL),
           packageNamesArr,
           broadcastArr,
           if (is.null(schema)) { schema } else { schema$jobj })
  dataFrame(sdf)
}
1533
# Lets dapply()'s schema argument be either a DDL-formatted string or a structType.
setClassUnion("characterOrstructType", c("character", "structType"))
1535
1536 #' dapply
1537 #'
1538 #' Apply a function to each partition of a SparkDataFrame.
1539 #'
1540 #' @param x A SparkDataFrame
1541 #' @param func A function to be applied to each partition of the SparkDataFrame.
1542 #' func should have only one parameter, to which a R data.frame corresponds
1543 #' to each partition will be passed.
1544 #' The output of func should be a R data.frame.
1545 #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
1546 #' It must match the output of func. Since Spark 2.3, the DDL-formatted string
1547 #' is also supported for the schema.
1548 #' @family SparkDataFrame functions
1549 #' @rdname dapply
1550 #' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
1551 #' @name dapply
1552 #' @seealso \link{dapplyCollect}
1553 #' @examples
1554 #' \dontrun{
1555 #' df <- createDataFrame(iris)
1556 #' df1 <- dapply(df, function(x) { x }, schema(df))
1557 #' collect(df1)
1558 #'
1559 #' # filter and add a column
1560 #' df <- createDataFrame(
1561 #' list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
1562 #' c("a", "b", "c"))
1563 #' schema <- structType(structField("a", "integer"), structField("b", "double"),
1564 #' structField("c", "string"), structField("d", "integer"))
1565 #' df1 <- dapply(
1566 #' df,
1567 #' function(x) {
1568 #' y <- x[x[1] > 1, ]
1569 #' y <- cbind(y, y[1] + 1L)
1570 #' },
1571 #' schema)
1572 #'
1573 #' # The schema also can be specified in a DDL-formatted string.
1574 #' schema <- "a INT, d DOUBLE, c STRING, d INT"
1575 #' df1 <- dapply(
1576 #' df,
1577 #' function(x) {
1578 #' y <- x[x[1] > 1, ]
1579 #' y <- cbind(y, y[1] + 1L)
1580 #' },
1581 #' schema)
1582 #'
1583 #' collect(df1)
1584 #' # the result
1585 #' # a b c d
1586 #' # 1 2 2 2 3
1587 #' # 2 3 3 3 4
1588 #' }
1589 #' @note dapply since 2.0.0
setMethod("dapply",
          signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
          function(x, func, schema) {
            # All the work (schema coercion, closure capture, JVM call) lives
            # in dapplyInternal; this method only pins the signature.
            dapplyInternal(x, func, schema)
          })
1595
1596 #' dapplyCollect
1597 #'
1598 #' Apply a function to each partition of a SparkDataFrame and collect the result back
1599 #' to R as a data.frame.
1600 #'
1601 #' @param x A SparkDataFrame
1602 #' @param func A function to be applied to each partition of the SparkDataFrame.
1603 #' func should have only one parameter, to which a R data.frame corresponds
1604 #' to each partition will be passed.
1605 #' The output of func should be a R data.frame.
1606 #' @family SparkDataFrame functions
1607 #' @rdname dapplyCollect
1608 #' @aliases dapplyCollect,SparkDataFrame,function-method
1609 #' @name dapplyCollect
1610 #' @seealso \link{dapply}
1611 #' @examples
1612 #' \dontrun{
1613 #' df <- createDataFrame(iris)
1614 #' ldf <- dapplyCollect(df, function(x) { x })
1615 #'
1616 #' # filter and add a column
1617 #' df <- createDataFrame(
1618 #' list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
1619 #' c("a", "b", "c"))
1620 #' ldf <- dapplyCollect(
1621 #' df,
1622 #' function(x) {
1623 #' y <- x[x[1] > 1, ]
1624 #' y <- cbind(y, y[1] + 1L)
1625 #' })
1626 #' # the result
1627 #' # a b c d
1628 #' # 2 2 2 3
1629 #' # 3 3 3 4
1630 #' }
1631 #' @note dapplyCollect since 2.0.0
setMethod("dapplyCollect",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # NULL schema selects the dapplyCollect path on the JVM side.
            applied <- dapplyInternal(x, func, NULL)

            # Each collected item is a struct with a single field containing a
            # serialized data.frame for one partition of the SparkDataFrame.
            rows <- callJMethod(applied@sdf, "collect")
            partitions <- lapply(rows, function(item) { unserialize(item[[1]]) })
            combined <- do.call(rbind, partitions)
            row.names(combined) <- NULL
            combined
          })
1646
1647 #' gapply
1648 #'
1649 #' Groups the SparkDataFrame using the specified columns and applies the R function to each
1650 #' group.
1651 #'
1652 #' @param cols grouping columns.
1653 #' @param func a function to be applied to each group partition specified by grouping
1654 #' column of the SparkDataFrame. See Details.
1655 #' @param schema the schema of the resulting SparkDataFrame after the function is applied.
1656 #' The schema must match to output of \code{func}. It has to be defined for each
1657 #' output column with preferred output column name and corresponding data type.
1658 #' Since Spark 2.3, the DDL-formatted string is also supported for the schema.
1659 #' @return A SparkDataFrame.
1660 #' @family SparkDataFrame functions
1661 #' @aliases gapply,SparkDataFrame-method
1662 #' @rdname gapply
1663 #' @name gapply
1664 #' @details
1665 #' \code{func} is a function of two arguments. The first, usually named \code{key}
1666 #' (though this is not enforced) corresponds to the grouping key, will be an
1667 #' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
1668 #' to the grouping columns' values for the current group.
1669 #'
1670 #' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
1671 #' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
1672 #'
1673 #' The output of \code{func} must be a \code{data.frame} matching \code{schema} --
1674 #' in particular this means the names of the output \code{data.frame} are irrelevant
1675 #'
1676 #' @seealso \link{gapplyCollect}
1677 #' @examples
1678 #'
1679 #' \dontrun{
1680 #' # Computes the arithmetic mean of the second column by grouping
1681 #' # on the first and third columns. Output the grouping values and the average.
1682 #'
1683 #' df <- createDataFrame (
1684 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
1685 #' c("a", "b", "c", "d"))
1686 #'
1687 #' # Here our output contains three columns, the key which is a combination of two
1688 #' # columns with data types integer and string and the mean which is a double.
1689 #' schema <- structType(structField("a", "integer"), structField("c", "string"),
1690 #' structField("avg", "double"))
1691 #' result <- gapply(
1692 #' df,
1693 #' c("a", "c"),
1694 #' function(key, x) {
1695 #' # key will either be list(1L, '1') (for the group where a=1L,c='1') or
1696 #' # list(3L, '3') (for the group where a=3L,c='3')
1697 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1698 #' }, schema)
1699 #'
1700 #' # The schema also can be specified in a DDL-formatted string.
1701 #' schema <- "a INT, c STRING, avg DOUBLE"
1702 #' result <- gapply(
1703 #' df,
1704 #' c("a", "c"),
1705 #' function(key, x) {
1706 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1707 #' }, schema)
1708 #'
1709 #' # We can also group the data and afterwards call gapply on GroupedData.
1710 #' # For example:
1711 #' gdf <- group_by(df, "a", "c")
1712 #' result <- gapply(
1713 #' gdf,
1714 #' function(key, x) {
1715 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1716 #' }, schema)
1717 #' collect(result)
1718 #'
1719 #' # Result
1720 #' # ------
1721 #' # a c avg
1722 #' # 3 3 3.0
1723 #' # 1 1 1.5
1724 #'
1725 #' # Fits linear models on iris dataset by grouping on the 'Species' column and
1726 #' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
1727 #' # and 'Petal_Width' as training features.
1728 #'
1729 #' df <- createDataFrame (iris)
1730 #' schema <- structType(structField("(Intercept)", "double"),
1731 #' structField("Sepal_Width", "double"),structField("Petal_Length", "double"),
1732 #' structField("Petal_Width", "double"))
1733 #' df1 <- gapply(
1734 #' df,
1735 #' df$"Species",
1736 #' function(key, x) {
1737 #' m <- suppressWarnings(lm(Sepal_Length ~
1738 #' Sepal_Width + Petal_Length + Petal_Width, x))
1739 #' data.frame(t(coef(m)))
1740 #' }, schema)
1741 #' collect(df1)
1742 #'
1743 #' # Result
1744 #' # ---------
1745 #' # Model (Intercept) Sepal_Width Petal_Length Petal_Width
1746 #' # 1 0.699883 0.3303370 0.9455356 -0.1697527
1747 #' # 2 1.895540 0.3868576 0.9083370 -0.6792238
1748 #' # 3 2.351890 0.6548350 0.2375602 0.2521257
1749 #'
1750 #'}
1751 #' @note gapply(SparkDataFrame) since 2.0.0
setMethod("gapply",
          signature(x = "SparkDataFrame"),
          function(x, cols, func, schema) {
            # Group on the requested columns, then reuse the GroupedData
            # implementation of gapply.
            grouped <- do.call("groupBy", c(x, cols))
            gapply(grouped, func, schema)
          })
1758
1759 #' gapplyCollect
1760 #'
1761 #' Groups the SparkDataFrame using the specified columns, applies the R function to each
1762 #' group and collects the result back to R as data.frame.
1763 #'
1764 #' @param cols grouping columns.
1765 #' @param func a function to be applied to each group partition specified by grouping
1766 #' column of the SparkDataFrame. See Details.
1767 #' @return A data.frame.
1768 #' @family SparkDataFrame functions
1769 #' @aliases gapplyCollect,SparkDataFrame-method
1770 #' @rdname gapplyCollect
1771 #' @name gapplyCollect
1772 #' @details
1773 #' \code{func} is a function of two arguments. The first, usually named \code{key}
1774 #' (though this is not enforced) corresponds to the grouping key, will be an
1775 #' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
1776 #' to the grouping columns' values for the current group.
1777 #'
1778 #' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
1779 #' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
1780 #'
1781 #' The output of \code{func} must be a \code{data.frame} matching \code{schema} --
1782 #' in particular this means the names of the output \code{data.frame} are irrelevant
1783 #'
1784 #' @seealso \link{gapply}
1785 #' @examples
1786 #'
1787 #' \dontrun{
1788 #' # Computes the arithmetic mean of the second column by grouping
1789 #' # on the first and third columns. Output the grouping values and the average.
1790 #'
1791 #' df <- createDataFrame (
1792 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
1793 #' c("a", "b", "c", "d"))
1794 #'
1795 #' result <- gapplyCollect(
1796 #' df,
1797 #' c("a", "c"),
1798 #' function(key, x) {
1799 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1800 #' colnames(y) <- c("key_a", "key_c", "mean_b")
1801 #' y
1802 #' })
1803 #'
1804 #' # We can also group the data and afterwards call gapply on GroupedData.
1805 #' # For example:
1806 #' gdf <- group_by(df, "a", "c")
1807 #' result <- gapplyCollect(
1808 #' gdf,
1809 #' function(key, x) {
1810 #' y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1811 #' colnames(y) <- c("key_a", "key_c", "mean_b")
1812 #' y
1813 #' })
1814 #'
1815 #' # Result
1816 #' # ------
1817 #' # key_a key_c mean_b
1818 #' # 3 3 3.0
1819 #' # 1 1 1.5
1820 #'
1821 #' # Fits linear models on iris dataset by grouping on the 'Species' column and
1822 #' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
1823 #' # and 'Petal_Width' as training features.
1824 #'
1825 #' df <- createDataFrame (iris)
1826 #' result <- gapplyCollect(
1827 #' df,
1828 #' df$"Species",
1829 #' function(key, x) {
1830 #' m <- suppressWarnings(lm(Sepal_Length ~
1831 #' Sepal_Width + Petal_Length + Petal_Width, x))
1832 #' data.frame(t(coef(m)))
1833 #' })
1834 #'
1835 #' # Result
1836 #' # ---------
1837 #' # Model X.Intercept. Sepal_Width Petal_Length Petal_Width
1838 #' # 1 0.699883 0.3303370 0.9455356 -0.1697527
1839 #' # 2 1.895540 0.3868576 0.9083370 -0.6792238
1840 #' # 3 2.351890 0.6548350 0.2375602 0.2521257
1841 #'
1842 #'}
1843 #' @note gapplyCollect(SparkDataFrame) since 2.0.0
setMethod("gapplyCollect",
          signature(x = "SparkDataFrame"),
          function(x, cols, func) {
            # Group on the requested columns, then reuse the GroupedData
            # implementation of gapplyCollect.
            grouped <- do.call("groupBy", c(x, cols))
            gapplyCollect(grouped, func)
          })
1850
1851 ############################## RDD Map Functions ##################################
1852 # All of the following functions mirror the existing RDD map functions, #
1853 # but allow for use with DataFrames by first converting to an RRDD before calling #
1854 # the requested map function. #
1855 ###################################################################################
1856
1857 #' @rdname lapply
1858 #' @noRd
setMethod("lapply",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert to an RDD of named rows, then defer to the RDD lapply.
            lapply(toRDD(X), FUN)
          })
1865
1866 #' @rdname lapply
1867 #' @noRd
setMethod("map",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # map is a straight alias for the lapply method above.
            lapply(X, FUN)
          })
1873
1874 #' @rdname flatMap
1875 #' @noRd
setMethod("flatMap",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert to an RDD of named rows, then defer to the RDD flatMap.
            flatMap(toRDD(X), FUN)
          })
1882
1883 #' @rdname lapplyPartition
1884 #' @noRd
setMethod("lapplyPartition",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert to an RDD, then defer to the RDD lapplyPartition.
            lapplyPartition(toRDD(X), FUN)
          })
1891
1892 #' @rdname lapplyPartition
1893 #' @noRd
setMethod("mapPartitions",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # mapPartitions is a straight alias for lapplyPartition.
            lapplyPartition(X, FUN)
          })
1899
1900 #' @rdname foreach
1901 #' @noRd
setMethod("foreach",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # Convert to an RDD of named rows, then defer to the RDD foreach.
            foreach(toRDD(x), func)
          })
1908
1909 #' @rdname foreach
1910 #' @noRd
setMethod("foreachPartition",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # Convert to an RDD, then defer to the RDD foreachPartition.
            foreachPartition(toRDD(x), func)
          })
1917
1918
1919 ############################## SELECT ##################################
1920
# Retrieve a single Column named `c` from SparkDataFrame `x` by asking the
# backing Scala DataFrame for the column's Java reference.
getColumn <- function(x, c) {
  jc <- callJMethod(x@sdf, "col", c)
  column(jc)
}
1924
# Add, replace, or drop column `c` of SparkDataFrame `x`.
#
# `value` may be a Column expression, an atomic literal of length 1 (which is
# wrapped with lit()), or NULL to drop the column. Returns a new
# SparkDataFrame; `x` itself is unchanged.
setColumn <- function(x, c, value) {
  # inherits() is the robust class test; comparing class(value) to a string
  # misbehaves when an object has more than one class.
  if (!inherits(value, "Column") && !is.null(value)) {
    if (isAtomicLengthOne(value)) {
      value <- lit(value)
    } else {
      stop("value must be a Column, literal value as atomic in length of 1, or NULL")
    }
  }

  if (is.null(value)) {
    nx <- drop(x, c)
  } else {
    nx <- withColumn(x, c, value)
  }
  nx
}
1941
#' @param name name of a Column (without being wrapped by \code{""}).
#' @rdname select
#' @name $
#' @aliases $,SparkDataFrame-method
#' @note $ since 1.4.0
setMethod("$", signature(x = "SparkDataFrame"),
          function(x, name) {
            # `df$col` extracts a single Column by its (unquoted) name.
            getColumn(x, name)
          })
1951
#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
#'              If \code{NULL}, the specified Column is dropped.
#' @rdname select
#' @name $<-
#' @aliases $<-,SparkDataFrame-method
#' @note $<- since 1.4.0
setMethod("$<-", signature(x = "SparkDataFrame"),
          function(x, name, value) {
            # Build the updated frame, then swap its Java reference into `x`
            # so that R's replacement-function semantics are preserved.
            updated <- setColumn(x, name, value)
            x@sdf <- updated@sdf
            x
          })
1964
# Union type for indexing methods that accept either a column position
# (numeric) or a column name (character), e.g. [[ and [[<-.
setClassUnion("numericOrcharacter", c("numeric", "character"))
1966
#' @rdname subset
#' @name [[
#' @aliases [[,SparkDataFrame,numericOrcharacter-method
#' @note [[ since 1.4.0
setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
          function(x, i) {
            # Only a single index is supported; extras are discarded.
            if (length(i) > 1) {
              warning("Subset index has length > 1. Only the first index is used.")
              i <- i[1]
            }
            # A numeric index is translated to the corresponding column name.
            if (is.numeric(i)) {
              i <- columns(x)[[i]]
            }
            getColumn(x, i)
          })
1983
#' @rdname subset
#' @name [[<-
#' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
#' @note [[<- since 2.1.1
setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
          function(x, i, value) {
            # Only a single index is supported; extras are discarded.
            if (length(i) > 1) {
              warning("Subset index has length > 1. Only the first index is used.")
              i <- i[1]
            }
            # A numeric index is translated to the corresponding column name.
            if (is.numeric(i)) {
              i <- columns(x)[[i]]
            }
            # Replace (or drop, when value is NULL) the column, then swap the
            # resulting Java reference into `x`.
            replaced <- setColumn(x, i, value)
            x@sdf <- replaced@sdf
            x
          })
2002
#' @rdname subset
#' @name [
#' @aliases [,SparkDataFrame-method
#' @note [ since 1.4.0
setMethod("[", signature(x = "SparkDataFrame"),
          function(x, i, j, ..., drop = FALSE) {
            # Perform filtering first if needed. `i` must be a Column
            # predicate (e.g. df$age > 20); anything else is rejected.
            filtered <- if (missing(i)) {
              x
            } else {
              if (!inherits(i, "Column")) {
                stop("Expressions other than filtering predicates are not supported ",
                     "in the first parameter of extract operator [ or subset() method.")
              }
              filter(x, i)
            }

            # If something is to be projected, then do so on the filtered SparkDataFrame
            if (missing(j)) {
              filtered
            } else {
              # Numeric column indices are resolved to names first.
              if (is.numeric(j)) {
                cols <- columns(filtered)
                j <- cols[j]
              }
              if (length(j) > 1) {
                j <- as.list(j)
              }
              selected <- select(filtered, j)

              # Acknowledge parameter drop. Return a Column or SparkDataFrame accordingly.
              # Note: scalar `&&` (not vectorized `&`) and TRUE/FALSE (not T/F)
              # are used here, per R best practice for `if` conditions.
              if (ncol(selected) == 1 && isTRUE(drop)) {
                getColumn(selected, names(selected))
              } else {
                selected
              }
            }
          })
2041
#' Subset
#'
#' Return subsets of SparkDataFrame according to given conditions
#' @param x a SparkDataFrame.
#' @param i,subset (Optional) a logical expression to filter on rows.
#'                 For extract operator [[ and replacement operator [[<-, the indexing parameter for
#'                 a single Column.
#' @param j,select expression for the single Column or a list of columns to select from the
#'                 SparkDataFrame.
#' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
#'             Otherwise, a SparkDataFrame will always be returned.
#' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
#'              If \code{NULL}, the specified Column is dropped.
#' @param ... currently not used.
#' @return A new SparkDataFrame containing only the rows that meet the condition with selected
#'         columns.
#' @family SparkDataFrame functions
#' @aliases subset,SparkDataFrame-method
#' @seealso \link{withColumn}
#' @rdname subset
#' @name subset
#' @family subsetting functions
#' @examples
#' \dontrun{
#' # Columns can be selected using [[ and [
#' df[[2]] == df[["age"]]
#' df[,2] == df[,"age"]
#' df[,c("name", "age")]
#' # Or to filter rows
#' df[df$age > 20,]
#' # SparkDataFrame can be subset on both rows and Columns
#' df[df$name == "Smith", c(1,2)]
#' df[df$age %in% c(19, 30), 1:2]
#' subset(df, df$age %in% c(19, 30), 1:2)
#' subset(df, df$age %in% c(19), select = c(1,2))
#' subset(df, select = c(1,2))
#' # Columns can be selected and set
#' df[["age"]] <- 23
#' df[[1]] <- df$age
#' df[[2]] <- NULL # drop column
#' }
#' @note subset since 1.5.0
setMethod("subset", signature(x = "SparkDataFrame"),
          function(x, subset, select, drop = FALSE, ...) {
            # Delegate to the extract operator `[`: `subset` filters rows,
            # `select` projects columns. Use the spelled-out FALSE default
            # rather than the reassignable shorthand F.
            if (missing(subset)) {
              x[, select, drop = drop, ...]
            } else {
              x[subset, select, drop = drop, ...]
            }
          })
2092
#' Select
#'
#' Selects a set of columns with names or Column expressions.
#' @param x a SparkDataFrame.
#' @param col a list of columns or single Column or name.
#' @param ... additional column(s) if only one column is specified in \code{col}.
#'            If more than one column is assigned in \code{col}, \code{...}
#'            should be left empty.
#' @return A new SparkDataFrame with selected columns.
#' @family SparkDataFrame functions
#' @rdname select
#' @aliases select,SparkDataFrame,character-method
#' @name select
#' @family subsetting functions
#' @examples
#' \dontrun{
#' select(df, "*")
#' select(df, "col1", "col2")
#' select(df, df$name, df$age + 1)
#' select(df, c("col1", "col2"))
#' select(df, list(df$name, df$age + 1))
#' # Similar to R data frames columns can also be selected using $
#' df[,df$age]
#' }
#' @note select(SparkDataFrame, character) since 1.4.0
setMethod("select", signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ...) {
            # Single name: delegate directly to the JVM, forwarding any
            # additional columns supplied via `...`.
            if (length(col) == 1) {
              sdf <- callJMethod(x@sdf, "select", col, list(...))
              return(dataFrame(sdf))
            }
            # A character vector of names must not be mixed with `...`.
            if (length(list(...)) > 0) {
              stop("To select multiple columns, use a character vector or list for col")
            }
            select(x, as.list(col))
          })
2131
#' @rdname select
#' @aliases select,SparkDataFrame,Column-method
#' @note select(SparkDataFrame, Column) since 1.4.0
setMethod("select", signature(x = "SparkDataFrame", col = "Column"),
          function(x, col, ...) {
            # Unwrap each Column to its Java reference before the JVM call.
            jcols <- lapply(list(col, ...), function(column) column@jc)
            dataFrame(callJMethod(x@sdf, "select", jcols))
          })
2143
#' @rdname select
#' @aliases select,SparkDataFrame,list-method
#' @note select(SparkDataFrame, list) since 1.4.0
setMethod("select",
          signature(x = "SparkDataFrame", col = "list"),
          function(x, col) {
            # Each element may be a Column expression or a column name;
            # names are resolved to Columns via col(). inherits() is the
            # robust class test (class() comparison breaks on multi-class
            # objects).
            cols <- lapply(col, function(c) {
              if (inherits(c, "Column")) {
                c@jc
              } else {
                col(c)@jc
              }
            })
            sdf <- callJMethod(x@sdf, "select", cols)
            dataFrame(sdf)
          })
2160
#' SelectExpr
#'
#' Select from a SparkDataFrame using a set of SQL expressions.
#'
#' @param x A SparkDataFrame to be selected from.
#' @param expr A string containing a SQL expression
#' @param ... Additional expressions
#' @return A SparkDataFrame
#' @family SparkDataFrame functions
#' @aliases selectExpr,SparkDataFrame,character-method
#' @rdname selectExpr
#' @name selectExpr
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' selectExpr(df, "col1", "(col2 * 5) as newCol")
#' }
#' @note selectExpr since 1.4.0
setMethod("selectExpr",
          signature(x = "SparkDataFrame", expr = "character"),
          function(x, expr, ...) {
            # Forward every SQL expression string straight to the JVM.
            dataFrame(callJMethod(x@sdf, "selectExpr", list(expr, ...)))
          })
2188
#' WithColumn
#'
#' Return a new SparkDataFrame by adding a column or replacing the existing column
#' that has the same name.
#'
#' Note: This method introduces a projection internally. Therefore, calling it multiple times,
#' for instance, via loops in order to add multiple columns can generate big plans which
#' can cause performance issues and even \code{StackOverflowException}. To avoid this,
#' use \code{select} with the multiple columns at once.
#'
#' @param x a SparkDataFrame.
#' @param colName a column name.
#' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
#'            vector in the length of 1 as literal value.
#' @return A SparkDataFrame with the new column added or the existing column replaced.
#' @family SparkDataFrame functions
#' @aliases withColumn,SparkDataFrame,character-method
#' @rdname withColumn
#' @name withColumn
#' @seealso \link{rename} \link{mutate} \link{subset}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- withColumn(df, "newCol", df$col1 * 5)
#' # Replace an existing column
#' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
#' newDF3 <- withColumn(newDF, "newCol", 42)
#' # Use extract operator to set an existing or new column
#' df[["age"]] <- 23
#' df[[2]] <- df$col1
#' df[[2]] <- NULL # drop column
#' }
#' @note withColumn since 1.4.0
setMethod("withColumn",
          signature(x = "SparkDataFrame", colName = "character"),
          function(x, colName, col) {
            # Non-Column values must be atomic literals of length 1;
            # wrap them with lit(). inherits() is the robust class test.
            if (!inherits(col, "Column")) {
              if (!isAtomicLengthOne(col)) {
                stop("Literal value must be atomic in length of 1")
              }
              col <- lit(col)
            }
            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
            dataFrame(sdf)
          })
2234
#' Mutate
#'
#' Return a new SparkDataFrame with the specified columns added or replaced.
#'
#' @param .data a SparkDataFrame.
#' @param ... additional column argument(s) each in the form name = col.
#' @return A new SparkDataFrame with the new columns added or replaced.
#' @family SparkDataFrame functions
#' @aliases mutate,SparkDataFrame-method
#' @rdname mutate
#' @name mutate
#' @seealso \link{rename} \link{withColumn}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
#' names(newDF) # Will contain newCol, newCol2
#' newDF2 <- transform(df, newCol = df$col1 / 5, newCol2 = df$col1 * 2)
#'
#' df <- createDataFrame(list(list("Andy", 30L), list("Justin", 19L)), c("name", "age"))
#' # Replace the "age" column
#' df1 <- mutate(df, age = df$age + 1L)
#' }
#' @note mutate since 1.4.0
setMethod("mutate",
          signature(.data = "SparkDataFrame"),
          function(.data, ...) {
            x <- .data
            cols <- list(...)
            # No columns supplied: return the input unchanged.
            if (length(cols) <= 0) {
              return(x)
            }

            # Every `...` argument must already be a Column expression.
            lapply(cols, function(col) {
              stopifnot(class(col) == "Column")
            })

            # Check if there is any duplicated column name in the DataFrame
            dfCols <- columns(x)
            if (length(unique(dfCols)) != length(dfCols)) {
              stop("Error: found duplicated column name in the DataFrame")
            }

            # TODO: simplify the implementation of this method after SPARK-12225 is resolved.

            # For named arguments, use the names for arguments as the column names
            # For unnamed arguments, use the argument symbols as the column names
            args <- sapply(substitute(list(...))[-1], deparse)
            ns <- names(cols)
            if (!is.null(ns)) {
              lapply(seq_along(args), function(i) {
                if (ns[[i]] != "") {
                  # `<<-` deliberately writes into `args` in the enclosing frame.
                  args[[i]] <<- ns[[i]]
                }
              })
            }
            ns <- args

            # The last column of the same name in the specific columns takes effect
            deDupCols <- list()
            for (i in seq_len(length(cols))) {
              deDupCols[[ns[[i]]]] <- alias(cols[[i]], ns[[i]])
            }

            # Construct the column list for projection
            colList <- lapply(dfCols, function(col) {
              if (!is.null(deDupCols[[col]])) {
                # Replace existing column
                tmpCol <- deDupCols[[col]]
                # Remove the consumed entry so whatever remains in deDupCols
                # is appended as brand-new columns below.
                deDupCols[[col]] <<- NULL
                tmpCol
              } else {
                col(col)
              }
            })

            do.call(select, c(x, colList, deDupCols))
          })
2315
#' @param _data a SparkDataFrame.
#' @rdname mutate
#' @aliases transform,SparkDataFrame-method
#' @name transform
#' @note transform since 1.5.0
setMethod("transform",
          signature(`_data` = "SparkDataFrame"),
          function(`_data`, ...) {
            # transform() is base R's name for this operation; it simply
            # forwards to mutate(). The argument is backtick-quoted because
            # `_data` is not a syntactic R name.
            mutate(`_data`, ...)
          })
2326
#' rename
#'
#' Rename an existing column in a SparkDataFrame.
#'
#' @param x A SparkDataFrame
#' @param existingCol The name of the column you want to change.
#' @param newCol The new column name.
#' @return A SparkDataFrame with the column name changed.
#' @family SparkDataFrame functions
#' @rdname rename
#' @name withColumnRenamed
#' @aliases withColumnRenamed,SparkDataFrame,character,character-method
#' @seealso \link{mutate}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- withColumnRenamed(df, "col1", "newCol1")
#' }
#' @note withColumnRenamed since 1.4.0
setMethod("withColumnRenamed",
          signature(x = "SparkDataFrame", existingCol = "character", newCol = "character"),
          function(x, existingCol, newCol) {
            # Rebuild the full projection, aliasing only the matching column.
            projection <- lapply(columns(x), function(colName) {
              if (colName == existingCol) {
                alias(col(colName), newCol)
              } else {
                col(colName)
              }
            })
            select(x, projection)
          })
2360
#' @param ... A named pair of the form new_column_name = existing_column
#' @rdname rename
#' @name rename
#' @aliases rename,SparkDataFrame-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- rename(df, col1 = df$newCol1)
#' }
#' @note rename since 1.4.0
setMethod("rename",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            renameCols <- list(...)
            stopifnot(length(renameCols) > 0)
            # Validate every argument up front (the original code only
            # checked the first one); a non-Column later in `...` would
            # otherwise fail with a cryptic slot-access error.
            stopifnot(all(vapply(renameCols,
                                 function(col) inherits(col, "Column"),
                                 logical(1))))
            newNames <- names(renameCols)
            # The existing column names come from the Columns' JVM-side
            # string representation.
            oldNames <- lapply(renameCols, function(col) {
              callJMethod(col@jc, "toString")
            })
            # Rebuild the projection, aliasing only the renamed columns.
            cols <- lapply(columns(x), function(c) {
              if (c %in% oldNames) {
                alias(col(c), newNames[[match(c, oldNames)]])
              } else {
                col(c)
              }
            })
            select(x, cols)
          })
2392
# Union type for method arguments that accept either a column name or a Column.
setClassUnion("characterOrColumn", c("character", "Column"))

# Union type for method arguments that accept either a numeric value or a Column.
setClassUnion("numericOrColumn", c("numeric", "Column"))
2396
#' Arrange Rows by Variables
#'
#' Sort a SparkDataFrame by the specified column(s).
#'
#' @param x a SparkDataFrame to be sorted.
#' @param col a character or Column object indicating the fields to sort on
#' @param ... additional sorting fields
#' @param decreasing a logical argument indicating sorting order for columns when
#'                   a character vector is specified for col
#' @param withinPartitions a logical argument indicating whether to sort only within each partition
#' @return A SparkDataFrame where all elements are sorted.
#' @family SparkDataFrame functions
#' @aliases arrange,SparkDataFrame,Column-method
#' @rdname arrange
#' @name arrange
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' arrange(df, df$col1)
#' arrange(df, asc(df$col1), desc(abs(df$col2)))
#' arrange(df, "col1", decreasing = TRUE)
#' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
#' arrange(df, "col1", "col2", withinPartitions = TRUE)
#' }
#' @note arrange(SparkDataFrame, Column) since 1.4.0
setMethod("arrange",
          signature(x = "SparkDataFrame", col = "Column"),
          function(x, col, ..., withinPartitions = FALSE) {
            # Unwrap every Column to its Java reference.
            jcols <- lapply(list(col, ...), function(c) c@jc)
            # Pick the JVM method instead of duplicating the call in branches.
            sortMethod <- if (withinPartitions) "sortWithinPartitions" else "sort"
            dataFrame(callJMethod(x@sdf, sortMethod, jcols))
          })
2438
#' @rdname arrange
#' @name arrange
#' @aliases arrange,SparkDataFrame,character-method
#' @note arrange(SparkDataFrame, character) since 1.4.0
setMethod("arrange",
          signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {

            # all sorting columns
            by <- list(col, ...)

            # A single `decreasing` flag is recycled across every column;
            # otherwise lengths must match exactly.
            if (length(decreasing) == 1) {
              decreasing <- rep(decreasing, length(by))
            } else if (length(decreasing) != length(by)) {
              stop("Arguments 'col' and 'decreasing' must have the same length")
            }

            # Translate each name + direction pair into an ordered Column,
            # e.g. Column Species ASC / Column Petal_Length DESC.
            jcols <- lapply(seq_along(decreasing), function(i) {
              sortCol <- getColumn(x, by[[i]])
              if (decreasing[[i]]) desc(sortCol) else asc(sortCol)
            })

            do.call("arrange", c(x, jcols, withinPartitions = withinPartitions))
          })
2471
#' @rdname arrange
#' @aliases orderBy,SparkDataFrame,characterOrColumn-method
#' @note orderBy(SparkDataFrame, characterOrColumn) since 1.4.0
setMethod("orderBy",
          signature(x = "SparkDataFrame", col = "characterOrColumn"),
          function(x, col, ...) {
            # orderBy() is an alias for arrange(); dispatch on col's class
            # (character vs Column) happens inside arrange().
            arrange(x, col, ...)
          })
2480
#' Filter
#'
#' Filter the rows of a SparkDataFrame according to a given condition.
#'
#' @param x A SparkDataFrame to be sorted.
#' @param condition The condition to filter on. This may either be a Column expression
#' or a string containing a SQL statement
#' @return A SparkDataFrame containing only the rows that meet the condition.
#' @family SparkDataFrame functions
#' @aliases filter,SparkDataFrame,characterOrColumn-method
#' @rdname filter
#' @name filter
#' @family subsetting functions
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' filter(df, "col1 > 0")
#' filter(df, df$col2 != "abcdefg")
#' }
#' @note filter since 1.4.0
setMethod("filter",
          signature(x = "SparkDataFrame", condition = "characterOrColumn"),
          function(x, condition) {
            # A Column condition is unwrapped to its Java reference; a
            # character condition is forwarded as a SQL expression string.
            # inherits() is the robust class test.
            if (inherits(condition, "Column")) {
              condition <- condition@jc
            }
            sdf <- callJMethod(x@sdf, "filter", condition)
            dataFrame(sdf)
          })
2512
#' @rdname filter
#' @name where
#' @aliases where,SparkDataFrame,characterOrColumn-method
#' @note where since 1.4.0
setMethod("where",
          signature(x = "SparkDataFrame", condition = "characterOrColumn"),
          function(x, condition) {
            # where() is an alias for filter(), mirroring the SQL keyword.
            filter(x, condition)
          })
2522
#' dropDuplicates
#'
#' Returns a new SparkDataFrame with duplicate rows removed, considering only
#' the subset of columns.
#'
#' @param x A SparkDataFrame.
#' @param ... A character vector of column names or string column names.
#'            If the first argument contains a character vector, the followings are ignored.
#' @return A SparkDataFrame with duplicate rows removed.
#' @family SparkDataFrame functions
#' @aliases dropDuplicates,SparkDataFrame-method
#' @rdname dropDuplicates
#' @name dropDuplicates
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' dropDuplicates(df)
#' dropDuplicates(df, "col1", "col2")
#' dropDuplicates(df, c("col1", "col2"))
#' }
#' @note dropDuplicates since 2.0.0
setMethod("dropDuplicates",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            if (length(cols) == 0) {
              # No subset given: de-duplicate on all columns.
              sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(columns(x)))
            } else {
              # vapply (not sapply) guarantees a logical vector regardless
              # of input shape, so the all() check is type-stable.
              if (!all(vapply(cols, is.character, logical(1)))) {
                stop("all columns names should be characters")
              }
              col <- cols[[1]]
              if (length(col) > 1) {
                # First argument is a character vector: remaining args ignored.
                sdf <- callJMethod(x@sdf, "dropDuplicates", as.list(col))
              } else {
                sdf <- callJMethod(x@sdf, "dropDuplicates", cols)
              }
            }
            dataFrame(sdf)
          })
2565
#' Join
#'
#' Joins two SparkDataFrames based on the given join expression.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
#' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
#' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
#' @param joinType The type of join to perform, default 'inner'.
#' Must be one of: 'inner', 'cross', 'outer', 'full', 'fullouter', 'full_outer',
#' 'left', 'leftouter', 'left_outer', 'right', 'rightouter', 'right_outer', 'semi',
#' 'leftsemi', 'left_semi', 'anti', 'leftanti', 'left_anti'.
#' @return A SparkDataFrame containing the result of the join operation.
#' @family SparkDataFrame functions
#' @aliases join,SparkDataFrame,SparkDataFrame-method
#' @rdname join
#' @name join
#' @seealso \link{merge} \link{crossJoin}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
#' join(df1, df2, df1$col1 == df2$col2, "right_outer")
#' join(df1, df2) # Attempts an inner join
#' }
#' @note join since 1.4.0
setMethod("join",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y, joinExpr = NULL, joinType = NULL) {
            if (is.null(joinExpr)) {
              # this may not fail until the planner checks for Cartesian join later on.
              sdf <- callJMethod(x@sdf, "join", y@sdf)
            } else {
              # inherits() is the robust class test for S4 Column objects.
              if (!inherits(joinExpr, "Column")) {
                stop("joinExpr must be a Column")
              }
              if (is.null(joinType)) {
                sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
              } else {
                validJoinTypes <- c("inner", "cross",
                                    "outer", "full", "fullouter", "full_outer",
                                    "left", "leftouter", "left_outer",
                                    "right", "rightouter", "right_outer",
                                    "semi", "leftsemi", "left_semi", "anti", "leftanti", "left_anti")
                if (joinType %in% validJoinTypes) {
                  # Spark's joinType strings use no underscores, e.g. "leftouter".
                  joinType <- gsub("_", "", joinType, fixed = TRUE)
                  sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
                } else {
                  stop("joinType must be one of the following types: ",
                       "'", paste(validJoinTypes, collapse = "', '"), "'")
                }
              }
            }
            dataFrame(sdf)
          })
2622
#' CrossJoin
#'
#' Returns Cartesian Product on two SparkDataFrames.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the join operation.
#' @family SparkDataFrame functions
#' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
#' @rdname crossJoin
#' @name crossJoin
#' @seealso \link{merge} \link{join}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' crossJoin(df1, df2) # Performs a Cartesian
#' }
#' @note crossJoin since 2.1.0
setMethod("crossJoin",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate straight to Dataset.crossJoin on the JVM side.
            dataFrame(callJMethod(x@sdf, "crossJoin", y@sdf))
          })
2649
#' Merges two data frames
#'
#' @name merge
#' @param x the first data frame to be joined.
#' @param y the second data frame to be joined.
#' @param by a character vector specifying the join columns. If by is not
#'   specified, the common column names in \code{x} and \code{y} will be used.
#'   If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
#'   Product of x and y will be returned.
#' @param by.x a character vector specifying the joining columns for x.
#' @param by.y a character vector specifying the joining columns for y.
#' @param all a boolean value setting \code{all.x} and \code{all.y}
#'            if any of them are unset.
#' @param all.x a boolean value indicating whether all the rows in x should
#'              be included in the join.
#' @param all.y a boolean value indicating whether all the rows in y should
#'              be included in the join.
#' @param sort a logical argument indicating whether the resulting columns should be sorted.
#' @param suffixes a string vector of length 2 used to make colnames of
#'                 \code{x} and \code{y} unique.
#'                 The first element is appended to each colname of \code{x}.
#'                 The second element is appended to each colname of \code{y}.
#' @param ... additional argument(s) passed to the method.
#' @details  If all.x and all.y are set to FALSE, a natural join will be returned. If
#'   all.x is set to TRUE and all.y is set to FALSE, a left outer join will
#'   be returned. If all.x is set to FALSE and all.y is set to TRUE, a right
#'   outer join will be returned. If all.x and all.y are set to TRUE, a full
#'   outer join will be returned.
#' @family SparkDataFrame functions
#' @aliases merge,SparkDataFrame,SparkDataFrame-method
#' @rdname merge
#' @seealso \link{join} \link{crossJoin}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' merge(df1, df2) # Performs an inner join by common columns
#' merge(df1, df2, by = "col1") # Performs an inner join based on expression
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
#' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
#' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
#' merge(df1, df2, by = NULL) # Performs a Cartesian join
#' }
#' @note merge since 1.5.0
setMethod("merge",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y, by = intersect(names(x), names(y)), by.x = by, by.y = by,
                   all = FALSE, all.x = all, all.y = all,
                   sort = TRUE, suffixes = c("_x", "_y"), ...) {

            if (length(suffixes) != 2) {
              stop("suffixes must have length 2")
            }

            # join type is identified based on the values of all, all.x and all.y
            # default join type is inner, according to R it should be natural but since it
            # is not supported in spark inner join is used
            joinType <- "inner"
            if (all || (all.x && all.y)) {
              joinType <- "outer"
            } else if (all.x) {
              joinType <- "left_outer"
            } else if (all.y) {
              joinType <- "right_outer"
            }

            # join expression is based on by.x, by.y if both by.x and by.y are not missing
            # or on by, if by.x or by.y are missing or have different lengths
            if (length(by.x) > 0 && length(by.x) == length(by.y)) {
              joinX <- by.x
              joinY <- by.y
            } else if (length(by) > 0) {
              # if join columns have the same name for both dataframes,
              # they are used in join expression
              joinX <- by
              joinY <- by
            } else {
              # if by or both by.x and by.y have length 0, use Cartesian Product
              joinRes <- crossJoin(x, y)
              return(joinRes)
            }

            # sets alias for making colnames unique in dataframes 'x' and 'y'
            colsX <- genAliasesForIntersectedCols(x, by, suffixes[1])
            colsY <- genAliasesForIntersectedCols(y, by, suffixes[2])

            # selects columns with their aliases from dataframes
            # in case same column names are present in both data frames
            xsel <- select(x, colsX)
            ysel <- select(y, colsY)

            # generates join conditions and adds them into a list
            # it also considers alias names of the columns while generating join conditions
            joinColumns <- lapply(seq_len(length(joinX)), function(i) {
              colX <- joinX[[i]]
              colY <- joinY[[i]]

              # Columns shared by both sides were aliased above, so the join
              # condition must refer to the suffixed names.
              if (colX %in% by) {
                colX <- paste0(colX, suffixes[1])
              }
              if (colY %in% by) {
                colY <- paste0(colY, suffixes[2])
              }

              colX <- getColumn(xsel, colX)
              colY <- getColumn(ysel, colY)

              colX == colY
            })

            # concatenates join columns with '&' and executes join
            joinExpr <- Reduce("&", joinColumns)
            joinRes <- join(xsel, ysel, joinExpr, joinType)

            # sorts the result by 'by' columns if sort = TRUE
            # (uses the y-side suffixed names, which exist for every 'by' column)
            if (sort && length(by) > 0) {
              colNameWithSuffix <- paste0(by, suffixes[2])
              joinRes <- do.call("arrange", c(joinRes, colNameWithSuffix, decreasing = FALSE))
            }

            joinRes
          })
2775
#' Creates a list of columns by replacing the intersected ones with aliases
#'
#' Creates a list of columns by replacing the intersected ones with aliases.
#' The name of the alias column is formed by concatenating the original column name and a suffix.
#'
#' @param x a SparkDataFrame
#' @param intersectedColNames a list of intersected column names of the SparkDataFrame
#' @param suffix a suffix for the column name
#' @return list of columns
#' @noRd
genAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) {
  allColNames <- names(x)
  # sets alias for making colnames unique in dataframe 'x'
  cols <- lapply(allColNames, function(colName) {
    col <- getColumn(x, colName)
    if (colName %in% intersectedColNames) {
      newJoin <- paste0(colName, suffix)
      if (newJoin %in% allColNames) {
        # stop() concatenates its arguments with no separator, so the second
        # string must begin with a space (the original message rendered as
        # "...'DataFrame'.Please use...").
        stop("The following column name: ", newJoin, " occurs more than once in the 'DataFrame'.",
             " Please use different suffixes for the intersected columns.")
      }
      col <- alias(col, newJoin)
    }
    col
  })
  cols
}
2803
#' Return a new SparkDataFrame containing the union of rows
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
#' and another SparkDataFrame. This is equivalent to \code{UNION ALL} in SQL.
#' Input SparkDataFrames can have different schemas (names and data types).
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#' Also as standard in SQL, this function resolves columns by position (not by name).
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the union.
#' @family SparkDataFrame functions
#' @rdname union
#' @name union
#' @aliases union,SparkDataFrame,SparkDataFrame-method
#' @seealso \link{rbind} \link{unionByName}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' unioned <- union(df, df2)
#' unions <- rbind(df, df2, df3, df4)
#' }
#' @note union since 2.0.0
setMethod("union",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate straight to Dataset.union on the JVM side.
            dataFrame(callJMethod(x@sdf, "union", y@sdf))
          })
2836
#' Return a new SparkDataFrame containing the union of rows.
#'
#' This is an alias for \code{union}.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the unionAll operation.
#' @family SparkDataFrame functions
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @rdname unionAll
#' @name unionAll
#' @seealso \link{union}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' unionAllDF <- unionAll(df1, df2)
#' }
#' @note unionAll since 1.4.0
setMethod("unionAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Pure alias kept for backward compatibility: delegates to union().
            union(x, y)
          })
2862
#' Return a new SparkDataFrame containing the union of rows, matched by column names
#'
#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
#' and another SparkDataFrame. This is different from \code{union} function, and both
#' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
#' into account. Input SparkDataFrames can have different data types in the schema.
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#' This function resolves columns by name (not by position).
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the union.
#' @family SparkDataFrame functions
#' @rdname unionByName
#' @name unionByName
#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
#' @seealso \link{rbind} \link{union}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
#' head(unionByName(df1, df2))
#' }
#' @note unionByName since 2.3.0
setMethod("unionByName",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate to Dataset.unionByName; columns are matched by name.
            dataFrame(callJMethod(x@sdf, "unionByName", y@sdf))
          })
2895
#' Union two or more SparkDataFrames
#'
#' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
#' requires that the input SparkDataFrames have the same column names.
#'
#' Note: This does not remove duplicate rows across the two SparkDataFrames.
#'
#' @param x a SparkDataFrame.
#' @param ... additional SparkDataFrame(s).
#' @param deparse.level currently not used (put here to match the signature of
#'                      the base implementation).
#' @return A SparkDataFrame containing the result of the union.
#' @family SparkDataFrame functions
#' @aliases rbind,SparkDataFrame-method
#' @rdname rbind
#' @name rbind
#' @seealso \link{union} \link{unionByName}
#' @examples
#'\dontrun{
#' sparkR.session()
#' unions <- rbind(df, df2, df3, df4)
#' }
#' @note rbind since 1.5.0
setMethod("rbind",
          signature(... = "SparkDataFrame"),
          function(x, ..., deparse.level = 1) {
            # All inputs must share identical column names (like base::rbind):
            # collect each frame's names and require a single unique value.
            nm <- lapply(list(x, ...), names)
            if (length(unique(nm)) != 1) {
              stop("Names of input data frames are different.")
            }
            # Base case vs. recursive case is distinguished via nargs():
            # when only two frames remain they are unioned directly; otherwise
            # 'x' is peeled off and Recall() re-invokes this method on the tail.
            # NOTE(review): this relies on nargs() counting x, one element of
            # '...', and deparse.level in the base case — order-sensitive code.
            if (nargs() == 3) {
              union(x, ...)
            } else {
              union(x, Recall(..., deparse.level = 1))
            }
          })
2932
#' Intersect
#'
#' Return a new SparkDataFrame containing rows only in both this SparkDataFrame
#' and another SparkDataFrame. This is equivalent to \code{INTERSECT} in SQL.
#'
#' @param x A SparkDataFrame
#' @param y A SparkDataFrame
#' @return A SparkDataFrame containing the result of the intersect.
#' @family SparkDataFrame functions
#' @aliases intersect,SparkDataFrame,SparkDataFrame-method
#' @rdname intersect
#' @name intersect
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectDF <- intersect(df, df2)
#' }
#' @note intersect since 1.4.0
setMethod("intersect",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate to Dataset.intersect (SQL INTERSECT semantics).
            dataFrame(callJMethod(x@sdf, "intersect", y@sdf))
          })
2959
#' intersectAll
#'
#' Return a new SparkDataFrame containing rows in both this SparkDataFrame
#' and another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the intersect all operation.
#' @family SparkDataFrame functions
#' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
#' @rdname intersectAll
#' @name intersectAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' intersectAllDF <- intersectAll(df1, df2)
#' }
#' @note intersectAll since 2.4.0
setMethod("intersectAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate to Dataset.intersectAll (duplicates are preserved).
            dataFrame(callJMethod(x@sdf, "intersectAll", y@sdf))
          })
2988
#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except operation.
#' @family SparkDataFrame functions
#' @aliases except,SparkDataFrame,SparkDataFrame-method
#' @rdname except
#' @name except
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptDF <- except(df, df2)
#' }
#' @note except since 1.4.0
setMethod("except",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate to Dataset.except (SQL EXCEPT DISTINCT semantics).
            dataFrame(callJMethod(x@sdf, "except", y@sdf))
          })
3015
#' exceptAll
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame while preserving the duplicates.
#' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
#' SQL, this function resolves columns by position (not by name).
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the except all operation.
#' @family SparkDataFrame functions
#' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
#' @rdname exceptAll
#' @name exceptAll
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' exceptAllDF <- exceptAll(df1, df2)
#' }
#' @note exceptAll since 2.4.0
setMethod("exceptAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Delegate to Dataset.exceptAll (duplicates are preserved).
            dataFrame(callJMethod(x@sdf, "exceptAll", y@sdf))
          })
3044
#' Save the contents of SparkDataFrame to a data source.
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' spark.sql.sources.default will be used.
#'
#' Additionally, mode is used to specify the behavior of the save operation when data already
#' exists in the data source. There are four modes:
#' \itemize{
#'   \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
#'   \item 'overwrite': Existing data is expected to be overwritten by the contents of this
#'         SparkDataFrame.
#'   \item 'error' or 'errorifexists': An exception is expected to be thrown.
#'   \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
#'         and to not change the existing data.
#' }
#'
#' @param df a SparkDataFrame.
#' @param path a name for the table.
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#'             save mode (it is 'error' by default)
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#'                    system. If specified, the output is laid out on the file system similar
#'                    to Hive's partitioning scheme.
#' @param ... additional argument(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases write.df,SparkDataFrame-method
#' @rdname write.df
#' @name write.df
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
#' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
#' }
#' @note write.df since 1.4.0
setMethod("write.df",
          signature(df = "SparkDataFrame"),
          function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
            # Validate argument types early, with explicit error messages.
            if (!is.null(path) && !is.character(path)) {
              stop("path should be character, NULL or omitted.")
            }
            if (!is.null(source) && !is.character(source)) {
              stop("source should be character, NULL or omitted. It is the datasource specified ",
                   "in 'spark.sql.sources.default' configuration by default.")
            }
            if (!is.character(mode)) {
              stop("mode should be character or omitted. It is 'error' by default.")
            }
            if (is.null(source)) {
              # Fall back to the source configured via spark.sql.sources.default.
              source <- getDefaultSqlSource()
            }
            cols <- NULL
            if (!is.null(partitionBy)) {
              # vapply is type-stable (always yields logical), unlike sapply,
              # which returns list() for zero-length input.
              if (!all(vapply(partitionBy, is.character, logical(1)))) {
                stop("All partitionBy column names should be characters.")
              }
              cols <- as.list(partitionBy)
            }
            # Build the JVM DataFrameWriter: format -> partitioning -> options -> save.
            write <- callJMethod(df@sdf, "write")
            write <- callJMethod(write, "format", source)
            if (!is.null(cols)) {
              write <- callJMethod(write, "partitionBy", cols)
            }
            write <- setWriteOptions(write, path = path, mode = mode, ...)
            # Called for its side effect; return invisibly like other writers.
            invisible(handledCallJMethod(write, "save"))
          })
3116
#' @rdname write.df
#' @name saveDF
#' @aliases saveDF,SparkDataFrame,character-method
#' @note saveDF since 1.4.0
setMethod("saveDF",
          signature(df = "SparkDataFrame", path = "character"),
          function(df, path, source = NULL, mode = "error", ...) {
            # Thin wrapper: forward everything to write.df with named arguments.
            write.df(df, path = path, source = source, mode = mode, ...)
          })
3126
#' Save the contents of the SparkDataFrame to a data source as a table
#'
#' The data source is specified by the \code{source} and a set of options (...).
#' If \code{source} is not specified, the default data source configured by
#' spark.sql.sources.default will be used.
#'
#' Additionally, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes: \cr
#'  'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr
#'  'overwrite': Existing data is expected to be overwritten by the contents of this
#'     SparkDataFrame. \cr
#'  'error' or 'errorifexists': An exception is expected to be thrown. \cr
#'  'ignore': The save operation is expected to not save the contents of the SparkDataFrame
#'     and to not change the existing data. \cr
#'
#' @param df a SparkDataFrame.
#' @param tableName a name for the table.
#' @param source a name for external data source.
#' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
#'             save mode (it is 'error' by default)
#' @param ... additional option(s) passed to the method.
#'
#' @family SparkDataFrame functions
#' @aliases saveAsTable,SparkDataFrame,character-method
#' @rdname saveAsTable
#' @name saveAsTable
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' saveAsTable(df, "myfile")
#' }
#' @note saveAsTable since 1.4.0
setMethod("saveAsTable",
          signature(df = "SparkDataFrame", tableName = "character"),
          function(df, tableName, source = NULL, mode = "error", ...) {
            if (is.null(source)) {
              # Fall back to the source configured via spark.sql.sources.default.
              source <- getDefaultSqlSource()
            }
            # Build the JVM DataFrameWriter: format -> mode -> options -> saveAsTable.
            writer <- callJMethod(df@sdf, "write")
            writer <- callJMethod(writer, "format", source)
            writer <- setWriteMode(writer, mode)
            writer <- callJMethod(writer, "options", varargsToStrEnv(...))
            invisible(callJMethod(writer, "saveAsTable", tableName))
          })
3175
#' describe
#'
#' Computes statistics for numeric and string columns.
#' If no columns are given, this function computes statistics for all numerical or string columns.
#'
#' @param x a SparkDataFrame to be computed.
#' @param col a string of name.
#' @param ... additional expressions.
#' @return A SparkDataFrame.
#' @family SparkDataFrame functions
#' @aliases describe,SparkDataFrame,character-method describe,SparkDataFrame,ANY-method
#' @rdname describe
#' @name describe
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' describe(df)
#' describe(df, "col1")
#' describe(df, "col1", "col2")
#' }
#' @seealso See \link{summary} for expanded statistics and control over which statistics to compute.
#' @note describe(SparkDataFrame, character) since 1.4.0
setMethod("describe",
          signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ...) {
            # Gather the requested column names into a single list for the JVM call.
            dataFrame(callJMethod(x@sdf, "describe", list(col, ...)))
          })
3207
#' @rdname describe
#' @name describe
#' @aliases describe,SparkDataFrame-method
#' @note describe(SparkDataFrame) since 1.4.0
setMethod("describe",
          signature(x = "SparkDataFrame"),
          function(x) {
            # An empty column list means: describe all numeric/string columns.
            dataFrame(callJMethod(x@sdf, "describe", list()))
          })
3218
#' summary
#'
#' Computes specified statistics for numeric and string columns. Available statistics are:
#' \itemize{
#' \item count
#' \item mean
#' \item stddev
#' \item min
#' \item max
#' \item arbitrary approximate percentiles specified as a percentage (eg, "75\%")
#' }
#' If no statistics are given, this function computes count, mean, stddev, min,
#' approximate quartiles (percentiles at 25\%, 50\%, and 75\%), and max.
#' This function is meant for exploratory data analysis, as we make no guarantee about the
#' backward compatibility of the schema of the resulting Dataset. If you want to
#' programmatically compute summary statistics, use the \code{agg} function instead.
#'
#'
#' @param object a SparkDataFrame to be summarized.
#' @param ... (optional) statistics to be computed for all columns.
#' @return A SparkDataFrame.
#' @family SparkDataFrame functions
#' @rdname summary
#' @name summary
#' @aliases summary,SparkDataFrame-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' summary(df)
#' summary(df, "min", "25%", "75%", "max")
#' summary(select(df, "age", "height"))
#' }
#' @note summary(SparkDataFrame) since 1.5.0
#' @note The statistics provided by \code{summary} were change in 2.3.0 use \link{describe} for
#'       previous defaults.
#' @seealso \link{describe}
setMethod("summary",
          signature(object = "SparkDataFrame"),
          function(object, ...) {
            # Forward the requested statistics (possibly none) to Dataset.summary.
            dataFrame(callJMethod(object@sdf, "summary", list(...)))
          })
3264
3265
#' A set of SparkDataFrame functions working with NA values
#'
#' dropna, na.omit - Returns a new SparkDataFrame omitting rows with null values.
#'
#' @param x a SparkDataFrame.
#' @param how "any" or "all".
#'            if "any", drop a row if it contains any nulls.
#'            if "all", drop a row only if all its values are null.
#'            if \code{minNonNulls} is specified, how is ignored.
#' @param minNonNulls if specified, drop rows that have less than
#'                    \code{minNonNulls} non-null values.
#'                    This overwrites the how parameter.
#' @param cols optional list of column names to consider. In \code{fillna},
#'             columns specified in cols that do not have matching data
#'             type are ignored. For example, if value is a character, and
#'             subset contains a non-character column, then the non-character
#'             column is simply ignored.
#' @return A SparkDataFrame.
#'
#' @family SparkDataFrame functions
#' @rdname nafunctions
#' @aliases dropna,SparkDataFrame-method
#' @name dropna
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' dropna(df)
#' }
#' @note dropna since 1.4.0
setMethod("dropna",
          signature(x = "SparkDataFrame"),
          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
            how <- match.arg(how)
            # Default to all columns when no subset is given.
            if (is.null(cols)) {
              cols <- columns(x)
            }
            # Translate 'how' into a minimum non-null count: "any" requires
            # every considered column to be non-null, "all" requires at least one.
            # An explicit minNonNulls overrides 'how'.
            if (is.null(minNonNulls)) {
              minNonNulls <- if (how == "any") length(cols) else 1
            }
            naFunctions <- callJMethod(x@sdf, "na")
            dataFrame(callJMethod(naFunctions, "drop",
                                  as.integer(minNonNulls), as.list(cols)))
          })
3313
#' @param object a SparkDataFrame.
#' @param ... further arguments to be passed to or from other methods.
#' @rdname nafunctions
#' @name na.omit
#' @aliases na.omit,SparkDataFrame-method
#' @note na.omit since 1.5.0
setMethod("na.omit",
          signature(object = "SparkDataFrame"),
          function(object, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
            # Thin wrapper: forward everything to dropna with named arguments.
            dropna(object, how = how, minNonNulls = minNonNulls, cols = cols)
          })
3325
#' fillna - Replace null values.
#'
#' @param value value to replace null values with.
#'              Should be an integer, numeric, character or named list.
#'              If the value is a named list, then cols is ignored and
#'              value must be a mapping from column name (character) to
#'              replacement value. The replacement value must be an
#'              integer, numeric or character.
#'
#' @rdname nafunctions
#' @name fillna
#' @aliases fillna,SparkDataFrame-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' fillna(df, 1)
#' fillna(df, list("age" = 20, "name" = "unknown"))
#' }
#' @note fillna since 1.4.0
setMethod("fillna",
          signature(x = "SparkDataFrame"),
          function(x, value, cols = NULL) {
            # Only scalar-vector types and named lists are supported as 'value'.
            if (!(class(value) %in% c("integer", "numeric", "character", "list"))) {
              stop("value should be an integer, numeric, character or named list.")
            }

            if (class(value) == "list") {
              # Check column names in the named list: every element must be named.
              colNames <- names(value)
              if (length(colNames) == 0 || !all(colNames != "")) {
                # Fixed typo in the original message ("an a named list").
                stop("value should be a named list with each name being a column name.")
              }
              # Check each item in the named list is of valid type.
              lapply(value, function(v) {
                if (!(class(v) %in% c("integer", "numeric", "character"))) {
                  stop("Each item in value should be an integer, numeric or character.")
                }
              })

              # Convert the named list to an environment to be passed to the JVM
              # as a java.util.Map.
              valueMap <- convertNamedListToEnv(value)

              # When value is a named list, caller is expected not to pass in cols.
              if (!is.null(cols)) {
                warning("When value is a named list, cols is ignored!")
                cols <- NULL
              }

              value <- valueMap
            } else if (is.integer(value)) {
              # Cast an integer to a numeric: the JVM fill() overloads take double.
              value <- as.numeric(value)
            }

            naFunctions <- callJMethod(x@sdf, "na")
            sdf <- if (length(cols) == 0) {
              callJMethod(naFunctions, "fill", value)
            } else {
              callJMethod(naFunctions, "fill", value, as.list(cols))
            }
            dataFrame(sdf)
          })
3390
#' Download data from a SparkDataFrame into a R data.frame
#'
#' This function downloads the contents of a SparkDataFrame into an R's data.frame.
#' Since data.frames are held in memory, ensure that you have enough memory
#' in your system to accommodate the contents.
#'
#' @param x a SparkDataFrame.
#' @param row.names \code{NULL} or a character vector giving the row names for the data frame.
#' @param optional If \code{TRUE}, converting column names is optional.
#' @param ... additional arguments to pass to base::as.data.frame.
#' @return A data.frame.
#' @family SparkDataFrame functions
#' @aliases as.data.frame,SparkDataFrame-method
#' @rdname as.data.frame
#' @examples
#' \dontrun{
#' irisDF <- createDataFrame(iris)
#' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ])
#' }
#' @note as.data.frame since 1.6.0
setMethod("as.data.frame",
          signature(x = "SparkDataFrame"),
          function(x, row.names = NULL, optional = FALSE, ...) {
            # Materialize the distributed data locally, then convert it.
            localDF <- collect(x)
            as.data.frame(localDF, row.names = row.names, optional = optional, ...)
          })
3416
#' Attach SparkDataFrame to R search path
#'
#' The specified SparkDataFrame is attached to the R search path. This means that
#' the SparkDataFrame is searched by R when evaluating a variable, so columns in
#' the SparkDataFrame can be accessed by simply giving their names.
#'
#' @family SparkDataFrame functions
#' @rdname attach
#' @aliases attach attach,SparkDataFrame-method
#' @param what (SparkDataFrame) The SparkDataFrame to attach
#' @param pos (integer) Specify position in search() where to attach.
#' @param name (character) Name to use for the attached SparkDataFrame. Names
#'   starting with package: are reserved for library.
#' @param warn.conflicts (logical) If TRUE, warnings are printed about conflicts
#' from attaching the database, unless that SparkDataFrame contains an object
#' @examples
#' \dontrun{
#' attach(irisDf)
#' summary(Sepal_Width)
#' }
#' @seealso \link{detach}
#' @note attach since 1.6.0
setMethod("attach",
          signature(what = "SparkDataFrame"),
          function(what, pos = 2L, name = deparse(substitute(what), backtick = FALSE),
                   warn.conflicts = TRUE) {
            # Snapshot all formals before anything else: 'name' is lazily
            # computed from substitute(what), so it must be forced while 'what'
            # still refers to the original SparkDataFrame.
            args <- as.list(environment()) # capture all parameters - this must be the first line
            # Replace the SparkDataFrame with an environment holding one entry
            # per column, then delegate to base::attach via do.call.
            newEnv <- assignNewEnv(args$what)
            args$what <- newEnv
            do.call(attach, args)
          })
3448
#' Evaluate a R expression in an environment constructed from a SparkDataFrame
#'
#' Evaluate a R expression in an environment constructed from a SparkDataFrame
#' with() allows access to columns of a SparkDataFrame by simply referring to
#' their name. It appends every column of a SparkDataFrame into a new
#' environment. Then, the given expression is evaluated in this new
#' environment.
#'
#' @rdname with
#' @family SparkDataFrame functions
#' @aliases with,SparkDataFrame-method
#' @param data (SparkDataFrame) SparkDataFrame to use for constructing an environment.
#' @param expr (expression) Expression to evaluate.
#' @param ... arguments to be passed to future methods.
#' @examples
#' \dontrun{
#' with(irisDf, nrow(Sepal_Width))
#' }
#' @seealso \link{attach}
#' @note with since 1.6.0
setMethod("with",
          signature(data = "SparkDataFrame"),
          function(data, expr, ...) {
            # Build an environment with one binding per column, then evaluate
            # the unevaluated expression inside it.
            env <- assignNewEnv(data)
            eval(substitute(expr), envir = env, enclos = env)
          })
3475
#' Compactly display the structure of a dataset
#'
#' Display the structure of a SparkDataFrame, including column names, column types, as well as a
#' a small sample of rows.
#'
#' @name str
#' @rdname str
#' @aliases str,SparkDataFrame-method
#' @family SparkDataFrame functions
#' @param object a SparkDataFrame
#' @examples
#' \dontrun{
#' # Create a SparkDataFrame from the Iris dataset
#' irisDF <- createDataFrame(iris)
#'
#' # Show the structure of the SparkDataFrame
#' str(irisDF)
#' }
#' @note str since 1.6.1
setMethod("str",
          signature(object = "SparkDataFrame"),
          function(object) {

            # TODO: These could be made global parameters, though in R it's not the case
            # Output is truncated to MAX_CHAR_PER_ROW characters per line and
            # at most MAX_COLS columns.
            MAX_CHAR_PER_ROW <- 120
            MAX_COLS <- 100

            # Get the column names and types of the DataFrame
            names <- names(object)
            types <- coltypes(object)

            # Get the first elements of the dataset. Limit number of columns accordingly
            localDF <- if (ncol(object) > MAX_COLS) {
              head(object[, c(1:MAX_COLS)])
            } else {
              head(object)
            }

            # The number of observations will not be displayed as computing the
            # number of rows is a very expensive operation
            cat(paste0("'", class(object), "': ", length(names), " variables:\n"))

            if (nrow(localDF) > 0) {
              for (i in seq_len(ncol(localDF))) {
                # Get the first elements for each column

                # Character values are rendered quoted, like base::str does.
                firstElements <- if (types[i] == "character") {
                  paste(paste0("\"", localDF[, i], "\""), collapse = " ")
                } else {
                  paste(localDF[, i], collapse = " ")
                }

                # Add the corresponding number of spaces for alignment, padding
                # each name to the width of the longest column name.
                spaces <- paste(rep(" ", max(nchar(names) - nchar(names[i]))), collapse = "")

                # Get the short type. For 'character', it would be 'chr';
                # 'for numeric', it's 'num', etc. Unknown types fall back to
                # the first three characters of the full type name.
                dataType <- SHORT_TYPES[[types[i]]]
                if (is.null(dataType)) {
                  dataType <- substring(types[i], 1, 3)
                }

                # Concatenate the colnames, coltypes, and first
                # elements of each column
                line <- paste0(" $ ", names[i], spaces, ": ",
                               dataType, " ", firstElements)

                # Chop off extra characters if this is too long
                cat(substr(line, 1, MAX_CHAR_PER_ROW))
                cat("\n")
              }

              # Tell the user when columns were truncated by the MAX_COLS cap.
              if (ncol(localDF) < ncol(object)) {
                cat(paste0("\nDisplaying first ", ncol(localDF), " columns only."))
              }
            }
          })
3553
#' drop
#'
#' Returns a new SparkDataFrame with columns dropped.
#' This is a no-op if schema doesn't contain column name(s).
#'
#' @param x a SparkDataFrame.
#' @param col a character vector of column names or a Column.
#' @param ... further arguments to be passed to or from other methods.
#' @return A SparkDataFrame.
#'
#' @family SparkDataFrame functions
#' @rdname drop
#' @name drop
#' @aliases drop,SparkDataFrame-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' drop(df, "col1")
#' drop(df, c("col1", "col2"))
#' drop(df, df$col1)
#' }
#' @note drop since 2.0.0
setMethod("drop",
          signature(x = "SparkDataFrame"),
          function(x, col) {
            # inherits() is preferred over class(x) == "cls": class() may
            # return a vector of length > 1, which breaks the comparison.
            stopifnot(inherits(col, "character") || inherits(col, "Column"))

            if (inherits(col, "Column")) {
              # A single Column expression: pass its JVM handle directly.
              sdf <- callJMethod(x@sdf, "drop", col@jc)
            } else {
              # One or more column names: pass them as a list of strings.
              sdf <- callJMethod(x@sdf, "drop", as.list(col))
            }
            dataFrame(sdf)
          })
3590
# Expose base::drop
#' @name drop
#' @rdname drop
#' @aliases drop,ANY-method
setMethod("drop",
          signature(x = "ANY"),
          function(x) {
            # Fallback for non-SparkDataFrame input so that defining the S4
            # generic does not mask base::drop's behavior on arrays/matrices.
            base::drop(x)
          })
3600
#' Compute histogram statistics for given column
#'
#' This function computes a histogram for a given SparkR Column.
#'
#' @name histogram
#' @param nbins the number of bins (optional). Default value is 10.
#' @param col the column as Character string or a Column to build the histogram from.
#' @param df the SparkDataFrame containing the Column to build the histogram from.
#' @return a data.frame with the histogram statistics, i.e., counts and centroids.
#' @rdname histogram
#' @aliases histogram,SparkDataFrame,characterOrColumn-method
#' @family SparkDataFrame functions
#' @examples
#' \dontrun{
#'
#' # Create a SparkDataFrame from the Iris dataset
#' irisDF <- createDataFrame(iris)
#'
#' # Compute histogram statistics
#' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12)
#'
#' # Once SparkR has computed the histogram statistics, the histogram can be
#' # rendered using the ggplot2 library:
#'
#' require(ggplot2)
#' plot <- ggplot(histStats, aes(x = centroids, y = counts)) +
#'         geom_bar(stat = "identity") +
#'         xlab("Sepal_Length") + ylab("Frequency")
#' }
#' @note histogram since 2.0.0
setMethod("histogram",
          signature(df = "SparkDataFrame", col = "characterOrColumn"),
          function(df, col, nbins = 10) {
            # Validate nbins
            if (nbins < 2) {
              stop("The number of bins must be a positive integer number greater than 1.")
            }

            # Round nbins to the smallest integer
            nbins <- floor(nbins)

            # Validate col
            if (is.null(col)) {
              stop("col must be specified.")
            }

            colname <- col
            x <- if (is.character(col)) {
              if (!colname %in% names(df)) {
                stop("Specified colname does not belong to the given SparkDataFrame.")
              }

              # Filter NA values in the target column and remove all other columns
              df <- na.omit(df[, colname, drop = FALSE])
              getColumn(df, colname)

            } else if (inherits(col, "Column")) {

              # The given column needs to be appended to the SparkDataFrame so that we can
              # use method describe() to compute statistics in one single pass. The new
              # column must have a name that doesn't exist in the dataset.
              # To do so, we generate a random column name with more characters than the
              # longest colname in the dataset, but no more than 100 (think of a UUID).
              # This column name will never be visible to the user, so the name is irrelevant.
              # Limiting the colname length to 100 makes debugging easier and it does
              # introduce a negligible probability of collision: assuming the user has 1 million
              # columns AND all of them have names 100 characters long (which is very unlikely),
              # AND they run 1 billion histograms, the probability of collision will roughly be
              # 1 in 4.4 x 10 ^ 96
              colname <- paste(base::sample(c(letters, LETTERS),
                                            size = min(max(nchar(colnames(df))) + 1, 100),
                                            replace = TRUE),
                               collapse = "")

              # Append the given column to the dataset. This is to support Columns that
              # don't belong to the SparkDataFrame but are rather expressions
              df <- withColumn(df, colname, col)

              # Filter NA values in the target column. Cannot remove all other columns
              # since given Column may be an expression on one or more existing columns
              df <- na.omit(df)

              col
            }

            # describe() returns min at row 4 and max at row 5 of the stats frame.
            stats <- collect(describe(df[, colname, drop = FALSE]))
            min <- as.numeric(stats[4, 2])
            max <- as.numeric(stats[5, 2])

            # Normalize the data
            xnorm <- (x - min) / (max - min)

            # Round the data to 4 significant digits. This is to avoid rounding issues.
            xnorm <- cast(xnorm * 10000, "integer") / 10000.0

            # Since min = 0, max = 1 (data is already normalized)
            normBinSize <- 1 / nbins
            binsize <- (max - min) / nbins
            approxBins <- xnorm / normBinSize

            # Adjust values that are equal to the upper bound of each bin
            bins <- cast(approxBins -
                           ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0),
                         "integer")

            df$bins <- bins
            histStats <- collect(count(groupBy(df, "bins")))
            names(histStats) <- c("bins", "counts")

            # Fill bins with zero counts. The column is named "counts"; the
            # original code referenced histStats$count, which only worked via
            # data.frame partial matching.
            y <- data.frame("bins" = seq(0, nbins - 1))
            histStats <- merge(histStats, y, all.x = TRUE, all.y = TRUE)
            histStats[is.na(histStats$counts), 2] <- 0

            # Compute centroids
            histStats$centroids <- histStats$bins * binsize + min + binsize / 2

            # Return the statistics
            return(histStats)
          })
3721
3722 #' Save the content of SparkDataFrame to an external database table via JDBC.
3723 #'
3724 #' Save the content of the SparkDataFrame to an external database table via JDBC. Additional JDBC
3725 #' database connection properties can be set (...)
3726 #'
3727 #' Also, mode is used to specify the behavior of the save operation when
3728 #' data already exists in the data source. There are four modes:
3729 #' \itemize{
3730 #' \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
3731 #' \item 'overwrite': Existing data is expected to be overwritten by the contents of this
3732 #' SparkDataFrame.
3733 #' \item 'error' or 'errorifexists': An exception is expected to be thrown.
3734 #' \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
3735 #' and to not change the existing data.
3736 #' }
3737 #'
3738 #' @param x a SparkDataFrame.
3739 #' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
#' @param tableName the name of the table in the external database.
3741 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
3742 #' save mode (it is 'error' by default)
3743 #' @param ... additional JDBC database connection properties.
3744 #' @family SparkDataFrame functions
3745 #' @rdname write.jdbc
3746 #' @name write.jdbc
3747 #' @aliases write.jdbc,SparkDataFrame,character,character-method
3748 #' @examples
3749 #'\dontrun{
3750 #' sparkR.session()
3751 #' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
3752 #' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
3753 #' }
3754 #' @note write.jdbc since 2.0.0
setMethod("write.jdbc",
          signature(x = "SparkDataFrame", url = "character", tableName = "character"),
          function(x, url, tableName, mode = "error", ...) {
            # Collect extra connection options (e.g. user, password) into a
            # java.util.Properties object for the JVM-side writer.
            connProps <- varargsToJProperties(...)
            writer <- callJMethod(x@sdf, "write")
            writer <- setWriteMode(writer, mode)
            # The write is performed for its side effect; return nothing visibly.
            invisible(handledCallJMethod(writer, "jdbc", url, tableName, connProps))
          })
3763
3764 #' randomSplit
3765 #'
3766 #' Return a list of randomly split dataframes with the provided weights.
3767 #'
3768 #' @param x A SparkDataFrame
3769 #' @param weights A vector of weights for splits, will be normalized if they don't sum to 1
3770 #' @param seed A seed to use for random split
3771 #'
3772 #' @family SparkDataFrame functions
3773 #' @aliases randomSplit,SparkDataFrame,numeric-method
3774 #' @rdname randomSplit
3775 #' @name randomSplit
3776 #' @examples
3777 #'\dontrun{
3778 #' sparkR.session()
3779 #' df <- createDataFrame(data.frame(id = 1:1000))
3780 #' df_list <- randomSplit(df, c(2, 3, 5), 0)
3781 #' # df_list contains 3 SparkDataFrames with each having about 200, 300 and 500 rows respectively
3782 #' sapply(df_list, count)
3783 #' }
3784 #' @note randomSplit since 2.0.0
setMethod("randomSplit",
          signature(x = "SparkDataFrame", weights = "numeric"),
          function(x, weights, seed) {
            # Reject negative weights. 'weights' is already a numeric vector, so
            # a vectorized comparison replaces the per-element sapply loop.
            if (any(weights < 0)) {
              stop("all weight values should not be negative")
            }
            # Normalize so the weights sum to 1 before handing them to the JVM.
            normalized_list <- as.list(weights / sum(weights))
            if (!missing(seed)) {
              sdfs <- callJMethod(x@sdf, "randomSplit", normalized_list, as.integer(seed))
            } else {
              sdfs <- callJMethod(x@sdf, "randomSplit", normalized_list)
            }
            # lapply has a stable list return type, unlike sapply which may
            # attempt simplification depending on its input.
            lapply(sdfs, dataFrame)
          })
3799
3800 #' getNumPartitions
3801 #'
3802 #' Return the number of partitions
3803 #'
3804 #' @param x A SparkDataFrame
3805 #' @family SparkDataFrame functions
3806 #' @aliases getNumPartitions,SparkDataFrame-method
3807 #' @rdname getNumPartitions
3808 #' @name getNumPartitions
3809 #' @examples
3810 #'\dontrun{
3811 #' sparkR.session()
3812 #' df <- createDataFrame(cars, numPartitions = 2)
3813 #' getNumPartitions(df)
3814 #' }
3815 #' @note getNumPartitions since 2.1.1
setMethod("getNumPartitions",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Convert the Dataset to its underlying RDD, then ask the RDD for
            # its partition count.
            rddRef <- callJMethod(x@sdf, "rdd")
            callJMethod(rddRef, "getNumPartitions")
          })
3821
3822 #' isStreaming
3823 #'
3824 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
3825 #' as it arrives. A dataset that reads data from a streaming source must be executed as a
3826 #' \code{StreamingQuery} using \code{write.stream}.
3827 #'
3828 #' @param x A SparkDataFrame
3829 #' @return TRUE if this SparkDataFrame is from a streaming source
3830 #' @family SparkDataFrame functions
3831 #' @aliases isStreaming,SparkDataFrame-method
3832 #' @rdname isStreaming
3833 #' @name isStreaming
3834 #' @seealso \link{read.stream} \link{write.stream}
3835 #' @examples
3836 #'\dontrun{
3837 #' sparkR.session()
3838 #' df <- read.stream("socket", host = "localhost", port = 9999)
3839 #' isStreaming(df)
3840 #' }
3841 #' @note isStreaming since 2.2.0
3842 #' @note experimental
setMethod("isStreaming",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Delegate directly to the JVM Dataset's isStreaming flag.
            streaming <- callJMethod(x@sdf, "isStreaming")
            streaming
          })
3848
3849 #' Write the streaming SparkDataFrame to a data source.
3850 #'
3851 #' The data source is specified by the \code{source} and a set of options (...).
3852 #' If \code{source} is not specified, the default data source configured by
3853 #' spark.sql.sources.default will be used.
3854 #'
#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to
#' an output data source. There are three modes:
3857 #' \itemize{
3858 #' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
#'              output mode can only be used in queries that do not contain any aggregation.
3860 #' \item complete: All the rows in the streaming SparkDataFrame will be written out every time
3861 #' there are some updates. This output mode can only be used in queries that
3862 #' contain aggregations.
3863 #' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
3864 #' out every time there are some updates. If the query doesn't contain aggregations,
3865 #' it will be equivalent to \code{append} mode.
3866 #' }
3867 #'
3868 #' @param df a streaming SparkDataFrame.
3869 #' @param source a name for external data source.
3870 #' @param outputMode one of 'append', 'complete', 'update'.
3871 #' @param partitionBy a name or a list of names of columns to partition the output by on the file
3872 #' system. If specified, the output is laid out on the file system similar to Hive's
3873 #' partitioning scheme.
3874 #' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
3875 #' '1 minute'. This is a trigger that runs a query periodically based on the processing
3876 #' time. If value is '0 seconds', the query will run as fast as possible, this is the
3877 #' default. Only one trigger can be set.
3878 #' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
3879 #' one batch of data in a streaming query then terminates the query. Only one trigger can be
3880 #' set.
3881 #' @param ... additional external data source specific named options.
3882 #'
3883 #' @family SparkDataFrame functions
3884 #' @seealso \link{read.stream}
3885 #' @aliases write.stream,SparkDataFrame-method
3886 #' @rdname write.stream
3887 #' @name write.stream
3888 #' @examples
3889 #'\dontrun{
3890 #' sparkR.session()
3891 #' df <- read.stream("socket", host = "localhost", port = 9999)
3892 #' isStreaming(df)
3893 #' wordCounts <- count(group_by(df, "value"))
3894 #'
3895 #' # console
3896 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
3897 #' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
#'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
3900 #' # memory stream
3901 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
3902 #' head(sql("SELECT * from outs"))
3903 #' queryName(q)
3904 #'
3905 #' stopQuery(q)
3906 #' }
3907 #' @note write.stream since 2.2.0
3908 #' @note experimental
setMethod("write.stream",
          signature(df = "SparkDataFrame"),
          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
            # Validate argument types up front so the user gets an R error
            # message instead of an opaque JVM exception later.
            if (!is.null(source) && !is.character(source)) {
              stop("source should be character, NULL or omitted. It is the data source specified ",
                   "in 'spark.sql.sources.default' configuration by default.")
            }
            if (!is.null(outputMode) && !is.character(outputMode)) {
              stop("outputMode should be character or omitted.")
            }
            if (is.null(source)) {
              source <- getDefaultSqlSource()
            }
            cols <- NULL
            if (!is.null(partitionBy)) {
              # vapply guarantees a logical(1) result per element; sapply's
              # return type can silently change for empty or odd inputs.
              if (!all(vapply(partitionBy, is.character, logical(1)))) {
                stop("All partitionBy column names should be characters.")
              }
              cols <- as.list(partitionBy)
            }
            # At most one trigger may be set: either a periodic processing-time
            # trigger or a one-shot ("once") trigger, never both.
            jtrigger <- NULL
            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
              if (!is.null(trigger.once)) {
                stop("Multiple triggers not allowed.")
              }
              interval <- as.character(trigger.processingTime)
              if (nchar(interval) == 0) {
                stop("Value for trigger.processingTime must be a non-empty string.")
              }
              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
                                            "ProcessingTime",
                                            interval)
            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
              if (!is.logical(trigger.once) || !trigger.once) {
                stop("Value for trigger.once must be TRUE.")
              }
              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
            }
            # Remaining named arguments become data-source-specific options.
            options <- varargsToStrEnv(...)
            write <- handledCallJMethod(df@sdf, "writeStream")
            write <- callJMethod(write, "format", source)
            if (!is.null(outputMode)) {
              write <- callJMethod(write, "outputMode", outputMode)
            }
            if (!is.null(cols)) {
              write <- callJMethod(write, "partitionBy", cols)
            }
            if (!is.null(jtrigger)) {
              write <- callJMethod(write, "trigger", jtrigger)
            }
            write <- callJMethod(write, "options", options)
            # start() kicks off the streaming query; wrap the JVM handle in an
            # R StreamingQuery object.
            ssq <- handledCallJMethod(write, "start")
            streamingQuery(ssq)
          })
3964
3965 #' checkpoint
3966 #'
3967 #' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be used to truncate the
3968 #' logical plan, which is especially useful in iterative algorithms where the plan may grow
3969 #' exponentially. It will be saved to files inside the checkpoint directory set with
3970 #' \code{setCheckpointDir}
3971 #'
3972 #' @param x A SparkDataFrame
3973 #' @param eager whether to checkpoint this SparkDataFrame immediately
3974 #' @return a new checkpointed SparkDataFrame
3975 #' @family SparkDataFrame functions
3976 #' @aliases checkpoint,SparkDataFrame-method
3977 #' @rdname checkpoint
3978 #' @name checkpoint
3979 #' @seealso \link{setCheckpointDir}
3980 #' @examples
3981 #'\dontrun{
3982 #' setCheckpointDir("/checkpoint")
3983 #' df <- checkpoint(df)
3984 #' }
3985 #' @note checkpoint since 2.2.0
setMethod("checkpoint",
          signature(x = "SparkDataFrame"),
          function(x, eager = TRUE) {
            # Delegate to Dataset.checkpoint; eager = TRUE materializes the
            # checkpoint immediately.
            dataFrame(callJMethod(x@sdf, "checkpoint", as.logical(eager)))
          })
3992
3993 #' localCheckpoint
3994 #'
3995 #' Returns a locally checkpointed version of this SparkDataFrame. Checkpointing can be used to
3996 #' truncate the logical plan, which is especially useful in iterative algorithms where the plan
3997 #' may grow exponentially. Local checkpoints are stored in the executors using the caching
3998 #' subsystem and therefore they are not reliable.
3999 #'
4000 #' @param x A SparkDataFrame
4001 #' @param eager whether to locally checkpoint this SparkDataFrame immediately
4002 #' @return a new locally checkpointed SparkDataFrame
4003 #' @family SparkDataFrame functions
4004 #' @aliases localCheckpoint,SparkDataFrame-method
4005 #' @rdname localCheckpoint
4006 #' @name localCheckpoint
4007 #' @examples
4008 #'\dontrun{
4009 #' df <- localCheckpoint(df)
4010 #' }
4011 #' @note localCheckpoint since 2.3.0
setMethod("localCheckpoint",
          signature(x = "SparkDataFrame"),
          function(x, eager = TRUE) {
            # Delegate to Dataset.localCheckpoint; eager = TRUE materializes
            # the checkpoint immediately.
            dataFrame(callJMethod(x@sdf, "localCheckpoint", as.logical(eager)))
          })
4018
4019 #' cube
4020 #'
4021 #' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
4022 #'
4023 #' If grouping expression is missing \code{cube} creates a single global aggregate and is
4024 #' equivalent to direct application of \link{agg}.
4025 #'
4026 #' @param x a SparkDataFrame.
4027 #' @param ... character name(s) or Column(s) to group on.
4028 #' @return A GroupedData.
4029 #' @family SparkDataFrame functions
4030 #' @aliases cube,SparkDataFrame-method
4031 #' @rdname cube
4032 #' @name cube
4033 #' @examples
4034 #' \dontrun{
4035 #' df <- createDataFrame(mtcars)
4036 #' mean(cube(df, "cyl", "gear", "am"), "mpg")
4037 #'
4038 #' # Following calls are equivalent
4039 #' agg(cube(df), mean(df$mpg))
4040 #' agg(df, mean(df$mpg))
4041 #' }
4042 #' @note cube since 2.3.0
4043 #' @seealso \link{agg}, \link{groupBy}, \link{rollup}
setMethod("cube",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Use a lambda parameter name that does not shadow the
            # SparkDataFrame argument 'x', and inherits() instead of comparing
            # class() to a string.
            jcol <- lapply(cols, function(c) {
              if (inherits(c, "Column")) c@jc else column(c)@jc
            })
            sgd <- callJMethod(x@sdf, "cube", jcol)
            groupedData(sgd)
          })
4052
4053 #' rollup
4054 #'
4055 #' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
4056 #'
4057 #' If grouping expression is missing \code{rollup} creates a single global aggregate and is
4058 #' equivalent to direct application of \link{agg}.
4059 #'
4060 #' @param x a SparkDataFrame.
4061 #' @param ... character name(s) or Column(s) to group on.
4062 #' @return A GroupedData.
4063 #' @family SparkDataFrame functions
4064 #' @aliases rollup,SparkDataFrame-method
4065 #' @rdname rollup
4066 #' @name rollup
4067 #' @examples
4068 #'\dontrun{
4069 #' df <- createDataFrame(mtcars)
4070 #' mean(rollup(df, "cyl", "gear", "am"), "mpg")
4071 #'
4072 #' # Following calls are equivalent
4073 #' agg(rollup(df), mean(df$mpg))
4074 #' agg(df, mean(df$mpg))
4075 #' }
4076 #' @note rollup since 2.3.0
4077 #' @seealso \link{agg}, \link{cube}, \link{groupBy}
setMethod("rollup",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            cols <- list(...)
            # Use a lambda parameter name that does not shadow the
            # SparkDataFrame argument 'x', and inherits() instead of comparing
            # class() to a string.
            jcol <- lapply(cols, function(c) {
              if (inherits(c, "Column")) c@jc else column(c)@jc
            })
            sgd <- callJMethod(x@sdf, "rollup", jcol)
            groupedData(sgd)
          })
4086
4087 #' hint
4088 #'
4089 #' Specifies execution plan hint and return a new SparkDataFrame.
4090 #'
4091 #' @param x a SparkDataFrame.
4092 #' @param name a name of the hint.
4093 #' @param ... optional parameters for the hint.
4094 #' @return A SparkDataFrame.
4095 #' @family SparkDataFrame functions
4096 #' @aliases hint,SparkDataFrame,character-method
4097 #' @rdname hint
4098 #' @name hint
4099 #' @examples
4100 #' \dontrun{
4101 #' df <- createDataFrame(mtcars)
4102 #' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg")
4103 #'
4104 #' head(join(df, hint(avg_mpg, "broadcast"), df$cyl == avg_mpg$cyl))
4105 #' }
4106 #' @note hint since 2.2.0
setMethod("hint",
          signature(x = "SparkDataFrame", name = "character"),
          function(x, name, ...) {
            # A hint parameter is valid when it is character or numeric, or a
            # list whose elements are all character or numeric.
            isScalarOk <- function(v) is.character(v) || is.numeric(v)
            isParamOk <- function(p) {
              if (isScalarOk(p)) {
                TRUE
              } else if (is.list(p)) {
                all(sapply(p, isScalarOk))
              } else {
                FALSE
              }
            }
            parameters <- list(...)
            if (!all(sapply(parameters, isParamOk))) {
              stop("sql hint should be character, numeric, or list with character or numeric.")
            }
            dataFrame(callJMethod(x@sdf, "hint", name, parameters))
          })
4125
4126 #' alias
4127 #'
4128 #' @aliases alias,SparkDataFrame-method
4129 #' @family SparkDataFrame functions
4130 #' @rdname alias
4131 #' @name alias
4132 #' @examples
4133 #' \dontrun{
4134 #' df <- alias(createDataFrame(mtcars), "mtcars")
4135 #' avg_mpg <- alias(agg(groupBy(df, df$cyl), avg(df$mpg)), "avg_mpg")
4136 #'
4137 #' head(select(df, column("mtcars.mpg")))
4138 #' head(join(df, avg_mpg, column("mtcars.cyl") == column("avg_mpg.cyl")))
4139 #' }
4140 #' @note alias(SparkDataFrame) since 2.3.0
setMethod("alias",
          signature(object = "SparkDataFrame"),
          function(object, data) {
            # The alias must be given as a character name.
            stopifnot(is.character(data))
            # Wrap the aliased JVM Dataset back into a SparkDataFrame.
            dataFrame(callJMethod(object@sdf, "alias", data))
          })
4148
4149 #' broadcast
4150 #'
4151 #' Return a new SparkDataFrame marked as small enough for use in broadcast joins.
4152 #'
4153 #' Equivalent to \code{hint(x, "broadcast")}.
4154 #'
4155 #' @param x a SparkDataFrame.
4156 #' @return a SparkDataFrame.
4157 #'
4158 #' @aliases broadcast,SparkDataFrame-method
4159 #' @family SparkDataFrame functions
4160 #' @rdname broadcast
4161 #' @name broadcast
4162 #' @examples
4163 #' \dontrun{
4164 #' df <- createDataFrame(mtcars)
4165 #' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg")
4166 #'
4167 #' head(join(df, broadcast(avg_mpg), df$cyl == avg_mpg$cyl))
4168 #' }
4169 #' @note broadcast since 2.3.0
setMethod("broadcast",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Apply the broadcast hint via the static
            # org.apache.spark.sql.functions.broadcast helper on the JVM side.
            hinted <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
            dataFrame(hinted)
          })
4176
4177 #' withWatermark
4178 #'
4179 #' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
4180 #' time before which we assume no more late data is going to arrive.
4181 #'
4182 #' Spark will use this watermark for several purposes:
4183 #' \itemize{
4184 #' \item To know when a given time window aggregation can be finalized and thus can be emitted
4185 #' when using output modes that do not allow updates.
4186 #' \item To minimize the amount of state that we need to keep for on-going aggregations.
4187 #' }
4188 #' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
4189 #' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
4190 #' of coordinating this value across partitions, the actual watermark used is only guaranteed
4191 #' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
4192 #' process records that arrive more than \code{delayThreshold} late.
4193 #'
4194 #' @param x a streaming SparkDataFrame
4195 #' @param eventTime a string specifying the name of the Column that contains the event time of the
4196 #' row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
4198 #' relative to the latest record that has been processed in the form of an
4199 #' interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
4200 #' @return a SparkDataFrame.
4201 #' @aliases withWatermark,SparkDataFrame,character,character-method
4202 #' @family SparkDataFrame functions
4203 #' @rdname withWatermark
4204 #' @name withWatermark
4205 #' @examples
4206 #' \dontrun{
4207 #' sparkR.session()
4208 #' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
4209 #' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
4210 #' df <- withWatermark(df, "time", "10 minutes")
4211 #' }
4212 #' @note withWatermark since 2.3.0
setMethod("withWatermark",
          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
          function(x, eventTime, delayThreshold) {
            # Delegate to Dataset.withWatermark and wrap the resulting JVM
            # Dataset in a new SparkDataFrame.
            dataFrame(callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold))
          })