0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # RDD in R implemented in the S4 OO system.
0019 
0020 setOldClass("jobj")
0021 
0022 #' S4 class that represents an RDD
0023 #'
0024 #' RDD can be created using functions like
0025 #'              \code{parallelize}, \code{textFile} etc.
0026 #'
0027 #' @rdname RDD
0028 #' @seealso parallelize, textFile
0029 #' @slot env An R environment that stores bookkeeping states of the RDD
0030 #' @slot jrdd Java object reference to the backing JavaRDD
0031 #'
0032 #' @noRd
0033 setClass("RDD",
0034          slots = list(env = "environment",
0035                       jrdd = "jobj"))
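     # Illustrative sketch: RDD objects are normally obtained through helpers such
     # as parallelize() or textFile() rather than by calling new() directly, e.g.
     #   sc <- sparkR.init()
     #   rdd <- parallelize(sc, 1:10, 2L)  # an object of class "RDD"
     #   rdd@env$isCached                  # FALSE until cache()/persist() is called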
0036 
0037 setClass("PipelinedRDD",
0038          slots = list(prev = "RDD",
0039                       func = "function",
0040                       prev_jrdd = "jobj"),
0041          contains = "RDD")
0042 
0043 setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
0044                                         isCached, isCheckpointed) {
0045   # Check that the RDD constructor is given a valid serializedMode
0046   stopifnot(class(serializedMode) == "character")
0047   stopifnot(serializedMode %in% c("byte", "string", "row"))
0048   # RDD has three serialization types:
0049   # byte: The RDD stores data serialized in R.
0050   # string: The RDD stores data as strings.
0051   # row: The RDD stores the serialized rows of a SparkDataFrame.
0052 
0053   # We use an environment to store mutable states inside an RDD object.
0054   # Note that R's call-by-value semantics makes modifying slots inside an
0055   # object (passed as an argument into a function, such as cache()) difficult:
0056   # i.e. one needs to make a copy of the RDD object and set the new slot value
0057   # there.
0058 
0059   # The slots are inheritable from the superclass. Here, both `env' and `jrdd' are
0060   # inherited from RDD, but only the former is used.
0061   .Object@env <- new.env()
0062   .Object@env$isCached <- isCached
0063   .Object@env$isCheckpointed <- isCheckpointed
0064   .Object@env$serializedMode <- serializedMode
0065 
0066   .Object@jrdd <- jrdd
0067   .Object
0068 })
0069 
0070 setMethod("showRDD", "RDD",
0071           function(object) {
0072               cat(paste0(callJMethod(getJRDD(object), "toString"), "\n"))
0073           })
0074 
0075 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
0076   .Object@env <- new.env()
0077   .Object@env$isCached <- FALSE
0078   .Object@env$isCheckpointed <- FALSE
0079   .Object@env$jrdd_val <- jrdd_val
0080   if (!is.null(jrdd_val)) {
0081     # This tracks the serialization mode for jrdd_val
0082     .Object@env$serializedMode <- prev@env$serializedMode
0083   }
0084 
0085   .Object@prev <- prev
0086 
0087   isPipelinable <- function(rdd) {
0088     e <- rdd@env
0089     # nolint start
0090     !(e$isCached || e$isCheckpointed)
0091     # nolint end
0092   }
0093 
0094   if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
0095     # This transformation is the first in its stage:
0096     .Object@func <- cleanClosure(func)
0097     .Object@prev_jrdd <- getJRDD(prev)
0098     .Object@env$prev_serializedMode <- prev@env$serializedMode
0099     # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
0100     # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
0101   } else {
0102     pipelinedFunc <- function(partIndex, part) {
0103       f <- prev@func
0104       func(partIndex, f(partIndex, part))
0105     }
0106     .Object@func <- cleanClosure(pipelinedFunc)
0107     .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
0108     # Get the serialization mode of the parent RDD
0109     .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
0110   }
0111 
0112   .Object
0113 })
0114 
0115 #' @rdname RDD
0116 #' @noRd
0117 #' @param jrdd Java object reference to the backing JavaRDD
0118 #' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
0119 #' stores strings, and "row" if the RDD stores the rows of a SparkDataFrame
0120 #' @param isCached TRUE if the RDD is cached
0121 #' @param isCheckpointed TRUE if the RDD has been checkpointed
0122 RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
0123                 isCheckpointed = FALSE) {
0124   new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
0125 }
0126 
0127 PipelinedRDD <- function(prev, func) {
0128   new("PipelinedRDD", prev, func, NULL)
0129 }
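     # Rough sketch of how pipelining behaves: chaining two narrow transformations
     # yields a single PipelinedRDD whose func composes both steps, so both steps
     # run in one R worker pass when the JRDD is finally materialized.
     #   rdd2 <- lapply(rdd, function(x) { x + 1 })   # PipelinedRDD over rdd
     #   rdd3 <- lapply(rdd2, function(x) { x * 2 })  # still anchored on rdd's jrdd
     #   # rdd3@func(i, part) is roughly func2(i, func1(i, part))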
0130 
0131 # Return the serialization mode for an RDD.
0132 setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
0133 # For normal RDDs we can directly read the serializedMode
0134 setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode)
0135 # For pipelined RDDs, if jrdd_val is set then serializedMode should exist;
0136 # if not, we return the default serialization mode of "byte" as we don't know the
0137 # serialization mode at this point in time.
0138 setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
0139           function(rdd) {
0140             if (!is.null(rdd@env$jrdd_val)) {
0141               return(rdd@env$serializedMode)
0142             } else {
0143               return("byte")
0144             }
0145           })
0146 
0147 # The jrdd accessor function.
0148 setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd)
0149 setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
0150           function(rdd, serializedMode = "byte") {
0151             if (!is.null(rdd@env$jrdd_val)) {
0152               return(rdd@env$jrdd_val)
0153             }
0154 
0155             packageNamesArr <- serialize(.sparkREnv[[".packages"]],
0156                                          connection = NULL)
0157 
0158             broadcastArr <- lapply(ls(.broadcastNames),
0159                                    function(name) { get(name, .broadcastNames) })
0160 
0161             serializedFuncArr <- serialize(rdd@func, connection = NULL)
0162 
0163             prev_jrdd <- rdd@prev_jrdd
0164 
0165             if (serializedMode == "string") {
0166               rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
0167                                    callJMethod(prev_jrdd, "rdd"),
0168                                    serializedFuncArr,
0169                                    rdd@env$prev_serializedMode,
0170                                    packageNamesArr,
0171                                    broadcastArr,
0172                                    callJMethod(prev_jrdd, "classTag"))
0173             } else {
0174               rddRef <- newJObject("org.apache.spark.api.r.RRDD",
0175                                    callJMethod(prev_jrdd, "rdd"),
0176                                    serializedFuncArr,
0177                                    rdd@env$prev_serializedMode,
0178                                    serializedMode,
0179                                    packageNamesArr,
0180                                    broadcastArr,
0181                                    callJMethod(prev_jrdd, "classTag"))
0182             }
0183             # Save the serialization flag after we create an RRDD
0184             rdd@env$serializedMode <- serializedMode
0185             rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD")
0186             rdd@env$jrdd_val
0187           })
0188 
0189 setValidity("RDD",
0190             function(object) {
0191               jrdd <- getJRDD(object)
0192               cls <- callJMethod(jrdd, "getClass")
0193               className <- callJMethod(cls, "getName")
0194               if (grepl("spark.api.java.*RDD*", className)) {
0195                 TRUE
0196               } else {
0197                 paste("Invalid RDD class", className)
0198               }
0199             })
0200 
0201 
0202 ############ Actions and Transformations ############
0203 
0204 #' Persist an RDD
0205 #'
0206 #' Persist this RDD with the default storage level (MEMORY_ONLY).
0207 #'
0208 #' @param x The RDD to cache
0209 #' @examples
0210 #'\dontrun{
0211 #' sc <- sparkR.init()
0212 #' rdd <- parallelize(sc, 1:10, 2L)
0213 #' cache(rdd)
0214 #'}
0215 #' @rdname cache-methods
0216 #' @aliases cache,RDD-method
0217 #' @noRd
0218 setMethod("cacheRDD",
0219           signature(x = "RDD"),
0220           function(x) {
0221             callJMethod(getJRDD(x), "cache")
0222             x@env$isCached <- TRUE
0223             x
0224           })
0225 
0226 #' Persist an RDD
0227 #'
0228 #' Persist this RDD with the specified storage level. For details of the
0229 #' supported storage levels, refer to
0230 #'\url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
0231 #'
0232 #' @param x The RDD to persist
0233 #' @param newLevel The new storage level to be assigned
0234 #' @examples
0235 #'\dontrun{
0236 #' sc <- sparkR.init()
0237 #' rdd <- parallelize(sc, 1:10, 2L)
0238 #' persistRDD(rdd, "MEMORY_AND_DISK")
0239 #'}
0240 #' @rdname persist
0241 #' @aliases persist,RDD-method
0242 #' @noRd
0243 setMethod("persistRDD",
0244           signature(x = "RDD", newLevel = "character"),
0245           function(x, newLevel = "MEMORY_ONLY") {
0246             callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
0247             x@env$isCached <- TRUE
0248             x
0249           })
0250 
0251 #' Unpersist an RDD
0252 #'
0253 #' Mark the RDD as non-persistent, and remove all blocks for it from memory and
0254 #' disk.
0255 #'
0256 #' @param x The RDD to unpersist
0257 #' @examples
0258 #'\dontrun{
0259 #' sc <- sparkR.init()
0260 #' rdd <- parallelize(sc, 1:10, 2L)
0261 #' cache(rdd) # rdd@@env$isCached == TRUE
0262 #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
0263 #'}
0264 #' @rdname unpersist
0265 #' @aliases unpersist,RDD-method
0266 #' @noRd
0267 setMethod("unpersistRDD",
0268           signature(x = "RDD"),
0269           function(x) {
0270             callJMethod(getJRDD(x), "unpersist")
0271             x@env$isCached <- FALSE
0272             x
0273           })
0274 
0275 #' Checkpoint an RDD
0276 #'
0277 #' Mark this RDD for checkpointing. It will be saved to a file inside the
0278 #' checkpoint directory set with setCheckpointDir() and all references to its
0279 #' parent RDDs will be removed. This function must be called before any job has
0280 #' been executed on this RDD. It is strongly recommended that this RDD is
0281 #' persisted in memory; otherwise saving it to a file will require recomputation.
0282 #'
0283 #' @param x The RDD to checkpoint
0284 #' @examples
0285 #'\dontrun{
0286 #' sc <- sparkR.init()
0287 #' setCheckpointDir(sc, "checkpoint")
0288 #' rdd <- parallelize(sc, 1:10, 2L)
0289 #' checkpoint(rdd)
0290 #'}
0291 #' @rdname checkpoint-methods
0292 #' @aliases checkpoint,RDD-method
0293 #' @noRd
0294 setMethod("checkpointRDD",
0295           signature(x = "RDD"),
0296           function(x) {
0297             jrdd <- getJRDD(x)
0298             callJMethod(jrdd, "checkpoint")
0299             x@env$isCheckpointed <- TRUE
0300             x
0301           })
0302 
0303 #' Gets the number of partitions of an RDD
0304 #'
0305 #' @param x An RDD.
0306 #' @return The number of partitions of the RDD as an integer.
0307 #' @examples
0308 #'\dontrun{
0309 #' sc <- sparkR.init()
0310 #' rdd <- parallelize(sc, 1:10, 2L)
0311 #' getNumPartitions(rdd)  # 2L
0312 #'}
0313 #' @rdname getNumPartitions
0314 #' @aliases getNumPartitions,RDD-method
0315 #' @noRd
0316 setMethod("getNumPartitionsRDD",
0317           signature(x = "RDD"),
0318           function(x) {
0319             callJMethod(getJRDD(x), "getNumPartitions")
0320           })
0321 
0322 #' Gets the number of partitions of an RDD, the same as getNumPartitions.
0323 #' This function is deprecated; please use getNumPartitions instead.
0324 #'
0325 #' @rdname getNumPartitions
0326 #' @aliases numPartitions,RDD-method
0327 #' @noRd
0328 setMethod("numPartitions",
0329           signature(x = "RDD"),
0330           function(x) {
0331             .Deprecated("getNumPartitions")
0332             getNumPartitionsRDD(x)
0333           })
0334 
0335 #' Collect elements of an RDD
0336 #'
0337 #' @description
0338 #' \code{collect} returns a list that contains all of the elements in this RDD.
0339 #'
0340 #' @param x The RDD to collect
0341 #' @param ... Other optional arguments to collect
0342 #' @param flatten FALSE if the list should not be flattened
0343 #' @return a list containing elements in the RDD
0344 #' @examples
0345 #'\dontrun{
0346 #' sc <- sparkR.init()
0347 #' rdd <- parallelize(sc, 1:10, 2L)
0348 #' collectRDD(rdd) # list from 1 to 10
0349 #' collectPartition(rdd, 0L) # list from 1 to 5
0350 #'}
0351 #' @rdname collect-methods
0352 #' @aliases collect,RDD-method
0353 #' @noRd
0354 setMethod("collectRDD",
0355           signature(x = "RDD"),
0356           function(x, flatten = TRUE) {
0357             # Assumes a pairwise RDD is backed by a JavaPairRDD.
0358             collected <- callJMethod(getJRDD(x), "collect")
0359             convertJListToRList(collected, flatten,
0360               serializedMode = getSerializedMode(x))
0361           })
0362 
0363 
0364 #' @description
0365 #' \code{collectPartition} returns a list that contains all of the elements
0366 #' in the specified partition of the RDD.
0367 #' @param partitionId the partition to collect (starts from 0)
0368 #' @rdname collect-methods
0369 #' @aliases collectPartition,integer,RDD-method
0370 #' @noRd
0371 setMethod("collectPartition",
0372           signature(x = "RDD", partitionId = "integer"),
0373           function(x, partitionId) {
0374             jPartitionsList <- callJMethod(getJRDD(x),
0375                                            "collectPartitions",
0376                                            as.list(as.integer(partitionId)))
0377 
0378             jList <- jPartitionsList[[1]]
0379             convertJListToRList(jList, flatten = TRUE,
0380               serializedMode = getSerializedMode(x))
0381           })
0382 
0383 #' @description
0384 #' \code{collectAsMap} returns a named list as a map that contains all of the elements
0385 #' in a key-value pair RDD.
0386 #' @examples
0387 # nolint start
0388 #'\dontrun{
0389 #' sc <- sparkR.init()
0390 #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
0391 #' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
0392 #'}
0393 # nolint end
0394 #' @rdname collect-methods
0395 #' @aliases collectAsMap,RDD-method
0396 #' @noRd
0397 setMethod("collectAsMap",
0398           signature(x = "RDD"),
0399           function(x) {
0400             pairList <- collectRDD(x)
0401             map <- new.env()
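                 # Use an environment as a mutable hash map: each pair's key (as a
                 # character string) is bound to its value, then converted back to a list.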
0402             lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) })
0403             as.list(map)
0404           })
0405 
0406 #' Return the number of elements in the RDD.
0407 #'
0408 #' @param x The RDD to count
0409 #' @return number of elements in the RDD.
0410 #' @examples
0411 #'\dontrun{
0412 #' sc <- sparkR.init()
0413 #' rdd <- parallelize(sc, 1:10)
0414 #' countRDD(rdd) # 10
0415 #' length(rdd) # Same as count
0416 #'}
0417 #' @rdname count
0418 #' @aliases count,RDD-method
0419 #' @noRd
0420 setMethod("countRDD",
0421           signature(x = "RDD"),
0422           function(x) {
0423             countPartition <- function(part) {
0424               as.integer(length(part))
0425             }
0426             valsRDD <- lapplyPartition(x, countPartition)
0427             vals <- collectRDD(valsRDD)
0428             sum(as.integer(vals))
0429           })
0430 
0431 #' Return the number of elements in the RDD
0432 #' @rdname count
0433 #' @noRd
0434 setMethod("lengthRDD",
0435           signature(x = "RDD"),
0436           function(x) {
0437             countRDD(x)
0438           })
0439 
0440 #' Return the count of each unique value in this RDD as a list of
0441 #' (value, count) pairs.
0442 #'
0443 #' Same as countByValue in Spark.
0444 #'
0445 #' @param x The RDD to count
0446 #' @return list of (value, count) pairs, where count is the number of times each
0447 #' unique value occurs in the RDD.
0448 #' @examples
0449 # nolint start
0450 #'\dontrun{
0451 #' sc <- sparkR.init()
0452 #' rdd <- parallelize(sc, c(1,2,3,2,1))
0453 #' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
0454 #'}
0455 # nolint end
0456 #' @rdname countByValue
0457 #' @aliases countByValue,RDD-method
0458 #' @noRd
0459 setMethod("countByValue",
0460           signature(x = "RDD"),
0461           function(x) {
0462             ones <- lapply(x, function(item) { list(item, 1L) })
0463             collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
0464           })
0465 
0466 #' Apply a function to all elements
0467 #'
0468 #' This function creates a new RDD by applying the given transformation to all
0469 #' elements of the given RDD
0470 #'
0471 #' @param X The RDD to apply the transformation to.
0472 #' @param FUN the transformation to apply on each element
0473 #' @return a new RDD created by the transformation.
0474 #' @rdname lapply
0475 #' @noRd
0476 #' @aliases lapply
0477 #' @examples
0478 #'\dontrun{
0479 #' sc <- sparkR.init()
0480 #' rdd <- parallelize(sc, 1:10)
0481 #' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
0482 #' collectRDD(multiplyByTwo) # 2,4,6...
0483 #'}
0484 setMethod("lapply",
0485           signature(X = "RDD", FUN = "function"),
0486           function(X, FUN) {
0487             func <- function(partIndex, part) {
0488               lapply(part, FUN)
0489             }
0490             lapplyPartitionsWithIndex(X, func)
0491           })
0492 
0493 #' @rdname lapply
0494 #' @aliases map,RDD,function-method
0495 #' @noRd
0496 setMethod("map",
0497           signature(X = "RDD", FUN = "function"),
0498           function(X, FUN) {
0499             lapply(X, FUN)
0500           })
0501 
0502 #' Flatten results after applying a function to all elements
0503 #'
0504 #' This function returns a new RDD by first applying a function to all
0505 #' elements of this RDD, and then flattening the results.
0506 #'
0507 #' @param X The RDD to apply the transformation to.
0508 #' @param FUN the transformation to apply on each element
0509 #' @return a new RDD created by the transformation.
0510 #' @examples
0511 #'\dontrun{
0512 #' sc <- sparkR.init()
0513 #' rdd <- parallelize(sc, 1:10)
0514 #' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
0515 #' collectRDD(multiplyByTwo) # 2,20,4,40,6,60...
0516 #'}
0517 #' @rdname flatMap
0518 #' @aliases flatMap,RDD,function-method
0519 #' @noRd
0520 setMethod("flatMap",
0521           signature(X = "RDD", FUN = "function"),
0522           function(X, FUN) {
0523             partitionFunc <- function(part) {
0524               unlist(
0525                 lapply(part, FUN),
0526                 recursive = FALSE
0527               )
0528             }
0529             lapplyPartition(X, partitionFunc)
0530           })
0531 
0532 #' Apply a function to each partition of an RDD
0533 #'
0534 #' Return a new RDD by applying a function to each partition of this RDD.
0535 #'
0536 #' @param X The RDD to apply the transformation to.
0537 #' @param FUN the transformation to apply on each partition.
0538 #' @return a new RDD created by the transformation.
0539 #' @examples
0540 #'\dontrun{
0541 #' sc <- sparkR.init()
0542 #' rdd <- parallelize(sc, 1:10)
0543 #' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
0544 #' collectRDD(partitionSum) # 15, 40
0545 #'}
0546 #' @rdname lapplyPartition
0547 #' @aliases lapplyPartition,RDD,function-method
0548 #' @noRd
0549 setMethod("lapplyPartition",
0550           signature(X = "RDD", FUN = "function"),
0551           function(X, FUN) {
0552             lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
0553           })
0554 
0555 #' mapPartitions is the same as lapplyPartition.
0556 #'
0557 #' @rdname lapplyPartition
0558 #' @aliases mapPartitions,RDD,function-method
0559 #' @noRd
0560 setMethod("mapPartitions",
0561           signature(X = "RDD", FUN = "function"),
0562           function(X, FUN) {
0563             lapplyPartition(X, FUN)
0564           })
0565 
0566 #' Return a new RDD by applying a function to each partition of this RDD, while
0567 #' tracking the index of the original partition.
0568 #'
0569 #' @param X The RDD to apply the transformation to.
0570 #' @param FUN the transformation to apply on each partition; takes the partition
0571 #'        index and a list of elements in the particular partition.
0572 #' @return a new RDD created by the transformation.
0573 #' @examples
0574 #'\dontrun{
0575 #' sc <- sparkR.init()
0576 #' rdd <- parallelize(sc, 1:10, 5L)
0577 #' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
0578 #'                                          partIndex * Reduce("+", part) })
0579 #' collectRDD(prod, flatten = FALSE) # 0, 7, 22, 45, 76
0580 #'}
0581 #' @rdname lapplyPartitionsWithIndex
0582 #' @aliases lapplyPartitionsWithIndex,RDD,function-method
0583 #' @noRd
0584 setMethod("lapplyPartitionsWithIndex",
0585           signature(X = "RDD", FUN = "function"),
0586           function(X, FUN) {
0587             PipelinedRDD(X, FUN)
0588           })
0589 
0590 #' @rdname lapplyPartitionsWithIndex
0591 #' @aliases mapPartitionsWithIndex,RDD,function-method
0592 #' @noRd
0593 setMethod("mapPartitionsWithIndex",
0594           signature(X = "RDD", FUN = "function"),
0595           function(X, FUN) {
0596             lapplyPartitionsWithIndex(X, FUN)
0597           })
0598 
0599 #' This function returns a new RDD containing only the elements that satisfy
0600 #' a predicate (i.e. those for which a given logical function returns TRUE).
0601 #' The same as `filter()' in Spark.
0602 #'
0603 #' @param x The RDD to be filtered.
0604 #' @param f A unary predicate function.
0605 #' @examples
0606 # nolint start
0607 #'\dontrun{
0608 #' sc <- sparkR.init()
0609 #' rdd <- parallelize(sc, 1:10)
0610 #' unlist(collectRDD(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
0611 #'}
0612 # nolint end
0613 #' @rdname filterRDD
0614 #' @aliases filterRDD,RDD,function-method
0615 #' @noRd
0616 setMethod("filterRDD",
0617           signature(x = "RDD", f = "function"),
0618           function(x, f) {
0619             filter.func <- function(part) {
0620               Filter(f, part)
0621             }
0622             lapplyPartition(x, filter.func)
0623           })
0624 
0625 #' @rdname filterRDD
0626 #' @aliases Filter
0627 #' @noRd
0628 setMethod("Filter",
0629           signature(f = "function", x = "RDD"),
0630           function(f, x) {
0631             filterRDD(x, f)
0632           })
0633 
0634 #' Reduce across elements of an RDD.
0635 #'
0636 #' This function reduces the elements of this RDD using the
0637 #' specified commutative and associative binary operator.
0638 #'
0639 #' @param x The RDD to reduce
0640 #' @param func Commutative and associative function to apply on elements
0641 #'             of the RDD.
0642 #' @examples
0643 #'\dontrun{
0644 #' sc <- sparkR.init()
0645 #' rdd <- parallelize(sc, 1:10)
0646 #' reduce(rdd, "+") # 55
0647 #'}
0648 #' @rdname reduce
0649 #' @aliases reduce,RDD,ANY-method
0650 #' @noRd
0651 setMethod("reduce",
0652           signature(x = "RDD", func = "ANY"),
0653           function(x, func) {
0654 
0655             reducePartition <- function(part) {
0656               Reduce(func, part)
0657             }
0658 
0659             partitionList <- collectRDD(lapplyPartition(x, reducePartition),
0660                                      flatten = FALSE)
0661             Reduce(func, partitionList)
0662           })
0663 
0664 #' Get the maximum element of an RDD.
0665 #'
0666 #' @param x The RDD to get the maximum element from
0667 #' @examples
0668 #'\dontrun{
0669 #' sc <- sparkR.init()
0670 #' rdd <- parallelize(sc, 1:10)
0671 #' maximum(rdd) # 10
0672 #'}
0673 #' @rdname maximum
0674 #' @aliases maximum,RDD
0675 #' @noRd
0676 setMethod("maximum",
0677           signature(x = "RDD"),
0678           function(x) {
0679             reduce(x, max)
0680           })
0681 
0682 #' Get the minimum element of an RDD.
0683 #'
0684 #' @param x The RDD to get the minimum element from
0685 #' @examples
0686 #'\dontrun{
0687 #' sc <- sparkR.init()
0688 #' rdd <- parallelize(sc, 1:10)
0689 #' minimum(rdd) # 1
0690 #'}
0691 #' @rdname minimum
0692 #' @aliases minimum,RDD
0693 #' @noRd
0694 setMethod("minimum",
0695           signature(x = "RDD"),
0696           function(x) {
0697             reduce(x, min)
0698           })
0699 
0700 #' Add up the elements in an RDD.
0701 #'
0702 #' @param x The RDD to add up the elements in
0703 #' @examples
0704 #'\dontrun{
0705 #' sc <- sparkR.init()
0706 #' rdd <- parallelize(sc, 1:10)
0707 #' sumRDD(rdd) # 55
0708 #'}
0709 #' @rdname sumRDD
0710 #' @aliases sumRDD,RDD
0711 #' @noRd
0712 setMethod("sumRDD",
0713           signature(x = "RDD"),
0714           function(x) {
0715             reduce(x, "+")
0716           })
0717 
0718 #' Applies a function to all elements in an RDD, and forces evaluation.
0719 #'
0720 #' @param x The RDD to apply the function
0721 #' @param func The function to be applied.
0722 #' @return invisible NULL.
0723 #' @examples
0724 #'\dontrun{
0725 #' sc <- sparkR.init()
0726 #' rdd <- parallelize(sc, 1:10)
0727 #' foreach(rdd, function(x) { save(x, file=...) })
0728 #'}
0729 #' @rdname foreach
0730 #' @aliases foreach,RDD,function-method
0731 #' @noRd
0732 setMethod("foreach",
0733           signature(x = "RDD", func = "function"),
0734           function(x, func) {
0735             partition.func <- function(x) {
0736               lapply(x, func)
0737               NULL
0738             }
0739             invisible(collectRDD(mapPartitions(x, partition.func)))
0740           })
0741 
0742 #' Applies a function to each partition in an RDD, and forces evaluation.
0743 #'
0744 #' @examples
0745 #'\dontrun{
0746 #' sc <- sparkR.init()
0747 #' rdd <- parallelize(sc, 1:10)
0748 #' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
0749 #'}
0750 #' @rdname foreach
0751 #' @aliases foreachPartition,RDD,function-method
0752 #' @noRd
0753 setMethod("foreachPartition",
0754           signature(x = "RDD", func = "function"),
0755           function(x, func) {
0756             invisible(collectRDD(mapPartitions(x, func)))
0757           })
0758 
0759 #' Take elements from an RDD.
0760 #'
0761 #' This function takes the first NUM elements in the RDD and
0762 #' returns them in a list.
0763 #'
0764 #' @param x The RDD to take elements from
0765 #' @param num Number of elements to take
0766 #' @examples
0767 # nolint start
0768 #'\dontrun{
0769 #' sc <- sparkR.init()
0770 #' rdd <- parallelize(sc, 1:10)
0771 #' takeRDD(rdd, 2L) # list(1, 2)
0772 #'}
0773 # nolint end
0774 #' @rdname take
0775 #' @aliases take,RDD,numeric-method
0776 #' @noRd
0777 setMethod("takeRDD",
0778           signature(x = "RDD", num = "numeric"),
0779           function(x, num) {
0780             resList <- list()
0781             index <- -1
0782             jrdd <- getJRDD(x)
0783             numPartitions <- getNumPartitionsRDD(x)
0784             serializedModeRDD <- getSerializedMode(x)
0785 
0786             # TODO(shivaram): Collect more than one partition based on size
0787             # estimates similar to the scala version of `take`.
0788             while (TRUE) {
0789               index <- index + 1
0790 
0791               if (length(resList) >= num || index >= numPartitions)
0792                 break
0793 
0794               # a JList of byte arrays
0795               partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
0796               partition <- partitionArr[[1]]
0797 
0798               size <- num - length(resList)
0799               # elems is capped to have at most `size` elements
0800               elems <- convertJListToRList(partition,
0801                                            flatten = TRUE,
0802                                            logicalUpperBound = size,
0803                                            serializedMode = serializedModeRDD)
0804 
0805               resList <- append(resList, elems)
0806             }
0807             resList
0808           })
0809 
0810 
0811 #' First
0812 #'
0813 #' Return the first element of an RDD
0814 #'
0815 #' @rdname first
0816 #' @examples
0817 #'\dontrun{
0818 #' sc <- sparkR.init()
0819 #' rdd <- parallelize(sc, 1:10)
0820 #' firstRDD(rdd)
0821 #' }
0822 #' @noRd
0823 setMethod("firstRDD",
0824           signature(x = "RDD"),
0825           function(x) {
0826             takeRDD(x, 1)[[1]]
0827           })
0828 
0829 #' Removes duplicate elements from an RDD.
0830 #'
0831 #' This function returns a new RDD containing the distinct elements in the
0832 #' given RDD. The same as `distinct()' in Spark.
0833 #'
0834 #' @param x The RDD to remove duplicates from.
0835 #' @param numPartitions Number of partitions to create.
0836 #' @examples
0837 # nolint start
0838 #'\dontrun{
0839 #' sc <- sparkR.init()
0840 #' rdd <- parallelize(sc, c(1,2,2,3,3,3))
0841 #' sort(unlist(collectRDD(distinctRDD(rdd)))) # c(1, 2, 3)
0842 #'}
0843 # nolint end
0844 #' @rdname distinct
0845 #' @aliases distinct,RDD-method
0846 #' @noRd
0847 setMethod("distinctRDD",
0848           signature(x = "RDD"),
0849           function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
0850             identical.mapped <- lapply(x, function(x) { list(x, NULL) })
0851             reduced <- reduceByKey(identical.mapped,
0852                                    function(x, y) { x },
0853                                    numPartitions)
0854             resRDD <- lapply(reduced, function(x) { x[[1]] })
0855             resRDD
0856           })
0857 
0858 #' Return an RDD that is a sampled subset of the given RDD.
0859 #'
0860 #' The same as `sample()' in Spark. (We rename it due to signature
0861 #' inconsistencies with the `sample()' function in R's base package.)
0862 #'
0863 #' @param x The RDD to sample elements from
0864 #' @param withReplacement Sampling with replacement or not
0865 #' @param fraction The (rough) sample target fraction
0866 #' @param seed Randomness seed value
0867 #' @examples
0868 #'\dontrun{
0869 #' sc <- sparkR.init()
0870 #' rdd <- parallelize(sc, 1:10)
0871 #' collectRDD(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
0872 #' collectRDD(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
0873 #'}
0874 #' @rdname sampleRDD
0875 #' @aliases sampleRDD,RDD
0876 #' @noRd
0877 setMethod("sampleRDD",
0878           signature(x = "RDD", withReplacement = "logical",
0879                     fraction = "numeric", seed = "integer"),
0880           function(x, withReplacement, fraction, seed) {
0881 
0882             # The sampler: takes a partition and returns its sampled version.
0883             samplingFunc <- function(partIndex, part) {
0884               set.seed(seed)
0885               res <- vector("list", length(part))
0886               len <- 0
0887 
0888               # Discards some random values to ensure each partition has a
0889               # different random seed.
0890               stats::runif(partIndex)
0891 
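                   # With replacement, the number of copies of each element is drawn
                   # from Poisson(fraction); without replacement, each element is kept
                   # independently with probability `fraction` (Bernoulli sampling).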
0892               for (elem in part) {
0893                 if (withReplacement) {
0894                   count <- stats::rpois(1, fraction)
0895                   if (count > 0) {
0896                     res[(len + 1) : (len + count)] <- rep(list(elem), count)
0897                     len <- len + count
0898                   }
0899                 } else {
0900                   if (stats::runif(1) < fraction) {
0901                     len <- len + 1
0902                     res[[len]] <- elem
0903                   }
0904                 }
0905               }
0906 
0907               # TODO(zongheng): look into the performance of the current
0908               # implementation. Look into some iterator package? Note that
0909               # Scala avoids many calls to creating an empty list and PySpark
0910               # similarly achieves this using `yield'.
0911               if (len > 0)
0912                 res[1:len]
0913               else
0914                 list()
0915             }
0916 
0917             lapplyPartitionsWithIndex(x, samplingFunc)
0918           })
0919 
0920 #' Return a list of the elements that are a sampled subset of the given RDD.
0921 #'
0922 #' @param x The RDD to sample elements from
0923 #' @param withReplacement Sampling with replacement or not
0924 #' @param num Number of elements to return
0925 #' @param seed Randomness seed value
0926 #' @examples
0927 #'\dontrun{
0928 #' sc <- sparkR.init()
0929 #' rdd <- parallelize(sc, 1:100)
0930 #' # exactly 5 elements sampled, which may not be distinct
0931 #' takeSample(rdd, TRUE, 5L, 1618L)
0932 #' # exactly 5 distinct elements sampled
0933 #' takeSample(rdd, FALSE, 5L, 16181618L)
0934 #'}
0935 #' @rdname takeSample
0936 #' @aliases takeSample,RDD
0937 #' @noRd
0938 setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
0939                                   num = "integer", seed = "integer"),
0940           function(x, withReplacement, num, seed) {
0941             # This function is ported from RDD.scala.
0942             fraction <- 0.0
0943             total <- 0
0944             multiplier <- 3.0
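                 # Over-sample by `multiplier` so a single pass of sampleRDD below is
                 # likely to return at least the requested number of elements; the
                 # while loop further down retries if the sample comes up short.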
0945             initialCount <- countRDD(x)
0946             maxSelected <- 0
0947             MAXINT <- .Machine$integer.max
0948 
0949             if (num < 0)
0950               stop("Negative number of elements requested")
0951 
0952             if (initialCount > MAXINT - 1) {
0953               maxSelected <- MAXINT - 1
0954             } else {
0955               maxSelected <- initialCount
0956             }
0957 
0958             if (num > initialCount && !withReplacement) {
0959               total <- maxSelected
0960               fraction <- multiplier * (maxSelected + 1) / initialCount
0961             } else {
0962               total <- num
0963               fraction <- multiplier * (num + 1) / initialCount
0964             }
0965 
0966             set.seed(seed)
0967             samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
0968                                          as.integer(ceiling(stats::runif(1,
0969                                                                   -MAXINT,
0970                                                                   MAXINT)))))
0971             # If the first sample didn't turn out large enough, keep trying to
0972             # take samples; this shouldn't happen often because we use a big
0973             # multiplier for the initial size
0974             while (length(samples) < total)
0975               samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
0976                                            as.integer(ceiling(stats::runif(1,
0977                                                                     -MAXINT,
0978                                                                     MAXINT)))))
0979 
0980             # TODO(zongheng): investigate if this call is an in-place shuffle?
0981             base::sample(samples)[1:total]
0982           })
0983 
0984 #' Creates tuples of the elements in this RDD by applying a function.
0985 #'
0986 #' @param x The RDD.
0987 #' @param func The function to be applied.
0988 #' @examples
0989 # nolint start
0990 #'\dontrun{
0991 #' sc <- sparkR.init()
0992 #' rdd <- parallelize(sc, list(1, 2, 3))
0993 #' collectRDD(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
0994 #'}
0995 # nolint end
0996 #' @rdname keyBy
0997 #' @aliases keyBy,RDD
0998 #' @noRd
0999 setMethod("keyBy",
1000           signature(x = "RDD", func = "function"),
1001           function(x, func) {
1002             apply.func <- function(x) {
1003               list(func(x), x)
1004             }
1005             lapply(x, apply.func)
1006           })
1007 
1008 #' Return a new RDD that has exactly numPartitions partitions.
1009 #' Can increase or decrease the level of parallelism in this RDD. Internally,
1010 #' this uses a shuffle to redistribute data.
1011 #' If you are decreasing the number of partitions in this RDD, consider using
1012 #' coalesce, which can avoid performing a shuffle.
1013 #'
1014 #' @param x The RDD.
1015 #' @param numPartitions Number of partitions to create.
1016 #' @seealso coalesce
1017 #' @examples
1018 #'\dontrun{
1019 #' sc <- sparkR.init()
1020 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
1021 #' getNumPartitions(rdd)                   # 4
1022 #' getNumPartitions(repartitionRDD(rdd, 2L))  # 2
1023 #'}
1024 #' @rdname repartition
1025 #' @aliases repartition,RDD
1026 #' @noRd
1027 setMethod("repartitionRDD",
1028           signature(x = "RDD"),
1029           function(x, numPartitions) {
1030             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
1031               coalesceRDD(x, numPartitions, TRUE)
1032             } else {
1033               stop("Please specify the number of partitions")
1034             }
1035           })
1036 
1037 #' Return a new RDD that is reduced into numPartitions partitions.
1038 #'
1039 #' @param x The RDD.
1040 #' @param numPartitions Number of partitions to create.
1041 #' @seealso repartition
1042 #' @examples
1043 #'\dontrun{
1044 #' sc <- sparkR.init()
1045 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
1046 #' getNumPartitions(rdd)               # 3
1047 #' getNumPartitions(coalesce(rdd, 1L)) # 1
1048 #'}
1049 #' @rdname coalesce
1050 #' @aliases coalesce,RDD
1051 #' @noRd
1052 setMethod("coalesceRDD",
1053            signature(x = "RDD", numPartitions = "numeric"),
1054            function(x, numPartitions, shuffle = FALSE) {
1055              numPartitions <- numToInt(numPartitions)
1056              if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
1057                func <- function(partIndex, part) {
1058                  set.seed(partIndex)  # partIndex as seed
1059                  start <- as.integer(base::sample(numPartitions, 1) - 1)
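                      # Elements are then dealt out round-robin to the numPartitions
                      # target partitions, starting at this random offset, so the new
                      # partitions end up roughly equally sized.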
1060                  lapply(seq_along(part),
1061                         function(i) {
1062                           pos <- (start + i) %% numPartitions
1063                           list(pos, part[[i]])
1064                         })
1065                }
1066                shuffled <- lapplyPartitionsWithIndex(x, func)
1067                repartitioned <- partitionByRDD(shuffled, numPartitions)
1068                values(repartitioned)
1069              } else {
1070                jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
1071                RDD(jrdd)
1072              }
1073            })
1074 
1075 #' Save this RDD as a SequenceFile of serialized objects.
1076 #'
1077 #' @param x The RDD to save
1078 #' @param path The directory where the file is saved
1079 #' @seealso objectFile
1080 #' @examples
1081 #'\dontrun{
1082 #' sc <- sparkR.init()
1083 #' rdd <- parallelize(sc, 1:3)
1084 #' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
1085 #'}
1086 #' @rdname saveAsObjectFile
1087 #' @aliases saveAsObjectFile,RDD
1088 #' @noRd
1089 setMethod("saveAsObjectFile",
1090           signature(x = "RDD", path = "character"),
1091           function(x, path) {
1092             # If serializedMode == "string" we need to serialize the data before saving it since
1093             # objectFile() assumes serializedMode == "byte".
1094             if (getSerializedMode(x) != "byte") {
1095               x <- serializeToBytes(x)
1096             }
1097             # Return nothing
1098             invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
1099           })
1100 
1101 #' Save this RDD as a text file, using string representations of elements.
1102 #'
1103 #' @param x The RDD to save
1104 #' @param path The directory where the partitions of the text file are saved
1105 #' @examples
1106 #'\dontrun{
1107 #' sc <- sparkR.init()
1108 #' rdd <- parallelize(sc, 1:3)
1109 #' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
1110 #'}
1111 #' @rdname saveAsTextFile
1112 #' @aliases saveAsTextFile,RDD
1113 #' @noRd
1114 setMethod("saveAsTextFile",
1115           signature(x = "RDD", path = "character"),
1116           function(x, path) {
1117             func <- function(str) {
1118               toString(str)
1119             }
1120             stringRdd <- lapply(x, func)
1121             # Return nothing
1122             invisible(
1123               callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
1124           })
1125 
1126 #' Sort an RDD by the given key function.
1127 #'
1128 #' @param x An RDD to be sorted.
1129 #' @param func A function used to compute the sort key for each element.
1130 #' @param ascending A flag to indicate whether the sorting is ascending or descending.
1131 #' @param numPartitions Number of partitions to create.
1132 #' @return An RDD where all elements are sorted.
1133 #' @examples
1134 # nolint start
1135 #'\dontrun{
1136 #' sc <- sparkR.init()
1137 #' rdd <- parallelize(sc, list(3, 2, 1))
1138 #' collectRDD(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
1139 #'}
1140 # nolint end
1141 #' @rdname sortBy
1142 #' @aliases sortBy,RDD,RDD-method
1143 #' @noRd
1144 setMethod("sortBy",
1145           signature(x = "RDD", func = "function"),
1146           function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1147             values(sortByKey(keyBy(x, func), ascending, numPartitions))
1148           })
1149 
1150 # Helper function to get first N elements from an RDD in the specified order.
1151 # Param:
1152 #   x An RDD.
1153 #   num Number of elements to return.
1154 #   ascending A flag to indicate whether the sorting is ascending or descending.
1155 # Return:
1156 #   A list of the first N elements from the RDD in the specified order.
1157 #
1158 takeOrderedElem <- function(x, num, ascending = TRUE) {
1159   if (num <= 0L) {
1160     return(list())
1161   }
1162 
1163   partitionFunc <- function(part) {
1164     if (num < length(part)) {
1165       # R limitation: order works only on primitive types!
1166       ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
1167       part[ord[1:num]]
1168     } else {
1169       part
1170     }
1171   }
1172 
1173   newRdd <- mapPartitions(x, partitionFunc)
1174 
1175   resList <- list()
1176   index <- -1
1177   jrdd <- getJRDD(newRdd)
1178   numPartitions <- getNumPartitionsRDD(newRdd)
1179   serializedModeRDD <- getSerializedMode(newRdd)
1180 
1181   while (TRUE) {
1182     index <- index + 1
1183 
1184     if (index >= numPartitions) {
1185       ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
1186       resList <- resList[ord[1:num]]
1187       break
1188     }
1189 
1190     # a JList of byte arrays
1191     partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
1192     partition <- partitionArr[[1]]
1193 
1194     # elems is capped to have at most `num` elements
1195     elems <- convertJListToRList(partition,
1196                                  flatten = TRUE,
1197                                  logicalUpperBound = num,
1198                                  serializedMode = serializedModeRDD)
1199 
1200     resList <- append(resList, elems)
1201   }
1202   resList
1203 }
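     # Illustrative usage of this helper (assuming sc <- sparkR.init()):
     #   rdd <- parallelize(sc, list(10, 1, 2, 9, 3))
     #   takeOrderedElem(rdd, 3L)         # list(1, 2, 3)  - the three smallest
     #   takeOrderedElem(rdd, 3L, FALSE)  # list(10, 9, 3) - the three largest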
1204 
1205 #' Returns the first N elements from an RDD in ascending order.
1206 #'
1207 #' @param x An RDD.
1208 #' @param num Number of elements to return.
1209 #' @return The first N elements from the RDD in ascending order.
1210 #' @examples
1211 # nolint start
1212 #'\dontrun{
1213 #' sc <- sparkR.init()
1214 #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1215 #' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
1216 #'}
1217 # nolint end
1218 #' @rdname takeOrdered
1219 #' @aliases takeOrdered,RDD,RDD-method
1220 #' @noRd
1221 setMethod("takeOrdered",
1222           signature(x = "RDD", num = "integer"),
1223           function(x, num) {
1224             takeOrderedElem(x, num)
1225           })
1226 
1227 #' Returns the top N elements from an RDD.
1228 #'
1229 #' @param x An RDD.
1230 #' @param num Number of elements to return.
1231 #' @return The top N elements from the RDD.
1232 #' @rdname top
1233 #' @examples
1234 # nolint start
1235 #'\dontrun{
1236 #' sc <- sparkR.init()
1237 #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1238 #' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
1239 #'}
1240 # nolint end
1241 #' @aliases top,RDD,RDD-method
1242 #' @noRd
1243 setMethod("top",
1244           signature(x = "RDD", num = "integer"),
1245           function(x, num) {
1246             takeOrderedElem(x, num, FALSE)
1247           })
1248 
1249 #' Fold an RDD using a given associative function and a neutral "zero value".
1250 #'
1251 #' Aggregate the elements of each partition, and then the results for all the
1252 #' partitions, using a given associative function and a neutral "zero value".
1253 #'
1254 #' @param x An RDD.
1255 #' @param zeroValue A neutral "zero value".
1256 #' @param op An associative function for the folding operation.
1257 #' @return The folding result.
1258 #' @rdname fold
1259 #' @seealso reduce
1260 #' @examples
1261 #'\dontrun{
1262 #' sc <- sparkR.init()
1263 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
1264 #' fold(rdd, 0, "+") # 15
1265 #'}
1266 #' @aliases fold,RDD,RDD-method
1267 #' @noRd
1268 setMethod("fold",
1269           signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
1270           function(x, zeroValue, op) {
1271             aggregateRDD(x, zeroValue, op, op)
1272           })
1273 
1274 #' Aggregate an RDD using the given combine functions and a neutral "zero value".
1275 #'
1276 #' Aggregate the elements of each partition, and then the results for all the
1277 #' partitions, using given combine functions and a neutral "zero value".
1278 #'
1279 #' @param x An RDD.
1280 #' @param zeroValue A neutral "zero value".
1281 #' @param seqOp A function to aggregate the RDD elements. It may return a different
1282 #'              result type from the type of the RDD elements.
1283 #' @param combOp A function to aggregate results of seqOp.
1284 #' @return The aggregation result.
1285 #' @rdname aggregateRDD
1286 #' @seealso reduce
1287 #' @examples
1288 # nolint start
1289 #'\dontrun{
1290 #' sc <- sparkR.init()
1291 #' rdd <- parallelize(sc, list(1, 2, 3, 4))
1292 #' zeroValue <- list(0, 0)
1293 #' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
1294 #' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
1295 #' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
1296 #'}
1297 # nolint end
1298 #' @aliases aggregateRDD,RDD,RDD-method
1299 #' @noRd
1300 setMethod("aggregateRDD",
1301           signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
1302           function(x, zeroValue, seqOp, combOp) {
1303             partitionFunc <- function(part) {
1304               Reduce(seqOp, part, zeroValue)
1305             }
1306 
1307             partitionList <- collectRDD(lapplyPartition(x, partitionFunc),
1308                                      flatten = FALSE)
1309             Reduce(combOp, partitionList, zeroValue)
1310           })
1311 
1312 #' Pipes elements to a forked external process.
1313 #'
1314 #' The same as 'pipe()' in Spark.
1315 #'
1316 #' @param x The RDD whose elements are piped to the forked external process.
1317 #' @param command The command to fork an external process.
1318 #' @param env A named list to set environment variables of the external process.
1319 #' @return A new RDD created by piping all elements to a forked external process.
1320 #' @rdname pipeRDD
1321 #' @examples
1322 #'\dontrun{
1323 #' sc <- sparkR.init()
1324 #' rdd <- parallelize(sc, 1:10)
1325 #' pipeRDD(rdd, "more")
1326 #' # Output: c("1", "2", ..., "10")
1327 #'}
1328 #' @aliases pipeRDD,RDD,character-method
1329 #' @noRd
1330 setMethod("pipeRDD",
1331           signature(x = "RDD", command = "character"),
1332           function(x, command, env = list()) {
1333             func <- function(part) {
1334               trim_trailing_func <- function(x) {
1335                 sub("[\r\n]*$", "", toString(x))
1336               }
1337               input <- unlist(lapply(part, trim_trailing_func))
1338               res <- system2(command, stdout = TRUE, input = input, env = env)
1339               lapply(res, trim_trailing_func)
1340             }
1341             lapplyPartition(x, func)
1342           })
1343 
1344 # TODO: Consider caching the name in the RDD's environment
1345 #' Return an RDD's name.
1346 #'
1347 #' @param x The RDD whose name is returned.
1348 #' @rdname name
1349 #' @examples
1350 #'\dontrun{
1351 #' sc <- sparkR.init()
1352 #' rdd <- parallelize(sc, list(1,2,3))
1353 #' name(rdd) # NULL (if not set before)
1354 #'}
1355 #' @aliases name,RDD
1356 #' @noRd
1357 setMethod("name",
1358           signature(x = "RDD"),
1359           function(x) {
1360             callJMethod(getJRDD(x), "name")
1361           })
1362 
1363 #' Set an RDD's name.
1364 #'
1365 #' @param x The RDD whose name is to be set.
1366 #' @param name The RDD name to be set.
1367 #' @return The RDD, with its name set.
1368 #' @rdname setName
1369 #' @examples
1370 #'\dontrun{
1371 #' sc <- sparkR.init()
1372 #' rdd <- parallelize(sc, list(1,2,3))
1373 #' setName(rdd, "myRDD")
1374 #' name(rdd) # "myRDD"
1375 #'}
1376 #' @aliases setName,RDD
1377 #' @noRd
1378 setMethod("setName",
1379           signature(x = "RDD", name = "character"),
1380           function(x, name) {
1381             callJMethod(getJRDD(x), "setName", name)
1382             x
1383           })
1384 
1385 #' Zip an RDD with generated unique Long IDs.
1386 #'
1387 #' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
1388 #' n is the number of partitions. So there may exist gaps, but this
1389 #' method won't trigger a Spark job, which is different from
1390 #' zipWithIndex.
1391 #'
1392 #' @param x An RDD to be zipped.
1393 #' @return An RDD with zipped items.
1394 #' @seealso zipWithIndex
1395 #' @examples
1396 # nolint start
1397 #'\dontrun{
1398 #' sc <- sparkR.init()
1399 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
1400 #' collectRDD(zipWithUniqueId(rdd))
1401 #' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
1402 #'}
1403 # nolint end
1404 #' @rdname zipWithUniqueId
1405 #' @aliases zipWithUniqueId,RDD
1406 #' @noRd
1407 setMethod("zipWithUniqueId",
1408           signature(x = "RDD"),
1409           function(x) {
1410             n <- getNumPartitionsRDD(x)
1411 
1412             partitionFunc <- function(partIndex, part) {
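                   # The element at 1-based position `index` of partition `partIndex`
                   # gets id (index - 1) * n + partIndex, e.g. with n = 3 the ids in
                   # partition 0 are 0, 3, 6, ... and in partition 1 they are 1, 4, 7, ...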
1413               mapply(
1414                 function(item, index) {
1415                   list(item, (index - 1) * n + partIndex)
1416                 },
1417                 part,
1418                 seq_along(part),
1419                 SIMPLIFY = FALSE)
1420             }
1421 
1422             lapplyPartitionsWithIndex(x, partitionFunc)
1423           })
1424 
1425 #' Zip an RDD with its element indices.
1426 #'
1427 #' The ordering is first based on the partition index and then the
1428 #' ordering of items within each partition. So the first item in
1429 #' the first partition gets index 0, and the last item in the last
1430 #' partition receives the largest index.
1431 #'
1432 #' This method needs to trigger a Spark job when this RDD contains
1433 #' more than one partition.
1434 #'
1435 #' @param x An RDD to be zipped.
1436 #' @return An RDD with zipped items.
1437 #' @seealso zipWithUniqueId
1438 #' @examples
1439 # nolint start
1440 #'\dontrun{
1441 #' sc <- sparkR.init()
1442 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
1443 #' collectRDD(zipWithIndex(rdd))
1444 #' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
1445 #'}
1446 # nolint end
1447 #' @rdname zipWithIndex
1448 #' @aliases zipWithIndex,RDD
1449 #' @noRd
1450 setMethod("zipWithIndex",
1451           signature(x = "RDD"),
1452           function(x) {
1453             n <- getNumPartitionsRDD(x)
1454             if (n > 1) {
1455               nums <- collectRDD(lapplyPartition(x,
1456                                               function(part) {
1457                                                 list(length(part))
1458                                               }))
1459               startIndices <- Reduce("+", nums, accumulate = TRUE)
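                   # Cumulative partition sizes, e.g. sizes list(2, 2, 1) give
                   # startIndices list(2, 4, 5); partition k (k > 0) starts at
                   # startIndices[[k]], while partition 0 starts at 0.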
1460             }
1461 
1462             partitionFunc <- function(partIndex, part) {
1463               if (partIndex == 0) {
1464                 startIndex <- 0
1465               } else {
1466                 startIndex <- startIndices[[partIndex]]
1467               }
1468 
1469               mapply(
1470                 function(item, index) {
1471                   list(item, index - 1 + startIndex)
1472                 },
1473                 part,
1474                 seq_along(part),
1475                 SIMPLIFY = FALSE)
1476             }
1477 
1478             lapplyPartitionsWithIndex(x, partitionFunc)
1479           })
1480 
1481 #' Coalesce all elements within each partition of an RDD into a list.
1482 #'
1483 #' @param x An RDD.
1484 #' @return An RDD created by coalescing all elements within
1485 #'         each partition into a list.
1486 #' @examples
1487 # nolint start
1488 #'\dontrun{
1489 #' sc <- sparkR.init()
1490 #' rdd <- parallelize(sc, as.list(1:4), 2L)
1491 #' collectRDD(glom(rdd))
1492 #' # list(list(1, 2), list(3, 4))
1493 #'}
1494 # nolint end
1495 #' @rdname glom
1496 #' @aliases glom,RDD
1497 #' @noRd
1498 setMethod("glom",
1499           signature(x = "RDD"),
1500           function(x) {
1501             partitionFunc <- function(part) {
1502               list(part)
1503             }
1504 
1505             lapplyPartition(x, partitionFunc)
1506           })
1507 
1508 ############ Binary Functions #############
1509 
1510 #' Return the union RDD of two RDDs.
1511 #' The same as union() in Spark.
1512 #'
1513 #' @param x An RDD.
1514 #' @param y An RDD.
1515 #' @return a new RDD created by performing the simple union (without removing
1516 #' duplicates) of two input RDDs.
1517 #' @examples
1518 #'\dontrun{
1519 #' sc <- sparkR.init()
1520 #' rdd <- parallelize(sc, 1:3)
1521 #' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
1522 #'}
1523 #' @rdname unionRDD
1524 #' @aliases unionRDD,RDD,RDD-method
1525 #' @noRd
1526 setMethod("unionRDD",
1527           signature(x = "RDD", y = "RDD"),
1528           function(x, y) {
1529             if (getSerializedMode(x) == getSerializedMode(y)) {
1530               jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
1531               union.rdd <- RDD(jrdd, getSerializedMode(x))
1532             } else {
1533               # One of the RDDs is not serialized, we need to serialize it first.
1534               if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
1535               if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
1536               jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
1537               union.rdd <- RDD(jrdd, "byte")
1538             }
1539             union.rdd
1540           })
1541 
1542 #' Zip an RDD with another RDD.
1543 #'
1544 #' Zips this RDD with another one, returning key-value pairs with the
1545 #' first element in each RDD, second element in each RDD, etc. Assumes
1546 #' that the two RDDs have the same number of partitions and the same
1547 #' number of elements in each partition (e.g. one was made through
1548 #' a map on the other).
1549 #'
1550 #' @param x An RDD to be zipped.
1551 #' @param other Another RDD to be zipped.
1552 #' @return An RDD zipped from the two RDDs.
1553 #' @examples
1554 # nolint start
1555 #'\dontrun{
1556 #' sc <- sparkR.init()
1557 #' rdd1 <- parallelize(sc, 0:4)
1558 #' rdd2 <- parallelize(sc, 1000:1004)
1559 #' collectRDD(zipRDD(rdd1, rdd2))
1560 #' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
1561 #'}
1562 # nolint end
1563 #' @rdname zipRDD
1564 #' @aliases zipRDD,RDD
1565 #' @noRd
1566 setMethod("zipRDD",
1567           signature(x = "RDD", other = "RDD"),
1568           function(x, other) {
1569             n1 <- getNumPartitionsRDD(x)
1570             n2 <- getNumPartitionsRDD(other)
1571             if (n1 != n2) {
1572               stop("Can only zip RDDs which have the same number of partitions.")
1573             }
1574 
1575             rdds <- appendPartitionLengths(x, other)
1576             jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
1577             # The jrdd's elements are of scala Tuple2 type. The serialized
1578             # flag here is used for the elements inside the tuples.
1579             rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
1580 
1581             mergePartitions(rdd, TRUE)
1582           })
1583 
1584 #' Cartesian product of this RDD and another one.
1585 #'
1586 #' Return the Cartesian product of this RDD and another one,
1587 #' that is, the RDD of all pairs of elements (a, b) where a
1588 #' is in this and b is in other.
1589 #'
1590 #' @param x An RDD.
1591 #' @param other An RDD.
1592 #' @return A new RDD which is the Cartesian product of these two RDDs.
1593 #' @examples
1594 # nolint start
1595 #'\dontrun{
1596 #' sc <- sparkR.init()
1597 #' rdd <- parallelize(sc, 1:2)
1598 #' sortByKey(cartesian(rdd, rdd))
1599 #' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
1600 #'}
1601 # nolint end
1602 #' @rdname cartesian
1603 #' @aliases cartesian,RDD,RDD-method
1604 #' @noRd
1605 setMethod("cartesian",
1606           signature(x = "RDD", other = "RDD"),
1607           function(x, other) {
1608             rdds <- appendPartitionLengths(x, other)
1609             jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
1610             # The jrdd's elements are of scala Tuple2 type. The serialized
1611             # flag here is used for the elements inside the tuples.
1612             rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
1613 
1614             mergePartitions(rdd, FALSE)
1615           })
1616 
1617 #' Subtract an RDD with another RDD.
1618 #'
1619 #' Return an RDD with the elements from this that are not in other.
1620 #'
1621 #' @param x An RDD.
1622 #' @param other An RDD.
1623 #' @param numPartitions Number of the partitions in the result RDD.
1624 #' @return An RDD with the elements from this that are not in other.
1625 #' @examples
1626 # nolint start
1627 #'\dontrun{
1628 #' sc <- sparkR.init()
1629 #' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
1630 #' rdd2 <- parallelize(sc, list(2, 4))
1631 #' collectRDD(subtract(rdd1, rdd2))
1632 #' # list(1, 1, 3)
1633 #'}
1634 # nolint end
1635 #' @rdname subtract
1636 #' @aliases subtract,RDD
1637 #' @noRd
1638 setMethod("subtract",
1639           signature(x = "RDD", other = "RDD"),
1640           function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1641             mapFunction <- function(e) { list(e, NA) }
1642             rdd1 <- map(x, mapFunction)
1643             rdd2 <- map(other, mapFunction)
1644             keys(subtractByKey(rdd1, rdd2, numPartitions))
1645           })
1646 
1647 #' Intersection of this RDD and another one.
1648 #'
1649 #' Return the intersection of this RDD and another one.
1650 #' The output will not contain any duplicate elements,
1651 #' even if the input RDDs did. Performs a hash partition
1652 #' across the cluster.
1653 #' Note that this method performs a shuffle internally.
1654 #'
1655 #' @param x An RDD.
1656 #' @param other An RDD.
1657 #' @param numPartitions The number of partitions in the result RDD.
1658 #' @return An RDD which is the intersection of these two RDDs.
1659 #' @examples
1660 # nolint start
1661 #'\dontrun{
1662 #' sc <- sparkR.init()
1663 #' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
1664 #' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
1665 #' collectRDD(sortBy(intersection(rdd1, rdd2), function(x) { x }))
1666 #' # list(1, 2, 3)
1667 #'}
1668 # nolint end
1669 #' @rdname intersection
1670 #' @aliases intersection,RDD
1671 #' @noRd
1672 setMethod("intersection",
1673           signature(x = "RDD", other = "RDD"),
1674           function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1675             rdd1 <- map(x, function(v) { list(v, NA) })
1676             rdd2 <- map(other, function(v) { list(v, NA) })
1677 
1678             filterFunction <- function(elem) {
1679               iters <- elem[[2]]
1680               all(as.vector(
1681                 lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
1682             }
1683 
1684             keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
1685           })
1686 
1687 #' Zips an RDD's partitions with one (or more) RDD(s).
1688 #' Same as zipPartitions in Spark.
1689 #'
1690 #' @param ... RDDs to be zipped.
1691 #' @param func A function to transform zipped partitions.
1692 #' @return A new RDD by applying a function to the zipped partitions.
1693 #'         Assumes that all the RDDs have the *same number of partitions*, but
1694 #'         does *not* require them to have the same number of elements in each partition.
1695 #' @examples
1696 # nolint start
1697 #'\dontrun{
1698 #' sc <- sparkR.init()
1699 #' rdd1 <- parallelize(sc, 1:2, 2L)  # 1, 2
1700 #' rdd2 <- parallelize(sc, 1:4, 2L)  # 1:2, 3:4
1701 #' rdd3 <- parallelize(sc, 1:6, 2L)  # 1:3, 4:6
1702 #' collectRDD(zipPartitions(rdd1, rdd2, rdd3,
1703 #'                       func = function(x, y, z) { list(list(x, y, z))} ))
1704 #' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1705 #'}
1706 # nolint end
1707 #' @rdname zipRDD
1708 #' @aliases zipPartitions,RDD
1709 #' @noRd
1710 setMethod("zipPartitions",
1711           "RDD",
1712           function(..., func) {
1713             rrdds <- list(...)
1714             if (length(rrdds) == 1) {
1715               return(rrdds[[1]])
1716             }
1717             nPart <- sapply(rrdds, getNumPartitionsRDD)
1718             if (length(unique(nPart)) != 1) {
1719               stop("Can only zipPartitions RDDs which have the same number of partitions.")
1720             }
1721 
1722             rrdds <- lapply(rrdds, function(rdd) {
1723               mapPartitionsWithIndex(rdd, function(partIndex, part) {
1724                 print(length(part))
1725                 list(list(partIndex, part))
1726               })
1727             })
1728             union.rdd <- Reduce(unionRDD, rrdds)
1729             zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
1730             res <- mapPartitions(zipped.rdd, function(plist) {
1731               do.call(func, plist[[1]])
1732             })
1733             res
1734           })