Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.IOException;
0021 import java.util.List;
0022 import java.util.concurrent.CopyOnWriteArrayList;
0023 import java.util.concurrent.atomic.AtomicReference;
0024 import java.util.logging.Level;
0025 import java.util.logging.Logger;
0026 
0027 abstract class AbstractAppHandle implements SparkAppHandle {
0028 
0029   private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());
0030 
0031   private final LauncherServer server;
0032 
0033   private LauncherServer.ServerConnection connection;
0034   private List<SparkAppHandle.Listener> listeners;
0035   private AtomicReference<SparkAppHandle.State> state;
0036   private volatile String appId;
0037   private volatile boolean disposed;
0038 
0039   protected AbstractAppHandle(LauncherServer server) {
0040     this.server = server;
0041     this.state = new AtomicReference<>(State.UNKNOWN);
0042   }
0043 
0044   @Override
0045   public synchronized void addListener(SparkAppHandle.Listener l) {
0046     if (listeners == null) {
0047       listeners = new CopyOnWriteArrayList<>();
0048     }
0049     listeners.add(l);
0050   }
0051 
0052   @Override
0053   public SparkAppHandle.State getState() {
0054     return state.get();
0055   }
0056 
0057   @Override
0058   public String getAppId() {
0059     return appId;
0060   }
0061 
0062   @Override
0063   public void stop() {
0064     CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
0065     try {
0066       connection.send(new LauncherProtocol.Stop());
0067     } catch (IOException ioe) {
0068       throw new RuntimeException(ioe);
0069     }
0070   }
0071 
0072   @Override
0073   public synchronized void disconnect() {
0074     if (connection != null && connection.isOpen()) {
0075       try {
0076         connection.close();
0077       } catch (IOException ioe) {
0078         // no-op.
0079       }
0080     }
0081     dispose();
0082   }
0083 
0084   void setConnection(LauncherServer.ServerConnection connection) {
0085     this.connection = connection;
0086   }
0087 
0088   LauncherConnection getConnection() {
0089     return connection;
0090   }
0091 
0092   boolean isDisposed() {
0093     return disposed;
0094   }
0095 
0096   /**
0097    * Mark the handle as disposed, and set it as LOST in case the current state is not final.
0098    *
0099    * This method should be called only when there's a reasonable expectation that the communication
0100    * with the child application is not needed anymore, either because the code managing the handle
0101    * has said so, or because the child application is finished.
0102    */
0103   synchronized void dispose() {
0104     if (!isDisposed()) {
0105       // First wait for all data from the connection to be read. Then unregister the handle.
0106       // Otherwise, unregistering might cause the server to be stopped and all child connections
0107       // to be closed.
0108       if (connection != null) {
0109         try {
0110           connection.waitForClose();
0111         } catch (IOException ioe) {
0112           // no-op.
0113         }
0114       }
0115       server.unregister(this);
0116 
0117       // Set state to LOST if not yet final.
0118       setState(State.LOST, false);
0119       this.disposed = true;
0120     }
0121   }
0122 
0123   void setState(SparkAppHandle.State s) {
0124     setState(s, false);
0125   }
0126 
0127   void setState(SparkAppHandle.State s, boolean force) {
0128     if (force) {
0129       state.set(s);
0130       fireEvent(false);
0131       return;
0132     }
0133 
0134     State current = state.get();
0135     while (!current.isFinal()) {
0136       if (state.compareAndSet(current, s)) {
0137         fireEvent(false);
0138         return;
0139       }
0140       current = state.get();
0141     }
0142 
0143     if (s != State.LOST) {
0144       LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
0145         new Object[] { current, s });
0146     }
0147   }
0148 
0149   void setAppId(String appId) {
0150     this.appId = appId;
0151     fireEvent(true);
0152   }
0153 
0154   private void fireEvent(boolean isInfoChanged) {
0155     if (listeners != null) {
0156       for (Listener l : listeners) {
0157         if (isInfoChanged) {
0158           l.infoChanged(this);
0159         } else {
0160           l.stateChanged(this);
0161         }
0162       }
0163     }
0164   }
0165 
0166 }