Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.BufferedReader;
0021 import java.io.InputStream;
0022 import java.io.InputStreamReader;
0023 import java.io.IOException;
0024 import java.nio.charset.StandardCharsets;
0025 import java.util.concurrent.ThreadFactory;
0026 import java.util.logging.Level;
0027 import java.util.logging.Logger;
0028 
0029 /**
0030  * Redirects lines read from a given input stream to a j.u.l.Logger (at INFO level).
0031  */
0032 class OutputRedirector {
0033 
0034   private final BufferedReader reader;
0035   private final Logger sink;
0036   private final Thread thread;
0037   private final ChildProcAppHandle callback;
0038 
0039   private volatile boolean active;
0040   private volatile Throwable error;
0041 
0042   OutputRedirector(InputStream in, String loggerName, ThreadFactory tf) {
0043     this(in, loggerName, tf, null);
0044   }
0045 
0046   OutputRedirector(
0047       InputStream in,
0048       String loggerName,
0049       ThreadFactory tf,
0050       ChildProcAppHandle callback) {
0051     this.active = true;
0052     this.reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
0053     this.thread = tf.newThread(this::redirect);
0054     this.sink = Logger.getLogger(loggerName);
0055     this.callback = callback;
0056     thread.start();
0057   }
0058 
0059   private void redirect() {
0060     try {
0061       String line;
0062       while ((line = reader.readLine()) != null) {
0063         if (active) {
0064           sink.info(line.replaceFirst("\\s*$", ""));
0065           if ((containsIgnoreCase(line, "Error") || containsIgnoreCase(line, "Exception")) &&
0066               !line.contains("at ")) {
0067             error = new RuntimeException(line);
0068           }
0069         }
0070       }
0071     } catch (IOException e) {
0072       sink.log(Level.FINE, "Error reading child process output.", e);
0073     } finally {
0074       if (callback != null) {
0075         callback.monitorChild();
0076       }
0077     }
0078   }
0079 
0080   /**
0081    * This method just stops the output of the process from showing up in the local logs.
0082    * The child's output will still be read (and, thus, the redirect thread will still be
0083    * alive) to avoid the child process hanging because of lack of output buffer.
0084    */
0085   void stop() {
0086     active = false;
0087   }
0088 
0089   boolean isAlive() {
0090     return thread.isAlive();
0091   }
0092 
0093   Throwable getError() {
0094     return error;
0095   }
0096 
0097   /**
0098    * Copied from Apache Commons Lang {@code StringUtils#containsIgnoreCase(String, String)}
0099    */
0100   private static boolean containsIgnoreCase(String str, String searchStr) {
0101     if (str == null || searchStr == null) {
0102       return false;
0103     }
0104     int len = searchStr.length();
0105     int max = str.length() - len;
0106     for (int i = 0; i <= max; i++) {
0107       if (str.regionMatches(true, i, searchStr, 0, len)) {
0108         return true;
0109       }
0110     }
0111     return false;
0112   }
0113 }