Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # Utilities and Helpers
0019 
# Convert a Java list (JList<T>) into an R list holding the same elements,
# taking care of deserialization and type conversion.
# param
#   jList A Java object reference to the list being converted.
#   flatten Whether the per-element results are spliced into a single flat list.
#           Forced to FALSE as soon as a "row"-mode element has been decoded.
#   logicalUpperBound Optional cap on the number of logical elements returned
#                     (NULL, the default, returns all of them).
#   serializedMode How raw payloads are decoded: "byte", "row" or "string".
# return value
#   An R list.
convertJListToRList <- function(jList, flatten, logicalUpperBound = NULL,
                                serializedMode = "byte") {
  arrSize <- callJMethod(jList, "size")

  # With serializedMode == "string" (e.g. an RDD created directly by textFile())
  # partitions are not dense-packed into one Array[Byte], so `arrSize` already
  # counts logical elements and can be pruned up front.
  if (serializedMode == "string" && !is.null(logicalUpperBound)) {
    arrSize <- min(arrSize, logicalUpperBound)
  }

  # Decode one raw vector fetched from the Java list.
  decodeRaw <- function(obj) {
    if (serializedMode == "byte") {
      # RDD[Array[Byte]]: `obj` is one whole partition dense-packed together, so
      # the whole partition is deserialized first and capped afterwards.
      res <- unserialize(obj)
    } else if (serializedMode == "row") {
      # DataFrames converted to RRDDs: readRowList reads each row as a list and
      # deserializes each element.
      res <- readRowList(obj)
      # Super-assignment flips the `flatten` argument of the enclosing call, so
      # callers such as collect do not have to care about its default.
      flatten <<- FALSE
    }
    # TODO: is it possible to distinguish element boundary so that we can
    # unserialize only what we need?
    if (!is.null(logicalUpperBound)) {
      res <- head(res, n = logicalUpperBound)
    }
    res
  }

  # Fetch the element at the given zero-based index and convert it to R.
  decodeElement <- function(index) {
    obj <- callJMethod(jList, "get", as.integer(index))

    # `obj` is assumed to be either an R object or a Java object reference.
    if (!inherits(obj, "jobj")) {
      if (inherits(obj, "raw")) {
        return(decodeRaw(obj))
      }
      # A primitive Java type that has already been simplified to the
      # corresponding R type.
      return(list(obj))
    }

    if (!isInstanceOf(obj, "scala.Tuple2")) {
      stop("utils.R: convertJListToRList only supports ",
           "RDD[Array[Byte]] and ",
           "JavaPairRDD[Array[Byte], Array[Byte]] for now")
    }

    # JavaPairRDD[Array[Byte], Array[Byte]].
    keyBytes <- callJMethod(obj, "_1")
    valBytes <- callJMethod(obj, "_2")
    list(unserialize(keyBytes), unserialize(valBytes))
  }

  results <- if (arrSize > 0) {
    lapply(0 : (arrSize - 1), decodeElement)
  } else {
    list()
  }

  # `flatten` is read only after every element was decoded, because decoding
  # a "row"-mode element may have switched it off.
  combined <- if (flatten) {
    unlist(results, recursive = FALSE)
  } else {
    results
  }
  as.list(combined)
}
0094 
# Check whether the variable called `name`, looked up from environment `env`,
# holds an RDD.
# param
#   name Character name of the variable to look up.
#   env Environment in which the lookup starts.
# return value
#   TRUE if the object found inherits from class "RDD", FALSE otherwise.
isRDD <- function(name, env) {
  inherits(get(name, envir = env), "RDD")
}
0100 
#' Compute the hashCode of an object
#'
#' Java-style function to compute the hashCode for the given object. Returns
#' an integer value.
#'
#' @details
#' This only works for integer, numeric and character types right now. Only the
#' first element of `key` is hashed. Any other type produces a warning and
#' hashes to 0.
#'
#' @param key the object to be hashed
#' @return the hash code as an integer
#' @examples
#'\dontrun{
#' hashCode(1L) # 1
#' hashCode(1.0) # 1072693248
#' hashCode("1") # 49
#'}
#' @note hashCode since 1.4.0
hashCode <- function(key) {
  # inherits() always yields a single logical, whereas `class(key) == "..."`
  # produces one value per class and breaks `if` for multi-class objects.
  if (inherits(key, "integer")) {
    as.integer(key[[1]])
  } else if (inherits(key, "numeric")) {
    # Convert the double to long and then calculate the hash code:
    # XOR of the two 32-bit halves of the 64-bit representation.
    rawVec <- writeBin(key[[1]], con = raw())
    intBits <- packBits(rawToBits(rawVec), "integer")
    as.integer(bitwXor(intBits[2], intBits[1]))
  } else if (inherits(key, "character")) {
    # TODO: SPARK-7839 means we might not have the native library available
    # Hash the first element only, like the integer and numeric branches do.
    str <- key[[1]]
    if (nchar(str) == 0) {
      0L
    } else {
      # Byte values of the string; raw -> integer conversion is vectorized.
      byteVals <- as.integer(charToRaw(str))
      hashC <- 0
      for (byteVal in byteVals) {
        hashC <- mult31AndAdd(hashC, byteVal)
      }
      as.integer(hashC)
    }
  } else {
    warning("Could not hash object, returning 0")
    as.integer(0)
  }
}
0144 
# Wrap a 'numeric' value around the 32-bit integer bounds, which is handy for
# emulating C-like integer arithmetic. A value above the maximum or below the
# negated maximum is shifted by one full range width (2^32); anything else is
# returned unchanged.
# param
#   value A single numeric value.
# return value
#   The wrapped value.
wrapInt <- function(value) {
  intMax <- .Machine$integer.max
  if (value > intMax) {
    return(value - 2 * intMax - 2)
  }
  if (value < -1 * intMax) {
    return(2 * intMax + value + 2)
  }
  value
}
0155 
# Compute `val * 31 + addVal`, wrapping to integer bounds after every partial
# sum.
#
# TODO: this function does not handle integer overflow well
# param
#   val The running value to be multiplied by 31.
#   addVal The value added to the product.
# return value
#   The wrapped result as a numeric value.
mult31AndAdd <- function(val, addVal) {
  # 31 * val is built as (val << 4) + (val << 3) + (val << 2) + (val << 1) + val.
  terms <- c(bitwShiftL(val, c(4, 3, 2, 1, 0)), addVal)
  # Shifts that could not be represented come back as NA; count them as 0.
  terms[is.na(terms)] <- 0
  wrappedSum <- function(a, b) {
    wrapInt(as.numeric(a) + as.numeric(b))
  }
  Reduce(wrappedSum, terms)
}
0168 
# Produce an RDD whose serializedMode is "byte".
# param
#   rdd An RDD; anything else raises an error.
# return value
#   `rdd` itself when it is already in "byte" mode, otherwise a new RDD obtained
#   by mapping the identity function over it.
serializeToBytes <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "byte") {
    return(rdd)
  }
  ser.rdd <- lapply(rdd, function(x) { x })
  ser.rdd
}
0182 
# Produce an RDD whose serializedMode is "string".
# param
#   rdd An RDD; anything else raises an error.
# return value
#   `rdd` itself when it is already in "string" mode, otherwise a new RDD whose
#   elements were converted with toString().
serializeToString <- function(rdd) {
  if (!inherits(rdd, "RDD")) {
    stop("Argument 'rdd' is not an RDD type.")
  }
  if (getSerializedMode(rdd) == "string") {
    return(rdd)
  }
  ser.rdd <- lapply(rdd, function(x) { toString(x) })
  # Called for its side effect only: force the jrdd to be created using "string".
  getJRDD(ser.rdd, serializedMode = "string")
  ser.rdd
}
0198 
# Fast append to a list through an accumulator.
# http://stackoverflow.com/questions/17046336/here-we-go-again-append-an-element-to-a-list-in-r
#
# The accumulator is an environment with three fields: size (allocated
# capacity), counter (number of items stored) and data (the backing list).
# The allocation cost is amortized by doubling the capacity every time the
# backing list fills up.
# param
#   acc The accumulator, updated in place.
#   item The item to append.
addItemToAccumulator <- function(acc, item) {
  capacity <- acc$size
  if (acc$counter == capacity) {
    # Full: double the capacity before storing the new item.
    acc$size <- capacity * 2
    length(acc$data) <- acc$size
  }
  nextSlot <- acc$counter + 1
  acc$counter <- nextSlot
  acc$data[[nextSlot]] <- item
}
0213 
# Create an empty accumulator for addItemToAccumulator(): an environment with
# `counter` 0, `size` 1 and a one-slot backing list in `data`.
initAccumulator <- function() {
  accumulator <- new.env()
  assign("counter", 0, envir = accumulator)
  assign("data", list(NULL), envir = accumulator)
  assign("size", 1, envir = accumulator)
  accumulator
}
0221 
# Sort a list of key-value pairs by key. Used in unit tests.
# param
#   kv_list A list of pairs; the first element of each pair is its key.
#   decreasing Sort in decreasing key order when TRUE.
# return value
#   `kv_list` reordered by key.
sortKeyValueList <- function(kv_list, decreasing = FALSE) {
  firstOf <- function(pair) pair[[1]]
  ordering <- order(sapply(kv_list, firstOf), decreasing = decreasing)
  kv_list[ordering]
}
0228 
# Split a tagged list into two compact R lists, one per tag.
# Used in Join-family functions
# param:
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...); each
#               element is a (tag, value) pair
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL) when it received no value
# return value
#   A list of two lists: the values tagged 1 and the values tagged 2.
genCompactLists <- function(tagged_list, cnull) {
  total <- length(tagged_list)
  # Both buckets are preallocated at the worst-case size and trimmed afterwards.
  buckets <- list(vector("list", total), vector("list", total))
  nextFree <- list(1, 1)

  for (entry in tagged_list) {
    tag <- entry[[1]]
    pos <- nextFree[[tag]]
    buckets[[tag]][[pos]] <- entry[[2]]
    nextFree[[tag]] <- pos + 1
  }

  for (side in seq_len(2)) {
    used <- nextFree[[side]] - 1
    if (cnull[[side]] && used == 0) {
      buckets[[side]] <- list(NULL)
    } else {
      length(buckets[[side]]) <- used
    }
  }

  buckets
}
0258 
# Build the Cartesian product of two compact R lists.
# Used in Join-family functions
# param:
#   left/right Two compact lists ready for Cartesian product
# return value
#   A list of list(l, r) pairs, ordered by `left` first and `right` second.
mergeCompactLists <- function(left, right) {
  nRight <- length(right)
  product <- vector("list", length(left) * nRight)
  for (li in seq_along(left)) {
    # Pairs of the li-th left element occupy one contiguous run of the output.
    offset <- (li - 1) * nRight
    for (ri in seq_along(right)) {
      product[[offset + ri]] <- list(left[[li]], right[[ri]])
    }
  }
  product
}
0275 
# Wrap genCompactLists() and mergeCompactLists(): split a tagged list by tag,
# then build the Cartesian product of the two resulting lists.
# Used in Join-family functions
# param (same as genCompactLists):
#   tagged_list R list generated via groupByKey with tags(1L, 2L, ...)
#   cnull Boolean list where each element determines whether the corresponding list should
#         be converted to list(NULL)
joinTaggedList <- function(tagged_list, cnull) {
  compact <- genCompactLists(tagged_list, cnull)
  leftSide <- compact[[1]]
  rightSide <- compact[[2]]
  mergeCompactLists(leftSide, rightSide)
}
0286 
# Fold one key-value pair into the `keys`/`vals` environments.
# Used in *ByKey functions
# param
#   pair key-value pair; key and value sit at index 1 and 2, the hash under `$hash`
#   keys/vals env of key/value, both indexed by hash
#   updateOrCreatePred predicate function; TRUE means the pair updates an existing entry
#   updateFn update or merge function for existing pair, similar with `mergeVal` @combineByKey
#   createFn create function for new pair, similar with `createCombiner` @combinebykey
updateOrCreatePair <- function(pair, keys, vals, updateOrCreatePred, updateFn, createFn) {
  hashVal <- pair$hash
  key <- pair[[1]]
  val <- pair[[2]]
  if (!updateOrCreatePred(pair)) {
    # New entry: store the created value first, then remember its key.
    created <- do.call(createFn, list(val))
    assign(hashVal, created, envir = vals)
    assign(hashVal, key, envir = keys)
  } else {
    # Existing entry: merge the incoming value into the stored one.
    current <- get(hashVal, envir = vals)
    assign(hashVal, do.call(updateFn, list(current, val)), envir = vals)
  }
}
0307 
# Convert the key and value environments into a list of key-value pairs.
# param
#   keys/vals Environments sharing the same variable names.
# return value
#   A list of list(key, value) pairs, one per name listed by ls(keys).
convertEnvsToList <- function(keys, vals) {
  hashes <- ls(keys)
  pairs <- vector("list", length(hashes))
  for (i in seq_along(hashes)) {
    hash <- hashes[i]
    pairs[[i]] <- list(keys[[hash]], vals[[hash]])
  }
  pairs
}
0315 
# Merge two environments, with values from the second overriding the first.
# env1 is changed in place.
# param
#   env1 The environment receiving the values.
#   env2 The environment whose variables are copied.
# return value
#   A list of the copied values, in the order of ls(env2).
overrideEnvs <- function(env1, env2) {
  copyOne <- function(name) {
    assign(name, env2[[name]], envir = env1)
  }
  lapply(ls(env2), copyOne)
}
0324 
# Capture named varargs into a fresh environment, one variable per name.
# Based on http://stackoverflow.com/a/3057419/4577954
# param
#   ... Named arguments.
# return value
#   A new environment holding the argument values under their names.
varargsToEnv <- function(...) {
  captured <- list(...)
  target <- new.env()
  for (key in names(captured)) {
    target[[key]] <- captured[[key]]
  }
  target
}
0335 
# Capture named varargs into a fresh environment, converting every value to a
# string.
# param
#   ... Named arguments whose values are logical, numeric, character or NULL.
#       Logical values become lower-case strings, NULL is stored as is, and
#       any other type raises an error. Unnamed arguments are skipped with a
#       warning. When a name is repeated, the last occurrence wins.
# return value
#   A new environment holding the converted values under their names.
varargsToStrEnv <- function(...) {
  pairs <- list(...)
  nameList <- names(pairs)
  env <- new.env()
  ignoredNames <- list()

  if (is.null(nameList)) {
    # When all arguments are not named, names(..) returns NULL.
    ignoredNames <- pairs
  } else {
    for (i in seq_along(pairs)) {
      name <- nameList[i]
      if (identical(name, "")) {
        # When some of arguments are not named, name is "".
        ignoredNames <- append(ignoredNames, pairs[i])
      } else {
        # Look the value up by position: `pairs[[name]]` would always return
        # the first argument carrying that name, so a repeated name would never
        # be validated or stored from its own position.
        value <- pairs[[i]]
        if (!(is.logical(value) || is.numeric(value) || is.character(value) || is.null(value))) {
          stop("Unsupported type for ", name, " : ", toString(class(value)), ". ",
               "Supported types are logical, numeric, character and NULL.", call. = FALSE)
        }
        if (is.logical(value)) {
          env[[name]] <- tolower(as.character(value))
        } else if (is.null(value)) {
          env[[name]] <- value
        } else {
          env[[name]] <- as.character(value)
        }
      }
    }
  }

  if (length(ignoredNames) != 0) {
    warning("Unnamed arguments ignored: ", toString(ignoredNames), ".", call. = FALSE)
  }
  env
}
0376 
# Look up a Spark storage level by name.
# param
#   newLevel Name of the storage level; one of the choices listed in the
#            default value. Anything else raises an error from match.arg().
# return value
#   The result of calling the static method of that name on
#   org.apache.spark.storage.StorageLevel, returned invisibly.
getStorageLevel <- function(newLevel = c("DISK_ONLY",
                                         "DISK_ONLY_2",
                                         "MEMORY_AND_DISK",
                                         "MEMORY_AND_DISK_2",
                                         "MEMORY_AND_DISK_SER",
                                         "MEMORY_AND_DISK_SER_2",
                                         "MEMORY_ONLY",
                                         "MEMORY_ONLY_2",
                                         "MEMORY_ONLY_SER",
                                         "MEMORY_ONLY_SER_2",
                                         "OFF_HEAP")) {
  # Keep the value returned by match.arg(): without the assignment the whole
  # default vector would be used as the level name when the argument is omitted.
  newLevel <- match.arg(newLevel)
  # After match.arg() the name is guaranteed to be one of the choices above,
  # each of which is also the name of the static method to call.
  storageLevel <- callJStatic("org.apache.spark.storage.StorageLevel", newLevel)
  invisible(storageLevel)
}
0405 
# Describe a storage level object as a string.
# param
#   levelObj A Java object reference exposing useDisk, useMemory, useOffHeap,
#            deserialized, replication and toString.
# return value
#   "<short name> - <toString output>" when the flags match one of the known
#   levels below, otherwise the toString output alone.
storageLevelToString <- function(levelObj) {
  useDisk <- callJMethod(levelObj, "useDisk")
  useMemory <- callJMethod(levelObj, "useMemory")
  useOffHeap <- callJMethod(levelObj, "useOffHeap")
  deserialized <- callJMethod(levelObj, "deserialized")
  replication <- callJMethod(levelObj, "replication")

  # Columns: name, useDisk, useMemory, useOffHeap, deserialized, replication.
  knownLevels <- list(
    list("NONE", FALSE, FALSE, FALSE, FALSE, 1),
    list("DISK_ONLY", TRUE, FALSE, FALSE, FALSE, 1),
    list("DISK_ONLY_2", TRUE, FALSE, FALSE, FALSE, 2),
    list("MEMORY_ONLY", FALSE, TRUE, FALSE, TRUE, 1),
    list("MEMORY_ONLY_2", FALSE, TRUE, FALSE, TRUE, 2),
    list("MEMORY_ONLY_SER", FALSE, TRUE, FALSE, FALSE, 1),
    list("MEMORY_ONLY_SER_2", FALSE, TRUE, FALSE, FALSE, 2),
    list("MEMORY_AND_DISK", TRUE, TRUE, FALSE, TRUE, 1),
    list("MEMORY_AND_DISK_2", TRUE, TRUE, FALSE, TRUE, 2),
    list("MEMORY_AND_DISK_SER", TRUE, TRUE, FALSE, FALSE, 1),
    list("MEMORY_AND_DISK_SER_2", TRUE, TRUE, FALSE, FALSE, 2),
    list("OFF_HEAP", TRUE, TRUE, TRUE, FALSE, 1)
  )

  shortName <- NULL
  for (level in knownLevels) {
    if (useDisk == level[[2]] && useMemory == level[[3]] &&
        useOffHeap == level[[4]] && deserialized == level[[5]] &&
        replication == level[[6]]) {
      shortName <- level[[1]]
      break
    }
  }

  fullInfo <- callJMethod(levelObj, "toString")
  if (is.null(shortName)) {
    fullInfo
  } else {
    paste(shortName, "-", fullInfo)
  }
}
0446 
# Coerce a number to integer for functions whose argument has to be an integer,
# so that the user may type (for example) `5` instead of `5L` without getting a
# confusing error message. A warning naming the argument expression is raised
# when the coercion changes the value.
# param
#   num The number to coerce.
# return value
#   `num` as an integer.
numToInt <- function(num) {
  coerced <- as.integer(num)
  if (coerced != num) {
    # Second element of the call is the expression the caller passed as `num`.
    argExpr <- as.list(sys.call())[[2L]]
    warning("Coercing ", argExpr, " to integer.")
  }
  coerced
}
0455 
# Utility function to recursively traverse the Abstract Syntax Tree (AST) of a
# user defined function (UDF), and to examine variables in the UDF to decide
# if their values should be included in the new function environment.
# param
#   node The current AST node in the traversal.
#   oldEnv The original function environment.
#   defVars An Accumulator of variables names defined in the function's calling environment,
#           including function argument and local variable names.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can
#                be considered as a "name"-to-"list of functions" mapping.
#   newEnv A new function environment to store necessary function dependencies, an output argument.
processClosure <- function(node, oldEnv, defVars, checkedFuncs, newEnv) {
  nodeLen <- length(node)

  if (nodeLen > 1 && typeof(node) == "language") {
    # Recursive case: current AST node is an internal node, check for its children.
    if (length(node[[1]]) > 1) {
      # The head of the call is itself a compound expression: visit every element.
      for (i in 1:nodeLen) {
        processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
      }
    } else {
      # if node[[1]] is length of 1, check for some R special functions.
      nodeChar <- as.character(node[[1]])
      if (nodeChar == "{" || nodeChar == "(") {
        # Skip start symbol.
        for (i in 2:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "<-" || nodeChar == "=" ||
                   nodeChar == "<<-") {
        # Assignment Ops.
        defVar <- node[[2]]
        if (length(defVar) == 1 && typeof(defVar) == "symbol") {
          # Add the defined variable name into defVars.
          addItemToAccumulator(defVars, as.character(defVar))
        } else {
          # The assignment target is an expression; traverse it like any other node.
          processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
        }
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "function") {
        # Function definition.
        # Add parameter names.
        newArgs <- names(node[[2]])
        lapply(newArgs, function(arg) { addItemToAccumulator(defVars, arg) })
        for (i in 3:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      } else if (nodeChar == "$") {
        # Skip the field.
        processClosure(node[[2]], oldEnv, defVars, checkedFuncs, newEnv)
      } else if (nodeChar == "::" || nodeChar == ":::") {
        # Only the name on the right-hand side of the operator is examined.
        processClosure(node[[3]], oldEnv, defVars, checkedFuncs, newEnv)
      } else {
        for (i in 1:nodeLen) {
          processClosure(node[[i]], oldEnv, defVars, checkedFuncs, newEnv)
        }
      }
    }
  } else if (nodeLen == 1 &&
               (typeof(node) == "symbol" || typeof(node) == "language")) {
    # Base case: current AST node is a leaf node and a symbol or a function call.
    nodeChar <- as.character(node)
    if (!nodeChar %in% defVars$data) {
      # Not a function parameter or local variable.
      func.env <- oldEnv
      topEnv <- parent.env(.GlobalEnv)
      # Search in function environment, and function's enclosing environments
      # up to global environment. There is no need to look into package environments
      # above the global or namespace environment that is not SparkR below the global,
      # as they are assumed to be loaded on workers.
      while (!identical(func.env, topEnv)) {
        # Namespaces other than "SparkR" will not be searched.
        if (!isNamespace(func.env) ||
            (getNamespaceName(func.env) == "SparkR" &&
               !(nodeChar %in% getNamespaceExports("SparkR")))) {
          # Only include SparkR internals.

          # Set parameter 'inherits' to FALSE since we do not need to search in
          # attached package environments.
          if (tryCatch(exists(nodeChar, envir = func.env, inherits = FALSE),
                       error = function(e) { FALSE })) {
            obj <- get(nodeChar, envir = func.env, inherits = FALSE)
            if (is.function(obj)) {
              # If the node is a function call.
              # `FALSE` is spelled out because `F` is an ordinary, reassignable
              # variable.
              funcList <- mget(nodeChar, envir = checkedFuncs, inherits = FALSE,
                               ifnotfound = list(list(NULL)))[[1]]
              # One logical per recorded function: TRUE when it is this very
              # function and its environment has the current one as parent.
              found <- vapply(funcList, function(func) {
                identical(func, obj) &&
                  # Also check if the parent environment is identical to current parent
                  identical(parent.env(environment(func)), func.env)
              }, logical(1))
              if (any(found)) {
                # If function has been examined ignore
                break
              }
              # Function has not been examined, record it and recursively clean its closure.
              assign(nodeChar,
                     if (is.null(funcList[[1]])) {
                       list(obj)
                     } else {
                       append(funcList, obj)
                     },
                     envir = checkedFuncs)
              obj <- cleanClosure(obj, checkedFuncs)
            }
            assign(nodeChar, obj, envir = newEnv)
            break
          }
        }

        # Continue to search in enclosure.
        func.env <- parent.env(func.env)
      }
    }
  }
}
0576 
# Utility function to get user defined function (UDF) dependencies (closure).
# More specifically, this function captures the values of free variables defined
# outside a UDF, and stores them in the function's environment.
# param
#   func A function whose closure needs to be captured. Anything that is not a
#        function is returned untouched.
#   checkedFuncs An environment of function objects examined during cleanClosure. It can be
#                considered as a "name"-to-"list of functions" mapping.
# return value
#   a new version of func that has a correct environment (closure).
cleanClosure <- function(func, checkedFuncs = new.env()) {
  if (is.function(func)) {
    newEnv <- new.env(parent = .GlobalEnv)
    func.body <- body(func)
    oldEnv <- environment(func)
    # defVars is an Accumulator of variables names defined in the function's calling
    # environment. First, function's arguments are added to defVars.
    defVars <- initAccumulator()
    # Iterating over the formal names directly is safe for a function without
    # arguments, where an index range `1:(n - 1)` would count backwards.
    for (argName in names(formals(args(func)))) {
      addItemToAccumulator(defVars, argName)
    }
    # Recursively examine variables in the function body.
    processClosure(func.body, oldEnv, defVars, checkedFuncs, newEnv)
    environment(func) <- newEnv
  }
  func
}
0605 
# Append partition lengths to each partition in two input RDDs if needed.
# param
#   x An RDD.
#   other An RDD.
# return value
#   A list of two result RDDs.
appendPartitionLengths <- function(x, other) {
  modeOfX <- getSerializedMode(x)
  needsLengths <- modeOfX != getSerializedMode(other) || modeOfX == "byte"

  if (needsLengths) {
    # Append the number of elements in each partition to that partition so that
    # the boundary between elements from x and other can be recovered later.
    #
    # The appending doubles as a reserialization step: even an RDD that is
    # already serialized has to be reserialized so that its partitions are
    # encoded as a single byte array. For example, partitions of an RDD
    # generated from partitionBy() may be encoded as multiple byte arrays.
    appendLength <- function(part) {
      withMarker <- length(part) + 1
      # The stored length counts the marker element itself.
      part[[withMarker]] <- withMarker
      part
    }
    x <- lapplyPartition(x, appendLength)
    other <- lapplyPartition(other, appendLength)
  }

  list(x, other)
}
0632 
# Perform zip or cartesian between elements from two RDDs in each partition
# param
#   rdd An RDD.
#   zip A boolean flag indicating this call is for zip operation or not.
# return value
#   A result RDD.
mergePartitions <- function(rdd, zip) {
  serializerMode <- getSerializedMode(rdd)

  partitionFunc <- function(partIndex, part) {
    len <- length(part)
    if (len == 0) {
      # Nothing to merge in an empty partition.
      return(part)
    }

    if (serializerMode == "byte") {
      # The last element holds the length of the value section, and the element
      # just before that section holds the length of the key section.
      lengthOfValues <- part[[len]]
      lengthOfKeys <- part[[len - lengthOfValues]]
      stopifnot(len == lengthOfKeys + lengthOfValues)

      # For zip operation, check if corresponding partitions
      # of both RDDs have the same number of elements.
      if (zip && lengthOfKeys != lengthOfValues) {
        stop("Can only zip RDDs with same number of elements ",
             "in each pair of corresponding partitions.")
      }

      # Each section ends with its own length marker, which is left out here.
      keys <- if (lengthOfKeys > 1) {
        part[1 : (lengthOfKeys - 1)]
      } else {
        list()
      }
      values <- if (lengthOfValues > 1) {
        part[(lengthOfKeys + 1) : (len - 1)]
      } else {
        list()
      }

      if (!zip) {
        return(mergeCompactLists(keys, values))
      }
    } else {
      # Keys and values alternate: odd positions are keys, even ones values.
      keys <- part[c(TRUE, FALSE)]
      values <- part[c(FALSE, TRUE)]
    }

    mapply(
      function(k, v) { list(k, v) },
      keys,
      values,
      SIMPLIFY = FALSE,
      USE.NAMES = FALSE)
  }

  PipelinedRDD(rdd, partitionFunc)
}
0687 
# Turn a named list into a "struct" so that SerDe can tell a struct apart from
# a normal named list.
# param
#   list A list of class "list" that has names.
# return value
#   The same list with its class set to "struct".
listToStruct <- function(list) {
  stopifnot(class(list) == "list", !is.null(names(list)))
  structure(list, class = "struct")
}
0696 
# Convert a struct to a named list
# param
#   struct An object of class "struct"; anything else raises an error.
# return value
#   The same data with its class reset to "list".
structToList <- function(struct) {
  # Check the argument itself: the bare name `list` would refer to the base
  # function, whose class is never "struct".
  stopifnot(class(struct) == "struct")

  class(struct) <- "list"
  struct
}
0704 
# Convert a named list to an environment to be passed to JVM
# param
#   namedList A list in which every item has a non-missing, non-empty name;
#             an empty list is accepted as well.
# return value
#   A new environment holding one variable per list item.
convertNamedListToEnv <- function(namedList) {
  # Make sure each item in the list has a name. An empty string counts as
  # "no name": it cannot be used as a variable name in an environment.
  names <- names(namedList)
  stopifnot(
    if (is.null(names)) {
      length(namedList) == 0
    } else {
      !anyNA(names) && all(nzchar(names))
    })

  env <- new.env()
  for (name in names) {
    env[[name]] <- namedList[[name]]
  }
  env
}
0722 
# Assign a new environment for attach() and with() methods
# param
#   data A SparkDataFrame with at least one column.
# return value
#   A new environment with one variable per column name; each variable holds
#   the result of selecting that column from `data` with `drop = FALSE`.
assignNewEnv <- function(data) {
  stopifnot(class(data) == "SparkDataFrame")
  cols <- columns(data)
  stopifnot(length(cols) > 0)

  env <- new.env()
  for (i in seq_along(cols)) {
    # `FALSE` is spelled out because `F` is an ordinary, reassignable variable.
    assign(x = cols[i], value = data[, cols[i], drop = FALSE], envir = env)
  }
  env
}
0735 
# Split a string on ',' and on whitespace, dropping the empty tokens.
# param
#   input A character vector.
# return value
#   The non-empty tokens.
splitString <- function(input) {
  tokens <- unlist(strsplit(input, ",|\\s"))
  tokens[which(nzchar(tokens))]
}
0740 
# Capture named varargs into a new java.util.Properties object.
# param
#   ... Named arguments; each name and value is passed to setProperty after
#       conversion with as.character().
# return value
#   The Java object reference of the Properties instance.
varargsToJProperties <- function(...) {
  kv <- list(...)
  props <- newJObject("java.util.Properties")
  if (length(kv) > 0) {
    # ls() on the list yields its names in sorted order.
    for (k in ls(kv)) {
      callJMethod(props, "setProperty", as.character(k), as.character(kv[[k]]))
    }
  }
  props
}
0751 
# Launch an external script with the given arguments.
# param
#   script Path of the script to run.
#   combinedArgs Arguments passed to the script.
#   wait Whether to wait for the script to finish.
#   stdout/stderr Where the output goes; only used on non-Windows platforms,
#                 where they are handed to system2().
launchScript <- function(script, combinedArgs, wait = FALSE, stdout = "", stderr = "") {
  onWindows <- .Platform$OS.type == "windows"
  if (onWindows) {
    commandLine <- paste(script, combinedArgs, sep = " ")
    # on Windows, intern = F seems to mean output to the console. (documentation on this is missing)
    shell(commandLine, translate = TRUE, wait = wait, intern = wait)
  } else {
    # http://stat.ethz.ch/R-manual/R-devel/library/base/html/system2.html
    # stdout = F means discard output
    # stdout = "" means to its console (default)
    # Note that the console of this child process might not be the same as the running R process.
    system2(script, combinedArgs, stdout = stdout, wait = wait, stderr = stderr)
  }
}
0765 
# Fetch the object stored as `.sparkRjsc` in `.sparkREnv`.
# Raises an error when it has not been set, i.e. before SparkR is initialized.
getSparkContext <- function() {
  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    return(get(".sparkRjsc", envir = .sparkREnv))
  }
  stop("SparkR has not been initialized. Please call sparkR.session()")
}
0773 
# TRUE when `master` is "local", "local[N]" (N made of digits) or "local[*]".
isMasterLocal <- function(master) {
  localPattern <- "^local(\\[([0-9]+|\\*)\\])?$"
  grepl(localPattern, master, perl = TRUE)
}
0777 
# TRUE when `master` ends with one or more lower-case letters followed by "-client".
isClientMode <- function(master) {
  clientPattern <- "([a-z]+)-client$"
  grepl(clientPattern, master, perl = TRUE)
}
0781 
# TRUE when the R_PROFILE_USER environment variable ends with "shell.R".
isSparkRShell <- function() {
  profilePath <- Sys.getenv("R_PROFILE_USER")
  grepl(".*shell\\.R$", profilePath, perl = TRUE)
}
0785 
# Same as `callJStatic(...)`, except that an error raised by the call is handed to
# captureJVMException(), which rethrows it with a pretty formatted message.
handledCallJStatic <- function(cls, method, ...) {
  onError <- function(err) {
    captureJVMException(err, method)
  }
  value <- tryCatch(callJStatic(cls, method, ...), error = onError)
  value
}
0794 
# Same as `callJMethod(...)`, except that an error raised by the call is handed to
# captureJVMException(), which rethrows it with a pretty formatted message.
handledCallJMethod <- function(obj, method, ...) {
  onError <- function(err) {
    captureJVMException(err, method)
  }
  value <- tryCatch(callJMethod(obj, method, ...), error = onError)
  value
}
0803 
# Rethrow an error condition with a shorter, more readable message.
#
# A leading "Error in <call>: " prefix is replaced by "Error in <method> :". If the text
# then contains one of the exception class names listed below, only the part before the
# class name, a short label and the exception's first message (everything up to the first
# "\n\tat" stack frame) are kept. Any other message is rethrown as it is.
#
# @param e the error condition to translate
# @param method name to show in the "Error in ..." prefix
# @return never returns; always signals an error through stop()
captureJVMException <- function(e, method) {
  rawmsg <- as.character(e)
  if (any(grepl("^Error in .*?: ", rawmsg))) {
    # The message starts with "Error in ...", possibly "Error in invokeJava(...)".
    # Swap that prefix for one naming `method`, to identify which function was
    # called on the JVM side.
    pieces <- strsplit(rawmsg, "Error in .*?: ")[[1]]
    header <- paste("Error in", method, ":")
    stacktrace <- paste(header[1], pieces[2])
  } else {
    # Otherwise, do not convert the error message just in case.
    stacktrace <- rawmsg
  }

  # Checked in this order. StreamingQueryException could wrap an
  # IllegalArgumentException, so it has to be looked for first.
  knownExceptions <- list(
    c("org.apache.spark.sql.streaming.StreamingQueryException: ", "streaming query error - "),
    c("java.lang.IllegalArgumentException: ", "illegal argument - "),
    c("org.apache.spark.sql.AnalysisException: ", "analysis error - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException: ",
      "no such database - "),
    c("org.apache.spark.sql.catalyst.analysis.NoSuchTableException: ", "no such table - "),
    c("org.apache.spark.sql.catalyst.parser.ParseException: ", "parse error - ")
  )

  for (entry in knownExceptions) {
    marker <- entry[1]
    label <- entry[2]
    if (any(grepl(marker, stacktrace, fixed = TRUE))) {
      msg <- strsplit(stacktrace, marker, fixed = TRUE)[[1]]
      # msg[1] is the "Error in ..." part, msg[2] starts with the JVM exception message.
      first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
      stop(msg[1], label, first, call. = FALSE)
    }
  }

  stop(stacktrace, call. = FALSE)
}
0876 
# rbind a list of rows with raw (binary) columns
#
# The first row decides which columns are raw: those stay list columns in the result,
# every other column is flattened with unlist().
#
# @param inputData a list of rows, with each row a list
# @return data.frame with raw columns as lists
rbindRaws <- function(inputData) {
  row1 <- inputData[[1]]
  # vapply() + inherits() always gives one logical per column. Comparing "raw" with
  # sapply(row1, class) fails when a value has several classes (e.g. POSIXct),
  # because sapply() then returns a list instead of a character vector.
  rawcolumns <- vapply(row1, function(value) inherits(value, "raw"), logical(1))

  listmatrix <- do.call(rbind, inputData)
  # A dataframe with all list columns
  out <- as.data.frame(listmatrix)
  out[!rawcolumns] <- lapply(out[!rawcolumns], unlist)
  out
}
0891 
# Get the basename of a URL with its file extension removed.
#
# @param url the URL (or path) to process
# @return the part after the last '/', without a trailing compression suffix
#         (gz, bz2, xz) and without the extension that follows the last '.'
basenameSansExtFromUrl <- function(url) {
  # drop everything up to and including the last '/'
  pieces <- unlist(strsplit(url, "^.+/"))
  fileName <- pieces[length(pieces)]
  # the two steps below follow file_path_sans_ext:
  # first remove any compression extension ...
  uncompressed <- sub("[.](gz|bz2|xz)$", "", fileName)
  # ... then strip the extension after the last '.'
  sub("([^.]+)\\.[[:alnum:]]+$", "\\1", uncompressed)
}
0903 
# TRUE when `x` is an atomic vector holding exactly one element.
isAtomicLengthOne <- function(x) {
  if (!is.atomic(x)) {
    return(FALSE)
  }
  length(x) == 1
}
0907 
# TRUE when R reports "windows" as the platform's OS type.
is_windows <- function() {
  identical(.Platform$OS.type, "windows")
}
0911 
# TRUE when the HADOOP_HOME environment variable holds a non-empty value.
hadoop_home_set <- function() {
  hadoopHome <- Sys.getenv("HADOOP_HOME")
  nzchar(hadoopHome)
}
0915 
# FALSE only on Windows without HADOOP_HOME set; TRUE on every other platform and
# on Windows when HADOOP_HOME is set.
windows_with_hadoop <- function() {
  if (is_windows()) {
    hadoop_home_set()
  } else {
    TRUE
  }
}
0919 
# Look up a single variable by name, with a fallback when it is not found.
# (Stand-in for get0(), which is not supported before R 3.2.0.)
#
# @param x character vector; only its first element is looked up
# @param envir environment to search
# @param inherits whether enclosing environments are searched too
# @param ifnotfound value returned when the variable does not exist
# @return the value of the variable, or `ifnotfound`
getOne <- function(x, envir, inherits = TRUE, ifnotfound = NULL) {
  firstName <- x[1L]
  # wrapped in a list so that a NULL fallback survives as a list element
  fallback <- list(ifnotfound)
  found <- mget(firstName, envir = envir, inherits = inherits, ifnotfound = fallback)
  found[[1L]]
}
0924 
# Returns a vector made of the full path `x` followed by its parent directories,
# walking up at most `count` levels and stopping early once the top is reached.
# e.g. traverseParentDirs("/Users/user/Library/Caches/spark/spark2.2", 1) should return
#      "/Users/user/Library/Caches/spark/spark2.2"
# and  "/Users/user/Library/Caches/spark"
traverseParentDirs <- function(x, count) {
  dirs <- x
  current <- x
  remaining <- count
  # stop when dirname() no longer changes the path or no levels remain
  while (!(dirname(current) == current || remaining <= 0)) {
    current <- dirname(current)
    dirs <- c(dirs, current)
    remaining <- remaining - 1
  }
  dirs
}