Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.io.InputStream;
0023 import java.util.ArrayList;
0024 import java.util.HashMap;
0025 import java.util.List;
0026 import java.util.Map;
0027 import java.util.concurrent.ThreadFactory;
0028 import java.util.concurrent.atomic.AtomicInteger;
0029 import java.util.logging.Level;
0030 import java.util.logging.Logger;
0031 
0032 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0033 import static org.apache.spark.launcher.CommandBuilderUtils.join;
0034 
0035 /**
0036  * Launcher for Spark applications.
0037  * <p>
0038  * Use this class to start Spark applications programmatically. The class uses a builder pattern
0039  * to allow clients to configure the Spark application and launch it as a child process.
0040  * </p>
0041  */
0042 public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
0043 
0044   private static final Logger LOG = Logger.getLogger(SparkLauncher.class.getName());
0045 
0046   /** The Spark master. */
0047   public static final String SPARK_MASTER = "spark.master";
0048 
0049   /** The Spark deploy mode. */
0050   public static final String DEPLOY_MODE = "spark.submit.deployMode";
0051 
0052   /** Configuration key for the driver memory. */
0053   public static final String DRIVER_MEMORY = "spark.driver.memory";
0054   /** Configuration key for the driver class path. */
0055   public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
0056   /** Configuration key for the default driver VM options. */
0057   public static final String DRIVER_DEFAULT_JAVA_OPTIONS = "spark.driver.defaultJavaOptions";
0058   /** Configuration key for the driver VM options. */
0059   public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
0060   /** Configuration key for the driver native library path. */
0061   public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
0062 
0063   /** Configuration key for the executor memory. */
0064   public static final String EXECUTOR_MEMORY = "spark.executor.memory";
0065   /** Configuration key for the executor class path. */
0066   public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
0067   /** Configuration key for the default executor VM options. */
0068   public static final String EXECUTOR_DEFAULT_JAVA_OPTIONS = "spark.executor.defaultJavaOptions";
0069   /** Configuration key for the executor VM options. */
0070   public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
0071   /** Configuration key for the executor native library path. */
0072   public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
0073   /** Configuration key for the number of executor CPU cores. */
0074   public static final String EXECUTOR_CORES = "spark.executor.cores";
0075 
0076   static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";
0077 
0078   static final String PYSPARK_PYTHON = "spark.pyspark.python";
0079 
0080   static final String SPARKR_R_SHELL = "spark.r.shell.command";
0081 
0082   /** Logger name to use when launching a child process. */
0083   public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
0084 
0085   /**
0086    * A special value for the resource that tells Spark to not try to process the app resource as a
0087    * file. This is useful when the class being executed is added to the application using other
0088    * means - for example, by adding jars using the package download feature.
0089    */
0090   public static final String NO_RESOURCE = "spark-internal";
0091 
0092   /**
0093    * Maximum time (in ms) to wait for a child process to connect back to the launcher server
0094    * when using @link{#start()}.
0095    */
0096   public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
0097 
0098   /** Used internally to create unique logger names. */
0099   private static final AtomicInteger COUNTER = new AtomicInteger();
0100 
0101   /** Factory for creating OutputRedirector threads. **/
0102   static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
0103 
0104   static final Map<String, String> launcherConfig = new HashMap<>();
0105 
0106   /**
0107    * Set a configuration value for the launcher library. These config values do not affect the
0108    * launched application, but rather the behavior of the launcher library itself when managing
0109    * applications.
0110    *
0111    * @since 1.6.0
0112    * @param name Config name.
0113    * @param value Config value.
0114    */
0115   public static void setConfig(String name, String value) {
0116     launcherConfig.put(name, value);
0117   }
0118 
0119   // Visible for testing.
0120   File workingDir;
0121   boolean redirectErrorStream;
0122   ProcessBuilder.Redirect errorStream;
0123   ProcessBuilder.Redirect outputStream;
0124 
0125   public SparkLauncher() {
0126     this(null);
0127   }
0128 
0129   /**
0130    * Creates a launcher that will set the given environment variables in the child.
0131    *
0132    * @param env Environment variables to set.
0133    */
0134   public SparkLauncher(Map<String, String> env) {
0135     if (env != null) {
0136       this.builder.childEnv.putAll(env);
0137     }
0138   }
0139 
0140   /**
0141    * Set a custom JAVA_HOME for launching the Spark application.
0142    *
0143    * @param javaHome Path to the JAVA_HOME to use.
0144    * @return This launcher.
0145    */
0146   public SparkLauncher setJavaHome(String javaHome) {
0147     checkNotNull(javaHome, "javaHome");
0148     builder.javaHome = javaHome;
0149     return this;
0150   }
0151 
0152   /**
0153    * Set a custom Spark installation location for the application.
0154    *
0155    * @param sparkHome Path to the Spark installation to use.
0156    * @return This launcher.
0157    */
0158   public SparkLauncher setSparkHome(String sparkHome) {
0159     checkNotNull(sparkHome, "sparkHome");
0160     builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
0161     return this;
0162   }
0163 
0164   /**
0165    * Sets the working directory of spark-submit.
0166    *
0167    * @param dir The directory to set as spark-submit's working directory.
0168    * @return This launcher.
0169    */
0170   public SparkLauncher directory(File dir) {
0171     workingDir = dir;
0172     return this;
0173   }
0174 
0175   /**
0176    * Specifies that stderr in spark-submit should be redirected to stdout.
0177    *
0178    * @return This launcher.
0179    */
0180   public SparkLauncher redirectError() {
0181     redirectErrorStream = true;
0182     return this;
0183   }
0184 
0185   /**
0186    * Redirects error output to the specified Redirect.
0187    *
0188    * @param to The method of redirection.
0189    * @return This launcher.
0190    */
0191   public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
0192     errorStream = to;
0193     return this;
0194   }
0195 
0196   /**
0197    * Redirects standard output to the specified Redirect.
0198    *
0199    * @param to The method of redirection.
0200    * @return This launcher.
0201    */
0202   public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
0203     outputStream = to;
0204     return this;
0205   }
0206 
0207   /**
0208    * Redirects error output to the specified File.
0209    *
0210    * @param errFile The file to which stderr is written.
0211    * @return This launcher.
0212    */
0213   public SparkLauncher redirectError(File errFile) {
0214     errorStream = ProcessBuilder.Redirect.to(errFile);
0215     return this;
0216   }
0217 
0218   /**
0219    * Redirects error output to the specified File.
0220    *
0221    * @param outFile The file to which stdout is written.
0222    * @return This launcher.
0223    */
0224   public SparkLauncher redirectOutput(File outFile) {
0225     outputStream = ProcessBuilder.Redirect.to(outFile);
0226     return this;
0227   }
0228 
0229   /**
0230    * Sets all output to be logged and redirected to a logger with the specified name.
0231    *
0232    * @param loggerName The name of the logger to log stdout and stderr.
0233    * @return This launcher.
0234    */
0235   public SparkLauncher redirectToLog(String loggerName) {
0236     setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
0237     return this;
0238   }
0239 
0240   // The following methods just delegate to the parent class, but they are needed to keep
0241   // binary compatibility with previous versions of this class.
0242 
0243   @Override
0244   public SparkLauncher setPropertiesFile(String path) {
0245     return super.setPropertiesFile(path);
0246   }
0247 
0248   @Override
0249   public SparkLauncher setConf(String key, String value) {
0250     return super.setConf(key, value);
0251   }
0252 
0253   @Override
0254   public SparkLauncher setAppName(String appName) {
0255     return super.setAppName(appName);
0256   }
0257 
0258   @Override
0259   public SparkLauncher setMaster(String master) {
0260     return super.setMaster(master);
0261   }
0262 
0263   @Override
0264   public SparkLauncher setDeployMode(String mode) {
0265     return super.setDeployMode(mode);
0266   }
0267 
0268   @Override
0269   public SparkLauncher setAppResource(String resource) {
0270     return super.setAppResource(resource);
0271   }
0272 
0273   @Override
0274   public SparkLauncher setMainClass(String mainClass) {
0275     return super.setMainClass(mainClass);
0276   }
0277 
0278   @Override
0279   public SparkLauncher addSparkArg(String arg) {
0280     return super.addSparkArg(arg);
0281   }
0282 
0283   @Override
0284   public SparkLauncher addSparkArg(String name, String value) {
0285     return super.addSparkArg(name, value);
0286   }
0287 
0288   @Override
0289   public SparkLauncher addAppArgs(String... args) {
0290     return super.addAppArgs(args);
0291   }
0292 
0293   @Override
0294   public SparkLauncher addJar(String jar) {
0295     return super.addJar(jar);
0296   }
0297 
0298   @Override
0299   public SparkLauncher addFile(String file) {
0300     return super.addFile(file);
0301   }
0302 
0303   @Override
0304   public SparkLauncher addPyFile(String file) {
0305     return super.addPyFile(file);
0306   }
0307 
0308   @Override
0309   public SparkLauncher setVerbose(boolean verbose) {
0310     return super.setVerbose(verbose);
0311   }
0312 
0313   /**
0314    * Launches a sub-process that will start the configured Spark application.
0315    * <p>
0316    * The {@link #startApplication(SparkAppHandle.Listener...)} method is preferred when launching
0317    * Spark, since it provides better control of the child application.
0318    *
0319    * @return A process handle for the Spark app.
0320    */
0321   public Process launch() throws IOException {
0322     ProcessBuilder pb = createBuilder();
0323 
0324     boolean outputToLog = outputStream == null;
0325     boolean errorToLog = !redirectErrorStream && errorStream == null;
0326 
0327     String loggerName = getLoggerName();
0328     if (loggerName != null && outputToLog && errorToLog) {
0329       pb.redirectErrorStream(true);
0330     }
0331 
0332     Process childProc = pb.start();
0333     if (loggerName != null) {
0334       InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream();
0335       new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY);
0336     }
0337 
0338     return childProc;
0339   }
0340 
0341   /**
0342    * Starts a Spark application.
0343    *
0344    * <p>
0345    * Applications launched by this launcher run as child processes. The child's stdout and stderr
0346    * are merged and written to a logger (see <code>java.util.logging</code>) only if redirection
0347    * has not otherwise been configured on this <code>SparkLauncher</code>. The logger's name can be
0348    * defined by setting {@link #CHILD_PROCESS_LOGGER_NAME} in the app's configuration. If that
0349    * option is not set, the code will try to derive a name from the application's name or main
0350    * class / script file. If those cannot be determined, an internal, unique name will be used.
0351    * In all cases, the logger name will start with "org.apache.spark.launcher.app", to fit more
0352    * easily into the configuration of commonly-used logging systems.
0353    *
0354    * @since 1.6.0
0355    * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
0356    * @param listeners Listeners to add to the handle before the app is launched.
0357    * @return A handle for the launched application.
0358    */
0359   @Override
0360   public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
0361     LauncherServer server = LauncherServer.getOrCreateServer();
0362     ChildProcAppHandle handle = new ChildProcAppHandle(server);
0363     for (SparkAppHandle.Listener l : listeners) {
0364       handle.addListener(l);
0365     }
0366 
0367     String secret = server.registerHandle(handle);
0368 
0369     String loggerName = getLoggerName();
0370     ProcessBuilder pb = createBuilder();
0371     if (LOG.isLoggable(Level.FINE)) {
0372       LOG.fine(String.format("Launching Spark application:%n%s", join(" ", pb.command())));
0373     }
0374 
0375     boolean outputToLog = outputStream == null;
0376     boolean errorToLog = !redirectErrorStream && errorStream == null;
0377 
0378     // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
0379     // redirection.
0380     if (loggerName == null && (outputToLog || errorToLog)) {
0381       String appName;
0382       if (builder.appName != null) {
0383         appName = builder.appName;
0384       } else if (builder.mainClass != null) {
0385         int dot = builder.mainClass.lastIndexOf(".");
0386         if (dot >= 0 && dot < builder.mainClass.length() - 1) {
0387           appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
0388         } else {
0389           appName = builder.mainClass;
0390         }
0391       } else if (builder.appResource != null) {
0392         appName = new File(builder.appResource).getName();
0393       } else {
0394         appName = String.valueOf(COUNTER.incrementAndGet());
0395       }
0396       String loggerPrefix = getClass().getPackage().getName();
0397       loggerName = String.format("%s.app.%s", loggerPrefix, appName);
0398     }
0399 
0400     if (outputToLog && errorToLog) {
0401       pb.redirectErrorStream(true);
0402     }
0403 
0404     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
0405     pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
0406     try {
0407       Process child = pb.start();
0408       InputStream logStream = null;
0409       if (loggerName != null) {
0410         logStream = outputToLog ? child.getInputStream() : child.getErrorStream();
0411       }
0412       handle.setChildProc(child, loggerName, logStream);
0413     } catch (IOException ioe) {
0414       handle.kill();
0415       throw ioe;
0416     }
0417 
0418     return handle;
0419   }
0420 
0421   private ProcessBuilder createBuilder() throws IOException {
0422     List<String> cmd = new ArrayList<>();
0423     cmd.add(findSparkSubmit());
0424     cmd.addAll(builder.buildSparkSubmitArgs());
0425 
0426     // Since the child process is a batch script, let's quote things so that special characters are
0427     // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
0428     // weird.
0429     if (isWindows()) {
0430       List<String> winCmd = new ArrayList<>();
0431       for (String arg : cmd) {
0432         winCmd.add(quoteForBatchScript(arg));
0433       }
0434       cmd = winCmd;
0435     }
0436 
0437     ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
0438     for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
0439       pb.environment().put(e.getKey(), e.getValue());
0440     }
0441 
0442     if (workingDir != null) {
0443       pb.directory(workingDir);
0444     }
0445 
0446     // Only one of redirectError and redirectError(...) can be specified.
0447     // Similarly, if redirectToLog is specified, no other redirections should be specified.
0448     checkState(!redirectErrorStream || errorStream == null,
0449       "Cannot specify both redirectError() and redirectError(...) ");
0450     checkState(getLoggerName() == null ||
0451       ((!redirectErrorStream && errorStream == null) || outputStream == null),
0452       "Cannot used redirectToLog() in conjunction with other redirection methods.");
0453 
0454     if (redirectErrorStream) {
0455       pb.redirectErrorStream(true);
0456     }
0457     if (errorStream != null) {
0458       pb.redirectError(errorStream);
0459     }
0460     if (outputStream != null) {
0461       pb.redirectOutput(outputStream);
0462     }
0463 
0464     return pb;
0465   }
0466 
0467   @Override
0468   SparkLauncher self() {
0469     return this;
0470   }
0471 
0472   // Visible for testing.
0473   String findSparkSubmit() {
0474     String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
0475     return join(File.separator, builder.getSparkHome(), "bin", script);
0476   }
0477 
0478   private String getLoggerName() throws IOException {
0479     return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
0480   }
0481 
0482 }