0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # Utilities and Helpers
0019
# Given a JList<T>, returns an R list containing the same elements, the number
# of which is optionally upper bounded by `logicalUpperBound` (by default,
# return all elements). Takes care of deserializations and type conversions.
#
# param
#   jList             Java object reference to a list; read via callJMethod(jList, "size")
#                     and callJMethod(jList, "get", index).
#   flatten           If TRUE, the per-element results are concatenated one level
#                     (unlist(recursive = FALSE)); otherwise they are kept as-is.
#                     Forced to FALSE when any element is decoded in "row" mode.
#   logicalUpperBound Optional cap on the number of elements; NULL means no cap.
#   serializedMode    "byte", "string" or "row" (the modes this function distinguishes).
# return value
#   An R list.
convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
                                serializedMode = "byte") {
  arrSize <- callJMethod(jList, "size")

  # Datasets with serializedMode == "string" (such as an RDD directly generated by textFile()):
  # each partition is not dense-packed into one Array[Byte], and `arrSize`
  # here corresponds to number of logical elements. Thus we can prune here.
  if (serializedMode == "string" && !is.null(logicalUpperBound)) {
    arrSize <- min(arrSize, logicalUpperBound)
  }

  results <- if (arrSize > 0) {
    lapply(0 : (arrSize - 1),
           function(index) {
             obj <- callJMethod(jList, "get", as.integer(index))

             # Assume it is either an R object or a Java obj ref.
             if (inherits(obj, "jobj")) {
               if (isInstanceOf(obj, "scala.Tuple2")) {
                 # JavaPairRDD[Array[Byte], Array[Byte]].
                 # Each side of the tuple is unserialized separately.
                 keyBytes <- callJMethod(obj, "_1")
                 valBytes <- callJMethod(obj, "_2")
                 res <- list(unserialize(keyBytes),
                             unserialize(valBytes))
               } else {
                 stop("utils.R: convertJListToRList only supports ",
                      "RDD[Array[Byte]] and ",
                      "JavaPairRDD[Array[Byte], Array[Byte]] for now")
               }
             } else {
               if (inherits(obj, "raw")) {
                 if (serializedMode == "byte") {
                   # RDD[Array[Byte]]. `obj` is a whole partition.
                   # For serialized datasets, `obj` here corresponds to
                   # one whole partition dense-packed together. We deserialize the
                   # whole partition first, then cap the number of elements to be returned.
                   res <- unserialize(obj)
                 } else if (serializedMode == "row") {
                   # For DataFrames that have been converted to RRDDs, we call readRowList
                   # which will read in each row of the RRDD as a list and deserialize
                   # each element.
                   res <- readRowList(obj)
                   # Use `<<-` to change the flatten flag. This means
                   # we don't have to worry about the default argument in other functions
                   # e.g. collect. `<<-` finds `flatten` in the enclosing call's frame
                   # (it is this function's argument), and that value is read after lapply().
                   flatten <<- FALSE
                 }
                 # NOTE(review): if serializedMode is neither "byte" nor "row", `res` is
                 # not assigned on this path.
                 # TODO: is it possible to distinguish element boundary so that we can
                 # unserialize only what we need?
                 # The cap applies to this element's decoded list only, so with several
                 # raw elements the combined result can exceed logicalUpperBound.
                 if (!is.null(logicalUpperBound)) {
                   res <- head(res, n = logicalUpperBound)
                 }
               } else {
                 # obj is of a primitive Java type, is simplified to R's
                 # corresponding type.
                 res <- list(obj)
               }
             }
             res
           })
  } else {
    list()
  }

  if (flatten) {
    as.list(unlist(results, recursive = FALSE))
  } else {
    as.list(results)
  }
}
0094
# Looks up `name` in `env` (searching enclosing environments too, as get() does)
# and reports whether the object found carries the "RDD" class.
isRDD <- function(name, env) {
  inherits(get(name, envir = env), "RDD")
}
0100
#' Compute the hashCode of an object
#'
#' Java-style function to compute the hashCode for the given object. Returns
#' an integer value.
#'
#' @details
#' This only works for integer, numeric and character types right now.
#' An integer hashes to its first element. A double XORs the two 32-bit halves
#' of its 8-byte representation. A string folds its bytes with a multiply-by-31
#' step. Any other type emits a warning and hashes to 0.
#'
#' @param key the object to be hashed
#' @return the hash code as an integer
#' @examples
#'\dontrun{
#' hashCode(1L) # 1
#' hashCode(1.0) # 1072693248
#' hashCode("1") # 49
#'}
#' @note hashCode since 1.4.0
hashCode <- function(key) {
  keyClass <- class(key)

  if (keyClass == "integer") {
    return(as.integer(key[[1]]))
  }

  if (keyClass == "numeric") {
    # Reinterpret the double's 8 bytes as two 32-bit integers and XOR them.
    # NOTE(review): writeBin() uses the platform byte order, so which half is
    # which assumes a little-endian machine.
    halves <- packBits(rawToBits(writeBin(key[[1]], con = raw())), "integer")
    return(as.integer(bitwXor(halves[2], halves[1])))
  }

  if (keyClass == "character") {
    # TODO: SPARK-7839 means we might not have the native library available
    if (nchar(key) == 0) {
      return(0L)
    }
    byteCodes <- sapply(charToRaw(key), function(byte) { strtoi(byte, 16L) })
    running <- 0
    for (code in byteCodes) {
      running <- mult31AndAdd(running, code)
    }
    return(as.integer(running))
  }

  warning("Could not hash object, returning 0")
  as.integer(0)
}
0144
# Helper function used to wrap a 'numeric' value to integer bounds.
# Useful for implementing C-like integer arithmetic: a value just outside
# [-integer.max, integer.max] is shifted by 2^32 (once) back toward that range.
wrapInt <- function(value) {
  maxInt <- .Machine$integer.max
  span <- 2 * maxInt + 2  # 2^32
  if (value > maxInt) {
    return(value - span)
  }
  if (value < -maxInt) {
    return(value + span)
  }
  value
}
0155
# Multiply `val` by 31 and add `addVal` to the result, wrapping back into the
# 32-bit integer range after every partial addition.
#
# 31 * val is formed as (val << 4) + (val << 3) + (val << 2) + (val << 1) + val
# (16 + 8 + 4 + 2 + 1 = 31). Each addition in the Reduce() below is passed
# through wrapInt(), which shifts a result that left the integer range by 2^32.
#
# TODO: this function does not handle integer overflow well
# NOTE(review): any shifted term that bitwShiftL() reports as NA is replaced by
# 0, which silently drops that term; this is presumably part of the TODO above.
mult31AndAdd <- function(val, addVal) {
  vec <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
  vec[is.na(vec)] <- 0
  Reduce(function(a, b) {
           wrapInt(as.numeric(a) + as.numeric(b))
         },
         vec)
}
0168
# Create a new RDD with serializedMode == "byte".
# Return itself if already in "byte" format.
# Errors if `rdd` does not inherit from "RDD".
serializeToBytes <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "byte") {
    return(rdd)
  }
  # An identity lapply() produces the re-serialized copy.
  lapply(rdd, function(x) { x })
}
0182
# Create a new RDD with serializedMode == "string".
# Return itself if already in "string" format.
# Every element is converted with toString(). Errors if `rdd` does not
# inherit from "RDD".
serializeToString <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "string") {
    return(rdd)
  }
  stringRdd <- lapply(rdd, function(x) { toString(x) })
  # force it to create jrdd using "string"
  getJRDD(stringRdd, serializedMode = "string")
  stringRdd
}
0198
# Fast append to list by using an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator is an environment with three fields: size (allocated slots),
# counter (slots in use) and data (the backing list). Because it is an
# environment, it is updated in place. The allocation cost is amortized by
# doubling the size of the list every time it fills up.
addItemToAccumulator <- function(acc, item) {
  isFull <- acc$counter == acc$size
  if (isFull) {
    acc$size <- 2 * acc$size
    length(acc$data) <- acc$size
  }
  nextIndex <- acc$counter + 1
  acc$counter <- nextIndex
  acc$data[[nextIndex]] <- item
}
0213
# Create an empty accumulator for addItemToAccumulator(): an environment holding
# one pre-allocated slot (size 1, data = list(NULL)) and nothing used yet (counter 0).
initAccumulator <- function() {
  state <- new.env()
  state$size <- 1
  state$data <- list(NULL)
  state$counter <- 0
  state
}
0221
# Utility function to sort a list of key value pairs by key (the first element
# of each pair), ascending unless `decreasing` is TRUE.
# Used in unit tests
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
  ordering <- order(sapply(kv_list, `[[`, 1), decreasing = decreasing)
  kv_list[ordering]
}
0228
# Utility function to generate compact R lists from grouped rdd
# Used in Join-family functions
# param:
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull       Boolean list where each element determines whether the corresponding list should
#               be converted to list(NULL)
# return value
#   A list of two lists: the values tagged 1 and the values tagged 2, each in
#   input order. An empty side becomes list(NULL) when its cnull flag is TRUE.
genCompactLists <- function(tagged_list, cnull) {
  total <- length(tagged_list)
  # Pre-size both buckets for the worst case (every element on one side).
  buckets <- list(vector("list", total), vector("list", total))
  nextSlot <- c(1, 1)

  for (entry in tagged_list) {
    side <- entry[[1]]
    slot <- nextSlot[[side]]
    buckets[[side]][[slot]] <- entry[[2]]
    nextSlot[[side]] <- slot + 1
  }

  filled <- nextSlot - 1
  for (side in 1:2) {
    if (cnull[[side]] && filled[[side]] == 0) {
      buckets[[side]] <- list(NULL)
    } else {
      length(buckets[[side]]) <- filled[[side]]
    }
  }

  buckets
}
0258
# Utility function to merge compact R lists
# Used in Join-family functions
# param:
#   left/right Two compact lists ready for Cartesian product
# return value
#   An unnamed list of list(l, r) pairs. The left element varies slowest.
mergeCompactLists <- function(left, right) {
  pairs <- vector("list", length(left) * length(right))
  pos <- 0
  for (l in left) {
    for (r in right) {
      pos <- pos + 1
      pairs[[pos]] <- list(l, r)
    }
  }
  pairs
}
0275
# Utility function that wraps the two operations above: split a tagged list
# into its two compact sides, then return their Cartesian product.
# Used in Join-family functions
# param (same as genCompactLists):
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull       Boolean list where each element determines whether the corresponding list should
#               be converted to list(NULL)
joinTaggedList <- function(tagged_list, cnull) {
  compact <- genCompactLists(tagged_list, cnull)
  left <- compact[[1]]
  right <- compact[[2]]
  mergeCompactLists(left, right)
}
0286
# Utility function to reduce a key-value list with predicate
# Used in *ByKey functions
# param
#   pair               key-value pair; its hash is read from `pair$hash`
#   keys/vals          env of key/value with hashes
#   updateOrCreatePred predicate function; TRUE means an entry for this hash already exists
#   updateFn           update or merge function for existing pair, similar with `mergeVal` @combineByKey
#   createFn           create function for new pair, similar with `createCombiner` @combineByKey
# Both environments are modified in place.
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
  slot <- pair$hash
  pairKey <- pair[[1]]
  pairVal <- pair[[2]]
  if (updateOrCreatePred(pair)) {
    merged <- do.call(updateFn, list(get(slot, envir = vals), pairVal))
    assign(slot, merged, envir = vals)
  } else {
    created <- do.call(createFn, list(pairVal))
    assign(slot, created, envir = vals)
    assign(slot, pairKey, envir = keys)
  }
}
0307
# Utility function to convert key and value envs into a key-val list.
# Entries follow ls(keys) order; each one is list(key, value) for the same name.
convertEnvsToList <- function(keys, vals) {
  entryNames <- ls(keys)
  out <- vector("list", length(entryNames))
  for (i in seq_along(entryNames)) {
    nm <- entryNames[i]
    out[[i]] <- list(keys[[nm]], vals[[nm]])
  }
  out
}
0315
# Utility function to merge 2 environments with the second overriding values in the first.
# env1 is changed in place. Returns the list of copied values (in ls(env2) order).
overrideEnvs <- function(env1, env2) {
  lapply(ls(env2), function(key) {
    value <- env2[[key]]
    assign(key, value, envir = env1)
    value
  })
}
0324
# Utility function to capture the named varargs into a new environment
# (one binding per argument name).
varargsToEnv <- function(...) {
  # Based on http://stackoverflow.com/a/3057419/4577954
  args <- list(...)
  result <- new.env()
  argNames <- names(args)
  for (i in seq_along(argNames)) {
    key <- argNames[[i]]
    result[[key]] <- args[[key]]
  }
  result
}
0335
# Utility function to capture the varargs into an environment, with every value
# converted to a string. Logicals become "true"/"false", NULL is stored as NULL,
# and numerics/characters go through as.character(). Any other type is an error.
# Unnamed arguments are dropped with a single warning.
varargsToStrEnv <- function(...) {
  args <- list(...)
  argNames <- names(args)
  env <- new.env()
  unnamedArgs <- list()

  if (is.null(argNames)) {
    # When all arguments are not named, names(..) returns NULL.
    unnamedArgs <- args
  } else {
    for (i in seq_along(args)) {
      key <- argNames[i]
      if (identical(key, "")) {
        # When some of arguments are not named, name is "".
        unnamedArgs <- append(unnamedArgs, args[i])
        next
      }
      value <- args[[key]]
      supported <- is.logical(value) || is.numeric(value) || is.character(value) || is.null(value)
      if (!supported) {
        stop("Unsupported type for ", key, " : ", toString(class(value)), ". ",
             "Supported types are logical, numeric, character and NULL.", call. = FALSE)
      }
      env[[key]] <- if (is.logical(value)) {
        tolower(as.character(value))
      } else if (is.null(value)) {
        value
      } else {
        as.character(value)
      }
    }
  }

  if (length(unnamedArgs) > 0) {
    warning("Unnamed arguments ignored: ", toString(unnamedArgs), ".", call. = FALSE)
  }
  env
}
0376
# Look up a JVM org.apache.spark.storage.StorageLevel object by name.
# param
#   newLevel One of the listed level names (partial matching is allowed by
#            match.arg); defaults to the first choice, "DISK_ONLY".
# return value
#   The result of callJStatic() for that StorageLevel member.
getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                         "DISK_ONLY_2",
                                         "MEMORY_AND_DISK",
                                         "MEMORY_AND_DISK_2",
                                         "MEMORY_AND_DISK_SER",
                                         "MEMORY_AND_DISK_SER_2",
                                         "MEMORY_ONLY",
                                         "MEMORY_ONLY_2",
                                         "MEMORY_ONLY_SER",
                                         "MEMORY_ONLY_SER_2",
                                         "OFF_HEAP")) {
  # Keep match.arg()'s result. It validates the name, resolves partial matches
  # and turns a missing argument into the first choice instead of the whole vector.
  newLevel <- match.arg(newLevel)
  # Every supported level is fetched from StorageLevel under its own name, so
  # the validated name can be passed straight through.
  callJStatic("org.apache.spark.storage.StorageLevel", newLevel)
}
0405
# Describe a JVM StorageLevel object. When its flags match a well-known level,
# the result is "<LEVEL_NAME> - <toString()>"; otherwise it is just toString().
storageLevelToString <- function(levelObj) {
  useDisk <- callJMethod(levelObj, "useDisk")
  useMemory <- callJMethod(levelObj, "useMemory")
  useOffHeap <- callJMethod(levelObj, "useOffHeap")
  deserialized <- callJMethod(levelObj, "deserialized")
  replication <- callJMethod(levelObj, "replication")

  # Encode the flags as "disk/memory/offHeap/deserialized/replication".
  signature <- paste(useDisk, useMemory, useOffHeap, deserialized, replication, sep = "/")
  shortName <- switch(signature,
    "FALSE/FALSE/FALSE/FALSE/1" = "NONE",
    "TRUE/FALSE/FALSE/FALSE/1" = "DISK_ONLY",
    "TRUE/FALSE/FALSE/FALSE/2" = "DISK_ONLY_2",
    "FALSE/TRUE/FALSE/TRUE/1" = "MEMORY_ONLY",
    "FALSE/TRUE/FALSE/TRUE/2" = "MEMORY_ONLY_2",
    "FALSE/TRUE/FALSE/FALSE/1" = "MEMORY_ONLY_SER",
    "FALSE/TRUE/FALSE/FALSE/2" = "MEMORY_ONLY_SER_2",
    "TRUE/TRUE/FALSE/TRUE/1" = "MEMORY_AND_DISK",
    "TRUE/TRUE/FALSE/TRUE/2" = "MEMORY_AND_DISK_2",
    "TRUE/TRUE/FALSE/FALSE/1" = "MEMORY_AND_DISK_SER",
    "TRUE/TRUE/FALSE/FALSE/2" = "MEMORY_AND_DISK_SER_2",
    "TRUE/TRUE/TRUE/FALSE/1" = "OFF_HEAP",
    NULL)

  fullInfo <- callJMethod(levelObj, "toString")
  if (is.null(shortName)) fullInfo else paste(shortName, "-", fullInfo)
}
0446
# Utility function for functions where an argument needs to be integer but we want to allow
# the user to type (for example) `5` instead of `5L` to avoid a confusing error message.
#
# param
#   num  A numeric value that should represent a whole number.
# return value
#   as.integer(num). A warning naming the caller's argument expression is
#   emitted when the coercion changes the value (e.g. 5.5 becomes 5L).
numToInt <- function(num) {
  if (as.integer(num) != num) {
    # deparse(substitute()) renders the argument expression as readable text.
    # Pasting the raw call component would split a call such as `x + 0.5` into
    # its pieces ("+", "x", "0.5").
    argText <- paste(deparse(substitute(num)), collapse = " ")
    warning("Coercing ", argText, " to integer.")
  }
  as.integer(num)
}
0455
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
# user defined function (UDF), and to examine variables in the UDF to decide
# if their values should be included in the new function environment.
# param
#   node         The current AST node in the traversal.
#   oldEnv       The original function environment.
#   defVars      An Accumulator of variables names defined in the function's calling environment,
#                including function argument and local variable names.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can
#                be considered as a "name"-to-"list of functions" mapping.
#   newEnv       A new function environment to store necessary function dependencies, an output argument.
# return value
#   Called for its side effects. It appends names to defVars, records examined
#   functions in checkedFuncs and assigns captured free variables into newEnv.
#
# A free symbol (one not in defVars) is looked up starting at oldEnv and walking
# parent environments until parent.env(.GlobalEnv) is reached. So .GlobalEnv
# itself is searched, but attached packages above it are not. Namespaces are
# skipped unless they are the SparkR namespace and the symbol is not a SparkR
# export. A captured function is itself passed through cleanClosure().
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
  nodeLen <- length(node)

  if (nodeLen > 1 && typeof(node) == "language") {
    # Recursive case: current AST node is an internal node, check for its children.
    if (length(node[[1]]) > 1) {
      # The head is itself a call (e.g. f(x)(y)), so visit every component.
      for (i in 1:nodeLen) {
        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
      }
    } else {
      # if node[[1]] is length of 1, check for some R special functions.
      nodeChar <- as.character(node[[1]])
      if (nodeChar == "{" || nodeChar == "(") {
        # Skip start symbol.
        for (i in 2:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "<-" || nodeChar == "=" ||
                   nodeChar == "<<-") {
        # Assignment Ops.
        defVar <- node[[2]]
        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
          # Add the defined variable name into defVars.
          addItemToAccumulator(defVars, as.character(defVar))
        } else {
          # Complex left-hand side (e.g. x$a <- ...): visit it like any expression.
          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
        }
        # Visit the right-hand side.
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "function") {
        # Function definition.
        # Add parameter names.
        newArgs <- names(node[[2]])
        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
        # Visit the remaining components (the body, plus srcref when present).
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "$") {
        # Skip the field.
        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
      } else if (nodeChar == "::" || nodeChar == ":::") {
        # Only the right-hand name (node[[3]]) is visited; the package name is not.
        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
      } else {
        # Ordinary call: visit the function name and every argument.
        for (i in 1:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      }
    }
  } else if (nodeLen == 1 &&
               (typeof(node) == "symbol" || typeof(node) == "language")) {
    # Base case: current AST node is a leaf node and a symbol or a function call.
    nodeChar <- as.character(node)
    if (!nodeChar %in% defVars$data) {
      # Not a function parameter or local variable.
      func.env <- oldEnv
      topEnv <- parent.env(.GlobalEnv)
      # Search in function environment, and function's enclosing environments
      # up to global environment. There is no need to look into package environments
      # above the global or namespace environment that is not SparkR below the global,
      # as they are assumed to be loaded on workers.
      while (!identical(func.env, topEnv)) {
        # Namespaces other than "SparkR" will not be searched.
        if (!isNamespace(func.env) ||
              (getNamespaceName(func.env) == "SparkR" &&
                 !(nodeChar %in% getNamespaceExports("SparkR")))) {
          # Only include SparkR internals.

          # Set parameter 'inherits' to FALSE since we do not need to search in
          # attached package environments.
          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
                       error = function(e) { FALSE })) {
            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
            if (is.function(obj)) {
              # If the node is a function call.
              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = F,
                               ifnotfound = list(list(NULL)))[[1]]
              found <- sapply(funcList, function(func) {
                ifelse(
                  identical(func, obj) &&
                    # Also check if the parent environment is identical to current parent
                    identical(parent.env(environment(func)), func.env),
                  TRUE, FALSE)
              })
              if (sum(found) > 0) {
                # If function has been examined ignore
                break
              }
              # Function has not been examined, record it and recursively clean its closure.
              assign(nodeChar,
                     if (is.null(funcList[[1]])) {
                       list(obj)
                     } else {
                       append(funcList, obj)
                     },
                     envir = checkedFuncs)
              obj <- cleanClosure(obj, checkedFuncs)
            }
            # Capture the (possibly cleaned) value under its name in the new environment.
            assign(nodeChar, obj, envir = newEnv)
            break
          }
        }

        # Continue to search in enclosure.
        func.env <- parent.env(func.env)
      }
    }
  }
}
0576
# Utility function to get user defined function (UDF) dependencies (closure).
# More specifically, this function captures the values of free variables defined
# outside a UDF, and stores them in the function's environment.
# param
#   func         A function whose closure needs to be captured. Non-functions
#                are returned unchanged.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can be
#                considered as a "name"-to-"list of functions" mapping.
# return value
#   a new version of func that has a correct environment (closure): a fresh
#   environment whose parent is .GlobalEnv, populated by processClosure().
cleanClosure <- function(func, checkedFuncs = new.env()) {
  if (is.function(func)) {
    newEnv <- new.env(parent = .GlobalEnv)
    func.body <- body(func)
    oldEnv <- environment(func)
    # defVars is an Accumulator of variables names defined in the function's calling
    # environment. First, function's arguments are added to defVars.
    defVars <- initAccumulator()
    # names(as.list(args(func))) gives the argument names followed by a trailing
    # "" for the (NULL) body; head(, -1) drops that entry. For a zero-argument
    # function it yields nothing, whereas 1:(n - 1) would have iterated over
    # c(1, 0, -1) and pushed junk entries into defVars.
    argNames <- names(as.list(args(func)))
    for (argName in head(argNames, -1)) {
      addItemToAccumulator(defVars, argName)
    }
    # Recursively examine variables in the function body.
    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
    environment(func) <- newEnv
  }
  func
}
0605
# Append partition lengths to each partition in two input RDDs if needed.
# param
#   x     An RDD.
#   other An RDD.
# return value
#   A list of two result RDDs. They are unchanged when both share a non-"byte"
#   serialization mode.
appendPartitionLengths <- function(x, other) {
  xMode <- getSerializedMode(x)
  otherMode <- getSerializedMode(other)
  if (xMode == otherMode && xMode != "byte") {
    return(list(x, other))
  }
  # Append the number of elements in each partition to that partition so that we can later
  # know the boundary of elements from x and other. The appended value counts itself too.
  #
  # Note that this appending also serves the purpose of reserialization, because even if
  # any RDD is serialized, we need to reserialize it to make sure its partitions are encoded
  # as a single byte array. For example, partitions of an RDD generated from partitionBy()
  # may be encoded as multiple byte arrays.
  withLength <- function(part) {
    n <- length(part)
    part[[n + 1]] <- n + 1
    part
  }
  list(lapplyPartition(x, withLength), lapplyPartition(other, withLength))
}
0632
# Perform zip or cartesian between elements from two RDDs in each partition
# param
#   rdd An RDD.
#   zip A boolean flag indicating this call is for zip operation or not.
# return value
#   A result RDD whose partitions are lists of list(k, v) pairs.
#
# In "byte" mode each partition is expected to be laid out as
#   [left elements..., leftCount, right elements..., rightCount]
# where each count includes itself (the layout appendPartitionLengths() produces
# for one side). Zip pairs elements positionally; cartesian pairs every left
# element with every right element. In any other mode, keys are taken from the
# odd positions and values from the even positions, then paired positionally.
mergePartitions <- function(rdd, zip) {
  serializerMode <- getSerializedMode(rdd)
  partitionFunc <- function(partIndex, part) {
    len <- length(part)
    if (len > 0) {
      if (serializerMode == "byte") {
        # The last element is the right-hand count; the element just before the
        # right-hand segment is the left-hand count.
        lengthOfValues <- part[[len]]
        lengthOfKeys <- part[[len - lengthOfValues]]
        stopifnot(len == lengthOfKeys + lengthOfValues)

        # For zip operation, check if corresponding partitions
        # of both RDDs have the same number of elements.
        if (zip && lengthOfKeys != lengthOfValues) {
          stop("Can only zip RDDs with same number of elements ",
               "in each pair of corresponding partitions.")
        }

        # Strip the count markers; a count of 1 means that side is empty.
        if (lengthOfKeys > 1) {
          keys <- part[1 : (lengthOfKeys - 1)]
        } else {
          keys <- list()
        }
        if (lengthOfValues > 1) {
          values <- part[(lengthOfKeys + 1) : (len - 1)]
        } else {
          values <- list()
        }

        if (!zip) {
          return(mergeCompactLists(keys, values))
        }
      } else {
        # NOTE(review): interleaved key/value layout is presumably produced upstream
        # for non-"byte" modes; confirm against the callers.
        keys <- part[c(TRUE, FALSE)]
        values <- part[c(FALSE, TRUE)]
      }
      mapply(
        function(k, v) { list(k, v) },
        keys,
        values,
        SIMPLIFY = FALSE,
        USE.NAMES = FALSE)
    } else {
      part
    }
  }

  PipelinedRDD(rdd, partitionFunc)
}
0687
# Convert a named list to struct so that
# SerDe won't confuse between a normal named list and struct.
# The input must be a plain list with names; only its class is changed.
listToStruct <- function(list) {
  stopifnot(class(list) == "list", !is.null(names(list)))
  structure(list, class = "struct")
}
0696
# Convert a struct to a named list.
# param
#   struct An object with class "struct" (as produced by listToStruct()).
# return value
#   The same data as a plain list (the "struct" class attribute is removed).
structToList <- function(struct) {
  # Validate the argument itself. Checking `list` would test base::list (a
  # function), which can never have class "struct".
  stopifnot(inherits(struct, "struct"))

  class(struct) <- "list"
  struct
}
0704
# Convert a named list to an environment to be passed to JVM.
# param
#   namedList A list whose elements all have non-empty names (an empty list is allowed).
# return value
#   A new environment with one binding per name. If a name repeats, the value
#   of its first occurrence is used.
convertNamedListToEnv <- function(namedList) {
  # Make sure each item in the list has a name. names() reports "" (not NA) for
  # unnamed elements, so both must be rejected. Otherwise the assignment below
  # fails with an obscure zero-length-name error.
  itemNames <- names(namedList)
  allNamed <- if (is.null(itemNames)) {
    length(namedList) == 0
  } else {
    !any(is.na(itemNames) | !nzchar(itemNames))
  }
  if (!allNamed) {
    stop("Every element of 'namedList' must have a non-empty name.", call. = FALSE)
  }

  env <- new.env()
  for (name in itemNames) {
    env[[name]] <- namedList[[name]]
  }
  env
}
0722
# Assign a new environment for attach() and with() methods.
# Each column of the SparkDataFrame is bound, by name, to the single-column
# SparkDataFrame data[, col, drop = FALSE].
assignNewEnv <- function(data) {
  stopifnot(class(data) == "SparkDataFrame")
  cols <- columns(data)
  stopifnot(length(cols) > 0)

  env <- new.env()
  for (colName in cols) {
    assign(x = colName, value = data[, colName, drop = FALSE], envir = env)
  }
  env
}
0735
# Utility function to split by ',' and whitespace, and remove the empty tokens
# produced by adjacent separators.
splitString <- function(input) {
  tokens <- unlist(strsplit(input, ",|\\s"))
  tokens[nzchar(tokens)]
}
0740
# Build a java.util.Properties object from named arguments, e.g.
# varargsToJProperties(user = "u", password = "p"). Each value is converted with
# as.character() before being set. Names are used verbatim, including names
# starting with a dot. When a name is repeated, its first value is used.
varargsToJProperties <- function(...) {
  pairs <- list(...)
  props <- newJObject("java.util.Properties")
  if (length(pairs) > 0) {
    keys <- names(pairs)
    # Iterate over the list's own names. ls() on a list goes through
    # as.environment() and silently skips dot-prefixed names.
    if (is.null(keys) || any(is.na(keys) | !nzchar(keys))) {
      stop("All arguments must be named to be set as properties.", call. = FALSE)
    }
    for (key in unique(keys)) {
      callJMethod(props, "setProperty", key, as.character(pairs[[key]]))
    }
  }
  props
}
0751
# Run `script` with `combinedArgs`. On Windows it goes through shell(), and its
# output is captured when `wait` is TRUE. Elsewhere it goes through system2()
# with the given stdout/stderr targets.
launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr = "") {
  if (.Platform$OS.type != "windows") {
    # http://stat.ethz.ch/R-manual/R-devel/library/base/html/system2.html
    # stdout = F means discard output
    # stdout = "" means to its console (default)
    # Note that the console of this child process might not be the same as the running R process.
    return(system2(script, combinedArgs, stdout = stdout, wait = wait, stderr = stderr))
  }
  commandLine <- paste(script, combinedArgs, sep = " ")
  # on Windows, intern = F seems to mean output to the console. (documentation on this is missing)
  shell(commandLine, translate = TRUE, wait = wait, intern = wait)
}
0765
# Return the Java SparkContext stored as `.sparkRjsc` in `.sparkREnv`, or stop
# with an explanatory error if SparkR has not been initialized.
getSparkContext <- function() {
  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    return(get(".sparkRjsc", envir = .sparkREnv))
  }
  stop("SparkR has not been initialized. Please call sparkR.session()")
}
0773
# TRUE for master URLs of the form "local", "local[N]" or "local[*]".
isMasterLocal <- function(master) {
  localPattern <- "^local(\\[([0-9]+|\\*)\\])?$"
  grepl(localPattern, master, perl = TRUE)
}
0777
# TRUE when the master string ends in "<letters>-client" (e.g. "yarn-client").
isClientMode <- function(master) {
  clientPattern <- "([a-z]+)-client$"
  grepl(clientPattern, master, perl = TRUE)
}
0781
# TRUE when the R_PROFILE_USER environment variable points at a file whose path
# ends in "shell.R".
isSparkRShell <- function() {
  profilePath <- Sys.getenv("R_PROFILE_USER")
  grepl(".*shell\\.R$", profilePath, perl = TRUE)
}
0785
# Works identically with `callJStatic(...)`, but an error is rethrown through
# captureJVMException() so it is reported with a pretty formatted message.
handledCallJStatic <- function(cls, method, ...) {
  tryCatch(
    callJStatic(cls, method, ...),
    error = function(e) captureJVMException(e, method)
  )
}
0794
# Works identically with `callJMethod(...)`, but an error is rethrown through
# captureJVMException() so it is reported with a pretty formatted message.
handledCallJMethod <- function(obj, method, ...) {
  tryCatch(
    callJMethod(obj, method, ...),
    error = function(e) captureJVMException(e, method)
  )
}
0803
# Rethrow an error from a JVM call with a shorter, labelled message.
# The leading "Error in <call>: " is replaced by "Error in <method> :". For
# known Spark/Java exception classes, only the first line of the JVM message is
# kept, prefixed with a short label. Always signals an error (call. = FALSE).
captureJVMException <- function(e, method) {
  rawmsg <- as.character(e)
  if (any(grepl("^Error in .*?: ", rawmsg))) {
    # If the exception message starts with "Error in ...", this is possibly
    # "Error in invokeJava(...)". Replace that prefix with
    # `paste("Error in", method, ":")` to identify which function was called in JVM side.
    afterPrefix <- strsplit(rawmsg, "Error in .*?: ")[[1]][2]
    stacktrace <- paste(paste("Error in", method, ":")[1], afterPrefix)
  } else {
    # Otherwise, do not convert the error message just in case.
    stacktrace <- rawmsg
  }

  # Known exception markers and their labels, checked in order.
  # StreamingQueryException could wrap an IllegalArgumentException, so look for that first.
  knownExceptions <- list(
    c("org.apache.spark.sql.streaming.StreamingQueryException: ", "streaming query error - "),
    c("java.lang.IllegalArgumentException: ", "illegal argument - "),
    c("org.apache.spark.sql.AnalysisException: ", "analysis error - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ", "no such database - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", "no such table - "),
    c("org.apache.spark.sql.catalyst.parser.ParseException: ", "parse error - ")
  )

  for (known in knownExceptions) {
    marker <- known[[1]]
    if (any(grepl(marker, stacktrace, fixed = TRUE))) {
      pieces <- strsplit(stacktrace, marker, fixed = TRUE)[[1]]
      # pieces[1] is the "Error in ..." part; keep only the first line of the JVM message.
      firstLine <- strsplit(pieces[2], "\r?\n\tat")[[1]][1]
      stop(pieces[1], known[[2]], firstLine, call. = FALSE)
    }
  }
  stop(stacktrace, call. = FALSE)
}
0876
# rbind a list of rows with raw (binary) columns
#
# Column types are decided from the first row. Columns whose first value is raw
# stay list columns; every other column is flattened with unlist().
#
# @param inputData a list of rows, with each row a list
# @return data.frame with raw columns as lists
rbindRaws <- function(inputData) {
  isRawColumn <- sapply(inputData[[1]], class) == "raw"
  # rbind on lists gives a list-matrix; as.data.frame keeps each column as a list.
  combined <- as.data.frame(do.call(rbind, inputData))
  combined[!isRawColumn] <- lapply(combined[!isRawColumn], unlist)
  combined
}
0891
# Get basename without extension from URL.
# The last path segment is taken, a trailing .gz/.bz2/.xz is removed, and then
# the final extension is removed (the same approach as tools::file_path_sans_ext).
basenameSansExtFromUrl <- function(url) {
  lastSegment <- tail(unlist(strsplit(url, "^.+/")), 1)
  withoutCompression <- sub("[.](gz|bz2|xz)$", "", lastSegment)
  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", withoutCompression)
}
0903
# TRUE when `x` is an atomic vector with exactly one element.
isAtomicLengthOne <- function(x) {
  length(x) == 1 && is.atomic(x)
}
0907
# TRUE when R is running on Windows.
is_windows <- function() {
  identical(.Platform$OS.type, "windows")
}
0911
# TRUE when the HADOOP_HOME environment variable is set to a non-empty value.
hadoop_home_set <- function() {
  nzchar(Sys.getenv("HADOOP_HOME"))
}
0915
# TRUE on non-Windows platforms; on Windows, TRUE only when HADOOP_HOME is set.
windows_with_hadoop <- function() {
  if (is_windows()) hadoop_home_set() else TRUE
}
0919
# get0 not supported before R 3.2.0.
# Return the value bound to the first name in `x`, or `ifnotfound` when absent.
getOne <- function(x, envir, inherits = TRUE, ifnotfound = NULL) {
  firstName <- x[1L]
  found <- mget(firstName, envir = envir, inherits = inherits, ifnotfound = list(ifnotfound))
  found[[1L]]
}
0924
# Returns a vector of parent directories, traversing up count times, starting with a full path
# eg. traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1) should return
# this "/Users/user/Library/Caches/spark/spark2.2"
# and "/Users/user/Library/Caches/spark"
# Traversal also stops early at the filesystem root (where dirname(x) == x).
traverseParentDirs <- function(x, count) {
  dirs <- x
  current <- x
  remaining <- count
  while (dirname(current) != current && remaining > 0) {
    current <- dirname(current)
    remaining <- remaining - 1
    dirs <- c(dirs, current)
  }
  dirs
}