0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.IOException;
0021 import java.util.List;
0022 import java.util.concurrent.CopyOnWriteArrayList;
0023 import java.util.concurrent.atomic.AtomicReference;
0024 import java.util.logging.Level;
0025 import java.util.logging.Logger;
0026
0027 abstract class AbstractAppHandle implements SparkAppHandle {
0028
0029 private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());
0030
0031 private final LauncherServer server;
0032
0033 private LauncherServer.ServerConnection connection;
0034 private List<SparkAppHandle.Listener> listeners;
0035 private AtomicReference<SparkAppHandle.State> state;
0036 private volatile String appId;
0037 private volatile boolean disposed;
0038
0039 protected AbstractAppHandle(LauncherServer server) {
0040 this.server = server;
0041 this.state = new AtomicReference<>(State.UNKNOWN);
0042 }
0043
0044 @Override
0045 public synchronized void addListener(SparkAppHandle.Listener l) {
0046 if (listeners == null) {
0047 listeners = new CopyOnWriteArrayList<>();
0048 }
0049 listeners.add(l);
0050 }
0051
0052 @Override
0053 public SparkAppHandle.State getState() {
0054 return state.get();
0055 }
0056
0057 @Override
0058 public String getAppId() {
0059 return appId;
0060 }
0061
0062 @Override
0063 public void stop() {
0064 CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
0065 try {
0066 connection.send(new LauncherProtocol.Stop());
0067 } catch (IOException ioe) {
0068 throw new RuntimeException(ioe);
0069 }
0070 }
0071
0072 @Override
0073 public synchronized void disconnect() {
0074 if (connection != null && connection.isOpen()) {
0075 try {
0076 connection.close();
0077 } catch (IOException ioe) {
0078
0079 }
0080 }
0081 dispose();
0082 }
0083
0084 void setConnection(LauncherServer.ServerConnection connection) {
0085 this.connection = connection;
0086 }
0087
0088 LauncherConnection getConnection() {
0089 return connection;
0090 }
0091
0092 boolean isDisposed() {
0093 return disposed;
0094 }
0095
0096
0097
0098
0099
0100
0101
0102
0103 synchronized void dispose() {
0104 if (!isDisposed()) {
0105
0106
0107
0108 if (connection != null) {
0109 try {
0110 connection.waitForClose();
0111 } catch (IOException ioe) {
0112
0113 }
0114 }
0115 server.unregister(this);
0116
0117
0118 setState(State.LOST, false);
0119 this.disposed = true;
0120 }
0121 }
0122
0123 void setState(SparkAppHandle.State s) {
0124 setState(s, false);
0125 }
0126
0127 void setState(SparkAppHandle.State s, boolean force) {
0128 if (force) {
0129 state.set(s);
0130 fireEvent(false);
0131 return;
0132 }
0133
0134 State current = state.get();
0135 while (!current.isFinal()) {
0136 if (state.compareAndSet(current, s)) {
0137 fireEvent(false);
0138 return;
0139 }
0140 current = state.get();
0141 }
0142
0143 if (s != State.LOST) {
0144 LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
0145 new Object[] { current, s });
0146 }
0147 }
0148
0149 void setAppId(String appId) {
0150 this.appId = appId;
0151 fireEvent(true);
0152 }
0153
0154 private void fireEvent(boolean isInfoChanged) {
0155 if (listeners != null) {
0156 for (Listener l : listeners) {
0157 if (isInfoChanged) {
0158 l.infoChanged(this);
0159 } else {
0160 l.stateChanged(this);
0161 }
0162 }
0163 }
0164 }
0165
0166 }