Back to home page

OSCL-LXR

 
 

    


0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements.  See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License.  You may obtain a copy of the License at
0008 #
0009 #    http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017 
0018 # context.R: SparkContext driven functions
0019 
# Resolve the minimum number of partitions to request, as an integer.
#
# sc: SparkContext handle; only consulted when minPartitions is NULL.
# minPartitions: requested minimum, or NULL to derive one.
#
# An explicit value is coerced with as.integer(). For NULL, the smaller of the
# context's "defaultParallelism" and 2 is used.
getMinPartitions <- function(sc, minPartitions) {
  if (!is.null(minPartitions)) {
    return(as.integer(minPartitions))
  }
  parallelism <- callJMethod(sc, "defaultParallelism")
  as.integer(min(parallelism, 2))
}
0027 
#' Create an RDD from a text file.
#'
#' Reads a text file from HDFS, a local file system (available on all nodes), or any
#' Hadoop-supported file system URI, and creates an RDD of strings from it.
#' The text files must be encoded as UTF-8.
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed; the paths are
#'  passed on as one comma separated string.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#'  value is chosen based on available parallelism.
#' @return RDD where each item is of type \code{character}
#' @noRd
#' @examples
#'\dontrun{
#'  sc <- sparkR.init()
#'  lines <- textFile(sc, "myfile.txt")
#'}
textFile <- function(sc, path, minPartitions = NULL) {
  # Let the user write the path flexibly; warnings from normalizePath() are suppressed
  normalized <- suppressWarnings(normalizePath(path))
  # Several paths are handed over as a single comma separated string
  joined <- paste(normalized, collapse = ",")
  numPartitions <- getMinPartitions(sc, minPartitions)

  # The returned handle is of type JavaRDD[String]
  javaRdd <- callJMethod(sc, "textFile", joined, numPartitions)
  RDD(javaRdd, "string")
}
0055 
#' Load an RDD saved as a SequenceFile containing serialized objects.
#'
#' The file to be loaded should be one that was previously generated by calling
#' saveAsObjectFile() of the RDD class.
#'
#' @param sc SparkContext to use
#' @param path Path of file to read. A vector of multiple paths is allowed; the paths are
#'  passed on as one comma separated string.
#' @param minPartitions Minimum number of partitions to be created. If NULL, the default
#'  value is chosen based on available parallelism.
#' @return RDD containing serialized R objects.
#' @seealso saveAsObjectFile
#' @noRd
#' @examples
#'\dontrun{
#'  sc <- sparkR.init()
#'  rdd <- objectFile(sc, "myfile")
#'}
objectFile <- function(sc, path, minPartitions = NULL) {
  # Let the user write the object file path flexibly; warnings from normalizePath()
  # are suppressed
  normalized <- suppressWarnings(normalizePath(path))
  # Several paths are handed over as a single comma separated string
  joined <- paste(normalized, collapse = ",")
  numPartitions <- getMinPartitions(sc, minPartitions)

  javaRdd <- callJMethod(sc, "objectFile", joined, numPartitions)
  # The contents are assumed to be serialized R objects.
  RDD(javaRdd, "byte")
}
0083 
# Generate, for each of `length` rows, the id of the slice the row belongs to.
#
# The id of a slice is the (0-based) position of its first row. For instance, for
# numSerializedSlices of 22 and length of 50 the result is
#  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
# [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
# Notice the slice groups with 3 rows (ie. 6, 15, 22) are roughly evenly spaced.
# This reimplements the calculation in the positions method in ParallelCollectionRDD.
#
# When numSerializedSlices is not positive the single value 1 is returned.
makeSplits <- function(numSerializedSlices, length) {
  if (!(numSerializedSlices > 0)) {
    return(1)
  }

  # Work in doubles so that sliceIndex * length cannot overflow integer arithmetic
  sliceIndex <- as.numeric(0:(numSerializedSlices - 1))
  # nolint start
  firstRow <- trunc((sliceIndex * length) / numSerializedSlices)
  pastLastRow <- trunc(((sliceIndex + 1) * length) / numSerializedSlices)
  # nolint end

  # Repeat every slice id once per row that falls into that slice
  rep(firstRow, times = pastLastRow - firstRow)
}
0103 
#' Create an RDD from a homogeneous list or vector.
#'
#' This function creates an RDD from a local homogeneous list in R. The elements
#' in the list are split into \code{numSlices} slices and distributed to nodes
#' in the cluster.
#'
#' If the size of the collection is not below spark.r.maxAllocationLimit (roughly 200MB by
#' default), the serialized slices are not sent as an RPC argument: they are written to a
#' temporary file whose name is sent to the JVM, or, when encryption is enabled, streamed over
#' an authenticated socket. Also, to make sure each slice is not larger than that limit, the
#' number of slices may be increased.
#'
#' In 2.2.0 we are changing how the numSlices are used/computed to handle
#' 1 < (length(coll) / numSlices) << length(coll) better, and to get the exact number of slices.
#' This change affects both createDataFrame and spark.lapply.
#' In the specific one case that it is used to convert R native object into SparkDataFrame, it has
#' always been kept at the default of 1. In the case the object is large, we are explicitly setting
#' the parallelism to numSlices (which is still 1).
#'
#' Specifically, we are changing to split positions to match the calculation in positions() of
#' ParallelCollectionRDD in Spark.
#'
#' @param sc SparkContext to use
#' @param coll collection to parallelize. Anything that is not a list or vector (and any
#'  data frame) is coerced with \code{as.list()} after a message is printed.
#' @param numSlices number of partitions to create in the RDD
#' @return an RDD created from this collection
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:10, 2)
#' # The RDD should contain 10 elements
#' length(rdd)
#'}
parallelize <- function(sc, coll, numSlices = 1) {
  # TODO: bound/safeguard numSlices
  # TODO: unit tests for if the split works for all primitives
  # TODO: support matrix, data frame, etc

  # Note, for data.frame, createDataFrame turns it into a list before it calls here.
  isDataFrame <- is.data.frame(coll)
  if (isDataFrame || !(is.list(coll) || is.vector(coll))) {
    if (isDataFrame) {
      message("context.R: A data frame is parallelized by columns.")
    } else if (is.matrix(coll)) {
      message("context.R: A matrix is parallelized by elements.")
    } else {
      message("context.R: parallelize() currently only supports lists and vectors. ",
              "Calling as.list() to coerce coll into a list.")
    }
    coll <- as.list(coll)
  }

  maxBytes <- getMaxAllocationLimit(sc)
  collBytes <- object.size(coll)
  elementCount <- length(coll)

  # For large objects the slice count is raised so that each slice stays below maxBytes,
  # but there are never more slices than elements
  sliceCount <- min(elementCount, max(numSlices, ceiling(collBytes / maxBytes)))
  sliceIds <- makeSplits(sliceCount, elementCount)

  # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
  # 2-tuples of raws
  serializedSlices <- lapply(split(coll, sliceIds), serialize, connection = NULL)

  # The RPC backend cannot handle arguments larger than 2GB (INT_MAX)
  # If the data is safely less than that threshold we send it over the RPC channel.
  # Otherwise, we stream it over a socket (encryption enabled) or write it to a file and
  # send the file name
  if (collBytes < maxBytes) {
    jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
  } else if (callJStatic("org.apache.spark.api.r.RUtils", "isEncryptionEnabled", sc)) {
    timeoutSecs <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
    # numSlices here is the parallelism to use in the jvm's sc.parallelize()
    server <- newJObject("org.apache.spark.api.r.RParallelizeServer", sc, as.integer(numSlices))
    secret <- callJMethod(server, "secret")
    serverPort <- callJMethod(server, "port")
    socket <- socketConnection(
      port = serverPort, blocking = TRUE, open = "wb", timeout = timeoutSecs)
    doServerAuth(socket, secret)
    # writeToConnection() closes the socket once it is done
    writeToConnection(serializedSlices, socket)
    jrdd <- callJMethod(server, "getResult")
  } else {
    fileName <- writeToTempFile(serializedSlices)
    # The temporary file is removed whether or not the JVM call succeeds
    jrdd <- tryCatch(
      callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName,
                  as.integer(numSlices)),
      finally = {
        file.remove(fileName)
      })
  }

  RDD(jrdd, "byte")
}
0202 
# Read the "spark.r.maxAllocationLimit" setting from the configuration of `sc`
# and return it as a numeric.
#
# When the setting is absent the fallback is a tenth of .Machine$integer.max,
# a safe value of about 200MB.
getMaxAllocationLimit <- function(sc) {
  fallback <- toString(.Machine$integer.max / 10)
  sparkConf <- callJMethod(sc, "getConf")
  limit <- callJMethod(sparkConf, "get", "spark.r.maxAllocationLimit", fallback)
  as.numeric(limit)
}
0212 
# Write every slice to `conn` as a length-prefixed record, then close `conn`.
#
# serializedSlices: list of slices to write (raw vectors in the callers visible here).
# conn: an open, writable binary connection. It is always closed by this
#       function, including when a write fails.
#
# Each record is the slice length as a big-endian integer followed by the slice itself.
writeToConnection <- function(serializedSlices, conn) {
  # Registered up front so the connection is released on success and on error alike
  on.exit(close(conn))
  for (i in seq_along(serializedSlices)) {
    payload <- serializedSlices[[i]]
    writeBin(as.integer(length(payload)), conn, endian = "big")
    writeBin(payload, conn, endian = "big")
  }
  invisible(NULL)
}
0223 
# Write the serialized slices to a fresh temporary file and return its name.
#
# serializedSlices: list of slices, written by writeToConnection().
#
# Returns the path of the temporary file; the caller is responsible for removing it.
# If writing fails the partially written file is deleted before the error is
# re-signalled, because the caller never receives its name and so cannot clean it up.
writeToTempFile <- function(serializedSlices) {
  fileName <- tempfile()
  conn <- file(fileName, "wb")
  tryCatch(
    # writeToConnection() closes conn itself, also on failure
    writeToConnection(serializedSlices, conn),
    error = function(e) {
      unlink(fileName)
      stop(e)
    })
  fileName
}
0230 
#' Include this specified package on all workers
#'
#' This function can be used to include a package on all workers before the
#' user's code is executed. This is useful in scenarios where other R package
#' functions are used in a function passed to functions like \code{lapply}.
#' NOTE: The package is assumed to be installed on every node in the Spark
#' cluster.
#'
#' The package name is appended to the list kept in \code{.sparkREnv$.packages}.
#'
#' @param sc SparkContext to use (not consulted by this function)
#' @param pkg Package name, given either as a bare name or as a string
#' @return the updated list of package names, invisibly
#' @noRd
#' @examples
#'\dontrun{
#'  library(Matrix)
#'
#'  sc <- sparkR.init()
#'  # Include the matrix library we will be using
#'  includePackage(sc, Matrix)
#'
#'  generateSparse <- function(x) {
#'    sparseMatrix(i=c(1, 2, 3), j=c(1, 2, 3), x=c(1, 2, 3))
#'  }
#'
#'  rdd <- lapplyPartition(parallelize(sc, 1:2, 2L), generateSparse)
#'  collect(rdd)
#'}
includePackage <- function(sc, pkg) {
  # Capture the argument unevaluated so that a bare package name is accepted
  pkg <- as.character(substitute(pkg))
  # inherits = FALSE restricts the lookup to .sparkREnv itself. With the default
  # (inherits = TRUE) exists() also searches the enclosing environments, where it finds
  # the base function .packages(), so the empty-list initialisation below was never reached.
  if (exists(".packages", envir = .sparkREnv, inherits = FALSE)) {
    packages <- .sparkREnv$.packages
  } else {
    packages <- list()
  }
  .sparkREnv$.packages <- c(packages, pkg)
}
0267 
#' Broadcast a variable to all workers
#'
#' Broadcast a read-only variable to the cluster, returning a \code{Broadcast}
#' object for reading it in distributed functions.
#'
#' @param sc Spark Context to use
#' @param object Object to be broadcast. The expression supplied for it is recorded as the
#'  name of the broadcast variable.
#' @return a \code{Broadcast} object
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' rdd <- parallelize(sc, 1:2, 2L)
#'
#' # Large Matrix object that we want to broadcast
#' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000))
#' randomMatBr <- broadcastRDD(sc, randomMat)
#'
#' # Use the broadcast variable inside the function
#' useBroadcast <- function(x) {
#'   sum(value(randomMatBr) * x)
#' }
#' sumRDD <- lapply(rdd, useBroadcast)
#'}
broadcastRDD <- function(sc, object) {
  # Remember what the caller wrote for `object`, not its value
  objName <- as.character(substitute(object))

  # The JVM receives the object in serialized (raw) form
  payload <- serialize(object, connection = NULL)
  jBroadcast <- callJMethod(sc, "broadcast", payload)
  broadcastId <- as.character(callJMethod(jBroadcast, "id"))

  Broadcast(broadcastId, object, jBroadcast, objName)
}
0300 
#' Set the checkpoint directory
#'
#' Set the directory under which RDDs are going to be checkpointed. The
#' directory must be an HDFS path if running on a cluster.
#'
#' @param sc Spark Context to use
#' @param dirName Directory path
#' @noRd
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' setCheckpointDirSC(sc, "~/checkpoint")
#' rdd <- parallelize(sc, 1:2, 2L)
#' checkpoint(rdd)
#'}
setCheckpointDirSC <- function(sc, dirName) {
  # Warnings from normalizePath() are suppressed
  checkpointPath <- suppressWarnings(normalizePath(dirName))
  invisible(callJMethod(sc, "setCheckpointDir", checkpointPath))
}
0319 
#' Add a file or directory to be downloaded with this Spark job on every node.
#'
#' The path passed can be either a local file, a file in HDFS (or other Hadoop-supported
#' filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
#' use spark.getSparkFiles(fileName) to find its download location.
#'
#' A directory can be given if the recursive option is set to true.
#' Currently directories are only supported for Hadoop-supported filesystems.
#' Refer Hadoop-supported filesystems at
#' \url{https://cwiki.apache.org/confluence/display/HADOOP2/HCFS}.
#'
#' Note: A path can be added only once. Subsequent additions of the same path are ignored.
#'
#' @rdname spark.addFile
#' @param path The path of the file to be added
#' @param recursive Whether to add files recursively from the path. Default is FALSE.
#' @examples
#'\dontrun{
#' spark.addFile("~/myfile")
#'}
#' @note spark.addFile since 2.1.0
spark.addFile <- function(path, recursive = FALSE) {
  sc <- getSparkContext()
  # Warnings from normalizePath() are suppressed
  filePath <- suppressWarnings(normalizePath(path))
  invisible(callJMethod(sc, "addFile", filePath, recursive))
}
0345 
#' Get the root directory that contains files added through spark.addFile.
#'
#' On a worker (environment variable SPARKR_IS_RUNNING_ON_WORKER is non-empty) the directory
#' is read from the environment variable SPARKR_SPARKFILES_ROOT_DIR; otherwise it is obtained
#' from the JVM.
#'
#' @rdname spark.getSparkFilesRootDirectory
#' @return the root directory that contains files added through spark.addFile
#' @examples
#'\dontrun{
#' spark.getSparkFilesRootDirectory()
#'}
#' @note spark.getSparkFilesRootDirectory since 2.1.0
spark.getSparkFilesRootDirectory <- function() { # nolint
  onWorker <- Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") != ""
  if (onWorker) {
    # Running on worker.
    return(Sys.getenv("SPARKR_SPARKFILES_ROOT_DIR"))
  }
  # Running on driver.
  callJStatic("org.apache.spark.SparkFiles", "getRootDirectory")
}
0364 
#' Get the absolute path of a file added through spark.addFile.
#'
#' On a worker (environment variable SPARKR_IS_RUNNING_ON_WORKER is non-empty) the path is
#' built from spark.getSparkFilesRootDirectory(); otherwise it is obtained from the JVM.
#'
#' @rdname spark.getSparkFiles
#' @param fileName The name of the file added through spark.addFile
#' @return the absolute path of a file added through spark.addFile.
#' @examples
#'\dontrun{
#' spark.getSparkFiles("myfile")
#'}
#' @note spark.getSparkFiles since 2.1.0
spark.getSparkFiles <- function(fileName) {
  onWorker <- Sys.getenv("SPARKR_IS_RUNNING_ON_WORKER") != ""
  if (onWorker) {
    # Running on worker.
    return(file.path(spark.getSparkFilesRootDirectory(), as.character(fileName)))
  }
  # Running on driver.
  callJStatic("org.apache.spark.SparkFiles", "get", as.character(fileName))
}
0384 
#' Run a function over a list of elements, distributing the computations with Spark
#'
#' Run a function over a list of elements, distributing the computations with Spark. Applies a
#' function in a manner that is similar to doParallel or lapply to elements of a list.
#' The computations are distributed using Spark. It is conceptually the same as the following code:
#'   lapply(list, func)
#'
#' Known limitations:
#' \itemize{
#'    \item variable scoping and capture: compared to R's rich support for variable resolutions,
#'    the distributed nature of SparkR limits how variables are resolved at runtime. All the
#'    variables that are available through lexical scoping are embedded in the closure of the
#'    function and available as read-only variables within the function. The environment variables
#'    should be stored into temporary variables outside the function, and not directly accessed
#'    within the function.
#'
#'   \item loading external packages: In order to use a package, you need to load it inside the
#'   closure. For example, if you rely on the MASS module, here is how you would use it:
#'   \preformatted{
#'     train <- function(hyperparam) {
#'       library(MASS)
#'       model <- lm.ridge("y ~ x+z", data, lambda=hyperparam)
#'       model
#'     }
#'   }
#' }
#'
#' @rdname spark.lapply
#' @param list the list of elements
#' @param func a function that takes one argument.
#' @return a list of results (the exact type being determined by the function)
#' @examples
#'\dontrun{
#' sparkR.session()
#' doubled <- spark.lapply(1:10, function(x) {2 * x})
#'}
#' @note spark.lapply since 2.0.0
spark.lapply <- function(list, func) {
  sc <- getSparkContext()
  # Ask for as many slices as there are elements
  distributed <- parallelize(sc, list, length(list))
  # Apply func on the cluster and bring the results back to the driver
  collected <- collectRDD(map(distributed, func))
  collected
}
0429 
#' Set new log level
#'
#' Set new log level: "ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN"
#'
#' The level is passed to the current Spark context as given.
#'
#' @rdname setLogLevel
#' @param level New log level
#' @examples
#'\dontrun{
#' setLogLevel("ERROR")
#'}
#' @note setLogLevel since 2.0.0
setLogLevel <- function(level) {
  context <- getSparkContext()
  outcome <- callJMethod(context, "setLogLevel", level)
  invisible(outcome)
}
0445 
#' Set checkpoint directory
#'
#' Set the directory under which SparkDataFrame are going to be checkpointed. The directory must be
#' an HDFS path if running on a cluster.
#'
#' @rdname setCheckpointDir
#' @param directory Directory path to checkpoint to
#' @seealso \link{checkpoint}
#' @examples
#'\dontrun{
#' setCheckpointDir("/checkpoint")
#'}
#' @note setCheckpointDir since 2.2.0
setCheckpointDir <- function(directory) {
  sc <- getSparkContext()
  # Warnings from normalizePath() are suppressed
  checkpointPath <- suppressWarnings(normalizePath(directory))
  invisible(callJMethod(sc, "setCheckpointDir", checkpointPath))
}