0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.IOException;
0021 import java.lang.reflect.Method;
0022 import java.lang.reflect.Modifier;
0023 import java.util.List;
0024 import java.util.logging.Logger;
0025
0026
0027
0028
0029
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042 public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
0043
0044 private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());
0045
0046
0047
0048
0049
0050
0051
0052
0053 @Override
0054 public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
0055 if (builder.isClientMode(builder.getEffectiveConfig())) {
0056 LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
0057 }
0058
0059 Method main = findSparkSubmit();
0060 LauncherServer server = LauncherServer.getOrCreateServer();
0061 InProcessAppHandle handle = new InProcessAppHandle(server);
0062 for (SparkAppHandle.Listener l : listeners) {
0063 handle.addListener(l);
0064 }
0065
0066 String secret = server.registerHandle(handle);
0067 setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
0068 setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);
0069
0070 List<String> sparkArgs = builder.buildSparkSubmitArgs();
0071 String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);
0072
0073 String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
0074 "<unknown>");
0075 handle.start(appName, main, argv);
0076 return handle;
0077 }
0078
0079 @Override
0080 InProcessLauncher self() {
0081 return this;
0082 }
0083
0084
0085 Method findSparkSubmit() throws IOException {
0086 ClassLoader cl = Thread.currentThread().getContextClassLoader();
0087 if (cl == null) {
0088 cl = getClass().getClassLoader();
0089 }
0090
0091 Class<?> sparkSubmit;
0092
0093
0094
0095 try {
0096 sparkSubmit = cl.loadClass("org.apache.spark.deploy.InProcessSparkSubmit");
0097 } catch (Exception e1) {
0098 try {
0099 sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
0100 } catch (Exception e2) {
0101 throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.",
0102 e2);
0103 }
0104 }
0105
0106 Method main;
0107 try {
0108 main = sparkSubmit.getMethod("main", String[].class);
0109 } catch (Exception e) {
0110 throw new IOException("Cannot find SparkSubmit main method.", e);
0111 }
0112
0113 CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
0114 "main method is not static.");
0115 return main;
0116 }
0117
0118 }