0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.File;
0021 import java.util.ArrayList;
0022 import java.util.Arrays;
0023 import java.util.Collections;
0024 import java.util.HashMap;
0025 import java.util.List;
0026 import java.util.Map;
0027 import java.util.regex.Pattern;
0028
0029 import org.junit.AfterClass;
0030 import org.junit.BeforeClass;
0031 import org.junit.Rule;
0032 import org.junit.Test;
0033 import org.junit.rules.ExpectedException;
0034
0035 import static org.junit.Assert.*;
0036
0037 public class SparkSubmitCommandBuilderSuite extends BaseSuite {
0038
0039 private static File dummyPropsFile;
0040 private static SparkSubmitOptionParser parser;
0041
0042 @Rule
0043 public ExpectedException expectedException = ExpectedException.none();
0044
0045 @BeforeClass
0046 public static void setUp() throws Exception {
0047 dummyPropsFile = File.createTempFile("spark", "properties");
0048 parser = new SparkSubmitOptionParser();
0049 }
0050
0051 @AfterClass
0052 public static void cleanUp() throws Exception {
0053 dummyPropsFile.delete();
0054 }
0055
0056 @Test
0057 public void testDriverCmdBuilder() throws Exception {
0058 testCmdBuilder(true, true);
0059 testCmdBuilder(true, false);
0060 }
0061
0062 @Test
0063 public void testClusterCmdBuilder() throws Exception {
0064 testCmdBuilder(false, true);
0065 testCmdBuilder(false, false);
0066 }
0067
0068 @Test
0069 public void testCliHelpAndNoArg() throws Exception {
0070 List<String> helpArgs = Arrays.asList(parser.HELP);
0071 Map<String, String> env = new HashMap<>();
0072 List<String> cmd = buildCommand(helpArgs, env);
0073 assertTrue("--help should be contained in the final cmd.", cmd.contains(parser.HELP));
0074
0075 List<String> sparkEmptyArgs = Collections.emptyList();
0076 cmd = buildCommand(sparkEmptyArgs, env);
0077 assertTrue(
0078 "org.apache.spark.deploy.SparkSubmit should be contained in the final cmd of empty input.",
0079 cmd.contains("org.apache.spark.deploy.SparkSubmit"));
0080 }
0081
0082 @Test
0083 public void testCliKillAndStatus() throws Exception {
0084 List<String> params = Arrays.asList("driver-20160531171222-0000");
0085 testCLIOpts(null, parser.STATUS, params);
0086 testCLIOpts(null, parser.KILL_SUBMISSION, params);
0087 testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.STATUS, params);
0088 testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.KILL_SUBMISSION, params);
0089 }
0090
0091 @Test
0092 public void testCliParser() throws Exception {
0093 List<String> sparkSubmitArgs = Arrays.asList(
0094 parser.MASTER,
0095 "local",
0096 parser.DRIVER_MEMORY,
0097 "42g",
0098 parser.DRIVER_CLASS_PATH,
0099 "/driverCp",
0100 parser.DRIVER_JAVA_OPTIONS,
0101 "extraJavaOpt",
0102 parser.CONF,
0103 "spark.randomOption=foo",
0104 parser.CONF,
0105 SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH + "=/driverLibPath",
0106 SparkLauncher.NO_RESOURCE);
0107 Map<String, String> env = new HashMap<>();
0108 List<String> cmd = buildCommand(sparkSubmitArgs, env);
0109
0110 assertTrue(findInStringList(env.get(CommandBuilderUtils.getLibPathEnvName()),
0111 File.pathSeparator, "/driverLibPath"));
0112 assertTrue(findInStringList(findArgValue(cmd, "-cp"), File.pathSeparator, "/driverCp"));
0113 assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx42g"));
0114 assertTrue("Command should contain user-defined conf.",
0115 Collections.indexOfSubList(cmd, Arrays.asList(parser.CONF, "spark.randomOption=foo")) > 0);
0116 }
0117
0118 @Test
0119 public void testShellCliParser() throws Exception {
0120 List<String> sparkSubmitArgs = Arrays.asList(
0121 parser.CLASS,
0122 "org.apache.spark.repl.Main",
0123 parser.MASTER,
0124 "foo",
0125 "--app-arg",
0126 "bar",
0127 "--app-switch",
0128 parser.FILES,
0129 "baz",
0130 parser.NAME,
0131 "appName");
0132
0133 List<String> args = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
0134 List<String> expected = Arrays.asList("spark-shell", "--app-arg", "bar", "--app-switch");
0135 assertEquals(expected, args.subList(args.size() - expected.size(), args.size()));
0136 }
0137
0138 @Test
0139 public void testAlternateSyntaxParsing() throws Exception {
0140 List<String> sparkSubmitArgs = Arrays.asList(
0141 parser.CLASS + "=org.my.Class",
0142 parser.MASTER + "=foo",
0143 parser.DEPLOY_MODE + "=bar",
0144 SparkLauncher.NO_RESOURCE);
0145
0146 List<String> cmd = newCommandBuilder(sparkSubmitArgs).buildSparkSubmitArgs();
0147 assertEquals("org.my.Class", findArgValue(cmd, parser.CLASS));
0148 assertEquals("foo", findArgValue(cmd, parser.MASTER));
0149 assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
0150 }
0151
0152 @Test
0153 public void testPySparkLauncher() throws Exception {
0154 List<String> sparkSubmitArgs = Arrays.asList(
0155 SparkSubmitCommandBuilder.PYSPARK_SHELL,
0156 "--master=foo",
0157 "--deploy-mode=bar");
0158
0159 Map<String, String> env = new HashMap<>();
0160 List<String> cmd = buildCommand(sparkSubmitArgs, env);
0161 assertTrue(Arrays.asList("python", "python2", "python3").contains(cmd.get(cmd.size() - 1)));
0162 assertEquals(
0163 String.format("\"%s\" \"foo\" \"%s\" \"bar\" \"%s\"",
0164 parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.PYSPARK_SHELL_RESOURCE),
0165 env.get("PYSPARK_SUBMIT_ARGS"));
0166 }
0167
0168 @Test
0169 public void testPySparkFallback() throws Exception {
0170 List<String> sparkSubmitArgs = Arrays.asList(
0171 "--master=foo",
0172 "--deploy-mode=bar",
0173 "script.py",
0174 "arg1");
0175
0176 Map<String, String> env = new HashMap<>();
0177 List<String> cmd = buildCommand(sparkSubmitArgs, env);
0178
0179 assertEquals("foo", findArgValue(cmd, "--master"));
0180 assertEquals("bar", findArgValue(cmd, "--deploy-mode"));
0181 assertEquals("script.py", cmd.get(cmd.size() - 2));
0182 assertEquals("arg1", cmd.get(cmd.size() - 1));
0183 }
0184
0185 @Test
0186 public void testSparkRShell() throws Exception {
0187 List<String> sparkSubmitArgs = Arrays.asList(
0188 SparkSubmitCommandBuilder.SPARKR_SHELL,
0189 "--master=foo",
0190 "--deploy-mode=bar",
0191 "--conf", "spark.r.shell.command=/usr/bin/R");
0192
0193 Map<String, String> env = new HashMap<>();
0194 List<String> cmd = buildCommand(sparkSubmitArgs, env);
0195 assertEquals("/usr/bin/R", cmd.get(cmd.size() - 1));
0196 assertEquals(
0197 String.format(
0198 "\"%s\" \"foo\" \"%s\" \"bar\" \"--conf\" \"spark.r.shell.command=/usr/bin/R\" \"%s\"",
0199 parser.MASTER, parser.DEPLOY_MODE, SparkSubmitCommandBuilder.SPARKR_SHELL_RESOURCE),
0200 env.get("SPARKR_SUBMIT_ARGS"));
0201 }
0202
0203 @Test(expected = IllegalArgumentException.class)
0204 public void testExamplesRunnerNoArg() throws Exception {
0205 List<String> sparkSubmitArgs = Arrays.asList(SparkSubmitCommandBuilder.RUN_EXAMPLE);
0206 Map<String, String> env = new HashMap<>();
0207 buildCommand(sparkSubmitArgs, env);
0208 }
0209
0210 @Test
0211 public void testExamplesRunnerNoMainClass() throws Exception {
0212 testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.HELP, null);
0213 testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.USAGE_ERROR, null);
0214 testCLIOpts(SparkSubmitCommandBuilder.RUN_EXAMPLE, parser.VERSION, null);
0215 }
0216
0217 @Test
0218 public void testExamplesRunnerWithMasterNoMainClass() throws Exception {
0219 expectedException.expect(IllegalArgumentException.class);
0220 expectedException.expectMessage("Missing example class name.");
0221
0222 List<String> sparkSubmitArgs = Arrays.asList(
0223 SparkSubmitCommandBuilder.RUN_EXAMPLE,
0224 parser.MASTER + "=foo"
0225 );
0226 Map<String, String> env = new HashMap<>();
0227 buildCommand(sparkSubmitArgs, env);
0228 }
0229
0230 @Test
0231 public void testExamplesRunner() throws Exception {
0232 List<String> sparkSubmitArgs = Arrays.asList(
0233 SparkSubmitCommandBuilder.RUN_EXAMPLE,
0234 parser.MASTER + "=foo",
0235 parser.DEPLOY_MODE + "=bar",
0236 "SparkPi",
0237 "42");
0238
0239 Map<String, String> env = new HashMap<>();
0240 List<String> cmd = buildCommand(sparkSubmitArgs, env);
0241 assertEquals("foo", findArgValue(cmd, parser.MASTER));
0242 assertEquals("bar", findArgValue(cmd, parser.DEPLOY_MODE));
0243 assertEquals(SparkSubmitCommandBuilder.EXAMPLE_CLASS_PREFIX + "SparkPi",
0244 findArgValue(cmd, parser.CLASS));
0245 assertEquals("42", cmd.get(cmd.size() - 1));
0246 }
0247
0248 @Test(expected = IllegalArgumentException.class)
0249 public void testMissingAppResource() {
0250 new SparkSubmitCommandBuilder().buildSparkSubmitArgs();
0251 }
0252
0253 @Test
0254 public void testIsClientMode() {
0255
0256 SparkSubmitCommandBuilder builder = newCommandBuilder(Collections.emptyList());
0257 assertTrue("By default application run in local mode",
0258 builder.isClientMode(Collections.emptyMap()));
0259
0260 List<String> sparkSubmitArgs = Arrays.asList(parser.MASTER, "yarn");
0261 builder = newCommandBuilder(sparkSubmitArgs);
0262 assertTrue("By default deploy mode is client", builder.isClientMode(Collections.emptyMap()));
0263
0264 Map<String, String> userProps = new HashMap<>();
0265 userProps.put("spark.submit.deployMode", "client");
0266 assertTrue(builder.isClientMode(userProps));
0267
0268 sparkSubmitArgs = Arrays.asList(parser.MASTER, "mesos", parser.DEPLOY_MODE, "cluster");
0269 builder = newCommandBuilder(sparkSubmitArgs);
0270 assertFalse(builder.isClientMode(Collections.emptyMap()));
0271 }
0272
0273 private void testCmdBuilder(boolean isDriver, boolean useDefaultPropertyFile) throws Exception {
0274 final String DRIVER_DEFAULT_PARAM = "-Ddriver-default";
0275 final String DRIVER_EXTRA_PARAM = "-Ddriver-extra";
0276 String deployMode = isDriver ? "client" : "cluster";
0277
0278 SparkSubmitCommandBuilder launcher =
0279 newCommandBuilder(Collections.emptyList());
0280 launcher.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME,
0281 System.getProperty("spark.test.home"));
0282 launcher.master = "yarn";
0283 launcher.deployMode = deployMode;
0284 launcher.appResource = "/foo";
0285 launcher.appName = "MyApp";
0286 launcher.mainClass = "my.Class";
0287 launcher.appArgs.add("foo");
0288 launcher.appArgs.add("bar");
0289 launcher.conf.put("spark.foo", "foo");
0290
0291 if (!useDefaultPropertyFile) {
0292 launcher.setPropertiesFile(dummyPropsFile.getAbsolutePath());
0293 launcher.conf.put(SparkLauncher.DRIVER_MEMORY, "1g");
0294 launcher.conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, "/driver");
0295 launcher.conf.put(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS, DRIVER_DEFAULT_PARAM);
0296 launcher.conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, DRIVER_EXTRA_PARAM);
0297 launcher.conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, "/native");
0298 } else {
0299 launcher.childEnv.put("SPARK_CONF_DIR", System.getProperty("spark.test.home")
0300 + "/launcher/src/test/resources");
0301 }
0302
0303 Map<String, String> env = new HashMap<>();
0304 List<String> cmd = launcher.buildCommand(env);
0305
0306
0307
0308 if (isDriver) {
0309 assertTrue("Driver -Xmx should be configured.", cmd.contains("-Xmx1g"));
0310 assertTrue("Driver default options should be configured.",
0311 cmd.contains(DRIVER_DEFAULT_PARAM));
0312 assertTrue("Driver extra options should be configured.", cmd.contains(DRIVER_EXTRA_PARAM));
0313 } else {
0314 boolean found = false;
0315 for (String arg : cmd) {
0316 if (arg.startsWith("-Xmx")) {
0317 found = true;
0318 break;
0319 }
0320 }
0321 assertFalse("Memory arguments should not be set.", found);
0322 assertFalse("Driver default options should not be configured.",
0323 cmd.contains(DRIVER_DEFAULT_PARAM));
0324 assertFalse("Driver extra options should not be configured.",
0325 cmd.contains(DRIVER_EXTRA_PARAM));
0326 }
0327
0328 String[] cp = findArgValue(cmd, "-cp").split(Pattern.quote(File.pathSeparator));
0329 if (isDriver) {
0330 assertTrue("Driver classpath should contain provided entry.", contains("/driver", cp));
0331 } else {
0332 assertFalse("Driver classpath should not be in command.", contains("/driver", cp));
0333 }
0334
0335 String libPath = env.get(CommandBuilderUtils.getLibPathEnvName());
0336 if (isDriver) {
0337 assertNotNull("Native library path should be set.", libPath);
0338 assertTrue("Native library path should contain provided entry.",
0339 contains("/native", libPath.split(Pattern.quote(File.pathSeparator))));
0340 } else {
0341 assertNull("Native library should not be set.", libPath);
0342 }
0343
0344
0345 if (!useDefaultPropertyFile) {
0346 assertEquals(dummyPropsFile.getAbsolutePath(), findArgValue(cmd, parser.PROPERTIES_FILE));
0347 }
0348 assertEquals("yarn", findArgValue(cmd, parser.MASTER));
0349 assertEquals(deployMode, findArgValue(cmd, parser.DEPLOY_MODE));
0350 assertEquals("my.Class", findArgValue(cmd, parser.CLASS));
0351 assertEquals("MyApp", findArgValue(cmd, parser.NAME));
0352
0353 boolean appArgsOk = false;
0354 for (int i = 0; i < cmd.size(); i++) {
0355 if (cmd.get(i).equals("/foo")) {
0356 assertEquals("foo", cmd.get(i + 1));
0357 assertEquals("bar", cmd.get(i + 2));
0358 assertEquals(cmd.size(), i + 3);
0359 appArgsOk = true;
0360 break;
0361 }
0362 }
0363 assertTrue("App resource and args should be added to command.", appArgsOk);
0364
0365 Map<String, String> conf = parseConf(cmd, parser);
0366 assertEquals("foo", conf.get("spark.foo"));
0367 }
0368
0369 private boolean contains(String needle, String[] haystack) {
0370 for (String entry : haystack) {
0371 if (entry.equals(needle)) {
0372 return true;
0373 }
0374 }
0375 return false;
0376 }
0377
0378 private Map<String, String> parseConf(List<String> cmd, SparkSubmitOptionParser parser) {
0379 Map<String, String> conf = new HashMap<>();
0380 for (int i = 0; i < cmd.size(); i++) {
0381 if (cmd.get(i).equals(parser.CONF)) {
0382 String[] val = cmd.get(i + 1).split("=", 2);
0383 conf.put(val[0], val[1]);
0384 i += 1;
0385 }
0386 }
0387 return conf;
0388 }
0389
0390 private String findArgValue(List<String> cmd, String name) {
0391 for (int i = 0; i < cmd.size(); i++) {
0392 if (cmd.get(i).equals(name)) {
0393 return cmd.get(i + 1);
0394 }
0395 }
0396 fail(String.format("arg '%s' not found", name));
0397 return null;
0398 }
0399
0400 private boolean findInStringList(String list, String sep, String needle) {
0401 return contains(needle, list.split(sep));
0402 }
0403
0404 private SparkSubmitCommandBuilder newCommandBuilder(List<String> args) {
0405 SparkSubmitCommandBuilder builder = new SparkSubmitCommandBuilder(args);
0406 builder.childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, System.getProperty("spark.test.home"));
0407 return builder;
0408 }
0409
0410 private List<String> buildCommand(List<String> args, Map<String, String> env) throws Exception {
0411 return newCommandBuilder(args).buildCommand(env);
0412 }
0413
0414 private void testCLIOpts(String appResource, String opt, List<String> params) throws Exception {
0415 List<String> args = new ArrayList<>();
0416 if (appResource != null) {
0417 args.add(appResource);
0418 }
0419 args.add(opt);
0420 if (params != null) {
0421 args.addAll(params);
0422 }
0423 Map<String, String> env = new HashMap<>();
0424 List<String> cmd = buildCommand(args, env);
0425 assertTrue(opt + " should be contained in the final cmd.",
0426 cmd.contains(opt));
0427 }
0428
0429 }