0001 #
0002 # Licensed to the Apache Software Foundation (ASF) under one or more
0003 # contributor license agreements. See the NOTICE file distributed with
0004 # this work for additional information regarding copyright ownership.
0005 # The ASF licenses this file to You under the Apache License, Version 2.0
0006 # (the "License"); you may not use this file except in compliance with
0007 # the License. You may obtain a copy of the License at
0008 #
0009 # http://www.apache.org/licenses/LICENSE-2.0
0010 #
0011 # Unless required by applicable law or agreed to in writing, software
0012 # distributed under the License is distributed on an "AS IS" BASIS,
0013 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014 # See the License for the specific language governing permissions and
0015 # limitations under the License.
0016 #
0017
0018 # Functions to install Spark in case the user directly downloads SparkR
0019 # from CRAN.
0020
0021 #' Download and Install Apache Spark to a Local Directory
0022 #'
0023 #' \code{install.spark} downloads and installs Spark to a local directory if
0024 #' it is not found. If SPARK_HOME is set in the environment, and that directory is found, that is
0025 #' returned. The Spark version we use is the same as the SparkR version. Users can specify a desired
0026 #' Hadoop version, the remote mirror site, and the directory where the package is installed locally.
0027 #'
0028 #' The full url of remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
0029 #' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
0030 #' named after the Spark version (that corresponds to SparkR), and then the tar filename.
0031 #' The filename is composed of four parts, i.e. [Spark version]-bin-[Hadoop version].tgz.
0032 #' For example, the full path for a Spark 2.0.0 package for Hadoop 2.7 from
0033 #' \code{http://apache.osuosl.org} has path:
0034 #' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
0035 #' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is then
0036 #' \code{without-hadoop}.
0037 #'
0038 #' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can take other
0039 #' version number in the format of "x.y" where x and y are integer.
0040 #' If \code{hadoopVersion = "without"}, "Hadoop free" build is installed.
0041 #' See
0042 #' \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
0043 #' "Hadoop Free" Build} for more information.
0044 #' Other patched version names can also be used, e.g. \code{"cdh4"}
0045 #' @param mirrorUrl base URL of the repositories to use. The directory layout should follow
0046 #' \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
0047 #' @param localDir a local directory where Spark is installed. The directory contains
0048 #' version-specific folders of Spark packages. Default is path to
0049 #' the cache directory:
0050 #' \itemize{
0051 #' \item Mac OS X: \file{~/Library/Caches/spark}
0052 #' \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
0053 #' \item Windows: \file{\%LOCALAPPDATA\%\\Apache\\Spark\\Cache}.
0054 #' }
0055 #' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
0056 #' and force re-install Spark (in case the local directory or file is corrupted)
0057 #' @return the (invisible) local directory where Spark is found or installed
0058 #' @rdname install.spark
0059 #' @name install.spark
0060 #' @aliases install.spark
0061 #' @examples
0062 #'\dontrun{
0063 #' install.spark()
0064 #'}
0065 #' @note install.spark since 2.1.0
0066 #' @seealso See available Hadoop versions:
0067 #' \href{http://spark.apache.org/downloads.html}{Apache Spark}
0068 install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
0069 localDir = NULL, overwrite = FALSE) {
0070 sparkHome <- Sys.getenv("SPARK_HOME")
0071 if (isSparkRShell()) {
0072 stopifnot(nchar(sparkHome) > 0)
0073 message("Spark is already running in sparkR shell.")
0074 return(invisible(sparkHome))
0075 } else if (!is.na(file.info(sparkHome)$isdir)) {
0076 message("Spark package found in SPARK_HOME: ", sparkHome)
0077 return(invisible(sparkHome))
0078 }
0079
0080 version <- paste0("spark-", packageVersion("SparkR"))
0081 hadoopVersion <- tolower(hadoopVersion)
0082 hadoopVersionName <- hadoopVersionName(hadoopVersion)
0083 packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
0084 localDir <- ifelse(is.null(localDir), sparkCachePath(),
0085 normalizePath(localDir, mustWork = FALSE))
0086
0087 if (is.na(file.info(localDir)$isdir)) {
0088 dir.create(localDir, recursive = TRUE)
0089 }
0090
0091 if (overwrite) {
0092 message("Overwrite = TRUE: download and overwrite the tar file",
0093 "and Spark package directory if they exist.")
0094 }
0095
0096 releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
0097 if (releaseUrl != "") {
0098 packageName <- basenameSansExtFromUrl(releaseUrl)
0099 }
0100
0101 packageLocalDir <- file.path(localDir, packageName)
0102
0103 # can use dir.exists(packageLocalDir) under R 3.2.0 or later
0104 if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
0105 if (releaseUrl != "") {
0106 message(packageName, " found, setting SPARK_HOME to ", packageLocalDir)
0107 } else {
0108 message(version, " for Hadoop ",
0109 if (hadoopVersion == "without") "Free build" else hadoopVersion,
0110 " found, setting SPARK_HOME to ", packageLocalDir)
0111 }
0112 Sys.setenv(SPARK_HOME = packageLocalDir)
0113 return(invisible(packageLocalDir))
0114 } else {
0115 message("Spark not found in the cache directory. Installation will start.")
0116 }
0117
0118 packageLocalPath <- paste0(packageLocalDir, ".tgz")
0119 tarExists <- file.exists(packageLocalPath)
0120
0121 if (tarExists && !overwrite) {
0122 message("tar file found.")
0123 } else {
0124 if (releaseUrl != "") {
0125 message("Downloading from alternate URL:\n- ", releaseUrl)
0126 success <- downloadUrl(releaseUrl, packageLocalPath)
0127 if (!success) {
0128 unlink(packageLocalPath)
0129 stop("Fetch failed from ", releaseUrl)
0130 }
0131 } else {
0132 robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
0133 }
0134 }
0135
0136 message("Installing to ", localDir)
0137 # There are two ways untar can fail - untar could stop() on errors like incomplete block on file
0138 # or, tar command can return failure code
0139 success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
0140 error = function(e) {
0141 message(e, "\n")
0142 FALSE
0143 },
0144 warning = function(w) {
0145 message(w, "\n")
0146 FALSE
0147 })
0148 if (!tarExists || overwrite || !success) {
0149 unlink(packageLocalPath)
0150 if (success) {
0151 # if tar file was not there before (or it was, but we are told to overwrite it),
0152 # and untar is successful - set a flag that we have downloaded (and untar) Spark package.
0153 assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
0154 }
0155 }
0156 if (!success) stop("Extract archive failed.")
0157 message("DONE.")
0158 Sys.setenv(SPARK_HOME = packageLocalDir)
0159 message("SPARK_HOME set to ", packageLocalDir)
0160 invisible(packageLocalDir)
0161 }
0162
0163 robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
0164 # step 1: use user-provided url
0165 if (!is.null(mirrorUrl)) {
0166 message("Use user-provided mirror site: ", mirrorUrl)
0167 success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
0168 packageName, packageLocalPath)
0169 if (success) {
0170 return()
0171 } else {
0172 message("Unable to download from mirrorUrl: ", mirrorUrl)
0173 }
0174 } else {
0175 message("MirrorUrl not provided.")
0176 }
0177
0178 # step 2: use url suggested from apache website
0179 message("Looking for preferred site from apache website...")
0180 mirrorUrl <- getPreferredMirror(version, packageName)
0181 if (!is.null(mirrorUrl)) {
0182 success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
0183 packageName, packageLocalPath)
0184 if (success) return()
0185 } else {
0186 message("Unable to download from preferred mirror site: ", mirrorUrl)
0187 }
0188
0189 # step 3: use backup option
0190 message("To use backup site...")
0191 mirrorUrl <- defaultMirrorUrl()
0192 success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
0193 packageName, packageLocalPath)
0194 if (success) {
0195 return()
0196 } else {
0197 # remove any partially downloaded file
0198 unlink(packageLocalPath)
0199 message("Unable to download from default mirror site: ", mirrorUrl)
0200 stop("Unable to download Spark ", version,
0201 " for Hadoop ", if (hadoopVersion == "without") "Free build" else hadoopVersion,
0202 ". Please check network connection, Hadoop version, or provide other mirror sites.")
0203 }
0204 }
0205
0206 getPreferredMirror <- function(version, packageName) {
0207 jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
0208 file.path("spark", version, packageName),
0209 ".tgz&as_json=1")
0210 textLines <- readLines(jsonUrl, warn = FALSE)
0211 rowNum <- grep('"preferred"', textLines, fixed = TRUE)
0212 linePreferred <- textLines[rowNum]
0213 matchInfo <- regexpr('"[A-Za-z][A-Za-z0-9+-.]*://.+"', linePreferred)
0214 if (matchInfo != -1) {
0215 startPos <- matchInfo + 1
0216 endPos <- matchInfo + attr(matchInfo, "match.length") - 2
0217 mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
0218 mirrorPreferred <- paste0(mirrorPreferred, "spark")
0219 message("Preferred mirror site found: ", mirrorPreferred)
0220 } else {
0221 mirrorPreferred <- NULL
0222 }
0223 mirrorPreferred
0224 }
0225
0226 directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
0227 packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
0228 message("Downloading ", version, " for Hadoop ",
0229 if (hadoopVersion == "without") "Free build" else hadoopVersion,
0230 " from:\n- ", packageRemotePath)
0231 downloadUrl(packageRemotePath, packageLocalPath)
0232 }
0233
0234 downloadUrl <- function(remotePath, localPath) {
0235 isFail <- tryCatch(download.file(remotePath, localPath),
0236 error = function(e) {
0237 message(e, "\n")
0238 TRUE
0239 },
0240 warning = function(w) {
0241 message(w, "\n")
0242 TRUE
0243 })
0244 !isFail
0245 }
0246
0247 defaultMirrorUrl <- function() {
0248 "http://www-us.apache.org/dist/spark"
0249 }
0250
0251 hadoopVersionName <- function(hadoopVersion) {
0252 if (hadoopVersion == "without") {
0253 "without-hadoop"
0254 } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
0255 paste0("hadoop", hadoopVersion)
0256 } else {
0257 hadoopVersion
0258 }
0259 }
0260
0261 # The implementation refers to appdirs package: https://pypi.python.org/pypi/appdirs and
0262 # adapt to Spark context
0263 # see also sparkCacheRelPathLength()
0264 sparkCachePath <- function() {
0265 if (is_windows()) {
0266 winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
0267 if (is.na(winAppPath)) {
0268 message("%LOCALAPPDATA% not found. Falling back to %USERPROFILE%.")
0269 winAppPath <- Sys.getenv("USERPROFILE", unset = NA)
0270 }
0271 if (is.na(winAppPath)) {
0272 stop("%LOCALAPPDATA% and %USERPROFILE% not found. ",
0273 "Please define the environment variable ",
0274 "or restart and enter an installation path in localDir.")
0275 } else {
0276 path <- file.path(winAppPath, "Apache", "Spark", "Cache")
0277 }
0278 } else if (.Platform$OS.type == "unix") {
0279 if (Sys.info()["sysname"] == "Darwin") {
0280 path <- file.path(Sys.getenv("HOME"), "Library", "Caches", "spark")
0281 } else {
0282 path <- file.path(
0283 Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
0284 }
0285 } else {
0286 stop("Unknown OS: ", .Platform$OS.type)
0287 }
0288 normalizePath(path, mustWork = FALSE)
0289 }
0290
0291 # Length of the Spark cache specific relative path segments for each platform
0292 # eg. "Apache\Spark\Cache" is 3 in Windows, or "spark" is 1 in unix
0293 # Must match sparkCachePath() exactly.
0294 sparkCacheRelPathLength <- function() {
0295 if (is_windows()) {
0296 3
0297 } else {
0298 1
0299 }
0300 }
0301
0302 installInstruction <- function(mode) {
0303 if (mode == "remote") {
0304 paste0("Connecting to a remote Spark master. ",
0305 "Please make sure Spark package is also installed in this machine.\n",
0306 "- If there is one, set the path in sparkHome parameter or ",
0307 "environment variable SPARK_HOME.\n",
0308 "- If not, you may run install.spark function to do the job. ",
0309 "Please make sure the Spark and the Hadoop versions ",
0310 "match the versions on the cluster. ",
0311 "SparkR package is compatible with Spark ", packageVersion("SparkR"), ".",
0312 "If you need further help, ",
0313 "contact the administrators of the cluster.")
0314 } else {
0315 stop("No instruction found for mode ", mode)
0316 }
0317 }
0318
0319 uninstallDownloadedSpark <- function() {
0320 # clean up if Spark was downloaded
0321 sparkDownloaded <- getOne(".sparkDownloaded",
0322 envir = .sparkREnv,
0323 inherits = TRUE,
0324 ifnotfound = FALSE)
0325 sparkDownloadedDir <- Sys.getenv("SPARK_HOME")
0326 if (sparkDownloaded && nchar(sparkDownloadedDir) > 0) {
0327 unlink(sparkDownloadedDir, recursive = TRUE, force = TRUE)
0328
0329 dirs <- traverseParentDirs(sparkCachePath(), sparkCacheRelPathLength())
0330 lapply(dirs, function(d) {
0331 if (length(list.files(d, all.files = TRUE, include.dirs = TRUE, no.. = TRUE)) == 0) {
0332 unlink(d, recursive = TRUE, force = TRUE)
0333 }
0334 })
0335 }
0336 }