Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.IOException;
0021 import java.lang.reflect.Method;
0022 import java.lang.reflect.Modifier;
0023 import java.util.List;
0024 import java.util.logging.Logger;
0025 
0026 /**
0027  * In-process launcher for Spark applications.
0028  * <p>
0029  * Use this class to start Spark applications programmatically. Applications launched using this
0030  * class will run in the same process as the caller.
0031  * <p>
0032  * Because Spark only supports a single active instance of <code>SparkContext</code> per JVM, code
0033  * that uses this class should be careful about which applications are launched. It's recommended
0034  * that this launcher only be used to launch applications in cluster mode.
0035  * <p>
0036  * Also note that, when running applications in client mode, JVM-related configurations (like
0037  * driver memory or configs which modify the driver's class path) do not take effect. Logging
0038  * configuration is also inherited from the parent application.
0039  *
0040  * @since Spark 2.3.0
0041  */
0042 public class InProcessLauncher extends AbstractLauncher<InProcessLauncher> {
0043 
0044   private static final Logger LOG = Logger.getLogger(InProcessLauncher.class.getName());
0045 
0046   /**
0047    * Starts a Spark application.
0048    *
0049    * @see AbstractLauncher#startApplication(SparkAppHandle.Listener...)
0050    * @param listeners Listeners to add to the handle before the app is launched.
0051    * @return A handle for the launched application.
0052    */
0053   @Override
0054   public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
0055     if (builder.isClientMode(builder.getEffectiveConfig())) {
0056       LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
0057     }
0058 
0059     Method main = findSparkSubmit();
0060     LauncherServer server = LauncherServer.getOrCreateServer();
0061     InProcessAppHandle handle = new InProcessAppHandle(server);
0062     for (SparkAppHandle.Listener l : listeners) {
0063       handle.addListener(l);
0064     }
0065 
0066     String secret = server.registerHandle(handle);
0067     setConf(LauncherProtocol.CONF_LAUNCHER_PORT, String.valueOf(server.getPort()));
0068     setConf(LauncherProtocol.CONF_LAUNCHER_SECRET, secret);
0069 
0070     List<String> sparkArgs = builder.buildSparkSubmitArgs();
0071     String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);
0072 
0073     String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
0074       "<unknown>");
0075     handle.start(appName, main, argv);
0076     return handle;
0077   }
0078 
0079   @Override
0080   InProcessLauncher self() {
0081     return this;
0082   }
0083 
0084   // Visible for testing.
0085   Method findSparkSubmit() throws IOException {
0086     ClassLoader cl = Thread.currentThread().getContextClassLoader();
0087     if (cl == null) {
0088       cl = getClass().getClassLoader();
0089     }
0090 
0091     Class<?> sparkSubmit;
0092     // SPARK-22941: first try the new SparkSubmit interface that has better error handling,
0093     // but fall back to the old interface in case someone is mixing & matching launcher and
0094     // Spark versions.
0095     try {
0096       sparkSubmit = cl.loadClass("org.apache.spark.deploy.InProcessSparkSubmit");
0097     } catch (Exception e1) {
0098       try {
0099         sparkSubmit = cl.loadClass("org.apache.spark.deploy.SparkSubmit");
0100       } catch (Exception e2) {
0101         throw new IOException("Cannot find SparkSubmit; make sure necessary jars are available.",
0102           e2);
0103       }
0104     }
0105 
0106     Method main;
0107     try {
0108       main = sparkSubmit.getMethod("main", String[].class);
0109     } catch (Exception e) {
0110       throw new IOException("Cannot find SparkSubmit main method.", e);
0111     }
0112 
0113     CommandBuilderUtils.checkState(Modifier.isStatic(main.getModifiers()),
0114       "main method is not static.");
0115     return main;
0116   }
0117 
0118 }