0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.util.*;
0023
0024 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036 class SparkSubmitCommandBuilder extends AbstractCommandBuilder {
0037
0038
0039
0040
0041
0042
0043
0044
0045 static final String PYSPARK_SHELL = "pyspark-shell-main";
0046
0047
0048
0049
0050 static final String PYSPARK_SHELL_RESOURCE = "pyspark-shell";
0051
0052
0053
0054
0055
0056
0057
0058
0059 static final String SPARKR_SHELL = "sparkr-shell-main";
0060
0061
0062
0063
0064 static final String SPARKR_SHELL_RESOURCE = "sparkr-shell";
0065
0066
0067
0068
0069
0070 static final String RUN_EXAMPLE = "run-example";
0071
0072
0073
0074
0075 static final String EXAMPLE_CLASS_PREFIX = "org.apache.spark.examples.";
0076
0077
0078
0079
0080
0081
0082 private static final Map<String, String> specialClasses = new HashMap<>();
0083 static {
0084 specialClasses.put("org.apache.spark.repl.Main", "spark-shell");
0085 specialClasses.put("org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver",
0086 SparkLauncher.NO_RESOURCE);
0087 specialClasses.put("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2",
0088 SparkLauncher.NO_RESOURCE);
0089 }
0090
0091 final List<String> userArgs;
0092 private final List<String> parsedArgs;
0093
0094 private final boolean isSpecialCommand;
0095 private final boolean isExample;
0096
0097
0098
0099
0100
0101
0102 private boolean allowsMixedArguments;
0103
0104
0105
0106
0107
0108 SparkSubmitCommandBuilder() {
0109 this.isSpecialCommand = false;
0110 this.isExample = false;
0111 this.parsedArgs = new ArrayList<>();
0112 this.userArgs = new ArrayList<>();
0113 }
0114
0115
0116
0117
0118
0119 SparkSubmitCommandBuilder(List<String> args) {
0120 this.allowsMixedArguments = false;
0121 this.parsedArgs = new ArrayList<>();
0122 boolean isExample = false;
0123 List<String> submitArgs = args;
0124 this.userArgs = Collections.emptyList();
0125
0126 if (args.size() > 0) {
0127 switch (args.get(0)) {
0128 case PYSPARK_SHELL:
0129 this.allowsMixedArguments = true;
0130 appResource = PYSPARK_SHELL;
0131 submitArgs = args.subList(1, args.size());
0132 break;
0133
0134 case SPARKR_SHELL:
0135 this.allowsMixedArguments = true;
0136 appResource = SPARKR_SHELL;
0137 submitArgs = args.subList(1, args.size());
0138 break;
0139
0140 case RUN_EXAMPLE:
0141 isExample = true;
0142 appResource = SparkLauncher.NO_RESOURCE;
0143 submitArgs = args.subList(1, args.size());
0144 }
0145
0146 this.isExample = isExample;
0147 OptionParser parser = new OptionParser(true);
0148 parser.parse(submitArgs);
0149 this.isSpecialCommand = parser.isSpecialCommand;
0150 } else {
0151 this.isExample = isExample;
0152 this.isSpecialCommand = true;
0153 }
0154 }
0155
0156 @Override
0157 public List<String> buildCommand(Map<String, String> env)
0158 throws IOException, IllegalArgumentException {
0159 if (PYSPARK_SHELL.equals(appResource) && !isSpecialCommand) {
0160 return buildPySparkShellCommand(env);
0161 } else if (SPARKR_SHELL.equals(appResource) && !isSpecialCommand) {
0162 return buildSparkRCommand(env);
0163 } else {
0164 return buildSparkSubmitCommand(env);
0165 }
0166 }
0167
0168 List<String> buildSparkSubmitArgs() {
0169 List<String> args = new ArrayList<>();
0170 OptionParser parser = new OptionParser(false);
0171 final boolean isSpecialCommand;
0172
0173
0174
0175 if (!userArgs.isEmpty()) {
0176 parser.parse(userArgs);
0177 isSpecialCommand = parser.isSpecialCommand;
0178 } else {
0179 isSpecialCommand = this.isSpecialCommand;
0180 }
0181
0182 if (!allowsMixedArguments && !isSpecialCommand) {
0183 checkArgument(appResource != null, "Missing application resource.");
0184 }
0185
0186 if (verbose) {
0187 args.add(parser.VERBOSE);
0188 }
0189
0190 if (master != null) {
0191 args.add(parser.MASTER);
0192 args.add(master);
0193 }
0194
0195 if (deployMode != null) {
0196 args.add(parser.DEPLOY_MODE);
0197 args.add(deployMode);
0198 }
0199
0200 if (appName != null) {
0201 args.add(parser.NAME);
0202 args.add(appName);
0203 }
0204
0205 for (Map.Entry<String, String> e : conf.entrySet()) {
0206 args.add(parser.CONF);
0207 args.add(String.format("%s=%s", e.getKey(), e.getValue()));
0208 }
0209
0210 if (propertiesFile != null) {
0211 args.add(parser.PROPERTIES_FILE);
0212 args.add(propertiesFile);
0213 }
0214
0215 if (isExample) {
0216 jars.addAll(findExamplesJars());
0217 }
0218
0219 if (!jars.isEmpty()) {
0220 args.add(parser.JARS);
0221 args.add(join(",", jars));
0222 }
0223
0224 if (!files.isEmpty()) {
0225 args.add(parser.FILES);
0226 args.add(join(",", files));
0227 }
0228
0229 if (!pyFiles.isEmpty()) {
0230 args.add(parser.PY_FILES);
0231 args.add(join(",", pyFiles));
0232 }
0233
0234 if (isExample && !isSpecialCommand) {
0235 checkArgument(mainClass != null, "Missing example class name.");
0236 }
0237
0238 if (mainClass != null) {
0239 args.add(parser.CLASS);
0240 args.add(mainClass);
0241 }
0242
0243 args.addAll(parsedArgs);
0244 if (appResource != null) {
0245 args.add(appResource);
0246 }
0247 args.addAll(appArgs);
0248
0249 return args;
0250 }
0251
0252 private List<String> buildSparkSubmitCommand(Map<String, String> env)
0253 throws IOException, IllegalArgumentException {
0254
0255
0256
0257 Map<String, String> config = getEffectiveConfig();
0258 boolean isClientMode = isClientMode(config);
0259 String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;
0260
0261 List<String> cmd = buildJavaCommand(extraClassPath);
0262
0263 if (isThriftServer(mainClass)) {
0264 addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
0265 }
0266 addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
0267
0268
0269
0270 String driverDefaultJavaOptions = config.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS);
0271 checkJavaOptions(driverDefaultJavaOptions);
0272 String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
0273 checkJavaOptions(driverExtraJavaOptions);
0274
0275 if (isClientMode) {
0276
0277
0278
0279
0280
0281
0282
0283
0284 String tsMemory =
0285 isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
0286 String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
0287 System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
0288 cmd.add("-Xmx" + memory);
0289 addOptionString(cmd, driverDefaultJavaOptions);
0290 addOptionString(cmd, driverExtraJavaOptions);
0291 mergeEnvPathList(env, getLibPathEnvName(),
0292 config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
0293 }
0294
0295 cmd.add("org.apache.spark.deploy.SparkSubmit");
0296 cmd.addAll(buildSparkSubmitArgs());
0297 return cmd;
0298 }
0299
0300 private void checkJavaOptions(String javaOptions) {
0301 if (!isEmpty(javaOptions) && javaOptions.contains("Xmx")) {
0302 String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
0303 "java options (was %s). Use the corresponding --driver-memory or " +
0304 "spark.driver.memory configuration instead.", javaOptions);
0305 throw new IllegalArgumentException(msg);
0306 }
0307 }
0308
0309 private List<String> buildPySparkShellCommand(Map<String, String> env) throws IOException {
0310
0311
0312 if (!appArgs.isEmpty() && appArgs.get(0).endsWith(".py")) {
0313 System.err.println(
0314 "Running python applications through 'pyspark' is not supported as of Spark 2.0.\n" +
0315 "Use ./bin/spark-submit <python file>");
0316 System.exit(-1);
0317 }
0318
0319 checkArgument(appArgs.isEmpty(), "pyspark does not support any application options.");
0320
0321
0322
0323 appResource = PYSPARK_SHELL_RESOURCE;
0324 constructEnvVarArgs(env, "PYSPARK_SUBMIT_ARGS");
0325
0326
0327
0328
0329
0330
0331
0332 List<String> pyargs = new ArrayList<>();
0333 pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON),
0334 conf.get(SparkLauncher.PYSPARK_PYTHON),
0335 System.getenv("PYSPARK_DRIVER_PYTHON"),
0336 System.getenv("PYSPARK_PYTHON"),
0337 "python"));
0338 String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS");
0339 if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) {
0340
0341 env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON));
0342 }
0343 if (!isEmpty(pyOpts)) {
0344 pyargs.addAll(parseOptionString(pyOpts));
0345 }
0346
0347 return pyargs;
0348 }
0349
0350 private List<String> buildSparkRCommand(Map<String, String> env) throws IOException {
0351 if (!appArgs.isEmpty() && (appArgs.get(0).endsWith(".R") || appArgs.get(0).endsWith(".r"))) {
0352 System.err.println(
0353 "Running R applications through 'sparkR' is not supported as of Spark 2.0.\n" +
0354 "Use ./bin/spark-submit <R file>");
0355 System.exit(-1);
0356 }
0357
0358
0359 appResource = SPARKR_SHELL_RESOURCE;
0360 constructEnvVarArgs(env, "SPARKR_SUBMIT_ARGS");
0361
0362
0363 String sparkHome = System.getenv("SPARK_HOME");
0364 env.put("R_PROFILE_USER",
0365 join(File.separator, sparkHome, "R", "lib", "SparkR", "profile", "shell.R"));
0366
0367 List<String> args = new ArrayList<>();
0368 args.add(firstNonEmpty(conf.get(SparkLauncher.SPARKR_R_SHELL),
0369 System.getenv("SPARKR_DRIVER_R"), "R"));
0370 return args;
0371 }
0372
0373 private void constructEnvVarArgs(
0374 Map<String, String> env,
0375 String submitArgsEnvVariable) throws IOException {
0376 mergeEnvPathList(env, getLibPathEnvName(),
0377 getEffectiveConfig().get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
0378
0379 StringBuilder submitArgs = new StringBuilder();
0380 for (String arg : buildSparkSubmitArgs()) {
0381 if (submitArgs.length() > 0) {
0382 submitArgs.append(" ");
0383 }
0384 submitArgs.append(quoteForCommandString(arg));
0385 }
0386 env.put(submitArgsEnvVariable, submitArgs.toString());
0387 }
0388
0389 boolean isClientMode(Map<String, String> userProps) {
0390 String userMaster = firstNonEmpty(master, userProps.get(SparkLauncher.SPARK_MASTER));
0391 String userDeployMode = firstNonEmpty(deployMode, userProps.get(SparkLauncher.DEPLOY_MODE));
0392
0393 return userMaster == null || userDeployMode == null || "client".equals(userDeployMode);
0394 }
0395
0396
0397
0398
0399 private boolean isThriftServer(String mainClass) {
0400 return (mainClass != null &&
0401 mainClass.equals("org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"));
0402 }
0403
0404 private List<String> findExamplesJars() {
0405 boolean isTesting = "1".equals(getenv("SPARK_TESTING"));
0406 List<String> examplesJars = new ArrayList<>();
0407 String sparkHome = getSparkHome();
0408
0409 File jarsDir;
0410 if (new File(sparkHome, "RELEASE").isFile()) {
0411 jarsDir = new File(sparkHome, "examples/jars");
0412 } else {
0413 jarsDir = new File(sparkHome,
0414 String.format("examples/target/scala-%s/jars", getScalaVersion()));
0415 }
0416
0417 boolean foundDir = jarsDir.isDirectory();
0418 checkState(isTesting || foundDir, "Examples jars directory '%s' does not exist.",
0419 jarsDir.getAbsolutePath());
0420
0421 if (foundDir) {
0422 for (File f: jarsDir.listFiles()) {
0423 examplesJars.add(f.getAbsolutePath());
0424 }
0425 }
0426 return examplesJars;
0427 }
0428
0429 private class OptionParser extends SparkSubmitOptionParser {
0430
0431 boolean isSpecialCommand = false;
0432 private final boolean errorOnUnknownArgs;
0433
0434 OptionParser(boolean errorOnUnknownArgs) {
0435 this.errorOnUnknownArgs = errorOnUnknownArgs;
0436 }
0437
0438 @Override
0439 protected boolean handle(String opt, String value) {
0440 switch (opt) {
0441 case MASTER:
0442 master = value;
0443 break;
0444 case DEPLOY_MODE:
0445 deployMode = value;
0446 break;
0447 case PROPERTIES_FILE:
0448 propertiesFile = value;
0449 break;
0450 case DRIVER_MEMORY:
0451 conf.put(SparkLauncher.DRIVER_MEMORY, value);
0452 break;
0453 case DRIVER_JAVA_OPTIONS:
0454 conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
0455 break;
0456 case DRIVER_LIBRARY_PATH:
0457 conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
0458 break;
0459 case DRIVER_CLASS_PATH:
0460 conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
0461 break;
0462 case CONF:
0463 checkArgument(value != null, "Missing argument to %s", CONF);
0464 String[] setConf = value.split("=", 2);
0465 checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
0466 conf.put(setConf[0], setConf[1]);
0467 break;
0468 case CLASS:
0469
0470
0471
0472
0473 mainClass = value;
0474 if (specialClasses.containsKey(value)) {
0475 allowsMixedArguments = true;
0476 appResource = specialClasses.get(value);
0477 }
0478 break;
0479 case KILL_SUBMISSION:
0480 case STATUS:
0481 isSpecialCommand = true;
0482 parsedArgs.add(opt);
0483 parsedArgs.add(value);
0484 break;
0485 case HELP:
0486 case USAGE_ERROR:
0487 case VERSION:
0488 isSpecialCommand = true;
0489 parsedArgs.add(opt);
0490 break;
0491 default:
0492 parsedArgs.add(opt);
0493 if (value != null) {
0494 parsedArgs.add(value);
0495 }
0496 break;
0497 }
0498 return true;
0499 }
0500
0501 @Override
0502 protected boolean handleUnknown(String opt) {
0503
0504
0505
0506
0507 if (allowsMixedArguments) {
0508 appArgs.add(opt);
0509 return true;
0510 } else if (isExample) {
0511 String className = opt;
0512 if (!className.startsWith(EXAMPLE_CLASS_PREFIX)) {
0513 className = EXAMPLE_CLASS_PREFIX + className;
0514 }
0515 mainClass = className;
0516 appResource = SparkLauncher.NO_RESOURCE;
0517 return false;
0518 } else if (errorOnUnknownArgs) {
0519 checkArgument(!opt.startsWith("-"), "Unrecognized option: %s", opt);
0520 checkState(appResource == null, "Found unrecognized argument but resource is already set.");
0521 appResource = opt;
0522 return false;
0523 }
0524 return true;
0525 }
0526
0527 @Override
0528 protected void handleExtraArgs(List<String> extra) {
0529 appArgs.addAll(extra);
0530 }
0531
0532 }
0533
0534 }