0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
# Package-level environment used as a mutable store for SparkR state.
# Code in this file reads or writes entries such as ".sparkRCon", ".sparkRjsc",
# ".sparkRsession", ".monitorConn", ".backendLaunched", ".libPath",
# ".scStartTime" and "backendPort" in it.
0018 .sparkREnv <- new.env()
0019
# Reports whether `env` holds a usable backend connection: TRUE only when
# exists() finds ".sparkRCon" through `env` and that connection is open.
# Any error raised while checking is reported as FALSE.
connExists <- function(env) {
  probeConnection <- function() {
    if (!exists(".sparkRCon", envir = env)) {
      return(FALSE)
    }
    isOpen(env[[".sparkRCon"]])
  }
  tryCatch(probeConnection(), error = function(e) FALSE)
}
0030
#' Stop the Spark Session and Spark Context
#'
#' Shuts down the active Spark Session together with its Spark Context.
#'
#' The backend this R session is connected to is terminated as well.
#' @rdname sparkR.session.stop
#' @name sparkR.session.stop
#' @note sparkR.session.stop since 2.0.0
sparkR.session.stop <- function() {
  env <- .sparkREnv
  isSet <- function(name) exists(name, envir = env)

  if (isSet(".sparkRCon")) {
    if (isSet(".sparkRjsc")) {
      # Stop the context on the JVM side before dropping our handle to it
      jsc <- get(".sparkRjsc", envir = env)
      callJMethod(jsc, "stop")
      rm(".sparkRjsc", envir = env)

      if (isSet(".sparkRsession")) {
        rm(".sparkRsession", envir = env)
      }
    }

    # Take the recorded R package lib path back out of .libPaths()
    if (isSet(".libPath")) {
      recordedLibPath <- get(".libPath", envir = env)
      currentLibPaths <- .libPaths()
      .libPaths(currentLibPaths[currentLibPaths != recordedLibPath])
    }

    if (isSet(".backendLaunched")) {
      callJStatic("SparkRHandler", "stopBackend")
    }

    # Close the backend connection, then forget it and the start time
    backendConn <- get(".sparkRCon", envir = env)
    close(backendConn)

    rm(".sparkRCon", ".scStartTime", envir = env)
  }

  if (isSet(".monitorConn")) {
    monitorConn <- get(".monitorConn", envir = env)
    close(monitorConn)
    rm(".monitorConn", envir = env)
  }

  # Broadcast variables reference jobjs that will not be valid
  # if the JVM is restarted, so drop all of them
  clearBroadcastVariables()

  # Reset the jobj maps as well
  clearJobjs()
}
0083
#' @rdname sparkR.session.stop
#' @name sparkR.stop
#' @note sparkR.stop since 1.4.0
sparkR.stop <- function() {
  # Thin alias: all of the shutdown work lives in sparkR.session.stop()
  sparkR.session.stop()
}
0090
#' (Deprecated) Initialize a new Spark Context
#'
#' This function initializes a new SparkContext. It signals a deprecation
#' warning pointing at \code{sparkR.session}.
#'
#' @param master The Spark master URL
#' @param appName Application name to register with cluster manager
#' @param sparkHome Spark Home directory
#' @param sparkEnvir Named list of environment variables to set on worker nodes
#' @param sparkExecutorEnv Named list of environment variables to be used when launching executors
#' @param sparkJars Character vector of jar files to pass to the worker nodes
#' @param sparkPackages Character vector of package coordinates
#' @seealso \link{sparkR.session}
#' @rdname sparkR.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark")
#' sc <- sparkR.init("local[2]", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="1g"))
#' sc <- sparkR.init("yarn-client", "SparkR", "/home/spark",
#'                  list(spark.executor.memory="4g"),
#'                  list(LD_LIBRARY_PATH="/directory of JVM libraries (libjvm.so) on workers/"),
#'                  c("one.jar", "two.jar", "three.jar"),
#'                  c("com.databricks:spark-avro_2.11:2.0.1"))
#'}
#' @note sparkR.init since 1.4.0
sparkR.init <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvir = list(),
  sparkExecutorEnv = list(),
  sparkJars = "",
  sparkPackages = "") {
  .Deprecated("sparkR.session")
  # The named lists are converted to environments inside the call so that they
  # are only evaluated if sparkR.sparkContext actually uses them.
  sparkR.sparkContext(
    master = master,
    appName = appName,
    sparkHome = sparkHome,
    sparkEnvirMap = convertNamedListToEnv(sparkEnvir),
    sparkExecutorEnvMap = convertNamedListToEnv(sparkExecutorEnv),
    sparkJars = sparkJars,
    sparkPackages = sparkPackages)
}
0133
# Internal function to handle creating the SparkContext.
#
# Returns the ".sparkRjsc" object cached in .sparkREnv when one already exists.
# Otherwise it connects to a backend JVM -- the one advertised through the
# EXISTING_SPARKR_BACKEND_PORT environment variable when that is set, or one
# started here via launchBackend() -- and calls createSparkContext on it. The
# result is stored in .sparkREnv as ".sparkRjsc" and returned.
#
# @param master the Spark master URL.
# @param appName application name.
# @param sparkHome Spark Home directory; normalized when non-empty.
# @param sparkEnvirMap environment of Spark properties, forwarded to
#        getClientModeSparkSubmitOpts() and createSparkContext.
# @param sparkExecutorEnvMap environment of executor environment variables;
#        LD_LIBRARY_PATH is filled in here when it is not already set.
# @param sparkJars jar files, pre-processed with processSparkJars().
# @param sparkPackages package coordinates, pre-processed with processSparkPackages().
# @return the object returned by createSparkContext.
sparkR.sparkContext <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkEnvirMap = new.env(),
  sparkExecutorEnvMap = new.env(),
  sparkJars = "",
  sparkPackages = "") {

  if (exists(".sparkRjsc", envir = .sparkREnv)) {
    cat(paste("Re-using existing Spark Context.",
              "Call sparkR.session.stop() or restart R to create a new Spark Context\n"))
    return(get(".sparkRjsc", envir = .sparkREnv))
  }

  jars <- processSparkJars(sparkJars)
  packages <- processSparkPackages(sparkPackages)

  existingPort <- Sys.getenv("EXISTING_SPARKR_BACKEND_PORT", "")
  connectionTimeout <- as.numeric(Sys.getenv("SPARKR_BACKEND_CONNECTION_TIMEOUT", "6000"))
  if (existingPort != "") {
    # A backend is already running: take its port and auth secret from the environment
    if (length(packages) != 0) {
      warning("sparkPackages has no effect when using spark-submit or sparkR shell, ",
              "please use the --packages commandline instead")
    }
    backendPort <- existingPort
    authSecret <- Sys.getenv("SPARKR_BACKEND_AUTH_SECRET")
    if (nchar(authSecret) == 0) {
      stop("Auth secret not provided in environment.")
    }
  } else {
    # No backend yet: launch one and wait for the connection-info file at `path`
    path <- tempfile(pattern = "backend_port")
    submitOps <- getClientModeSparkSubmitOpts(
      Sys.getenv("SPARKR_SUBMIT_ARGS", "sparkr-shell"),
      sparkEnvirMap)
    invisible(checkJavaVersion())
    launchBackend(
      args = path,
      sparkHome = sparkHome,
      jars = jars,
      sparkSubmitOpts = submitOps,
      packages = packages)
    # Poll for the file with exponential back-off: 25 sleeps starting at 0.1s and
    # growing by 1.25x each time, i.e. a little over 100 seconds in total.
    wait <- 0.1
    waited <- 0
    for (i in 1:25) {
      Sys.sleep(wait)
      waited <- waited + wait
      if (file.exists(path)) {
        break
      }
      wait <- wait * 1.25
    }
    if (!file.exists(path)) {
      # Report the time actually spent waiting rather than a hard-coded figure
      stop("JVM is not ready after ", round(waited), " seconds")
    }
    f <- file(path, open = "rb")
    # Always close the connection and delete the file (it carries the auth
    # secret), including when one of the reads or checks below fails.
    tryCatch({
      backendPort <- readInt(f)
      monitorPort <- readInt(f)
      rLibPath <- readString(f)
      connectionTimeout <- readInt(f)

      # Don't use readString() so that we can provide a useful
      # error message if the R and Java versions are mismatched.
      authSecretLen <- readInt(f)
      if (length(authSecretLen) == 0 || authSecretLen == 0) {
        stop("Unexpected EOF in JVM connection data. Mismatched versions?")
      }
      authSecret <- readStringData(f, authSecretLen)
    },
    finally = {
      close(f)
      file.remove(path)
    })
    if (length(backendPort) == 0 || backendPort == 0 ||
        length(monitorPort) == 0 || monitorPort == 0 ||
        length(rLibPath) != 1 || length(authSecret) == 0) {
      stop("JVM failed to launch")
    }

    monitorConn <- socketConnection(port = monitorPort, blocking = TRUE,
                                    timeout = connectionTimeout, open = "wb")
    doServerAuth(monitorConn, authSecret)

    assign(".monitorConn", monitorConn, envir = .sparkREnv)
    assign(".backendLaunched", 1, envir = .sparkREnv)
    if (rLibPath != "") {
      # Remember the path so that sparkR.session.stop() can remove it again
      assign(".libPath", rLibPath, envir = .sparkREnv)
      .libPaths(c(rLibPath, .libPaths()))
    }
  }

  .sparkREnv$backendPort <- backendPort
  tryCatch({
    connectBackend("localhost", backendPort, timeout = connectionTimeout, authSecret = authSecret)
  },
  error = function(err) {
    stop("Failed to connect JVM\n")
  })

  if (nchar(sparkHome) != 0) {
    sparkHome <- suppressWarnings(normalizePath(sparkHome))
  }

  if (is.null(sparkExecutorEnvMap$LD_LIBRARY_PATH)) {
    sparkExecutorEnvMap[["LD_LIBRARY_PATH"]] <-
      paste0("$LD_LIBRARY_PATH:", Sys.getenv("LD_LIBRARY_PATH"))
  }

  # Classpath separator is ";" on Windows
  # URI needs four /// as from http://stackoverflow.com/a/18522792
  if (.Platform$OS.type == "unix") {
    uriSep <- "//"
  } else {
    uriSep <- "////"
  }
  localJarPaths <- lapply(jars,
                          function(j) { utils::URLencode(paste0("file:", uriSep, j)) })

  # Set the start time to identify jobjs
  # Seconds resolution is good enough for this purpose, so use ints
  assign(".scStartTime", as.integer(Sys.time()), envir = .sparkREnv)

  assign(
    ".sparkRjsc",
    callJStatic(
      "org.apache.spark.api.r.RRDD",
      "createSparkContext",
      master,
      appName,
      as.character(sparkHome),
      localJarPaths,
      sparkEnvirMap,
      sparkExecutorEnvMap),
    envir = .sparkREnv
  )

  sc <- get(".sparkRjsc", envir = .sparkREnv)

  # Register a finalizer to sleep 1 seconds on R exit to make RStudio happy
  reg.finalizer(.sparkREnv, function(x) { Sys.sleep(1) }, onexit = TRUE)

  sc
}
0274
#' (Deprecated) Initialize a new SQLContext
#'
#' This function creates a SparkContext from an existing JavaSparkContext and
#' then uses it to initialize a new SQLContext
#'
#' Starting SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with SparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRSQL.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#'}
#' @note sparkRSQL.init since 1.4.0
sparkRSQL.init <- function(jsc = NULL) {
  .Deprecated("sparkR.session")

  # Note: `jsc` is not referenced anywhere in this body.
  if (!exists(".sparkRsession", envir = .sparkREnv)) {
    # No cached session: create one, without Hive support for backward compatibility.
    return(sparkR.session(enableHiveSupport = FALSE))
  }

  get(".sparkRsession", envir = .sparkREnv)
}
0302
#' (Deprecated) Initialize a new HiveContext
#'
#' This function creates a HiveContext from an existing JavaSparkContext
#'
#' Starting SparkR 2.0, a SparkSession is initialized and returned instead.
#' This API is deprecated and kept for backward compatibility only.
#'
#' @param jsc The existing JavaSparkContext created with SparkR.init()
#' @seealso \link{sparkR.session}
#' @rdname sparkRHive.init-deprecated
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRHive.init(sc)
#'}
#' @note sparkRHive.init since 1.4.0
sparkRHive.init <- function(jsc = NULL) {
  .Deprecated("sparkR.session")

  # Note: `jsc` is not referenced anywhere in this body.
  # Return the cached session when there is one.
  if (exists(".sparkRsession", envir = .sparkREnv)) {
    return(get(".sparkRsession", envir = .sparkREnv))
  }

  # Otherwise create a session with Hive support enabled.
  sparkR.session(enableHiveSupport = TRUE)
}
0329
#' Get the existing SparkSession or initialize a new SparkSession.
#'
#' SparkSession is the entry point into SparkR. \code{sparkR.session} gets the existing
#' SparkSession or initializes a new SparkSession.
#' Additional Spark properties can be set in \code{...}, and these named parameters take priority
#' over values in \code{master}, \code{appName}, named lists of \code{sparkConfig}.
#'
#' When called in an interactive session, this method checks for the Spark installation, and, if not
#' found, it will be downloaded and cached automatically. Alternatively, \code{install.spark} can
#' be called manually.
#'
#' A default warehouse is created automatically in the current directory when a managed table is
#' created via \code{sql} statement \code{CREATE TABLE}, for example. To change the location of the
#' warehouse, set the named parameter \code{spark.sql.warehouse.dir} to the SparkSession. Along with
#' the warehouse, an accompanied metastore may also be automatically created in the current
#' directory when a new SparkSession is initialized with \code{enableHiveSupport} set to
#' \code{TRUE}, which is the default. For more details, refer to Hive configuration at
#' \url{http://spark.apache.org/docs/latest/sql-programming-guide.html#hive-tables}.
#'
#' For details on how to initialize and use SparkR, refer to SparkR programming guide at
#' \url{http://spark.apache.org/docs/latest/sparkr.html#starting-up-sparksession}.
#'
#' @param master the Spark master URL.
#' @param appName application name to register with cluster manager.
#' @param sparkHome Spark Home directory.
#' @param sparkConfig named list of Spark configuration to set on worker nodes.
#' @param sparkJars character vector of jar files to pass to the worker nodes.
#' @param sparkPackages character vector of package coordinates
#' @param enableHiveSupport enable support for Hive, fallback if not built with Hive support; once
#'        set, this cannot be turned off on an existing session
#' @param ... named Spark properties passed to the method.
#' @return the SparkSession object, either the one already cached or a newly created one.
#' @examples
#'\dontrun{
#' sparkR.session()
#' df <- read.json(path)
#'
#' sparkR.session("local[2]", "SparkR", "/home/spark")
#' sparkR.session("yarn", "SparkR", "/home/spark",
#'                list(spark.executor.memory="4g", spark.submit.deployMode="client"),
#'                c("one.jar", "two.jar", "three.jar"),
#'                c("com.databricks:spark-avro_2.12:2.0.1"))
#' sparkR.session(spark.master = "yarn", spark.submit.deployMode = "client",
#'                spark.executor.memory = "4g")
#'}
#' @note sparkR.session since 2.0.0
sparkR.session <- function(
  master = "",
  appName = "SparkR",
  sparkHome = Sys.getenv("SPARK_HOME"),
  sparkConfig = list(),
  sparkJars = "",
  sparkPackages = "",
  enableHiveSupport = TRUE,
  ...) {

  sparkConfigMap <- convertNamedListToEnv(sparkConfig)
  namedParams <- list(...)
  if (length(namedParams) > 0) {
    paramMap <- convertNamedListToEnv(namedParams)
    # Override for certain named parameters
    if (exists("spark.master", envir = paramMap)) {
      master <- paramMap[["spark.master"]]
    }
    if (exists("spark.app.name", envir = paramMap)) {
      appName <- paramMap[["spark.app.name"]]
    }
    overrideEnvs(sparkConfigMap, paramMap)
  }

  deployMode <- ""
  if (exists("spark.submit.deployMode", envir = sparkConfigMap)) {
    deployMode <- sparkConfigMap[["spark.submit.deployMode"]]
  }

  # Default the Derby temp dir to this R session's tempdir() unless the caller set one
  if (!exists("spark.r.sql.derby.temp.dir", envir = sparkConfigMap)) {
    sparkConfigMap[["spark.r.sql.derby.temp.dir"]] <- tempdir()
  }

  # Create the Spark context first when there is none yet
  if (!exists(".sparkRjsc", envir = .sparkREnv)) {
    retHome <- sparkCheckInstall(sparkHome, master, deployMode)
    if (!is.null(retHome)) sparkHome <- retHome
    sparkExecutorEnvMap <- new.env()
    sparkR.sparkContext(master, appName, sparkHome, sparkConfigMap, sparkExecutorEnvMap,
                        sparkJars, sparkPackages)
    stopifnot(exists(".sparkRjsc", envir = .sparkREnv))
  }

  if (exists(".sparkRsession", envir = .sparkREnv)) {
    sparkSession <- get(".sparkRsession", envir = .sparkREnv)
    # Apply config to Spark Context and Spark Session if already there
    # Cannot change enableHiveSupport
    callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                "setSparkContextSessionConf",
                sparkSession,
                sparkConfigMap)
  } else {
    jsc <- get(".sparkRjsc", envir = .sparkREnv)
    sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
                                "getOrCreateSparkSession",
                                jsc,
                                sparkConfigMap,
                                enableHiveSupport)
    assign(".sparkRsession", sparkSession, envir = .sparkREnv)
  }

  # Check if version number of SparkSession matches version number of SparkR package
  jvmVersion <- callJMethod(sparkSession, "version")
  # Remove -SNAPSHOT from jvm versions
  jvmVersionStrip <- gsub("-SNAPSHOT", "", jvmVersion, fixed = TRUE)
  rPackageVersion <- paste0(packageVersion("SparkR"))

  if (jvmVersionStrip != rPackageVersion) {
    warning("Version mismatch between Spark JVM and SparkR package. ",
            "JVM version was ", jvmVersion,
            ", while R package version was ", rPackageVersion)
  }

  sparkSession
}
0449
#' Get the URL of the SparkUI instance for the current active SparkSession
#'
#' Get the URL of the SparkUI instance for the current active SparkSession.
#'
#' @return the SparkUI URL, or NA if it is disabled, or not started.
#' @rdname sparkR.uiWebUrl
#' @name sparkR.uiWebUrl
#' @examples
#'\dontrun{
#' sparkR.session()
#' url <- sparkR.uiWebUrl()
#' }
#' @note sparkR.uiWebUrl since 2.1.1
sparkR.uiWebUrl <- function() {
  ctx <- sparkR.callJMethod(getSparkContext(), "sc")
  maybeUrl <- callJMethod(ctx, "uiWebUrl")
  # Only unwrap the value when the JVM side reports that one is defined
  if (!callJMethod(maybeUrl, "isDefined")) {
    return(NA)
  }
  callJMethod(maybeUrl, "get")
}
0472
#' Assigns a group ID to all the jobs started by this thread until the group ID is set to a
#' different value or cleared.
#'
#' @param groupId the ID to be assigned to job groups.
#' @param description description for the job group ID.
#' @param interruptOnCancel flag to indicate if the job is interrupted on job cancellation.
#' @rdname setJobGroup
#' @name setJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' setJobGroup("myJobGroup", "My job group description", TRUE)
#'}
#' @note setJobGroup since 1.5.0
setJobGroup <- function(groupId, description, interruptOnCancel) {
  jsc <- getSparkContext()
  result <- callJMethod(jsc, "setJobGroup", groupId, description, interruptOnCancel)
  # Called for its side effect; hand the result back without printing it
  invisible(result)
}
0491
#' Clear current job group ID and its description
#'
#' @rdname clearJobGroup
#' @name clearJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' clearJobGroup()
#'}
#' @note clearJobGroup since 1.5.0
clearJobGroup <- function() {
  jsc <- getSparkContext()
  result <- callJMethod(jsc, "clearJobGroup")
  invisible(result)
}
0506
#' Cancel active jobs for the specified group
#'
#' @param groupId the ID of job group to be cancelled
#' @rdname cancelJobGroup
#' @name cancelJobGroup
#' @examples
#'\dontrun{
#' sparkR.session()
#' cancelJobGroup("myJobGroup")
#'}
#' @note cancelJobGroup since 1.5.0
cancelJobGroup <- function(groupId) {
  jsc <- getSparkContext()
  result <- callJMethod(jsc, "cancelJobGroup", groupId)
  invisible(result)
}
0522
#' Set a human readable description of the current job.
#'
#' Set a description that is shown as a job description in UI.
#'
#' @param value The job description of the current job.
#' @rdname setJobDescription
#' @name setJobDescription
#' @examples
#'\dontrun{
#' setJobDescription("This is an example job.")
#'}
#' @note setJobDescription since 2.3.0
setJobDescription <- function(value) {
  # NULL is passed through untouched; anything else is coerced to character
  jobDescription <- if (is.null(value)) NULL else as.character(value)
  jsc <- getSparkContext()
  invisible(callJMethod(jsc, "setJobDescription", jobDescription))
}
0542
#' Set a local property that affects jobs submitted from this thread, such as the
#' Spark fair scheduler pool.
#'
#' @param key The key for a local property.
#' @param value The value for a local property.
#' @rdname setLocalProperty
#' @name setLocalProperty
#' @examples
#'\dontrun{
#' setLocalProperty("spark.scheduler.pool", "poolA")
#'}
#' @note setLocalProperty since 2.3.0
setLocalProperty <- function(key, value) {
  keyMissing <- is.null(key) || is.na(key)
  if (keyMissing) {
    stop("key should not be NULL or NA.")
  }
  # A NULL value is forwarded as-is; anything else is coerced to character
  propertyValue <- if (is.null(value)) NULL else as.character(value)
  jsc <- getSparkContext()
  invisible(callJMethod(jsc, "setLocalProperty", as.character(key), propertyValue))
}
0565
#' Get a local property set in this thread, or \code{NULL} if it is missing. See
#' \code{setLocalProperty}.
#'
#' @param key The key for a local property.
#' @rdname getLocalProperty
#' @name getLocalProperty
#' @examples
#'\dontrun{
#' getLocalProperty("spark.scheduler.pool")
#'}
#' @note getLocalProperty since 2.3.0
getLocalProperty <- function(key) {
  keyMissing <- is.null(key) || is.na(key)
  if (keyMissing) {
    stop("key should not be NULL or NA.")
  }
  jsc <- getSparkContext()
  callJMethod(jsc, "getLocalProperty", as.character(key))
}
0584
# Lookup table from Spark configuration property names to the corresponding
# spark-submit command line option. The yarn and kerberos spellings of keytab
# and principal both map to the same option.
sparkConfToSubmitOps <- list2env(
  list(
    "spark.driver.memory" = "--driver-memory",
    "spark.driver.extraClassPath" = "--driver-class-path",
    "spark.driver.extraJavaOptions" = "--driver-java-options",
    "spark.driver.extraLibraryPath" = "--driver-library-path",
    "spark.master" = "--master",
    "spark.yarn.keytab" = "--keytab",
    "spark.yarn.principal" = "--principal",
    "spark.kerberos.keytab" = "--keytab",
    "spark.kerberos.principal" = "--principal"
  ),
  envir = new.env()
)
0595
0596
# Utility function that returns Spark Submit arguments as a string
#
# Some Spark Application and Runtime environment properties cannot take effect once the
# driver JVM has started, as documented in:
# http://spark.apache.org/docs/latest/configuration.html#application-properties
# When SparkR is started without spark-submit (for example from Rstudio), such properties
# found in sparkEnvirMap are turned into spark-submit options and placed in front of
# submitOps, unless the option already appears in submitOps (e.g. via SPARKR_SUBMIT_ARGS).
getClientModeSparkSubmitOpts <- function(submitOps, sparkEnvirMap) {
  toSubmitOption <- function(confName) {
    optionFlag <- sparkConfToSubmitOps[[confName]]
    confValue <- sparkEnvirMap[[confName]]
    # Emit the option only when a value is set and the flag is not already specified.
    # NOTE(review): `> 1` also skips single-character values -- confirm this is intended.
    shouldEmit <- !is.null(confValue) &&
      nchar(confValue) > 1 &&
      !grepl(optionFlag, submitOps, fixed = TRUE)
    if (!shouldEmit) {
      return("")
    }
    # Wrap the value in "" in case it has spaces
    paste0(optionFlag, " \"", confValue, "\" ")
  }
  extraOptions <- lapply(ls(sparkConfToSubmitOps), toSubmitOption)
  # Options have to come before the application class "sparkr-shell" in submitOps
  optionPrefix <- paste0(extraOptions, collapse = "")
  paste0(optionPrefix, submitOps)
}
0620
# Utility function for the sparkJars argument: runs it through splitString(), warns when
# that produced more entries than were passed in, and returns the normalized paths.
processSparkJars <- function(jars) {
  jarEntries <- splitString(jars)
  gainedEntries <- length(jarEntries) > length(jars)
  if (gainedEntries) {
    warning("sparkJars as a comma-separated string is deprecated, use character vector instead")
  }
  # Warnings from normalizePath() are deliberately silenced here
  suppressWarnings(normalizePath(jarEntries))
}
0630
# Utility function for the sparkPackages argument: runs it through splitString() and warns
# when that produced more entries than were passed in.
processSparkPackages <- function(packages) {
  packageEntries <- splitString(packages)
  gainedEntries <- length(packageEntries) > length(packages)
  if (gainedEntries) {
    warning("sparkPackages as a comma-separated string is deprecated, use character vector instead")
  }
  packageEntries
}
0639
# Utility function that checks for Spark and installs it to a local folder if not found
#
# Nothing is checked when running inside the sparkR shell. When SPARK_HOME does not
# exist, Spark is installed in interactive sessions or when the master is local; for
# client mode an error with installation instructions is raised instead.
#
# @param sparkHome directory to find Spark package.
# @param master the Spark master URL, used to check local or remote mode.
# @param deployMode whether to deploy your driver on the worker nodes (cluster)
#        or locally as an external client (client).
# @return NULL if no need to update sparkHome, and new sparkHome otherwise.
sparkCheckInstall <- function(sparkHome, master, deployMode) {
  if (isSparkRShell()) {
    return(NULL)
  }

  # file.info() reports NA for `isdir` when the path does not exist
  homeFound <- !is.na(file.info(sparkHome)$isdir)
  if (homeFound) {
    message("Spark package found in SPARK_HOME: ", sparkHome)
    return(NULL)
  }

  if (interactive() || isMasterLocal(master)) {
    message("Spark not found in SPARK_HOME: ", sparkHome)
    installedDir <- install.spark()
    return(installedDir)
  }

  if (isClientMode(master) || deployMode == "client") {
    msg <- paste0("Spark not found in SPARK_HOME: ",
                  sparkHome, "\n", installInstruction("remote"))
    stop(msg)
  }

  NULL
}
0672
# Utility function that sends the auth secret over a socket and verifies the reply.
# Stops when the secret is empty; closes `con` and stops when the reply is not "ok".
doServerAuth <- function(con, authSecret) {
  if (nchar(authSecret) == 0) {
    stop("Auth secret not provided.")
  }
  writeString(con, authSecret)
  flush(con)
  serverReply <- readString(con)
  accepted <- serverReply == "ok"
  if (!accepted) {
    close(con)
    stop("Unexpected reply from server.")
  }
}