0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # RDD in R implemented in S4 OO system.
0019
0020 setOldClass("jobj")
0021
0022 #' S4 class that represents an RDD
0023 #'
0024 #' RDD can be created using functions like
0025 #' \code{parallelize}, \code{textFile} etc.
0026 #'
0027 #' @rdname RDD
0028 #' @seealso parallelize, textFile
0029 #' @slot env An R environment that stores bookkeeping states of the RDD
0030 #' @slot jrdd Java object reference to the backing JavaRDD
0031 #' to an RDD
0032 #' @noRd
0033 setClass("RDD",
0034 slots = list(env = "environment",
0035 jrdd = "jobj"))
0036
0037 setClass("PipelinedRDD",
0038 slots = list(prev = "RDD",
0039 func = "function",
0040 prev_jrdd = "jobj"),
0041 contains = "RDD")
0042
0043 setMethod("initialize", "RDD", function(.Object, jrdd, serializedMode,
0044 isCached, isCheckpointed) {
0045 # Check that RDD constructor is using the correct version of serializedMode
0046 stopifnot(class(serializedMode) == "character")
0047 stopifnot(serializedMode %in% c("byte", "string", "row"))
0048 # RDD has three serialization types:
0049 # byte: The RDD stores data serialized in R.
0050 # string: The RDD stores data as strings.
0051 # row: The RDD stores the serialized rows of a SparkDataFrame.
0052
0053 # We use an environment to store mutable states inside an RDD object.
0054 # Note that R's call-by-value semantics makes modifying slots inside an
0055 # object (passed as an argument into a function, such as cache()) difficult:
0056 # i.e. one needs to make a copy of the RDD object and sets the new slot value
0057 # there.
0058
0059 # The slots are inheritable from superclass. Here, both `env' and `jrdd' are
0060 # inherited from RDD, but only the former is used.
0061 .Object@env <- new.env()
0062 .Object@env$isCached <- isCached
0063 .Object@env$isCheckpointed <- isCheckpointed
0064 .Object@env$serializedMode <- serializedMode
0065
0066 .Object@jrdd <- jrdd
0067 .Object
0068 })
0069
0070 setMethod("showRDD", "RDD",
0071 function(object) {
0072 cat(paste0(callJMethod(getJRDD(object), "toString"), "\n"))
0073 })
0074
0075 setMethod("initialize", "PipelinedRDD", function(.Object, prev, func, jrdd_val) {
0076 .Object@env <- new.env()
0077 .Object@env$isCached <- FALSE
0078 .Object@env$isCheckpointed <- FALSE
0079 .Object@env$jrdd_val <- jrdd_val
0080 if (!is.null(jrdd_val)) {
0081 # This tracks the serialization mode for jrdd_val
0082 .Object@env$serializedMode <- prev@env$serializedMode
0083 }
0084
0085 .Object@prev <- prev
0086
0087 isPipelinable <- function(rdd) {
0088 e <- rdd@env
0089 # nolint start
0090 !(e$isCached || e$isCheckpointed)
0091 # nolint end
0092 }
0093
0094 if (!inherits(prev, "PipelinedRDD") || !isPipelinable(prev)) {
0095 # This transformation is the first in its stage:
0096 .Object@func <- cleanClosure(func)
0097 .Object@prev_jrdd <- getJRDD(prev)
0098 .Object@env$prev_serializedMode <- prev@env$serializedMode
0099 # NOTE: We use prev_serializedMode to track the serialization mode of prev_JRDD
0100 # prev_serializedMode is used during the delayed computation of JRDD in getJRDD
0101 } else {
0102 pipelinedFunc <- function(partIndex, part) {
0103 f <- prev@func
0104 func(partIndex, f(partIndex, part))
0105 }
0106 .Object@func <- cleanClosure(pipelinedFunc)
0107 .Object@prev_jrdd <- prev@prev_jrdd # maintain the pipeline
0108 # Get the serialization mode of the parent RDD
0109 .Object@env$prev_serializedMode <- prev@env$prev_serializedMode
0110 }
0111
0112 .Object
0113 })
0114
0115 #' @rdname RDD
0116 #' @noRd
0117 #' @param jrdd Java object reference to the backing JavaRDD
0118 #' @param serializedMode Use "byte" if the RDD stores data serialized in R, "string" if the RDD
0119 #' stores strings, and "row" if the RDD stores the rows of a SparkDataFrame
0120 #' @param isCached TRUE if the RDD is cached
0121 #' @param isCheckpointed TRUE if the RDD has been checkpointed
0122 RDD <- function(jrdd, serializedMode = "byte", isCached = FALSE,
0123 isCheckpointed = FALSE) {
0124 new("RDD", jrdd, serializedMode, isCached, isCheckpointed)
0125 }
0126
0127 PipelinedRDD <- function(prev, func) {
0128 new("PipelinedRDD", prev, func, NULL)
0129 }
0130
0131 # Return the serialization mode for an RDD.
0132 setGeneric("getSerializedMode", function(rdd, ...) { standardGeneric("getSerializedMode") })
0133 # For normal RDDs we can directly read the serializedMode
0134 setMethod("getSerializedMode", signature(rdd = "RDD"), function(rdd) rdd@env$serializedMode)
0135 # For pipelined RDDs if jrdd_val is set then serializedMode should exist
0136 # if not we return the defaultSerialization mode of "byte" as we don't know the serialization
0137 # mode at this point in time.
0138 setMethod("getSerializedMode", signature(rdd = "PipelinedRDD"),
0139 function(rdd) {
0140 if (!is.null(rdd@env$jrdd_val)) {
0141 return(rdd@env$serializedMode)
0142 } else {
0143 return("byte")
0144 }
0145 })
0146
0147 # The jrdd accessor function.
0148 setMethod("getJRDD", signature(rdd = "RDD"), function(rdd) rdd@jrdd)
0149 setMethod("getJRDD", signature(rdd = "PipelinedRDD"),
0150 function(rdd, serializedMode = "byte") {
0151 if (!is.null(rdd@env$jrdd_val)) {
0152 return(rdd@env$jrdd_val)
0153 }
0154
0155 packageNamesArr <- serialize(.sparkREnv[[".packages"]],
0156 connection = NULL)
0157
0158 broadcastArr <- lapply(ls(.broadcastNames),
0159 function(name) { get(name, .broadcastNames) })
0160
0161 serializedFuncArr <- serialize(rdd@func, connection = NULL)
0162
0163 prev_jrdd <- rdd@prev_jrdd
0164
0165 if (serializedMode == "string") {
0166 rddRef <- newJObject("org.apache.spark.api.r.StringRRDD",
0167 callJMethod(prev_jrdd, "rdd"),
0168 serializedFuncArr,
0169 rdd@env$prev_serializedMode,
0170 packageNamesArr,
0171 broadcastArr,
0172 callJMethod(prev_jrdd, "classTag"))
0173 } else {
0174 rddRef <- newJObject("org.apache.spark.api.r.RRDD",
0175 callJMethod(prev_jrdd, "rdd"),
0176 serializedFuncArr,
0177 rdd@env$prev_serializedMode,
0178 serializedMode,
0179 packageNamesArr,
0180 broadcastArr,
0181 callJMethod(prev_jrdd, "classTag"))
0182 }
0183 # Save the serialization flag after we create a RRDD
0184 rdd@env$serializedMode <- serializedMode
0185 rdd@env$jrdd_val <- callJMethod(rddRef, "asJavaRDD")
0186 rdd@env$jrdd_val
0187 })
0188
0189 setValidity("RDD",
0190 function(object) {
0191 jrdd <- getJRDD(object)
0192 cls <- callJMethod(jrdd, "getClass")
0193 className <- callJMethod(cls, "getName")
0194 if (grep("spark.api.java.*RDD*", className) == 1) {
0195 TRUE
0196 } else {
0197 paste("Invalid RDD class ", className)
0198 }
0199 })
0200
0201
0202 ############ Actions and Transformations ############
0203
0204 #' Persist an RDD
0205 #'
0206 #' Persist this RDD with the default storage level (MEMORY_ONLY).
0207 #'
0208 #' @param x The RDD to cache
0209 #' @examples
0210 #'\dontrun{
0211 #' sc <- sparkR.init()
0212 #' rdd <- parallelize(sc, 1:10, 2L)
0213 #' cache(rdd)
0214 #'}
0215 #' @rdname cache-methods
0216 #' @aliases cache,RDD-method
0217 #' @noRd
0218 setMethod("cacheRDD",
0219 signature(x = "RDD"),
0220 function(x) {
0221 callJMethod(getJRDD(x), "cache")
0222 x@env$isCached <- TRUE
0223 x
0224 })
0225
0226 #' Persist an RDD
0227 #'
0228 #' Persist this RDD with the specified storage level. For details of the
0229 #' supported storage levels, refer to
0230 #'\url{http://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence}.
0231 #'
0232 #' @param x The RDD to persist
0233 #' @param newLevel The new storage level to be assigned
0234 #' @examples
0235 #'\dontrun{
0236 #' sc <- sparkR.init()
0237 #' rdd <- parallelize(sc, 1:10, 2L)
0238 #' persistRDD(rdd, "MEMORY_AND_DISK")
0239 #'}
0240 #' @rdname persist
0241 #' @aliases persist,RDD-method
0242 #' @noRd
0243 setMethod("persistRDD",
0244 signature(x = "RDD", newLevel = "character"),
0245 function(x, newLevel = "MEMORY_ONLY") {
0246 callJMethod(getJRDD(x), "persist", getStorageLevel(newLevel))
0247 x@env$isCached <- TRUE
0248 x
0249 })
0250
0251 #' Unpersist an RDD
0252 #'
0253 #' Mark the RDD as non-persistent, and remove all blocks for it from memory and
0254 #' disk.
0255 #'
0256 #' @param x The RDD to unpersist
0257 #' @examples
0258 #'\dontrun{
0259 #' sc <- sparkR.init()
0260 #' rdd <- parallelize(sc, 1:10, 2L)
0261 #' cache(rdd) # rdd@@env$isCached == TRUE
0262 #' unpersistRDD(rdd) # rdd@@env$isCached == FALSE
0263 #'}
0264 #' @rdname unpersist
0265 #' @aliases unpersist,RDD-method
0266 #' @noRd
0267 setMethod("unpersistRDD",
0268 signature(x = "RDD"),
0269 function(x) {
0270 callJMethod(getJRDD(x), "unpersist")
0271 x@env$isCached <- FALSE
0272 x
0273 })
0274
0275 #' Checkpoint an RDD
0276 #'
0277 #' Mark this RDD for checkpointing. It will be saved to a file inside the
0278 #' checkpoint directory set with setCheckpointDir() and all references to its
0279 #' parent RDDs will be removed. This function must be called before any job has
0280 #' been executed on this RDD. It is strongly recommended that this RDD is
0281 #' persisted in memory, otherwise saving it on a file will require recomputation.
0282 #'
0283 #' @param x The RDD to checkpoint
0284 #' @examples
0285 #'\dontrun{
0286 #' sc <- sparkR.init()
0287 #' setCheckpointDir(sc, "checkpoint")
0288 #' rdd <- parallelize(sc, 1:10, 2L)
0289 #' checkpoint(rdd)
0290 #'}
0291 #' @rdname checkpoint-methods
0292 #' @aliases checkpoint,RDD-method
0293 #' @noRd
0294 setMethod("checkpointRDD",
0295 signature(x = "RDD"),
0296 function(x) {
0297 jrdd <- getJRDD(x)
0298 callJMethod(jrdd, "checkpoint")
0299 x@env$isCheckpointed <- TRUE
0300 x
0301 })
0302
0303 #' Gets the number of partitions of an RDD
0304 #'
0305 #' @param x A RDD.
0306 #' @return the number of partitions of rdd as an integer.
0307 #' @examples
0308 #'\dontrun{
0309 #' sc <- sparkR.init()
0310 #' rdd <- parallelize(sc, 1:10, 2L)
0311 #' getNumPartitions(rdd) # 2L
0312 #'}
0313 #' @rdname getNumPartitions
0314 #' @aliases getNumPartitions,RDD-method
0315 #' @noRd
0316 setMethod("getNumPartitionsRDD",
0317 signature(x = "RDD"),
0318 function(x) {
0319 callJMethod(getJRDD(x), "getNumPartitions")
0320 })
0321
0322 #' Gets the number of partitions of an RDD, the same as getNumPartitions.
0323 #' But this function has been deprecated, please use getNumPartitions.
0324 #'
0325 #' @rdname getNumPartitions
0326 #' @aliases numPartitions,RDD-method
0327 #' @noRd
0328 setMethod("numPartitions",
0329 signature(x = "RDD"),
0330 function(x) {
0331 .Deprecated("getNumPartitions")
0332 getNumPartitionsRDD(x)
0333 })
0334
0335 #' Collect elements of an RDD
0336 #'
0337 #' @description
0338 #' \code{collect} returns a list that contains all of the elements in this RDD.
0339 #'
0340 #' @param x The RDD to collect
0341 #' @param ... Other optional arguments to collect
0342 #' @param flatten FALSE if the list should not flattened
0343 #' @return a list containing elements in the RDD
0344 #' @examples
0345 #'\dontrun{
0346 #' sc <- sparkR.init()
0347 #' rdd <- parallelize(sc, 1:10, 2L)
0348 #' collectRDD(rdd) # list from 1 to 10
0349 #' collectPartition(rdd, 0L) # list from 1 to 5
0350 #'}
0351 #' @rdname collect-methods
0352 #' @aliases collect,RDD-method
0353 #' @noRd
0354 setMethod("collectRDD",
0355 signature(x = "RDD"),
0356 function(x, flatten = TRUE) {
0357 # Assumes a pairwise RDD is backed by a JavaPairRDD.
0358 collected <- callJMethod(getJRDD(x), "collect")
0359 convertJListToRList(collected, flatten,
0360 serializedMode = getSerializedMode(x))
0361 })
0362
0363
0364 #' @description
0365 #' \code{collectPartition} returns a list that contains all of the elements
0366 #' in the specified partition of the RDD.
0367 #' @param partitionId the partition to collect (starts from 0)
0368 #' @rdname collect-methods
0369 #' @aliases collectPartition,integer,RDD-method
0370 #' @noRd
0371 setMethod("collectPartition",
0372 signature(x = "RDD", partitionId = "integer"),
0373 function(x, partitionId) {
0374 jPartitionsList <- callJMethod(getJRDD(x),
0375 "collectPartitions",
0376 as.list(as.integer(partitionId)))
0377
0378 jList <- jPartitionsList[[1]]
0379 convertJListToRList(jList, flatten = TRUE,
0380 serializedMode = getSerializedMode(x))
0381 })
0382
0383 #' @description
0384 #' \code{collectAsMap} returns a named list as a map that contains all of the elements
0385 #' in a key-value pair RDD.
0386 #' @examples
0387 # nolint start
0388 #'\dontrun{
0389 #' sc <- sparkR.init()
0390 #' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)), 2L)
0391 #' collectAsMap(rdd) # list(`1` = 2, `3` = 4)
0392 #'}
0393 # nolint end
0394 #' @rdname collect-methods
0395 #' @aliases collectAsMap,RDD-method
0396 #' @noRd
0397 setMethod("collectAsMap",
0398 signature(x = "RDD"),
0399 function(x) {
0400 pairList <- collectRDD(x)
0401 map <- new.env()
0402 lapply(pairList, function(i) { assign(as.character(i[[1]]), i[[2]], envir = map) })
0403 as.list(map)
0404 })
0405
0406 #' Return the number of elements in the RDD.
0407 #'
0408 #' @param x The RDD to count
0409 #' @return number of elements in the RDD.
0410 #' @examples
0411 #'\dontrun{
0412 #' sc <- sparkR.init()
0413 #' rdd <- parallelize(sc, 1:10)
0414 #' countRDD(rdd) # 10
0415 #' length(rdd) # Same as count
0416 #'}
0417 #' @rdname count
0418 #' @aliases count,RDD-method
0419 #' @noRd
0420 setMethod("countRDD",
0421 signature(x = "RDD"),
0422 function(x) {
0423 countPartition <- function(part) {
0424 as.integer(length(part))
0425 }
0426 valsRDD <- lapplyPartition(x, countPartition)
0427 vals <- collectRDD(valsRDD)
0428 sum(as.integer(vals))
0429 })
0430
0431 #' Return the number of elements in the RDD
0432 #' @rdname count
0433 #' @noRd
0434 setMethod("lengthRDD",
0435 signature(x = "RDD"),
0436 function(x) {
0437 countRDD(x)
0438 })
0439
0440 #' Return the count of each unique value in this RDD as a list of
0441 #' (value, count) pairs.
0442 #'
0443 #' Same as countByValue in Spark.
0444 #'
0445 #' @param x The RDD to count
0446 #' @return list of (value, count) pairs, where count is number of each unique
0447 #' value in rdd.
0448 #' @examples
0449 # nolint start
0450 #'\dontrun{
0451 #' sc <- sparkR.init()
0452 #' rdd <- parallelize(sc, c(1,2,3,2,1))
0453 #' countByValue(rdd) # (1,2L), (2,2L), (3,1L)
0454 #'}
0455 # nolint end
0456 #' @rdname countByValue
0457 #' @aliases countByValue,RDD-method
0458 #' @noRd
0459 setMethod("countByValue",
0460 signature(x = "RDD"),
0461 function(x) {
0462 ones <- lapply(x, function(item) { list(item, 1L) })
0463 collectRDD(reduceByKey(ones, `+`, getNumPartitionsRDD(x)))
0464 })
0465
0466 #' Apply a function to all elements
0467 #'
0468 #' This function creates a new RDD by applying the given transformation to all
0469 #' elements of the given RDD
0470 #'
0471 #' @param X The RDD to apply the transformation.
0472 #' @param FUN the transformation to apply on each element
0473 #' @return a new RDD created by the transformation.
0474 #' @rdname lapply
0475 #' @noRd
0476 #' @aliases lapply
0477 #' @examples
0478 #'\dontrun{
0479 #' sc <- sparkR.init()
0480 #' rdd <- parallelize(sc, 1:10)
0481 #' multiplyByTwo <- lapply(rdd, function(x) { x * 2 })
0482 #' collectRDD(multiplyByTwo) # 2,4,6...
0483 #'}
0484 setMethod("lapply",
0485 signature(X = "RDD", FUN = "function"),
0486 function(X, FUN) {
0487 func <- function(partIndex, part) {
0488 lapply(part, FUN)
0489 }
0490 lapplyPartitionsWithIndex(X, func)
0491 })
0492
0493 #' @rdname lapply
0494 #' @aliases map,RDD,function-method
0495 #' @noRd
0496 setMethod("map",
0497 signature(X = "RDD", FUN = "function"),
0498 function(X, FUN) {
0499 lapply(X, FUN)
0500 })
0501
0502 #' Flatten results after applying a function to all elements
0503 #'
0504 #' This function returns a new RDD by first applying a function to all
0505 #' elements of this RDD, and then flattening the results.
0506 #'
0507 #' @param X The RDD to apply the transformation.
0508 #' @param FUN the transformation to apply on each element
0509 #' @return a new RDD created by the transformation.
0510 #' @examples
0511 #'\dontrun{
0512 #' sc <- sparkR.init()
0513 #' rdd <- parallelize(sc, 1:10)
0514 #' multiplyByTwo <- flatMap(rdd, function(x) { list(x*2, x*10) })
0515 #' collectRDD(multiplyByTwo) # 2,20,4,40,6,60...
0516 #'}
0517 #' @rdname flatMap
0518 #' @aliases flatMap,RDD,function-method
0519 #' @noRd
0520 setMethod("flatMap",
0521 signature(X = "RDD", FUN = "function"),
0522 function(X, FUN) {
0523 partitionFunc <- function(part) {
0524 unlist(
0525 lapply(part, FUN),
0526 recursive = F
0527 )
0528 }
0529 lapplyPartition(X, partitionFunc)
0530 })
0531
0532 #' Apply a function to each partition of an RDD
0533 #'
0534 #' Return a new RDD by applying a function to each partition of this RDD.
0535 #'
0536 #' @param X The RDD to apply the transformation.
0537 #' @param FUN the transformation to apply on each partition.
0538 #' @return a new RDD created by the transformation.
0539 #' @examples
0540 #'\dontrun{
0541 #' sc <- sparkR.init()
0542 #' rdd <- parallelize(sc, 1:10)
0543 #' partitionSum <- lapplyPartition(rdd, function(part) { Reduce("+", part) })
0544 #' collectRDD(partitionSum) # 15, 40
0545 #'}
0546 #' @rdname lapplyPartition
0547 #' @aliases lapplyPartition,RDD,function-method
0548 #' @noRd
0549 setMethod("lapplyPartition",
0550 signature(X = "RDD", FUN = "function"),
0551 function(X, FUN) {
0552 lapplyPartitionsWithIndex(X, function(s, part) { FUN(part) })
0553 })
0554
0555 #' mapPartitions is the same as lapplyPartition.
0556 #'
0557 #' @rdname lapplyPartition
0558 #' @aliases mapPartitions,RDD,function-method
0559 #' @noRd
0560 setMethod("mapPartitions",
0561 signature(X = "RDD", FUN = "function"),
0562 function(X, FUN) {
0563 lapplyPartition(X, FUN)
0564 })
0565
0566 #' Return a new RDD by applying a function to each partition of this RDD, while
0567 #' tracking the index of the original partition.
0568 #'
0569 #' @param X The RDD to apply the transformation.
0570 #' @param FUN the transformation to apply on each partition; takes the partition
0571 #' index and a list of elements in the particular partition.
0572 #' @return a new RDD created by the transformation.
0573 #' @examples
0574 #'\dontrun{
0575 #' sc <- sparkR.init()
0576 #' rdd <- parallelize(sc, 1:10, 5L)
0577 #' prod <- lapplyPartitionsWithIndex(rdd, function(partIndex, part) {
0578 #' partIndex * Reduce("+", part) })
0579 #' collectRDD(prod, flatten = FALSE) # 0, 7, 22, 45, 76
0580 #'}
0581 #' @rdname lapplyPartitionsWithIndex
0582 #' @aliases lapplyPartitionsWithIndex,RDD,function-method
0583 #' @noRd
0584 setMethod("lapplyPartitionsWithIndex",
0585 signature(X = "RDD", FUN = "function"),
0586 function(X, FUN) {
0587 PipelinedRDD(X, FUN)
0588 })
0589
0590 #' @rdname lapplyPartitionsWithIndex
0591 #' @aliases mapPartitionsWithIndex,RDD,function-method
0592 #' @noRd
0593 setMethod("mapPartitionsWithIndex",
0594 signature(X = "RDD", FUN = "function"),
0595 function(X, FUN) {
0596 lapplyPartitionsWithIndex(X, FUN)
0597 })
0598
0599 #' This function returns a new RDD containing only the elements that satisfy
0600 #' a predicate (i.e. returning TRUE in a given logical function).
0601 #' The same as `filter()' in Spark.
0602 #'
0603 #' @param x The RDD to be filtered.
0604 #' @param f A unary predicate function.
0605 #' @examples
0606 # nolint start
0607 #'\dontrun{
0608 #' sc <- sparkR.init()
0609 #' rdd <- parallelize(sc, 1:10)
0610 #' unlist(collectRDD(filterRDD(rdd, function (x) { x < 3 }))) # c(1, 2)
0611 #'}
0612 # nolint end
0613 #' @rdname filterRDD
0614 #' @aliases filterRDD,RDD,function-method
0615 #' @noRd
0616 setMethod("filterRDD",
0617 signature(x = "RDD", f = "function"),
0618 function(x, f) {
0619 filter.func <- function(part) {
0620 Filter(f, part)
0621 }
0622 lapplyPartition(x, filter.func)
0623 })
0624
0625 #' @rdname filterRDD
0626 #' @aliases Filter
0627 #' @noRd
0628 setMethod("Filter",
0629 signature(f = "function", x = "RDD"),
0630 function(f, x) {
0631 filterRDD(x, f)
0632 })
0633
0634 #' Reduce across elements of an RDD.
0635 #'
0636 #' This function reduces the elements of this RDD using the
0637 #' specified commutative and associative binary operator.
0638 #'
0639 #' @param x The RDD to reduce
0640 #' @param func Commutative and associative function to apply on elements
0641 #' of the RDD.
0642 #' @examples
0643 #'\dontrun{
0644 #' sc <- sparkR.init()
0645 #' rdd <- parallelize(sc, 1:10)
0646 #' reduce(rdd, "+") # 55
0647 #'}
0648 #' @rdname reduce
0649 #' @aliases reduce,RDD,ANY-method
0650 #' @noRd
0651 setMethod("reduce",
0652 signature(x = "RDD", func = "ANY"),
0653 function(x, func) {
0654
0655 reducePartition <- function(part) {
0656 Reduce(func, part)
0657 }
0658
0659 partitionList <- collectRDD(lapplyPartition(x, reducePartition),
0660 flatten = FALSE)
0661 Reduce(func, partitionList)
0662 })
0663
0664 #' Get the maximum element of an RDD.
0665 #'
0666 #' @param x The RDD to get the maximum element from
0667 #' @examples
0668 #'\dontrun{
0669 #' sc <- sparkR.init()
0670 #' rdd <- parallelize(sc, 1:10)
0671 #' maximum(rdd) # 10
0672 #'}
0673 #' @rdname maximum
0674 #' @aliases maximum,RDD
0675 #' @noRd
0676 setMethod("maximum",
0677 signature(x = "RDD"),
0678 function(x) {
0679 reduce(x, max)
0680 })
0681
0682 #' Get the minimum element of an RDD.
0683 #'
0684 #' @param x The RDD to get the minimum element from
0685 #' @examples
0686 #'\dontrun{
0687 #' sc <- sparkR.init()
0688 #' rdd <- parallelize(sc, 1:10)
0689 #' minimum(rdd) # 1
0690 #'}
0691 #' @rdname minimum
0692 #' @aliases minimum,RDD
0693 #' @noRd
0694 setMethod("minimum",
0695 signature(x = "RDD"),
0696 function(x) {
0697 reduce(x, min)
0698 })
0699
0700 #' Add up the elements in an RDD.
0701 #'
0702 #' @param x The RDD to add up the elements in
0703 #' @examples
0704 #'\dontrun{
0705 #' sc <- sparkR.init()
0706 #' rdd <- parallelize(sc, 1:10)
0707 #' sumRDD(rdd) # 55
0708 #'}
0709 #' @rdname sumRDD
0710 #' @aliases sumRDD,RDD
0711 #' @noRd
0712 setMethod("sumRDD",
0713 signature(x = "RDD"),
0714 function(x) {
0715 reduce(x, "+")
0716 })
0717
0718 #' Applies a function to all elements in an RDD, and forces evaluation.
0719 #'
0720 #' @param x The RDD to apply the function
0721 #' @param func The function to be applied.
0722 #' @return invisible NULL.
0723 #' @examples
0724 #'\dontrun{
0725 #' sc <- sparkR.init()
0726 #' rdd <- parallelize(sc, 1:10)
0727 #' foreach(rdd, function(x) { save(x, file=...) })
0728 #'}
0729 #' @rdname foreach
0730 #' @aliases foreach,RDD,function-method
0731 #' @noRd
0732 setMethod("foreach",
0733 signature(x = "RDD", func = "function"),
0734 function(x, func) {
0735 partition.func <- function(x) {
0736 lapply(x, func)
0737 NULL
0738 }
0739 invisible(collectRDD(mapPartitions(x, partition.func)))
0740 })
0741
0742 #' Applies a function to each partition in an RDD, and forces evaluation.
0743 #'
0744 #' @examples
0745 #'\dontrun{
0746 #' sc <- sparkR.init()
0747 #' rdd <- parallelize(sc, 1:10)
0748 #' foreachPartition(rdd, function(part) { save(part, file=...); NULL })
0749 #'}
0750 #' @rdname foreach
0751 #' @aliases foreachPartition,RDD,function-method
0752 #' @noRd
0753 setMethod("foreachPartition",
0754 signature(x = "RDD", func = "function"),
0755 function(x, func) {
0756 invisible(collectRDD(mapPartitions(x, func)))
0757 })
0758
0759 #' Take elements from an RDD.
0760 #'
0761 #' This function takes the first NUM elements in the RDD and
0762 #' returns them in a list.
0763 #'
0764 #' @param x The RDD to take elements from
0765 #' @param num Number of elements to take
0766 #' @examples
0767 # nolint start
0768 #'\dontrun{
0769 #' sc <- sparkR.init()
0770 #' rdd <- parallelize(sc, 1:10)
0771 #' takeRDD(rdd, 2L) # list(1, 2)
0772 #'}
0773 # nolint end
0774 #' @rdname take
0775 #' @aliases take,RDD,numeric-method
0776 #' @noRd
0777 setMethod("takeRDD",
0778 signature(x = "RDD", num = "numeric"),
0779 function(x, num) {
0780 resList <- list()
0781 index <- -1
0782 jrdd <- getJRDD(x)
0783 numPartitions <- getNumPartitionsRDD(x)
0784 serializedModeRDD <- getSerializedMode(x)
0785
0786 # TODO(shivaram): Collect more than one partition based on size
0787 # estimates similar to the scala version of `take`.
0788 while (TRUE) {
0789 index <- index + 1
0790
0791 if (length(resList) >= num || index >= numPartitions)
0792 break
0793
0794 # a JList of byte arrays
0795 partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
0796 partition <- partitionArr[[1]]
0797
0798 size <- num - length(resList)
0799 # elems is capped to have at most `size` elements
0800 elems <- convertJListToRList(partition,
0801 flatten = TRUE,
0802 logicalUpperBound = size,
0803 serializedMode = serializedModeRDD)
0804
0805 resList <- append(resList, elems)
0806 }
0807 resList
0808 })
0809
0810
0811 #' First
0812 #'
0813 #' Return the first element of an RDD
0814 #'
0815 #' @rdname first
0816 #' @examples
0817 #'\dontrun{
0818 #' sc <- sparkR.init()
0819 #' rdd <- parallelize(sc, 1:10)
0820 #' firstRDD(rdd)
0821 #' }
0822 #' @noRd
0823 setMethod("firstRDD",
0824 signature(x = "RDD"),
0825 function(x) {
0826 takeRDD(x, 1)[[1]]
0827 })
0828
0829 #' Removes the duplicates from RDD.
0830 #'
0831 #' This function returns a new RDD containing the distinct elements in the
0832 #' given RDD. The same as `distinct()' in Spark.
0833 #'
0834 #' @param x The RDD to remove duplicates from.
0835 #' @param numPartitions Number of partitions to create.
0836 #' @examples
0837 # nolint start
0838 #'\dontrun{
0839 #' sc <- sparkR.init()
0840 #' rdd <- parallelize(sc, c(1,2,2,3,3,3))
0841 #' sort(unlist(collectRDD(distinctRDD(rdd)))) # c(1, 2, 3)
0842 #'}
0843 # nolint end
0844 #' @rdname distinct
0845 #' @aliases distinct,RDD-method
0846 #' @noRd
0847 setMethod("distinctRDD",
0848 signature(x = "RDD"),
0849 function(x, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
0850 identical.mapped <- lapply(x, function(x) { list(x, NULL) })
0851 reduced <- reduceByKey(identical.mapped,
0852 function(x, y) { x },
0853 numPartitions)
0854 resRDD <- lapply(reduced, function(x) { x[[1]] })
0855 resRDD
0856 })
0857
0858 #' Return an RDD that is a sampled subset of the given RDD.
0859 #'
0860 #' The same as `sample()' in Spark. (We rename it due to signature
0861 #' inconsistencies with the `sample()' function in R's base package.)
0862 #'
0863 #' @param x The RDD to sample elements from
0864 #' @param withReplacement Sampling with replacement or not
0865 #' @param fraction The (rough) sample target fraction
0866 #' @param seed Randomness seed value
0867 #' @examples
0868 #'\dontrun{
0869 #' sc <- sparkR.init()
0870 #' rdd <- parallelize(sc, 1:10)
0871 #' collectRDD(sampleRDD(rdd, FALSE, 0.5, 1618L)) # ~5 distinct elements
0872 #' collectRDD(sampleRDD(rdd, TRUE, 0.5, 9L)) # ~5 elements possibly with duplicates
0873 #'}
0874 #' @rdname sampleRDD
0875 #' @aliases sampleRDD,RDD
0876 #' @noRd
0877 setMethod("sampleRDD",
0878 signature(x = "RDD", withReplacement = "logical",
0879 fraction = "numeric", seed = "integer"),
0880 function(x, withReplacement, fraction, seed) {
0881
0882 # The sampler: takes a partition and returns its sampled version.
0883 samplingFunc <- function(partIndex, part) {
0884 set.seed(seed)
0885 res <- vector("list", length(part))
0886 len <- 0
0887
0888 # Discards some random values to ensure each partition has a
0889 # different random seed.
0890 stats::runif(partIndex)
0891
0892 for (elem in part) {
0893 if (withReplacement) {
0894 count <- stats::rpois(1, fraction)
0895 if (count > 0) {
0896 res[(len + 1) : (len + count)] <- rep(list(elem), count)
0897 len <- len + count
0898 }
0899 } else {
0900 if (stats::runif(1) < fraction) {
0901 len <- len + 1
0902 res[[len]] <- elem
0903 }
0904 }
0905 }
0906
0907 # TODO(zongheng): look into the performance of the current
0908 # implementation. Look into some iterator package? Note that
0909 # Scala avoids many calls to creating an empty list and PySpark
0910 # similarly achieves this using `yield'.
0911 if (len > 0)
0912 res[1:len]
0913 else
0914 list()
0915 }
0916
0917 lapplyPartitionsWithIndex(x, samplingFunc)
0918 })
0919
0920 #' Return a list of the elements that are a sampled subset of the given RDD.
0921 #'
0922 #' @param x The RDD to sample elements from
0923 #' @param withReplacement Sampling with replacement or not
0924 #' @param num Number of elements to return
0925 #' @param seed Randomness seed value
0926 #' @examples
0927 #'\dontrun{
0928 #' sc <- sparkR.init()
0929 #' rdd <- parallelize(sc, 1:100)
0930 #' # exactly 5 elements sampled, which may not be distinct
0931 #' takeSample(rdd, TRUE, 5L, 1618L)
0932 #' # exactly 5 distinct elements sampled
0933 #' takeSample(rdd, FALSE, 5L, 16181618L)
0934 #'}
0935 #' @rdname takeSample
0936 #' @aliases takeSample,RDD
0937 #' @noRd
0938 setMethod("takeSample", signature(x = "RDD", withReplacement = "logical",
0939 num = "integer", seed = "integer"),
0940 function(x, withReplacement, num, seed) {
0941 # This function is ported from RDD.scala.
0942 fraction <- 0.0
0943 total <- 0
0944 multiplier <- 3.0
0945 initialCount <- countRDD(x)
0946 maxSelected <- 0
0947 MAXINT <- .Machine$integer.max
0948
0949 if (num < 0)
0950 stop("Negative number of elements requested")
0951
0952 if (initialCount > MAXINT - 1) {
0953 maxSelected <- MAXINT - 1
0954 } else {
0955 maxSelected <- initialCount
0956 }
0957
0958 if (num > initialCount && !withReplacement) {
0959 total <- maxSelected
0960 fraction <- multiplier * (maxSelected + 1) / initialCount
0961 } else {
0962 total <- num
0963 fraction <- multiplier * (num + 1) / initialCount
0964 }
0965
0966 set.seed(seed)
0967 samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
0968 as.integer(ceiling(stats::runif(1,
0969 -MAXINT,
0970 MAXINT)))))
0971 # If the first sample didn't turn out large enough, keep trying to
0972 # take samples; this shouldn't happen often because we use a big
0973 # multiplier for thei initial size
0974 while (length(samples) < total)
0975 samples <- collectRDD(sampleRDD(x, withReplacement, fraction,
0976 as.integer(ceiling(stats::runif(1,
0977 -MAXINT,
0978 MAXINT)))))
0979
0980 # TODO(zongheng): investigate if this call is an in-place shuffle?
0981 base::sample(samples)[1:total]
0982 })
0983
0984 #' Creates tuples of the elements in this RDD by applying a function.
0985 #'
0986 #' @param x The RDD.
0987 #' @param func The function to be applied.
0988 #' @examples
0989 # nolint start
0990 #'\dontrun{
0991 #' sc <- sparkR.init()
0992 #' rdd <- parallelize(sc, list(1, 2, 3))
0993 #' collectRDD(keyBy(rdd, function(x) { x*x })) # list(list(1, 1), list(4, 2), list(9, 3))
0994 #'}
0995 # nolint end
0996 #' @rdname keyBy
0997 #' @aliases keyBy,RDD
0998 #' @noRd
0999 setMethod("keyBy",
1000 signature(x = "RDD", func = "function"),
1001 function(x, func) {
1002 apply.func <- function(x) {
1003 list(func(x), x)
1004 }
1005 lapply(x, apply.func)
1006 })
1007
1008 #' Return a new RDD that has exactly numPartitions partitions.
1009 #' Can increase or decrease the level of parallelism in this RDD. Internally,
1010 #' this uses a shuffle to redistribute data.
1011 #' If you are decreasing the number of partitions in this RDD, consider using
1012 #' coalesce, which can avoid performing a shuffle.
1013 #'
1014 #' @param x The RDD.
1015 #' @param numPartitions Number of partitions to create.
1016 #' @seealso coalesce
1017 #' @examples
1018 #'\dontrun{
1019 #' sc <- sparkR.init()
1020 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5, 6, 7), 4L)
1021 #' getNumPartitions(rdd) # 4
1022 #' getNumPartitions(repartitionRDD(rdd, 2L)) # 2
1023 #'}
1024 #' @rdname repartition
1025 #' @aliases repartition,RDD
1026 #' @noRd
1027 setMethod("repartitionRDD",
1028 signature(x = "RDD"),
1029 function(x, numPartitions) {
1030 if (!is.null(numPartitions) && is.numeric(numPartitions)) {
1031 coalesceRDD(x, numPartitions, TRUE)
1032 } else {
1033 stop("Please, specify the number of partitions")
1034 }
1035 })
1036
1037 #' Return a new RDD that is reduced into numPartitions partitions.
1038 #'
1039 #' @param x The RDD.
1040 #' @param numPartitions Number of partitions to create.
1041 #' @seealso repartition
1042 #' @examples
1043 #'\dontrun{
1044 #' sc <- sparkR.init()
1045 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5), 3L)
1046 #' getNumPartitions(rdd) # 3
1047 #' getNumPartitions(coalesce(rdd, 1L)) # 1
1048 #'}
1049 #' @rdname coalesce
1050 #' @aliases coalesce,RDD
1051 #' @noRd
1052 setMethod("coalesceRDD",
1053 signature(x = "RDD", numPartitions = "numeric"),
1054 function(x, numPartitions, shuffle = FALSE) {
1055 numPartitions <- numToInt(numPartitions)
1056 if (shuffle || numPartitions > SparkR:::getNumPartitionsRDD(x)) {
1057 func <- function(partIndex, part) {
1058 set.seed(partIndex) # partIndex as seed
1059 start <- as.integer(base::sample(numPartitions, 1) - 1)
1060 lapply(seq_along(part),
1061 function(i) {
1062 pos <- (start + i) %% numPartitions
1063 list(pos, part[[i]])
1064 })
1065 }
1066 shuffled <- lapplyPartitionsWithIndex(x, func)
1067 repartitioned <- partitionByRDD(shuffled, numPartitions)
1068 values(repartitioned)
1069 } else {
1070 jrdd <- callJMethod(getJRDD(x), "coalesce", numPartitions, shuffle)
1071 RDD(jrdd)
1072 }
1073 })
1074
1075 #' Save this RDD as a SequenceFile of serialized objects.
1076 #'
1077 #' @param x The RDD to save
1078 #' @param path The directory where the file is saved
1079 #' @seealso objectFile
1080 #' @examples
1081 #'\dontrun{
1082 #' sc <- sparkR.init()
1083 #' rdd <- parallelize(sc, 1:3)
1084 #' saveAsObjectFile(rdd, "/tmp/sparkR-tmp")
1085 #'}
1086 #' @rdname saveAsObjectFile
1087 #' @aliases saveAsObjectFile,RDD
1088 #' @noRd
1089 setMethod("saveAsObjectFile",
1090 signature(x = "RDD", path = "character"),
1091 function(x, path) {
1092 # If serializedMode == "string" we need to serialize the data before saving it since
1093 # objectFile() assumes serializedMode == "byte".
1094 if (getSerializedMode(x) != "byte") {
1095 x <- serializeToBytes(x)
1096 }
1097 # Return nothing
1098 invisible(callJMethod(getJRDD(x), "saveAsObjectFile", path))
1099 })
1100
1101 #' Save this RDD as a text file, using string representations of elements.
1102 #'
1103 #' @param x The RDD to save
1104 #' @param path The directory where the partitions of the text file are saved
1105 #' @examples
1106 #'\dontrun{
1107 #' sc <- sparkR.init()
1108 #' rdd <- parallelize(sc, 1:3)
1109 #' saveAsTextFile(rdd, "/tmp/sparkR-tmp")
1110 #'}
1111 #' @rdname saveAsTextFile
1112 #' @aliases saveAsTextFile,RDD
1113 #' @noRd
1114 setMethod("saveAsTextFile",
1115 signature(x = "RDD", path = "character"),
1116 function(x, path) {
1117 func <- function(str) {
1118 toString(str)
1119 }
1120 stringRdd <- lapply(x, func)
1121 # Return nothing
1122 invisible(
1123 callJMethod(getJRDD(stringRdd, serializedMode = "string"), "saveAsTextFile", path))
1124 })
1125
1126 #' Sort an RDD by the given key function.
1127 #'
1128 #' @param x An RDD to be sorted.
1129 #' @param func A function used to compute the sort key for each element.
1130 #' @param ascending A flag to indicate whether the sorting is ascending or descending.
1131 #' @param numPartitions Number of partitions to create.
1132 #' @return An RDD where all elements are sorted.
1133 #' @examples
1134 # nolint start
1135 #'\dontrun{
1136 #' sc <- sparkR.init()
1137 #' rdd <- parallelize(sc, list(3, 2, 1))
1138 #' collectRDD(sortBy(rdd, function(x) { x })) # list (1, 2, 3)
1139 #'}
1140 # nolint end
1141 #' @rdname sortBy
1142 #' @aliases sortBy,RDD,RDD-method
1143 #' @noRd
1144 setMethod("sortBy",
1145 signature(x = "RDD", func = "function"),
1146 function(x, func, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1147 values(sortByKey(keyBy(x, func), ascending, numPartitions))
1148 })
1149
1150 # Helper function to get first N elements from an RDD in the specified order.
1151 # Param:
1152 # x An RDD.
1153 # num Number of elements to return.
1154 # ascending A flag to indicate whether the sorting is ascending or descending.
1155 # Return:
1156 # A list of the first N elements from the RDD in the specified order.
1157 #
1158 takeOrderedElem <- function(x, num, ascending = TRUE) {
1159 if (num <= 0L) {
1160 return(list())
1161 }
1162
1163 partitionFunc <- function(part) {
1164 if (num < length(part)) {
1165 # R limitation: order works only on primitive types!
1166 ord <- order(unlist(part, recursive = FALSE), decreasing = !ascending)
1167 part[ord[1:num]]
1168 } else {
1169 part
1170 }
1171 }
1172
1173 newRdd <- mapPartitions(x, partitionFunc)
1174
1175 resList <- list()
1176 index <- -1
1177 jrdd <- getJRDD(newRdd)
1178 numPartitions <- getNumPartitionsRDD(newRdd)
1179 serializedModeRDD <- getSerializedMode(newRdd)
1180
1181 while (TRUE) {
1182 index <- index + 1
1183
1184 if (index >= numPartitions) {
1185 ord <- order(unlist(resList, recursive = FALSE), decreasing = !ascending)
1186 resList <- resList[ord[1:num]]
1187 break
1188 }
1189
1190 # a JList of byte arrays
1191 partitionArr <- callJMethod(jrdd, "collectPartitions", as.list(as.integer(index)))
1192 partition <- partitionArr[[1]]
1193
1194 # elems is capped to have at most `num` elements
1195 elems <- convertJListToRList(partition,
1196 flatten = TRUE,
1197 logicalUpperBound = num,
1198 serializedMode = serializedModeRDD)
1199
1200 resList <- append(resList, elems)
1201 }
1202 resList
1203 }
1204
1205 #' Returns the first N elements from an RDD in ascending order.
1206 #'
1207 #' @param x An RDD.
1208 #' @param num Number of elements to return.
1209 #' @return The first N elements from the RDD in ascending order.
1210 #' @examples
1211 # nolint start
1212 #'\dontrun{
1213 #' sc <- sparkR.init()
1214 #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1215 #' takeOrdered(rdd, 6L) # list(1, 2, 3, 4, 5, 6)
1216 #'}
1217 # nolint end
1218 #' @rdname takeOrdered
1219 #' @aliases takeOrdered,RDD,RDD-method
1220 #' @noRd
1221 setMethod("takeOrdered",
1222 signature(x = "RDD", num = "integer"),
1223 function(x, num) {
1224 takeOrderedElem(x, num)
1225 })
1226
1227 #' Returns the top N elements from an RDD.
1228 #'
1229 #' @param x An RDD.
1230 #' @param num Number of elements to return.
1231 #' @return The top N elements from the RDD.
1232 #' @rdname top
1233 #' @examples
1234 # nolint start
1235 #'\dontrun{
1236 #' sc <- sparkR.init()
1237 #' rdd <- parallelize(sc, list(10, 1, 2, 9, 3, 4, 5, 6, 7))
1238 #' top(rdd, 6L) # list(10, 9, 7, 6, 5, 4)
1239 #'}
1240 # nolint end
1241 #' @aliases top,RDD,RDD-method
1242 #' @noRd
1243 setMethod("top",
1244 signature(x = "RDD", num = "integer"),
1245 function(x, num) {
1246 takeOrderedElem(x, num, FALSE)
1247 })
1248
1249 #' Fold an RDD using a given associative function and a neutral "zero value".
1250 #'
1251 #' Aggregate the elements of each partition, and then the results for all the
1252 #' partitions, using a given associative function and a neutral "zero value".
1253 #'
1254 #' @param x An RDD.
1255 #' @param zeroValue A neutral "zero value".
1256 #' @param op An associative function for the folding operation.
1257 #' @return The folding result.
1258 #' @rdname fold
1259 #' @seealso reduce
1260 #' @examples
1261 #'\dontrun{
1262 #' sc <- sparkR.init()
1263 #' rdd <- parallelize(sc, list(1, 2, 3, 4, 5))
1264 #' fold(rdd, 0, "+") # 15
1265 #'}
1266 #' @aliases fold,RDD,RDD-method
1267 #' @noRd
1268 setMethod("fold",
1269 signature(x = "RDD", zeroValue = "ANY", op = "ANY"),
1270 function(x, zeroValue, op) {
1271 aggregateRDD(x, zeroValue, op, op)
1272 })
1273
1274 #' Aggregate an RDD using the given combine functions and a neutral "zero value".
1275 #'
1276 #' Aggregate the elements of each partition, and then the results for all the
1277 #' partitions, using given combine functions and a neutral "zero value".
1278 #'
1279 #' @param x An RDD.
1280 #' @param zeroValue A neutral "zero value".
1281 #' @param seqOp A function to aggregate the RDD elements. It may return a different
1282 #' result type from the type of the RDD elements.
1283 #' @param combOp A function to aggregate results of seqOp.
1284 #' @return The aggregation result.
1285 #' @rdname aggregateRDD
1286 #' @seealso reduce
1287 #' @examples
1288 # nolint start
1289 #'\dontrun{
1290 #' sc <- sparkR.init()
1291 #' rdd <- parallelize(sc, list(1, 2, 3, 4))
1292 #' zeroValue <- list(0, 0)
1293 #' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
1294 #' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
1295 #' aggregateRDD(rdd, zeroValue, seqOp, combOp) # list(10, 4)
1296 #'}
1297 # nolint end
1298 #' @aliases aggregateRDD,RDD,RDD-method
1299 #' @noRd
1300 setMethod("aggregateRDD",
1301 signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY", combOp = "ANY"),
1302 function(x, zeroValue, seqOp, combOp) {
1303 partitionFunc <- function(part) {
1304 Reduce(seqOp, part, zeroValue)
1305 }
1306
1307 partitionList <- collectRDD(lapplyPartition(x, partitionFunc),
1308 flatten = FALSE)
1309 Reduce(combOp, partitionList, zeroValue)
1310 })
1311
1312 #' Pipes elements to a forked external process.
1313 #'
1314 #' The same as 'pipe()' in Spark.
1315 #'
1316 #' @param x The RDD whose elements are piped to the forked external process.
1317 #' @param command The command to fork an external process.
1318 #' @param env A named list to set environment variables of the external process.
1319 #' @return A new RDD created by piping all elements to a forked external process.
1320 #' @rdname pipeRDD
1321 #' @examples
1322 #'\dontrun{
1323 #' sc <- sparkR.init()
1324 #' rdd <- parallelize(sc, 1:10)
1325 #' pipeRDD(rdd, "more")
1326 #' Output: c("1", "2", ..., "10")
1327 #'}
1328 #' @aliases pipeRDD,RDD,character-method
1329 #' @noRd
1330 setMethod("pipeRDD",
1331 signature(x = "RDD", command = "character"),
1332 function(x, command, env = list()) {
1333 func <- function(part) {
1334 trim_trailing_func <- function(x) {
1335 sub("[\r\n]*$", "", toString(x))
1336 }
1337 input <- unlist(lapply(part, trim_trailing_func))
1338 res <- system2(command, stdout = TRUE, input = input, env = env)
1339 lapply(res, trim_trailing_func)
1340 }
1341 lapplyPartition(x, func)
1342 })
1343
1344 #' TODO: Consider caching the name in the RDD's environment
1345 #' Return an RDD's name.
1346 #'
1347 #' @param x The RDD whose name is returned.
1348 #' @rdname name
1349 #' @examples
1350 #'\dontrun{
1351 #' sc <- sparkR.init()
1352 #' rdd <- parallelize(sc, list(1,2,3))
1353 #' name(rdd) # NULL (if not set before)
1354 #'}
1355 #' @aliases name,RDD
1356 #' @noRd
1357 setMethod("name",
1358 signature(x = "RDD"),
1359 function(x) {
1360 callJMethod(getJRDD(x), "name")
1361 })
1362
1363 #' Set an RDD's name.
1364 #'
1365 #' @param x The RDD whose name is to be set.
1366 #' @param name The RDD name to be set.
1367 #' @return a new RDD renamed.
1368 #' @rdname setName
1369 #' @examples
1370 #'\dontrun{
1371 #' sc <- sparkR.init()
1372 #' rdd <- parallelize(sc, list(1,2,3))
1373 #' setName(rdd, "myRDD")
1374 #' name(rdd) # "myRDD"
1375 #'}
1376 #' @aliases setName,RDD
1377 #' @noRd
1378 setMethod("setName",
1379 signature(x = "RDD", name = "character"),
1380 function(x, name) {
1381 callJMethod(getJRDD(x), "setName", name)
1382 x
1383 })
1384
1385 #' Zip an RDD with generated unique Long IDs.
1386 #'
1387 #' Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
1388 #' n is the number of partitions. So there may exist gaps, but this
1389 #' method won't trigger a spark job, which is different from
1390 #' zipWithIndex.
1391 #'
1392 #' @param x An RDD to be zipped.
1393 #' @return An RDD with zipped items.
1394 #' @seealso zipWithIndex
1395 #' @examples
1396 # nolint start
1397 #'\dontrun{
1398 #' sc <- sparkR.init()
1399 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
1400 #' collectRDD(zipWithUniqueId(rdd))
1401 #' # list(list("a", 0), list("b", 3), list("c", 1), list("d", 4), list("e", 2))
1402 #'}
1403 # nolint end
1404 #' @rdname zipWithUniqueId
1405 #' @aliases zipWithUniqueId,RDD
1406 #' @noRd
1407 setMethod("zipWithUniqueId",
1408 signature(x = "RDD"),
1409 function(x) {
1410 n <- getNumPartitionsRDD(x)
1411
1412 partitionFunc <- function(partIndex, part) {
1413 mapply(
1414 function(item, index) {
1415 list(item, (index - 1) * n + partIndex)
1416 },
1417 part,
1418 seq_along(part),
1419 SIMPLIFY = FALSE)
1420 }
1421
1422 lapplyPartitionsWithIndex(x, partitionFunc)
1423 })
1424
1425 #' Zip an RDD with its element indices.
1426 #'
1427 #' The ordering is first based on the partition index and then the
1428 #' ordering of items within each partition. So the first item in
1429 #' the first partition gets index 0, and the last item in the last
1430 #' partition receives the largest index.
1431 #'
1432 #' This method needs to trigger a Spark job when this RDD contains
1433 #' more than one partition.
1434 #'
1435 #' @param x An RDD to be zipped.
1436 #' @return An RDD with zipped items.
1437 #' @seealso zipWithUniqueId
1438 #' @examples
1439 # nolint start
1440 #'\dontrun{
1441 #' sc <- sparkR.init()
1442 #' rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L)
1443 #' collectRDD(zipWithIndex(rdd))
1444 #' # list(list("a", 0), list("b", 1), list("c", 2), list("d", 3), list("e", 4))
1445 #'}
1446 # nolint end
1447 #' @rdname zipWithIndex
1448 #' @aliases zipWithIndex,RDD
1449 #' @noRd
1450 setMethod("zipWithIndex",
1451 signature(x = "RDD"),
1452 function(x) {
1453 n <- getNumPartitionsRDD(x)
1454 if (n > 1) {
1455 nums <- collectRDD(lapplyPartition(x,
1456 function(part) {
1457 list(length(part))
1458 }))
1459 startIndices <- Reduce("+", nums, accumulate = TRUE)
1460 }
1461
1462 partitionFunc <- function(partIndex, part) {
1463 if (partIndex == 0) {
1464 startIndex <- 0
1465 } else {
1466 startIndex <- startIndices[[partIndex]]
1467 }
1468
1469 mapply(
1470 function(item, index) {
1471 list(item, index - 1 + startIndex)
1472 },
1473 part,
1474 seq_along(part),
1475 SIMPLIFY = FALSE)
1476 }
1477
1478 lapplyPartitionsWithIndex(x, partitionFunc)
1479 })
1480
1481 #' Coalesce all elements within each partition of an RDD into a list.
1482 #'
1483 #' @param x An RDD.
1484 #' @return An RDD created by coalescing all elements within
1485 #' each partition into a list.
1486 #' @examples
1487 # nolint start
1488 #'\dontrun{
1489 #' sc <- sparkR.init()
1490 #' rdd <- parallelize(sc, as.list(1:4), 2L)
1491 #' collectRDD(glom(rdd))
1492 #' # list(list(1, 2), list(3, 4))
1493 #'}
1494 # nolint end
1495 #' @rdname glom
1496 #' @aliases glom,RDD
1497 #' @noRd
1498 setMethod("glom",
1499 signature(x = "RDD"),
1500 function(x) {
1501 partitionFunc <- function(part) {
1502 list(part)
1503 }
1504
1505 lapplyPartition(x, partitionFunc)
1506 })
1507
1508 ############ Binary Functions #############
1509
1510 #' Return the union RDD of two RDDs.
1511 #' The same as union() in Spark.
1512 #'
1513 #' @param x An RDD.
1514 #' @param y An RDD.
1515 #' @return a new RDD created by performing the simple union (witout removing
1516 #' duplicates) of two input RDDs.
1517 #' @examples
1518 #'\dontrun{
1519 #' sc <- sparkR.init()
1520 #' rdd <- parallelize(sc, 1:3)
1521 #' unionRDD(rdd, rdd) # 1, 2, 3, 1, 2, 3
1522 #'}
1523 #' @rdname unionRDD
1524 #' @aliases unionRDD,RDD,RDD-method
1525 #' @noRd
1526 setMethod("unionRDD",
1527 signature(x = "RDD", y = "RDD"),
1528 function(x, y) {
1529 if (getSerializedMode(x) == getSerializedMode(y)) {
1530 jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
1531 union.rdd <- RDD(jrdd, getSerializedMode(x))
1532 } else {
1533 # One of the RDDs is not serialized, we need to serialize it first.
1534 if (getSerializedMode(x) != "byte") x <- serializeToBytes(x)
1535 if (getSerializedMode(y) != "byte") y <- serializeToBytes(y)
1536 jrdd <- callJMethod(getJRDD(x), "union", getJRDD(y))
1537 union.rdd <- RDD(jrdd, "byte")
1538 }
1539 union.rdd
1540 })
1541
1542 #' Zip an RDD with another RDD.
1543 #'
1544 #' Zips this RDD with another one, returning key-value pairs with the
1545 #' first element in each RDD second element in each RDD, etc. Assumes
1546 #' that the two RDDs have the same number of partitions and the same
1547 #' number of elements in each partition (e.g. one was made through
1548 #' a map on the other).
1549 #'
1550 #' @param x An RDD to be zipped.
1551 #' @param other Another RDD to be zipped.
1552 #' @return An RDD zipped from the two RDDs.
1553 #' @examples
1554 # nolint start
1555 #'\dontrun{
1556 #' sc <- sparkR.init()
1557 #' rdd1 <- parallelize(sc, 0:4)
1558 #' rdd2 <- parallelize(sc, 1000:1004)
1559 #' collectRDD(zipRDD(rdd1, rdd2))
1560 #' # list(list(0, 1000), list(1, 1001), list(2, 1002), list(3, 1003), list(4, 1004))
1561 #'}
1562 # nolint end
1563 #' @rdname zipRDD
1564 #' @aliases zipRDD,RDD
1565 #' @noRd
1566 setMethod("zipRDD",
1567 signature(x = "RDD", other = "RDD"),
1568 function(x, other) {
1569 n1 <- getNumPartitionsRDD(x)
1570 n2 <- getNumPartitionsRDD(other)
1571 if (n1 != n2) {
1572 stop("Can only zip RDDs which have the same number of partitions.")
1573 }
1574
1575 rdds <- appendPartitionLengths(x, other)
1576 jrdd <- callJMethod(getJRDD(rdds[[1]]), "zip", getJRDD(rdds[[2]]))
1577 # The jrdd's elements are of scala Tuple2 type. The serialized
1578 # flag here is used for the elements inside the tuples.
1579 rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
1580
1581 mergePartitions(rdd, TRUE)
1582 })
1583
1584 #' Cartesian product of this RDD and another one.
1585 #'
1586 #' Return the Cartesian product of this RDD and another one,
1587 #' that is, the RDD of all pairs of elements (a, b) where a
1588 #' is in this and b is in other.
1589 #'
1590 #' @param x An RDD.
1591 #' @param other An RDD.
1592 #' @return A new RDD which is the Cartesian product of these two RDDs.
1593 #' @examples
1594 # nolint start
1595 #'\dontrun{
1596 #' sc <- sparkR.init()
1597 #' rdd <- parallelize(sc, 1:2)
1598 #' sortByKey(cartesian(rdd, rdd))
1599 #' # list(list(1, 1), list(1, 2), list(2, 1), list(2, 2))
1600 #'}
1601 # nolint end
1602 #' @rdname cartesian
1603 #' @aliases cartesian,RDD,RDD-method
1604 #' @noRd
1605 setMethod("cartesian",
1606 signature(x = "RDD", other = "RDD"),
1607 function(x, other) {
1608 rdds <- appendPartitionLengths(x, other)
1609 jrdd <- callJMethod(getJRDD(rdds[[1]]), "cartesian", getJRDD(rdds[[2]]))
1610 # The jrdd's elements are of scala Tuple2 type. The serialized
1611 # flag here is used for the elements inside the tuples.
1612 rdd <- RDD(jrdd, getSerializedMode(rdds[[1]]))
1613
1614 mergePartitions(rdd, FALSE)
1615 })
1616
1617 #' Subtract an RDD with another RDD.
1618 #'
1619 #' Return an RDD with the elements from this that are not in other.
1620 #'
1621 #' @param x An RDD.
1622 #' @param other An RDD.
1623 #' @param numPartitions Number of the partitions in the result RDD.
1624 #' @return An RDD with the elements from this that are not in other.
1625 #' @examples
1626 # nolint start
1627 #'\dontrun{
1628 #' sc <- sparkR.init()
1629 #' rdd1 <- parallelize(sc, list(1, 1, 2, 2, 3, 4))
1630 #' rdd2 <- parallelize(sc, list(2, 4))
1631 #' collectRDD(subtract(rdd1, rdd2))
1632 #' # list(1, 1, 3)
1633 #'}
1634 # nolint end
1635 #' @rdname subtract
1636 #' @aliases subtract,RDD
1637 #' @noRd
1638 setMethod("subtract",
1639 signature(x = "RDD", other = "RDD"),
1640 function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1641 mapFunction <- function(e) { list(e, NA) }
1642 rdd1 <- map(x, mapFunction)
1643 rdd2 <- map(other, mapFunction)
1644 keys(subtractByKey(rdd1, rdd2, numPartitions))
1645 })
1646
1647 #' Intersection of this RDD and another one.
1648 #'
1649 #' Return the intersection of this RDD and another one.
1650 #' The output will not contain any duplicate elements,
1651 #' even if the input RDDs did. Performs a hash partition
1652 #' across the cluster.
1653 #' Note that this method performs a shuffle internally.
1654 #'
1655 #' @param x An RDD.
1656 #' @param other An RDD.
1657 #' @param numPartitions The number of partitions in the result RDD.
1658 #' @return An RDD which is the intersection of these two RDDs.
1659 #' @examples
1660 # nolint start
1661 #'\dontrun{
1662 #' sc <- sparkR.init()
1663 #' rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
1664 #' rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
1665 #' collectRDD(sortBy(intersection(rdd1, rdd2), function(x) { x }))
1666 #' # list(1, 2, 3)
1667 #'}
1668 # nolint end
1669 #' @rdname intersection
1670 #' @aliases intersection,RDD
1671 #' @noRd
1672 setMethod("intersection",
1673 signature(x = "RDD", other = "RDD"),
1674 function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
1675 rdd1 <- map(x, function(v) { list(v, NA) })
1676 rdd2 <- map(other, function(v) { list(v, NA) })
1677
1678 filterFunction <- function(elem) {
1679 iters <- elem[[2]]
1680 all(as.vector(
1681 lapply(iters, function(iter) { length(iter) > 0 }), mode = "logical"))
1682 }
1683
1684 keys(filterRDD(cogroup(rdd1, rdd2, numPartitions = numPartitions), filterFunction))
1685 })
1686
1687 #' Zips an RDD's partitions with one (or more) RDD(s).
1688 #' Same as zipPartitions in Spark.
1689 #'
1690 #' @param ... RDDs to be zipped.
1691 #' @param func A function to transform zipped partitions.
1692 #' @return A new RDD by applying a function to the zipped partitions.
1693 #' Assumes that all the RDDs have the *same number of partitions*, but
1694 #' does *not* require them to have the same number of elements in each partition.
1695 #' @examples
1696 # nolint start
1697 #'\dontrun{
1698 #' sc <- sparkR.init()
1699 #' rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2
1700 #' rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4
1701 #' rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6
1702 #' collectRDD(zipPartitions(rdd1, rdd2, rdd3,
1703 #' func = function(x, y, z) { list(list(x, y, z))} ))
1704 #' # list(list(1, c(1,2), c(1,2,3)), list(2, c(3,4), c(4,5,6)))
1705 #'}
1706 # nolint end
1707 #' @rdname zipRDD
1708 #' @aliases zipPartitions,RDD
1709 #' @noRd
1710 setMethod("zipPartitions",
1711 "RDD",
1712 function(..., func) {
1713 rrdds <- list(...)
1714 if (length(rrdds) == 1) {
1715 return(rrdds[[1]])
1716 }
1717 nPart <- sapply(rrdds, getNumPartitionsRDD)
1718 if (length(unique(nPart)) != 1) {
1719 stop("Can only zipPartitions RDDs which have the same number of partitions.")
1720 }
1721
1722 rrdds <- lapply(rrdds, function(rdd) {
1723 mapPartitionsWithIndex(rdd, function(partIndex, part) {
1724 print(length(part))
1725 list(list(partIndex, part))
1726 })
1727 })
1728 union.rdd <- Reduce(unionRDD, rrdds)
1729 zipped.rdd <- values(groupByKey(union.rdd, numPartitions = nPart[1]))
1730 res <- mapPartitions(zipped.rdd, function(plist) {
1731 do.call(func, plist[[1]])
1732 })
1733 res
1734 })