#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Operations supported on RDDs containing pairs (i.e. key, value)
#' @include generics.R jobj.R RDD.R
NULL

############ Actions and Transformations ############

#' Look up elements of a key in an RDD
#'
#' @description
#' \code{lookup} returns a list of values in this RDD for key \code{key}.
#'
#' @param x The RDD to look up the key in
#' @param key The key to look up
#' @return a list of values in this RDD for key \code{key}
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
#' rdd <- parallelize(sc, pairs)
#' lookup(rdd, 1) # list(1, 3)
#'}
# nolint end
#' @rdname lookup
#' @aliases lookup,RDD-method
#' @noRd
setMethod("lookup",
          signature(x = "RDD", key = "ANY"),
          function(x, key) {
            partitionFunc <- function(part) {
              filtered <- part[unlist(lapply(part, function(i) { identical(key, i[[1]]) }))]
              lapply(filtered, function(i) { i[[2]] })
            }
            valsRDD <- lapplyPartition(x, partitionFunc)
            collectRDD(valsRDD)
          })
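
# A minimal, Spark-free sketch of the per-partition filtering that lookup's
# partitionFunc performs above; `part` is a hypothetical in-memory stand-in
# for one partition of (key, value) pairs.
local({
  part <- list(list(1, "a"), list(2, "b"), list(1, "c"))
  key <- 1
  filtered <- Filter(function(i) identical(key, i[[1]]), part)
  lapply(filtered, function(i) i[[2]])  # list("a", "c")
})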

#' Count the number of elements for each key, and return the result to the
#' master as lists of (key, count) pairs.
#'
#' Same as countByKey in Spark.
#'
#' @param x The RDD whose keys are to be counted.
#' @return list of (key, count) pairs, where count is the number of occurrences
#'         of the key in the RDD.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
#' countByKey(rdd) # ("a", 2L), ("b", 1L)
#'}
# nolint end
#' @rdname countByKey
#' @aliases countByKey,RDD-method
#' @noRd
setMethod("countByKey",
          signature(x = "RDD"),
          function(x) {
            keys <- lapply(x, function(item) { item[[1]] })
            countByValue(keys)
          })
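
# A local sketch of the same idea without Spark: extract the keys, then
# tabulate them with base R (`table` plays the role of countByValue here).
local({
  pairs <- list(c("a", 1), c("b", 1), c("a", 1))
  keys <- vapply(pairs, function(item) item[[1]], character(1))
  as.list(table(keys))  # list(a = 2L, b = 1L)
})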

#' Return an RDD with the keys of each tuple.
#'
#' @param x The RDD from which the keys of each tuple are returned.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
#' collectRDD(keys(rdd)) # list(1, 3)
#'}
# nolint end
#' @rdname keys
#' @aliases keys,RDD
#' @noRd
setMethod("keys",
          signature(x = "RDD"),
          function(x) {
            func <- function(k) {
              k[[1]]
            }
            lapply(x, func)
          })

#' Return an RDD with the values of each tuple.
#'
#' @param x The RDD from which the values of each tuple are returned.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
#' collectRDD(values(rdd)) # list(2, 4)
#'}
# nolint end
#' @rdname values
#' @aliases values,RDD
#' @noRd
setMethod("values",
          signature(x = "RDD"),
          function(x) {
            func <- function(v) {
              v[[2]]
            }
            lapply(x, func)
          })

#' Applies a function to all values of the elements, without modifying the keys.
#'
#' The same as \code{mapValues()} in Spark.
#'
#' @param X The RDD to apply the transformation.
#' @param FUN the transformation to apply on the value of each element.
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' makePairs <- lapply(rdd, function(x) { list(x, x) })
#' collectRDD(mapValues(makePairs, function(x) { x * 2 }))
#' # Output: list(list(1, 2), list(2, 4), list(3, 6), ...)
#'}
#' @rdname mapValues
#' @aliases mapValues,RDD,function-method
#' @noRd
setMethod("mapValues",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            func <- function(x) {
              list(x[[1]], FUN(x[[2]]))
            }
            lapply(X, func)
          })

#' Pass each value in the key-value pair RDD through a flatMap function without
#' changing the keys; this also retains the original RDD's partitioning.
#'
#' The same as \code{flatMapValues()} in Spark.
#'
#' @param X The RDD to apply the transformation.
#' @param FUN the transformation to apply on the value of each element.
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4))))
#' collectRDD(flatMapValues(rdd, function(x) { x }))
#' # Output: list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))
#'}
#' @rdname flatMapValues
#' @aliases flatMapValues,RDD,function-method
#' @noRd
setMethod("flatMapValues",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            flatMapFunc <- function(x) {
              lapply(FUN(x[[2]]), function(v) { list(x[[1]], v) })
            }
            flatMap(X, flatMapFunc)
          })
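
# A Spark-free sketch of flatMapFunc above: each (key, values) pair expands
# into one (key, v) pair per element that FUN returns.
local({
  pair <- list(1, c(1, 2))
  FUN <- identity
  lapply(FUN(pair[[2]]), function(v) list(pair[[1]], v))
  # list(list(1, 1), list(1, 2))
})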

############ Shuffle Functions ############

#' Partition an RDD by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
#' For each element of this RDD, the partitioner is used to compute a hash of
#' the key, and the RDD is partitioned using these hash values.
#'
#' @param x The RDD to partition. Should be an RDD where each element is
#'             list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @param ... Other optional arguments to partitionBy.
#'
#' @param partitionFunc The partition function to use. Uses a default hashCode
#'                      function if not provided.
#' @return An RDD partitioned using the specified partitioner.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- partitionByRDD(rdd, 2L)
#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
#'}
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
#' @noRd
setMethod("partitionByRDD",
          signature(x = "RDD"),
          function(x, numPartitions, partitionFunc = hashCode) {
            stopifnot(is.numeric(numPartitions))

            partitionFunc <- cleanClosure(partitionFunc)
            serializedHashFuncBytes <- serialize(partitionFunc, connection = NULL)

            packageNamesArr <- serialize(.sparkREnv$.packages,
                                         connection = NULL)
            broadcastArr <- lapply(ls(.broadcastNames),
                                   function(name) { get(name, .broadcastNames) })
            jrdd <- getJRDD(x)

            # We create a PairwiseRRDD that extends RDD[(Int, Array[Byte])],
            # where the key is the target partition number and the value is
            # the content (key-value pairs).
            pairwiseRRDD <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
                                       callJMethod(jrdd, "rdd"),
                                       numToInt(numPartitions),
                                       serializedHashFuncBytes,
                                       getSerializedMode(x),
                                       packageNamesArr,
                                       broadcastArr,
                                       callJMethod(jrdd, "classTag"))

            # Create a corresponding partitioner.
            rPartitioner <- newJObject("org.apache.spark.HashPartitioner",
                                       numToInt(numPartitions))

            # Call partitionBy on the obtained PairwiseRDD.
            javaPairRDD <- callJMethod(pairwiseRRDD, "asJavaPairRDD")
            javaPairRDD <- callJMethod(javaPairRDD, "partitionBy", rPartitioner)

            # Call .values() on the result to get back the final result, the
            # shuffled actual content key-value pairs.
            r <- callJMethod(javaPairRDD, "values")

            RDD(r, serializedMode = "byte")
          })
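
# A local sketch of how a hash partitioner assigns a pair to a partition:
# hash the key, then take the hash modulo the partition count. The toy string
# hash below is illustrative only, not SparkR's actual hashCode.
local({
  toyHash <- function(key) sum(utf8ToInt(as.character(key)))
  numPartitions <- 2L
  key <- 1
  toyHash(key) %% numPartitions  # target partition index in [0, numPartitions)
})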

#' Group values by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V)
#' and groups the values for each key in the RDD into a single sequence.
#'
#' @param x The RDD to group. Should be an RDD where each element is
#'             list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, list(V))
#' @seealso reduceByKey
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- groupByKey(rdd, 2L)
#' grouped <- collectRDD(parts)
#' grouped[[1]] # Should be a list(1, list(2, 4))
#'}
#' @rdname groupByKey
#' @aliases groupByKey,RDD,integer-method
#' @noRd
setMethod("groupByKey",
          signature(x = "RDD", numPartitions = "numeric"),
          function(x, numPartitions) {
            shuffled <- partitionByRDD(x, numPartitions)
            groupVals <- function(part) {
              vals <- new.env()
              keys <- new.env()
              pred <- function(item) exists(item$hash, keys)
              appendList <- function(acc, i) {
                addItemToAccumulator(acc, i)
                acc
              }
              makeList <- function(i) {
                acc <- initAccumulator()
                addItemToAccumulator(acc, i)
                acc
              }
              # Each item in the partition is a list of (K, V)
              lapply(part,
                     function(item) {
                       item$hash <- as.character(hashCode(item[[1]]))
                       updateOrCreatePair(item, keys, vals, pred,
                                          appendList, makeList)
                     })
              # extract out the data field
              vals <- eapply(vals,
                             function(i) {
                               length(i$data) <- i$counter
                               i$data
                             })
              # Every key in the environment contains a list
              # Convert that to list(K, Seq[V])
              convertEnvsToList(keys, vals)
            }
            lapplyPartition(shuffled, groupVals)
          })
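
# A Spark-free sketch of the environments-as-hash-maps technique groupVals
# uses above: one environment maps a hashed key back to the original key, the
# other maps it to the values accumulated so far.
local({
  part <- list(list(1, 2), list(1.1, 3), list(1, 4))
  keys <- new.env()
  vals <- new.env()
  for (item in part) {
    h <- as.character(item[[1]])  # stand-in for hashCode(item[[1]])
    if (!exists(h, envir = keys)) {
      assign(h, item[[1]], envir = keys)
      assign(h, list(), envir = vals)
    }
    assign(h, c(get(h, envir = vals), list(item[[2]])), envir = vals)
  }
  lapply(ls(keys), function(h) list(get(h, keys), get(h, vals)))
  # list(list(1, list(2, 4)), list(1.1, list(3)))
})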

#' Merge values by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V)
#' and merges the values for each key using an associative and commutative reduce function.
#'
#' @param x The RDD to reduce by key. Should be an RDD where each element is
#'             list(K, V) or c(K, V).
#' @param combineFunc The associative and commutative reduce function to use.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, V') where V' is the merged
#'         value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- reduceByKey(rdd, "+", 2L)
#' reduced <- collectRDD(parts)
#' reduced[[1]] # Should be a list(1, 6)
#'}
#' @rdname reduceByKey
#' @aliases reduceByKey,RDD,integer-method
#' @noRd
setMethod("reduceByKey",
          signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
          function(x, combineFunc, numPartitions) {
            reduceVals <- function(part) {
              vals <- new.env()
              keys <- new.env()
              pred <- function(item) exists(item$hash, keys)
              lapply(part,
                     function(item) {
                       item$hash <- as.character(hashCode(item[[1]]))
                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
                     })
              convertEnvsToList(keys, vals)
            }
            locallyReduced <- lapplyPartition(x, reduceVals)
            shuffled <- partitionByRDD(locallyReduced, numToInt(numPartitions))
            lapplyPartition(shuffled, reduceVals)
          })
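
# A local sketch of the per-partition reduce that runs both before and after
# the shuffle above: reducing each partition first (a map-side combine)
# shrinks the data before it crosses the network, and the same logic is
# simply reapplied to the shuffled partitions.
local({
  reduceVals <- function(part, combineFunc) {
    acc <- list()
    for (item in part) {
      h <- as.character(item[[1]])
      acc[[h]] <- if (is.null(acc[[h]])) item[[2]] else combineFunc(acc[[h]], item[[2]])
    }
    acc
  }
  reduceVals(list(list(1, 2), list(1.1, 3), list(1, 4)), `+`)
  # list(`1` = 6, `1.1` = 3)
})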

#' Merge values by key locally
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V)
#' and merges the values for each key using an associative and commutative reduce function, but
#' returns the results immediately to the driver as an R list.
#'
#' @param x The RDD to reduce by key. Should be an RDD where each element is
#'             list(K, V) or c(K, V).
#' @param combineFunc The associative and commutative reduce function to use.
#' @return A list of elements of type list(K, V') where V' is the merged value for each key
#' @seealso reduceByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' reduced <- reduceByKeyLocally(rdd, "+")
#' reduced # list(list(1, 6), list(1.1, 3))
#'}
# nolint end
#' @rdname reduceByKeyLocally
#' @aliases reduceByKeyLocally,RDD-method
#' @noRd
setMethod("reduceByKeyLocally",
          signature(x = "RDD", combineFunc = "ANY"),
          function(x, combineFunc) {
            reducePart <- function(part) {
              vals <- new.env()
              keys <- new.env()
              pred <- function(item) exists(item$hash, keys)
              lapply(part,
                     function(item) {
                       item$hash <- as.character(hashCode(item[[1]]))
                       updateOrCreatePair(item, keys, vals, pred, combineFunc, identity)
                     })
              list(list(keys, vals)) # return the hash to avoid re-computing it in merge
            }
            mergeParts <- function(accum, x) {
              pred <- function(item) {
                exists(item$hash, accum[[1]])
              }
              lapply(ls(x[[1]]),
                     function(name) {
                       item <- list(x[[1]][[name]], x[[2]][[name]])
                       item$hash <- name
                       updateOrCreatePair(item, accum[[1]], accum[[2]], pred, combineFunc, identity)
                     })
              accum
            }
            reduced <- mapPartitions(x, reducePart)
            merged <- reduce(reduced, mergeParts)
            convertEnvsToList(merged[[1]], merged[[2]])
          })

#' Combine values by key
#'
#' Generic function to combine the elements for each key using a custom set of
#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
#' for a "combined type" C. Note that V and C can be different -- for example, one
#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
#' Users provide three functions:
#' \itemize{
#'   \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
#'   \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
#'   \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
#'    two lists).
#' }
#'
#' @param x The RDD to combine. Should be an RDD where each element is
#'             list(K, V) or c(K, V).
#' @param createCombiner Create a combiner (C) given a value (V)
#' @param mergeValue Merge the given value (V) with an existing combiner (C)
#' @param mergeCombiners Merge two combiners and return a new combiner
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, C) where C is the combined type
#' @seealso groupByKey, reduceByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
#' combined <- collectRDD(parts)
#' combined[[1]] # Should be a list(1, 6)
#'}
# nolint end
#' @rdname combineByKey
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
#' @noRd
setMethod("combineByKey",
          signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
                    mergeCombiners = "ANY", numPartitions = "numeric"),
          function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
            combineLocally <- function(part) {
              combiners <- new.env()
              keys <- new.env()
              pred <- function(item) exists(item$hash, keys)
              lapply(part,
                     function(item) {
                       item$hash <- as.character(hashCode(item[[1]]))
                       updateOrCreatePair(item, keys, combiners, pred, mergeValue, createCombiner)
                     })
              convertEnvsToList(keys, combiners)
            }
            locallyCombined <- lapplyPartition(x, combineLocally)
            shuffled <- partitionByRDD(locallyCombined, numToInt(numPartitions))
            mergeAfterShuffle <- function(part) {
              combiners <- new.env()
              keys <- new.env()
              pred <- function(item) exists(item$hash, keys)
              lapply(part,
                     function(item) {
                       item$hash <- as.character(hashCode(item[[1]]))
                       updateOrCreatePair(item, keys, combiners, pred, mergeCombiners, identity)
                     })
              convertEnvsToList(keys, combiners)
            }
            lapplyPartition(shuffled, mergeAfterShuffle)
          })
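
# A Spark-free sketch of the combiner lifecycle described above, grouping
# (Int, Int) pairs into (Int, list-of-Int): createCombiner starts a combiner
# from the first value seen for a key, mergeValue folds further values into
# it, and mergeCombiners (used across partitions) concatenates two combiners.
local({
  createCombiner <- function(v) list(v)
  mergeValue <- function(comb, v) c(comb, list(v))
  mergeCombiners <- function(c1, c2) c(c1, c2)
  partitionA <- mergeValue(createCombiner(2), 4)  # key 1 in one partition
  partitionB <- createCombiner(6)                 # key 1 in another partition
  mergeCombiners(partitionA, partitionB)          # list(2, 4, 6)
})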

#' Aggregate a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using given combine functions
#' and a neutral "zero value". This function can return a different result type,
#' U, than the type of the values in this RDD, V. Thus, we need one operation
#' for merging a V into a U and one operation for merging two U's. The former
#' operation is used for merging values within a partition, and the latter is
#' used for merging values between partitions. To avoid memory allocation, both
#' of these functions are allowed to modify and return their first argument
#' instead of creating a new U.
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the values of each key. It may return
#'              a different result type from the type of the values.
#' @param combOp A function to aggregate results of seqOp.
#' @return An RDD containing the aggregation result.
#' @seealso foldByKey, combineByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
#'   # list(list(1, list(3, 2)), list(2, list(7, 2)))
#'}
# nolint end
#' @rdname aggregateByKey
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
#' @noRd
setMethod("aggregateByKey",
          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
                    combOp = "ANY", numPartitions = "numeric"),
          function(x, zeroValue, seqOp, combOp, numPartitions) {
            createCombiner <- function(v) {
              do.call(seqOp, list(zeroValue, v))
            }

            combineByKey(x, createCombiner, seqOp, combOp, numPartitions)
          })
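
# A local illustration of how aggregateByKey maps onto combineByKey above:
# folding the zero value with seqOp yields the createCombiner, here computing
# a running (sum, count) per key.
local({
  zeroValue <- list(0, 0)
  seqOp <- function(x, y) list(x[[1]] + y, x[[2]] + 1)
  combOp <- function(x, y) list(x[[1]] + y[[1]], x[[2]] + y[[2]])
  createCombiner <- function(v) seqOp(zeroValue, v)
  # key 1 sees values 1 and 2 within one partition:
  withinPart <- seqOp(createCombiner(1), 2)  # list(3, 2): sum = 3, count = 2
  # merging with another partition's (sum, count) for the same key:
  combOp(withinPart, list(4, 1))             # list(7, 3)
})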

#' Fold a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using an associative function "func"
#' and a neutral "zero value" which may be added to the result an arbitrary
#' number of times, and must not change the result (e.g., 0 for addition, or
#' 1 for multiplication).
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param func An associative function for folding values of each key.
#' @return An RDD containing the aggregation result.
#' @seealso aggregateByKey, combineByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
#'}
# nolint end
#' @rdname foldByKey
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
#' @noRd
setMethod("foldByKey",
          signature(x = "RDD", zeroValue = "ANY",
                    func = "ANY", numPartitions = "numeric"),
          function(x, zeroValue, func, numPartitions) {
            aggregateByKey(x, zeroValue, func, func, numPartitions)
          })

############ Binary Functions #############

#' Join two RDDs
#'
#' @description
#' \code{join} joins two RDDs where every element is of the form list(K, V).
#' The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return a new RDD containing all pairs of elements with matching keys in
#'         two input RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' joinRDD(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases join,RDD,RDD-method
#' @noRd
setMethod("joinRDD",
          signature(x = "RDD", y = "RDD"),
          function(x, y, numPartitions) {
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            doJoin <- function(v) {
              joinTaggedList(v, list(FALSE, FALSE))
            }

            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
                          doJoin)
          })
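
# A Spark-free sketch of the tag-and-group join technique shared by the join
# variants below: values are tagged with their source RDD (1L or 2L), grouped
# by key, and each group yields the cross product of side-1 and side-2 values
# (what joinTaggedList computes for an inner join).
local({
  grouped <- list(list(1L, 1), list(2L, 2), list(2L, 3))  # one key's tagged values
  lhs <- lapply(Filter(function(t) t[[1]] == 1L, grouped), function(t) t[[2]])
  rhs <- lapply(Filter(function(t) t[[1]] == 2L, grouped), function(t) t[[2]])
  unlist(lapply(lhs, function(l) lapply(rhs, function(r) list(l, r))),
         recursive = FALSE)
  # list(list(1, 2), list(1, 3))
})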

#' Left outer join two RDDs
#'
#' @description
#' \code{leftOuterJoin} left-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x, the resulting RDD will either contain
#'         all pairs (k, (v, w)) for (k, w) in y, or the pair (k, (v, NULL))
#'         if no elements in y have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' leftOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
#' @noRd
setMethod("leftOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            doJoin <- function(v) {
              joinTaggedList(v, list(FALSE, TRUE))
            }

            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })

#' Right outer join two RDDs
#'
#' @description
#' \code{rightOuterJoin} right-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, w) in y, the resulting RDD will either contain
#'         all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
#'         if no elements in x have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rightOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
#' @noRd
setMethod("rightOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            doJoin <- function(v) {
              joinTaggedList(v, list(TRUE, FALSE))
            }

            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })

#' Full outer join two RDDs
#'
#' @description
#' \code{fullOuterJoin} full-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'             list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
#'         will contain all pairs (k, (v, w)) for both (k, v) in x and
#'         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
#'         in x/y have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
#'                               #      list(1, list(3, 1)),
#'                               #      list(2, list(NULL, 4)),
#'                               #      list(3, list(3, NULL)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
#' @noRd
setMethod("fullOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            doJoin <- function(v) {
              joinTaggedList(v, list(TRUE, TRUE))
            }

            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })

#' For each key k in several RDDs, return a resulting RDD whose values are
#' a list of the values for that key in all RDDs.
#'
#' @param ... Several RDDs.
#' @param numPartitions Number of partitions to create.
#' @return a new RDD containing all pairs of elements with values in a list
#' in all RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' cogroup(rdd1, rdd2, numPartitions = 2L)
#' # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
#'}
# nolint end
#' @rdname cogroup
#' @aliases cogroup,RDD-method
#' @noRd
setMethod("cogroup",
          "RDD",
          function(..., numPartitions) {
            rdds <- list(...)
            rddsLen <- length(rdds)
            for (i in 1:rddsLen) {
              rdds[[i]] <- lapply(rdds[[i]],
                                  function(x) { list(x[[1]], list(i, x[[2]])) })
            }
            union.rdd <- Reduce(unionRDD, rdds)
            group.func <- function(vlist) {
              res <- list()
              length(res) <- rddsLen
              for (x in vlist) {
                i <- x[[1]]
                acc <- res[[i]]
                # Create an accumulator.
                if (is.null(acc)) {
                  acc <- initAccumulator()
                }
                addItemToAccumulator(acc, x[[2]])
                res[[i]] <- acc
              }
              lapply(res, function(acc) {
                if (is.null(acc)) {
                  list()
                } else {
                  acc$data
                }
              })
            }
            mapValues(groupByKey(union.rdd, numPartitions),
                      group.func)
          })
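
# A Spark-free sketch of group.func above: the tagged values for one key are
# routed into per-RDD buckets, so each key ends up with one (possibly empty)
# list of values per input RDD.
local({
  rddsLen <- 2
  vlist <- list(list(1, 4))  # key 2 appeared only in the first of two RDDs
  res <- rep(list(list()), rddsLen)
  for (x in vlist) {
    res[[x[[1]]]] <- c(res[[x[[1]]]], list(x[[2]]))
  }
  res  # list(list(4), list())
})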

#' Sort a (k, v) pair RDD by k.
#'
#' @param x A (k, v) pair RDD to be sorted.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all (k, v) pair elements are sorted.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
#' collectRDD(sortByKey(rdd)) # list(list(1, 3), list(2, 2), list(3, 1))
#'}
# nolint end
#' @rdname sortByKey
#' @aliases sortByKey,RDD,RDD-method
#' @noRd
setMethod("sortByKey",
          signature(x = "RDD"),
          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
            rangeBounds <- list()

            if (numPartitions > 1) {
              rddSize <- countRDD(x)
              # constant from Spark's RangePartitioner
              maxSampleSize <- numPartitions * 20
              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

              samples <- collectRDD(keys(sampleRDD(x, FALSE, fraction, 1L)))

              # Note: the built-in R sort() function only works on atomic vectors
              samples <- sort(unlist(samples, recursive = FALSE), decreasing = !ascending)

              if (length(samples) > 0) {
                rangeBounds <- lapply(seq_len(numPartitions - 1),
                                      function(i) {
                                        j <- ceiling(length(samples) * i / numPartitions)
                                        samples[j]
                                      })
              }
            }

            rangePartitionFunc <- function(key) {
              partition <- 0

              # TODO: Use binary search instead of linear search, similar to Spark
              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
                partition <- partition + 1
              }

              if (ascending) {
                partition
              } else {
                numPartitions - partition - 1
              }
            }

            partitionFunc <- function(part) {
              sortKeyValueList(part, decreasing = !ascending)
            }

            newRDD <- partitionByRDD(x, numPartitions, rangePartitionFunc)
            lapplyPartition(newRDD, partitionFunc)
          })
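
# A local sketch of the range-partitioning idea above: pick cut points from a
# sorted sample of keys, then send each key to the partition whose range
# contains it; sorting each partition then yields a globally sorted RDD.
local({
  samples <- sort(c(3, 2, 1))  # sampled keys
  numPartitions <- 2L
  rangeBounds <- lapply(seq_len(numPartitions - 1),
                        function(i) samples[ceiling(length(samples) * i / numPartitions)])
  vapply(c(1, 2, 3), function(key) {
    partition <- 0
    while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
      partition <- partition + 1
    }
    partition
  }, numeric(1))  # 0 0 1: keys 1 and 2 fall at or below the bound (2), key 3 above it
})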

#' Subtract a pair RDD with another pair RDD.
#'
#' Return an RDD with the pairs from x whose keys are not in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions Number of partitions in the result RDD.
#' @return An RDD with the pairs from x whose keys are not in other.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
#'                              list("b", 5), list("a", 2)))
#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
#' collectRDD(subtractByKey(rdd1, rdd2))
#' # list(list("b", 4), list("b", 5))
#'}
# nolint end
#' @rdname subtractByKey
#' @aliases subtractByKey,RDD
#' @noRd
setMethod("subtractByKey",
          signature(x = "RDD", other = "RDD"),
          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
            filterFunction <- function(elem) {
              iters <- elem[[2]]
              (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
            }

            flatMapValues(filterRDD(cogroup(x,
                                            other,
                                            numPartitions = numPartitions),
                                    filterFunction),
                          function(v) { v[[1]] })
          })
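
# A local check of filterFunction's predicate above: after cogroup, a key
# survives only if it has values from x (iters[[1]]) and none from other
# (iters[[2]]).
local({
  filterFunction <- function(elem) {
    iters <- elem[[2]]
    (length(iters[[1]]) > 0) && (length(iters[[2]]) == 0)
  }
  filterFunction(list("b", list(list(4, 5), list())))  # TRUE: kept
  filterFunction(list("a", list(list(1, 2), list(3)))) # FALSE: dropped
})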

#' Return a subset of this RDD sampled by key.
#'
#' @description
#' \code{sampleByKey} creates a sample of this RDD using variable sampling rates
#' for different keys as specified by fractions, a key to sampling rate map.
#'
#' @param x The RDD to sample elements by key, where each element is
#'             list(K, V) or c(K, V).
#' @param withReplacement Sampling with replacement or not
#' @param fraction The (rough) sample target fraction
#' @param seed Randomness seed value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3000)
#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
#'                                    else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
#' fractions <- list(a = 0.2, b = 0.1)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
#'}
#' @rdname sampleByKey
#' @aliases sampleByKey,RDD-method
#' @noRd
setMethod("sampleByKey",
          signature(x = "RDD", withReplacement = "logical",
                    fractions = "vector", seed = "integer"),
          function(x, withReplacement, fractions, seed) {

            for (elem in fractions) {
              if (elem < 0.0) {
                stop("Negative fraction value ", fractions[which(fractions == elem)])
              }
            }

            # The sampler: takes a partition and returns its sampled version.
            samplingFunc <- function(partIndex, part) {
              set.seed(bitwXor(seed, partIndex))
              res <- vector("list", length(part))
              len <- 0

              # mixing because the initial seeds are close to each other
              stats::runif(10)

              for (elem in part) {
                if (elem[[1]] %in% names(fractions)) {
                  frac <- as.numeric(fractions[which(elem[[1]] == names(fractions))])
                  if (withReplacement) {
                    count <- stats::rpois(1, frac)
                    if (count > 0) {
                      res[(len + 1) : (len + count)] <- rep(list(elem), count)
                      len <- len + count
                    }
                  } else {
                    if (stats::runif(1) < frac) {
                      len <- len + 1
                      res[[len]] <- elem
                    }
                  }
                } else {
                  stop("KeyError: \"", elem[[1]], "\"")
                }
              }

              # TODO(zongheng): look into the performance of the current
              # implementation. Look into some iterator package? Note that
              # Scala avoids many calls to creating an empty list and PySpark
              # similarly achieves this using `yield'. (duplicated from sampleRDD)
              if (len > 0) {
                res[1:len]
              } else {
                list()
              }
            }

            lapplyPartitionsWithIndex(x, samplingFunc)
          })
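
# A standalone sketch of the per-key sampling decision above: without
# replacement each element is kept with probability frac via one uniform
# draw; with replacement it would be emitted rpois(1, frac) times instead.
local({
  set.seed(1618L)
  fractions <- list(a = 0.2, b = 0.1)
  part <- list(list("a", 1), list("b", 2), list("a", 3))
  kept <- Filter(function(elem) {
    frac <- fractions[[elem[[1]]]]
    stats::runif(1) < frac
  }, part)
  length(kept)  # roughly 0.2 * (# "a" pairs) + 0.1 * (# "b" pairs) in expectation
})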