0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.File;
0021 import java.nio.file.Files;
0022 import java.nio.file.Path;
0023 import java.util.ArrayList;
0024 import java.util.Arrays;
0025 import java.util.EnumSet;
0026 import java.util.List;
0027 import java.util.stream.Collectors;
0028 import static java.nio.file.attribute.PosixFilePermission.*;
0029
0030 import org.apache.log4j.AppenderSkeleton;
0031 import org.apache.log4j.spi.LoggingEvent;
0032 import org.junit.AfterClass;
0033 import org.junit.Before;
0034 import org.junit.BeforeClass;
0035 import org.junit.Test;
0036 import static org.junit.Assert.*;
0037 import static org.junit.Assume.*;
0038
0039 import static org.apache.spark.launcher.CommandBuilderUtils.*;
0040
0041 public class ChildProcAppHandleSuite extends BaseSuite {
0042
0043 private static final List<String> MESSAGES = new ArrayList<>();
0044
0045 private static final List<String> TEST_SCRIPT = Arrays.asList(
0046 "#!/bin/sh",
0047 "echo \"output\"",
0048 "echo \"error\" 1>&2",
0049 "while [ -n \"$1\" ]; do EC=$1; shift; done",
0050 "exit $EC");
0051
0052 private static File TEST_SCRIPT_PATH;
0053
0054 @AfterClass
0055 public static void cleanupClass() throws Exception {
0056 if (TEST_SCRIPT_PATH != null) {
0057 TEST_SCRIPT_PATH.delete();
0058 TEST_SCRIPT_PATH = null;
0059 }
0060 }
0061
0062 @BeforeClass
0063 public static void setupClass() throws Exception {
0064 TEST_SCRIPT_PATH = File.createTempFile("output-redir-test", ".sh");
0065 Files.setPosixFilePermissions(TEST_SCRIPT_PATH.toPath(),
0066 EnumSet.of(OWNER_READ, OWNER_EXECUTE, OWNER_WRITE));
0067 Files.write(TEST_SCRIPT_PATH.toPath(), TEST_SCRIPT);
0068 }
0069
0070 @Before
0071 public void cleanupLog() {
0072 MESSAGES.clear();
0073 }
0074
0075 @Test
0076 public void testRedirectsSimple() throws Exception {
0077 SparkLauncher launcher = new SparkLauncher();
0078 launcher.redirectError(ProcessBuilder.Redirect.PIPE);
0079 assertNotNull(launcher.errorStream);
0080 assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.errorStream.type());
0081
0082 launcher.redirectOutput(ProcessBuilder.Redirect.PIPE);
0083 assertNotNull(launcher.outputStream);
0084 assertEquals(ProcessBuilder.Redirect.Type.PIPE, launcher.outputStream.type());
0085 }
0086
0087 @Test
0088 public void testRedirectLastWins() throws Exception {
0089 SparkLauncher launcher = new SparkLauncher();
0090 launcher.redirectError(ProcessBuilder.Redirect.PIPE)
0091 .redirectError(ProcessBuilder.Redirect.INHERIT);
0092 assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.errorStream.type());
0093
0094 launcher.redirectOutput(ProcessBuilder.Redirect.PIPE)
0095 .redirectOutput(ProcessBuilder.Redirect.INHERIT);
0096 assertEquals(ProcessBuilder.Redirect.Type.INHERIT, launcher.outputStream.type());
0097 }
0098
0099 @Test
0100 public void testRedirectToLog() throws Exception {
0101 assumeFalse(isWindows());
0102
0103 SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0104 .startApplication();
0105 waitFor(handle);
0106
0107 assertTrue(MESSAGES.contains("output"));
0108 assertTrue(MESSAGES.contains("error"));
0109 }
0110
0111 @Test
0112 public void testRedirectErrorToLog() throws Exception {
0113 assumeFalse(isWindows());
0114
0115 Path err = Files.createTempFile("stderr", "txt");
0116 err.toFile().deleteOnExit();
0117
0118 SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0119 .redirectError(err.toFile())
0120 .startApplication();
0121 waitFor(handle);
0122
0123 assertTrue(MESSAGES.contains("output"));
0124 assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
0125 }
0126
0127 @Test
0128 public void testRedirectOutputToLog() throws Exception {
0129 assumeFalse(isWindows());
0130
0131 Path out = Files.createTempFile("stdout", "txt");
0132 out.toFile().deleteOnExit();
0133
0134 SparkAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0135 .redirectOutput(out.toFile())
0136 .startApplication();
0137 waitFor(handle);
0138
0139 assertTrue(MESSAGES.contains("error"));
0140 assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
0141 }
0142
0143 @Test
0144 public void testNoRedirectToLog() throws Exception {
0145 assumeFalse(isWindows());
0146
0147 Path out = Files.createTempFile("stdout", "txt");
0148 Path err = Files.createTempFile("stderr", "txt");
0149 out.toFile().deleteOnExit();
0150 err.toFile().deleteOnExit();
0151
0152 ChildProcAppHandle handle = (ChildProcAppHandle) new TestSparkLauncher()
0153 .redirectError(err.toFile())
0154 .redirectOutput(out.toFile())
0155 .startApplication();
0156 waitFor(handle);
0157
0158 assertTrue(MESSAGES.isEmpty());
0159 assertEquals(Arrays.asList("error"), Files.lines(err).collect(Collectors.toList()));
0160 assertEquals(Arrays.asList("output"), Files.lines(out).collect(Collectors.toList()));
0161 }
0162
0163 @Test(expected = IllegalArgumentException.class)
0164 public void testBadLogRedirect() throws Exception {
0165 File out = Files.createTempFile("stdout", "txt").toFile();
0166 out.deleteOnExit();
0167 new SparkLauncher()
0168 .redirectError()
0169 .redirectOutput(out)
0170 .redirectToLog("foo")
0171 .launch()
0172 .waitFor();
0173 }
0174
0175 @Test(expected = IllegalArgumentException.class)
0176 public void testRedirectErrorTwiceFails() throws Exception {
0177 File err = Files.createTempFile("stderr", "txt").toFile();
0178 err.deleteOnExit();
0179 new SparkLauncher()
0180 .redirectError()
0181 .redirectError(err)
0182 .launch()
0183 .waitFor();
0184 }
0185
0186 @Test
0187 public void testProcMonitorWithOutputRedirection() throws Exception {
0188 assumeFalse(isWindows());
0189 File err = Files.createTempFile("out", "txt").toFile();
0190 err.deleteOnExit();
0191 SparkAppHandle handle = new TestSparkLauncher()
0192 .redirectError()
0193 .redirectOutput(err)
0194 .startApplication();
0195 waitFor(handle);
0196 assertEquals(SparkAppHandle.State.LOST, handle.getState());
0197 }
0198
0199 @Test
0200 public void testProcMonitorWithLogRedirection() throws Exception {
0201 assumeFalse(isWindows());
0202 SparkAppHandle handle = new TestSparkLauncher()
0203 .redirectToLog(getClass().getName())
0204 .startApplication();
0205 waitFor(handle);
0206 assertEquals(SparkAppHandle.State.LOST, handle.getState());
0207 }
0208
0209 @Test
0210 public void testFailedChildProc() throws Exception {
0211 assumeFalse(isWindows());
0212 SparkAppHandle handle = new TestSparkLauncher(1)
0213 .redirectToLog(getClass().getName())
0214 .startApplication();
0215 waitFor(handle);
0216 assertEquals(SparkAppHandle.State.FAILED, handle.getState());
0217 }
0218
0219 private static class TestSparkLauncher extends SparkLauncher {
0220
0221 TestSparkLauncher() {
0222 this(0);
0223 }
0224
0225 TestSparkLauncher(int ec) {
0226 setAppResource("outputredirtest");
0227 addAppArgs(String.valueOf(ec));
0228 }
0229
0230 @Override
0231 String findSparkSubmit() {
0232 return TEST_SCRIPT_PATH.getAbsolutePath();
0233 }
0234
0235 }
0236
0237
0238
0239
0240
0241 public static class LogAppender extends AppenderSkeleton {
0242
0243 @Override
0244 protected void append(LoggingEvent event) {
0245 MESSAGES.add(event.getMessage().toString());
0246 }
0247
0248 @Override
0249 public boolean requiresLayout() {
0250 return false;
0251 }
0252
0253 @Override
0254 public void close() {
0255
0256 }
0257
0258 }
0259 }