0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.Closeable;
0021 import java.io.EOFException;
0022 import java.io.IOException;
0023 import java.io.ObjectOutputStream;
0024 import java.net.Socket;
0025 import java.util.logging.Level;
0026 import java.util.logging.Logger;
0027
0028 import static org.apache.spark.launcher.LauncherProtocol.*;
0029
0030
0031
0032
0033
0034
0035 abstract class LauncherConnection implements Closeable, Runnable {
0036
0037 private static final Logger LOG = Logger.getLogger(LauncherConnection.class.getName());
0038
0039 private final Socket socket;
0040 private final ObjectOutputStream out;
0041
0042 private volatile boolean closed;
0043
0044 LauncherConnection(Socket socket) throws IOException {
0045 this.socket = socket;
0046 this.out = new ObjectOutputStream(socket.getOutputStream());
0047 this.closed = false;
0048 }
0049
0050 protected abstract void handle(Message msg) throws IOException;
0051
0052 @Override
0053 public void run() {
0054 try {
0055 FilteredObjectInputStream in = new FilteredObjectInputStream(socket.getInputStream());
0056 while (isOpen()) {
0057 Message msg = (Message) in.readObject();
0058 handle(msg);
0059 }
0060 } catch (EOFException eof) {
0061
0062 try {
0063 close();
0064 } catch (Exception unused) {
0065
0066 }
0067 } catch (Exception e) {
0068 if (!closed) {
0069 LOG.log(Level.WARNING, "Error in inbound message handling.", e);
0070 try {
0071 close();
0072 } catch (Exception unused) {
0073
0074 }
0075 }
0076 }
0077 }
0078
0079 protected synchronized void send(Message msg) throws IOException {
0080 try {
0081 CommandBuilderUtils.checkState(!closed, "Disconnected.");
0082 out.writeObject(msg);
0083 out.flush();
0084 } catch (IOException ioe) {
0085 if (!closed) {
0086 LOG.log(Level.WARNING, "Error when sending message.", ioe);
0087 try {
0088 close();
0089 } catch (Exception unused) {
0090
0091 }
0092 }
0093 throw ioe;
0094 }
0095 }
0096
0097 @Override
0098 public synchronized void close() throws IOException {
0099 if (isOpen()) {
0100 closed = true;
0101 socket.close();
0102 }
0103 }
0104
0105 boolean isOpen() {
0106 return !closed;
0107 }
0108
0109 }