0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.time.Duration;
0021 import java.util.Arrays;
0022 import java.util.ArrayList;
0023 import java.util.HashMap;
0024 import java.util.List;
0025 import java.util.Map;
0026 import java.util.Properties;
0027
0028 import org.junit.Test;
0029 import static org.junit.Assert.*;
0030 import static org.junit.Assume.*;
0031 import static org.mockito.Mockito.*;
0032
0033 import org.apache.spark.SparkContext;
0034 import org.apache.spark.SparkContext$;
0035 import org.apache.spark.internal.config.package$;
0036 import org.apache.spark.util.Utils;
0037
0038
0039
0040
/**
 * Tests for the {@code SparkLauncher} and {@code InProcessLauncher} public APIs: argument
 * parsing/validation, child-process launching, in-process launching with state-transition
 * listeners, and error propagation through {@code SparkAppHandle.getError()}.
 */
public class SparkLauncherSuite extends BaseSuite {

  // Thread factory used to name the output-redirector thread spawned for the child process.
  private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");
  // Sentinel exception thrown by ErrorInProcessTestApp; the getError() tests assert that this
  // exact instance (in-process) or its message (child-process) is surfaced to the handle.
  private static final String EXCEPTION_MESSAGE = "dummy-exception";
  private static final RuntimeException DUMMY_EXCEPTION = new RuntimeException(EXCEPTION_MESSAGE);

  private final SparkLauncher launcher = new SparkLauncher();

  /**
   * Verifies addSparkArg() validation and that recognized arguments are routed into the
   * builder's typed fields (master, jars, files, pyFiles, conf) where later values replace
   * earlier ones.
   */
  @Test
  public void testSparkArgumentHandling() throws Exception {
    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();

    launcher.addSparkArg(opts.HELP);
    try {
      // PROXY_USER requires a value, so the no-value overload must be rejected.
      launcher.addSparkArg(opts.PROXY_USER);
      fail("Expected IllegalArgumentException.");
    } catch (IllegalArgumentException e) {
      // Expected.
    }

    launcher.addSparkArg(opts.PROXY_USER, "someUser");
    try {
      // HELP is a flag that takes no value, so providing one must be rejected.
      launcher.addSparkArg(opts.HELP, "someValue");
      fail("Expected IllegalArgumentException.");
    } catch (IllegalArgumentException e) {
      // Expected.
    }

    // Unrecognized arguments are accepted in both forms (forward compatibility with newer
    // spark-submit options).
    launcher.addSparkArg("--future-argument");
    launcher.addSparkArg("--future-argument", "someValue");

    // Recognized arguments are not kept as opaque strings; they update the builder's typed
    // fields, and the most recent value wins over earlier setters (addJar/addFile/addPyFile/
    // setConf below are each overridden by the subsequent addSparkArg call).
    launcher.addSparkArg(opts.MASTER, "myMaster");
    assertEquals("myMaster", launcher.builder.master);

    launcher.addJar("foo");
    launcher.addSparkArg(opts.JARS, "bar");
    assertEquals(Arrays.asList("bar"), launcher.builder.jars);

    launcher.addFile("foo");
    launcher.addSparkArg(opts.FILES, "bar");
    assertEquals(Arrays.asList("bar"), launcher.builder.files);

    launcher.addPyFile("foo");
    launcher.addSparkArg(opts.PY_FILES, "bar");
    assertEquals(Arrays.asList("bar"), launcher.builder.pyFiles);

    launcher.setConf("spark.foo", "foo");
    launcher.addSparkArg(opts.CONF, "spark.foo=bar");
    assertEquals("bar", launcher.builder.conf.get("spark.foo"));

    // The launcher-specific PYSPARK_* constants must map onto the internal config keys.
    launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
    launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
    assertEquals("python3.4", launcher.builder.conf.get(
      package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
    assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
  }

  /**
   * Launches a real child JVM running SparkLauncherTestApp and verifies it exits cleanly.
   * The driver extra Java options set here ("-Dfoo=bar") are asserted inside the child's
   * main(), proving that setConf() values override earlier addSparkArg(CONF, ...) values and
   * that setMainClass() overrides an earlier addSparkArg(CLASS, ...).
   */
  @Test
  public void testChildProcLauncher() throws Exception {
    // Skipped on Windows. NOTE(review): the reason is not visible from this file — presumably
    // a platform limitation with launching the child process; confirm against project history.
    assumeTrue(!Utils.isWindows());

    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
    Map<String, String> env = new HashMap<>();
    env.put("SPARK_PRINT_LAUNCH_COMMAND", "1");
    // NOTE(review): `env` is built above but never passed to the launcher or the child
    // process in this method — it appears to be dead code; confirm before removing.

    launcher
      .setMaster("local")
      .setAppResource(SparkLauncher.NO_RESOURCE)
      .addSparkArg(opts.CONF,
        String.format("%s=-Dfoo=ShouldBeOverriddenBelow", SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS))
      .setConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS,
        "-Dfoo=bar -Dtest.appender=console")
      .setConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH, System.getProperty("java.class.path"))
      .addSparkArg(opts.CLASS, "ShouldBeOverriddenBelow")
      .setMainClass(SparkLauncherTestApp.class.getName())
      .redirectError()
      .addAppArgs("proc");
    final Process app = launcher.launch();

    // Drain the child's output (stderr is merged via redirectError() above) so the child
    // cannot block on a full pipe before exiting.
    new OutputRedirector(app.getInputStream(), getClass().getName() + ".child", TF);
    assertEquals(0, app.waitFor());
  }

  /**
   * Runs a Spark application inside this JVM and verifies the handle's state transitions.
   * System properties are snapshotted and restored because an in-process spark-submit mutates
   * them, and the test waits for SparkContext shutdown so later tests start from a clean JVM.
   */
  @Test
  public void testInProcessLauncher() throws Exception {
    // Snapshot system properties before the in-process app modifies them.
    Map<Object, Object> properties = new HashMap<>(System.getProperties());
    try {
      inProcessLauncherTestImpl();
    } finally {
      restoreSystemProperties(properties);
      waitForSparkContextShutdown();
    }
  }

  private void inProcessLauncherTestImpl() throws Exception {
    // Record every state change delivered to the listener; accessed from the listener's
    // thread and (implicitly) this one, hence the synchronization.
    final List<SparkAppHandle.State> transitions = new ArrayList<>();
    SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
    doAnswer(invocation -> {
      SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
      synchronized (transitions) {
        transitions.add(h.getState());
      }
      return null;
    }).when(listener).stateChanged(any(SparkAppHandle.class));

    SparkAppHandle handle = null;
    try {
      // Hold LOCK while starting the app: InProcessTestApp.main() calls LOCK.notifyAll()
      // after stopping its SparkContext, and the app thread cannot enter that synchronized
      // block until this thread releases the monitor inside LOCK.wait() below — so the
      // notification cannot be missed.
      synchronized (InProcessTestApp.LOCK) {
        handle = new InProcessLauncher()
          .setMaster("local")
          .setAppResource(SparkLauncher.NO_RESOURCE)
          .setMainClass(InProcessTestApp.class.getName())
          .addAppArgs("hello")
          .startApplication(listener);

        // Wait until the handle has connected and reported some real state before waiting
        // on the app's completion signal.
        final SparkAppHandle _handle = handle;
        eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
          assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
        });

        // Releases the monitor; woken by the app's notifyAll() (or the 5s timeout).
        InProcessTestApp.LOCK.wait(5000);
      }

      waitFor(handle);
      assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

      // The listener was registered before launch, so it must have observed the full,
      // ordered lifecycle with no intermediate states skipped.
      List<SparkAppHandle.State> expected = Arrays.asList(
        SparkAppHandle.State.CONNECTED,
        SparkAppHandle.State.RUNNING,
        SparkAppHandle.State.FINISHED);
      assertEquals(expected, transitions);
    } finally {
      if (handle != null) {
        handle.kill();
      }
    }
  }

  /**
   * Verifies that bad arguments handed to an in-process launcher fail the application handle
   * instead of killing the test JVM (in-process spark-submit must not call System.exit).
   */
  @Test
  public void testInProcessLauncherDoesNotKillJvm() throws Exception {
    SparkSubmitOptionParser opts = new SparkSubmitOptionParser();
    List<String[]> wrongArgs = Arrays.asList(
      new String[] { "--unknown" },
      new String[] { opts.DEPLOY_MODE, "invalid" });

    for (String[] args : wrongArgs) {
      InProcessLauncher launcher = new InProcessLauncher()
        .setAppResource(SparkLauncher.NO_RESOURCE);
      switch (args.length) {
        case 2:
          launcher.addSparkArg(args[0], args[1]);
          break;

        case 1:
          launcher.addSparkArg(args[0]);
          break;

        default:
          fail("FIXME: invalid test.");
      }

      // The bad argument should surface as a FAILED handle, not a JVM exit.
      SparkAppHandle handle = launcher.startApplication();
      waitFor(handle);
      assertEquals(SparkAppHandle.State.FAILED, handle.getState());
    }

    // --version makes spark-submit print the version and return without ever reporting an
    // application state, so the handle ends up LOST rather than FAILED — and, again, the
    // JVM must survive.
    SparkAppHandle handle = new InProcessLauncher().addSparkArg(opts.VERSION).startApplication();
    waitFor(handle);
    assertEquals(SparkAppHandle.State.LOST, handle.getState());
  }

  /**
   * Verifies that an exception thrown by an in-process application's main() is captured and
   * exposed via SparkAppHandle.getError() — as the very same instance, since no process
   * boundary is crossed.
   */
  @Test
  public void testInProcessLauncherGetError() throws Exception {
    // Snapshot system properties before the in-process app modifies them.
    Map<Object, Object> properties = new HashMap<>(System.getProperties());

    SparkAppHandle handle = null;
    try {
      handle = new InProcessLauncher()
        .setMaster("local")
        .setAppResource(SparkLauncher.NO_RESOURCE)
        .setMainClass(ErrorInProcessTestApp.class.getName())
        .addAppArgs("hello")
        .startApplication();

      final SparkAppHandle _handle = handle;
      eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
        assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
      });

      assertNotNull(handle.getError());
      assertTrue(handle.getError().isPresent());
      // In-process: the handle holds the exact exception object that main() threw.
      assertSame(handle.getError().get(), DUMMY_EXCEPTION);
    } finally {
      if (handle != null) {
        handle.kill();
      }
      restoreSystemProperties(properties);
      waitForSparkContextShutdown();
    }
  }

  /**
   * Same as testInProcessLauncherGetError but through a child-process SparkLauncher: the
   * exception cannot cross the process boundary as an object, so only its message is
   * asserted, not instance identity.
   */
  @Test
  public void testSparkLauncherGetError() throws Exception {
    SparkAppHandle handle = null;
    try {
      handle = new SparkLauncher()
        .setMaster("local")
        .setAppResource(SparkLauncher.NO_RESOURCE)
        .setMainClass(ErrorInProcessTestApp.class.getName())
        .addAppArgs("hello")
        .startApplication();

      final SparkAppHandle _handle = handle;
      eventually(Duration.ofSeconds(60), Duration.ofMillis(1000), () -> {
        assertEquals(SparkAppHandle.State.FAILED, _handle.getState());
      });

      assertNotNull(handle.getError());
      assertTrue(handle.getError().isPresent());
      assertTrue(handle.getError().get().getMessage().contains(EXCEPTION_MESSAGE));
    } finally {
      if (handle != null) {
        handle.kill();
      }
    }
  }

  /** Restores the system properties captured before an in-process app was launched. */
  private void restoreSystemProperties(Map<Object, Object> properties) {
    Properties p = new Properties();
    for (Map.Entry<Object, Object> e : properties.entrySet()) {
      p.put(e.getKey(), e.getValue());
    }
    System.setProperties(p);
  }

  /**
   * Polls until no SparkContext is registered as active, so the next test does not collide
   * with an asynchronously shutting-down context from this one.
   */
  private void waitForSparkContextShutdown() throws Exception {
    eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
      assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
    });
  }

  /**
   * Main class run in the child JVM by testChildProcLauncher. Asserts the app args and the
   * system properties ("foo", spark master) that the launcher configuration was expected to
   * set; any assertion failure makes the child exit non-zero, failing the parent's waitFor().
   */
  public static class SparkLauncherTestApp {

    public static void main(String[] args) throws Exception {
      assertEquals(1, args.length);
      assertEquals("proc", args[0]);
      assertEquals("bar", System.getProperty("foo"));
      assertEquals("local", System.getProperty(SparkLauncher.SPARK_MASTER));
    }

  }

  /**
   * Main class run inside the test JVM by the in-process launcher test. Starts and stops a
   * local SparkContext, then notifies any thread waiting on LOCK so the test knows the
   * context has been stopped before it proceeds.
   */
  public static class InProcessTestApp {

    // Monitor used to hand off "SparkContext stopped" from the app thread to the test
    // thread; see inProcessLauncherTestImpl() for the wait/notify protocol.
    public static final Object LOCK = new Object();

    public static void main(String[] args) throws Exception {
      assertNotEquals(0, args.length);
      assertEquals("hello", args[0]);
      new SparkContext().stop();

      synchronized (LOCK) {
        LOCK.notifyAll();
      }
    }

  }

  /**
   * Main class that always throws DUMMY_EXCEPTION after validating its args; used by the
   * getError() tests to verify error propagation for both launch modes.
   */
  public static class ErrorInProcessTestApp {

    public static void main(String[] args) {
      assertNotEquals(0, args.length);
      assertEquals("hello", args[0]);
      throw DUMMY_EXCEPTION;
    }
  }
}