0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.InputStream;
0021 import java.util.Optional;
0022 import java.util.logging.Level;
0023 import java.util.logging.Logger;
0024
0025
0026
0027
0028 class ChildProcAppHandle extends AbstractAppHandle {
0029
0030 private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
0031
0032 private volatile Process childProc;
0033 private volatile OutputRedirector redirector;
0034
0035 ChildProcAppHandle(LauncherServer server) {
0036 super(server);
0037 }
0038
0039 @Override
0040 public synchronized void disconnect() {
0041 try {
0042 super.disconnect();
0043 } finally {
0044 if (redirector != null) {
0045 redirector.stop();
0046 }
0047 }
0048 }
0049
0050
0051
0052
0053
0054
0055
0056
0057
0058
0059
0060
0061
0062 @Override
0063 public Optional<Throwable> getError() {
0064 return redirector != null ? Optional.ofNullable(redirector.getError()) : Optional.empty();
0065 }
0066
0067 @Override
0068 public synchronized void kill() {
0069 if (!isDisposed()) {
0070 setState(State.KILLED);
0071 disconnect();
0072 if (childProc != null) {
0073 if (childProc.isAlive()) {
0074 childProc.destroyForcibly();
0075 }
0076 childProc = null;
0077 }
0078 }
0079 }
0080
0081 void setChildProc(Process childProc, String loggerName, InputStream logStream) {
0082 this.childProc = childProc;
0083 if (logStream != null) {
0084 this.redirector = new OutputRedirector(logStream, loggerName,
0085 SparkLauncher.REDIRECTOR_FACTORY, this);
0086 } else {
0087
0088
0089 SparkLauncher.REDIRECTOR_FACTORY.newThread(this::monitorChild).start();
0090 }
0091 }
0092
0093
0094
0095
0096
0097 void monitorChild() {
0098 Process proc = childProc;
0099 if (proc == null) {
0100
0101 return;
0102 }
0103
0104 while (proc.isAlive()) {
0105 try {
0106 proc.waitFor();
0107 } catch (Exception e) {
0108 LOG.log(Level.WARNING, "Exception waiting for child process to exit.", e);
0109 }
0110 }
0111
0112 synchronized (this) {
0113 if (isDisposed()) {
0114 return;
0115 }
0116
0117 int ec;
0118 try {
0119 ec = proc.exitValue();
0120 } catch (Exception e) {
0121 LOG.log(Level.WARNING, "Exception getting child process exit code, assuming failure.", e);
0122 ec = 1;
0123 }
0124
0125 if (ec != 0) {
0126 State currState = getState();
0127
0128 if (!currState.isFinal() || currState == State.FINISHED) {
0129 setState(State.FAILED, true);
0130 }
0131 }
0132
0133 dispose();
0134 }
0135 }
0136
0137 }