Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.launcher;
0019 
0020 import java.io.IOException;
0021 import java.lang.reflect.Method;
0022 import java.util.Arrays;
0023 import java.util.List;
0024 import java.util.concurrent.atomic.AtomicReference;
0025 
0026 import org.junit.Before;
0027 import org.junit.Test;
0028 import static org.junit.Assert.*;
0029 
0030 public class InProcessLauncherSuite extends BaseSuite {
0031 
0032   // Arguments passed to the test class to identify the test being run.
0033   private static final String TEST_SUCCESS = "success";
0034   private static final String TEST_FAILURE = "failure";
0035   private static final String TEST_KILL = "kill";
0036 
0037   private static final String TEST_FAILURE_MESSAGE = "d'oh";
0038 
0039   private static Throwable lastError;
0040 
0041   @Before
0042   public void testSetup() {
0043     lastError = null;
0044   }
0045 
0046   @Test
0047   public void testLauncher() throws Exception {
0048     SparkAppHandle app = startTest(TEST_SUCCESS);
0049     waitFor(app);
0050     assertNull(lastError);
0051 
0052     // Because the test doesn't implement the launcher protocol, the final state here will be
0053     // LOST instead of FINISHED.
0054     assertEquals(SparkAppHandle.State.LOST, app.getState());
0055   }
0056 
0057   @Test
0058   public void testKill() throws Exception {
0059     SparkAppHandle app = startTest(TEST_KILL);
0060     app.kill();
0061     waitFor(app);
0062     assertNull(lastError);
0063     assertEquals(SparkAppHandle.State.KILLED, app.getState());
0064   }
0065 
0066   @Test
0067   public void testErrorPropagation() throws Exception {
0068     SparkAppHandle app = startTest(TEST_FAILURE);
0069     waitFor(app);
0070     assertEquals(SparkAppHandle.State.FAILED, app.getState());
0071 
0072     assertNotNull(lastError);
0073     assertEquals(TEST_FAILURE_MESSAGE, lastError.getMessage());
0074   }
0075 
0076   private SparkAppHandle startTest(String test) throws Exception {
0077     return new TestInProcessLauncher()
0078       .addAppArgs(test)
0079       .setAppResource(SparkLauncher.NO_RESOURCE)
0080       .startApplication();
0081   }
0082 
0083   public static void runTest(String[] args) {
0084     try {
0085       assertTrue(args.length != 0);
0086 
0087       // Make sure at least the launcher-provided config options are in the args array.
0088       final AtomicReference<String> port = new AtomicReference<>();
0089       final AtomicReference<String> secret = new AtomicReference<>();
0090       SparkSubmitOptionParser parser = new SparkSubmitOptionParser() {
0091 
0092         @Override
0093         protected boolean handle(String opt, String value) {
0094           if (opt == CONF) {
0095             String[] conf = value.split("=");
0096             switch(conf[0]) {
0097               case LauncherProtocol.CONF_LAUNCHER_PORT:
0098                 port.set(conf[1]);
0099                 break;
0100 
0101               case LauncherProtocol.CONF_LAUNCHER_SECRET:
0102                 secret.set(conf[1]);
0103                 break;
0104 
0105               default:
0106                 // no op
0107             }
0108           }
0109 
0110           return true;
0111         }
0112 
0113         @Override
0114         protected boolean handleUnknown(String opt) {
0115           return true;
0116         }
0117 
0118         @Override
0119         protected void handleExtraArgs(List<String> extra) {
0120           // no op.
0121         }
0122 
0123       };
0124 
0125       parser.parse(Arrays.asList(args));
0126       assertNotNull("Launcher port not found.", port.get());
0127       assertNotNull("Launcher secret not found.", secret.get());
0128 
0129       String test = args[args.length - 1];
0130       switch (test) {
0131       case TEST_SUCCESS:
0132         break;
0133 
0134       case TEST_FAILURE:
0135         throw new IllegalStateException(TEST_FAILURE_MESSAGE);
0136 
0137       case TEST_KILL:
0138         try {
0139           // Wait for a reasonable amount of time to avoid the test hanging forever on failure,
0140           // but still allowing for time outs to hopefully not occur on busy machines.
0141           Thread.sleep(10000);
0142           fail("Did not get expected interrupt after 10s.");
0143         } catch (InterruptedException ie) {
0144           // Expected.
0145         }
0146         break;
0147 
0148       default:
0149         fail("Unknown test " + test);
0150       }
0151     } catch (Throwable t) {
0152       lastError = t;
0153       throw new RuntimeException(t);
0154     }
0155   }
0156 
0157   private static class TestInProcessLauncher extends InProcessLauncher {
0158 
0159     @Override
0160     Method findSparkSubmit() throws IOException {
0161       try {
0162         return InProcessLauncherSuite.class.getMethod("runTest", String[].class);
0163       } catch (Exception e) {
0164         throw new IOException(e);
0165       }
0166     }
0167 
0168   }
0169 
0170 }