Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.IOException;
0021 import java.util.Arrays;
0022 import java.util.List;
0023 
0024 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0025 
0026 /**
0027  * Base class for launcher implementations.
0028  *
0029  * @since Spark 2.3.0
0030  */
0031 public abstract class AbstractLauncher<T extends AbstractLauncher<T>> {
0032 
0033   final SparkSubmitCommandBuilder builder;
0034 
0035   AbstractLauncher() {
0036     this.builder = new SparkSubmitCommandBuilder();
0037   }
0038 
0039   /**
0040    * Set a custom properties file with Spark configuration for the application.
0041    *
0042    * @param path Path to custom properties file to use.
0043    * @return This launcher.
0044    */
0045   public T setPropertiesFile(String path) {
0046     checkNotNull(path, "path");
0047     builder.setPropertiesFile(path);
0048     return self();
0049   }
0050 
0051   /**
0052    * Set a single configuration value for the application.
0053    *
0054    * @param key Configuration key.
0055    * @param value The value to use.
0056    * @return This launcher.
0057    */
0058   public T setConf(String key, String value) {
0059     checkNotNull(key, "key");
0060     checkNotNull(value, "value");
0061     checkArgument(key.startsWith("spark."), "'key' must start with 'spark.'");
0062     builder.conf.put(key, value);
0063     return self();
0064   }
0065 
0066   /**
0067    * Set the application name.
0068    *
0069    * @param appName Application name.
0070    * @return This launcher.
0071    */
0072   public T setAppName(String appName) {
0073     checkNotNull(appName, "appName");
0074     builder.appName = appName;
0075     return self();
0076   }
0077 
0078   /**
0079    * Set the Spark master for the application.
0080    *
0081    * @param master Spark master.
0082    * @return This launcher.
0083    */
0084   public T setMaster(String master) {
0085     checkNotNull(master, "master");
0086     builder.master = master;
0087     return self();
0088   }
0089 
0090   /**
0091    * Set the deploy mode for the application.
0092    *
0093    * @param mode Deploy mode.
0094    * @return This launcher.
0095    */
0096   public T setDeployMode(String mode) {
0097     checkNotNull(mode, "mode");
0098     builder.deployMode = mode;
0099     return self();
0100   }
0101 
0102   /**
0103    * Set the main application resource. This should be the location of a jar file for Scala/Java
0104    * applications, or a python script for PySpark applications.
0105    *
0106    * @param resource Path to the main application resource.
0107    * @return This launcher.
0108    */
0109   public T setAppResource(String resource) {
0110     checkNotNull(resource, "resource");
0111     builder.appResource = resource;
0112     return self();
0113   }
0114 
0115   /**
0116    * Sets the application class name for Java/Scala applications.
0117    *
0118    * @param mainClass Application's main class.
0119    * @return This launcher.
0120    */
0121   public T setMainClass(String mainClass) {
0122     checkNotNull(mainClass, "mainClass");
0123     builder.mainClass = mainClass;
0124     return self();
0125   }
0126 
0127   /**
0128    * Adds a no-value argument to the Spark invocation. If the argument is known, this method
0129    * validates whether the argument is indeed a no-value argument, and throws an exception
0130    * otherwise.
0131    * <p>
0132    * Use this method with caution. It is possible to create an invalid Spark command by passing
0133    * unknown arguments to this method, since those are allowed for forward compatibility.
0134    *
0135    * @since 1.5.0
0136    * @param arg Argument to add.
0137    * @return This launcher.
0138    */
0139   public T addSparkArg(String arg) {
0140     SparkSubmitOptionParser validator = new ArgumentValidator(false);
0141     validator.parse(Arrays.asList(arg));
0142     builder.userArgs.add(arg);
0143     return self();
0144   }
0145 
0146   /**
0147    * Adds an argument with a value to the Spark invocation. If the argument name corresponds to
0148    * a known argument, the code validates that the argument actually expects a value, and throws
0149    * an exception otherwise.
0150    * <p>
0151    * It is safe to add arguments modified by other methods in this class (such as
0152    * {@link #setMaster(String)} - the last invocation will be the one to take effect.
0153    * <p>
0154    * Use this method with caution. It is possible to create an invalid Spark command by passing
0155    * unknown arguments to this method, since those are allowed for forward compatibility.
0156    *
0157    * @since 1.5.0
0158    * @param name Name of argument to add.
0159    * @param value Value of the argument.
0160    * @return This launcher.
0161    */
0162   public T addSparkArg(String name, String value) {
0163     SparkSubmitOptionParser validator = new ArgumentValidator(true);
0164     if (validator.MASTER.equals(name)) {
0165       setMaster(value);
0166     } else if (validator.PROPERTIES_FILE.equals(name)) {
0167       setPropertiesFile(value);
0168     } else if (validator.CONF.equals(name)) {
0169       String[] vals = value.split("=", 2);
0170       setConf(vals[0], vals[1]);
0171     } else if (validator.CLASS.equals(name)) {
0172       setMainClass(value);
0173     } else if (validator.JARS.equals(name)) {
0174       builder.jars.clear();
0175       for (String jar : value.split(",")) {
0176         addJar(jar);
0177       }
0178     } else if (validator.FILES.equals(name)) {
0179       builder.files.clear();
0180       for (String file : value.split(",")) {
0181         addFile(file);
0182       }
0183     } else if (validator.PY_FILES.equals(name)) {
0184       builder.pyFiles.clear();
0185       for (String file : value.split(",")) {
0186         addPyFile(file);
0187       }
0188     } else {
0189       validator.parse(Arrays.asList(name, value));
0190       builder.userArgs.add(name);
0191       builder.userArgs.add(value);
0192     }
0193     return self();
0194   }
0195 
0196   /**
0197    * Adds command line arguments for the application.
0198    *
0199    * @param args Arguments to pass to the application's main class.
0200    * @return This launcher.
0201    */
0202   public T addAppArgs(String... args) {
0203     for (String arg : args) {
0204       checkNotNull(arg, "arg");
0205       builder.appArgs.add(arg);
0206     }
0207     return self();
0208   }
0209 
0210   /**
0211    * Adds a jar file to be submitted with the application.
0212    *
0213    * @param jar Path to the jar file.
0214    * @return This launcher.
0215    */
0216   public T addJar(String jar) {
0217     checkNotNull(jar, "jar");
0218     builder.jars.add(jar);
0219     return self();
0220   }
0221 
0222   /**
0223    * Adds a file to be submitted with the application.
0224    *
0225    * @param file Path to the file.
0226    * @return This launcher.
0227    */
0228   public T addFile(String file) {
0229     checkNotNull(file, "file");
0230     builder.files.add(file);
0231     return self();
0232   }
0233 
0234   /**
0235    * Adds a python file / zip / egg to be submitted with the application.
0236    *
0237    * @param file Path to the file.
0238    * @return This launcher.
0239    */
0240   public T addPyFile(String file) {
0241     checkNotNull(file, "file");
0242     builder.pyFiles.add(file);
0243     return self();
0244   }
0245 
0246   /**
0247    * Enables verbose reporting for SparkSubmit.
0248    *
0249    * @param verbose Whether to enable verbose output.
0250    * @return This launcher.
0251    */
0252   public T setVerbose(boolean verbose) {
0253     builder.verbose = verbose;
0254     return self();
0255   }
0256 
0257   /**
0258    * Starts a Spark application.
0259    *
0260    * <p>
0261    * This method returns a handle that provides information about the running application and can
0262    * be used to do basic interaction with it.
0263    * <p>
0264    * The returned handle assumes that the application will instantiate a single SparkContext
0265    * during its lifetime. Once that context reports a final state (one that indicates the
0266    * SparkContext has stopped), the handle will not perform new state transitions, so anything
0267    * that happens after that cannot be monitored. If the underlying application is launched as
0268    * a child process, {@link SparkAppHandle#kill()} can still be used to kill the child process.
0269    *
0270    * @since 1.6.0
0271    * @param listeners Listeners to add to the handle before the app is launched.
0272    * @return A handle for the launched application.
0273    */
0274   public abstract SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
0275     throws IOException;
0276 
0277   abstract T self();
0278 
0279   private static class ArgumentValidator extends SparkSubmitOptionParser {
0280 
0281     private final boolean hasValue;
0282 
0283     ArgumentValidator(boolean hasValue) {
0284       this.hasValue = hasValue;
0285     }
0286 
0287     @Override
0288     protected boolean handle(String opt, String value) {
0289       if (value == null && hasValue) {
0290         throw new IllegalArgumentException(String.format("'%s' expects a value.", opt));
0291       }
0292       return true;
0293     }
0294 
0295     @Override
0296     protected boolean handleUnknown(String opt) {
0297       // Do not fail on unknown arguments, to support future arguments added to SparkSubmit.
0298       return true;
0299     }
0300 
0301     protected void handleExtraArgs(List<String> extra) {
0302       // No op.
0303     }
0304 
0305   }
0306 
0307 }