/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.launcher;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import static org.junit.Assert.*;

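/**
 * Unit tests for {@link SparkSubmitCommandBuilder}: parsing of spark-submit command line
 * options, generation of the driver command in client and cluster deploy modes, and the
 * special handling for the pyspark, sparkR, and run-example entry points.
 *
 * A minimal usage sketch of the API under test (the class name and paths below are
 * illustrative, not real):
 *
 * <pre>
 *   SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(
 *     Arrays.asList("--master", "yarn", "--class", "my.Class", "/path/to/app.jar"));
 *   List&lt;String&gt; cmd = builder.buildCommand(new HashMap&lt;&gt;());
 * </pre>
 */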
public class SparkSubmitCommandBuilderSuite extends BaseSuite {

  private static File dummyPropsFile;
  private static SparkSubmitOptionParser parser;

  @Rule
  public ExpectedException expectedException = ExpectedException.none();

  @BeforeClass
  public static void setUp() throws Exception {
    dummyPropsFile = File.createTempFile("spark", "properties");
    parser = new SparkSubmitOptionParser();
  }

  @AfterClass
  public static void cleanUp() throws Exception {
    dummyPropsFile.delete();
  }

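  // The two tests below drive testCmdBuilder(isDriver, useDefaultPropertyFile): client mode
  // ("driver") vs. cluster mode, each with and without a default properties file.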
  @Test
  public void testDriverCmdBuilder() throws Exception {
    testCmdBuilder(true, true);
    testCmdBuilder(true, false);
  }

  @Test
  public void testClusterCmdBuilder() throws Exception {
    testCmdBuilder(false, true);
    testCmdBuilder(false, false);
  }

  @Test
  public void testCliHelpAndNoArg() throws Exception {
    List<String> helpArgs = Arrays.asList(parser.HELP);
    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(helpArgs, env);
    assertTrue("--help should be contained in the final cmd.", cmd.contains(parser.HELP));

    List<String> sparkEmptyArgs = Collections.emptyList();
    cmd = buildCommand(sparkEmptyArgs, env);
    assertTrue(
      "org.apache.spark.deploy.SparkSubmit should be contained in the final cmd of empty input.",
      cmd.contains("org.apache.spark.deploy.SparkSubmit"));
  }

  @Test
  public void testCliKillAndStatus() throws Exception {
    List<String> params = Arrays.asList("driver-20160531171222-0000");
    testCLIOpts(null, parser.STATUS, params);
    testCLIOpts(null, parser.KILL_SUBMISSION, params);
    testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.STATUS, params);
    testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.KILL_SUBMISSION, params);
  }

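  // Verifies that driver memory, classpath, java options, and library path supplied on the
  // command line end up in the JVM arguments and environment of the built command.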
  @Test
  public void testCliParser() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      parser.MASTER,
      "local",
      parser.DRIVER_MEMORY,
      "42g",
      parser.DRIVER_CLASS_PATH,
      "/driverCp",
      parser.DRIVER_JAVA_OPTIONS,
      "extraJavaOpt",
      parser.CONF,
      "spark.randomOption=foo",
      parser.CONF,
      SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath",
      SparkLauncher.NO_RESOURCE);
    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(sparkSubmitArgs, env);

    assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
        File.pathSeparator, "/driverLibPath"));
    assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
    assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
    assertTrue("Command should contain user-defined conf.",
      Collections.indexOfSubList(cmd, Arrays.asList(parser.CONF, "spark.randomOption=foo")) > 0);
  }

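  // When the main class is the REPL (org.apache.spark.repl.Main), unrecognized options such
  // as --app-arg/--app-switch are treated as application args following the shell resource.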
  @Test
  public void testShellCliParser() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      parser.CLASS,
      "org.apache.spark.repl.Main",
      parser.MASTER,
      "foo",
      "--app-arg",
      "bar",
      "--app-switch",
      parser.FILES,
      "baz",
      parser.NAME,
      "appName");

    List<String> args = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
    List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
    assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
  }

  @Test
  public void testAlternateSyntaxParsing() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      parser.CLASS + "=org.my.Class",
      parser.MASTER + "=foo",
      parser.DEPLOY_MODE + "=bar",
      SparkLauncher.NO_RESOURCE);

    List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
    assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
    assertEquals("foo", findArgValue(cmd, parser.MASTER));
    assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
  }

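  // The PYSPARK_SHELL entry point should launch a Python interpreter and forward the
  // spark-submit options through the PYSPARK_SUBMIT_ARGS environment variable.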
  @Test
  public void testPySparkLauncher() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      SparkSubmitCommandBuilder.PYSPARK_SHELL,
      "--master=foo",
      "--deploy-mode=bar");

    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(sparkSubmitArgs, env);
    assertTrue(Arrays.asList("python", "python2", "python3").contains(cmd.get(cmd.size() - 1)));
    assertEquals(
      String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
        parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
      env.get("PYSPARK_SUBMIT_ARGS"));
  }

  @Test
  public void testPySparkFallback() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      "--master=foo",
      "--deploy-mode=bar",
      "script.py",
      "arg1");

    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(sparkSubmitArgs, env);

    assertEquals("foo", findArgValue(cmd, "--master"));
    assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
    assertEquals("script.py", cmd.get(cmd.size() - 2));
    assertEquals("arg1", cmd.get(cmd.size() - 1));
  }

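  // The SPARKR_SHELL entry point behaves like the pyspark one: the R executable comes from
  // spark.r.shell.command and the options are forwarded via SPARKR_SUBMIT_ARGS.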
0185   @Test
0186   public void testSparkRShell() throws Exception {
0187     List<String> sparkSubmitArgs = Arrays.asList(
0188       SparkSubmitCommandBuilder.SPARKR_SHELL,
0189       "--master=foo",
0190       "--deploy-mode=bar",
0191       "--conf", "spark.r.shell.command=/usr/bin/R");
0192 
0193     Map<String, String> env = new HashMap<>();
0194     List<String> cmd = buildCommand(sparkSubmitArgs, env);
0195     assertEquals("/usr/bin/R", cmd.get(cmd.size() - 1));
0196     assertEquals(
0197       String.format(
0198         "\"%s\" \"foo\" \"%s\" \"bar\" \"--conf\" \"spark.r.shell.command=/usr/bin/R\" \"%s\"",
0199         parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.SPARKR_SHELL_RESOURCE),
0200       env.get("SPARKR_SUBMIT_ARGS"));
0201   }
0202 
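  // The run-example entry point requires an example class name unless the invocation is a
  // pure help/version/usage request or a kill/status command.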
  @Test(expected = IllegalArgumentException.class)
  public void testExamplesRunnerNoArg() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(SparkSubmitCommandBuilder.RUN_EXAMPLE);
    Map<String, String> env = new HashMap<>();
    buildCommand(sparkSubmitArgs, env);
  }

  @Test
  public void testExamplesRunnerNoMainClass() throws Exception {
    testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.HELP, null);
    testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.USAGE_ERROR, null);
    testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.VERSION, null);
  }

  @Test
  public void testExamplesRunnerWithMasterNoMainClass() throws Exception {
    expectedException.expect(IllegalArgumentException.class);
    expectedException.expectMessage("Missing example class name.");

    List<String> sparkSubmitArgs = Arrays.asList(
      SparkSubmitCommandBuilder.RUN_EXAMPLE,
      parser.MASTER + "=foo"
    );
    Map<String, String> env = new HashMap<>();
    buildCommand(sparkSubmitArgs, env);
  }

  @Test
  public void testExamplesRunner() throws Exception {
    List<String> sparkSubmitArgs = Arrays.asList(
      SparkSubmitCommandBuilder.RUN_EXAMPLE,
      parser.MASTER + "=foo",
      parser.DEPLOY_MODE + "=bar",
      "SparkPi",
      "42");

    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(sparkSubmitArgs, env);
    assertEquals("foo", findArgValue(cmd, parser.MASTER));
    assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
    assertEquals(SparkSubmitCommandBuilder.EXAMPLE_CLASS_PREFIX + "SparkPi",
      findArgValue(cmd, parser.CLASS));
    assertEquals("42", cmd.get(cmd.size() - 1));
  }

  @Test(expected = IllegalArgumentException.class)
  public void testMissingAppResource() {
    new SparkSubmitCommandBuilder().buildSparkSubmitArgs();
  }

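  // isClientMode() should be true for the default master (local[*]) and for any master when
  // the deploy mode is unset or "client", and false only for an explicit cluster deploy mode.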
  @Test
  public void testIsClientMode() {
    // Default master is "local[*]"
    SparkSubmitCommandBuilder builder = newCommandBuilder(Collections.emptyList());
    assertTrue("By default applications run in local mode.",
      builder.isClientMode(Collections.emptyMap()));
    // --master yarn (or any other resource manager)
    List<String> sparkSubmitArgs = Arrays.asList(parser.MASTER, "yarn");
    builder = newCommandBuilder(sparkSubmitArgs);
    assertTrue("By default deploy mode is client.", builder.isClientMode(Collections.emptyMap()));
    // --master yarn and set spark.submit.deployMode to client
    Map<String, String> userProps = new HashMap<>();
    userProps.put("spark.submit.deployMode", "client");
    assertTrue(builder.isClientMode(userProps));
    // --master mesos --deploy-mode cluster
    sparkSubmitArgs = Arrays.asList(parser.MASTER, "mesos", parser.DEPLOY_MODE, "cluster");
    builder = newCommandBuilder(sparkSubmitArgs);
    assertFalse(builder.isClientMode(Collections.emptyMap()));
  }

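  /**
   * Builds a full spark-submit command and checks the generated JVM arguments, classpath,
   * native library path, and spark-submit options.
   *
   * @param isDriver whether to use the "client" deploy mode (driver runs in the launched JVM)
   *   or "cluster" mode.
   * @param useDefaultPropertyFile whether driver properties come from the default properties
   *   file under SPARK_CONF_DIR or from an explicit properties file plus the conf map.
   */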
  private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception {
    final String DRIVER_DEFAULT_PARAM = "-Ddriver-default";
    final String DRIVER_EXTRA_PARAM = "-Ddriver-extra";
    String deployMode = isDriver ? "client" : "cluster";

    SparkSubmitCommandBuilder launcher =
      newCommandBuilder(Collections.emptyList());
    launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
      System.getProperty("spark.test.home"));
    launcher.master = "yarn";
    launcher.deployMode = deployMode;
    launcher.appResource = "/foo";
    launcher.appName = "MyApp";
    launcher.mainClass = "my.Class";
    launcher.appArgs.add("foo");
    launcher.appArgs.add("bar");
    launcher.conf.put("spark.foo", "foo");
    // Set the driver properties either explicitly (equivalent to "--conf" options) or through
    // the default properties file under SPARK_CONF_DIR.
    if (!useDefaultPropertyFile) {
      launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath());
      launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
      launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
      launcher.conf.put(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS, DRIVER_DEFAULT_PARAM);
      launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_PARAM);
      launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
    } else {
      launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home")
          + "/launcher/src/test/resources");
    }

    Map<String, String> env = new HashMap<>();
    List<String> cmd = launcher.buildCommand(env);

    // Checks below are different for driver and non-driver mode.

    if (isDriver) {
      assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
      assertTrue("Driver default options should be configured.",
        cmd.contains(DRIVER_DEFAULT_PARAM));
      assertTrue("Driver extra options should be configured.", cmd.contains(DRIVER_EXTRA_PARAM));
    } else {
      boolean found = false;
      for (String arg : cmd) {
        if (arg.startsWith("-Xmx")) {
          found = true;
          break;
        }
      }
      assertFalse("Memory arguments should not be set.", found);
      assertFalse("Driver default options should not be configured.",
        cmd.contains(DRIVER_DEFAULT_PARAM));
      assertFalse("Driver extra options should not be configured.",
        cmd.contains(DRIVER_EXTRA_PARAM));
    }

    String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
    if (isDriver) {
      assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
    } else {
      assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
    }

    String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
    if (isDriver) {
      assertNotNull("Native library path should be set.", libPath);
      assertTrue("Native library path should contain provided entry.",
        contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
    } else {
      assertNull("Native library path should not be set.", libPath);
    }

    // Checks below are the same for both driver and non-driver mode.
    if (!useDefaultPropertyFile) {
      assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
    }
    assertEquals("yarn", findArgValue(cmd, parser.MASTER));
    assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
    assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
    assertEquals("MyApp", findArgValue(cmd, parser.NAME));

    // The app resource should be immediately followed by the application arguments, which
    // must close out the command.
    boolean appArgsOk = false;
    for (int i = 0; i < cmd.size(); i++) {
      if (cmd.get(i).equals("/foo")) {
        assertEquals("foo", cmd.get(i + 1));
        assertEquals("bar", cmd.get(i + 2));
        assertEquals(cmd.size(), i + 3);
        appArgsOk = true;
        break;
      }
    }
    assertTrue("App resource and args should be added to command.", appArgsOk);

    Map<String, String> conf = parseConf(cmd, parser);
    assertEquals("foo", conf.get("spark.foo"));
  }

  private boolean contains(String needle, String[] haystack) {
    for (String entry : haystack) {
      if (entry.equals(needle)) {
        return true;
      }
    }
    return false;
  }

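  // Collects each "--conf key=value" pair in the command into a map.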
  private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
    Map<String, String> conf = new HashMap<>();
    for (int i = 0; i < cmd.size(); i++) {
      if (cmd.get(i).equals(parser.CONF)) {
        String[] val = cmd.get(i + 1).split("=", 2);
        conf.put(val[0], val[1]);
        i += 1;
      }
    }
    return conf;
  }

  private String findArgValue(List<String> cmd, String name) {
    for (int i = 0; i < cmd.size(); i++) {
      if (cmd.get(i).equals(name)) {
        return cmd.get(i + 1);
      }
    }
    fail(String.format("arg '%s' not found", name));
    return null;
  }

  private boolean findInStringList(String list, String sep, String needle) {
    return contains(needle, list.split(sep));
  }

  private SparkSubmitCommandBuilder newCommandBuilder(List<String> args) {
    SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
    builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
    return builder;
  }

  private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
    return newCommandBuilder(args).buildCommand(env);
  }

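  // Builds a command from an option (plus an optional app resource and parameters) and
  // asserts that the option survives into the final command.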
  private void testCLIOpts(String appResource, String opt, List<String> params) throws Exception {
    List<String> args = new ArrayList<>();
    if (appResource != null) {
      args.add(appResource);
    }
    args.add(opt);
    if (params != null) {
      args.addAll(params);
    }
    Map<String, String> env = new HashMap<>();
    List<String> cmd = buildCommand(args, env);
    assertTrue(opt + " should be contained in the final cmd.",
      cmd.contains(opt));
  }

}