Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.File;
0021 import java.nio.file.Files;
0022 import java.nio.file.Path;
0023 import java.util.ArrayList;
0024 import java.util.Arrays;
0025 import java.util.EnumSet;
0026 import java.util.List;
0027 import java.util.stream.Collectors;
0028 import static java.nio.file.attribute.PosixFilePermission.*;
0029 
0030 import org.apache.log4j.AppenderSkeleton;
0031 import org.apache.log4j.spi.LoggingEvent;
0032 import org.junit.AfterClass;
0033 import org.junit.Before;
0034 import org.junit.BeforeClass;
0035 import org.junit.Test;
0036 import static org.junit.Assert.*;
0037 import static org.junit.Assume.*;
0038 
0039 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0040 
0041 public class ChildProcAppHandleSuite extends BaseSuite {
0042 
0043   private static final List<String> MESSAGES = new ArrayList<>();
0044 
0045   private static final List<String> TEST_SCRIPT = Arrays.asList(
0046     "#!/bin/sh",
0047     "echo \"output\"",
0048     "echo \"error\" 1>&2",
0049     "while [ -n \"$1\" ]; do EC=$1; shift; done",
0050     "exit $EC");
0051 
0052   private static File TEST_SCRIPT_PATH;
0053 
0054   @AfterClass
0055   public static void cleanupClass() throws Exception {
0056     if (TEST_SCRIPT_PATH != null) {
0057       TEST_SCRIPT_PATH.delete();
0058       TEST_SCRIPT_PATH = null;
0059     }
0060   }
0061 
0062   @BeforeClass
0063   public static void setupClass() throws Exception {
0064     TEST_SCRIPT_PATH = File.createTempFile("output-redir-test", ".sh");
0065     Files.setPosixFilePermissions(TEST_SCRIPT_PATH.toPath(),
0066       EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE));
0067     Files.write(TEST_SCRIPT_PATH.toPath(), TEST_SCRIPT);
0068   }
0069 
0070   @Before
0071   public void cleanupLog() {
0072     MESSAGES.clear();
0073   }
0074 
0075   @Test
0076   public void testRedirectsSimple() throws Exception {
0077     SparkLauncher launcher = new SparkLauncher();
0078     launcher.redirectError(ProcessBuilder.Redirect.PIPE);
0079     assertNotNull(launcher.errorStream);
0080     assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.errorStream.type());
0081 
0082     launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
0083     assertNotNull(launcher.outputStream);
0084     assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.outputStream.type());
0085   }
0086 
0087   @Test
0088   public void testRedirectLastWins() throws Exception {
0089     SparkLauncher launcher = new SparkLauncher();
0090     launcher.redirectError(ProcessBuilder.Redirect.PIPE)
0091       .redirectError(ProcessBuilder.Redirect.INHERIT);
0092     assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.errorStream.type());
0093 
0094     launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
0095       .redirectOutput(ProcessBuilder.Redirect.INHERIT);
0096     assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.outputStream.type());
0097   }
0098 
0099   @Test
0100   public void testRedirectToLog() throws Exception {
0101     assumeFalse(isWindows());
0102 
0103     SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0104       .startApplication();
0105     waitFor(handle);
0106 
0107     assertTrue(MESSAGES.contains("output"));
0108     assertTrue(MESSAGES.contains("error"));
0109   }
0110 
0111   @Test
0112   public void testRedirectErrorToLog() throws Exception {
0113     assumeFalse(isWindows());
0114 
0115     Path err = Files.createTempFile("stderr", "txt");
0116     err.toFile().deleteOnExit();
0117 
0118     SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0119       .redirectError(err.toFile())
0120       .startApplication();
0121     waitFor(handle);
0122 
0123     assertTrue(MESSAGES.contains("output"));
0124     assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
0125   }
0126 
0127   @Test
0128   public void testRedirectOutputToLog() throws Exception {
0129     assumeFalse(isWindows());
0130 
0131     Path out = Files.createTempFile("stdout", "txt");
0132     out.toFile().deleteOnExit();
0133 
0134     SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0135       .redirectOutput(out.toFile())
0136       .startApplication();
0137     waitFor(handle);
0138 
0139     assertTrue(MESSAGES.contains("error"));
0140     assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
0141   }
0142 
0143   @Test
0144   public void testNoRedirectToLog() throws Exception {
0145     assumeFalse(isWindows());
0146 
0147     Path out = Files.createTempFile("stdout", "txt");
0148     Path err = Files.createTempFile("stderr", "txt");
0149     out.toFile().deleteOnExit();
0150     err.toFile().deleteOnExit();
0151 
0152     ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0153       .redirectError(err.toFile())
0154       .redirectOutput(out.toFile())
0155       .startApplication();
0156     waitFor(handle);
0157 
0158     assertTrue(MESSAGES.isEmpty());
0159     assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
0160     assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
0161   }
0162 
0163   @Test(expected = IllegalArgumentException.class)
0164   public void testBadLogRedirect() throws Exception {
0165     File out = Files.createTempFile("stdout", "txt").toFile();
0166     out.deleteOnExit();
0167     new SparkLauncher()
0168       .redirectError()
0169       .redirectOutput(out)
0170       .redirectToLog("foo")
0171       .launch()
0172       .waitFor();
0173   }
0174 
0175   @Test(expected = IllegalArgumentException.class)
0176   public void testRedirectErrorTwiceFails() throws Exception {
0177     File err = Files.createTempFile("stderr", "txt").toFile();
0178     err.deleteOnExit();
0179     new SparkLauncher()
0180       .redirectError()
0181       .redirectError(err)
0182       .launch()
0183       .waitFor();
0184   }
0185 
0186   @Test
0187   public void testProcMonitorWithOutputRedirection() throws Exception {
0188     assumeFalse(isWindows());
0189     File err = Files.createTempFile("out", "txt").toFile();
0190     err.deleteOnExit();
0191     SparkAppHandle handle = new TestSparkLauncher()
0192       .redirectError()
0193       .redirectOutput(err)
0194       .startApplication();
0195     waitFor(handle);
0196     assertEquals(SparkAppHandle.State.LOST, handle.getState());
0197   }
0198 
0199   @Test
0200   public void testProcMonitorWithLogRedirection() throws Exception {
0201     assumeFalse(isWindows());
0202     SparkAppHandle handle = new TestSparkLauncher()
0203       .redirectToLog(getClass().getName())
0204       .startApplication();
0205     waitFor(handle);
0206     assertEquals(SparkAppHandle.State.LOST, handle.getState());
0207   }
0208 
0209   @Test
0210   public void testFailedChildProc() throws Exception {
0211     assumeFalse(isWindows());
0212     SparkAppHandle handle = new TestSparkLauncher(1)
0213       .redirectToLog(getClass().getName())
0214       .startApplication();
0215     waitFor(handle);
0216     assertEquals(SparkAppHandle.State.FAILED, handle.getState());
0217   }
0218 
0219   private static class TestSparkLauncher extends SparkLauncher {
0220 
0221     TestSparkLauncher() {
0222       this(0);
0223     }
0224 
0225     TestSparkLauncher(int ec) {
0226       setAppResource("outputredirtest");
0227       addAppArgs(String.valueOf(ec));
0228     }
0229 
0230     @Override
0231     String findSparkSubmit() {
0232       return TEST_SCRIPT_PATH.getAbsolutePath();
0233     }
0234 
0235   }
0236 
0237   /**
0238    * A log4j appender used by child apps of this test. It records all messages logged through it in
0239    * memory so the test can check them.
0240    */
0241   public static class LogAppender extends AppenderSkeleton {
0242 
0243     @Override
0244     protected void append(LoggingEvent event) {
0245       MESSAGES.add(event.getMessage().toString());
0246     }
0247 
0248     @Override
0249     public boolean requiresLayout() {
0250       return false;
0251     }
0252 
0253     @Override
0254     public void close() {
0255 
0256     }
0257 
0258   }
0259 }