/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Abstract Spark command builder that defines common functionality.
 */
abstract class AbstractCommandBuilder {

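  // Intended usage pattern, as a minimal sketch (assumed, not part of this file; concrete
  // subclasses such as SparkSubmitCommandBuilder live elsewhere in this package):
  //
  //   AbstractCommandBuilder builder = ...;          // some concrete subclass
  //   Map<String, String> env = new HashMap<>();
  //   List<String> cmd = builder.buildCommand(env);  // full command line for the child
  //   // launch the child with ProcessBuilder, applying `env` to its environment
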
  boolean verbose;
  String appName;
  String appResource;
  String deployMode;
  String javaHome;
  String mainClass;
  String master;
  protected String propertiesFile;
  final List<String> appArgs;
  final List<String> jars;
  final List<String> files;
  final List<String> pyFiles;
  final Map<String, String> childEnv;
  final Map<String, String> conf;

  // The merged configuration for the application. Cached to avoid having to read / parse
  // properties files multiple times.
  private Map<String, String> effectiveConfig;

  AbstractCommandBuilder() {
    this.appArgs = new ArrayList<>();
    this.childEnv = new HashMap<>();
    this.conf = new HashMap<>();
    this.files = new ArrayList<>();
    this.jars = new ArrayList<>();
    this.pyFiles = new ArrayList<>();
  }

  /**
   * Builds the command to execute.
   *
   * @param env A map containing environment variables for the child process. It may already contain
   *            entries defined by the user (such as SPARK_HOME, or those defined by the
   *            SparkLauncher constructor that takes an environment), and may be modified to
   *            include other variables needed by the process to be executed.
   */
  abstract List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException;

  /**
   * Builds a list of arguments to run java.
   *
   * This method finds the java executable to use and appends JVM-specific options for running a
   * class with Spark in the classpath. It also loads options from the "java-opts" file in the
   * configuration directory being used.
   *
   * Callers should still add at least the class to run, as well as any arguments to pass to the
   * class.
   */
  List<String> buildJavaCommand(String extraClassPath) throws IOException {
    List<String> cmd = new ArrayList<>();

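    // Resolve the java executable, checking candidates in order of precedence: an explicitly
    // configured javaHome, the child environment, this process's environment, and finally the
    // JVM's own java.home property (which is always set).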
    String[] candidateJavaHomes = new String[] {
      javaHome,
      childEnv.get("JAVA_HOME"),
      System.getenv("JAVA_HOME"),
      System.getProperty("java.home")
    };
    for (String javaHome : candidateJavaHomes) {
      if (javaHome != null) {
        cmd.add(join(File.separator, javaHome, "bin", "java"));
        break;
      }
    }

    // Load extra JAVA_OPTS from conf/java-opts, if it exists.
    File javaOpts = new File(join(File.separator, getConfDir(), "java-opts"));
    if (javaOpts.isFile()) {
      try (BufferedReader br = new BufferedReader(new InputStreamReader(
          new FileInputStream(javaOpts), StandardCharsets.UTF_8))) {
        String line;
        while ((line = br.readLine()) != null) {
          addOptionString(cmd, line);
        }
      }
    }

    cmd.add("-cp");
    cmd.add(join(File.pathSeparator, buildClassPath(extraClassPath)));
    return cmd;
  }

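  /**
   * Parses a whitespace-separated string of options and appends each resulting token to the
   * command. Does nothing if the string is null or empty.
   */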
  void addOptionString(List<String> cmd, String options) {
    if (!isEmpty(options)) {
      for (String opt : parseOptionString(options)) {
        cmd.add(opt);
      }
    }
  }

  /**
   * Builds the classpath for the application. Returns a list with one classpath entry per element;
   * each entry is formatted in the way expected by <code>java.net.URLClassLoader</code> (more
   * specifically, with trailing slashes for directories).
   */
  List<String> buildClassPath(String appClassPath) throws IOException {
    String sparkHome = getSparkHome();

    Set<String> cp = new LinkedHashSet<>();
    addToClassPath(cp, appClassPath);

    addToClassPath(cp, getConfDir());

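    // When SPARK_PREPEND_CLASSES is set (typically during development) or tests are running,
    // place the locally compiled classes of each Spark module ahead of the packaged jars.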
    boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES"));
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    if (prependClasses || isTesting) {
      String scala = getScalaVersion();
      List<String> projects = Arrays.asList(
        "common/kvstore",
        "common/network-common",
        "common/network-shuffle",
        "common/network-yarn",
        "common/sketch",
        "common/tags",
        "common/unsafe",
        "core",
        "examples",
        "graphx",
        "launcher",
        "mllib",
        "repl",
        "resource-managers/mesos",
        "resource-managers/yarn",
        "sql/catalyst",
        "sql/core",
        "sql/hive",
        "sql/hive-thriftserver",
        "streaming"
      );
      if (prependClasses) {
        if (!isTesting) {
          System.err.println(
            "NOTE: SPARK_PREPEND_CLASSES is set, placing locally compiled Spark classes ahead of " +
            "assembly.");
        }
        for (String project : projects) {
          addToClassPath(cp, String.format("%s/%s/target/scala-%s/classes", sparkHome, project,
            scala));
        }
      }
      if (isTesting) {
        for (String project : projects) {
          addToClassPath(cp, String.format("%s/%s/target/scala-%s/test-classes", sparkHome,
            project, scala));
        }
      }

      // Add this path to include jars that are shaded in the final deliverable created during
      // the maven build. These jars are copied to this directory during the build.
      addToClassPath(cp, String.format("%s/core/target/jars/*", sparkHome));
      addToClassPath(cp, String.format("%s/mllib/target/jars/*", sparkHome));
    }

    // Add Spark jars to the classpath. For the testing case, we rely on the test code to set and
    // propagate the test classpath appropriately. For normal invocation, look for the jars
    // directory under SPARK_HOME.
    boolean isTestingSql = "1".equals(getenv("SPARK_SQL_TESTING"));
    String jarsDir = findJarsDir(getSparkHome(), getScalaVersion(), !isTesting && !isTestingSql);
    if (jarsDir != null) {
      addToClassPath(cp, join(File.separator, jarsDir, "*"));
    }

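    // Finally, append the Hadoop and YARN configuration directories and any extra classpath
    // provided by the Spark distribution.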
    addToClassPath(cp, getenv("HADOOP_CONF_DIR"));
    addToClassPath(cp, getenv("YARN_CONF_DIR"));
    addToClassPath(cp, getenv("SPARK_DIST_CLASSPATH"));
    return new ArrayList<>(cp);
  }

  /**
   * Adds entries to the classpath.
   *
   * @param cp List to which the new entries are appended.
   * @param entries New classpath entries (separated by File.pathSeparator).
   */
  private void addToClassPath(Set<String> cp, String entries) {
    if (isEmpty(entries)) {
      return;
    }
    String[] split = entries.split(Pattern.quote(File.pathSeparator));
    for (String entry : split) {
      if (!isEmpty(entry)) {
        if (new File(entry).isDirectory() && !entry.endsWith(File.separator)) {
          entry += File.separator;
        }
        cp.add(entry);
      }
    }
  }

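  /**
   * Returns the Scala version suffix used to locate build directories, taken from the
   * SPARK_SCALA_VERSION environment variable if set, otherwise by checking for a scala-2.12
   * build directory under the Spark home.
   */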
  String getScalaVersion() {
    String scala = getenv("SPARK_SCALA_VERSION");
    if (scala != null) {
      return scala;
    }
    String sparkHome = getSparkHome();
    // TODO: revisit for Scala 2.13 support
    File scala212 = new File(sparkHome, "launcher/target/scala-2.12");
    // File scala211 = new File(sparkHome, "launcher/target/scala-2.11");
    // checkState(!scala212.isDirectory() || !scala211.isDirectory(),
    //   "Presence of build for multiple Scala versions detected.\n" +
    //   "Either clean one of them or set SPARK_SCALA_VERSION in your environment.");
    // if (scala212.isDirectory()) {
    //   return "2.12";
    // } else {
    //   checkState(scala211.isDirectory(), "Cannot find any build directories.");
    //   return "2.11";
    // }
    checkState(scala212.isDirectory(), "Cannot find any build directories.");
    return "2.12";
  }

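  /**
   * Returns the Spark home directory from the SPARK_HOME environment variable, falling back to
   * the "spark.test.home" system property when SPARK_TESTING is set. Fails if neither is set.
   */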
  String getSparkHome() {
    String path = getenv(ENV_SPARK_HOME);
    if (path == null && "1".equals(getenv("SPARK_TESTING"))) {
      path = System.getProperty("spark.test.home");
    }
    checkState(path != null,
      "Spark home not found; set it explicitly or use the SPARK_HOME environment variable.");
    return path;
  }

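  /**
   * Looks up an environment variable, giving entries in the child environment precedence over
   * this process's own environment.
   */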
  String getenv(String key) {
    return firstNonEmpty(childEnv.get(key), System.getenv(key));
  }

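  /**
   * Sets the path of the properties file to load, invalidating any previously cached
   * effective configuration.
   */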
  void setPropertiesFile(String path) {
    effectiveConfig = null;
    this.propertiesFile = path;
  }

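  /**
   * Returns the merged application configuration: explicitly set "conf" entries take precedence
   * over entries loaded from the properties file. The result is cached.
   */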
  Map<String, String> getEffectiveConfig() throws IOException {
    if (effectiveConfig == null) {
      effectiveConfig = new HashMap<>(conf);
      Properties p = loadPropertiesFile();
      for (String key : p.stringPropertyNames()) {
        if (!effectiveConfig.containsKey(key)) {
          effectiveConfig.put(key, p.getProperty(key));
        }
      }
    }
    return effectiveConfig;
  }

  /**
   * Loads the configuration file for the application, if it exists. This is either the
   * user-specified properties file, or the spark-defaults.conf file under the Spark configuration
   * directory.
   */
  private Properties loadPropertiesFile() throws IOException {
    Properties props = new Properties();
    File propsFile;
    if (propertiesFile != null) {
      propsFile = new File(propertiesFile);
      checkArgument(propsFile.isFile(), "Invalid properties file '%s'.", propertiesFile);
    } else {
      propsFile = new File(getConfDir(), DEFAULT_PROPERTIES_FILE);
    }

    if (propsFile.isFile()) {
      try (InputStreamReader isr = new InputStreamReader(
          new FileInputStream(propsFile), StandardCharsets.UTF_8)) {
        props.load(isr);
        for (Map.Entry<Object, Object> e : props.entrySet()) {
          e.setValue(e.getValue().toString().trim());
        }
      }
    }
    return props;
  }

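  /**
   * Returns the Spark configuration directory: SPARK_CONF_DIR if set, otherwise the "conf"
   * directory under the Spark home.
   */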
  private String getConfDir() {
    String confDir = getenv("SPARK_CONF_DIR");
    return confDir != null ? confDir : join(File.separator, getSparkHome(), "conf");
  }

}