Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.Closeable;
0021 import java.io.EOFException;
0022 import java.io.IOException;
0023 import java.io.ObjectOutputStream;
0024 import java.net.Socket;
0025 import java.util.logging.Level;
0026 import java.util.logging.Logger;
0027 
0028 import static org.apache.spark.launcher.LauncherProtocol.*;
0029 
0030 /**
0031  * Encapsulates a connection between a launcher server and client. This takes care of the
0032  * communication (sending and receiving messages), while processing of messages is left for
0033  * the implementations.
0034  */
0035 abstract class LauncherConnection implements Closeable, Runnable {
0036 
0037   private static final Logger LOG = Logger.getLogger(LauncherConnection.class.getName());
0038 
0039   private final Socket socket;
0040   private final ObjectOutputStream out;
0041 
0042   private volatile boolean closed;
0043 
0044   LauncherConnection(Socket socket) throws IOException {
0045     this.socket = socket;
0046     this.out = new ObjectOutputStream(socket.getOutputStream());
0047     this.closed = false;
0048   }
0049 
0050   protected abstract void handle(Message msg) throws IOException;
0051 
0052   @Override
0053   public void run() {
0054     try {
0055       FilteredObjectInputStream in = new FilteredObjectInputStream(socket.getInputStream());
0056       while (isOpen()) {
0057         Message msg = (Message) in.readObject();
0058         handle(msg);
0059       }
0060     } catch (EOFException eof) {
0061       // Remote side has closed the connection, just cleanup.
0062       try {
0063         close();
0064       } catch (Exception unused) {
0065         // no-op.
0066       }
0067     } catch (Exception e) {
0068       if (!closed) {
0069         LOG.log(Level.WARNING, "Error in inbound message handling.", e);
0070         try {
0071           close();
0072         } catch (Exception unused) {
0073           // no-op.
0074         }
0075       }
0076     }
0077   }
0078 
0079   protected synchronized void send(Message msg) throws IOException {
0080     try {
0081       CommandBuilderUtils.checkState(!closed, "Disconnected.");
0082       out.writeObject(msg);
0083       out.flush();
0084     } catch (IOException ioe) {
0085       if (!closed) {
0086         LOG.log(Level.WARNING, "Error when sending message.", ioe);
0087         try {
0088           close();
0089         } catch (Exception unused) {
0090           // no-op.
0091         }
0092       }
0093       throw ioe;
0094     }
0095   }
0096 
0097   @Override
0098   public synchronized void close() throws IOException {
0099     if (isOpen()) {
0100       closed = true;
0101       socket.close();
0102     }
0103   }
0104 
0105   boolean isOpen() {
0106     return !closed;
0107   }
0108 
0109 }