0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.io.IOException;
0021 import java.lang.reflect.Method;
0022 import java.util.Arrays;
0023 import java.util.List;
0024 import java.util.concurrent.atomic.AtomicReference;
0025
0026 import org.junit.Before;
0027 import org.junit.Test;
0028 import static org.junit.Assert.*;
0029
0030 public class InProcessLauncherSuite extends BaseSuite {
0031
0032
0033 private static final String TEST_SUCCESS = "success";
0034 private static final String TEST_FAILURE = "failure";
0035 private static final String TEST_KILL = "kill";
0036
0037 private static final String TEST_FAILURE_MESSAGE = "d'oh";
0038
0039 private static Throwable lastError;
0040
0041 @Before
0042 public void testSetup() {
0043 lastError = null;
0044 }
0045
0046 @Test
0047 public void testLauncher() throws Exception {
0048 SparkAppHandle app = startTest(TEST_SUCCESS);
0049 waitFor(app);
0050 assertNull(lastError);
0051
0052
0053
0054 assertEquals(SparkAppHandle.State.LOST, app.getState());
0055 }
0056
0057 @Test
0058 public void testKill() throws Exception {
0059 SparkAppHandle app = startTest(TEST_KILL);
0060 app.kill();
0061 waitFor(app);
0062 assertNull(lastError);
0063 assertEquals(SparkAppHandle.State.KILLED, app.getState());
0064 }
0065
0066 @Test
0067 public void testErrorPropagation() throws Exception {
0068 SparkAppHandle app = startTest(TEST_FAILURE);
0069 waitFor(app);
0070 assertEquals(SparkAppHandle.State.FAILED, app.getState());
0071
0072 assertNotNull(lastError);
0073 assertEquals(TEST_FAILURE_MESSAGE, lastError.getMessage());
0074 }
0075
0076 private SparkAppHandle startTest(String test) throws Exception {
0077 return new TestInProcessLauncher()
0078 .addAppArgs(test)
0079 .setAppResource(SparkLauncher.NO_RESOURCE)
0080 .startApplication();
0081 }
0082
0083 public static void runTest(String[] args) {
0084 try {
0085 assertTrue(args.length != 0);
0086
0087
0088 final AtomicReference<String> port = new AtomicReference<>();
0089 final AtomicReference<String> secret = new AtomicReference<>();
0090 SparkSubmitOptionParser parser = new SparkSubmitOptionParser() {
0091
0092 @Override
0093 protected boolean handle(String opt, String value) {
0094 if (opt == CONF) {
0095 String[] conf = value.split("=");
0096 switch(conf[0]) {
0097 case LauncherProtocol.CONF_LAUNCHER_PORT:
0098 port.set(conf[1]);
0099 break;
0100
0101 case LauncherProtocol.CONF_LAUNCHER_SECRET:
0102 secret.set(conf[1]);
0103 break;
0104
0105 default:
0106
0107 }
0108 }
0109
0110 return true;
0111 }
0112
0113 @Override
0114 protected boolean handleUnknown(String opt) {
0115 return true;
0116 }
0117
0118 @Override
0119 protected void handleExtraArgs(List<String> extra) {
0120
0121 }
0122
0123 };
0124
0125 parser.parse(Arrays.asList(args));
0126 assertNotNull("Launcher port not found.", port.get());
0127 assertNotNull("Launcher secret not found.", secret.get());
0128
0129 String test = args[args.length - 1];
0130 switch (test) {
0131 case TEST_SUCCESS:
0132 break;
0133
0134 case TEST_FAILURE:
0135 throw new IllegalStateException(TEST_FAILURE_MESSAGE);
0136
0137 case TEST_KILL:
0138 try {
0139
0140
0141 Thread.sleep(10000);
0142 fail("Did not get expected interrupt after 10s.");
0143 } catch (InterruptedException ie) {
0144
0145 }
0146 break;
0147
0148 default:
0149 fail("Unknown test " + test);
0150 }
0151 } catch (Throwable t) {
0152 lastError = t;
0153 throw new RuntimeException(t);
0154 }
0155 }
0156
0157 private static class TestInProcessLauncher extends InProcessLauncher {
0158
0159 @Override
0160 Method findSparkSubmit() throws IOException {
0161 try {
0162 return InProcessLauncherSuite.class.getMethod("runTest", String[].class);
0163 } catch (Exception e) {
0164 throw new IOException(e);
0165 }
0166 }
0167
0168 }
0169
0170 }