0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.launcher;
0019
0020 import java.time.Duration;
0021
0022 import org.junit.After;
0023 import org.slf4j.bridge.SLF4JBridgeHandler;
0024 import static org.junit.Assert.*;
0025
0026
0027
0028
0029 class BaseSuite {
0030
0031 static {
0032 SLF4JBridgeHandler.removeHandlersForRootLogger();
0033 SLF4JBridgeHandler.install();
0034 }
0035
0036 @After
0037 public void postChecks() {
0038 LauncherServer server = LauncherServer.getServer();
0039 if (server != null) {
0040
0041 try {
0042 server.close();
0043 } catch (Exception e) {
0044
0045 }
0046 }
0047 assertNull(server);
0048 }
0049
0050 protected void waitFor(final SparkAppHandle handle) throws Exception {
0051 try {
0052 eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
0053 assertTrue("Handle is not in final state.", handle.getState().isFinal());
0054 });
0055 } finally {
0056 if (!handle.getState().isFinal()) {
0057 handle.kill();
0058 }
0059 }
0060
0061
0062
0063 AbstractAppHandle ahandle = (AbstractAppHandle) handle;
0064 eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
0065 assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
0066 });
0067 }
0068
0069
0070
0071
0072
0073 protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
0074 assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);
0075 long deadline = System.nanoTime() + timeout.toNanos();
0076 int count = 0;
0077 while (true) {
0078 try {
0079 count++;
0080 check.run();
0081 return;
0082 } catch (Throwable t) {
0083 if (System.nanoTime() >= deadline) {
0084 String msg = String.format("Failed check after %d tries: %s.", count, t.getMessage());
0085 throw new IllegalStateException(msg, t);
0086 }
0087 Thread.sleep(period.toMillis());
0088 }
0089 }
0090 }
0091
0092 }