#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

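# Environment that holds the backend state for this R session, e.g. the backend
# connection (".sparkRCon"), the monitor connection (".monitorConn"), the
# JavaSparkContext (".sparkRjsc") and the SparkSession (".sparkRsession").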
.sparkREnv <- new.env()

# Utility function that returns TRUE if we have an active connection to the
# backend and FALSE otherwise
connExists <- function(env) {
  tryCatch({
    exists(".sparkRCon", envir = env) && isOpen(env[[".sparkRCon"]])
  },
  error = function(err) {
    return(FALSE)
  })
}

#' Stop the Spark Session and Spark Context
#'
#' Stop the Spark Session and Spark Context.
#'
#' Also terminates the backend this R session is connected to.
#' @rdname sparkR.session.stop
#' @name sparkR.session.stop
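#' @examples
#'\dontrun{
#' # A typical teardown: stop the active SparkSession and shut down the backend.
#' sparkR.session()
#' sparkR.session.stop()
#'}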
#' @note sparkR.session.stop since 2.0.0
sparkR.session.stop <- function() {
  env <- .sparkREnv
  if (exists(".sparkRCon", envir = env)) {
    if (exists(".sparkRjsc", envir = env)) {
      sc <- get(".sparkRjsc", envir = env)
      callJMethod(sc, "stop")
      rm(".sparkRjsc", envir = env)

      if (exists(".sparkRsession", envir = env)) {
        rm(".sparkRsession", envir = env)
      }
    }

    # Remove the R package lib path from .libPaths()
    if (exists(".libPath", envir = env)) {
      libPath <- get(".libPath", envir = env)
      .libPaths(.libPaths()[.libPaths() != libPath])
    }

    if (exists(".backendLaunched", envir = env)) {
      callJStatic("SparkRHandler", "stopBackend")
    }

    # Also close the connection and remove it from our env
    conn <- get(".sparkRCon", envir = env)
    close(conn)

    rm(".sparkRCon", envir = env)
    rm(".scStartTime", envir = env)
  }

  if (exists(".monitorConn", envir = env)) {
    conn <- get(".monitorConn", envir = env)
    close(conn)
    rm(".monitorConn", envir = env)
  }

  # Clear all broadcast variables we have
  # as the jobj will not be valid if we restart the JVM
  clearBroadcastVariables()

  # Clear jobj maps
  clearJobjs()
}

#' @rdname sparkR.session.stop
#' @name sparkR.stop
#' @note sparkR.stop since 1.4.0
sparkR.stop <- function() {
  sparkR.session.stop()
}

#' (Deprecated) Initialize a new Spark Context
#'
#' This function initializes a new SparkContext.
#'
#' @param master The Spark master URL
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
#' @param sparkJars Character vector of jar files to pass to the worker nodes
#' @param sparkPackages Character vector of package coordinates
#' @seealso \link{sparkR.session}
#' @rdname sparkR.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="4g"),
#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#'                  c("one.jar", "two.jar", "three.jar"),
#'                  c("com.databricks:spark-avro_2.11:2.0.1"))
#'}
#' @note sparkR.init since 1.4.0
sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkPackages = "") {
  .Deprecated("sparkR.session")
  sparkR.sparkContext(master,
     appName,
     sparkHome,
     convertNamedListToEnv(sparkEnvir),
     convertNamedListToEnv(sparkExecutorEnv),
     sparkJars,
     sparkPackages)
}

# Internal function to handle creating the SparkContext.
sparkR.sparkContext <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvirMap = new.env(),
  sparkExecutorEnvMap = new.env(),
  sparkJars = "",
  sparkPackages = "") {

  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    cat(paste("Re-using existing Spark Context.",
              "Call sparkR.session.stop() or restart R to create a new Spark Context\n"))
    return(get(".sparkRjsc", envir = .sparkREnv))
  }

  jars <- processSparkJars(sparkJars)
  packages <- processSparkPackages(sparkPackages)

  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
  connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
  if (existingPort != "") {
    if (length(packages) != 0) {
      warning("sparkPackages has no effect when using spark-submit or sparkR shell, ",
              "please use the --packages command line option instead")
    }
    backendPort <- existingPort
    authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
    if (nchar(authSecret) == 0) {
      stop("Auth secret not provided in environment.")
    }
  } else {
    path <- tempfile(pattern = "backend_port")
    submitOps <- getClientModeSparkSubmitOpts(
        Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
        sparkEnvirMap)
    invisible(checkJavaVersion())
    launchBackend(
        args = path,
        sparkHome = sparkHome,
        jars = jars,
        sparkSubmitOpts = submitOps,
        packages = packages)
    # wait at most 100 seconds for the JVM to launch
    wait <- 0.1
    for (i in 1:25) {
      Sys.sleep(wait)
      if (file.exists(path)) {
        break
      }
      wait <- wait * 1.25
    }
    if (!file.exists(path)) {
      stop("JVM is not ready after 100 seconds")
    }
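    # The backend writes its connection details to the temp file in a fixed
    # order: backend port, monitor port, R library path, connection timeout,
    # and a length-prefixed auth secret; read them back in the same order.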
    f <- file(path, open = "rb")
    backendPort <- readInt(f)
    monitorPort <- readInt(f)
    rLibPath <- readString(f)
    connectionTimeout <- readInt(f)

    # Don't use readString() so that we can provide a useful
    # error message if the R and Java versions are mismatched.
    authSecretLen <- readInt(f)
    if (length(authSecretLen) == 0 || authSecretLen == 0) {
      stop("Unexpected EOF in JVM connection data. Mismatched versions?")
    }
    authSecret <- readStringData(f, authSecretLen)
    close(f)
    file.remove(path)
    if (length(backendPort) == 0 || backendPort == 0 ||
        length(monitorPort) == 0 || monitorPort == 0 ||
        length(rLibPath) != 1 || length(authSecret) == 0) {
      stop("JVM failed to launch")
    }

    monitorConn <- socketConnection(port = monitorPort, blocking = TRUE,
                                    timeout = connectionTimeout, open = "wb")
    doServerAuth(monitorConn, authSecret)

    assign(".monitorConn", monitorConn, envir = .sparkREnv)
    assign(".backendLaunched", 1, envir = .sparkREnv)
    if (rLibPath != "") {
      assign(".libPath", rLibPath, envir = .sparkREnv)
      .libPaths(c(rLibPath, .libPaths()))
    }
  }

  .sparkREnv$backendPort <- backendPort
  tryCatch({
    connectBackend("localhost", backendPort, timeout = connectionTimeout, authSecret = authSecret)
  },
  error = function(err) {
    stop("Failed to connect to the JVM\n")
  })

  if (nchar(sparkHome) != 0) {
    sparkHome <- suppressWarnings(normalizePath(sparkHome))
  }

  if (is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
  }

  # Classpath separator is ";" on Windows
  # URI needs four //// as from http://stackoverflow.com/a/18522792
  if (.Platform$OS.type == "unix") {
    uriSep <- "//"
  } else {
    uriSep <- "////"
  }
  localJarPaths <- lapply(jars,
                          function(j) { utils::URLencode(paste0("file:", uriSep, j)) })

  # Set the start time to identify jobjs
  # Seconds resolution is good enough for this purpose, so use ints
  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)

  assign(
    ".sparkRjsc",
    callJStatic(
      "org.apache.spark.api.r.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      localJarPaths,
      sparkEnvirMap,
      sparkExecutorEnvMap),
    envir = .sparkREnv
  )

  sc <- get(".sparkRjsc", envir = .sparkREnv)

  # Register a finalizer to sleep 1 second on R exit to make RStudio happy
  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)

  sc
}

#' (Deprecated) Initialize a new SQLContext
#'
#' This function creates a SparkContext from an existing JavaSparkContext and
#' then uses it to initialize a new SQLContext.
#'
#' Starting with SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRSQL.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#'}
#' @note sparkRSQL.init since 1.4.0
sparkRSQL.init <- function(jsc = NULL) {
  .Deprecated("sparkR.session")

  if (exists(".sparkRsession", envir = .sparkREnv)) {
    return(get(".sparkRsession", envir = .sparkREnv))
  }

  # Default to no Hive support for backward compatibility.
  sparkR.session(enableHiveSupport = FALSE)
}

#' (Deprecated) Initialize a new HiveContext
#'
#' This function creates a HiveContext from an existing JavaSparkContext.
#'
#' Starting with SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with sparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRHive.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRHive.init(sc)
#'}
#' @note sparkRHive.init since 1.4.0
sparkRHive.init <- function(jsc = NULL) {
  .Deprecated("sparkR.session")

  if (exists(".sparkRsession", envir = .sparkREnv)) {
    return(get(".sparkRsession", envir = .sparkREnv))
  }

  # Enable Hive support for backward compatibility.
  sparkR.session(enableHiveSupport = TRUE)
}

#' Get the existing SparkSession or initialize a new SparkSession.
#'
#' SparkSession is the entry point into SparkR. \code{sparkR.session} gets the existing
#' SparkSession or initializes a new SparkSession.
#' Additional Spark properties can be set in \code{...}, and these named parameters take priority
#' over values in \code{master}, \code{appName}, and the named list \code{sparkConfig}.
#'
#' When called in an interactive session, this method checks for the Spark installation and,
#' if it is not found, downloads and caches it automatically. Alternatively, \code{install.spark}
#' can be called manually.
#'
#' A default warehouse is created automatically in the current directory when a managed table is
#' created via a \code{sql} statement such as \code{CREATE TABLE}. To change the location of the
#' warehouse, set the named parameter \code{spark.sql.warehouse.dir} on the SparkSession. Along with
#' the warehouse, an accompanying metastore may also be created automatically in the current
#' directory when a new SparkSession is initialized with \code{enableHiveSupport} set to
#' \code{TRUE}, which is the default. For more details, refer to the Hive configuration at
#' \url{http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables}.
#'
#' For details on how to initialize and use SparkR, refer to the SparkR programming guide at
#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession}.
#'
#' @param master the Spark master URL.
#' @param appName application name to register with the cluster manager.
#' @param sparkHome Spark Home directory.
#' @param sparkConfig named list of Spark configuration to set on worker nodes.
#' @param sparkJars character vector of jar files to pass to the worker nodes.
#' @param sparkPackages character vector of package coordinates.
#' @param enableHiveSupport enable support for Hive, falling back if Spark is not built with Hive
#'        support; once set, this cannot be turned off on an existing session
#' @param ... named Spark properties passed to the method.
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.json(path)
#'
#' sparkR.session("local[2]", "SparkR", "/home/spark")
#' sparkR.session("yarn", "SparkR", "/home/spark",
#'                list(spark.executor.memory="4g", spark.submit.deployMode="client"),
#'                c("one.jar", "two.jar", "three.jar"),
#'                c("com.databricks:spark-avro_2.12:2.0.1"))
#' sparkR.session(spark.master = "yarn", spark.submit.deployMode = "client",
#'                spark.executor.memory = "4g")
#'}
#' @note sparkR.session since 2.0.0
sparkR.session <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkConfig = list(),
  sparkJars = "",
  sparkPackages = "",
  enableHiveSupport = TRUE,
  ...) {

  sparkConfigMap <- convertNamedListToEnv(sparkConfig)
  namedParams <- list(...)
  if (length(namedParams) > 0) {
    paramMap <- convertNamedListToEnv(namedParams)
    # Override for certain named parameters
    if (exists("spark.master", envir = paramMap)) {
      master <- paramMap[["spark.master"]]
    }
    if (exists("spark.app.name", envir = paramMap)) {
      appName <- paramMap[["spark.app.name"]]
    }
    overrideEnvs(sparkConfigMap, paramMap)
  }

  deployMode <- ""
  if (exists("spark.submit.deployMode", envir = sparkConfigMap)) {
    deployMode <- sparkConfigMap[["spark.submit.deployMode"]]
  }

  if (!exists("spark.r.sql.derby.temp.dir", envir = sparkConfigMap)) {
    sparkConfigMap[["spark.r.sql.derby.temp.dir"]] <- tempdir()
  }

  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    retHome <- sparkCheckInstall(sparkHome, master, deployMode)
    if (!is.null(retHome)) sparkHome <- retHome
    sparkExecutorEnvMap <- new.env()
    sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
       sparkJars, sparkPackages)
    stopifnot(exists(".sparkRjsc", envir = .sparkREnv))
  }

  if (exists(".sparkRsession", envir = .sparkREnv)) {
    sparkSession <- get(".sparkRsession", envir = .sparkREnv)
    # Apply config to Spark Context and Spark Session if already there
    # Cannot change enableHiveSupport
    callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                "setSparkContextSessionConf",
                sparkSession,
                sparkConfigMap)
  } else {
    jsc <- get(".sparkRjsc", envir = .sparkREnv)
    sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                "getOrCreateSparkSession",
                                jsc,
                                sparkConfigMap,
                                enableHiveSupport)
    assign(".sparkRsession", sparkSession, envir = .sparkREnv)
  }

  # Check if version number of SparkSession matches version number of SparkR package
  jvmVersion <- callJMethod(sparkSession, "version")
  # Remove -SNAPSHOT from jvm versions
  jvmVersionStrip <- gsub("-SNAPSHOT", "", jvmVersion, fixed = TRUE)
  rPackageVersion <- paste0(packageVersion("SparkR"))

  if (jvmVersionStrip != rPackageVersion) {
    warning("Version mismatch between Spark JVM and SparkR package. ",
            "JVM version was ", jvmVersion,
            ", while R package version was ", rPackageVersion)
  }

  sparkSession
}

#' Get the URL of the SparkUI instance for the current active SparkSession
#'
#' Get the URL of the SparkUI instance for the current active SparkSession.
#'
#' @return the SparkUI URL, or NA if it is disabled or not started.
#' @rdname sparkR.uiWebUrl
#' @name sparkR.uiWebUrl
#' @examples
#'\dontrun{
#' sparkR.session()
#' url <- sparkR.uiWebUrl()
#' }
#' @note sparkR.uiWebUrl since 2.1.1
sparkR.uiWebUrl <- function() {
  sc <- sparkR.callJMethod(getSparkContext(), "sc")
  u <- callJMethod(sc, "uiWebUrl")
  if (callJMethod(u, "isDefined")) {
    callJMethod(u, "get")
  } else {
    NA
  }
}

#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
#' @param groupId the ID to be assigned to job groups.
#' @param description description for the job group ID.
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation.
#' @rdname setJobGroup
#' @name setJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' setJobGroup("myJobGroup", "My job group description", TRUE)
#'}
#' @note setJobGroup since 1.5.0
setJobGroup <- function(groupId, description, interruptOnCancel) {
  sc <- getSparkContext()
  invisible(callJMethod(sc, "setJobGroup", groupId, description, interruptOnCancel))
}

#' Clear current job group ID and its description
#'
#' @rdname clearJobGroup
#' @name clearJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' clearJobGroup()
#'}
#' @note clearJobGroup since 1.5.0
clearJobGroup <- function() {
  sc <- getSparkContext()
  invisible(callJMethod(sc, "clearJobGroup"))
}

#' Cancel active jobs for the specified group
#'
#' @param groupId the ID of job group to be cancelled
#' @rdname cancelJobGroup
#' @name cancelJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' cancelJobGroup("myJobGroup")
#'}
#' @note cancelJobGroup since 1.5.0
cancelJobGroup <- function(groupId) {
  sc <- getSparkContext()
  invisible(callJMethod(sc, "cancelJobGroup", groupId))
}

#' Set a human-readable description of the current job.
#'
#' Set a description that is shown as the job description in the UI.
#'
#' @param value The job description of the current job.
#' @rdname setJobDescription
#' @name setJobDescription
#' @examples
#'\dontrun{
#' setJobDescription("This is an example job.")
#'}
#' @note setJobDescription since 2.3.0
setJobDescription <- function(value) {
  if (!is.null(value)) {
    value <- as.character(value)
  }
  sc <- getSparkContext()
  invisible(callJMethod(sc, "setJobDescription", value))
}

#' Set a local property that affects jobs submitted from this thread, such as the
#' Spark fair scheduler pool.
#'
#' @param key The key for a local property.
#' @param value The value for a local property.
#' @rdname setLocalProperty
#' @name setLocalProperty
#' @examples
#'\dontrun{
#' setLocalProperty("spark.scheduler.pool", "poolA")
#'}
#' @note setLocalProperty since 2.3.0
setLocalProperty <- function(key, value) {
  if (is.null(key) || is.na(key)) {
    stop("key should not be NULL or NA.")
  }
  if (!is.null(value)) {
    value <- as.character(value)
  }
  sc <- getSparkContext()
  invisible(callJMethod(sc, "setLocalProperty", as.character(key), value))
}

#' Get a local property set in this thread, or \code{NULL} if it is missing. See
#' \code{setLocalProperty}.
#'
#' @param key The key for a local property.
#' @rdname getLocalProperty
#' @name getLocalProperty
#' @examples
#'\dontrun{
#' getLocalProperty("spark.scheduler.pool")
#'}
#' @note getLocalProperty since 2.3.0
getLocalProperty <- function(key) {
  if (is.null(key) || is.na(key)) {
    stop("key should not be NULL or NA.")
  }
  sc <- getSparkContext()
  callJMethod(sc, "getLocalProperty", as.character(key))
}

sparkConfToSubmitOps <- new.env()
sparkConfToSubmitOps[["spark.driver.memory"]]           <- "--driver-memory"
sparkConfToSubmitOps[["spark.driver.extraClassPath"]]   <- "--driver-class-path"
sparkConfToSubmitOps[["spark.driver.extraJavaOptions"]] <- "--driver-java-options"
sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-path"
sparkConfToSubmitOps[["spark.master"]] <- "--master"
sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
sparkConfToSubmitOps[["spark.kerberos.keytab"]] <- "--keytab"
sparkConfToSubmitOps[["spark.kerberos.principal"]] <- "--principal"


# Utility function that returns Spark Submit arguments as a string
#
# A few Spark application and runtime environment properties cannot take effect after the driver
# JVM has started, as documented in:
# http://spark.apache.org/docs/latest/configuration.html#application-properties
# When starting SparkR without spark-submit, for example from RStudio, add them to the
# spark-submit command line if not already set in SPARKR_SUBMIT_ARGS so that they take effect.
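#
# For example, with sparkEnvirMap containing spark.driver.memory = "2g" and the default
# submitOps of "sparkr-shell", this should return roughly:
#   --driver-memory "2g" sparkr-shell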
getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
  envirToOps <- lapply(ls(sparkConfToSubmitOps), function(conf) {
    opsValue <- sparkEnvirMap[[conf]]
    # process only if --option is not already specified
    if (!is.null(opsValue) &&
        nchar(opsValue) > 1 &&
        !grepl(sparkConfToSubmitOps[[conf]], submitOps, fixed = TRUE)) {
      # put "" around value in case it has spaces
      paste0(sparkConfToSubmitOps[[conf]], " \"", opsValue, "\" ")
    } else {
      ""
    }
  })
  # --option must be before the application class "sparkr-shell" in submitOps
  paste0(paste0(envirToOps, collapse = ""), submitOps)
}

# Utility function that handles the sparkJars argument and normalizes paths
processSparkJars <- function(jars) {
  splittedJars <- splitString(jars)
  if (length(splittedJars) > length(jars)) {
    warning("sparkJars as a comma-separated string is deprecated, use character vector instead")
  }
  normalized <- suppressWarnings(normalizePath(splittedJars))
  normalized
}

# Utility function that handles the sparkPackages argument
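#
# For example, assuming splitString() splits its input on commas:
#   processSparkPackages(c("pkgA", "pkgB"))  # returned as-is
#   processSparkPackages("pkgA,pkgB")        # deprecated form: warns and returns c("pkgA", "pkgB")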
processSparkPackages <- function(packages) {
  splittedPackages <- splitString(packages)
  if (length(splittedPackages) > length(packages)) {
    warning("sparkPackages as a comma-separated string is deprecated, use character vector instead")
  }
  splittedPackages
}

# Utility function that checks for Spark and installs it to a local folder if not found
#
# Installation is not triggered when called from the sparkR shell, or when the
# session is non-interactive and the master URL is not local
#
# @param sparkHome directory to find Spark package.
# @param master the Spark master URL, used to check local or remote mode.
# @param deployMode whether to deploy your driver on the worker nodes (cluster)
#        or locally as an external client (client).
# @return NULL if sparkHome does not need to be updated, and the new sparkHome otherwise.
sparkCheckInstall <- function(sparkHome, master, deployMode) {
  if (!isSparkRShell()) {
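    # file.info()$isdir is NA when the path does not exist, so a non-NA value
    # means SPARK_HOME points to something that exists on disk.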
    if (!is.na(file.info(sparkHome)$isdir)) {
      message("Spark package found in SPARK_HOME: ", sparkHome)
      NULL
    } else {
      if (interactive() || isMasterLocal(master)) {
        message("Spark not found in SPARK_HOME: ", sparkHome)
        packageLocalDir <- install.spark()
        packageLocalDir
      } else if (isClientMode(master) || deployMode == "client") {
        msg <- paste0("Spark not found in SPARK_HOME: ",
                      sparkHome, "\n", installInstruction("remote"))
        stop(msg)
      } else {
        NULL
      }
    }
  } else {
    NULL
  }
}

# Utility function for sending auth data over a socket and checking the server's reply.
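# The backend is expected to read the secret and write back the string "ok"; any
# other reply is treated as an authentication failure and the connection is closed.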
doServerAuth <- function(con, authSecret) {
  if (nchar(authSecret) == 0) {
    stop("Auth secret not provided.")
  }
  writeString(con, authSecret)
  flush(con)
  reply <- readString(con)
  if (reply != "ok") {
    close(con)
    stop("Unexpected reply from server.")
  }
}