Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.lang.reflect.InvocationTargetException;
0021 import java.lang.reflect.Method;
0022 import java.util.Optional;
0023 import java.util.concurrent.atomic.AtomicLong;
0024 import java.util.logging.Level;
0025 import java.util.logging.Logger;
0026 
0027 class InProcessAppHandle extends AbstractAppHandle {
0028 
0029   private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'";
0030   private static final Logger LOG = Logger.getLogger(InProcessAppHandle.class.getName());
0031   private static final AtomicLong THREAD_IDS = new AtomicLong();
0032 
0033   // Avoid really long thread names.
0034   private static final int MAX_APP_NAME_LEN = 16;
0035 
0036   private volatile Throwable error;
0037 
0038   private Thread app;
0039 
0040   InProcessAppHandle(LauncherServer server) {
0041     super(server);
0042   }
0043 
0044   @Override
0045   public synchronized void kill() {
0046     if (!isDisposed()) {
0047       LOG.warning("kill() may leave the underlying app running in in-process mode.");
0048       setState(State.KILLED);
0049       disconnect();
0050 
0051       // Interrupt the thread. This is not guaranteed to kill the app, though.
0052       if (app != null) {
0053         app.interrupt();
0054       }
0055     }
0056   }
0057 
0058   @Override
0059   public Optional<Throwable> getError() {
0060     return Optional.ofNullable(error);
0061   }
0062 
0063   synchronized void start(String appName, Method main, String[] args) {
0064     CommandBuilderUtils.checkState(app == null, "Handle already started.");
0065 
0066     if (appName.length() > MAX_APP_NAME_LEN) {
0067       appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN);
0068     }
0069 
0070     app = new Thread(() -> {
0071       try {
0072         main.invoke(null, (Object) args);
0073       } catch (Throwable t) {
0074         if (t instanceof InvocationTargetException) {
0075           t = t.getCause();
0076         }
0077         LOG.log(Level.WARNING, "Application failed with exception.", t);
0078         error = t;
0079         setState(State.FAILED);
0080       }
0081 
0082       dispose();
0083     });
0084 
0085     app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
0086     app.start();
0087   }
0088 
0089 }