Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # DataFrame.R - SparkDataFrame class and methods implemented in S4 OO classes
0019 
0020 #' @include generics.R jobj.R schema.R RDD.R pairRDD.R column.R group.R
0021 NULL
0022 
# Register the S3 classes "jobj" and "structType" with the S4 system;
# "jobj" is used as a slot type by the SparkDataFrame class in this file.
setOldClass(Classes = "jobj")
setOldClass(Classes = "structType")
0025 
0026 #' S4 class that represents a SparkDataFrame
0027 #'
0028 #' SparkDataFrames can be created using functions like \link{createDataFrame},
0029 #' \link{read.json}, \link{table} etc.
0030 #'
0031 #' @family SparkDataFrame functions
0032 #' @rdname SparkDataFrame
0033 #' @docType class
0034 #'
0035 #' @slot env An R environment that stores bookkeeping states of the SparkDataFrame
0036 #' @slot sdf A Java object reference to the backing Scala DataFrame
0037 #' @seealso \link{createDataFrame}, \link{read.json}, \link{table}
0038 #' @seealso \url{https://spark.apache.org/docs/latest/sparkr.html#sparkr-dataframes}
0039 #' @examples
0040 #'\dontrun{
0041 #' sparkR.session()
0042 #' df <- createDataFrame(faithful)
0043 #'}
0044 #' @note SparkDataFrame since 2.0.0
setClass("SparkDataFrame",
         # Slot name -> slot class, given as a named character vector.
         slots = c(env = "environment",
                   sdf = "jobj"))
0048 
setMethod("initialize", "SparkDataFrame", function(.Object, sdf, isCached) {
  # Bookkeeping state is kept in a fresh environment; record the cache flag there.
  state <- new.env()
  state$isCached <- isCached
  .Object@env <- state

  # Reference to the backing Java/Scala DataFrame.
  .Object@sdf <- sdf
  .Object
})
0056 
0057 #' Set options/mode and then return the write object
0058 #' @noRd
setWriteOptions <- function(write, path = NULL, mode = "error", ...) {
  # Gather the extra arguments through varargsToStrEnv and add the path, if any.
  opts <- varargsToStrEnv(...)
  hasPath <- !is.null(path)
  if (hasPath) {
    opts[["path"]] <- path
  }
  # Apply the save mode first, then pass the collected options to the writer.
  withMode <- setWriteMode(write, mode)
  callJMethod(withMode, "options", opts)
}
0068 
0069 #' Set mode and then return the write object
0070 #' @noRd
setWriteMode <- function(write, mode) {
  # Only a character mode is forwarded to the writer's "mode" method;
  # anything else is rejected before any Java call is made.
  if (is.character(mode)) {
    return(handledCallJMethod(write, "mode", mode))
  }
  stop("mode should be character or omitted. It is 'error' by default.")
}
0078 
0079 #' @param sdf A Java object reference to the backing Scala DataFrame
0080 #' @param isCached TRUE if the SparkDataFrame is cached
0081 #' @noRd
dataFrame <- function(sdf, isCached = FALSE) {
  # new() hands both arguments to the "initialize" method of SparkDataFrame.
  wrapped <- new("SparkDataFrame", sdf, isCached)
  wrapped
}
0085 
0086 ############################ SparkDataFrame Methods ##############################################
0087 
0088 #' Print Schema of a SparkDataFrame
0089 #'
0090 #' Prints out the schema in tree format
0091 #'
0092 #' @param x A SparkDataFrame
0093 #'
0094 #' @family SparkDataFrame functions
0095 #' @rdname printSchema
0096 #' @name printSchema
0097 #' @aliases printSchema,SparkDataFrame-method
0098 #' @examples
0099 #'\dontrun{
0100 #' sparkR.session()
0101 #' path <- "path/to/file.json"
0102 #' df <- read.json(path)
0103 #' printSchema(df)
0104 #'}
0105 #' @note printSchema since 1.4.0
setMethod("printSchema",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Fetch the "treeString" rendering from the Java schema object and print it.
            jschema <- schema(x)$jobj
            cat(callJMethod(jschema, "treeString"))
          })
0112 
0113 #' Get schema object
0114 #'
0115 #' Returns the schema of this SparkDataFrame as a structType object.
0116 #'
0117 #' @param x A SparkDataFrame
0118 #'
0119 #' @family SparkDataFrame functions
0120 #' @rdname schema
0121 #' @name schema
0122 #' @aliases schema,SparkDataFrame-method
0123 #' @examples
0124 #'\dontrun{
0125 #' sparkR.session()
0126 #' path <- "path/to/file.json"
0127 #' df <- read.json(path)
0128 #' dfSchema <- schema(df)
0129 #'}
0130 #' @note schema since 1.4.0
setMethod("schema",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Wrap the Java schema reference of the backing DataFrame in a structType.
            jschema <- callJMethod(x@sdf, "schema")
            structType(jschema)
          })
0136 
0137 #' Explain
0138 #'
0139 #' Print the logical and physical Catalyst plans to the console for debugging.
0140 #'
0141 #' @family SparkDataFrame functions
0142 #' @aliases explain,SparkDataFrame-method
0143 #' @rdname explain
0144 #' @name explain
0145 #' @examples
0146 #'\dontrun{
0147 #' sparkR.session()
0148 #' path <- "path/to/file.json"
0149 #' df <- read.json(path)
0150 #' explain(df, TRUE)
0151 #'}
0152 #' @note explain since 1.4.0
setMethod("explain",
          signature(x = "SparkDataFrame"),
          function(x, extended = FALSE) {
            queryExec <- callJMethod(x@sdf, "queryExecution")
            # extended = TRUE prints the whole query execution; otherwise only
            # its executed plan is printed.
            printable <- if (extended) {
              queryExec
            } else {
              callJMethod(queryExec, "executedPlan")
            }
            cat(callJMethod(printable, "toString"))
          })
0164 
0165 #' isLocal
0166 #'
0167 #' Returns True if the \code{collect} and \code{take} methods can be run locally
0168 #' (without any Spark executors).
0169 #'
0170 #' @param x A SparkDataFrame
0171 #'
0172 #' @family SparkDataFrame functions
0173 #' @rdname isLocal
0174 #' @name isLocal
0175 #' @aliases isLocal,SparkDataFrame-method
0176 #' @examples
0177 #'\dontrun{
0178 #' sparkR.session()
0179 #' path <- "path/to/file.json"
0180 #' df <- read.json(path)
0181 #' isLocal(df)
0182 #'}
0183 #' @note isLocal since 1.4.0
setMethod("isLocal",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Forward the question to the backing Java DataFrame.
            jdf <- x@sdf
            callJMethod(jdf, "isLocal")
          })
0189 
0190 #' showDF
0191 #'
0192 #' Print the first numRows rows of a SparkDataFrame
0193 #'
0194 #' @param x a SparkDataFrame.
0195 #' @param numRows the number of rows to print. Defaults to 20.
0196 #' @param truncate whether truncate long strings. If \code{TRUE}, strings more than
0197 #'                 20 characters will be truncated. However, if set greater than zero,
0198 #'                 truncates strings longer than \code{truncate} characters and all cells
0199 #'                 will be aligned right.
0200 #' @param vertical whether print output rows vertically (one line per column value).
0201 #' @param ... further arguments to be passed to or from other methods.
0202 #' @family SparkDataFrame functions
0203 #' @aliases showDF,SparkDataFrame-method
0204 #' @rdname showDF
0205 #' @name showDF
0206 #' @examples
0207 #'\dontrun{
0208 #' sparkR.session()
0209 #' path <- "path/to/file.json"
0210 #' df <- read.json(path)
0211 #' showDF(df)
0212 #'}
0213 #' @note showDF since 1.4.0
setMethod("showDF",
          signature(x = "SparkDataFrame"),
          function(x, numRows = 20, truncate = TRUE, vertical = FALSE) {
            # A logical TRUE selects a width of 20 characters; any other value is
            # converted to a number and used as the width.
            width <- if (is.logical(truncate) && truncate) {
              20
            } else {
              as.numeric(truncate)
            }
            rendered <- callJMethod(x@sdf, "showString",
                                    numToInt(numRows), numToInt(width), vertical)
            cat(rendered)
          })
0226 
0227 #' show
0228 #'
0229 #' If eager evaluation is enabled and the Spark object is a SparkDataFrame, evaluate the
0230 #' SparkDataFrame and print top rows of the SparkDataFrame, otherwise, print the class
0231 #' and type information of the Spark object.
0232 #'
0233 #' @param object a Spark object. Can be a SparkDataFrame, Column, GroupedData, WindowSpec.
0234 #'
0235 #' @family SparkDataFrame functions
0236 #' @rdname show
0237 #' @aliases show,SparkDataFrame-method
0238 #' @name show
0239 #' @examples
0240 #'\dontrun{
0241 #' sparkR.session()
0242 #' path <- "path/to/file.json"
0243 #' df <- read.json(path)
0244 #' show(df)
0245 #'}
0246 #' @note show(SparkDataFrame) since 1.4.0
setMethod("show", "SparkDataFrame",
          function(object) {
            allConf <- sparkR.conf()
            eagerEval <- allConf[["spark.sql.repl.eagerEval.enabled"]]
            if (identical(eagerEval, "true")) {
              # Read a configuration entry as a positive integer. Returns NULL when the
              # entry is absent, does not parse as an integer (as.integer() gives NA),
              # or is not positive, so that showDF keeps its own default.
              positiveIntConf <- function(key) {
                raw <- allConf[[key]]
                if (is.null(raw)) {
                  return(NULL)
                }
                parsed <- as.integer(raw)
                if (length(parsed) == 1 && !is.na(parsed) && parsed > 0) {
                  parsed
                } else {
                  NULL
                }
              }

              # Eager evaluation: print the top rows through showDF.
              argsList <- list()
              argsList$x <- object
              # Assigning NULL leaves the argument out of the list.
              argsList$numRows <- positiveIntConf("spark.sql.repl.eagerEval.maxNumRows")
              argsList$truncate <- positiveIntConf("spark.sql.repl.eagerEval.truncate")
              do.call(showDF, argsList)
            } else {
              # Otherwise print only the class and the "name:type" pair of every column.
              cols <- lapply(dtypes(object), function(l) {
                paste(l, collapse = ":")
              })
              s <- paste(cols, collapse = ", ")
              cat(paste0(class(object), "[", s, "]\n"))
            }
          })
0277 
0278 #' DataTypes
0279 #'
0280 #' Return all column names and their data types as a list
0281 #'
0282 #' @param x A SparkDataFrame
0283 #'
0284 #' @family SparkDataFrame functions
0285 #' @rdname dtypes
0286 #' @name dtypes
0287 #' @aliases dtypes,SparkDataFrame-method
0288 #' @examples
0289 #'\dontrun{
0290 #' sparkR.session()
0291 #' path <- "path/to/file.json"
0292 #' df <- read.json(path)
0293 #' dtypes(df)
0294 #'}
0295 #' @note dtypes since 1.4.0
setMethod("dtypes",
          signature(x = "SparkDataFrame"),
          function(x) {
            fields <- schema(x)$fields()
            # One c(name, type string) pair per schema field.
            lapply(fields, function(field) {
              fieldName <- field$name()
              c(fieldName, field$dataType.simpleString())
            })
          })
0303 
0304 #' Column Names of SparkDataFrame
0305 #'
0306 #' Return a vector of column names.
0307 #'
0308 #' @param x a SparkDataFrame.
0309 #'
0310 #' @family SparkDataFrame functions
0311 #' @rdname columns
0312 #' @name columns
0313 #' @aliases columns,SparkDataFrame-method
0314 #' @examples
0315 #'\dontrun{
0316 #' sparkR.session()
0317 #' path <- "path/to/file.json"
0318 #' df <- read.json(path)
0319 #' columns(df)
0320 #' colnames(df)
0321 #'}
0322 #' @note columns since 1.4.0
setMethod("columns",
          signature(x = "SparkDataFrame"),
          function(x) {
            # vapply keeps the result a character vector even when the schema has no
            # fields (sapply would return an empty list in that case).
            vapply(schema(x)$fields(), function(f) {
              f$name()
            }, character(1))
          })
0330 
0331 #' @rdname columns
0332 #' @name names
0333 #' @aliases names,SparkDataFrame-method
0334 #' @note names since 1.5.0
setMethod("names",
          signature(x = "SparkDataFrame"),
          function(x) {
            # names() simply returns what columns() returns.
            colNames <- columns(x)
            colNames
          })
0340 
0341 #' @rdname columns
0342 #' @aliases names<-,SparkDataFrame-method
0343 #' @name names<-
0344 #' @note names<- since 1.5.0
setMethod("names<-",
          signature(x = "SparkDataFrame"),
          function(x, value) {
            # Delegate to the colnames<- replacement function and return its result.
            renamed <- `colnames<-`(x, value = value)
            renamed
          })
0351 
0352 #' @rdname columns
0353 #' @aliases colnames,SparkDataFrame-method
0354 #' @name colnames
0355 #' @note colnames since 1.6.0
setMethod("colnames",
          signature(x = "SparkDataFrame"),
          function(x) {
            # colnames() simply returns what columns() returns.
            colNames <- columns(x)
            colNames
          })
0361 
0362 #' @param value a character vector. Must have the same length as the number
0363 #'              of columns to be renamed.
0364 #' @rdname columns
0365 #' @aliases colnames<-,SparkDataFrame-method
0366 #' @name colnames<-
0367 #' @note colnames<- since 1.6.0
setMethod("colnames<-",
          signature(x = "SparkDataFrame"),
          function(x, value) {

            # Check parameter integrity
            if (!is.character(value)) {
              stop("Invalid column names.")
            }

            if (length(value) != ncol(x)) {
              stop(
                "Column names must have the same length as the number of columns in the dataset.")
            }

            if (any(is.na(value))) {
              stop("Column names cannot be NA.")
            }

            # Check every name for a literal '.'. grepl is vectorised over value, so a
            # dot in any position of the vector is caught, not only in the first name.
            if (any(grepl(".", value, fixed = TRUE))) {
              stop("Column names cannot contain the '.' symbol.")
            }

            # Build a new SparkDataFrame with the columns renamed through "toDF".
            sdf <- callJMethod(x@sdf, "toDF", as.list(value))
            dataFrame(sdf)
          })
0394 
0395 #' coltypes
0396 #'
0397 #' Get column types of a SparkDataFrame
0398 #'
0399 #' @param x A SparkDataFrame
0400 #' @return value A character vector with the column types of the given SparkDataFrame
0401 #' @rdname coltypes
0402 #' @aliases coltypes,SparkDataFrame-method
0403 #' @name coltypes
0404 #' @family SparkDataFrame functions
0405 #' @examples
0406 #'\dontrun{
0407 #' irisDF <- createDataFrame(iris)
0408 #' coltypes(irisDF) # get column types
0409 #'}
0410 #' @note coltypes since 1.6.0
setMethod("coltypes",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Spark type string of every column: the second element of each dtypes() entry
            types <- sapply(dtypes(x), function(dtype) { dtype[[2]] })

            # Map Spark data types into R's data types using the PRIMITIVE_TYPES and
            # COMPLEX_TYPES lookups
            rTypes <- sapply(types, USE.NAMES = FALSE, FUN = function(sparkType) {
              # Check for primitive types
              type <- PRIMITIVE_TYPES[[sparkType]]

              if (is.null(type)) {
                # Check for complex types: the type string starts with the complex type name
                for (t in names(COMPLEX_TYPES)) {
                  if (substring(sparkType, 1, nchar(t)) == t) {
                    type <- COMPLEX_TYPES[[t]]
                    break
                  }
                }

                if (is.null(type)) {
                  # Last resort: let specialtypeshandle() name a primitive type to use
                  specialtype <- specialtypeshandle(sparkType)
                  if (is.null(specialtype)) {
                    stop("Unsupported data type: ", sparkType)
                  }
                  type <- PRIMITIVE_TYPES[[specialtype]]
                }
              }
              type[[1]]
            })

            # Find which types don't have mapping to R
            naIndices <- which(is.na(rTypes))

            # Assign the original scala data types to the unmatched ones
            rTypes[naIndices] <- types[naIndices]

            rTypes
          })
0450 
0451 #' coltypes
0452 #'
0453 #' Set the column types of a SparkDataFrame.
0454 #'
0455 #' @param value A character vector with the target column types for the given
0456 #'    SparkDataFrame. Column types can be one of integer, numeric/double, character, logical, or NA
0457 #'    to keep that column as-is.
0458 #' @rdname coltypes
0459 #' @name coltypes<-
0460 #' @aliases coltypes<-,SparkDataFrame,character-method
0461 #' @examples
0462 #'\dontrun{
0463 #' sparkR.session()
0464 #' path <- "path/to/file.json"
0465 #' df <- read.json(path)
0466 #' coltypes(df) <- c("character", "integer") # set column types
0467 #' coltypes(df) <- c(NA, "numeric") # set column types
0468 #'}
0469 #' @note coltypes<- since 1.6.0
setMethod("coltypes<-",
          signature(x = "SparkDataFrame", value = "character"),
          function(x, value) {
            cols <- columns(x)
            ncols <- length(cols)
            if (length(value) == 0) {
              stop("Cannot set types of an empty SparkDataFrame with no Column")
            }
            if (length(value) != ncols) {
              stop("Length of type vector should match the number of columns for SparkDataFrame")
            }

            # Column i, cast to the SQL type looked up for value[i]; an NA entry
            # leaves the column untouched.
            castColumn <- function(i) {
              col <- getColumn(x, cols[i])
              target <- value[i]
              if (is.na(target)) {
                return(col)
              }
              stype <- rToSQLTypes[[target]]
              if (is.null(stype)) {
                stop("Only atomic type is supported for column types")
              }
              cast(col, stype)
            }

            newCols <- lapply(seq_len(ncols), castColumn)
            selected <- select(x, newCols)
            dataFrame(selected@sdf)
          })
0496 
0497 #' Creates a temporary view using the given name.
0498 #'
0499 #' Creates a new temporary view using a SparkDataFrame in the Spark Session. If a
0500 #' temporary view with the same name already exists, replaces it.
0501 #'
0502 #' @param x A SparkDataFrame
0503 #' @param viewName A character vector containing the name of the table
0504 #'
0505 #' @family SparkDataFrame functions
0506 #' @rdname createOrReplaceTempView
0507 #' @name createOrReplaceTempView
0508 #' @aliases createOrReplaceTempView,SparkDataFrame,character-method
0509 #' @examples
0510 #'\dontrun{
0511 #' sparkR.session()
0512 #' path <- "path/to/file.json"
0513 #' df <- read.json(path)
0514 #' createOrReplaceTempView(df, "json_df")
0515 #' new_df <- sql("SELECT * FROM json_df")
0516 #'}
0517 #' @note createOrReplaceTempView since 2.0.0
setMethod("createOrReplaceTempView",
          signature(x = "SparkDataFrame", viewName = "character"),
          function(x, viewName) {
            # Called for its side effect; the Java call's result is returned invisibly.
            res <- callJMethod(x@sdf, "createOrReplaceTempView", viewName)
            invisible(res)
          })
0523 
0524 #' (Deprecated) Register Temporary Table
0525 #'
0526 #' Registers a SparkDataFrame as a Temporary Table in the SparkSession
0527 #' @param x A SparkDataFrame
0528 #' @param tableName A character vector containing the name of the table
0529 #'
0530 #' @seealso \link{createOrReplaceTempView}
0531 #' @rdname registerTempTable-deprecated
0532 #' @name registerTempTable
0533 #' @aliases registerTempTable,SparkDataFrame,character-method
0534 #' @examples
0535 #'\dontrun{
0536 #' sparkR.session()
0537 #' path <- "path/to/file.json"
0538 #' df <- read.json(path)
0539 #' registerTempTable(df, "json_df")
0540 #' new_df <- sql("SELECT * FROM json_df")
0541 #'}
0542 #' @note registerTempTable since 1.4.0
setMethod("registerTempTable",
          signature(x = "SparkDataFrame", tableName = "character"),
          function(x, tableName) {
            # Warn about the deprecation, then do what createOrReplaceTempView does.
            .Deprecated("createOrReplaceTempView")
            jdf <- x@sdf
            invisible(callJMethod(jdf, "createOrReplaceTempView", tableName))
          })
0549 
#' insertInto
#'
#' Insert the contents of a SparkDataFrame into a table registered in the current SparkSession.
#'
#' @param x a SparkDataFrame.
#' @param tableName a character vector containing the name of the table.
#' @param overwrite a logical argument indicating whether or not to overwrite
#'                  the existing rows in the table.
#' @param ... further arguments to be passed to or from other methods.
#'
#' @family SparkDataFrame functions
#' @rdname insertInto
#' @name insertInto
#' @aliases insertInto,SparkDataFrame,character-method
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.df(path, "parquet")
#' df2 <- read.df(path2, "parquet")
#' saveAsTable(df, "table1")
#' insertInto(df2, "table1", overwrite = TRUE)
#'}
#' @note insertInto since 1.4.0
setMethod("insertInto",
          signature(x = "SparkDataFrame", tableName = "character"),
          function(x, tableName, overwrite = FALSE) {
            write <- callJMethod(x@sdf, "write")
            # overwrite is a single flag, so a scalar if/else picks the save mode
            # (the vectorised ifelse() is meant for element-wise selection).
            saveMode <- if (overwrite) "overwrite" else "append"
            write <- setWriteMode(write, saveMode)
            invisible(callJMethod(write, "insertInto", tableName))
          })
0580 
0581 #' Cache
0582 #'
0583 #' Persist with the default storage level (MEMORY_ONLY).
0584 #'
0585 #' @param x A SparkDataFrame
0586 #'
0587 #' @family SparkDataFrame functions
0588 #' @aliases cache,SparkDataFrame-method
0589 #' @rdname cache
0590 #' @name cache
0591 #' @examples
0592 #'\dontrun{
0593 #' sparkR.session()
0594 #' path <- "path/to/file.json"
0595 #' df <- read.json(path)
0596 #' cache(df)
0597 #'}
0598 #' @note cache since 1.4.0
setMethod("cache",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The Java call is made for its side effect; its result is not needed.
            # NOTE(review): the roxygen header of this method names MEMORY_ONLY as the
            # default storage level - confirm against the Spark Dataset.cache() docs.
            callJMethod(x@sdf, "cache")
            # Record the cached state in the bookkeeping environment.
            x@env$isCached <- TRUE
            x
          })
0606 
0607 #' Persist
0608 #'
0609 #' Persist this SparkDataFrame with the specified storage level. For details of the
0610 #' supported storage levels, refer to
0611 #' \url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
0612 #'
0613 #' @param x the SparkDataFrame to persist.
0614 #' @param newLevel storage level chosen for the persistence. See available options in
0615 #'        the description.
0616 #'
0617 #' @family SparkDataFrame functions
0618 #' @rdname persist
0619 #' @name persist
0620 #' @aliases persist,SparkDataFrame,character-method
0621 #' @examples
0622 #'\dontrun{
0623 #' sparkR.session()
0624 #' path <- "path/to/file.json"
0625 #' df <- read.json(path)
0626 #' persist(df, "MEMORY_AND_DISK")
0627 #'}
0628 #' @note persist since 1.4.0
setMethod("persist",
          signature(x = "SparkDataFrame", newLevel = "character"),
          function(x, newLevel) {
            # Translate the level name with getStorageLevel and persist on the Java side.
            level <- getStorageLevel(newLevel)
            callJMethod(x@sdf, "persist", level)
            # Record the cached state in the bookkeeping environment.
            assign("isCached", TRUE, envir = x@env)
            x
          })
0636 
0637 #' Unpersist
0638 #'
0639 #' Mark this SparkDataFrame as non-persistent, and remove all blocks for it from memory and
0640 #' disk.
0641 #'
0642 #' @param x the SparkDataFrame to unpersist.
0643 #' @param blocking whether to block until all blocks are deleted.
0644 #' @param ... further arguments to be passed to or from other methods.
0645 #'
0646 #' @family SparkDataFrame functions
0647 #' @rdname unpersist
0648 #' @aliases unpersist,SparkDataFrame-method
0649 #' @name unpersist
0650 #' @examples
0651 #'\dontrun{
0652 #' sparkR.session()
0653 #' path <- "path/to/file.json"
0654 #' df <- read.json(path)
0655 #' persist(df, "MEMORY_AND_DISK")
0656 #' unpersist(df)
0657 #'}
0658 #' @note unpersist since 1.4.0
setMethod("unpersist",
          signature(x = "SparkDataFrame"),
          function(x, blocking = TRUE) {
            jdf <- x@sdf
            callJMethod(jdf, "unpersist", blocking)
            # Clear the cached flag in the bookkeeping environment.
            assign("isCached", FALSE, envir = x@env)
            x
          })
0666 
0667 #' StorageLevel
0668 #'
0669 #' Get storagelevel of this SparkDataFrame.
0670 #'
0671 #' @param x the SparkDataFrame to get the storageLevel.
0672 #'
0673 #' @family SparkDataFrame functions
0674 #' @rdname storageLevel
0675 #' @aliases storageLevel,SparkDataFrame-method
0676 #' @name storageLevel
0677 #' @examples
0678 #'\dontrun{
0679 #' sparkR.session()
0680 #' path <- "path/to/file.json"
0681 #' df <- read.json(path)
0682 #' persist(df, "MEMORY_AND_DISK")
0683 #' storageLevel(df)
0684 #'}
0685 #' @note storageLevel since 2.1.0
setMethod("storageLevel",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Convert the Java storage level object to its string form.
            jlevel <- callJMethod(x@sdf, "storageLevel")
            storageLevelToString(jlevel)
          })
0691 
0692 #' Coalesce
0693 #'
0694 #' Returns a new SparkDataFrame that has exactly \code{numPartitions} partitions.
0695 #' This operation results in a narrow dependency, e.g. if you go from 1000 partitions to 100
0696 #' partitions, there will not be a shuffle, instead each of the 100 new partitions will claim 10 of
0697 #' the current partitions. If a larger number of partitions is requested, it will stay at the
0698 #' current number of partitions.
0699 #'
0700 #' However, if you're doing a drastic coalesce on a SparkDataFrame, e.g. to numPartitions = 1,
0701 #' this may result in your computation taking place on fewer nodes than
0702 #' you like (e.g. one node in the case of numPartitions = 1). To avoid this,
0703 #' call \code{repartition}. This will add a shuffle step, but means the
0704 #' current upstream partitions will be executed in parallel (per whatever
0705 #' the current partitioning is).
0706 #'
0707 #' @param numPartitions the number of partitions to use.
0708 #'
0709 #' @family SparkDataFrame functions
0710 #' @rdname coalesce
0711 #' @name coalesce
0712 #' @aliases coalesce,SparkDataFrame-method
0713 #' @seealso \link{repartition}, \link{repartitionByRange}
0714 #' @examples
0715 #'\dontrun{
0716 #' sparkR.session()
0717 #' path <- "path/to/file.json"
0718 #' df <- read.json(path)
0719 #' newDF <- coalesce(df, 1L)
0720 #'}
0721 #' @note coalesce(SparkDataFrame) since 2.1.1
setMethod("coalesce",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions) {
            stopifnot(is.numeric(numPartitions))
            # The partition count is converted with numToInt before the Java call.
            partitions <- numToInt(numPartitions)
            dataFrame(callJMethod(x@sdf, "coalesce", partitions))
          })
0729 
0730 #' Repartition
0731 #'
0732 #' The following options for repartition are possible:
0733 #' \itemize{
0734 #'  \item{1.} {Return a new SparkDataFrame that has exactly \code{numPartitions}.}
0735 #'  \item{2.} {Return a new SparkDataFrame hash partitioned by
0736 #'                      the given columns into \code{numPartitions}.}
0737 #'  \item{3.} {Return a new SparkDataFrame hash partitioned by the given column(s),
0738 #'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
0739 #'}
0740 #' @param x a SparkDataFrame.
0741 #' @param numPartitions the number of partitions to use.
0742 #' @param col the column by which the partitioning will be performed.
0743 #' @param ... additional column(s) to be used in the partitioning.
0744 #'
0745 #' @family SparkDataFrame functions
0746 #' @rdname repartition
0747 #' @name repartition
0748 #' @aliases repartition,SparkDataFrame-method
0749 #' @seealso \link{coalesce}, \link{repartitionByRange}
0750 #' @examples
0751 #'\dontrun{
0752 #' sparkR.session()
0753 #' path <- "path/to/file.json"
0754 #' df <- read.json(path)
0755 #' newDF <- repartition(df, 2L)
0756 #' newDF <- repartition(df, numPartitions = 2L)
0757 #' newDF <- repartition(df, col = df$"col1", df$"col2")
0758 #' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
0759 #'}
0760 #' @note repartition since 1.4.0
setMethod("repartition",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            # inherits() is FALSE for NULL and always yields a single logical, unlike
            # class(col) == "Column", which has length > 1 for multi-class objects.
            byColumns <- inherits(col, "Column")
            byNumber <- is.numeric(numPartitions)

            # Java column references of col and of any further columns given in `...`
            jcol <- NULL
            if (byColumns) {
              jcol <- lapply(list(col, ...), function(c) { c@jc })
            }

            if (byNumber && byColumns) {
              # number of partitions and columns both are specified
              sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
            } else if (byNumber) {
              # only number of partitions is specified (a col that is not a Column
              # is ignored here)
              sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
            } else if (byColumns) {
              # only columns are specified
              sdf <- callJMethod(x@sdf, "repartition", jcol)
            } else {
              stop("Please, specify the number of partitions and/or a column(s)")
            }
            dataFrame(sdf)
          })
0784 
0785 
0786 #' Repartition by range
0787 #'
0788 #' The following options for repartition by range are possible:
0789 #' \itemize{
0790 #'  \item{1.} {Return a new SparkDataFrame range partitioned by
0791 #'                      the given columns into \code{numPartitions}.}
0792 #'  \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
0793 #'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
0794 #'}
0795 #' At least one partition-by expression must be specified.
0796 #' When no explicit sort order is specified, "ascending nulls first" is assumed.
0797 #'
0798 #' Note that due to performance reasons this method uses sampling to estimate the ranges.
0799 #' Hence, the output may not be consistent, since sampling can return different values.
0800 #' The sample size can be controlled by the config
0801 #' \code{spark.sql.execution.rangeExchange.sampleSizePerPartition}.
0802 #'
0803 #' @param x a SparkDataFrame.
0804 #' @param numPartitions the number of partitions to use.
0805 #' @param col the column by which the range partitioning will be performed.
0806 #' @param ... additional column(s) to be used in the range partitioning.
0807 #'
0808 #' @family SparkDataFrame functions
0809 #' @rdname repartitionByRange
0810 #' @name repartitionByRange
0811 #' @aliases repartitionByRange,SparkDataFrame-method
0812 #' @seealso \link{repartition}, \link{coalesce}
0813 #' @examples
0814 #'\dontrun{
0815 #' sparkR.session()
0816 #' path <- "path/to/file.json"
0817 #' df <- read.json(path)
0818 #' newDF <- repartitionByRange(df, col = df$col1, df$col2)
0819 #' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
0820 #'}
0821 #' @note repartitionByRange since 2.4.0
setMethod("repartitionByRange",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            # A partition-by column is mandatory.
            if (is.null(col)) {
              if (!is.null(numPartitions)) {
                # only numPartitions is specified
                stop("At least one partition-by column must be specified.")
              }
              stop("Please, specify a column(s) or the number of partitions with a column(s)")
            }

            # inherits() always yields a single logical, unlike class(col) == "Column",
            # which has length > 1 for multi-class objects.
            isColumn <- inherits(col, "Column")

            if (is.null(numPartitions)) {
              # only columns are specified
              if (!isColumn) {
                stop("col must be Column; however, got ", class(col))
              }
              jcol <- lapply(list(col, ...), function(c) { c@jc })
              sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
            } else {
              # number of partitions and columns both are specified
              if (!(is.numeric(numPartitions) && isColumn)) {
                stop("numPartitions and col must be numeric and Column; however, got ",
                     class(numPartitions), " and ", class(col))
              }
              jcol <- lapply(list(col, ...), function(c) { c@jc })
              sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
            }
            dataFrame(sdf)
          })
0852 
0853 #' toJSON
0854 #'
0855 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
0856 #' Each row is turned into a JSON document with columns as different fields.
0857 #' The returned SparkDataFrame has a single character column with the name \code{value}
0858 #'
0859 #' @param x a SparkDataFrame
0860 #' @return a SparkDataFrame
0861 #' @family SparkDataFrame functions
0862 #' @rdname toJSON
0863 #' @name toJSON
0864 #' @aliases toJSON,SparkDataFrame-method
0865 #' @examples
0866 #'\dontrun{
0867 #' sparkR.session()
0868 #' path <- "path/to/file.parquet"
0869 #' df <- read.parquet(path)
0870 #' df_json <- toJSON(df)
0871 #'}
0872 #' @note toJSON since 2.2.0
setMethod("toJSON",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Call toJSON on the backing JVM object, turn the result back into a
            # DataFrame with toDF, and wrap it as a SparkDataFrame.
            jsonFrame <- callJMethod(callJMethod(x@sdf, "toJSON"), "toDF")
            dataFrame(jsonFrame)
          })
0880 
0881 #' Save the contents of SparkDataFrame as a JSON file
0882 #'
0883 #' Save the contents of a SparkDataFrame as a JSON file (\href{http://jsonlines.org/}{
0884 #' JSON Lines text format or newline-delimited JSON}). Files written out
0885 #' with this method can be read back in as a SparkDataFrame using read.json().
0886 #'
0887 #' @param x A SparkDataFrame
0888 #' @param path The directory where the file is saved
0889 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0890 #'             save mode (it is 'error' by default)
0891 #' @param ... additional argument(s) passed to the method.
0892 #'
0893 #' @family SparkDataFrame functions
0894 #' @rdname write.json
0895 #' @name write.json
0896 #' @aliases write.json,SparkDataFrame,character-method
0897 #' @examples
0898 #'\dontrun{
0899 #' sparkR.session()
0900 #' path <- "path/to/file.json"
0901 #' df <- read.json(path)
0902 #' write.json(df, "/tmp/sparkr-tmp/")
0903 #'}
0904 #' @note write.json since 1.6.0
setMethod("write.json",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Fetch the JVM writer, apply the save mode and any extra options,
            # then write to `path` in JSON format. The result is returned invisibly.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "json", path))
          })
0912 
0913 #' Save the contents of SparkDataFrame as an ORC file, preserving the schema.
0914 #'
0915 #' Save the contents of a SparkDataFrame as an ORC file, preserving the schema. Files written out
0916 #' with this method can be read back in as a SparkDataFrame using read.orc().
0917 #'
0918 #' @param x A SparkDataFrame
0919 #' @param path The directory where the file is saved
0920 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0921 #'             save mode (it is 'error' by default)
0922 #' @param ... additional argument(s) passed to the method.
0923 #'
0924 #' @family SparkDataFrame functions
0925 #' @aliases write.orc,SparkDataFrame,character-method
0926 #' @rdname write.orc
0927 #' @name write.orc
0928 #' @examples
0929 #'\dontrun{
0930 #' sparkR.session()
0931 #' path <- "path/to/file.json"
0932 #' df <- read.json(path)
0933 #' write.orc(df, "/tmp/sparkr-tmp1/")
0934 #' }
0935 #' @note write.orc since 2.0.0
setMethod("write.orc",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Fetch the JVM writer, apply the save mode and any extra options,
            # then write to `path` in ORC format. The result is returned invisibly.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "orc", path))
          })
0943 
0944 #' Save the contents of SparkDataFrame as a Parquet file, preserving the schema.
0945 #'
0946 #' Save the contents of a SparkDataFrame as a Parquet file, preserving the schema. Files written out
0947 #' with this method can be read back in as a SparkDataFrame using read.parquet().
0948 #'
0949 #' @param x A SparkDataFrame
0950 #' @param path The directory where the file is saved
0951 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0952 #'             save mode (it is 'error' by default)
0953 #' @param ... additional argument(s) passed to the method.
0954 #'
0955 #' @family SparkDataFrame functions
0956 #' @rdname write.parquet
0957 #' @name write.parquet
0958 #' @aliases write.parquet,SparkDataFrame,character-method
0959 #' @examples
0960 #'\dontrun{
0961 #' sparkR.session()
0962 #' path <- "path/to/file.json"
0963 #' df <- read.json(path)
0964 #' write.parquet(df, "/tmp/sparkr-tmp1/")
0965 #'}
0966 #' @note write.parquet since 1.6.0
setMethod("write.parquet",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Fetch the JVM writer, apply the save mode and any extra options,
            # then write to `path` in Parquet format. The result is returned invisibly.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "parquet", path))
          })
0974 
0975 #' Save the content of SparkDataFrame in a text file at the specified path.
0976 #'
0977 #' Save the content of the SparkDataFrame in a text file at the specified path.
0978 #' The SparkDataFrame must have only one column of string type with the name "value".
0979 #' Each row becomes a new line in the output file. The text files will be encoded as UTF-8.
0980 #'
0981 #' @param x A SparkDataFrame
0982 #' @param path The directory where the file is saved
0983 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
0984 #'             save mode (it is 'error' by default)
0985 #' @param ... additional argument(s) passed to the method.
0986 #'
0987 #' @family SparkDataFrame functions
0988 #' @aliases write.text,SparkDataFrame,character-method
0989 #' @rdname write.text
0990 #' @name write.text
0991 #' @examples
0992 #'\dontrun{
0993 #' sparkR.session()
0994 #' path <- "path/to/file.txt"
0995 #' df <- read.text(path)
0996 #' write.text(df, "/tmp/sparkr-tmp/")
0997 #'}
0998 #' @note write.text since 2.0.0
setMethod("write.text",
          signature(x = "SparkDataFrame", path = "character"),
          function(x, path, mode = "error", ...) {
            # Fetch the JVM writer, apply the save mode and any extra options,
            # then write to `path` in text format. The result is returned invisibly.
            writer <- setWriteOptions(callJMethod(x@sdf, "write"), mode = mode, ...)
            invisible(handledCallJMethod(writer, "text", path))
          })
1006 
1007 #' Distinct
1008 #'
1009 #' Return a new SparkDataFrame containing the distinct rows in this SparkDataFrame.
1010 #'
1011 #' @param x A SparkDataFrame
1012 #'
1013 #' @family SparkDataFrame functions
1014 #' @aliases distinct,SparkDataFrame-method
1015 #' @rdname distinct
1016 #' @name distinct
1017 #' @examples
1018 #'\dontrun{
1019 #' sparkR.session()
1020 #' path <- "path/to/file.json"
1021 #' df <- read.json(path)
1022 #' distinctDF <- distinct(df)
1023 #'}
1024 #' @note distinct since 1.4.0
setMethod("distinct",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Delegate to the JVM DataFrame's distinct() and wrap the result.
            dataFrame(callJMethod(x@sdf, "distinct"))
          })
1031 
1032 #' @rdname distinct
1033 #' @name unique
1034 #' @aliases unique,SparkDataFrame-method
1035 #' @note unique since 1.5.0
setMethod("unique",
          signature(x = "SparkDataFrame"),
          function(x) {
            # unique() simply forwards to distinct() for a SparkDataFrame.
            deduplicated <- distinct(x)
            deduplicated
          })
1041 
1042 #' Sample
1043 #'
1044 #' Return a sampled subset of this SparkDataFrame using a random seed.
1045 #' Note: this is not guaranteed to provide exactly the fraction specified
1046 #' of the total count of the given SparkDataFrame.
1047 #'
1048 #' @param x A SparkDataFrame
1049 #' @param withReplacement Sampling with replacement or not
1050 #' @param fraction The (rough) sample target fraction
1051 #' @param seed Randomness seed value. Default is a random seed.
1052 #'
1053 #' @family SparkDataFrame functions
1054 #' @aliases sample,SparkDataFrame-method
1055 #' @rdname sample
1056 #' @name sample
1057 #' @examples
1058 #'\dontrun{
1059 #' sparkR.session()
1060 #' path <- "path/to/file.json"
1061 #' df <- read.json(path)
1062 #' collect(sample(df, fraction = 0.5))
1063 #' collect(sample(df, FALSE, 0.5))
1064 #' collect(sample(df, TRUE, 0.5, seed = 3))
1065 #'}
1066 #' @note sample since 1.4.0
setMethod("sample",
          signature(x = "SparkDataFrame"),
          function(x, withReplacement = FALSE, fraction, seed) {
            # Validate the argument types before touching the JVM.
            if (!is.numeric(fraction)) {
              stop("fraction must be numeric; however, got ", class(fraction))
            }
            if (!is.logical(withReplacement)) {
              stop("withReplacement must be logical; however, got ", class(withReplacement))
            }

            replace <- as.logical(withReplacement)
            frac <- as.numeric(fraction)

            if (missing(seed)) {
              # No seed supplied: call the two-argument JVM overload.
              sampled <- handledCallJMethod(x@sdf, "sample", replace, frac)
            } else {
              # An explicitly supplied seed must be a usable value.
              if (is.null(seed)) {
                stop("seed must not be NULL or NA; however, got NULL")
              }
              if (is.na(seed)) {
                stop("seed must not be NULL or NA; however, got NA")
              }

              # TODO : Figure out how to send integer as java.lang.Long to JVM so
              # we can send seed as an argument through callJMethod
              sampled <- handledCallJMethod(x@sdf, "sample", replace, frac, as.integer(seed))
            }
            dataFrame(sampled)
          })
1095 
1096 #' @rdname sample
1097 #' @aliases sample_frac,SparkDataFrame-method
1098 #' @name sample_frac
1099 #' @note sample_frac since 1.4.0
setMethod("sample_frac",
          signature(x = "SparkDataFrame"),
          function(x, withReplacement = FALSE, fraction, seed) {
            # sample_frac() is an alias: every argument is forwarded unchanged to sample().
            sample(x,
                   withReplacement = withReplacement,
                   fraction = fraction,
                   seed = seed)
          })
1105 
1106 #' Returns the number of rows in a SparkDataFrame
1107 #'
1108 #' @param x a SparkDataFrame.
1109 #' @family SparkDataFrame functions
1110 #' @rdname nrow
1111 #' @name nrow
1112 #' @aliases count,SparkDataFrame-method
1113 #' @examples
1114 #'\dontrun{
1115 #' sparkR.session()
1116 #' path <- "path/to/file.json"
1117 #' df <- read.json(path)
1118 #' count(df)
1119 #' }
1120 #' @note count since 1.4.0
setMethod("count",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The row count is computed by the JVM DataFrame's count().
            rowCount <- callJMethod(x@sdf, "count")
            rowCount
          })
1126 
1127 #' @name nrow
1128 #' @rdname nrow
1129 #' @aliases nrow,SparkDataFrame-method
1130 #' @note nrow since 1.5.0
setMethod("nrow",
          signature(x = "SparkDataFrame"),
          function(x) {
            # nrow() forwards to count().
            rowCount <- count(x)
            rowCount
          })
1136 
1137 #' Returns the number of columns in a SparkDataFrame
1138 #'
1139 #' @param x a SparkDataFrame
1140 #'
1141 #' @family SparkDataFrame functions
1142 #' @rdname ncol
1143 #' @name ncol
1144 #' @aliases ncol,SparkDataFrame-method
1145 #' @examples
1146 #'\dontrun{
1147 #' sparkR.session()
1148 #' path <- "path/to/file.json"
1149 #' df <- read.json(path)
1150 #' ncol(df)
1151 #' }
1152 #' @note ncol since 1.5.0
setMethod("ncol",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The number of columns is the number of column names.
            colNames <- columns(x)
            length(colNames)
          })
1158 
1159 #' Returns the dimensions of SparkDataFrame
1160 #'
1161 #' Returns the dimensions (number of rows and columns) of a SparkDataFrame
1162 #' @param x a SparkDataFrame
1163 #'
1164 #' @family SparkDataFrame functions
1165 #' @rdname dim
1166 #' @aliases dim,SparkDataFrame-method
1167 #' @name dim
1168 #' @examples
1169 #'\dontrun{
1170 #' sparkR.session()
1171 #' path <- "path/to/file.json"
1172 #' df <- read.json(path)
1173 #' dim(df)
1174 #' }
1175 #' @note dim since 1.5.0
setMethod("dim",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Rows first, columns second, as with dim() on an R data.frame.
            numRows <- count(x)
            numCols <- ncol(x)
            c(numRows, numCols)
          })
1181 
1182 #' Collects all the elements of a SparkDataFrame and coerces them into an R data.frame.
1183 #'
1184 #' @param x a SparkDataFrame.
1185 #' @param stringsAsFactors (Optional) a logical indicating whether or not string columns
1186 #' should be converted to factors. FALSE by default.
1187 #' @param ... further arguments to be passed to or from other methods.
1188 #'
1189 #' @family SparkDataFrame functions
1190 #' @rdname collect
1191 #' @aliases collect,SparkDataFrame-method
1192 #' @name collect
1193 #' @examples
1194 #'\dontrun{
1195 #' sparkR.session()
1196 #' path <- "path/to/file.json"
1197 #' df <- read.json(path)
1198 #' collected <- collect(df)
1199 #' class(collected)
1200 #' firstName <- names(collected)[1]
1201 #' }
1202 #' @note collect since 1.4.0
setMethod("collect",
          signature(x = "SparkDataFrame"),
          function(x, stringsAsFactors = FALSE) {
            connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
            arrowConf <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]

            # Arrow is used only when it is enabled in the configuration and the schema
            # check succeeds; a failed check downgrades to the regular path with a warning.
            useArrow <- FALSE
            if (arrowConf == "true") {
              useArrow <- tryCatch({
                checkSchemaInArrow(schema(x))
                TRUE
              }, error = function(e) {
                warning("The conversion from Spark DataFrame to R DataFrame was attempted ",
                        "with Arrow optimization because ",
                        "'spark.sql.execution.arrow.sparkr.enabled' is set to true; ",
                        "however, failed, attempting non-optimization. Reason: ", e)
                FALSE
              })
            }

            colTypes <- dtypes(x)
            ncol <- length(colTypes)
            if (ncol <= 0) {
              # empty data.frame with 0 columns and 0 rows
              return(data.frame())
            }

            if (useArrow) {
              if (!requireNamespace("arrow", quietly = TRUE)) {
                stop("'arrow' package should be installed.")
              }
              # The JVM answers with a port to read from and a secret to authenticate with.
              portAuth <- callJMethod(x@sdf, "collectAsArrowToR")
              port <- portAuth[[1]]
              authSecret <- portAuth[[2]]
              conn <- socketConnection(
                port = port, blocking = TRUE, open = "wb", timeout = connectionTimeout)
              output <- tryCatch({
                doServerAuth(conn, authSecret)
                arrowTable <- arrow::read_arrow(readRaw(conn))
                # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
                if (exists("as_tibble", envir = asNamespace("arrow"))) {
                  as.data.frame(arrow::as_tibble(arrowTable), stringsAsFactors = stringsAsFactors)
                } else {
                  as.data.frame(arrowTable, stringsAsFactors = stringsAsFactors)
                }
              }, finally = {
                # The socket is closed whether or not reading succeeded.
                close(conn)
              })
              return(output)
            }

            # Regular path: listCols is a list holding one entry per column.
            listCols <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToCols", x@sdf)
            stopifnot(length(listCols) == ncol)

            # Start from a data.frame with 0 columns and as many rows as were collected.
            numRows <- length(listCols[[1]])
            df <- if (numRows <= 0) {
              data.frame()
            } else {
              data.frame(row.names = seq_len(numRows))
            }

            # Append columns one by one.
            # Columns of complex type are appended as list columns so that their data can
            # be held; getting a cell from such a column returns a list, not a vector.
            # Columns of non-complex type are therefore appended as plain vectors.
            for (colIndex in seq_len(ncol)) {
              values <- listCols[[colIndex]]
              if (length(values) <= 0) {
                df[[colIndex]] <- values
                next
              }

              # Map a non-primitive type name to its special-type equivalent, if any.
              colType <- colTypes[[colIndex]][[2]]
              if (is.null(PRIMITIVE_TYPES[[colType]])) {
                specialtype <- specialtypeshandle(colType)
                if (!is.null(specialtype)) {
                  colType <- specialtype
                }
              }

              rClass <- PRIMITIVE_TYPES[[colType]]
              # Note that "binary" columns behave like complex types.
              if (is.null(rClass) || colType == "binary") {
                df[[colIndex]] <- values
              } else {
                vec <- do.call(c, values)
                stopifnot(class(vec) != "list")
                class(vec) <- rClass
                if (is.character(vec) && stringsAsFactors) {
                  vec <- as.factor(vec)
                }
                df[[colIndex]] <- vec
              }
            }
            names(df) <- names(x)
            df
          })
1303 
1304 #' Limit
1305 #'
1306 #' Limit the resulting SparkDataFrame to the number of rows specified.
1307 #'
1308 #' @param x A SparkDataFrame
1309 #' @param num The number of rows to return
1310 #' @return A new SparkDataFrame containing the number of rows specified.
1311 #'
1312 #' @family SparkDataFrame functions
1313 #' @rdname limit
1314 #' @name limit
1315 #' @aliases limit,SparkDataFrame,numeric-method
1316 #' @examples
1317 #' \dontrun{
1318 #' sparkR.session()
1319 #' path <- "path/to/file.json"
1320 #' df <- read.json(path)
1321 #' limitedDF <- limit(df, 10)
1322 #' }
1323 #' @note limit since 1.4.0
setMethod("limit",
          signature(x = "SparkDataFrame", num = "numeric"),
          function(x, num) {
            # The row count is coerced to integer before being passed to the JVM.
            rowLimit <- as.integer(num)
            dataFrame(callJMethod(x@sdf, "limit", rowLimit))
          })
1330 
1331 #' Take the first NUM rows of a SparkDataFrame and return the results as a R data.frame
1332 #'
1333 #' @param x a SparkDataFrame.
1334 #' @param num number of rows to take.
1335 #' @family SparkDataFrame functions
1336 #' @rdname take
1337 #' @name take
1338 #' @aliases take,SparkDataFrame,numeric-method
1339 #' @examples
1340 #'\dontrun{
1341 #' sparkR.session()
1342 #' path <- "path/to/file.json"
1343 #' df <- read.json(path)
1344 #' take(df, 2)
1345 #' }
1346 #' @note take since 1.4.0
setMethod("take",
          signature(x = "SparkDataFrame", num = "numeric"),
          function(x, num) {
            # Restrict to the first `num` rows, then bring them back as an R data.frame.
            collect(limit(x, num))
          })
1353 
1354 #' Head
1355 #'
1356 #' Return the first \code{num} rows of a SparkDataFrame as a R data.frame. If \code{num} is not
1357 #' specified, then head() returns the first 6 rows as with R data.frame.
1358 #'
1359 #' @param x a SparkDataFrame.
1360 #' @param num the number of rows to return. Default is 6.
1361 #' @return A data.frame.
1362 #'
1363 #' @family SparkDataFrame functions
1364 #' @aliases head,SparkDataFrame-method
1365 #' @rdname head
1366 #' @name head
1367 #' @examples
1368 #'\dontrun{
1369 #' sparkR.session()
1370 #' path <- "path/to/file.json"
1371 #' df <- read.json(path)
1372 #' head(df)
1373 #' }
1374 #' @note head since 1.4.0
setMethod("head",
          signature(x = "SparkDataFrame"),
          function(x, num = 6L) {
            # Default num is 6L in keeping with R's data.frame convention
            firstRows <- take(x, num)
            firstRows
          })
1381 
1382 #' Return the first row of a SparkDataFrame
1383 #'
1384 #' @param x a SparkDataFrame or a column used in aggregation function.
1385 #' @param ... further arguments to be passed to or from other methods.
1386 #'
1387 #' @family SparkDataFrame functions
1388 #' @aliases first,SparkDataFrame-method
1389 #' @rdname first
1390 #' @name first
1391 #' @examples
1392 #'\dontrun{
1393 #' sparkR.session()
1394 #' path <- "path/to/file.json"
1395 #' df <- read.json(path)
1396 #' first(df)
1397 #' }
1398 #' @note first(SparkDataFrame) since 1.4.0
setMethod("first",
          signature(x = "SparkDataFrame"),
          function(x) {
            # The first row is obtained by taking a single row.
            firstRow <- take(x, 1)
            firstRow
          })
1404 
1405 #' toRDD
1406 #'
1407 #' Converts a SparkDataFrame to an RDD while preserving column names.
1408 #'
1409 #' @param x A SparkDataFrame
1410 #'
1411 #' @noRd
1412 #' @examples
1413 #'\dontrun{
1414 #' sparkR.session()
1415 #' path <- "path/to/file.json"
1416 #' df <- read.json(path)
1417 #' rdd <- toRDD(df)
1418 #'}
setMethod("toRDD",
          signature(x = "SparkDataFrame"),
          function(x) {
            rowJrdd <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dfToRowRDD", x@sdf)
            fieldNames <- callJMethod(x@sdf, "columns")
            rowRdd <- RDD(rowJrdd, serializedMode = "row")
            # Label the elements of every row with the column names.
            attachNames <- function(record) {
              names(record) <- fieldNames
              record
            }
            lapply(rowRdd, attachNames)
          })
1430 
1431 #' GroupBy
1432 #'
1433 #' Groups the SparkDataFrame using the specified columns, so we can run aggregation on them.
1434 #'
1435 #' @param x a SparkDataFrame.
1436 #' @param ... character name(s) or Column(s) to group on.
1437 #' @return A GroupedData.
1438 #' @family SparkDataFrame functions
1439 #' @aliases groupBy,SparkDataFrame-method
1440 #' @rdname groupBy
1441 #' @name groupBy
1442 #' @examples
1443 #' \dontrun{
1444 #'   # Compute the average for all numeric columns grouped by department.
1445 #'   avg(groupBy(df, "department"))
1446 #'
1447 #'   # Compute the max age and average salary, grouped by department and gender.
1448 #'   agg(groupBy(df, "department", "gender"), salary="avg", "age" -> "max")
1449 #' }
1450 #' @note groupBy since 1.4.0
1451 #' @seealso \link{agg}, \link{cube}, \link{rollup}
setMethod("groupBy",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # Grouping columns arrive through `...` either as character names or as Columns.
            cols <- list(...)
            # is.character() always returns a single logical, so it is safe on the right of
            # `&&`; class(cols[[1]]) == "character" returns one value per class and fails
            # for character objects that carry more than one class.
            if (length(cols) >= 1 && is.character(cols[[1]])) {
              # Names: the first name and the remaining names are passed separately.
              sgd <- callJMethod(x@sdf, "groupBy", cols[[1]], cols[-1])
            } else {
              # Columns (or no grouping columns at all): pass their JVM references.
              jcol <- lapply(cols, function(c) { c@jc })
              sgd <- callJMethod(x@sdf, "groupBy", jcol)
            }
            groupedData(sgd)
          })
1464 
1465 #' @rdname groupBy
1466 #' @name group_by
1467 #' @aliases group_by,SparkDataFrame-method
1468 #' @note group_by since 1.4.0
setMethod("group_by",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # group_by() is an alias that forwards everything to groupBy().
            grouped <- groupBy(x, ...)
            grouped
          })
1474 
1475 #' Summarize data across columns
1476 #'
1477 #' Compute aggregates by specifying a list of columns
1478 #'
1479 #' @family SparkDataFrame functions
1480 #' @aliases agg,SparkDataFrame-method
1481 #' @rdname summarize
1482 #' @name agg
1483 #' @note agg since 1.4.0
setMethod("agg",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # Group without any grouping columns, then aggregate over that grouping.
            ungrouped <- groupBy(x)
            agg(ungrouped, ...)
          })
1489 
1490 #' @rdname summarize
1491 #' @name summarize
1492 #' @aliases summarize,SparkDataFrame-method
1493 #' @note summarize since 1.4.0
setMethod("summarize",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # summarize() is an alias that forwards everything to agg().
            aggregated <- agg(x, ...)
            aggregated
          })
1499 
# Shared implementation behind dapply() and dapplyCollect().
# `schema` may be a DDL-formatted string, a structType, or NULL (as passed by
# dapplyCollect()). Returns the SparkDataFrame produced by the JVM-side "dapply" call.
dapplyInternal <- function(x, func, schema) {
  # A DDL-formatted string is converted to a structType up front.
  if (is.character(schema)) {
    schema <- structType(schema)
  }

  arrowConf <- sparkR.conf("spark.sql.execution.arrow.sparkr.enabled")[[1]]
  if (arrowConf == "true") {
    # With Arrow enabled the schema must be a structType that passes the Arrow check.
    if (is.null(schema)) {
      stop("Arrow optimization does not support 'dapplyCollect' yet. Please disable ",
           "Arrow optimization or use 'collect' and 'dapply' APIs instead.")
    }
    if (!inherits(schema, "structType")) {
      stop("'schema' should be DDL-formatted string or structType.")
    }
    checkSchemaInArrow(schema)
  }

  # Serialized package names and the registered broadcast variables are sent along.
  packageNamesArr <- serialize(.sparkREnv[[".packages"]], connection = NULL)
  broadcastArr <- lapply(ls(.broadcastNames), function(name) {
    get(name, .broadcastNames)
  })

  serializedFunc <- serialize(cleanClosure(func), connection = NULL)
  # NULL is passed through unchanged; otherwise the schema's Java object is used.
  jschema <- if (is.null(schema)) NULL else schema$jobj

  sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "dapply",
                     x@sdf, serializedFunc, packageNamesArr, broadcastArr, jschema)
  dataFrame(sdf)
}
1533 
# Class union that lets a method signature accept either a character or a structType.
setClassUnion("characterOrstructType", members = c("character", "structType"))
1535 
1536 #' dapply
1537 #'
1538 #' Apply a function to each partition of a SparkDataFrame.
1539 #'
1540 #' @param x A SparkDataFrame
1541 #' @param func A function to be applied to each partition of the SparkDataFrame.
1542 #'             func should have only one parameter, to which an R data.frame corresponding
1543 #'             to each partition will be passed.
1544 #'             The output of func should be a R data.frame.
1545 #' @param schema The schema of the resulting SparkDataFrame after the function is applied.
1546 #'               It must match the output of func. Since Spark 2.3, the DDL-formatted string
1547 #'               is also supported for the schema.
1548 #' @family SparkDataFrame functions
1549 #' @rdname dapply
1550 #' @aliases dapply,SparkDataFrame,function,characterOrstructType-method
1551 #' @name dapply
1552 #' @seealso \link{dapplyCollect}
1553 #' @examples
1554 #' \dontrun{
1555 #'   df <- createDataFrame(iris)
1556 #'   df1 <- dapply(df, function(x) { x }, schema(df))
1557 #'   collect(df1)
1558 #'
1559 #'   # filter and add a column
1560 #'   df <- createDataFrame(
1561 #'           list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
1562 #'           c("a", "b", "c"))
1563 #'   schema <- structType(structField("a", "integer"), structField("b", "double"),
1564 #'                      structField("c", "string"), structField("d", "integer"))
1565 #'   df1 <- dapply(
1566 #'            df,
1567 #'            function(x) {
1568 #'              y <- x[x[1] > 1, ]
1569 #'              y <- cbind(y, y[1] + 1L)
1570 #'            },
1571 #'            schema)
1572 #'
1573 #'   # The schema also can be specified in a DDL-formatted string.
1574 #'   schema <- "a INT, b DOUBLE, c STRING, d INT"
1575 #'   df1 <- dapply(
1576 #'            df,
1577 #'            function(x) {
1578 #'              y <- x[x[1] > 1, ]
1579 #'              y <- cbind(y, y[1] + 1L)
1580 #'            },
1581 #'            schema)
1582 #'
1583 #'   collect(df1)
1584 #'   # the result
1585 #'   #       a b c d
1586 #'   #     1 2 2 2 3
1587 #'   #     2 3 3 3 4
1588 #' }
1589 #' @note dapply since 2.0.0
setMethod("dapply",
          signature(x = "SparkDataFrame", func = "function", schema = "characterOrstructType"),
          function(x, func, schema) {
            # All of the work is done by the shared helper.
            applied <- dapplyInternal(x, func, schema)
            applied
          })
1595 
1596 #' dapplyCollect
1597 #'
1598 #' Apply a function to each partition of a SparkDataFrame and collect the result back
1599 #' to R as a data.frame.
1600 #'
1601 #' @param x A SparkDataFrame
1602 #' @param func A function to be applied to each partition of the SparkDataFrame.
1603 #'             func should have only one parameter, to which an R data.frame corresponding
1604 #'             to each partition will be passed.
1605 #'             The output of func should be a R data.frame.
1606 #' @family SparkDataFrame functions
1607 #' @rdname dapplyCollect
1608 #' @aliases dapplyCollect,SparkDataFrame,function-method
1609 #' @name dapplyCollect
1610 #' @seealso \link{dapply}
1611 #' @examples
1612 #' \dontrun{
1613 #'   df <- createDataFrame(iris)
1614 #'   ldf <- dapplyCollect(df, function(x) { x })
1615 #'
1616 #'   # filter and add a column
1617 #'   df <- createDataFrame(
1618 #'           list(list(1L, 1, "1"), list(2L, 2, "2"), list(3L, 3, "3")),
1619 #'           c("a", "b", "c"))
1620 #'   ldf <- dapplyCollect(
1621 #'            df,
1622 #'            function(x) {
1623 #'              y <- x[x[1] > 1, ]
1624 #'              y <- cbind(y, y[1] + 1L)
1625 #'            })
1626 #'   # the result
1627 #'   #       a b c d
1628 #'   #       2 2 2 3
1629 #'   #       3 3 3 4
1630 #' }
1631 #' @note dapplyCollect since 2.0.0
setMethod("dapplyCollect",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # No schema is given: the helper is called with NULL.
            applied <- dapplyInternal(x, func, NULL)

            # Each collected item is a struct with a single field holding the serialized
            # data.frame for one partition of the SparkDataFrame.
            items <- callJMethod(applied@sdf, "collect")
            partitionFrames <- lapply(items, function(item) {
              unserialize(item[[1]])
            })

            # Stack the per-partition data.frames and drop the row names.
            combined <- do.call(rbind, partitionFrames)
            row.names(combined) <- NULL
            combined
          })
1646 
1647 #' gapply
1648 #'
1649 #' Groups the SparkDataFrame using the specified columns and applies the R function to each
1650 #' group.
1651 #'
1652 #' @param cols grouping columns.
1653 #' @param func a function to be applied to each group partition specified by grouping
1654 #'             column of the SparkDataFrame. See Details.
1655 #' @param schema the schema of the resulting SparkDataFrame after the function is applied.
1656 #'               The schema must match to output of \code{func}. It has to be defined for each
1657 #'               output column with preferred output column name and corresponding data type.
1658 #'               Since Spark 2.3, the DDL-formatted string is also supported for the schema.
1659 #' @return A SparkDataFrame.
1660 #' @family SparkDataFrame functions
1661 #' @aliases gapply,SparkDataFrame-method
1662 #' @rdname gapply
1663 #' @name gapply
1664 #' @details
1665 #' \code{func} is a function of two arguments. The first, usually named \code{key}
1666 #' (though this is not enforced) corresponds to the grouping key, will be an
1667 #' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
1668 #' to the grouping columns' values for the current group.
1669 #'
1670 #' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
1671 #' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
1672 #'
1673 #' The output of \code{func} must be a \code{data.frame} matching \code{schema} --
1674 #' in particular this means the names of the output \code{data.frame} are irrelevant
1675 #'
1676 #' @seealso \link{gapplyCollect}
1677 #' @examples
1678 #'
1679 #' \dontrun{
1680 #' # Computes the arithmetic mean of the second column by grouping
1681 #' # on the first and third columns. Output the grouping values and the average.
1682 #'
1683 #' df <- createDataFrame (
1684 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
1685 #'   c("a", "b", "c", "d"))
1686 #'
1687 #' # Here our output contains three columns, the key which is a combination of two
1688 #' # columns with data types integer and string and the mean which is a double.
1689 #' schema <- structType(structField("a", "integer"), structField("c", "string"),
1690 #'   structField("avg", "double"))
1691 #' result <- gapply(
1692 #'   df,
1693 #'   c("a", "c"),
1694 #'   function(key, x) {
1695 #'     # key will either be list(1L, '1') (for the group where a=1L,c='1') or
1696 #'     #   list(3L, '3') (for the group where a=3L,c='3')
1697 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1698 #' }, schema)
1699 #'
1700 #' # The schema also can be specified in a DDL-formatted string.
1701 #' schema <- "a INT, c STRING, avg DOUBLE"
1702 #' result <- gapply(
1703 #'   df,
1704 #'   c("a", "c"),
1705 #'   function(key, x) {
1706 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1707 #' }, schema)
1708 #'
1709 #' # We can also group the data and afterwards call gapply on GroupedData.
1710 #' # For example:
1711 #' gdf <- group_by(df, "a", "c")
1712 #' result <- gapply(
1713 #'   gdf,
1714 #'   function(key, x) {
1715 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1716 #' }, schema)
1717 #' collect(result)
1718 #'
1719 #' # Result
1720 #' # ------
1721 #' # a c avg
1722 #' # 3 3 3.0
1723 #' # 1 1 1.5
1724 #'
1725 #' # Fits linear models on iris dataset by grouping on the 'Species' column and
1726 #' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
1727 #' # and 'Petal_Width' as training features.
1728 #'
1729 #' df <- createDataFrame (iris)
1730 #' schema <- structType(structField("(Intercept)", "double"),
1731 #'   structField("Sepal_Width", "double"),structField("Petal_Length", "double"),
1732 #'   structField("Petal_Width", "double"))
1733 #' df1 <- gapply(
1734 #'   df,
1735 #'   df$"Species",
1736 #'   function(key, x) {
1737 #'     m <- suppressWarnings(lm(Sepal_Length ~
1738 #'     Sepal_Width + Petal_Length + Petal_Width, x))
1739 #'     data.frame(t(coef(m)))
1740 #'   }, schema)
1741 #' collect(df1)
1742 #'
1743 #' # Result
1744 #' # ---------
1745 #' # Model  (Intercept)  Sepal_Width  Petal_Length  Petal_Width
1746 #' # 1        0.699883    0.3303370    0.9455356    -0.1697527
1747 #' # 2        1.895540    0.3868576    0.9083370    -0.6792238
1748 #' # 3        2.351890    0.6548350    0.2375602     0.2521257
1749 #'
1750 #'}
1751 #' @note gapply(SparkDataFrame) since 2.0.0
setMethod("gapply",
          signature(x = "SparkDataFrame"),
          function(x, cols, func, schema) {
            # Group first: groupBy receives the SparkDataFrame followed by the
            # grouping columns. gapply is then dispatched again on the grouped
            # result, with the same function and schema.
            groupByArgs <- c(x, cols)
            groupedData <- do.call("groupBy", groupByArgs)
            gapply(groupedData, func, schema)
          })
1758 
1759 #' gapplyCollect
1760 #'
1761 #' Groups the SparkDataFrame using the specified columns, applies the R function to each
1762 #' group and collects the result back to R as data.frame.
1763 #'
1764 #' @param cols grouping columns.
1765 #' @param func a function to be applied to each group partition specified by grouping
1766 #'             column of the SparkDataFrame. See Details.
1767 #' @return A data.frame.
1768 #' @family SparkDataFrame functions
1769 #' @aliases gapplyCollect,SparkDataFrame-method
1770 #' @rdname gapplyCollect
1771 #' @name gapplyCollect
1772 #' @details
1773 #' \code{func} is a function of two arguments. The first, usually named \code{key}
1774 #' (though this is not enforced) corresponds to the grouping key, will be an
1775 #' unnamed \code{list} of \code{length(cols)} length-one objects corresponding
1776 #' to the grouping columns' values for the current group.
1777 #'
1778 #' The second, herein \code{x}, will be a local \code{\link{data.frame}} with the
1779 #' columns of the input not in \code{cols} for the rows corresponding to \code{key}.
1780 #'
#' The output of \code{func} must be a \code{data.frame}. No \code{schema} is supplied to
#' \code{gapplyCollect}; as the examples below show, the column names set on the output
#' \code{data.frame} are the ones that appear in the collected result.
1783 #'
1784 #' @seealso \link{gapply}
1785 #' @examples
1786 #'
1787 #' \dontrun{
1788 #' # Computes the arithmetic mean of the second column by grouping
1789 #' # on the first and third columns. Output the grouping values and the average.
1790 #'
1791 #' df <- createDataFrame (
1792 #' list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
1793 #'   c("a", "b", "c", "d"))
1794 #'
1795 #' result <- gapplyCollect(
1796 #'   df,
1797 #'   c("a", "c"),
1798 #'   function(key, x) {
1799 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1800 #'     colnames(y) <- c("key_a", "key_c", "mean_b")
1801 #'     y
1802 #'   })
1803 #'
#' # We can also group the data and afterwards call gapplyCollect on GroupedData.
1805 #' # For example:
1806 #' gdf <- group_by(df, "a", "c")
1807 #' result <- gapplyCollect(
1808 #'   gdf,
1809 #'   function(key, x) {
1810 #'     y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
1811 #'     colnames(y) <- c("key_a", "key_c", "mean_b")
1812 #'     y
1813 #'   })
1814 #'
1815 #' # Result
1816 #' # ------
1817 #' # key_a key_c mean_b
1818 #' # 3 3 3.0
1819 #' # 1 1 1.5
1820 #'
1821 #' # Fits linear models on iris dataset by grouping on the 'Species' column and
1822 #' # using 'Sepal_Length' as a target variable, 'Sepal_Width', 'Petal_Length'
1823 #' # and 'Petal_Width' as training features.
1824 #'
1825 #' df <- createDataFrame (iris)
1826 #' result <- gapplyCollect(
1827 #'   df,
1828 #'   df$"Species",
1829 #'   function(key, x) {
1830 #'     m <- suppressWarnings(lm(Sepal_Length ~
1831 #'     Sepal_Width + Petal_Length + Petal_Width, x))
1832 #'     data.frame(t(coef(m)))
1833 #'   })
1834 #'
1835 #' # Result
1836 #' # ---------
1837 #' # Model  X.Intercept.  Sepal_Width  Petal_Length  Petal_Width
1838 #' # 1        0.699883    0.3303370    0.9455356    -0.1697527
1839 #' # 2        1.895540    0.3868576    0.9083370    -0.6792238
1840 #' # 3        2.351890    0.6548350    0.2375602     0.2521257
1841 #'
1842 #'}
1843 #' @note gapplyCollect(SparkDataFrame) since 2.0.0
setMethod("gapplyCollect",
          signature(x = "SparkDataFrame"),
          function(x, cols, func) {
            # Group first: groupBy receives the SparkDataFrame followed by the
            # grouping columns. gapplyCollect is then dispatched again on the
            # grouped result.
            groupByArgs <- c(x, cols)
            groupedData <- do.call("groupBy", groupByArgs)
            gapplyCollect(groupedData, func)
          })
1850 
1851 ############################## RDD Map Functions ##################################
1852 # All of the following functions mirror the existing RDD map functions,           #
1853 # but allow for use with DataFrames by first converting to an RRDD before calling #
1854 # the requested map function.                                                     #
1855 ###################################################################################
1856 
1857 #' @rdname lapply
1858 #' @noRd
setMethod("lapply",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert with toRDD and delegate to lapply on the converted object.
            lapply(toRDD(X), FUN)
          })
1865 
1866 #' @rdname lapply
1867 #' @noRd
setMethod("map",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # map is an alias: forward both arguments unchanged to lapply.
            lapply(X = X, FUN = FUN)
          })
1873 
1874 #' @rdname flatMap
1875 #' @noRd
setMethod("flatMap",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert with toRDD and delegate to flatMap on the converted object.
            flatMap(toRDD(X), FUN)
          })
1882 
1883 #' @rdname lapplyPartition
1884 #' @noRd
setMethod("lapplyPartition",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # Convert with toRDD and delegate to lapplyPartition on the converted object.
            lapplyPartition(toRDD(X), FUN)
          })
1891 
1892 #' @rdname lapplyPartition
1893 #' @noRd
setMethod("mapPartitions",
          signature(X = "SparkDataFrame", FUN = "function"),
          function(X, FUN) {
            # mapPartitions is an alias: forward both arguments unchanged to
            # lapplyPartition.
            lapplyPartition(X = X, FUN = FUN)
          })
1899 
1900 #' @rdname foreach
1901 #' @noRd
setMethod("foreach",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # Convert with toRDD and delegate to foreach on the converted object.
            foreach(toRDD(x), func)
          })
1908 
1909 #' @rdname foreach
1910 #' @noRd
setMethod("foreachPartition",
          signature(x = "SparkDataFrame", func = "function"),
          function(x, func) {
            # Convert with toRDD and delegate to foreachPartition on the converted object.
            foreachPartition(toRDD(x), func)
          })
1917 
1918 
1919 ############################## SELECT ##################################
1920 
getColumn <- function(x, c) {
  # Ask the backing Java object (slot sdf) for the column identified by `c`,
  # then wrap the returned reference with column().
  jcol <- callJMethod(x@sdf, "col", c)
  column(jcol)
}
1924 
setColumn <- function(x, c, value) {
  # Set, replace or drop the column `c` of `x` and return the resulting
  # SparkDataFrame. `value` may be a Column, an atomic literal of length one
  # (wrapped with lit()), or NULL (the column is dropped).

  # NULL means "drop the column".
  if (is.null(value)) {
    return(drop(x, c))
  }

  # inherits() is used instead of comparing class(value) to a string: class()
  # can have several elements (e.g. POSIXct), which made the comparison yield a
  # condition of length > 1.
  if (!inherits(value, "Column")) {
    if (!isAtomicLengthOne(value)) {
      stop("value must be a Column, literal value as atomic in length of 1, or NULL")
    }
    value <- lit(value)
  }

  withColumn(x, c, value)
}
1941 
1942 #' @param name name of a Column (without being wrapped by \code{""}).
1943 #' @rdname select
1944 #' @name $
1945 #' @aliases $,SparkDataFrame-method
1946 #' @note $ since 1.4.0
setMethod("$", signature(x = "SparkDataFrame"),
          function(x, name) {
            # Resolve the requested name to a Column through getColumn.
            selected <- getColumn(x, name)
            selected
          })
1951 
1952 #' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
1953 #'              If \code{NULL}, the specified Column is dropped.
1954 #' @rdname select
1955 #' @name $<-
1956 #' @aliases $<-,SparkDataFrame-method
1957 #' @note $<- since 1.4.0
setMethod("$<-", signature(x = "SparkDataFrame"),
          function(x, name, value) {
            # setColumn returns a new SparkDataFrame; only its sdf slot is copied
            # into x, which is then returned as the result of the assignment.
            x@sdf <- setColumn(x, name, value)@sdf
            x
          })
1964 
# Virtual class matching either a numeric or a character value; used as the
# index type in the signatures of the [[ and [[<- methods below.
setClassUnion("numericOrcharacter", c("numeric", "character"))
1966 
1967 #' @rdname subset
1968 #' @name [[
1969 #' @aliases [[,SparkDataFrame,numericOrcharacter-method
1970 #' @note [[ since 1.4.0
setMethod("[[", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
          function(x, i) {
            # Only a single index is supported; extra ones are dropped with a warning.
            if (length(i) > 1) {
              warning("Subset index has length > 1. Only the first index is used.")
              i <- i[1]
            }
            # A numeric index is a position in columns(x); a character index is
            # used as the column name directly.
            colName <- if (is.numeric(i)) columns(x)[[i]] else i
            getColumn(x, colName)
          })
1983 
1984 #' @rdname subset
1985 #' @name [[<-
1986 #' @aliases [[<-,SparkDataFrame,numericOrcharacter-method
1987 #' @note [[<- since 2.1.1
setMethod("[[<-", signature(x = "SparkDataFrame", i = "numericOrcharacter"),
          function(x, i, value) {
            # Only a single index is supported; extra ones are dropped with a warning.
            if (length(i) > 1) {
              warning("Subset index has length > 1. Only the first index is used.")
              i <- i[1]
            }
            # A numeric index is a position in columns(x); a character index is
            # used as the column name directly.
            colName <- if (is.numeric(i)) columns(x)[[i]] else i
            # setColumn returns a new SparkDataFrame; copy its sdf slot into x.
            x@sdf <- setColumn(x, colName, value)@sdf
            x
          })
2002 
2003 #' @rdname subset
2004 #' @name [
2005 #' @aliases [,SparkDataFrame-method
2006 #' @note [ since 1.4.0
setMethod("[", signature(x = "SparkDataFrame"),
          function(x, i, j, ..., drop = FALSE) {
            # Step 1: row filtering. `i`, when given, must be a Column predicate.
            # inherits() replaces a class() string comparison, which is fragile
            # for objects whose class vector has more than one element.
            filtered <- if (missing(i)) {
              x
            } else {
              if (!inherits(i, "Column")) {
                stop("Expressions other than filtering predicates are not supported ",
                      "in the first parameter of extract operator [ or subset() method.")
              }
              filter(x, i)
            }

            # Step 2: projection. Without `j` the filtered SparkDataFrame is the result.
            if (missing(j)) {
              return(filtered)
            }

            # Numeric indices are translated to column names of the filtered data.
            if (is.numeric(j)) {
              j <- columns(filtered)[j]
            }
            # Several columns are handed to select() as a list.
            if (length(j) > 1) {
              j <- as.list(j)
            }
            selected <- select(filtered, j)

            # Step 3: honour `drop`. A single selected column is returned as a
            # Column when drop is true; otherwise a SparkDataFrame is returned.
            # Scalar && and TRUE/FALSE are used instead of & and the
            # reassignable T/F.
            if (ncol(selected) == 1 && isTRUE(as.logical(drop))) {
              getColumn(selected, names(selected))
            } else {
              selected
            }
          })
2041 
2042 #' Subset
2043 #'
2044 #' Return subsets of SparkDataFrame according to given conditions
2045 #' @param x a SparkDataFrame.
2046 #' @param i,subset (Optional) a logical expression to filter on rows.
2047 #'                 For extract operator [[ and replacement operator [[<-, the indexing parameter for
2048 #'                 a single Column.
2049 #' @param j,select expression for the single Column or a list of columns to select from the
2050 #'                 SparkDataFrame.
2051 #' @param drop if TRUE, a Column will be returned if the resulting dataset has only one column.
2052 #'             Otherwise, a SparkDataFrame will always be returned.
2053 #' @param value a Column or an atomic vector in the length of 1 as literal value, or \code{NULL}.
2054 #'              If \code{NULL}, the specified Column is dropped.
2055 #' @param ... currently not used.
2056 #' @return A new SparkDataFrame containing only the rows that meet the condition with selected
2057 #'         columns.
2058 #' @family SparkDataFrame functions
2059 #' @aliases subset,SparkDataFrame-method
2060 #' @seealso \link{withColumn}
2061 #' @rdname subset
2062 #' @name subset
2063 #' @family subsetting functions
2064 #' @examples
2065 #' \dontrun{
2066 #'   # Columns can be selected using [[ and [
2067 #'   df[[2]] == df[["age"]]
2068 #'   df[,2] == df[,"age"]
2069 #'   df[,c("name", "age")]
2070 #'   # Or to filter rows
2071 #'   df[df$age > 20,]
2072 #'   # SparkDataFrame can be subset on both rows and Columns
2073 #'   df[df$name == "Smith", c(1,2)]
2074 #'   df[df$age %in% c(19, 30), 1:2]
2075 #'   subset(df, df$age %in% c(19, 30), 1:2)
2076 #'   subset(df, df$age %in% c(19), select = c(1,2))
2077 #'   subset(df, select = c(1,2))
2078 #'   # Columns can be selected and set
2079 #'   df[["age"]] <- 23
2080 #'   df[[1]] <- df$age
2081 #'   df[[2]] <- NULL # drop column
2082 #' }
2083 #' @note subset since 1.5.0
setMethod("subset", signature(x = "SparkDataFrame"),
          function(x, subset, select, drop = FALSE, ...) {
            # Delegate to the extract operator `[`. The row argument is left empty
            # when no `subset` predicate was supplied so that `[` sees it as
            # missing. The default for `drop` is spelled FALSE rather than the
            # reassignable shorthand F.
            if (missing(subset)) {
              x[, select, drop = drop, ...]
            } else {
              x[subset, select, drop = drop, ...]
            }
          })
2092 
2093 #' Select
2094 #'
2095 #' Selects a set of columns with names or Column expressions.
2096 #' @param x a SparkDataFrame.
2097 #' @param col a list of columns or single Column or name.
2098 #' @param ... additional column(s) if only one column is specified in \code{col}.
2099 #'            If more than one column is assigned in \code{col}, \code{...}
2100 #'            should be left empty.
2101 #' @return A new SparkDataFrame with selected columns.
2102 #' @family SparkDataFrame functions
2103 #' @rdname select
2104 #' @aliases select,SparkDataFrame,character-method
2105 #' @name select
2106 #' @family subsetting functions
2107 #' @examples
2108 #' \dontrun{
2109 #'   select(df, "*")
2110 #'   select(df, "col1", "col2")
2111 #'   select(df, df$name, df$age + 1)
2112 #'   select(df, c("col1", "col2"))
2113 #'   select(df, list(df$name, df$age + 1))
2114 #'   # Similar to R data frames columns can also be selected using $
2115 #'   df[,df$age]
2116 #' }
2117 #' @note select(SparkDataFrame, character) since 1.4.0
setMethod("select", signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ...) {
            extraCols <- list(...)

            # Single name (plus optional further names in ...): call the Java
            # select method directly.
            if (length(col) <= 1) {
              return(dataFrame(callJMethod(x@sdf, "select", col, extraCols)))
            }

            # A character vector of several names cannot be combined with ...
            if (length(extraCols) > 0) {
              stop("To select multiple columns, use a character vector or list for col")
            }

            # Re-dispatch on the list form of the names.
            select(x, as.list(col))
          })
2131 
2132 #' @rdname select
2133 #' @aliases select,SparkDataFrame,Column-method
2134 #' @note select(SparkDataFrame, Column) since 1.4.0
setMethod("select", signature(x = "SparkDataFrame", col = "Column"),
          function(x, col, ...) {
            # Collect every Column argument and extract the jc slot of each one
            # before handing the list to the Java select method.
            columnArgs <- list(col, ...)
            jcols <- lapply(columnArgs, function(item) item@jc)
            dataFrame(callJMethod(x@sdf, "select", jcols))
          })
2143 
2144 #' @rdname select
2145 #' @aliases select,SparkDataFrame,list-method
2146 #' @note select(SparkDataFrame, list) since 1.4.0
setMethod("select",
          signature(x = "SparkDataFrame", col = "list"),
          function(x, col) {
            # Each list element is either a Column (its jc slot is used as is) or
            # something col() can turn into a Column. inherits() replaces a
            # class() string comparison, which is fragile for objects whose
            # class vector has more than one element.
            cols <- lapply(col, function(c) {
              if (inherits(c, "Column")) {
                c@jc
              } else {
                col(c)@jc
              }
            })
            sdf <- callJMethod(x@sdf, "select", cols)
            dataFrame(sdf)
          })
2160 
2161 #' SelectExpr
2162 #'
2163 #' Select from a SparkDataFrame using a set of SQL expressions.
2164 #'
2165 #' @param x A SparkDataFrame to be selected from.
2166 #' @param expr A string containing a SQL expression
2167 #' @param ... Additional expressions
2168 #' @return A SparkDataFrame
2169 #' @family SparkDataFrame functions
2170 #' @aliases selectExpr,SparkDataFrame,character-method
2171 #' @rdname selectExpr
2172 #' @name selectExpr
2173 #' @examples
2174 #'\dontrun{
2175 #' sparkR.session()
2176 #' path <- "path/to/file.json"
2177 #' df <- read.json(path)
2178 #' selectExpr(df, "col1", "(col2 * 5) as newCol")
2179 #' }
2180 #' @note selectExpr since 1.4.0
setMethod("selectExpr",
          signature(x = "SparkDataFrame", expr = "character"),
          function(x, expr, ...) {
            # All expressions (the first one plus any in ...) are passed to the
            # Java selectExpr method as a single list.
            allExprs <- list(expr, ...)
            dataFrame(callJMethod(x@sdf, "selectExpr", allExprs))
          })
2188 
2189 #' WithColumn
2190 #'
2191 #' Return a new SparkDataFrame by adding a column or replacing the existing column
2192 #' that has the same name.
2193 #'
2194 #' Note: This method introduces a projection internally. Therefore, calling it multiple times,
2195 #' for instance, via loops in order to add multiple columns can generate big plans which
2196 #' can cause performance issues and even \code{StackOverflowException}. To avoid this,
2197 #' use \code{select} with the multiple columns at once.
2198 #'
2199 #' @param x a SparkDataFrame.
2200 #' @param colName a column name.
2201 #' @param col a Column expression (which must refer only to this SparkDataFrame), or an atomic
2202 #' vector in the length of 1 as literal value.
2203 #' @return A SparkDataFrame with the new column added or the existing column replaced.
2204 #' @family SparkDataFrame functions
2205 #' @aliases withColumn,SparkDataFrame,character-method
2206 #' @rdname withColumn
2207 #' @name withColumn
2208 #' @seealso \link{rename} \link{mutate} \link{subset}
2209 #' @examples
2210 #'\dontrun{
2211 #' sparkR.session()
2212 #' path <- "path/to/file.json"
2213 #' df <- read.json(path)
2214 #' newDF <- withColumn(df, "newCol", df$col1 * 5)
2215 #' # Replace an existing column
2216 #' newDF2 <- withColumn(newDF, "newCol", newDF$col1)
2217 #' newDF3 <- withColumn(newDF, "newCol", 42)
2218 #' # Use extract operator to set an existing or new column
2219 #' df[["age"]] <- 23
2220 #' df[[2]] <- df$col1
2221 #' df[[2]] <- NULL # drop column
2222 #' }
2223 #' @note withColumn since 1.4.0
setMethod("withColumn",
          signature(x = "SparkDataFrame", colName = "character"),
          function(x, colName, col) {
            # Anything that is not a Column must be an atomic literal of length
            # one, which is wrapped with lit(). inherits() is used instead of
            # comparing class(col) to a string: class() can have several elements
            # (e.g. POSIXct), which made the `if` condition longer than one.
            if (!inherits(col, "Column")) {
              if (!isAtomicLengthOne(col)) stop("Literal value must be atomic in length of 1")
              col <- lit(col)
            }
            sdf <- callJMethod(x@sdf, "withColumn", colName, col@jc)
            dataFrame(sdf)
          })
2234 
2235 #' Mutate
2236 #'
2237 #' Return a new SparkDataFrame with the specified columns added or replaced.
2238 #'
2239 #' @param .data a SparkDataFrame.
2240 #' @param ... additional column argument(s) each in the form name = col.
2241 #' @return A new SparkDataFrame with the new columns added or replaced.
2242 #' @family SparkDataFrame functions
2243 #' @aliases mutate,SparkDataFrame-method
2244 #' @rdname mutate
2245 #' @name mutate
2246 #' @seealso \link{rename} \link{withColumn}
2247 #' @examples
2248 #'\dontrun{
2249 #' sparkR.session()
2250 #' path <- "path/to/file.json"
2251 #' df <- read.json(path)
2252 #' newDF <- mutate(df, newCol = df$col1 * 5, newCol2 = df$col1 * 2)
2253 #' names(newDF) # Will contain newCol, newCol2
2254 #' newDF2 <- transform(df, newCol = df$col1 / 5, newCol2 = df$col1 * 2)
2255 #'
2256 #' df <- createDataFrame(list(list("Andy", 30L), list("Justin", 19L)), c("name", "age"))
2257 #' # Replace the "age" column
2258 #' df1 <- mutate(df, age = df$age + 1L)
2259 #' }
2260 #' @note mutate since 1.4.0
setMethod("mutate",
          signature(.data = "SparkDataFrame"),
          function(.data, ...) {
            x <- .data
            cols <- list(...)
            if (length(cols) == 0) {
              return(x)
            }

            # Every argument must be a Column expression.
            stopifnot(all(vapply(cols, inherits, logical(1), what = "Column")))

            # Check if there is any duplicated column name in the DataFrame
            dfCols <- columns(x)
            if (anyDuplicated(dfCols) > 0) {
              stop("Error: found duplicated column name in the DataFrame")
            }

            # TODO: simplify the implementation of this method after SPARK-12225 is resolved.

            # For named arguments, use the names for arguments as the column names
            # For unnamed arguments, use the argument symbols as the column names.
            # deparse() may split a long expression over several strings, so the
            # pieces are joined to always obtain exactly one name per argument.
            argExprs <- as.list(substitute(list(...)))[-1]
            colNames <- vapply(argExprs, function(e) {
              paste(deparse(e, width.cutoff = 500L), collapse = " ")
            }, character(1), USE.NAMES = FALSE)
            argNames <- names(cols)
            if (!is.null(argNames)) {
              isNamed <- nzchar(argNames)
              colNames[isNamed] <- argNames[isNamed]
            }

            # The last column of the same name in the specific columns takes effect
            deDupCols <- list()
            for (i in seq_along(cols)) {
              deDupCols[[colNames[[i]]]] <- alias(cols[[i]], colNames[[i]])
            }

            # Construct the column list for projection: existing columns keep
            # their position and are replaced when a new definition has the same
            # name.
            isReplaced <- dfCols %in% names(deDupCols)
            colList <- lapply(seq_along(dfCols), function(k) {
              if (isReplaced[[k]]) {
                deDupCols[[dfCols[[k]]]]
              } else {
                col(dfCols[[k]])
              }
            })

            # Definitions that did not replace an existing column are appended.
            # They are passed unnamed: each Column already carries its alias, and
            # a list name such as "x" or "col" would otherwise be matched to the
            # formal arguments of select() by do.call().
            newCols <- deDupCols[!(names(deDupCols) %in% dfCols)]

            do.call(select, c(x, colList, unname(newCols)))
          })
2315 
2316 #' @param _data a SparkDataFrame.
2317 #' @rdname mutate
2318 #' @aliases transform,SparkDataFrame-method
2319 #' @name transform
2320 #' @note transform since 1.5.0
setMethod("transform",
          signature(`_data` = "SparkDataFrame"),
          function(`_data`, ...) {
            # transform is an alias: forward everything unchanged to mutate.
            mutated <- mutate(`_data`, ...)
            mutated
          })
2326 
2327 #' rename
2328 #'
2329 #' Rename an existing column in a SparkDataFrame.
2330 #'
2331 #' @param x A SparkDataFrame
2332 #' @param existingCol The name of the column you want to change.
2333 #' @param newCol The new column name.
2334 #' @return A SparkDataFrame with the column name changed.
2335 #' @family SparkDataFrame functions
2336 #' @rdname rename
2337 #' @name withColumnRenamed
2338 #' @aliases withColumnRenamed,SparkDataFrame,character,character-method
2339 #' @seealso \link{mutate}
2340 #' @examples
2341 #'\dontrun{
2342 #' sparkR.session()
2343 #' path <- "path/to/file.json"
2344 #' df <- read.json(path)
2345 #' newDF <- withColumnRenamed(df, "col1", "newCol1")
2346 #' }
2347 #' @note withColumnRenamed since 1.4.0
setMethod("withColumnRenamed",
          signature(x = "SparkDataFrame", existingCol = "character", newCol = "character"),
          function(x, existingCol, newCol) {
            # Re-select every column of x, aliasing the one whose name equals
            # existingCol to newCol and leaving the others unchanged.
            projected <- lapply(columns(x), function(name) {
              isTarget <- name == existingCol
              current <- col(name)
              if (isTarget) alias(current, newCol) else current
            })
            select(x, projected)
          })
2360 
2361 #' @param ... A named pair of the form new_column_name = existing_column
2362 #' @rdname rename
2363 #' @name rename
2364 #' @aliases rename,SparkDataFrame-method
2365 #' @examples
2366 #'\dontrun{
2367 #' sparkR.session()
2368 #' path <- "path/to/file.json"
2369 #' df <- read.json(path)
2370 #' newDF <- rename(df, col1 = df$newCol1)
2371 #' }
2372 #' @note rename since 1.4.0
setMethod("rename",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            renameCols <- list(...)
            stopifnot(length(renameCols) > 0)
            # Every argument (not only the first) must be a Column.
            stopifnot(all(vapply(renameCols, inherits, logical(1), what = "Column")))

            # The argument names are the new column names, so each argument has
            # to be named.
            newNames <- names(renameCols)
            if (is.null(newNames) || !all(nzchar(newNames))) {
              stop("Each column to rename must be given as a named argument: ",
                   "new_column_name = existing_column")
            }

            # The existing names are obtained from the string form of each Column.
            oldNames <- vapply(renameCols, function(renamed) {
              callJMethod(renamed@jc, "toString")
            }, character(1), USE.NAMES = FALSE)

            # Re-select every column of x, aliasing those that are being renamed.
            cols <- lapply(columns(x), function(c) {
              idx <- match(c, oldNames)
              if (is.na(idx)) {
                col(c)
              } else {
                alias(col(c), newNames[[idx]])
              }
            })
            select(x, cols)
          })
2392 
# Virtual class matching either a character value or a Column; used in the
# signatures of the orderBy, filter and where methods below.
setClassUnion("characterOrColumn", c("character", "Column"))

# Virtual class matching either a numeric value or a Column.
setClassUnion("numericOrColumn", c("numeric", "Column"))
2396 
2397 #' Arrange Rows by Variables
2398 #'
2399 #' Sort a SparkDataFrame by the specified column(s).
2400 #'
2401 #' @param x a SparkDataFrame to be sorted.
2402 #' @param col a character or Column object indicating the fields to sort on
2403 #' @param ... additional sorting fields
2404 #' @param decreasing a logical argument indicating sorting order for columns when
2405 #'                   a character vector is specified for col
2406 #' @param withinPartitions a logical argument indicating whether to sort only within each partition
2407 #' @return A SparkDataFrame where all elements are sorted.
2408 #' @family SparkDataFrame functions
2409 #' @aliases arrange,SparkDataFrame,Column-method
2410 #' @rdname arrange
2411 #' @name arrange
2412 #' @examples
2413 #'\dontrun{
2414 #' sparkR.session()
2415 #' path <- "path/to/file.json"
2416 #' df <- read.json(path)
2417 #' arrange(df, df$col1)
2418 #' arrange(df, asc(df$col1), desc(abs(df$col2)))
2419 #' arrange(df, "col1", decreasing = TRUE)
2420 #' arrange(df, "col1", "col2", decreasing = c(TRUE, FALSE))
2421 #' arrange(df, "col1", "col2", withinPartitions = TRUE)
2422 #' }
2423 #' @note arrange(SparkDataFrame, Column) since 1.4.0
setMethod("arrange",
          signature(x = "SparkDataFrame", col = "Column"),
          function(x, col, ..., withinPartitions = FALSE) {
            # Extract the jc slot of every sort Column.
            sortCols <- list(col, ...)
            jcols <- lapply(sortCols, function(sortCol) sortCol@jc)

            # Pick the Java method according to the requested sort scope.
            sortMethod <- if (withinPartitions) "sortWithinPartitions" else "sort"
            dataFrame(callJMethod(x@sdf, sortMethod, jcols))
          })
2438 
2439 #' @rdname arrange
2440 #' @name arrange
2441 #' @aliases arrange,SparkDataFrame,character-method
2442 #' @note arrange(SparkDataFrame, character) since 1.4.0
setMethod("arrange",
          signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ..., decreasing = FALSE, withinPartitions = FALSE) {
            # All sorting columns, one list element per argument.
            by <- list(col, ...)
            nSortCols <- length(by)

            # A single `decreasing` flag applies to every column; otherwise one
            # flag per column is required.
            if (length(decreasing) == 1) {
              decreasing <- rep(decreasing, nSortCols)
            } else if (length(decreasing) != nSortCols) {
              stop("Arguments 'col' and 'decreasing' must have the same length")
            }

            # Turn each (name, flag) pair into an ordered Column, e.g.
            #   [[1]] Column Species ASC
            #   [[2]] Column Petal_Length DESC
            sortCols <- unname(Map(function(name, isDecreasing) {
              direction <- if (isDecreasing) desc else asc
              direction(getColumn(x, name))
            }, by, decreasing))

            # Re-dispatch on the Column method with the ordered Columns.
            arrangeArgs <- c(x, sortCols, withinPartitions = withinPartitions)
            do.call("arrange", arrangeArgs)
          })
2471 
2472 #' @rdname arrange
2473 #' @aliases orderBy,SparkDataFrame,characterOrColumn-method
2474 #' @note orderBy(SparkDataFrame, characterOrColumn) since 1.4.0
setMethod("orderBy",
          signature(x = "SparkDataFrame", col = "characterOrColumn"),
          function(x, col, ...) {
            # orderBy is an alias: forward everything unchanged to arrange.
            sorted <- arrange(x, col, ...)
            sorted
          })
2480 
2481 #' Filter
2482 #'
2483 #' Filter the rows of a SparkDataFrame according to a given condition.
2484 #'
#' @param x A SparkDataFrame to be filtered.
2486 #' @param condition The condition to filter on. This may either be a Column expression
2487 #' or a string containing a SQL statement
2488 #' @return A SparkDataFrame containing only the rows that meet the condition.
2489 #' @family SparkDataFrame functions
2490 #' @aliases filter,SparkDataFrame,characterOrColumn-method
2491 #' @rdname filter
2492 #' @name filter
2493 #' @family subsetting functions
2494 #' @examples
2495 #'\dontrun{
2496 #' sparkR.session()
2497 #' path <- "path/to/file.json"
2498 #' df <- read.json(path)
2499 #' filter(df, "col1 > 0")
2500 #' filter(df, df$col2 != "abcdefg")
2501 #' }
2502 #' @note filter since 1.4.0
setMethod("filter",
          signature(x = "SparkDataFrame", condition = "characterOrColumn"),
          function(x, condition) {
            # A Column condition is replaced by its jc slot; a character
            # condition is passed through unchanged. inherits() is used so that
            # objects extending Column are unwrapped as well.
            if (inherits(condition, "Column")) {
              condition <- condition@jc
            }
            sdf <- callJMethod(x@sdf, "filter", condition)
            dataFrame(sdf)
          })
2512 
2513 #' @rdname filter
2514 #' @name where
2515 #' @aliases where,SparkDataFrame,characterOrColumn-method
2516 #' @note where since 1.4.0
setMethod("where",
          signature(x = "SparkDataFrame", condition = "characterOrColumn"),
          function(x, condition) {
            # where is an alias: forward both arguments unchanged to filter.
            filter(x = x, condition = condition)
          })
2522 
2523 #' dropDuplicates
2524 #'
2525 #' Returns a new SparkDataFrame with duplicate rows removed, considering only
2526 #' the subset of columns.
2527 #'
2528 #' @param x A SparkDataFrame.
2529 #' @param ... A character vector of column names or string column names.
#'            If the first argument contains a character vector, the rest are ignored.
2531 #' @return A SparkDataFrame with duplicate rows removed.
2532 #' @family SparkDataFrame functions
2533 #' @aliases dropDuplicates,SparkDataFrame-method
2534 #' @rdname dropDuplicates
2535 #' @name dropDuplicates
2536 #' @examples
2537 #'\dontrun{
2538 #' sparkR.session()
2539 #' path <- "path/to/file.json"
2540 #' df <- read.json(path)
2541 #' dropDuplicates(df)
2542 #' dropDuplicates(df, "col1", "col2")
2543 #' dropDuplicates(df, c("col1", "col2"))
2544 #' }
2545 #' @note dropDuplicates since 2.0.0
setMethod("dropDuplicates",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            # Column names arrive either as separate strings or as one character vector.
            dotArgs <- list(...)
            if (length(dotArgs) == 0) {
              # No columns named: de-duplicate on every column of x.
              return(dataFrame(callJMethod(x@sdf, "dropDuplicates", as.list(columns(x)))))
            }

            areCharacter <- vapply(dotArgs, is.character, logical(1))
            if (!all(areCharacter)) {
              stop("all columns names should be characters")
            }

            # A first argument with more than one element is taken as the whole
            # column list and later arguments are dropped; otherwise all of the
            # arguments are sent as given.
            firstArg <- dotArgs[[1]]
            subsetCols <- if (length(firstArg) > 1) as.list(firstArg) else dotArgs
            dataFrame(callJMethod(x@sdf, "dropDuplicates", subsetCols))
          })
2565 
2566 #' Join
2567 #'
2568 #' Joins two SparkDataFrames based on the given join expression.
2569 #'
2570 #' @param x A SparkDataFrame
2571 #' @param y A SparkDataFrame
2572 #' @param joinExpr (Optional) The expression used to perform the join. joinExpr must be a
2573 #' Column expression. If joinExpr is omitted, the default, inner join is attempted and an error is
2574 #' thrown if it would be a Cartesian Product. For Cartesian join, use crossJoin instead.
2575 #' @param joinType The type of join to perform, default 'inner'.
2576 #' Must be one of: 'inner', 'cross', 'outer', 'full', 'fullouter', 'full_outer',
2577 #' 'left', 'leftouter', 'left_outer', 'right', 'rightouter', 'right_outer', 'semi',
2578 #' 'leftsemi', 'left_semi', 'anti', 'leftanti', 'left_anti'.
2579 #' @return A SparkDataFrame containing the result of the join operation.
2580 #' @family SparkDataFrame functions
2581 #' @aliases join,SparkDataFrame,SparkDataFrame-method
2582 #' @rdname join
2583 #' @name join
2584 #' @seealso \link{merge} \link{crossJoin}
2585 #' @examples
2586 #'\dontrun{
2587 #' sparkR.session()
2588 #' df1 <- read.json(path)
2589 #' df2 <- read.json(path2)
2590 #' join(df1, df2, df1$col1 == df2$col2) # Performs an inner join based on expression
2591 #' join(df1, df2, df1$col1 == df2$col2, "right_outer")
2592 #' join(df1, df2) # Attempts an inner join
2593 #' }
2594 #' @note join since 1.4.0
setMethod("join",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y, joinExpr = NULL, joinType = NULL) {
            if (is.null(joinExpr)) {
              # this may not fail until the planner checks for Cartesian join later on.
              # Note: joinType is not consulted on this path.
              sdf <- callJMethod(x@sdf, "join", y@sdf)
            } else {
              # inherits() gives a single TRUE/FALSE and also accepts objects whose
              # class extends Column, unlike comparing class() with "!=".
              if (!inherits(joinExpr, "Column")) stop("joinExpr must be a Column")
              if (is.null(joinType)) {
                sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc)
              } else {
                validJoinTypes <- c("inner", "cross",
                    "outer", "full", "fullouter", "full_outer",
                    "left", "leftouter", "left_outer",
                    "right", "rightouter", "right_outer",
                    "semi", "leftsemi", "left_semi", "anti", "leftanti", "left_anti")
                # The length check keeps the condition scalar, so a joinType that is
                # empty or has several elements reaches the descriptive error below
                # instead of failing inside if().
                if (length(joinType) == 1 && joinType %in% validJoinTypes) {
                  # Underscores are stripped before the name is sent to the backend
                  # (e.g. "left_outer" becomes "leftouter").
                  joinType <- gsub("_", "", joinType, fixed = TRUE)
                  sdf <- callJMethod(x@sdf, "join", y@sdf, joinExpr@jc, joinType)
                } else {
                  stop("joinType must be one of the following types: ",
                       "'", paste(validJoinTypes, collapse = "', '"), "'")
                }
              }
            }
            dataFrame(sdf)
          })
2622 
2623 #' CrossJoin
2624 #'
2625 #' Returns Cartesian Product on two SparkDataFrames.
2626 #'
2627 #' @param x A SparkDataFrame
2628 #' @param y A SparkDataFrame
2629 #' @return A SparkDataFrame containing the result of the join operation.
2630 #' @family SparkDataFrame functions
2631 #' @aliases crossJoin,SparkDataFrame,SparkDataFrame-method
2632 #' @rdname crossJoin
2633 #' @name crossJoin
2634 #' @seealso \link{merge} \link{join}
2635 #' @examples
2636 #'\dontrun{
2637 #' sparkR.session()
2638 #' df1 <- read.json(path)
2639 #' df2 <- read.json(path2)
2640 #' crossJoin(df1, df2) # Performs a Cartesian
2641 #' }
2642 #' @note crossJoin since 2.1.0
setMethod("crossJoin",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "crossJoin" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "crossJoin", y@sdf))
          })
2649 
2650 #' Merges two data frames
2651 #'
2652 #' @name merge
2653 #' @param x the first data frame to be joined.
2654 #' @param y the second data frame to be joined.
2655 #' @param by a character vector specifying the join columns. If by is not
2656 #'   specified, the common column names in \code{x} and \code{y} will be used.
2657 #'   If by or both by.x and by.y are explicitly set to NULL or of length 0, the Cartesian
2658 #'   Product of x and y will be returned.
2659 #' @param by.x a character vector specifying the joining columns for x.
2660 #' @param by.y a character vector specifying the joining columns for y.
2661 #' @param all a boolean value setting \code{all.x} and \code{all.y}
2662 #'            if any of them are unset.
#' @param all.x a boolean value indicating whether all the rows in x should
#'              be included in the join.
#' @param all.y a boolean value indicating whether all the rows in y should
#'              be included in the join.
2667 #' @param sort a logical argument indicating whether the resulting columns should be sorted.
2668 #' @param suffixes a string vector of length 2 used to make colnames of
2669 #'                 \code{x} and \code{y} unique.
2670 #'                 The first element is appended to each colname of \code{x}.
2671 #'                 The second element is appended to each colname of \code{y}.
2672 #' @param ... additional argument(s) passed to the method.
2673 #' @details  If all.x and all.y are set to FALSE, a natural join will be returned. If
2674 #'   all.x is set to TRUE and all.y is set to FALSE, a left outer join will
2675 #'   be returned. If all.x is set to FALSE and all.y is set to TRUE, a right
2676 #'   outer join will be returned. If all.x and all.y are set to TRUE, a full
2677 #'   outer join will be returned.
2678 #' @family SparkDataFrame functions
2679 #' @aliases merge,SparkDataFrame,SparkDataFrame-method
2680 #' @rdname merge
2681 #' @seealso \link{join} \link{crossJoin}
2682 #' @examples
2683 #'\dontrun{
2684 #' sparkR.session()
2685 #' df1 <- read.json(path)
2686 #' df2 <- read.json(path2)
2687 #' merge(df1, df2) # Performs an inner join by common columns
2688 #' merge(df1, df2, by = "col1") # Performs an inner join based on expression
2689 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.y = TRUE)
2690 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE)
2691 #' merge(df1, df2, by.x = "col1", by.y = "col2", all.x = TRUE, all.y = TRUE)
2692 #' merge(df1, df2, by.x = "col1", by.y = "col2", all = TRUE, sort = FALSE)
2693 #' merge(df1, df2, by = "col1", all = TRUE, suffixes = c("-X", "-Y"))
2694 #' merge(df1, df2, by = NULL) # Performs a Cartesian join
2695 #' }
2696 #' @note merge since 1.5.0
setMethod("merge",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y, by = intersect(names(x), names(y)), by.x = by, by.y = by,
                   all = FALSE, all.x = all, all.y = all,
                   sort = TRUE, suffixes = c("_x", "_y"), ...) {

            if (length(suffixes) != 2) {
              stop("suffixes must have length 2")
            }

            # Translate all / all.x / all.y into a join type. With none of them set
            # the join is "inner" (used in place of R's natural join).
            joinType <- if (all || (all.x && all.y)) {
              "outer"
            } else if (all.x) {
              "left_outer"
            } else if (all.y) {
              "right_outer"
            } else {
              "inner"
            }

            # Choose the key columns: by.x / by.y when they are non-empty and of equal
            # length, otherwise by; with no keys at all the Cartesian product is returned.
            if (length(by.x) > 0 && length(by.x) == length(by.y)) {
              keysX <- by.x
              keysY <- by.y
            } else if (length(by) > 0) {
              keysX <- by
              keysY <- by
            } else {
              cartesian <- crossJoin(x, y)
              return(cartesian)
            }

            # Columns named in 'by' are aliased with the matching suffix so that the
            # two sides no longer share those names.
            aliasedX <- genAliasesForIntersectedCols(x, by, suffixes[1])
            aliasedY <- genAliasesForIntersectedCols(y, by, suffixes[2])
            xsel <- select(x, aliasedX)
            ysel <- select(y, aliasedY)

            # One equality condition per key pair, using the suffixed name whenever
            # the key is one of the 'by' columns.
            keyConditions <- lapply(seq_along(keysX), function(idx) {
              nameX <- keysX[[idx]]
              nameY <- keysY[[idx]]

              if (nameX %in% by) {
                nameX <- paste0(nameX, suffixes[1])
              }
              if (nameY %in% by) {
                nameY <- paste0(nameY, suffixes[2])
              }

              getColumn(xsel, nameX) == getColumn(ysel, nameY)
            })

            # All conditions are combined with '&' before joining.
            joinExpr <- Reduce("&", keyConditions)
            joinRes <- join(xsel, ysel, joinExpr, joinType)

            # When requested, order by the 'by' columns under their second-suffix names.
            if (sort && length(by) > 0) {
              sortCols <- paste0(by, suffixes[2])
              joinRes <- do.call("arrange", c(joinRes, sortCols, decreasing = FALSE))
            }

            joinRes
          })
2775 
2776 #' Creates a list of columns by replacing the intersected ones with aliases
2777 #'
2778 #' Creates a list of columns by replacing the intersected ones with aliases.
#' The name of the alias column is formed by concatenating the original column name and a suffix.
2780 #'
2781 #' @param x a SparkDataFrame
2782 #' @param intersectedColNames a list of intersected column names of the SparkDataFrame
2783 #' @param suffix a suffix for the column name
2784 #' @return list of columns
2785 #' @noRd
genAliasesForIntersectedCols <- function(x, intersectedColNames, suffix) {
  allColNames <- names(x)
  # Build one column per name in x; names listed in intersectedColNames are
  # aliased to <name><suffix>, the others are returned untouched.
  lapply(allColNames, function(colName) {
    col <- getColumn(x, colName)
    if (!(colName %in% intersectedColNames)) {
      return(col)
    }
    aliasName <- paste0(colName, suffix)
    # The suffixed name must not collide with a column x already has.
    if (aliasName %in% allColNames) {
      stop("The following column name: ", aliasName, " occurs more than once in the 'DataFrame'.",
        "Please use different suffixes for the intersected columns.")
    }
    alias(col, aliasName)
  })
}
2803 
2804 #' Return a new SparkDataFrame containing the union of rows
2805 #'
2806 #' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
2807 #' and another SparkDataFrame. This is equivalent to \code{UNION ALL} in SQL.
2808 #' Input SparkDataFrames can have different schemas (names and data types).
2809 #'
2810 #' Note: This does not remove duplicate rows across the two SparkDataFrames.
2811 #' Also as standard in SQL, this function resolves columns by position (not by name).
2812 #'
2813 #' @param x A SparkDataFrame
2814 #' @param y A SparkDataFrame
2815 #' @return A SparkDataFrame containing the result of the union.
2816 #' @family SparkDataFrame functions
2817 #' @rdname union
2818 #' @name union
2819 #' @aliases union,SparkDataFrame,SparkDataFrame-method
2820 #' @seealso \link{rbind} \link{unionByName}
2821 #' @examples
2822 #'\dontrun{
2823 #' sparkR.session()
2824 #' df1 <- read.json(path)
2825 #' df2 <- read.json(path2)
#' unioned <- union(df1, df2)
2827 #' unions <- rbind(df, df2, df3, df4)
2828 #' }
2829 #' @note union since 2.0.0
setMethod("union",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "union" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "union", y@sdf))
          })
2836 
2837 #' Return a new SparkDataFrame containing the union of rows.
2838 #'
2839 #' This is an alias for \code{union}.
2840 #'
2841 #' @param x a SparkDataFrame.
2842 #' @param y a SparkDataFrame.
2843 #' @return A SparkDataFrame containing the result of the unionAll operation.
2844 #' @family SparkDataFrame functions
2845 #' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
2846 #' @rdname unionAll
2847 #' @name unionAll
2848 #' @seealso \link{union}
2849 #' @examples
2850 #'\dontrun{
2851 #' sparkR.session()
2852 #' df1 <- read.json(path)
2853 #' df2 <- read.json(path2)
2854 #' unionAllDF <- unionAll(df1, df2)
2855 #' }
2856 #' @note unionAll since 1.4.0
setMethod("unionAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # unionAll is an alias: the call is handed straight to union.
            union(x, y)
          })
2862 
2863 #' Return a new SparkDataFrame containing the union of rows, matched by column names
2864 #'
2865 #' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
2866 #' and another SparkDataFrame. This is different from \code{union} function, and both
2867 #' \code{UNION ALL} and \code{UNION DISTINCT} in SQL as column positions are not taken
2868 #' into account. Input SparkDataFrames can have different data types in the schema.
2869 #'
2870 #' Note: This does not remove duplicate rows across the two SparkDataFrames.
2871 #' This function resolves columns by name (not by position).
2872 #'
2873 #' @param x A SparkDataFrame
2874 #' @param y A SparkDataFrame
2875 #' @return A SparkDataFrame containing the result of the union.
2876 #' @family SparkDataFrame functions
2877 #' @rdname unionByName
2878 #' @name unionByName
2879 #' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
2880 #' @seealso \link{rbind} \link{union}
2881 #' @examples
2882 #'\dontrun{
2883 #' sparkR.session()
2884 #' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
2885 #' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
2886 #' head(unionByName(df1, df2))
2887 #' }
2888 #' @note unionByName since 2.3.0
setMethod("unionByName",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "unionByName" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "unionByName", y@sdf))
          })
2895 
2896 #' Union two or more SparkDataFrames
2897 #'
2898 #' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
2899 #' requires that the input SparkDataFrames have the same column names.
2900 #'
2901 #' Note: This does not remove duplicate rows across the two SparkDataFrames.
2902 #'
2903 #' @param x a SparkDataFrame.
2904 #' @param ... additional SparkDataFrame(s).
2905 #' @param deparse.level currently not used (put here to match the signature of
2906 #'                      the base implementation).
2907 #' @return A SparkDataFrame containing the result of the union.
2908 #' @family SparkDataFrame functions
2909 #' @aliases rbind,SparkDataFrame-method
2910 #' @rdname rbind
2911 #' @name rbind
2912 #' @seealso \link{union} \link{unionByName}
2913 #' @examples
2914 #'\dontrun{
2915 #' sparkR.session()
2916 #' unions <- rbind(df, df2, df3, df4)
2917 #' }
2918 #' @note rbind since 1.5.0
setMethod("rbind",
          signature(... = "SparkDataFrame"),
          function(x, ..., deparse.level = 1) {
            # Every input must expose exactly the same vector of column names.
            colNamesPerInput <- lapply(list(x, ...), names)
            if (length(unique(colNamesPerInput)) != 1) {
              stop("Names of input data frames are different.")
            }
            # With any argument count other than three, recurse on the inputs
            # after x and union x with that result.
            if (nargs() != 3) {
              return(union(x, Recall(..., deparse.level = 1)))
            }
            # Exactly three arguments: union x directly with what is in '...'.
            union(x, ...)
          })
2932 
2933 #' Intersect
2934 #'
2935 #' Return a new SparkDataFrame containing rows only in both this SparkDataFrame
2936 #' and another SparkDataFrame. This is equivalent to \code{INTERSECT} in SQL.
2937 #'
2938 #' @param x A SparkDataFrame
2939 #' @param y A SparkDataFrame
2940 #' @return A SparkDataFrame containing the result of the intersect.
2941 #' @family SparkDataFrame functions
2942 #' @aliases intersect,SparkDataFrame,SparkDataFrame-method
2943 #' @rdname intersect
2944 #' @name intersect
2945 #' @examples
2946 #'\dontrun{
2947 #' sparkR.session()
2948 #' df1 <- read.json(path)
2949 #' df2 <- read.json(path2)
#' intersectDF <- intersect(df1, df2)
2951 #' }
2952 #' @note intersect since 1.4.0
setMethod("intersect",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "intersect" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "intersect", y@sdf))
          })
2959 
2960 #' intersectAll
2961 #'
2962 #' Return a new SparkDataFrame containing rows in both this SparkDataFrame
2963 #' and another SparkDataFrame while preserving the duplicates.
2964 #' This is equivalent to \code{INTERSECT ALL} in SQL. Also as standard in
2965 #' SQL, this function resolves columns by position (not by name).
2966 #'
2967 #' @param x a SparkDataFrame.
2968 #' @param y a SparkDataFrame.
2969 #' @return A SparkDataFrame containing the result of the intersect all operation.
2970 #' @family SparkDataFrame functions
2971 #' @aliases intersectAll,SparkDataFrame,SparkDataFrame-method
2972 #' @rdname intersectAll
2973 #' @name intersectAll
2974 #' @examples
2975 #'\dontrun{
2976 #' sparkR.session()
2977 #' df1 <- read.json(path)
2978 #' df2 <- read.json(path2)
2979 #' intersectAllDF <- intersectAll(df1, df2)
2980 #' }
2981 #' @note intersectAll since 2.4.0
setMethod("intersectAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "intersectAll" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "intersectAll", y@sdf))
          })
2988 
2989 #' except
2990 #'
2991 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
2992 #' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
2993 #'
2994 #' @param x a SparkDataFrame.
2995 #' @param y a SparkDataFrame.
2996 #' @return A SparkDataFrame containing the result of the except operation.
2997 #' @family SparkDataFrame functions
2998 #' @aliases except,SparkDataFrame,SparkDataFrame-method
2999 #' @rdname except
3000 #' @name except
3001 #' @examples
3002 #'\dontrun{
3003 #' sparkR.session()
3004 #' df1 <- read.json(path)
3005 #' df2 <- read.json(path2)
#' exceptDF <- except(df1, df2)
3007 #' }
3008 #' @note except since 1.4.0
setMethod("except",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "except" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "except", y@sdf))
          })
3015 
3016 #' exceptAll
3017 #'
3018 #' Return a new SparkDataFrame containing rows in this SparkDataFrame
3019 #' but not in another SparkDataFrame while preserving the duplicates.
3020 #' This is equivalent to \code{EXCEPT ALL} in SQL. Also as standard in
3021 #' SQL, this function resolves columns by position (not by name).
3022 #'
3023 #' @param x a SparkDataFrame.
3024 #' @param y a SparkDataFrame.
3025 #' @return A SparkDataFrame containing the result of the except all operation.
3026 #' @family SparkDataFrame functions
3027 #' @aliases exceptAll,SparkDataFrame,SparkDataFrame-method
3028 #' @rdname exceptAll
3029 #' @name exceptAll
3030 #' @examples
3031 #'\dontrun{
3032 #' sparkR.session()
3033 #' df1 <- read.json(path)
3034 #' df2 <- read.json(path2)
3035 #' exceptAllDF <- exceptAll(df1, df2)
3036 #' }
3037 #' @note exceptAll since 2.4.0
setMethod("exceptAll",
          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
          function(x, y) {
            # Call "exceptAll" on x's backing sdf with y's sdf and wrap the result.
            dataFrame(callJMethod(x@sdf, "exceptAll", y@sdf))
          })
3044 
3045 #' Save the contents of SparkDataFrame to a data source.
3046 #'
3047 #' The data source is specified by the \code{source} and a set of options (...).
3048 #' If \code{source} is not specified, the default data source configured by
3049 #' spark.sql.sources.default will be used.
3050 #'
3051 #' Additionally, mode is used to specify the behavior of the save operation when data already
3052 #' exists in the data source. There are four modes:
3053 #' \itemize{
3054 #'   \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
3055 #'   \item 'overwrite': Existing data is expected to be overwritten by the contents of this
3056 #'         SparkDataFrame.
3057 #'   \item 'error' or 'errorifexists': An exception is expected to be thrown.
3058 #'   \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
3059 #'         and to not change the existing data.
3060 #' }
3061 #'
3062 #' @param df a SparkDataFrame.
3063 #' @param path a name for the table.
3064 #' @param source a name for external data source.
3065 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
3066 #'             save mode (it is 'error' by default)
3067 #' @param partitionBy a name or a list of names of columns to partition the output by on the file
3068 #'                    system. If specified, the output is laid out on the file system similar
3069 #'                    to Hive's partitioning scheme.
3070 #' @param ... additional argument(s) passed to the method.
3071 #'
3072 #' @family SparkDataFrame functions
3073 #' @aliases write.df,SparkDataFrame-method
3074 #' @rdname write.df
3075 #' @name write.df
3076 #' @examples
3077 #'\dontrun{
3078 #' sparkR.session()
3079 #' path <- "path/to/file.json"
3080 #' df <- read.json(path)
3081 #' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
3082 #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
3083 #' }
3084 #' @note write.df since 1.4.0
setMethod("write.df",
          signature(df = "SparkDataFrame"),
          function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
            # Argument checks: path and source may be NULL, mode must be character.
            if (!is.null(path) && !is.character(path)) {
              stop("path should be character, NULL or omitted.")
            }
            if (!is.null(source) && !is.character(source)) {
              stop("source should be character, NULL or omitted. It is the datasource specified ",
                   "in 'spark.sql.sources.default' configuration by default.")
            }
            if (!is.character(mode)) {
              stop("mode should be character or omitted. It is 'error' by default.")
            }
            # A missing source falls back to the value of getDefaultSqlSource().
            if (is.null(source)) {
              source <- getDefaultSqlSource()
            }

            # partitionBy, when given, must consist of character names only.
            partitionCols <- NULL
            if (!is.null(partitionBy)) {
              areCharacter <- vapply(partitionBy, is.character, logical(1))
              if (!all(areCharacter)) {
                stop("All partitionBy column names should be characters.")
              }
              partitionCols <- as.list(partitionBy)
            }

            # Configure the writer step by step: format, optional partitioning,
            # then path, mode and the extra options.
            writer <- callJMethod(df@sdf, "write")
            writer <- callJMethod(writer, "format", source)
            if (!is.null(partitionCols)) {
              writer <- callJMethod(writer, "partitionBy", partitionCols)
            }
            writer <- setWriteOptions(writer, path = path, mode = mode, ...)
            # The result of the save call is returned invisibly.
            invisible(handledCallJMethod(writer, "save"))
          })
3106               cols <- as.list(partitionBy)
3107             }
3108             write <- callJMethod(df@sdf, "write")
3109             write <- callJMethod(write, "format", source)
3110             if (!is.null(cols)) {
3111               write <- callJMethod(write, "partitionBy", cols)
3112             }
3113             write <- setWriteOptions(write, path = path, mode = mode, ...)
3114             write <- handledCallJMethod(write, "save")
3115           })
3116 
3117 #' @rdname write.df
3118 #' @name saveDF
3119 #' @aliases saveDF,SparkDataFrame,character-method
3120 #' @note saveDF since 1.4.0
setMethod("saveDF",
          signature(df = "SparkDataFrame", path = "character"),
          function(df, path, source = NULL, mode = "error", ...) {
            # saveDF forwards to write.df. The arguments are passed by position,
            # in the order write.df declares them, followed by the extra options.
            write.df(df, path, source, mode, ...)
          })
3126 
3127 #' Save the contents of the SparkDataFrame to a data source as a table
3128 #'
3129 #' The data source is specified by the \code{source} and a set of options (...).
3130 #' If \code{source} is not specified, the default data source configured by
3131 #' spark.sql.sources.default will be used.
3132 #'
3133 #' Additionally, mode is used to specify the behavior of the save operation when
3134 #' data already exists in the data source. There are four modes: \cr
3135 #'  'append': Contents of this SparkDataFrame are expected to be appended to existing data. \cr
3136 #'  'overwrite': Existing data is expected to be overwritten by the contents of this
3137 #'     SparkDataFrame. \cr
3138 #'  'error' or 'errorifexists': An exception is expected to be thrown. \cr
3139 #'  'ignore': The save operation is expected to not save the contents of the SparkDataFrame
3140 #'     and to not change the existing data. \cr
3141 #'
3142 #' @param df a SparkDataFrame.
3143 #' @param tableName a name for the table.
3144 #' @param source a name for external data source.
3145 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
3146 #'             save mode (it is 'error' by default)
3147 #' @param ... additional option(s) passed to the method.
3148 #'
3149 #' @family SparkDataFrame functions
3150 #' @aliases saveAsTable,SparkDataFrame,character-method
3151 #' @rdname saveAsTable
3152 #' @name saveAsTable
3153 #' @examples
3154 #'\dontrun{
3155 #' sparkR.session()
3156 #' path <- "path/to/file.json"
3157 #' df <- read.json(path)
3158 #' saveAsTable(df, "myfile")
3159 #' }
3160 #' @note saveAsTable since 1.4.0
setMethod("saveAsTable",
          signature(df = "SparkDataFrame", tableName = "character"),
          function(df, tableName, source = NULL, mode = "error", ...) {
            # A missing source falls back to the value of getDefaultSqlSource().
            if (is.null(source)) {
              source <- getDefaultSqlSource()
            }
            # The extra arguments are converted by varargsToStrEnv before any
            # writer is requested from the backend.
            writerOptions <- varargsToStrEnv(...)

            # Configure the writer: format, save mode, then the options.
            writer <- callJMethod(df@sdf, "write")
            writer <- callJMethod(writer, "format", source)
            writer <- setWriteMode(writer, mode)
            writer <- callJMethod(writer, "options", writerOptions)
            # The result of the saveAsTable call is returned invisibly.
            invisible(callJMethod(writer, "saveAsTable", tableName))
          })
3175 
3176 #' describe
3177 #'
3178 #' Computes statistics for numeric and string columns.
3179 #' If no columns are given, this function computes statistics for all numerical or string columns.
3180 #'
3181 #' @param x a SparkDataFrame to be computed.
3182 #' @param col a string of name.
3183 #' @param ... additional expressions.
3184 #' @return A SparkDataFrame.
3185 #' @family SparkDataFrame functions
3186 #' @aliases describe,SparkDataFrame,character-method describe,SparkDataFrame,ANY-method
3187 #' @rdname describe
3188 #' @name describe
3189 #' @examples
3190 #'\dontrun{
3191 #' sparkR.session()
3192 #' path <- "path/to/file.json"
3193 #' df <- read.json(path)
3194 #' describe(df)
3195 #' describe(df, "col1")
3196 #' describe(df, "col1", "col2")
3197 #' }
3198 #' @seealso See \link{summary} for expanded statistics and control over which statistics to compute.
3199 #' @note describe(SparkDataFrame, character) since 1.4.0
setMethod("describe",
          signature(x = "SparkDataFrame", col = "character"),
          function(x, col, ...) {
            # col and the remaining arguments are bundled into one list and sent
            # to the "describe" method of the backing sdf.
            dataFrame(callJMethod(x@sdf, "describe", list(col, ...)))
          })
3207 
3208 #' @rdname describe
3209 #' @name describe
3210 #' @aliases describe,SparkDataFrame-method
3211 #' @note describe(SparkDataFrame) since 1.4.0
setMethod("describe",
          signature(x = "SparkDataFrame"),
          function(x) {
            # No columns given: "describe" is called with an empty list.
            dataFrame(callJMethod(x@sdf, "describe", list()))
          })
3218 
3219 #' summary
3220 #'
3221 #' Computes specified statistics for numeric and string columns. Available statistics are:
3222 #' \itemize{
3223 #' \item count
3224 #' \item mean
3225 #' \item stddev
3226 #' \item min
3227 #' \item max
3228 #' \item arbitrary approximate percentiles specified as a percentage (eg, "75\%")
3229 #' }
3230 #' If no statistics are given, this function computes count, mean, stddev, min,
3231 #' approximate quartiles (percentiles at 25\%, 50\%, and 75\%), and max.
3232 #' This function is meant for exploratory data analysis, as we make no guarantee about the
3233 #' backward compatibility of the schema of the resulting Dataset. If you want to
3234 #' programmatically compute summary statistics, use the \code{agg} function instead.
3235 #'
3236 #'
3237 #' @param object a SparkDataFrame to be summarized.
3238 #' @param ... (optional) statistics to be computed for all columns.
3239 #' @return A SparkDataFrame.
3240 #' @family SparkDataFrame functions
3241 #' @rdname summary
3242 #' @name summary
3243 #' @aliases summary,SparkDataFrame-method
3244 #' @examples
3245 #'\dontrun{
3246 #' sparkR.session()
3247 #' path <- "path/to/file.json"
3248 #' df <- read.json(path)
3249 #' summary(df)
3250 #' summary(df, "min", "25%", "75%", "max")
3251 #' summary(select(df, "age", "height"))
3252 #' }
3253 #' @note summary(SparkDataFrame) since 1.5.0
#' @note The statistics provided by \code{summary} were changed in 2.3.0; use \link{describe} for
3255 #'       previous defaults.
3256 #' @seealso \link{describe}
setMethod("summary",
          signature(object = "SparkDataFrame"),
          function(object, ...) {
            # The requested statistics (possibly none) are sent as one list to the
            # "summary" method of the backing sdf.
            dataFrame(callJMethod(object@sdf, "summary", list(...)))
          })
3264 
3265 
3266 #' A set of SparkDataFrame functions working with NA values
3267 #'
3268 #' dropna, na.omit - Returns a new SparkDataFrame omitting rows with null values.
3269 #'
3270 #' @param x a SparkDataFrame.
3271 #' @param how "any" or "all".
3272 #'            if "any", drop a row if it contains any nulls.
3273 #'            if "all", drop a row only if all its values are null.
3274 #'            if \code{minNonNulls} is specified, how is ignored.
3275 #' @param minNonNulls if specified, drop rows that have less than
3276 #'                    \code{minNonNulls} non-null values.
3277 #'                    This overwrites the how parameter.
3278 #' @param cols optional list of column names to consider. In \code{fillna},
3279 #'             columns specified in cols that do not have matching data
3280 #'             type are ignored. For example, if value is a character, and
3281 #'             subset contains a non-character column, then the non-character
3282 #'             column is simply ignored.
3283 #' @return A SparkDataFrame.
3284 #'
3285 #' @family SparkDataFrame functions
3286 #' @rdname nafunctions
3287 #' @aliases dropna,SparkDataFrame-method
3288 #' @name dropna
3289 #' @examples
3290 #'\dontrun{
3291 #' sparkR.session()
3292 #' path <- "path/to/file.json"
3293 #' df <- read.json(path)
3294 #' dropna(df)
3295 #' }
3296 #' @note dropna since 1.4.0
setMethod("dropna",
          signature(x = "SparkDataFrame"),
          function(x, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
            how <- match.arg(how)

            # Consider every column unless the caller narrowed the set
            targetCols <- if (is.null(cols)) columns(x) else cols

            # Without an explicit threshold, derive it from `how`: "any" keeps a row only
            # if all considered columns are non-null, "all" keeps it if at least one is
            threshold <- minNonNulls
            if (is.null(threshold)) {
              threshold <- switch(how, any = length(targetCols), all = 1)
            }

            cleaned <- callJMethod(callJMethod(x@sdf, "na"), "drop",
                                   as.integer(threshold), as.list(targetCols))
            dataFrame(cleaned)
          })
3313 
3314 #' @param object a SparkDataFrame.
3315 #' @param ... further arguments to be passed to or from other methods.
3316 #' @rdname nafunctions
3317 #' @name na.omit
3318 #' @aliases na.omit,SparkDataFrame-method
3319 #' @note na.omit since 1.5.0
setMethod("na.omit",
          signature(object = "SparkDataFrame"),
          function(object, how = c("any", "all"), minNonNulls = NULL, cols = NULL) {
            # Thin alias: hand every argument over to dropna() unchanged
            dropna(x = object, how = how, minNonNulls = minNonNulls, cols = cols)
          })
3325 
3326 #' fillna - Replace null values.
3327 #'
3328 #' @param value value to replace null values with.
3329 #'              Should be an integer, numeric, character or named list.
3330 #'              If the value is a named list, then cols is ignored and
3331 #'              value must be a mapping from column name (character) to
3332 #'              replacement value. The replacement value must be an
3333 #'              integer, numeric or character.
3334 #'
3335 #' @rdname nafunctions
3336 #' @name fillna
3337 #' @aliases fillna,SparkDataFrame-method
3338 #' @examples
3339 #'\dontrun{
3340 #' sparkR.session()
3341 #' path <- "path/to/file.json"
3342 #' df <- read.json(path)
3343 #' fillna(df, 1)
3344 #' fillna(df, list("age" = 20, "name" = "unknown"))
3345 #' }
3346 #' @note fillna since 1.4.0
setMethod("fillna",
          signature(x = "SparkDataFrame"),
          function(x, value, cols = NULL) {
            # Types accepted for a single replacement value
            atomicTypes <- c("integer", "numeric", "character")

            # inherits() always returns one logical, so these checks remain valid for
            # objects whose class attribute has more than one element
            if (!inherits(value, c(atomicTypes, "list"))) {
              stop("value should be an integer, numeric, character or named list.")
            }

            if (inherits(value, "list")) {
              # Every element must carry a non-empty, non-NA name (the column name)
              colNames <- names(value)
              if (length(colNames) == 0 || any(is.na(colNames) | colNames == "")) {
                stop("value should be a named list with each name being a column name.")
              }
              # Check each item in the named list is of valid type
              for (item in value) {
                if (!inherits(item, atomicTypes)) {
                  stop("Each item in value should be an integer, numeric or character.")
                }
              }

              # Convert the named list to an environment to be passed to JVM
              valueMap <- convertNamedListToEnv(value)

              # When value is a named list, caller is expected not to pass in cols
              if (!is.null(cols)) {
                warning("When value is a named list, cols is ignored!")
                cols <- NULL
              }

              value <- valueMap
            } else if (is.integer(value)) {
              # Cast an integer to a numeric
              value <- as.numeric(value)
            }

            naFunctions <- callJMethod(x@sdf, "na")
            # Restrict the fill to `cols` only when some were given
            sdf <- if (length(cols) == 0) {
              callJMethod(naFunctions, "fill", value)
            } else {
              callJMethod(naFunctions, "fill", value, as.list(cols))
            }
            dataFrame(sdf)
          })
3390 
3391 #' Download data from a SparkDataFrame into a R data.frame
3392 #'
3393 #' This function downloads the contents of a SparkDataFrame into an R data.frame.
3394 #' Since data.frames are held in memory, ensure that you have enough memory
3395 #' in your system to accommodate the contents.
3396 #'
3397 #' @param x a SparkDataFrame.
3398 #' @param row.names \code{NULL} or a character vector giving the row names for the data frame.
3399 #' @param optional If \code{TRUE}, converting column names is optional.
3400 #' @param ... additional arguments to pass to base::as.data.frame.
3401 #' @return A data.frame.
3402 #' @family SparkDataFrame functions
3403 #' @aliases as.data.frame,SparkDataFrame-method
3404 #' @rdname as.data.frame
3405 #' @examples
3406 #' \dontrun{
3407 #' irisDF <- createDataFrame(iris)
3408 #' df <- as.data.frame(irisDF[irisDF$Species == "setosa", ])
3409 #' }
3410 #' @note as.data.frame since 1.6.0
setMethod("as.data.frame",
          signature(x = "SparkDataFrame"),
          function(x, row.names = NULL, optional = FALSE, ...) {
            # Collect the SparkDataFrame first, then let as.data.frame() apply the
            # remaining arguments to the collected result
            localData <- collect(x)
            as.data.frame(localData, row.names, optional, ...)
          })
3416 
3417 #' Attach SparkDataFrame to R search path
3418 #'
3419 #' The specified SparkDataFrame is attached to the R search path. This means that
3420 #' the SparkDataFrame is searched by R when evaluating a variable, so columns in
3421 #' the SparkDataFrame can be accessed by simply giving their names.
3422 #'
3423 #' @family SparkDataFrame functions
3424 #' @rdname attach
3425 #' @aliases attach attach,SparkDataFrame-method
3426 #' @param what (SparkDataFrame) The SparkDataFrame to attach
3427 #' @param pos (integer) Specify position in search() where to attach.
3428 #' @param name (character) Name to use for the attached SparkDataFrame. Names
3429 #'   starting with package: are reserved for library.
3430 #' @param warn.conflicts (logical) If TRUE, warnings are printed about conflicts from
3431 #' attaching the database, unless that SparkDataFrame contains an object \code{.conflicts.OK}.
3432 #' @examples
3433 #' \dontrun{
3434 #' attach(irisDf)
3435 #' summary(Sepal_Width)
3436 #' }
3437 #' @seealso \link{detach}
3438 #' @note attach since 1.6.0
setMethod("attach",
          signature(what = "SparkDataFrame"),
          function(what, pos = 2L, name = deparse(substitute(what), backtick = FALSE),
                   warn.conflicts = TRUE) {
            # Snapshot all parameters. This has to stay the very first statement: any
            # local variable created before it would be captured as well.
            attachArgs <- as.list(environment())
            # Replace the SparkDataFrame with the environment assignNewEnv() builds from
            # it, then call attach() again with the other arguments left as they were
            attachArgs$what <- assignNewEnv(attachArgs$what)
            do.call(attach, attachArgs)
          })
3448 
3449 #' Evaluate a R expression in an environment constructed from a SparkDataFrame
3450 #'
3451 #' Evaluate a R expression in an environment constructed from a SparkDataFrame
3452 #' with() allows access to columns of a SparkDataFrame by simply referring to
3453 #' their name. It appends every column of a SparkDataFrame into a new
3454 #' environment. Then, the given expression is evaluated in this new
3455 #' environment.
3456 #'
3457 #' @rdname with
3458 #' @family SparkDataFrame functions
3459 #' @aliases with,SparkDataFrame-method
3460 #' @param data (SparkDataFrame) SparkDataFrame to use for constructing an environment.
3461 #' @param expr (expression) Expression to evaluate.
3462 #' @param ... arguments to be passed to future methods.
3463 #' @examples
3464 #' \dontrun{
3465 #' with(irisDf, nrow(Sepal_Width))
3466 #' }
3467 #' @seealso \link{attach}
3468 #' @note with since 1.6.0
setMethod("with",
          signature(data = "SparkDataFrame"),
          function(data, expr, ...) {
            # Environment built from the SparkDataFrame by assignNewEnv()
            columnEnv <- assignNewEnv(data)
            # Take the expression unevaluated and evaluate it inside that environment,
            # which also serves as the enclosure
            unevaluated <- substitute(expr)
            eval(unevaluated, envir = columnEnv, enclos = columnEnv)
          })
3475 
3476 #' Compactly display the structure of a dataset
3477 #'
3478 #' Display the structure of a SparkDataFrame, including column names, column types, as well as
3479 #' a small sample of rows.
3480 #'
3481 #' @name str
3482 #' @rdname str
3483 #' @aliases str,SparkDataFrame-method
3484 #' @family SparkDataFrame functions
3485 #' @param object a SparkDataFrame
3486 #' @examples
3487 #' \dontrun{
3488 #' # Create a SparkDataFrame from the Iris dataset
3489 #' irisDF <- createDataFrame(iris)
3490 #'
3491 #' # Show the structure of the SparkDataFrame
3492 #' str(irisDF)
3493 #' }
3494 #' @note str since 1.6.1
setMethod("str",
          signature(object = "SparkDataFrame"),
          function(object) {
            # Display limits: longest printed line and largest number of columns shown
            maxLineWidth <- 120
            maxShownCols <- 100

            colNames <- names(object)
            colTypes <- coltypes(object)
            totalCols <- ncol(object)

            # Fetch the leading rows, restricted to the first columns when the
            # SparkDataFrame is wider than the limit
            preview <- if (totalCols > maxShownCols) {
              head(object[, c(1:maxShownCols)])
            } else {
              head(object)
            }

            # The number of observations is deliberately not shown, since counting
            # the rows is a very expensive operation
            cat(paste0("'", class(object), "': ", length(colNames), " variables:\n"))

            if (nrow(preview) > 0) {
              shownCols <- ncol(preview)
              nameWidths <- nchar(colNames)
              widestName <- max(nameWidths)

              for (idx in seq_len(shownCols)) {
                colType <- colTypes[idx]

                # First values of the column; character values are shown quoted
                values <- preview[, idx]
                if (colType == "character") {
                  values <- paste0("\"", values, "\"")
                }
                firstValues <- paste(values, collapse = " ")

                # Pad the name so that all the colons line up
                padding <- paste(rep(" ", widestName - nameWidths[idx]), collapse = "")

                # Short type label looked up in SHORT_TYPES; when there is no entry,
                # fall back to the first three characters of the type
                shortType <- SHORT_TYPES[[colType]]
                if (is.null(shortType)) {
                  shortType <- substring(colType, 1, 3)
                }

                # One line per column, truncated to the maximum width
                entry <- paste0(" $ ", colNames[idx], padding, ": ",
                                shortType, " ", firstValues)
                cat(substr(entry, 1, maxLineWidth))
                cat("\n")
              }

              if (shownCols < totalCols) {
                cat(paste0("\nDisplaying first ", shownCols, " columns only."))
              }
            }
          })
3553 
3554 #' drop
3555 #'
3556 #' Returns a new SparkDataFrame with columns dropped.
3557 #' This is a no-op if schema doesn't contain column name(s).
3558 #'
3559 #' @param x a SparkDataFrame.
3560 #' @param col a character vector of column names or a Column.
3561 #' @param ... further arguments to be passed to or from other methods.
3562 #' @return A SparkDataFrame.
3563 #'
3564 #' @family SparkDataFrame functions
3565 #' @rdname drop
3566 #' @name drop
3567 #' @aliases drop,SparkDataFrame-method
3568 #' @examples
3569 #'\dontrun{
3570 #' sparkR.session()
3571 #' path <- "path/to/file.json"
3572 #' df <- read.json(path)
3573 #' drop(df, "col1")
3574 #' drop(df, c("col1", "col2"))
3575 #' drop(df, df$col1)
3576 #' }
3577 #' @note drop since 2.0.0
setMethod("drop",
          signature(x = "SparkDataFrame"),
          function(x, col) {
            # is.character()/inherits() each return a single logical, unlike comparing
            # class(col) with `==`, which yields a vector for multi-element classes
            isColumn <- inherits(col, "Column")
            stopifnot(is.character(col) || isColumn)

            sdf <- if (isColumn) {
              # A Column is passed through its Java reference
              callJMethod(x@sdf, "drop", col@jc)
            } else {
              # Column names are passed as a list
              callJMethod(x@sdf, "drop", as.list(col))
            }
            dataFrame(sdf)
          })
3590 
3591 # Expose base::drop
3592 #' @name drop
3593 #' @rdname drop
3594 #' @aliases drop,ANY-method
setMethod("drop",
          signature(x = "ANY"),
          # Any other input is delegated to base::drop
          function(x) base::drop(x))
3600 
3601 #' Compute histogram statistics for given column
3602 #'
3603 #' This function computes a histogram for a given SparkR Column.
3604 #'
3605 #' @name histogram
3606 #' @param nbins the number of bins (optional). Default value is 10.
3607 #' @param col the column as Character string or a Column to build the histogram from.
3608 #' @param df the SparkDataFrame containing the Column to build the histogram from.
3609 #' @return a data.frame with the histogram statistics, i.e., counts and centroids.
3610 #' @rdname histogram
3611 #' @aliases histogram,SparkDataFrame,characterOrColumn-method
3612 #' @family SparkDataFrame functions
3613 #' @examples
3614 #' \dontrun{
3615 #'
3616 #' # Create a SparkDataFrame from the Iris dataset
3617 #' irisDF <- createDataFrame(iris)
3618 #'
3619 #' # Compute histogram statistics
3620 #' histStats <- histogram(irisDF, irisDF$Sepal_Length, nbins = 12)
3621 #'
3622 #' # Once SparkR has computed the histogram statistics, the histogram can be
3623 #' # rendered using the ggplot2 library:
3624 #'
3625 #' require(ggplot2)
3626 #' plot <- ggplot(histStats, aes(x = centroids, y = counts)) +
3627 #'         geom_bar(stat = "identity") +
3628 #'         xlab("Sepal_Length") + ylab("Frequency")
3629 #' }
3630 #' @note histogram since 2.0.0
setMethod("histogram",
          signature(df = "SparkDataFrame", col = "characterOrColumn"),
          function(df, col, nbins = 10) {
            # Validate nbins
            if (nbins < 2) {
              stop("The number of bins must be a positive integer number greater than 1.")
            }

            # Round nbins down to the nearest integer
            nbins <- floor(nbins)

            # Validate col
            if (is.null(col)) {
              stop("col must be specified.")
            }

            colname <- col
            x <- if (is.character(col)) {
              if (!colname %in% names(df)) {
                stop("Specified colname does not belong to the given SparkDataFrame.")
              }

              # Filter NA values in the target column and remove all other columns
              df <- na.omit(df[, colname, drop = FALSE])
              getColumn(df, colname)

            } else if (inherits(col, "Column")) {

              # The given column needs to be appended to the SparkDataFrame so that we can
              # use method describe() to compute statistics in one single pass. The new
              # column must have a name that doesn't exist in the dataset.
              # To do so, we generate a random column name with more characters than the
              # longest colname in the dataset, but no more than 100 (think of a UUID).
              # This column name will never be visible to the user, so the name is irrelevant.
              # Limiting the colname length to 100 makes debugging easier and it does
              # introduce a negligible probability of collision: assuming the user has 1 million
              # columns AND all of them have names 100 characters long (which is very unlikely),
              # AND they run 1 billion histograms, the probability of collision will roughly be
              # 1 in 4.4 x 10 ^ 96
              colname <- paste(base::sample(c(letters, LETTERS),
                                             size = min(max(nchar(colnames(df))) + 1, 100),
                                             replace = TRUE),
                               collapse = "")

              # Append the given column to the dataset. This is to support Columns that
              # don't belong to the SparkDataFrame but are rather expressions
              df <- withColumn(df, colname, col)

              # Filter NA values in the target column only. The other columns have to stay
              # (the given Column may be an expression on them), but nulls in them must not
              # remove rows, otherwise the counts would differ from the character case above
              df <- dropna(df, cols = colname)

              col
            }

            # Rows 4 and 5 of the describe() output are read as the min and max
            stats <- collect(describe(df[, colname, drop = FALSE]))
            min <- as.numeric(stats[4, 2])
            max <- as.numeric(stats[5, 2])

            # Normalize the data
            xnorm <- (x - min) / (max - min)

            # Round the data to 4 significant digits. This is to avoid rounding issues.
            xnorm <- cast(xnorm * 10000, "integer") / 10000.0

            # Since min = 0, max = 1 (data is already normalized)
            normBinSize <- 1 / nbins
            binsize <- (max - min) / nbins
            approxBins <- xnorm / normBinSize

            # Adjust values that are equal to the upper bound of each bin
            bins <- cast(approxBins -
                           ifelse(approxBins == cast(approxBins, "integer") & x != min, 1, 0),
                         "integer")

            df$bins <- bins
            histStats <- collect(count(groupBy(df, "bins")))
            names(histStats) <- c("bins", "counts")

            # Fill bins with zero counts
            y <- data.frame("bins" = seq(0, nbins - 1))
            histStats <- merge(histStats, y, all.x = TRUE, all.y = TRUE)
            # Use the full column name instead of relying on partial matching of `$`
            histStats$counts[is.na(histStats$counts)] <- 0

            # Compute centroids
            histStats$centroids <- histStats$bins * binsize + min + binsize / 2

            # Return the statistics
            histStats
          })
3721 
3722 #' Save the content of SparkDataFrame to an external database table via JDBC.
3723 #'
3724 #' Save the content of the SparkDataFrame to an external database table via JDBC. Additional JDBC
3725 #' database connection properties can be set (...)
3726 #'
3727 #' Also, mode is used to specify the behavior of the save operation when
3728 #' data already exists in the data source. There are four modes:
3729 #' \itemize{
3730 #'   \item 'append': Contents of this SparkDataFrame are expected to be appended to existing data.
3731 #'   \item 'overwrite': Existing data is expected to be overwritten by the contents of this
3732 #'         SparkDataFrame.
3733 #'   \item 'error' or 'errorifexists': An exception is expected to be thrown.
3734 #'   \item 'ignore': The save operation is expected to not save the contents of the SparkDataFrame
3735 #'         and to not change the existing data.
3736 #' }
3737 #'
3738 #' @param x a SparkDataFrame.
3739 #' @param url JDBC database url of the form \code{jdbc:subprotocol:subname}.
3740 #' @param tableName the name of the table in the external database.
3741 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
3742 #'             save mode (it is 'error' by default)
3743 #' @param ... additional JDBC database connection properties.
3744 #' @family SparkDataFrame functions
3745 #' @rdname write.jdbc
3746 #' @name write.jdbc
3747 #' @aliases write.jdbc,SparkDataFrame,character,character-method
3748 #' @examples
3749 #'\dontrun{
3750 #' sparkR.session()
3751 #' jdbcUrl <- "jdbc:mysql://localhost:3306/databasename"
3752 #' write.jdbc(df, jdbcUrl, "table", user = "username", password = "password")
3753 #' }
3754 #' @note write.jdbc since 2.0.0
setMethod("write.jdbc",
          signature(x = "SparkDataFrame", url = "character", tableName = "character"),
          function(x, url, tableName, mode = "error", ...) {
            # Extra named arguments become the JDBC connection properties
            connectionProps <- varargsToJProperties(...)
            # Obtain the writer of the SparkDataFrame and apply the save mode to it
            writer <- setWriteMode(callJMethod(x@sdf, "write"), mode)
            # The result of the JDBC write is returned invisibly
            invisible(handledCallJMethod(writer, "jdbc", url, tableName, connectionProps))
          })
3763 
3764 #' randomSplit
3765 #'
3766 #' Return a list of randomly split dataframes with the provided weights.
3767 #'
3768 #' @param x A SparkDataFrame
3769 #' @param weights A vector of weights for splits, will be normalized if they don't sum to 1
3770 #' @param seed A seed to use for random split
3771 #'
3772 #' @family SparkDataFrame functions
3773 #' @aliases randomSplit,SparkDataFrame,numeric-method
3774 #' @rdname randomSplit
3775 #' @name randomSplit
3776 #' @examples
3777 #'\dontrun{
3778 #' sparkR.session()
3779 #' df <- createDataFrame(data.frame(id = 1:1000))
3780 #' df_list <- randomSplit(df, c(2, 3, 5), 0)
3781 #' # df_list contains 3 SparkDataFrames with each having about 200, 300 and 500 rows respectively
3782 #' sapply(df_list, count)
3783 #' }
3784 #' @note randomSplit since 2.0.0
setMethod("randomSplit",
          signature(x = "SparkDataFrame", weights = "numeric"),
          function(x, weights, seed) {
            # Reject negative weights
            if (any(weights < 0)) {
              stop("all weight values should not be negative")
            }
            # Scale the weights so that they sum to 1
            fractions <- as.list(weights / sum(weights))
            # Forward a seed only when the caller supplied one
            sdfs <- if (missing(seed)) {
              callJMethod(x@sdf, "randomSplit", fractions)
            } else {
              callJMethod(x@sdf, "randomSplit", fractions, as.integer(seed))
            }
            # Wrap each returned split
            sapply(sdfs, dataFrame)
          })
3799 
3800 #' getNumPartitions
3801 #'
3802 #' Return the number of partitions
3803 #'
3804 #' @param x A SparkDataFrame
3805 #' @family SparkDataFrame functions
3806 #' @aliases getNumPartitions,SparkDataFrame-method
3807 #' @rdname getNumPartitions
3808 #' @name getNumPartitions
3809 #' @examples
3810 #'\dontrun{
3811 #' sparkR.session()
3812 #' df <- createDataFrame(cars, numPartitions = 2)
3813 #' getNumPartitions(df)
3814 #' }
3815 #' @note getNumPartitions since 2.1.1
setMethod("getNumPartitions",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Ask the "rdd" object of the backing Java object for its partition count
            underlyingRdd <- callJMethod(x@sdf, "rdd")
            callJMethod(underlyingRdd, "getNumPartitions")
          })
3821 
3822 #' isStreaming
3823 #'
3824 #' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
3825 #' as it arrives. A dataset that reads data from a streaming source must be executed as a
3826 #' \code{StreamingQuery} using \code{write.stream}.
3827 #'
3828 #' @param x A SparkDataFrame
3829 #' @return TRUE if this SparkDataFrame is from a streaming source
3830 #' @family SparkDataFrame functions
3831 #' @aliases isStreaming,SparkDataFrame-method
3832 #' @rdname isStreaming
3833 #' @name isStreaming
3834 #' @seealso \link{read.stream} \link{write.stream}
3835 #' @examples
3836 #'\dontrun{
3837 #' sparkR.session()
3838 #' df <- read.stream("socket", host = "localhost", port = 9999)
3839 #' isStreaming(df)
3840 #' }
3841 #' @note isStreaming since 2.2.0
3842 #' @note experimental
setMethod("isStreaming",
          signature(x = "SparkDataFrame"),
          # Return whatever "isStreaming" reports on the backing Java object
          function(x) callJMethod(x@sdf, "isStreaming"))
3848 
3849 #' Write the streaming SparkDataFrame to a data source.
3850 #'
3851 #' The data source is specified by the \code{source} and a set of options (...).
3852 #' If \code{source} is not specified, the default data source configured by
3853 #' spark.sql.sources.default will be used.
3854 #'
3855 #' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to
3856 #' an output data source. There are three modes:
3857 #' \itemize{
3858 #'   \item append: Only the new rows in the streaming SparkDataFrame will be written out. This
3859 #'                 output mode can only be used in queries that do not contain any aggregation.
3860 #'   \item complete: All the rows in the streaming SparkDataFrame will be written out every time
3861 #'                   there are some updates. This output mode can only be used in queries that
3862 #'                   contain aggregations.
3863 #'   \item update: Only the rows that were updated in the streaming SparkDataFrame will be written
3864 #'                 out every time there are some updates. If the query doesn't contain aggregations,
3865 #'                 it will be equivalent to \code{append} mode.
3866 #' }
3867 #'
3868 #' @param df a streaming SparkDataFrame.
3869 #' @param source a name for external data source.
3870 #' @param outputMode one of 'append', 'complete', 'update'.
3871 #' @param partitionBy a name or a list of names of columns to partition the output by on the file
3872 #'        system. If specified, the output is laid out on the file system similar to Hive's
3873 #'        partitioning scheme.
3874 #' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
3875 #'        '1 minute'. This is a trigger that runs a query periodically based on the processing
3876 #'        time. If value is '0 seconds', the query will run as fast as possible, this is the
3877 #'        default. Only one trigger can be set.
3878 #' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
3879 #'        one batch of data in a streaming query then terminates the query. Only one trigger can be
3880 #'        set.
3881 #' @param ... additional external data source specific named options.
3882 #'
3883 #' @family SparkDataFrame functions
3884 #' @seealso \link{read.stream}
3885 #' @aliases write.stream,SparkDataFrame-method
3886 #' @rdname write.stream
3887 #' @name write.stream
3888 #' @examples
3889 #'\dontrun{
3890 #' sparkR.session()
3891 #' df <- read.stream("socket", host = "localhost", port = 9999)
3892 #' isStreaming(df)
3893 #' wordCounts <- count(group_by(df, "value"))
3894 #'
3895 #' # console
3896 #' q <- write.stream(wordCounts, "console", outputMode = "complete")
3897 #' # text stream
3898 #' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp",
3899 #'                   partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
3900 #' # memory stream
3901 #' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
3902 #' head(sql("SELECT * from outs"))
3903 #' queryName(q)
3904 #'
3905 #' stopQuery(q)
3906 #' }
3907 #' @note write.stream since 2.2.0
3908 #' @note experimental
setMethod("write.stream",
          signature(df = "SparkDataFrame"),
          function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
                   trigger.processingTime = NULL, trigger.once = NULL, ...) {
            # Type-check the plain string arguments first
            if (!(is.null(source) || is.character(source))) {
              stop("source should be character, NULL or omitted. It is the data source specified ",
                   "in 'spark.sql.sources.default' configuration by default.")
            }
            if (!(is.null(outputMode) || is.character(outputMode))) {
              stop("outputMode should be character or omitted.")
            }
            if (is.null(source)) {
              source <- getDefaultSqlSource()
            }

            # Partition columns are handed over as a list of names
            partitionCols <- NULL
            if (!is.null(partitionBy)) {
              namesAreCharacter <- vapply(partitionBy, is.character, logical(1))
              if (!all(namesAreCharacter)) {
                stop("All partitionBy column names should be characters.")
              }
              partitionCols <- as.list(partitionBy)
            }

            # At most one trigger may be given; a processing-time trigger is checked first
            jtrigger <- NULL
            if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
              if (!is.null(trigger.once)) {
                stop("Multiple triggers not allowed.")
              }
              interval <- as.character(trigger.processingTime)
              if (nchar(interval) == 0) {
                stop("Value for trigger.processingTime must be a non-empty string.")
              }
              jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
                                             "ProcessingTime",
                                             interval)
            } else if (!is.null(trigger.once) && !is.na(trigger.once)) {
              if (!is.logical(trigger.once) || !trigger.once) {
                stop("Value for trigger.once must be TRUE.")
              }
              jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
            }

            # Configure the stream writer step by step; optional settings are applied
            # only when they were provided
            streamOptions <- varargsToStrEnv(...)
            writer <- handledCallJMethod(df@sdf, "writeStream")
            writer <- callJMethod(writer, "format", source)
            if (!is.null(outputMode)) {
              writer <- callJMethod(writer, "outputMode", outputMode)
            }
            if (!is.null(partitionCols)) {
              writer <- callJMethod(writer, "partitionBy", partitionCols)
            }
            if (!is.null(jtrigger)) {
              writer <- callJMethod(writer, "trigger", jtrigger)
            }
            writer <- callJMethod(writer, "options", streamOptions)

            # Start the query and wrap the returned reference
            streamingQuery(handledCallJMethod(writer, "start"))
          })
3964 
3965 #' checkpoint
3966 #'
3967 #' Returns a checkpointed version of this SparkDataFrame. Checkpointing can be used to truncate the
3968 #' logical plan, which is especially useful in iterative algorithms where the plan may grow
3969 #' exponentially. It will be saved to files inside the checkpoint directory set with
3970 #' \code{setCheckpointDir}
3971 #'
3972 #' @param x A SparkDataFrame
3973 #' @param eager whether to checkpoint this SparkDataFrame immediately
3974 #' @return a new checkpointed SparkDataFrame
3975 #' @family SparkDataFrame functions
3976 #' @aliases checkpoint,SparkDataFrame-method
3977 #' @rdname checkpoint
3978 #' @name checkpoint
3979 #' @seealso \link{setCheckpointDir}
3980 #' @examples
3981 #'\dontrun{
3982 #' setCheckpointDir("/checkpoint")
3983 #' df <- checkpoint(df)
3984 #' }
3985 #' @note checkpoint since 2.2.0
setMethod("checkpoint",
          signature(x = "SparkDataFrame"),
          function(x, eager = TRUE) {
            # `eager` is coerced to logical before being passed to "checkpoint"
            eagerFlag <- as.logical(eager)
            dataFrame(callJMethod(x@sdf, "checkpoint", eagerFlag))
          })
3992 
#' localCheckpoint
#'
#' Produces a locally checkpointed copy of this SparkDataFrame. Because checkpointing truncates
#' the logical plan, it is particularly helpful in iterative algorithms whose plan may otherwise
#' grow exponentially. Local checkpoints are kept in the executors through the caching subsystem,
#' and are therefore not reliable.
#'
#' @param x A SparkDataFrame
#' @param eager whether this SparkDataFrame should be locally checkpointed immediately
#' @return a new locally checkpointed SparkDataFrame
#' @family SparkDataFrame functions
#' @aliases localCheckpoint,SparkDataFrame-method
#' @rdname localCheckpoint
#' @name localCheckpoint
#' @examples
#' \dontrun{
#' df <- localCheckpoint(df)
#' }
#' @note localCheckpoint since 2.3.0
setMethod("localCheckpoint",
          signature(x = "SparkDataFrame"),
          function(x, eager = TRUE) {
            # `eager` is coerced to logical before being passed to the backing Java object;
            # the returned Java reference is wrapped in a new SparkDataFrame.
            dataFrame(callJMethod(x@sdf, "localCheckpoint", as.logical(eager)))
          })
4018 
#' cube
#'
#' Create a multi-dimensional cube for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{cube} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases cube,SparkDataFrame-method
#' @rdname cube
#' @name cube
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(cube(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(cube(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note cube since 2.3.0
#' @seealso \link{agg}, \link{groupBy}, \link{rollup}
setMethod("cube",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            groupingCols <- list(...)
            # Test with is() instead of `class(col) == "Column"`: class() may return more than
            # one element (e.g. for a matrix or an S3 object), which would make the `if`
            # condition non-scalar, and `==` would also miss subclasses of Column.
            # Anything that is not a Column is passed through column() to obtain one.
            jcols <- lapply(groupingCols, function(col) {
              if (is(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "cube", jcols)
            groupedData(sgd)
          })
4052 
#' rollup
#'
#' Create a multi-dimensional rollup for the SparkDataFrame using the specified columns.
#'
#' If grouping expression is missing \code{rollup} creates a single global aggregate and is
#' equivalent to direct application of \link{agg}.
#'
#' @param x a SparkDataFrame.
#' @param ... character name(s) or Column(s) to group on.
#' @return A GroupedData.
#' @family SparkDataFrame functions
#' @aliases rollup,SparkDataFrame-method
#' @rdname rollup
#' @name rollup
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' mean(rollup(df, "cyl", "gear", "am"), "mpg")
#'
#' # Following calls are equivalent
#' agg(rollup(df), mean(df$mpg))
#' agg(df, mean(df$mpg))
#' }
#' @note rollup since 2.3.0
#' @seealso \link{agg}, \link{cube}, \link{groupBy}
setMethod("rollup",
          signature(x = "SparkDataFrame"),
          function(x, ...) {
            groupingCols <- list(...)
            # Test with is() instead of `class(col) == "Column"`: class() may return more than
            # one element (e.g. for a matrix or an S3 object), which would make the `if`
            # condition non-scalar, and `==` would also miss subclasses of Column.
            # Anything that is not a Column is passed through column() to obtain one.
            jcols <- lapply(groupingCols, function(col) {
              if (is(col, "Column")) col@jc else column(col)@jc
            })
            sgd <- callJMethod(x@sdf, "rollup", jcols)
            groupedData(sgd)
          })
4086 
#' hint
#'
#' Specifies an execution plan hint and returns a new SparkDataFrame.
#'
#' @param x a SparkDataFrame.
#' @param name a name of the hint.
#' @param ... optional parameters for the hint. Each parameter must be character, numeric, or a
#'            list whose elements are character or numeric; anything else raises an error.
#' @return A SparkDataFrame.
#' @family SparkDataFrame functions
#' @aliases hint,SparkDataFrame,character-method
#' @rdname hint
#' @name hint
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg")
#'
#' head(join(df, hint(avg_mpg, "broadcast"), df$cyl == avg_mpg$cyl))
#' }
#' @note hint since 2.2.0
setMethod("hint",
          signature(x = "SparkDataFrame", name = "character"),
          function(x, name, ...) {
            parameters <- list(...)

            # TRUE when the value itself is character or numeric.
            isPlainHintValue <- function(value) {
              is.character(value) || is.numeric(value)
            }

            # TRUE for a plain value, or for a list made up only of plain values.
            isValidHintParameter <- function(param) {
              if (isPlainHintValue(param)) {
                TRUE
              } else if (is.list(param)) {
                all(vapply(param, isPlainHintValue, logical(1)))
              } else {
                FALSE
              }
            }

            # vapply() always yields a logical vector (logical(0) when no parameters are given),
            # unlike sapply(), whose return type depends on its input.
            paramsValid <- vapply(parameters, isValidHintParameter, logical(1))
            if (!all(paramsValid)) {
              stop("sql hint should be character, numeric, or list with character or numeric.")
            }
            jdf <- callJMethod(x@sdf, "hint", name, parameters)
            dataFrame(jdf)
          })
4125 
#' alias
#'
#' @aliases alias,SparkDataFrame-method
#' @family SparkDataFrame functions
#' @rdname alias
#' @name alias
#' @examples
#' \dontrun{
#' df <- alias(createDataFrame(mtcars), "mtcars")
#' avg_mpg <- alias(agg(groupBy(df, df$cyl), avg(df$mpg)), "avg_mpg")
#'
#' head(select(df, column("mtcars.mpg")))
#' head(join(df, avg_mpg, column("mtcars.cyl") == column("avg_mpg.cyl")))
#' }
#' @note alias(SparkDataFrame) since 2.3.0
setMethod("alias",
          signature(object = "SparkDataFrame"),
          function(object, data) {
            # `data` carries the alias and must be of character type.
            stopifnot(is.character(data))
            # Apply the alias on the backing Java object and wrap the result.
            dataFrame(callJMethod(object@sdf, "alias", data))
          })
4148 
#' broadcast
#'
#' Marks a SparkDataFrame as small enough to be used in broadcast joins and returns it as a new
#' SparkDataFrame.
#'
#' Equivalent to \code{hint(x, "broadcast")}.
#'
#' @param x a SparkDataFrame.
#' @return a SparkDataFrame.
#'
#' @aliases broadcast,SparkDataFrame-method
#' @family SparkDataFrame functions
#' @rdname broadcast
#' @name broadcast
#' @examples
#' \dontrun{
#' df <- createDataFrame(mtcars)
#' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg")
#'
#' head(join(df, broadcast(avg_mpg), df$cyl == avg_mpg$cyl))
#' }
#' @note broadcast since 2.3.0
setMethod("broadcast",
          signature(x = "SparkDataFrame"),
          function(x) {
            # Calls the static `broadcast` function of org.apache.spark.sql.functions on the
            # backing Java object and wraps the result.
            dataFrame(callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf))
          })
4176 
#' withWatermark
#'
#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point
#' in time before which we assume no more late data is going to arrive.
#'
#' Spark uses this watermark for several purposes:
#' \itemize{
#'  \item To know when a given time window aggregation can be finalized and thus can be emitted
#' when using output modes that do not allow updates.
#'  \item To minimize the amount of state that needs to be kept for on-going aggregations.
#' }
#' The current watermark is computed by taking the \code{MAX(eventTime)} seen across all of the
#' partitions in the query and subtracting a user specified \code{delayThreshold}. Due to the
#' cost of coordinating this value across partitions, the actual watermark used is only
#' guaranteed to be at least \code{delayThreshold} behind the actual event time. In some cases
#' records that arrive more than \code{delayThreshold} late may still be processed.
#'
#' @param x a streaming SparkDataFrame
#' @param eventTime a string specifying the name of the Column that contains the event time of
#'                  the row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
#'                       relative to the latest record that has been processed, in the form of
#'                       an interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be
#'                       negative.
#' @return a SparkDataFrame.
#' @aliases withWatermark,SparkDataFrame,character,character-method
#' @family SparkDataFrame functions
#' @rdname withWatermark
#' @name withWatermark
#' @examples
#' \dontrun{
#' sparkR.session()
#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' df <- withWatermark(df, "time", "10 minutes")
#' }
#' @note withWatermark since 2.3.0
setMethod("withWatermark",
          signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
          function(x, eventTime, delayThreshold) {
            # Both arguments are passed unchanged to the backing Java object's method; the
            # returned Java reference is wrapped in a new SparkDataFrame.
            dataFrame(callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold))
          })