0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
# Operations supported on RDDs containing pairs (i.e. key, value).
# The roxygen block below exists only to carry the @include directive, which
# makes roxygen collate this file after generics.R, jobj.R and RDD.R; it is
# attached to a bare NULL because there is no object to document here.
#' @include generics.R jobj.R RDD.R
NULL
0021
0022 ############ Actions and Transformations ############
0023
#' Look up elements of a key in an RDD
#'
#' @description
#' \code{lookup} returns a list of every value in this RDD whose key is
#' identical to \code{key}.
#'
#' @param x The RDD to search. Each element should be list(K, V) or c(K, V).
#' @param key The key to look up.
#' @return a list of the values in this RDD associated with \code{key}
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(c(1, 1), c(2, 2), c(1, 3))
#' rdd <- parallelize(sc, pairs)
#' lookup(rdd, 1) # list(1, 3)
#'}
# nolint end
#' @rdname lookup
#' @aliases lookup,RDD-method
#' @noRd
setMethod("lookup",
          signature(x = "RDD", key = "ANY"),
          function(x, key) {
            # Per partition: keep the pairs whose key matches exactly, then
            # project each kept pair onto its value.
            matchingValues <- function(part) {
              hits <- Filter(function(kv) { identical(key, kv[[1]]) }, part)
              lapply(hits, function(kv) { kv[[2]] })
            }
            collectRDD(lapplyPartition(x, matchingValues))
          })
0054
#' Count the number of elements for each key, and return the result to the
#' driver as a list of (key, count) pairs.
#'
#' Same as countByKey in Spark.
#'
#' @param x The RDD whose keys are counted. Each element should be
#'          list(K, V) or c(K, V).
#' @return list of (key, count) pairs, where count is the number of elements
#'         in the RDD having that key.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(c("a", 1), c("b", 1), c("a", 1)))
#' countByKey(rdd) # ("a", 2L), ("b", 1L)
#'}
# nolint end
#' @rdname countByKey
#' @aliases countByKey,RDD-method
#' @noRd
setMethod("countByKey",
          signature(x = "RDD"),
          function(x) {
            # Reduce each pair to its key; counting keys is then a plain
            # count-by-value.
            firstOf <- function(kv) { kv[[1]] }
            countByValue(lapply(x, firstOf))
          })
0079
#' Return an RDD with the keys of each tuple.
#'
#' @param x The RDD from which the key of each tuple is extracted.
#' @return An RDD whose elements are the first component of each element of x.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
#' collectRDD(keys(rdd)) # list(1, 3)
#'}
# nolint end
#' @rdname keys
#' @aliases keys,RDD
#' @noRd
setMethod("keys",
          signature(x = "RDD"),
          function(x) {
            lapply(x, function(kv) { kv[[1]] })
          })
0102
#' Return an RDD with the values of each tuple.
#'
#' @param x The RDD from which the value of each tuple is extracted.
#' @return An RDD whose elements are the second component of each element of x.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
#' collectRDD(values(rdd)) # list(2, 4)
#'}
# nolint end
#' @rdname values
#' @aliases values,RDD
#' @noRd
setMethod("values",
          signature(x = "RDD"),
          function(x) {
            lapply(x, function(kv) { kv[[2]] })
          })
0125
#' Applies a function to all values of the elements, without modifying the keys.
#'
#' The same as \code{mapValues()} in Spark.
#'
#' @param X The RDD to apply the transformation. Each element should be
#'          list(K, V) or c(K, V).
#' @param FUN the transformation to apply on the value of each element.
#' @return a new RDD whose elements are list(K, FUN(V)).
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10)
#' makePairs <- lapply(rdd, function(x) { list(x, x) })
#' collectRDD(mapValues(makePairs, function(x) { x * 2 }))
#' # Output: list(list(1,2), list(2,4), list(3,6), ...)
#'}
#' @rdname mapValues
#' @aliases mapValues,RDD,function-method
#' @noRd
setMethod("mapValues",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Rebuild each pair with the key untouched and the value transformed.
            func <- function(x) {
              list(x[[1]], FUN(x[[2]]))
            }
            lapply(X, func)
          })
0152
#' Pass each value in the key-value pair RDD through a flatMap function without
#' changing the keys; this also retains the original RDD's partitioning.
#'
#' The same as \code{flatMapValues()} in Spark.
#'
#' @param X The RDD to apply the transformation. Each element should be
#'          list(K, V) or c(K, V).
#' @param FUN the transformation to apply on the value of each element; it
#'            should return a list or vector, each item of which becomes a
#'            separate (K, item) pair.
#' @return a new RDD created by the transformation.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, c(1,2)), list(2, c(3,4))))
#' collectRDD(flatMapValues(rdd, function(x) { x }))
#' # Output: list(list(1,1), list(1,2), list(2,3), list(2,4))
#'}
#' @rdname flatMapValues
#' @aliases flatMapValues,RDD,function-method
#' @noRd
setMethod("flatMapValues",
          signature(X = "RDD", FUN = "function"),
          function(X, FUN) {
            # Expand one pair into one (K, item) pair per item produced by FUN.
            expandPair <- function(kv) {
              k <- kv[[1]]
              lapply(FUN(kv[[2]]), function(item) { list(k, item) })
            }
            flatMap(X, expandPair)
          })
0179
0180 ############ Shuffle Functions ############
0181
#' Partition an RDD by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V).
#' For each element of this RDD, the partitioner is used to compute a hash
#' function and the RDD is partitioned using this hash value.
#'
#' @param x The RDD to partition. Should be an RDD where each element is
#'          list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @param ... Other optional arguments to partitionBy.
#'
#' @param partitionFunc The partition function to use. Uses a default hashCode
#'                      function if not provided.
#' @return An RDD partitioned using the specified partitioner.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- partitionByRDD(rdd, 2L)
#' collectPartition(parts, 0L) # First partition should contain list(1, 2) and list(1, 4)
#'}
#' @rdname partitionBy
#' @aliases partitionBy,RDD,integer-method
#' @noRd
setMethod("partitionByRDD",
          signature(x = "RDD"),
          function(x, numPartitions, partitionFunc = hashCode) {
            stopifnot(is.numeric(numPartitions))

            # Everything the R-side partitioner needs on the JVM: the cleaned
            # partition function, the attached packages and all broadcasts.
            hashFuncBytes <- serialize(cleanClosure(partitionFunc), connection = NULL)
            packagesBytes <- serialize(.sparkREnv$.packages, connection = NULL)
            broadcastVars <- lapply(ls(.broadcastNames),
                                    function(bcName) { get(bcName, .broadcastNames) })
            sourceJRDD <- getJRDD(x)

            # A PairwiseRRDD extends RDD[(Int, Array[Byte])]: the key is the
            # target partition number and the value is the serialized
            # key-value pair itself.
            pairwise <- newJObject("org.apache.spark.api.r.PairwiseRRDD",
                                   callJMethod(sourceJRDD, "rdd"),
                                   numToInt(numPartitions),
                                   hashFuncBytes,
                                   getSerializedMode(x),
                                   packagesBytes,
                                   broadcastVars,
                                   callJMethod(sourceJRDD, "classTag"))

            # Route records on the JVM side by that precomputed integer key.
            partitioner <- newJObject("org.apache.spark.HashPartitioner",
                                      numToInt(numPartitions))

            shuffledPairs <- callJMethod(callJMethod(pairwise, "asJavaPairRDD"),
                                         "partitionBy", partitioner)

            # Drop the partition numbers and keep only the shuffled actual
            # content (the key-value pairs).
            RDD(callJMethod(shuffledPairs, "values"), serializedMode = "byte")
          })
0247
#' Group values by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
#' and groups the values for each key in the RDD into a single sequence.
#'
#' @param x The RDD to group. Should be an RDD where each element is
#'          list(K, V) or c(K, V).
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, list(V))
#' @seealso reduceByKey
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- groupByKey(rdd, 2L)
#' grouped <- collectRDD(parts)
#' grouped[[1]] # Should be a list(1, list(2, 4))
#'}
#' @rdname groupByKey
#' @aliases groupByKey,RDD,integer-method
#' @noRd
setMethod("groupByKey",
          signature(x = "RDD", numPartitions = "numeric"),
          function(x, numPartitions) {
            shuffled <- partitionByRDD(x, numPartitions)
            collectGroups <- function(part) {
              # Both environments are keyed by as.character(hashCode(K)):
              # keyEnv remembers the original key, accEnv its value accumulator.
              accEnv <- new.env()
              keyEnv <- new.env()
              seenBefore <- function(kv) exists(kv$hash, keyEnv)
              appendValue <- function(acc, v) {
                addItemToAccumulator(acc, v)
                acc
              }
              startAccumulator <- function(v) {
                appendValue(initAccumulator(), v)
              }
              lapply(part, function(kv) {
                kv$hash <- as.character(hashCode(kv[[1]]))
                updateOrCreatePair(kv, keyEnv, accEnv, seenBefore,
                                   appendValue, startAccumulator)
              })
              # An accumulator's data buffer may be longer than its contents;
              # keep only the first `counter` entries of each.
              grouped <- eapply(accEnv, function(acc) {
                length(acc$data) <- acc$counter
                acc$data
              })
              # One list(K, Seq[V]) per distinct key.
              convertEnvsToList(keyEnv, grouped)
            }
            lapplyPartition(shuffled, collectGroups)
          })
0306
#' Merge values by key
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
#' and merges the values for each key using an associative and commutative reduce function.
#'
#' @param x The RDD to reduce by key. Should be an RDD where each element is
#'          list(K, V) or c(K, V).
#' @param combineFunc The associative and commutative reduce function to use.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, V') where V' is the merged
#'         value
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- reduceByKey(rdd, "+", 2L)
#' reduced <- collectRDD(parts)
#' reduced[[1]] # Should be a list(1, 6)
#'}
#' @rdname reduceByKey
#' @aliases reduceByKey,RDD,integer-method
#' @noRd
setMethod("reduceByKey",
          signature(x = "RDD", combineFunc = "ANY", numPartitions = "numeric"),
          function(x, combineFunc, numPartitions) {
            # Collapses a partition to one list(K, V) per distinct key by
            # folding that key's values with combineFunc.
            reducePartition <- function(part) {
              valEnv <- new.env()
              keyEnv <- new.env()
              seenBefore <- function(kv) exists(kv$hash, keyEnv)
              lapply(part, function(kv) {
                kv$hash <- as.character(hashCode(kv[[1]]))
                updateOrCreatePair(kv, keyEnv, valEnv, seenBefore, combineFunc, identity)
              })
              convertEnvsToList(keyEnv, valEnv)
            }
            # Reduce within each partition first, shuffle so every key lands
            # in one partition, then reduce again to finish.
            partial <- lapplyPartition(x, reducePartition)
            shuffled <- partitionByRDD(partial, numToInt(numPartitions))
            lapplyPartition(shuffled, reducePartition)
          })
0348
#' Merge values by key locally
#'
#' This function operates on RDDs where every element is of the form list(K, V) or c(K, V),
#' and merges the values for each key using an associative and commutative reduce function,
#' but returns the results immediately to the driver as an R list.
#'
#' @param x The RDD to reduce by key. Should be an RDD where each element is
#'          list(K, V) or c(K, V).
#' @param combineFunc The associative and commutative reduce function to use.
#' @return A list of elements of type list(K, V') where V' is the merged value for each key
#' @seealso reduceByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' reduced <- reduceByKeyLocally(rdd, "+")
#' reduced # list(list(1, 6), list(1.1, 3))
#'}
# nolint end
#' @rdname reduceByKeyLocally
#' @aliases reduceByKeyLocally,RDD,integer-method
#' @noRd
setMethod("reduceByKeyLocally",
          signature(x = "RDD", combineFunc = "ANY"),
          function(x, combineFunc) {
            # Per partition: reduce into a (keys env, values env) pair, both
            # keyed by the hash string, so the merge step need not rehash.
            reducePartition <- function(part) {
              valEnv <- new.env()
              keyEnv <- new.env()
              seenBefore <- function(kv) exists(kv$hash, keyEnv)
              lapply(part, function(kv) {
                kv$hash <- as.character(hashCode(kv[[1]]))
                updateOrCreatePair(kv, keyEnv, valEnv, seenBefore, combineFunc, identity)
              })
              list(list(keyEnv, valEnv))
            }
            # Folds the environments of `other` into those of `accum`
            # (mutating accum's environments) and returns accum.
            mergeEnvs <- function(accum, other) {
              seenBefore <- function(kv) {
                exists(kv$hash, accum[[1]])
              }
              lapply(ls(other[[1]]), function(hashName) {
                kv <- list(other[[1]][[hashName]], other[[2]][[hashName]])
                kv$hash <- hashName
                updateOrCreatePair(kv, accum[[1]], accum[[2]], seenBefore,
                                   combineFunc, identity)
              })
              accum
            }
            perPartition <- mapPartitions(x, reducePartition)
            merged <- reduce(perPartition, mergeEnvs)
            convertEnvsToList(merged[[1]], merged[[2]])
          })
0403
#' Combine values by key
#'
#' Generic function to combine the elements for each key using a custom set of
#' aggregation functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)],
#' for a "combined type" C. Note that V and C can be different -- for example, one
#' might group an RDD of type (Int, Int) into an RDD of type (Int, Seq[Int]).
#' Users provide three functions:
#' \itemize{
#'   \item createCombiner, which turns a V into a C (e.g., creates a one-element list)
#'   \item mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
#'   \item mergeCombiners, to combine two C's into a single one (e.g., concatenates
#'    two lists).
#' }
#'
#' @param x The RDD to combine. Should be an RDD where each element is
#'          list(K, V) or c(K, V).
#' @param createCombiner Create a combiner (C) given a value (V)
#' @param mergeValue Merge the given value (V) with an existing combiner (C)
#' @param mergeCombiners Merge two combiners and return a new combiner
#' @param numPartitions Number of partitions to create.
#' @return An RDD where each element is list(K, C) where C is the combined type
#' @seealso groupByKey, reduceByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' pairs <- list(list(1, 2), list(1.1, 3), list(1, 4))
#' rdd <- parallelize(sc, pairs)
#' parts <- combineByKey(rdd, function(x) { x }, "+", "+", 2L)
#' combined <- collectRDD(parts)
#' combined[[1]] # Should be a list(1, 6)
#'}
# nolint end
#' @rdname combineByKey
#' @aliases combineByKey,RDD,ANY,ANY,ANY,integer-method
#' @noRd
setMethod("combineByKey",
          signature(x = "RDD", createCombiner = "ANY", mergeValue = "ANY",
                    mergeCombiners = "ANY", numPartitions = "numeric"),
          function(x, createCombiner, mergeValue, mergeCombiners, numPartitions) {
            # Before the shuffle: build one combiner per key from raw values.
            buildCombiners <- function(part) {
              combinerEnv <- new.env()
              keyEnv <- new.env()
              seenBefore <- function(kv) exists(kv$hash, keyEnv)
              lapply(part, function(kv) {
                kv$hash <- as.character(hashCode(kv[[1]]))
                updateOrCreatePair(kv, keyEnv, combinerEnv, seenBefore,
                                   mergeValue, createCombiner)
              })
              convertEnvsToList(keyEnv, combinerEnv)
            }
            # After the shuffle: merge the combiners that share a key.
            mergeShuffled <- function(part) {
              combinerEnv <- new.env()
              keyEnv <- new.env()
              seenBefore <- function(kv) exists(kv$hash, keyEnv)
              lapply(part, function(kv) {
                kv$hash <- as.character(hashCode(kv[[1]]))
                updateOrCreatePair(kv, keyEnv, combinerEnv, seenBefore,
                                   mergeCombiners, identity)
              })
              convertEnvsToList(keyEnv, combinerEnv)
            }
            shuffled <- partitionByRDD(lapplyPartition(x, buildCombiners),
                                       numToInt(numPartitions))
            lapplyPartition(shuffled, mergeShuffled)
          })
0470
#' Aggregate a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using given combine functions
#' and a neutral "zero value". This function can return a different result type,
#' U, than the type of the values in this RDD, V. Thus, we need one operation
#' for merging a V into a U and one operation for merging two U's. The former
#' operation is used for merging values within a partition, and the latter is
#' used for merging values between partitions. To avoid memory allocation, both
#' of these functions are allowed to modify and return their first argument
#' instead of creating a new U.
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param seqOp A function to aggregate the values of each key. It may return
#'              a different result type from the type of the values.
#' @param combOp A function to aggregate results of seqOp.
#' @param numPartitions Number of partitions to create.
#' @return An RDD containing the aggregation result.
#' @seealso foldByKey, combineByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
#' zeroValue <- list(0, 0)
#' seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) }
#' combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) }
#' aggregateByKey(rdd, zeroValue, seqOp, combOp, 2L)
#' # list(list(1, list(3, 2)), list(2, list(7, 2)))
#'}
# nolint end
#' @rdname aggregateByKey
#' @aliases aggregateByKey,RDD,ANY,ANY,ANY,integer-method
#' @noRd
setMethod("aggregateByKey",
          signature(x = "RDD", zeroValue = "ANY", seqOp = "ANY",
                    combOp = "ANY", numPartitions = "numeric"),
          function(x, zeroValue, seqOp, combOp, numPartitions) {
            # A key's first value is folded into the zero value; later values
            # use seqOp and cross-partition merging uses combOp.
            seedCombiner <- function(v) do.call(seqOp, list(zeroValue, v))
            combineByKey(x, seedCombiner, seqOp, combOp, numPartitions)
          })
0514
#' Fold a pair RDD by each key.
#'
#' Aggregate the values of each key in an RDD, using an associative function "func"
#' and a neutral "zero value" which may be added to the result an arbitrary
#' number of times, and must not change the result (e.g., 0 for addition, or
#' 1 for multiplication.).
#'
#' @param x An RDD.
#' @param zeroValue A neutral "zero value".
#' @param func An associative function for folding values of each key.
#' @param numPartitions Number of partitions to create.
#' @return An RDD containing the aggregation result.
#' @seealso aggregateByKey, combineByKey
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4)))
#' foldByKey(rdd, 0, "+", 2L) # list(list(1, 3), list(2, 7))
#'}
# nolint end
#' @rdname foldByKey
#' @aliases foldByKey,RDD,ANY,ANY,integer-method
#' @noRd
setMethod("foldByKey",
          signature(x = "RDD", zeroValue = "ANY",
                    func = "ANY", numPartitions = "numeric"),
          function(x, zeroValue, func, numPartitions) {
            # A fold is an aggregation whose within-partition and
            # cross-partition operations are the same function.
            mergeOp <- func
            aggregateByKey(x, zeroValue, mergeOp, mergeOp, numPartitions)
          })
0544
0545 ############ Binary Functions #############
0546
#' Join two RDDs
#'
#' @description
#' \code{joinRDD} This function joins two RDDs where every element is of the form list(K, V).
#' The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return a new RDD containing all pairs of elements with matching keys in
#'         two input RDDs.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' joinRDD(rdd1, rdd2, 2L) # list(list(1, list(1, 2)), list(1, list(1, 3)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases join,RDD,RDD-method
#' @noRd
setMethod("joinRDD",
          signature(x = "RDD", y = "RDD"),
          function(x, y, numPartitions) {
            # Tag each value with its side (1L = x, 2L = y) so the values can
            # be told apart after grouping by key.
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            # Inner join: list(FALSE, FALSE) presumably disables NULL padding
            # on both sides (compare the outer-join variants' flags).
            doJoin <- function(v) {
              joinTaggedList(v, list(FALSE, FALSE))
            }

            # Return the joined RDD directly so it is visible to the caller.
            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions),
                          doJoin)
          })
0585
#' Left outer join two RDDs
#'
#' @description
#' \code{leftOuterJoin} This function left-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x, the resulting RDD will either contain
#'         all pairs (k, (v, w)) for (k, w) in y, or the pair (k, (v, NULL))
#'         if no elements in y have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' leftOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases leftOuterJoin,RDD,RDD-method
#' @noRd
setMethod("leftOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            # Tag each value with its side (1L = x, 2L = y) so the values can
            # be told apart after grouping by key.
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            # list(FALSE, TRUE): the y side may be NULL-padded when a key of x
            # has no match in y.
            doJoin <- function(v) {
              joinTaggedList(v, list(FALSE, TRUE))
            }

            # Return the joined RDD directly so it is visible to the caller.
            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })
0625
#' Right outer join two RDDs
#'
#' @description
#' \code{rightOuterJoin} This function right-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, w) in y, the resulting RDD will either contain
#'         all pairs (k, (v, w)) for (k, v) in x, or the pair (k, (NULL, w))
#'         if no elements in x have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rightOuterJoin(rdd1, rdd2, 2L)
#' # list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases rightOuterJoin,RDD,RDD-method
#' @noRd
setMethod("rightOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            # Tag each value with its side (1L = x, 2L = y) so the values can
            # be told apart after grouping by key.
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            # list(TRUE, FALSE): the x side may be NULL-padded when a key of y
            # has no match in x.
            doJoin <- function(v) {
              joinTaggedList(v, list(TRUE, FALSE))
            }

            # Return the joined RDD directly so it is visible to the caller.
            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })
0665
#' Full outer join two RDDs
#'
#' @description
#' \code{fullOuterJoin} This function full-outer-joins two RDDs where every element is of
#' the form list(K, V). The key types of the two RDDs should be the same.
#'
#' @param x An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param y An RDD to be joined. Should be an RDD where each element is
#'          list(K, V).
#' @param numPartitions Number of partitions to create.
#' @return For each element (k, v) in x and (k, w) in y, the resulting RDD
#'         will contain all pairs (k, (v, w)) for both (k, v) in x and
#'         (k, w) in y, or the pair (k, (NULL, w))/(k, (v, NULL)) if no elements
#'         in x/y have key k.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
#' rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' fullOuterJoin(rdd1, rdd2, 2L) # list(list(1, list(2, 1)),
#'                               #      list(1, list(3, 1)),
#'                               #      list(2, list(NULL, 4)),
#'                               #      list(3, list(3, NULL)))
#'}
# nolint end
#' @rdname join-methods
#' @aliases fullOuterJoin,RDD,RDD-method
#' @noRd
setMethod("fullOuterJoin",
          signature(x = "RDD", y = "RDD", numPartitions = "numeric"),
          function(x, y, numPartitions) {
            # Tag each value with its side (1L = x, 2L = y) so the values can
            # be told apart after grouping by key.
            xTagged <- lapply(x, function(i) { list(i[[1]], list(1L, i[[2]])) })
            yTagged <- lapply(y, function(i) { list(i[[1]], list(2L, i[[2]])) })

            # list(TRUE, TRUE): either side may be NULL-padded when a key
            # appears in only one of the RDDs.
            doJoin <- function(v) {
              joinTaggedList(v, list(TRUE, TRUE))
            }

            # Return the joined RDD directly so it is visible to the caller.
            flatMapValues(groupByKey(unionRDD(xTagged, yTagged), numPartitions), doJoin)
          })
0708
#' For each key k in several RDDs, return a resulting RDD
#' whose values are a list of values for the key in all RDDs.
#'
#' @param ... Several RDDs.
#' @param numPartitions Number of partitions to create.
#' @return a new RDD where each element is list(K, list(values in RDD 1,
#'         values in RDD 2, ...)); an RDD without key K contributes list().
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
#' rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
#' cogroup(rdd1, rdd2, numPartitions = 2L)
#' # list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))
#'}
# nolint end
#' @rdname cogroup
#' @aliases cogroup,RDD-method
#' @noRd
setMethod("cogroup",
          "RDD",
          function(..., numPartitions) {
            rdds <- list(...)
            rddsLen <- length(rdds)
            # Tag every value with the index of the RDD it came from.
            for (i in seq_len(rddsLen)) {
              rdds[[i]] <- lapply(rdds[[i]],
                                  function(x) { list(x[[1]], list(i, x[[2]])) })
            }
            union.rdd <- Reduce(unionRDD, rdds)
            # Split one key's tagged values back into one list per source RDD.
            group.func <- function(vlist) {
              res <- list()
              length(res) <- rddsLen
              for (x in vlist) {
                i <- x[[1]]
                acc <- res[[i]]
                # Create an accumulator.
                if (is.null(acc)) {
                  acc <- initAccumulator()
                }
                addItemToAccumulator(acc, x[[2]])
                res[[i]] <- acc
              }
              lapply(res, function(acc) {
                if (is.null(acc)) {
                  list()
                } else {
                  # The accumulator's data buffer can be longer than the number
                  # of items added (tracked in `counter`); keep only the filled
                  # slots so no padding NULLs leak into the result.
                  acc$data[seq_len(acc$counter)]
                }
              })
            }
            mapValues(groupByKey(union.rdd, numPartitions), group.func)
          })
0763
#' Sort a (k, v) pair RDD by k.
#'
#' @param x A (k, v) pair RDD to be sorted.
#' @param ascending A flag to indicate whether the sorting is ascending or descending.
#' @param numPartitions Number of partitions to create.
#' @return An RDD where all (k, v) pair elements are sorted.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, list(list(3, 1), list(2, 2), list(1, 3)))
#' collectRDD(sortByKey(rdd)) # list (list(1, 3), list(2, 2), list(3, 1))
#'}
# nolint end
#' @rdname sortByKey
#' @aliases sortByKey,RDD,RDD-method
#' @noRd
setMethod("sortByKey",
          signature(x = "RDD"),
          function(x, ascending = TRUE, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
            rangeBounds <- list()

            if (numPartitions > 1) {
              rddSize <- countRDD(x)
              # constant from Spark's RangePartitioner
              maxSampleSize <- numPartitions * 20
              fraction <- min(maxSampleSize / max(rddSize, 1), 1.0)

              samples <- collectRDD(keys(sampleRDD(x, FALSE, fraction, 1L)))

              # Note: the built-in R sort() function only works on atomic vectors.
              # The bounds are kept in ascending order regardless of `ascending`:
              # the linear scan in rangePartitionFunc relies on that, and the
              # descending case is handled by mirroring the partition index.
              # Sorting the samples decreasingly would make the scan stop at the
              # first bound and leave every middle partition empty.
              samples <- sort(unlist(samples, recursive = FALSE))

              if (length(samples) > 0) {
                rangeBounds <- lapply(seq_len(numPartitions - 1),
                                      function(i) {
                                        j <- ceiling(length(samples) * i / numPartitions)
                                        samples[j]
                                      })
              }
            }

            # Maps a key to the index of the first ascending bound it does not
            # exceed, mirrored for descending order.
            rangePartitionFunc <- function(key) {
              partition <- 0

              # TODO: Use binary search instead of linear search, similar with Spark
              while (partition < length(rangeBounds) && key > rangeBounds[[partition + 1]]) {
                partition <- partition + 1
              }

              if (ascending) {
                partition
              } else {
                numPartitions - partition - 1
              }
            }

            # Sort the keys inside each range partition.
            partitionFunc <- function(part) {
              sortKeyValueList(part, decreasing = !ascending)
            }

            newRDD <- partitionByRDD(x, numPartitions, rangePartitionFunc)
            lapplyPartition(newRDD, partitionFunc)
          })
0828
#' Subtract a pair RDD with another pair RDD.
#'
#' Return an RDD with the pairs from x whose keys are not in other.
#'
#' @param x An RDD.
#' @param other An RDD.
#' @param numPartitions Number of the partitions in the result RDD.
#' @return An RDD with the pairs from x whose keys are not in other.
#' @examples
# nolint start
#'\dontrun{
#' sc <- sparkR.init()
#' rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4),
#'                              list("b", 5), list("a", 2)))
#' rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
#' collectRDD(subtractByKey(rdd1, rdd2))
#' # list(list("b", 4), list("b", 5))
#'}
# nolint end
#' @rdname subtractByKey
#' @aliases subtractByKey,RDD
#' @noRd
setMethod("subtractByKey",
          signature(x = "RDD", other = "RDD"),
          function(x, other, numPartitions = SparkR:::getNumPartitionsRDD(x)) {
            # Cogrouped elements look like list(K, list(valuesInX, valuesInOther)).
            onlyInX <- function(elem) {
              groups <- elem[[2]]
              length(groups[[1]]) > 0 && length(groups[[2]]) == 0
            }
            grouped <- cogroup(x, other, numPartitions = numPartitions)
            kept <- filterRDD(grouped, onlyInX)
            # Re-expand the surviving keys back into one pair per x value.
            flatMapValues(kept, function(groups) { groups[[1]] })
          })
0865
#' Return a subset of this RDD sampled by key.
#'
#' @description
#' \code{sampleByKey} Create a sample of this RDD using variable sampling rates
#' for different keys as specified by fractions, a key to sampling rate map.
#'
#' @param x The RDD to sample elements by key, where each element is
#'          list(K, V) or c(K, V).
#' @param withReplacement Sampling with replacement or not
#' @param fractions A named list (or named vector) mapping each key to its
#'                  (rough) sampling rate. Without replacement this is the
#'                  probability of keeping an element; with replacement it is
#'                  the Poisson mean of how many copies are emitted. Every key
#'                  present in x must have an entry; extra entries are ignored.
#' @param seed Randomness seed value
#' @return An RDD containing the sampled elements.
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:3000)
#' pairs <- lapply(rdd, function(x) { if (x %% 3 == 0) list("a", x)
#'                                    else { if (x %% 3 == 1) list("b", x) else list("c", x) }})
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L)
#' 100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")) # TRUE
#' 50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")) # TRUE
#' 200 < length(lookup(sample, "c")) && 400 > length(lookup(sample, "c")) # TRUE
#' lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0 # TRUE
#' lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000 # TRUE
#' lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0 # TRUE
#' lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000 # TRUE
#' lookup(sample, "c")[which.min(lookup(sample, "c"))] >= 0 # TRUE
#' lookup(sample, "c")[which.max(lookup(sample, "c"))] <= 2000 # TRUE
#' fractions <- list(a = 0.2, b = 0.1, c = 0.3, d = 0.4)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # Key "d" will be ignored
#' fractions <- list(a = 0.2, b = 0.1)
#' sample <- sampleByKey(pairs, FALSE, fractions, 1618L) # KeyError: "c"
#'}
#' @rdname sampleByKey
#' @aliases sampleByKey,RDD-method
#' @noRd
setMethod("sampleByKey",
          signature(x = "RDD", withReplacement = "logical",
                    fractions = "vector", seed = "integer"),
          function(x, withReplacement, fractions, seed) {

            for (elem in fractions) {
              if (elem < 0.0) {
                stop("Negative fraction value ", fractions[which(fractions == elem)])
              }
            }

            # The sampler: takes a partition and returns its sampled version.
            samplingFunc <- function(partIndex, part) {
              # Each partition gets its own deterministic seed.
              set.seed(bitwXor(seed, partIndex))
              res <- vector("list", length(part))
              len <- 0

              # mixing because the initial seeds are close to each other
              stats::runif(10)

              for (elem in part) {
                if (elem[[1]] %in% names(fractions)) {
                  frac <- as.numeric(fractions[which(elem[[1]] == names(fractions))])
                  if (withReplacement) {
                    count <- stats::rpois(1, frac)
                    if (count > 0) {
                      res[(len + 1) : (len + count)] <- rep(list(elem), count)
                      len <- len + count
                    }
                  } else {
                    if (stats::runif(1) < frac) {
                      len <- len + 1
                      res[[len]] <- elem
                    }
                  }
                } else {
                  stop("KeyError: \"", elem[[1]], "\"")
                }
              }

              # TODO(zongheng): look into the performance of the current
              # implementation. Look into some iterator package? Note that
              # Scala avoids many calls to creating an empty list and PySpark
              # similarly achieves this using `yield'. (duplicated from sampleRDD)
              # seq_len(0) selects nothing, so an empty sample yields list().
              res[seq_len(len)]
            }

            lapplyPartitionsWithIndex(x, samplingFunc)
          })