/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.io.IOException;
import java.util.*;

import static org.apache.spark.launcher.CommandBuilderUtils.*;

/**
 * Special command builder for handling a CLI invocation of SparkSubmit.
 * <p>
 * This builder adds command line parsing compatible with SparkSubmit. It handles setting
 * driver-side options and the special parsing behavior needed for special-casing certain
 * internal Spark applications.
 * <p>
 * This class also has some special features to aid launching shells (pyspark and sparkR) and
 * examples.
 */
class SparkSubmitCommandBuilder extends AbstractCommandBuilder {

  /**
   * Name of the app resource used to identify the PySpark shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "pyspark-shell" since that identifies the PySpark shell to SparkSubmit
   * (see java_gateway.py), and can cause this code to enter into an infinite loop.
   */
  static final String PYSPARK_SHELL = "pyspark-shell-main";

  /**
   * This is the actual resource name that identifies the PySpark shell to SparkSubmit.
   */
  static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";

  /**
   * Name of the app resource used to identify the SparkR shell. The command line parser expects
   * the resource name to be the very first argument to spark-submit in this case.
   *
   * NOTE: this cannot be "sparkr-shell" since that identifies the SparkR shell to SparkSubmit
   * (see sparkR.R), and can cause this code to enter into an infinite loop.
   */
  static final String SPARKR_SHELL = "sparkr-shell-main";

  /**
   * This is the actual resource name that identifies the SparkR shell to SparkSubmit.
   */
  static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";
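
  // Illustrative note: bin/pyspark typically invokes spark-submit with "pyspark-shell-main" as its
  // first argument, which is recognized below, routed through buildPySparkShellCommand(), and
  // rewritten to the "pyspark-shell" resource before SparkSubmit sees it. The SparkR shell follows
  // the same pattern with "sparkr-shell-main" / "sparkr-shell".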

  /**
   * Name of app resource used to identify examples. When running examples, args[0] should be
   * this name. The app resource will identify the example class to run.
   */
  static final String RUN_EXAMPLE = "run-example";

  /**
   * Prefix for example class names.
   */
  static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";

  /**
   * This map must match the class names for available special classes, since this modifies the way
   * command line parsing works. This maps the class name to the resource to use when calling
   * spark-submit.
   */
  private static final Map<String, String> specialClasses = new HashMap<>();
  static {
    specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
      SparkLauncher.NO_RESOURCE);
    specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
      SparkLauncher.NO_RESOURCE);
  }
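
  // For example, bin/spark-shell typically runs spark-submit with
  // "--class org.apache.spark.repl.Main"; the first entry above maps that class to the
  // "spark-shell" resource and, via OptionParser.handle(), allows user arguments to be mixed with
  // spark-submit options on the same command line.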

  // spark-submit arguments provided programmatically through the launcher API; empty when the
  // builder is created from a spark-submit command line.
  final List<String> userArgs;
  // spark-submit options accumulated while parsing the command line.
  private final List<String> parsedArgs;
  // Special command means no appResource and no mainClass required
  private final boolean isSpecialCommand;
  // Whether this invocation runs one of the bundled examples (see RUN_EXAMPLE).
  private final boolean isExample;

  /**
   * Controls whether mixing spark-submit arguments with app arguments is allowed. This is needed
   * to parse the command lines for things like bin/spark-shell, which allows users to mix and
   * match arguments (e.g. "bin/spark-shell SparkShellArg --master foo").
   */
  private boolean allowsMixedArguments;

  /**
   * This constructor is used when creating a user-configurable launcher. It allows the
   * spark-submit argument list to be modified after creation.
   */
  SparkSubmitCommandBuilder() {
    this.isSpecialCommand = false;
    this.isExample = false;
    this.parsedArgs = new ArrayList<>();
    this.userArgs = new ArrayList<>();
  }

  /**
   * This constructor is used when invoking spark-submit; it parses and validates arguments
   * provided by the user on the command line.
   */
  SparkSubmitCommandBuilder(List<String> args) {
    this.allowsMixedArguments = false;
    this.parsedArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;
    this.userArgs = Collections.emptyList();

    if (args.size() > 0) {
      switch (args.get(0)) {
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL:
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          appResource = SparkLauncher.NO_RESOURCE;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      OptionParser parser = new OptionParser(true);
      parser.parse(submitArgs);
      this.isSpecialCommand = parser.isSpecialCommand;
    } else {
      this.isExample = isExample;
      this.isSpecialCommand = true;
    }
  }
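
  // A rough example of the CLI path: for "spark-submit pyspark-shell-main --master local[2]",
  // args[0] selects the PYSPARK_SHELL branch above, allowsMixedArguments is enabled, and the
  // remaining arguments ("--master local[2]") are handed to OptionParser, which records the
  // master setting for buildSparkSubmitArgs().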

  @Override
  public List<String> buildCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
      return buildSparkRCommand(env);
    } else {
      return buildSparkSubmitCommand(env);
    }
  }

  List<String> buildSparkSubmitArgs() {
    List<String> args = new ArrayList<>();
    OptionParser parser = new OptionParser(false);
    final boolean isSpecialCommand;

    // If the user args array is not empty, we need to parse it to detect exactly what
    // the user is trying to run, so that checks below are correct.
    if (!userArgs.isEmpty()) {
      parser.parse(userArgs);
      isSpecialCommand = parser.isSpecialCommand;
    } else {
      isSpecialCommand = this.isSpecialCommand;
    }

    if (!allowsMixedArguments && !isSpecialCommand) {
      checkArgument(appResource != null, "Missing application resource.");
    }

    if (verbose) {
      args.add(parser.VERBOSE);
    }

    if (master != null) {
      args.add(parser.MASTER);
      args.add(master);
    }

    if (deployMode != null) {
      args.add(parser.DEPLOY_MODE);
      args.add(deployMode);
    }

    if (appName != null) {
      args.add(parser.NAME);
      args.add(appName);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add(parser.CONF);
      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }

    if (propertiesFile != null) {
      args.add(parser.PROPERTIES_FILE);
      args.add(propertiesFile);
    }

    if (isExample) {
      jars.addAll(findExamplesJars());
    }

    if (!jars.isEmpty()) {
      args.add(parser.JARS);
      args.add(join(",", jars));
    }

    if (!files.isEmpty()) {
      args.add(parser.FILES);
      args.add(join(",", files));
    }

    if (!pyFiles.isEmpty()) {
      args.add(parser.PY_FILES);
      args.add(join(",", pyFiles));
    }

    if (isExample && !isSpecialCommand) {
      checkArgument(mainClass != null, "Missing example class name.");
    }

    if (mainClass != null) {
      args.add(parser.CLASS);
      args.add(mainClass);
    }

    args.addAll(parsedArgs);
    if (appResource != null) {
      args.add(appResource);
    }
    args.addAll(appArgs);

    return args;
  }
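
  // A sketch of the output: with master "yarn", mainClass "com.example.Foo" and appResource
  // "app.jar" (both hypothetical), plus one application argument "input", buildSparkSubmitArgs()
  // returns roughly [--master, yarn, --class, com.example.Foo, app.jar, input].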

  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM's arguments will be
    // modified to cover the driver's configuration.
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // Treat the Thrift Server as a daemon.
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

    // We don't want the client to specify Xmx. That has to be set by the corresponding memory
    // flag (--driver-memory) or the configuration entry spark.driver.memory.
    String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
    checkJavaOptions(driverDefaultJavaOptions);
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    checkJavaOptions(driverExtraJavaOptions);

    if (isClientMode) {
      // Figuring out where the memory value comes from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers the --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
      // Treat the Thrift Server as a daemon.
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);
      addOptionString(cmd, driverDefaultJavaOptions);
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    cmd.add("org.apache.spark.deploy.SparkSubmit");
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }
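
  // Memory precedence sketch (hypothetical values): if spark.driver.memory is set to "2g" via
  // setConf() or the properties file while SPARK_DRIVER_MEMORY=4g is also exported, the
  // configuration entry wins and the driver JVM is started with "-Xmx2g".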

  private void checkJavaOptions(String javaOptions) {
    if (!isEmpty(javaOptions) && javaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
        "java options (was %s). Use the corresponding --driver-memory or " +
        "spark.driver.memory configuration instead.", javaOptions);
      throw new IllegalArgumentException(msg);
    }
  }

  private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
    // For backwards compatibility, if a script is specified in
    // the pyspark command line, then run it using spark-submit.
    if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
      System.err.println(
        "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <python file>");
      System.exit(-1);
    }

    checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");

    // When launching the pyspark shell, the spark-submit arguments should be stored in the
    // PYSPARK_SUBMIT_ARGS env variable.
    appResource = PYSPARK_SHELL_RESOURCE;
    constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");

    // The python binary executable is picked up in the following order:
    // 1. conf spark.pyspark.driver.python
    // 2. conf spark.pyspark.python
    // 3. environment variable PYSPARK_DRIVER_PYTHON
    // 4. environment variable PYSPARK_PYTHON
    // 5. python
    List<String> pyargs = new ArrayList<>();
    pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
      conf.get(SparkLauncher.PYSPARK_PYTHON),
      System.getenv("PYSPARK_DRIVER_PYTHON"),
      System.getenv("PYSPARK_PYTHON"),
      "python"));
    String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
    if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
      // Pass conf spark.pyspark.python to python by environment variable.
      env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
    }
    if (!isEmpty(pyOpts)) {
      pyargs.addAll(parseOptionString(pyOpts));
    }

    return pyargs;
  }
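
  // A rough example: "bin/pyspark --master local[2]" lands here, stores something like
  // "--master local[2] --name PySparkShell pyspark-shell" in PYSPARK_SUBMIT_ARGS, and returns the
  // interpreter command to exec (spark.pyspark.driver.python if configured, otherwise the
  // PYSPARK_DRIVER_PYTHON / PYSPARK_PYTHON environment variables, falling back to "python").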

  private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
    if (!appArgs.isEmpty() && (appArgs.get(0).endsWith(".R") || appArgs.get(0).endsWith(".r"))) {
      System.err.println(
        "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
        "Use ./bin/spark-submit <R file>");
      System.exit(-1);
    }
    // When launching the SparkR shell, store the spark-submit arguments in the SPARKR_SUBMIT_ARGS
    // env variable.
    appResource = SPARKR_SHELL_RESOURCE;
    constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");

    // Set shell.R as R_PROFILE_USER to load the SparkR package when the shell comes up.
    String sparkHome = System.getenv("SPARK_HOME");
    env.put("R_PROFILE_USER",
            join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));

    List<String> args = new ArrayList<>();
    args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
      System.getenv("SPARKR_DRIVER_R"), "R"));
    return args;
  }

  private void constructEnvVarArgs(
      Map<String, String> env,
      String submitArgsEnvVariable) throws IOException {
    mergeEnvPathList(env, getLibPathEnvName(),
      getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));

    StringBuilder submitArgs = new StringBuilder();
    for (String arg : buildSparkSubmitArgs()) {
      if (submitArgs.length() > 0) {
        submitArgs.append(" ");
      }
      submitArgs.append(quoteForCommandString(arg));
    }
    env.put(submitArgsEnvVariable, submitArgs.toString());
  }

  boolean isClientMode(Map<String, String> userProps) {
    String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
    String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
    // Default master is "local[*]", so assume client mode in that case
    return userMaster == null || userDeployMode == null || "client".equals(userDeployMode);
  }
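
  // For instance, "--master yarn" with no "--deploy-mode" is treated as client mode here, as is an
  // empty configuration: the default master "local[*]" runs the driver in the launching process.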

  /**
   * Return whether the given main class represents a thrift server.
   */
  private boolean isThriftServer(String mainClass) {
    return (mainClass != null &&
      mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
  }

  private List<String> findExamplesJars() {
    boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
    List<String> examplesJars = new ArrayList<>();
    String sparkHome = getSparkHome();

    File jarsDir;
    if (new File(sparkHome, "RELEASE").isFile()) {
      jarsDir = new File(sparkHome, "examples/jars");
    } else {
      jarsDir = new File(sparkHome,
        String.format("examples/target/scala-%s/jars", getScalaVersion()));
    }

    boolean foundDir = jarsDir.isDirectory();
    checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
        jarsDir.getAbsolutePath());

    if (foundDir) {
      for (File f: jarsDir.listFiles()) {
        examplesJars.add(f.getAbsolutePath());
      }
    }
    return examplesJars;
  }

  private class OptionParser extends SparkSubmitOptionParser {

    boolean isSpecialCommand = false;
    private final boolean errorOnUnknownArgs;

    OptionParser(boolean errorOnUnknownArgs) {
      this.errorOnUnknownArgs = errorOnUnknownArgs;
    }

    @Override
    protected boolean handle(String opt, String value) {
      switch (opt) {
        case MASTER:
          master = value;
          break;
        case DEPLOY_MODE:
          deployMode = value;
          break;
        case PROPERTIES_FILE:
          propertiesFile = value;
          break;
        case DRIVER_MEMORY:
          conf.put(SparkLauncher.DRIVER_MEMORY, value);
          break;
        case DRIVER_JAVA_OPTIONS:
          conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
          break;
        case DRIVER_LIBRARY_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
          break;
        case DRIVER_CLASS_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
          break;
        case CONF:
          checkArgument(value != null, "Missing argument to %s", CONF);
          String[] setConf = value.split("=", 2);
          checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
          conf.put(setConf[0], setConf[1]);
          break;
        case CLASS:
          // The special classes require some special command line handling, since they allow
          // mixing spark-submit arguments with arguments that should be propagated to the shell
          // itself. Note that for this to work, the "--class" argument must come before any
          // non-spark-submit arguments.
          mainClass = value;
          if (specialClasses.containsKey(value)) {
            allowsMixedArguments = true;
            appResource = specialClasses.get(value);
          }
          break;
        case KILL_SUBMISSION:
        case STATUS:
          isSpecialCommand = true;
          parsedArgs.add(opt);
          parsedArgs.add(value);
          break;
        case HELP:
        case USAGE_ERROR:
        case VERSION:
          isSpecialCommand = true;
          parsedArgs.add(opt);
          break;
        default:
          parsedArgs.add(opt);
          if (value != null) {
            parsedArgs.add(value);
          }
          break;
      }
      return true;
    }

    @Override
    protected boolean handleUnknown(String opt) {
      // When mixing arguments, add unrecognized parameters directly to the user arguments list. In
      // normal mode, any unrecognized parameter triggers the end of command line parsing, and the
      // parameter itself will be interpreted by SparkSubmit as the application resource. The
      // remaining params will be appended to the list of SparkSubmit arguments.
      if (allowsMixedArguments) {
        appArgs.add(opt);
        return true;
      } else if (isExample) {
        String className = opt;
        if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
          className = EXAMPLE_CLASS_PREFIX + className;
        }
        mainClass = className;
        appResource = SparkLauncher.NO_RESOURCE;
        return false;
      } else if (errorOnUnknownArgs) {
        checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
        checkState(appResource == null, "Found unrecognized argument but resource is already set.");
        appResource = opt;
        return false;
      }
      return true;
    }

    @Override
    protected void handleExtraArgs(List<String> extra) {
      appArgs.addAll(extra);
    }

  }

}
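
For context, this builder is package-private and is normally driven either by the spark-submit
shell scripts or indirectly through the public SparkLauncher API. A minimal sketch of the latter,
assuming a hypothetical application jar and main class:

  // Minimal sketch (hypothetical resource and class names); SparkLauncher internally uses
  // SparkSubmitCommandBuilder to assemble the spark-submit command it launches.
  import org.apache.spark.launcher.SparkAppHandle;
  import org.apache.spark.launcher.SparkLauncher;

  public class LauncherExample {
    public static void main(String[] args) throws Exception {
      SparkAppHandle handle = new SparkLauncher()
        .setAppResource("/path/to/app.jar")     // hypothetical application jar
        .setMainClass("com.example.MyApp")      // hypothetical main class
        .setMaster("local[2]")
        .setConf(SparkLauncher.DRIVER_MEMORY, "1g")
        .startApplication();
      System.out.println("Launched app, initial state: " + handle.getState());
    }
  }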