Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.InputStream;
0021 import java.util.Optional;
0022 import java.util.logging.Level;
0023 import java.util.logging.Logger;
0024 
0025 /**
0026  * Handle implementation for monitoring apps started as a child process.
0027  */
0028 class ChildProcAppHandle extends AbstractAppHandle {
0029 
0030   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
0031 
0032   private volatile Process childProc;
0033   private volatile OutputRedirector redirector;
0034 
0035   ChildProcAppHandle(LauncherServer server) {
0036     super(server);
0037   }
0038 
0039   @Override
0040   public synchronized void disconnect() {
0041     try {
0042       super.disconnect();
0043     } finally {
0044       if (redirector != null) {
0045         redirector.stop();
0046       }
0047     }
0048   }
0049 
0050   /**
0051    * Parses the logs of {@code spark-submit} and returns the last exception thrown.
0052    * <p>
0053    * Since {@link SparkLauncher} runs {@code spark-submit} in a sub-process, it's difficult to
0054    * accurately retrieve the full {@link Throwable} from the {@code spark-submit} process.
0055    * This method parses the logs of the sub-process and provides a best-effort attempt at
0056    * returning the last exception thrown by the {@code spark-submit} process. Only the exception
0057    * message is parsed, the associated stacktrace is meaningless.
0058    *
0059    * @return an {@link Optional} containing a {@link RuntimeException} with the parsed
0060    * exception, otherwise returns a {@link Optional#EMPTY}
0061    */
0062   @Override
0063   public Optional<Throwable> getError() {
0064     return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty();
0065   }
0066 
0067   @Override
0068   public synchronized void kill() {
0069     if (!isDisposed()) {
0070       setState(State.KILLED);
0071       disconnect();
0072       if (childProc != null) {
0073         if (childProc.isAlive()) {
0074           childProc.destroyForcibly();
0075         }
0076         childProc = null;
0077       }
0078     }
0079   }
0080 
0081   void setChildProc(Process childProc, String loggerName, InputStream logStream) {
0082     this.childProc = childProc;
0083     if (logStream != null) {
0084       this.redirector = new OutputRedirector(logStream, loggerName,
0085         SparkLauncher.REDIRECTOR_FACTORY, this);
0086     } else {
0087       // If there is no log redirection, spawn a thread that will wait for the child process
0088       // to finish.
0089       SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();
0090     }
0091   }
0092 
0093   /**
0094    * Wait for the child process to exit and update the handle's state if necessary, according to
0095    * the exit code.
0096    */
0097   void monitorChild() {
0098     Process proc = childProc;
0099     if (proc == null) {
0100       // Process may have already been disposed of, e.g. by calling kill().
0101       return;
0102     }
0103 
0104     while (proc.isAlive()) {
0105       try {
0106         proc.waitFor();
0107       } catch (Exception e) {
0108         LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
0109       }
0110     }
0111 
0112     synchronized (this) {
0113       if (isDisposed()) {
0114         return;
0115       }
0116 
0117       int ec;
0118       try {
0119         ec = proc.exitValue();
0120       } catch (Exception e) {
0121         LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
0122         ec = 1;
0123       }
0124 
0125       if (ec != 0) {
0126         State currState = getState();
0127         // Override state with failure if the current state is not final, or is success.
0128         if (!currState.isFinal() || currState == State.FINISHED) {
0129           setState(State.FAILED, true);
0130         }
0131       }
0132 
0133       dispose();
0134     }
0135   }
0136 
0137 }