0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.BufferedReader;
0021 import java.io.InputStream;
0022 import java.io.InputStreamReader;
0023 import java.io.IOException;
0024 import java.nio.charset.StandardCharsets;
0025 import java.util.concurrent.ThreadFactory;
0026 import java.util.logging.Level;
0027 import java.util.logging.Logger;
0028
0029
0030
0031
0032 class OutputRedirector {
0033
0034 private final BufferedReader reader;
0035 private final Logger sink;
0036 private final Thread thread;
0037 private final ChildProcAppHandle callback;
0038
0039 private volatile boolean active;
0040 private volatile Throwable error;
0041
0042 OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
0043 this(in, loggerName, tf, null);
0044 }
0045
0046 OutputRedirector(
0047 InputStream in,
0048 String loggerName,
0049 ThreadFactory tf,
0050 ChildProcAppHandle callback) {
0051 this.active = true;
0052 this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
0053 this.thread = tf.newThread(this::redirect);
0054 this.sink = Logger.getLogger(loggerName);
0055 this.callback = callback;
0056 thread.start();
0057 }
0058
0059 private void redirect() {
0060 try {
0061 String line;
0062 while ((line = reader.readLine()) != null) {
0063 if (active) {
0064 sink.info(line.replaceFirst("\\s*$", ""));
0065 if ((containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) &&
0066 !line.contains("at ")) {
0067 error = new RuntimeException(line);
0068 }
0069 }
0070 }
0071 } catch (IOException e) {
0072 sink.log(Level.FINE, "Error reading child process output.", e);
0073 } finally {
0074 if (callback != null) {
0075 callback.monitorChild();
0076 }
0077 }
0078 }
0079
0080
0081
0082
0083
0084
0085 void stop() {
0086 active = false;
0087 }
0088
0089 boolean isAlive() {
0090 return thread.isAlive();
0091 }
0092
0093 Throwable getError() {
0094 return error;
0095 }
0096
0097
0098
0099
0100 private static boolean containsIgnoreCase(String str, String searchStr) {
0101 if (str == null || searchStr == null) {
0102 return false;
0103 }
0104 int len = searchStr.length();
0105 int max = str.length() - len;
0106 for (int i = 0; i <= max; i++) {
0107 if (str.regionMatches(true, i, searchStr, 0, len)) {
0108 return true;
0109 }
0110 }
0111 return false;
0112 }
0113 }