#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Functions to install Spark in case the user directly downloads SparkR
# from CRAN.

#' Download and Install Apache Spark to a Local Directory
#'
#' \code{install.spark} downloads and installs Spark to a local directory if
#' it is not found. If SPARK_HOME is set in the environment and that directory is found, it is
#' returned. The Spark version installed matches the SparkR version. Users can specify a desired
#' Hadoop version, the remote mirror site, and the directory where the package is installed locally.
#'
#' The full URL of the remote file is inferred from \code{mirrorUrl} and \code{hadoopVersion}.
#' \code{mirrorUrl} specifies the remote path to a Spark folder. It is followed by a subfolder
#' named after the Spark version (which corresponds to the SparkR version), and then the tar filename.
#' The filename takes the form [Spark version]-bin-[Hadoop version].tgz.
#' For example, the full path for a Spark 2.0.0 package built for Hadoop 2.7 from
#' \code{http://apache.osuosl.org} is:
#' \code{http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz}.
#' For \code{hadoopVersion = "without"}, [Hadoop version] in the filename is
#' \code{without-hadoop}.
#'
#' @param hadoopVersion Version of Hadoop to install. Default is \code{"2.7"}. It can take other
#'                      version numbers in the format "x.y", where x and y are integers.
#'                      If \code{hadoopVersion = "without"}, the "Hadoop free" build is installed.
#'                      See
#'                      \href{http://spark.apache.org/docs/latest/hadoop-provided.html}{
#'                      "Hadoop Free" Build} for more information.
#'                      Other patched version names can also be used, e.g. \code{"cdh4"}.
#' @param mirrorUrl the base URL of the repositories to use. The directory layout should follow
#'                  \href{http://www.apache.org/dyn/closer.lua/spark/}{Apache mirrors}.
#' @param localDir a local directory where Spark is installed. The directory contains
#'                 version-specific folders of Spark packages. Default is the path to
#'                 the cache directory:
#'                 \itemize{
#'                   \item Mac OS X: \file{~/Library/Caches/spark}
#'                   \item Unix: \env{$XDG_CACHE_HOME} if defined, otherwise \file{~/.cache/spark}
#'                   \item Windows: \file{\%LOCALAPPDATA\%\\Apache\\Spark\\Cache}.
#'                 }
#' @param overwrite If \code{TRUE}, download and overwrite the existing tar file in localDir
#'                  and force a re-install of Spark (in case the local directory or file is
#'                  corrupted).
#' @return the (invisible) local directory where Spark is found or installed
#' @rdname install.spark
#' @name install.spark
#' @aliases install.spark
#' @examples
#'\dontrun{
#' install.spark()
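#'
#' # Additional illustrative calls; the directory path below is a placeholder, not a default.
#' # Install a "Hadoop free" build from an explicit mirror into a custom directory:
#' install.spark(hadoopVersion = "without", mirrorUrl = "http://apache.osuosl.org/spark",
#'               localDir = "/path/to/spark-cache", overwrite = TRUE)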
#'}
#' @note install.spark since 2.1.0
#' @seealso See available Hadoop versions:
#'          \href{http://spark.apache.org/downloads.html}{Apache Spark}
install.spark <- function(hadoopVersion = "2.7", mirrorUrl = NULL,
                          localDir = NULL, overwrite = FALSE) {
  sparkHome <- Sys.getenv("SPARK_HOME")
  if (isSparkRShell()) {
    stopifnot(nchar(sparkHome) > 0)
    message("Spark is already running in sparkR shell.")
    return(invisible(sparkHome))
  } else if (!is.na(file.info(sparkHome)$isdir)) {
    message("Spark package found in SPARK_HOME: ", sparkHome)
    return(invisible(sparkHome))
  }

  version <- paste0("spark-", packageVersion("SparkR"))
  hadoopVersion <- tolower(hadoopVersion)
  hadoopVersionName <- hadoopVersionName(hadoopVersion)
  packageName <- paste(version, "bin", hadoopVersionName, sep = "-")
  localDir <- ifelse(is.null(localDir), sparkCachePath(),
                     normalizePath(localDir, mustWork = FALSE))

  if (is.na(file.info(localDir)$isdir)) {
    dir.create(localDir, recursive = TRUE)
  }

  if (overwrite) {
    message("Overwrite = TRUE: download and overwrite the tar file ",
            "and Spark package directory if they exist.")
  }

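  # If SPARKR_RELEASE_DOWNLOAD_URL is set, it is taken as a direct URL to a release tarball
  # and bypasses the mirror/Hadoop-version resolution below.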
  releaseUrl <- Sys.getenv("SPARKR_RELEASE_DOWNLOAD_URL")
  if (releaseUrl != "") {
    packageName <- basenameSansExtFromUrl(releaseUrl)
  }

  packageLocalDir <- file.path(localDir, packageName)

  # can use dir.exists(packageLocalDir) under R 3.2.0 or later
  if (!is.na(file.info(packageLocalDir)$isdir) && !overwrite) {
    if (releaseUrl != "") {
      message(packageName, " found, setting SPARK_HOME to ", packageLocalDir)
    } else {
      message(version, " for Hadoop ",
              if (hadoopVersion == "without") "Free build" else hadoopVersion,
              " found, setting SPARK_HOME to ", packageLocalDir)
    }
    Sys.setenv(SPARK_HOME = packageLocalDir)
    return(invisible(packageLocalDir))
  } else {
    message("Spark not found in the cache directory. Installation will start.")
  }

  packageLocalPath <- paste0(packageLocalDir, ".tgz")
  tarExists <- file.exists(packageLocalPath)

  if (tarExists && !overwrite) {
    message("tar file found.")
  } else {
    if (releaseUrl != "") {
      message("Downloading from alternate URL:\n- ", releaseUrl)
      success <- downloadUrl(releaseUrl, packageLocalPath)
      if (!success) {
        unlink(packageLocalPath)
        stop("Fetch failed from ", releaseUrl)
      }
    } else {
      robustDownloadTar(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath)
    }
  }

  message("Installing to ", localDir)
  # There are two ways untar can fail: untar() can stop() on errors such as an incomplete
  # block in the file, or the tar command can return a non-zero exit code.
  success <- tryCatch(untar(tarfile = packageLocalPath, exdir = localDir) == 0,
                      error = function(e) {
                        message(e, "\n")
                        FALSE
                      },
                      warning = function(w) {
                        message(w, "\n")
                        FALSE
                      })
  if (!tarExists || overwrite || !success) {
    unlink(packageLocalPath)
    if (success) {
      # if the tar file was not there before (or it was, but we were told to overwrite it),
      # and untar succeeded - set a flag that we have downloaded (and untarred) the Spark package.
      assign(".sparkDownloaded", TRUE, envir = .sparkREnv)
    }
  }
  if (!success) stop("Failed to extract the Spark archive.")
  message("DONE.")
  Sys.setenv(SPARK_HOME = packageLocalDir)
  message("SPARK_HOME set to ", packageLocalDir)
  invisible(packageLocalDir)
}

robustDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
  # step 1: use the user-provided url
  if (!is.null(mirrorUrl)) {
    message("Using the user-provided mirror site: ", mirrorUrl)
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) {
      return()
    } else {
      message("Unable to download from mirrorUrl: ", mirrorUrl)
    }
  } else {
    message("MirrorUrl not provided.")
  }

  # step 2: use the url suggested by the apache website
  message("Looking for a preferred mirror site from the Apache website...")
  mirrorUrl <- getPreferredMirror(version, packageName)
  if (!is.null(mirrorUrl)) {
    success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                                 packageName, packageLocalPath)
    if (success) return()
    message("Unable to download from preferred mirror site: ", mirrorUrl)
  } else {
    message("Unable to determine a preferred mirror site.")
  }

  # step 3: use the backup option
  message("Falling back to the default mirror site...")
  mirrorUrl <- defaultMirrorUrl()
  success <- directDownloadTar(mirrorUrl, version, hadoopVersion,
                               packageName, packageLocalPath)
  if (success) {
    return()
  } else {
    # remove any partially downloaded file
    unlink(packageLocalPath)
    message("Unable to download from default mirror site: ", mirrorUrl)
    stop("Unable to download Spark ", version,
         " for Hadoop ", if (hadoopVersion == "without") "Free build" else hadoopVersion,
         ". Please check the network connection and Hadoop version, or provide an alternative mirror site.")
  }
}

getPreferredMirror <- function(version, packageName) {
  jsonUrl <- paste0("http://www.apache.org/dyn/closer.cgi?path=",
                    file.path("spark", version, packageName),
                    ".tgz&as_json=1")
  textLines <- readLines(jsonUrl, warn = FALSE)
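  # The as_json=1 response includes a line such as (illustrative):
  #   "preferred": "http://apache.osuosl.org/"
  # Extract the quoted URL and append "spark" to obtain the base mirror URL.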
  rowNum <- grep('"preferred"', textLines, fixed = TRUE)
  linePreferred <- textLines[rowNum]
  matchInfo <- regexpr('"[A-Za-z][A-Za-z0-9+-.]*://.+"', linePreferred)
  if (matchInfo != -1) {
    startPos <- matchInfo + 1
    endPos <- matchInfo + attr(matchInfo, "match.length") - 2
    mirrorPreferred <- base::substr(linePreferred, startPos, endPos)
    mirrorPreferred <- paste0(mirrorPreferred, "spark")
    message("Preferred mirror site found: ", mirrorPreferred)
  } else {
    mirrorPreferred <- NULL
  }
  mirrorPreferred
}

directDownloadTar <- function(mirrorUrl, version, hadoopVersion, packageName, packageLocalPath) {
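  # The remote path is <mirrorUrl>/<version>/<packageName>.tgz, e.g. (from the docs above):
  # http://apache.osuosl.org/spark/spark-2.0.0/spark-2.0.0-bin-hadoop2.7.tgz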
  packageRemotePath <- paste0(file.path(mirrorUrl, version, packageName), ".tgz")
  message("Downloading ", version, " for Hadoop ",
          if (hadoopVersion == "without") "Free build" else hadoopVersion,
          " from:\n- ", packageRemotePath)
  downloadUrl(packageRemotePath, packageLocalPath)
}

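# Returns TRUE if the download succeeded, FALSE otherwise. download.file() returns an integer
# status (0 on success); any error or warning raised during the download is treated as a failure.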
downloadUrl <- function(remotePath, localPath) {
  isFail <- tryCatch(download.file(remotePath, localPath),
                     error = function(e) {
                       message(e, "\n")
                       TRUE
                     },
                     warning = function(w) {
                       message(w, "\n")
                       TRUE
                     })
  !isFail
}

defaultMirrorUrl <- function() {
  "http://www-us.apache.org/dist/spark"
}

hadoopVersionName <- function(hadoopVersion) {
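  # Map the user-facing Hadoop version to the suffix used in release filenames, e.g.
  # "2.7" -> "hadoop2.7" and "without" -> "without-hadoop"; any other value (e.g. "cdh4")
  # is passed through unchanged.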
  if (hadoopVersion == "without") {
    "without-hadoop"
  } else if (grepl("^[0-9]+\\.[0-9]+$", hadoopVersion, perl = TRUE)) {
    paste0("hadoop", hadoopVersion)
  } else {
    hadoopVersion
  }
}

# The implementation follows the appdirs package (https://pypi.python.org/pypi/appdirs),
# adapted to the Spark context.
# see also sparkCacheRelPathLength()
sparkCachePath <- function() {
  if (is_windows()) {
    winAppPath <- Sys.getenv("LOCALAPPDATA", unset = NA)
    if (is.na(winAppPath)) {
      message("%LOCALAPPDATA% not found. Falling back to %USERPROFILE%.")
      winAppPath <- Sys.getenv("USERPROFILE", unset = NA)
    }
    if (is.na(winAppPath)) {
      stop("%LOCALAPPDATA% and %USERPROFILE% not found. ",
           "Please define one of these environment variables, ",
           "or re-run and provide an installation path via the localDir argument.")
    } else {
      path <- file.path(winAppPath, "Apache", "Spark", "Cache")
    }
  } else if (.Platform$OS.type == "unix") {
    if (Sys.info()["sysname"] == "Darwin") {
      path <- file.path(Sys.getenv("HOME"), "Library", "Caches", "spark")
    } else {
      path <- file.path(
        Sys.getenv("XDG_CACHE_HOME", file.path(Sys.getenv("HOME"), ".cache")), "spark")
    }
  } else {
    stop("Unknown OS: ", .Platform$OS.type)
  }
  normalizePath(path, mustWork = FALSE)
}

# Number of path segments in the Spark cache-specific relative path on each platform,
# e.g. "Apache\Spark\Cache" is 3 on Windows and "spark" is 1 on Unix.
# Must match sparkCachePath() exactly.
sparkCacheRelPathLength <- function() {
  if (is_windows()) {
    3
  } else {
    1
  }
}

installInstruction <- function(mode) {
  if (mode == "remote") {
    paste0("Connecting to a remote Spark master. ",
           "Please make sure the Spark package is also installed on this machine.\n",
           "- If there is one, set the path in the sparkHome parameter or ",
           "the environment variable SPARK_HOME.\n",
           "- If not, you may run the install.spark function to do the job. ",
           "Please make sure the Spark and Hadoop versions ",
           "match the versions on the cluster. ",
           "The SparkR package is compatible with Spark ", packageVersion("SparkR"), ". ",
           "If you need further help, ",
           "contact the administrators of the cluster.")
  } else {
    stop("No instruction found for mode ", mode)
  }
}

uninstallDownloadedSpark <- function() {
  # clean up if Spark was downloaded
  sparkDownloaded <- getOne(".sparkDownloaded",
                            envir = .sparkREnv,
                            inherits = TRUE,
                            ifnotfound = FALSE)
  sparkDownloadedDir <- Sys.getenv("SPARK_HOME")
  if (sparkDownloaded && nchar(sparkDownloadedDir) > 0) {
    unlink(sparkDownloadedDir, recursive = TRUE, force = TRUE)

    dirs <- traverseParentDirs(sparkCachePath(), sparkCacheRelPathLength())
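    # Clean up the Spark-specific cache directories as well (see sparkCacheRelPathLength());
    # each directory is removed only if it is empty.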
    lapply(dirs, function(d) {
      if (length(list.files(d, all.files = TRUE, include.dirs = TRUE, no.. = TRUE)) == 0) {
        unlink(d, recursive = TRUE, force = TRUE)
      }
    })
  }
}