0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.File;
0021 import java.io.IOException;
0022 import java.io.InputStream;
0023 import java.util.ArrayList;
0024 import java.util.HashMap;
0025 import java.util.List;
0026 import java.util.Map;
0027 import java.util.concurrent.ThreadFactory;
0028 import java.util.concurrent.atomic.AtomicInteger;
0029 import java.util.logging.Level;
0030 import java.util.logging.Logger;
0031
0032 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0033 import static org.apache.spark.launcher.CommandBuilderUtils.join;
0034
0035
0036
0037
0038
0039
0040
0041
0042 public class SparkLauncher extends AbstractLauncher<SparkLauncher> {
0043
0044 private static final Logger LOG = Logger.getLogger(SparkLauncher.class.getName());
0045
0046
0047 public static final String SPARK_MASTER = "spark.master";
0048
0049
0050 public static final String DEPLOY_MODE = "spark.submit.deployMode";
0051
0052
0053 public static final String DRIVER_MEMORY = "spark.driver.memory";
0054
0055 public static final String DRIVER_EXTRA_CLASSPATH = "spark.driver.extraClassPath";
0056
0057 public static final String DRIVER_DEFAULT_JAVA_OPTIONS = "spark.driver.defaultJavaOptions";
0058
0059 public static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions";
0060
0061 public static final String DRIVER_EXTRA_LIBRARY_PATH = "spark.driver.extraLibraryPath";
0062
0063
0064 public static final String EXECUTOR_MEMORY = "spark.executor.memory";
0065
0066 public static final String EXECUTOR_EXTRA_CLASSPATH = "spark.executor.extraClassPath";
0067
0068 public static final String EXECUTOR_DEFAULT_JAVA_OPTIONS = "spark.executor.defaultJavaOptions";
0069
0070 public static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions";
0071
0072 public static final String EXECUTOR_EXTRA_LIBRARY_PATH = "spark.executor.extraLibraryPath";
0073
0074 public static final String EXECUTOR_CORES = "spark.executor.cores";
0075
0076 static final String PYSPARK_DRIVER_PYTHON = "spark.pyspark.driver.python";
0077
0078 static final String PYSPARK_PYTHON = "spark.pyspark.python";
0079
0080 static final String SPARKR_R_SHELL = "spark.r.shell.command";
0081
0082
0083 public static final String CHILD_PROCESS_LOGGER_NAME = "spark.launcher.childProcLoggerName";
0084
0085
0086
0087
0088
0089
0090 public static final String NO_RESOURCE = "spark-internal";
0091
0092
0093
0094
0095
0096 public static final String CHILD_CONNECTION_TIMEOUT = "spark.launcher.childConectionTimeout";
0097
0098
0099 private static final AtomicInteger COUNTER = new AtomicInteger();
0100
0101
0102 static final ThreadFactory REDIRECTOR_FACTORY = new NamedThreadFactory("launcher-proc-%d");
0103
0104 static final Map<String, String> launcherConfig = new HashMap<>();
0105
0106
0107
0108
0109
0110
0111
0112
0113
0114
0115 public static void setConfig(String name, String value) {
0116 launcherConfig.put(name, value);
0117 }
0118
0119
0120 File workingDir;
0121 boolean redirectErrorStream;
0122 ProcessBuilder.Redirect errorStream;
0123 ProcessBuilder.Redirect outputStream;
0124
0125 public SparkLauncher() {
0126 this(null);
0127 }
0128
0129
0130
0131
0132
0133
0134 public SparkLauncher(Map<String, String> env) {
0135 if (env != null) {
0136 this.builder.childEnv.putAll(env);
0137 }
0138 }
0139
0140
0141
0142
0143
0144
0145
0146 public SparkLauncher setJavaHome(String javaHome) {
0147 checkNotNull(javaHome, "javaHome");
0148 builder.javaHome = javaHome;
0149 return this;
0150 }
0151
0152
0153
0154
0155
0156
0157
0158 public SparkLauncher setSparkHome(String sparkHome) {
0159 checkNotNull(sparkHome, "sparkHome");
0160 builder.childEnv.put(ENV_SPARK_HOME, sparkHome);
0161 return this;
0162 }
0163
0164
0165
0166
0167
0168
0169
0170 public SparkLauncher directory(File dir) {
0171 workingDir = dir;
0172 return this;
0173 }
0174
0175
0176
0177
0178
0179
0180 public SparkLauncher redirectError() {
0181 redirectErrorStream = true;
0182 return this;
0183 }
0184
0185
0186
0187
0188
0189
0190
0191 public SparkLauncher redirectError(ProcessBuilder.Redirect to) {
0192 errorStream = to;
0193 return this;
0194 }
0195
0196
0197
0198
0199
0200
0201
0202 public SparkLauncher redirectOutput(ProcessBuilder.Redirect to) {
0203 outputStream = to;
0204 return this;
0205 }
0206
0207
0208
0209
0210
0211
0212
0213 public SparkLauncher redirectError(File errFile) {
0214 errorStream = ProcessBuilder.Redirect.to(errFile);
0215 return this;
0216 }
0217
0218
0219
0220
0221
0222
0223
0224 public SparkLauncher redirectOutput(File outFile) {
0225 outputStream = ProcessBuilder.Redirect.to(outFile);
0226 return this;
0227 }
0228
0229
0230
0231
0232
0233
0234
0235 public SparkLauncher redirectToLog(String loggerName) {
0236 setConf(CHILD_PROCESS_LOGGER_NAME, loggerName);
0237 return this;
0238 }
0239
0240
0241
0242
0243 @Override
0244 public SparkLauncher setPropertiesFile(String path) {
0245 return super.setPropertiesFile(path);
0246 }
0247
0248 @Override
0249 public SparkLauncher setConf(String key, String value) {
0250 return super.setConf(key, value);
0251 }
0252
0253 @Override
0254 public SparkLauncher setAppName(String appName) {
0255 return super.setAppName(appName);
0256 }
0257
0258 @Override
0259 public SparkLauncher setMaster(String master) {
0260 return super.setMaster(master);
0261 }
0262
0263 @Override
0264 public SparkLauncher setDeployMode(String mode) {
0265 return super.setDeployMode(mode);
0266 }
0267
0268 @Override
0269 public SparkLauncher setAppResource(String resource) {
0270 return super.setAppResource(resource);
0271 }
0272
0273 @Override
0274 public SparkLauncher setMainClass(String mainClass) {
0275 return super.setMainClass(mainClass);
0276 }
0277
0278 @Override
0279 public SparkLauncher addSparkArg(String arg) {
0280 return super.addSparkArg(arg);
0281 }
0282
0283 @Override
0284 public SparkLauncher addSparkArg(String name, String value) {
0285 return super.addSparkArg(name, value);
0286 }
0287
0288 @Override
0289 public SparkLauncher addAppArgs(String... args) {
0290 return super.addAppArgs(args);
0291 }
0292
0293 @Override
0294 public SparkLauncher addJar(String jar) {
0295 return super.addJar(jar);
0296 }
0297
0298 @Override
0299 public SparkLauncher addFile(String file) {
0300 return super.addFile(file);
0301 }
0302
0303 @Override
0304 public SparkLauncher addPyFile(String file) {
0305 return super.addPyFile(file);
0306 }
0307
0308 @Override
0309 public SparkLauncher setVerbose(boolean verbose) {
0310 return super.setVerbose(verbose);
0311 }
0312
0313
0314
0315
0316
0317
0318
0319
0320
0321 public Process launch() throws IOException {
0322 ProcessBuilder pb = createBuilder();
0323
0324 boolean outputToLog = outputStream == null;
0325 boolean errorToLog = !redirectErrorStream && errorStream == null;
0326
0327 String loggerName = getLoggerName();
0328 if (loggerName != null && outputToLog && errorToLog) {
0329 pb.redirectErrorStream(true);
0330 }
0331
0332 Process childProc = pb.start();
0333 if (loggerName != null) {
0334 InputStream logStream = outputToLog ? childProc.getInputStream() : childProc.getErrorStream();
0335 new OutputRedirector(logStream, loggerName, REDIRECTOR_FACTORY);
0336 }
0337
0338 return childProc;
0339 }
0340
0341
0342
0343
0344
0345
0346
0347
0348
0349
0350
0351
0352
0353
0354
0355
0356
0357
0358
0359 @Override
0360 public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
0361 LauncherServer server = LauncherServer.getOrCreateServer();
0362 ChildProcAppHandle handle = new ChildProcAppHandle(server);
0363 for (SparkAppHandle.Listener l : listeners) {
0364 handle.addListener(l);
0365 }
0366
0367 String secret = server.registerHandle(handle);
0368
0369 String loggerName = getLoggerName();
0370 ProcessBuilder pb = createBuilder();
0371 if (LOG.isLoggable(Level.FINE)) {
0372 LOG.fine(String.format("Launching Spark application:%n%s", join(" ", pb.command())));
0373 }
0374
0375 boolean outputToLog = outputStream == null;
0376 boolean errorToLog = !redirectErrorStream && errorStream == null;
0377
0378
0379
0380 if (loggerName == null && (outputToLog || errorToLog)) {
0381 String appName;
0382 if (builder.appName != null) {
0383 appName = builder.appName;
0384 } else if (builder.mainClass != null) {
0385 int dot = builder.mainClass.lastIndexOf(".");
0386 if (dot >= 0 && dot < builder.mainClass.length() - 1) {
0387 appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
0388 } else {
0389 appName = builder.mainClass;
0390 }
0391 } else if (builder.appResource != null) {
0392 appName = new File(builder.appResource).getName();
0393 } else {
0394 appName = String.valueOf(COUNTER.incrementAndGet());
0395 }
0396 String loggerPrefix = getClass().getPackage().getName();
0397 loggerName = String.format("%s.app.%s", loggerPrefix, appName);
0398 }
0399
0400 if (outputToLog && errorToLog) {
0401 pb.redirectErrorStream(true);
0402 }
0403
0404 pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT, String.valueOf(server.getPort()));
0405 pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, secret);
0406 try {
0407 Process child = pb.start();
0408 InputStream logStream = null;
0409 if (loggerName != null) {
0410 logStream = outputToLog ? child.getInputStream() : child.getErrorStream();
0411 }
0412 handle.setChildProc(child, loggerName, logStream);
0413 } catch (IOException ioe) {
0414 handle.kill();
0415 throw ioe;
0416 }
0417
0418 return handle;
0419 }
0420
0421 private ProcessBuilder createBuilder() throws IOException {
0422 List<String> cmd = new ArrayList<>();
0423 cmd.add(findSparkSubmit());
0424 cmd.addAll(builder.buildSparkSubmitArgs());
0425
0426
0427
0428
0429 if (isWindows()) {
0430 List<String> winCmd = new ArrayList<>();
0431 for (String arg : cmd) {
0432 winCmd.add(quoteForBatchScript(arg));
0433 }
0434 cmd = winCmd;
0435 }
0436
0437 ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
0438 for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
0439 pb.environment().put(e.getKey(), e.getValue());
0440 }
0441
0442 if (workingDir != null) {
0443 pb.directory(workingDir);
0444 }
0445
0446
0447
0448 checkState(!redirectErrorStream || errorStream == null,
0449 "Cannot specify both redirectError() and redirectError(...) ");
0450 checkState(getLoggerName() == null ||
0451 ((!redirectErrorStream && errorStream == null) || outputStream == null),
0452 "Cannot used redirectToLog() in conjunction with other redirection methods.");
0453
0454 if (redirectErrorStream) {
0455 pb.redirectErrorStream(true);
0456 }
0457 if (errorStream != null) {
0458 pb.redirectError(errorStream);
0459 }
0460 if (outputStream != null) {
0461 pb.redirectOutput(outputStream);
0462 }
0463
0464 return pb;
0465 }
0466
0467 @Override
0468 SparkLauncher self() {
0469 return this;
0470 }
0471
0472
0473 String findSparkSubmit() {
0474 String script = isWindows() ? "spark-submit.cmd" : "spark-submit";
0475 return join(File.separator, builder.getSparkHome(), "bin", script);
0476 }
0477
0478 private String getLoggerName() throws IOException {
0479 return builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
0480 }
0481
0482 }