0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network;
0019
0020 import com.google.common.util.concurrent.Uninterruptibles;
0021 import org.apache.spark.network.buffer.ManagedBuffer;
0022 import org.apache.spark.network.buffer.NioManagedBuffer;
0023 import org.apache.spark.network.client.ChunkReceivedCallback;
0024 import org.apache.spark.network.client.RpcResponseCallback;
0025 import org.apache.spark.network.client.TransportClient;
0026 import org.apache.spark.network.client.TransportClientFactory;
0027 import org.apache.spark.network.server.RpcHandler;
0028 import org.apache.spark.network.server.StreamManager;
0029 import org.apache.spark.network.server.TransportServer;
0030 import org.apache.spark.network.util.MapConfigProvider;
0031 import org.apache.spark.network.util.TransportConf;
0032 import org.junit.*;
0033 import static org.junit.Assert.*;
0034
0035 import java.io.IOException;
0036 import java.nio.ByteBuffer;
0037 import java.util.*;
0038 import java.util.concurrent.CountDownLatch;
0039 import java.util.concurrent.Semaphore;
0040 import java.util.concurrent.TimeUnit;
0041
0042
0043
0044
0045
0046
0047
0048
0049 public class RequestTimeoutIntegrationSuite {
0050
0051 private TransportContext context;
0052 private TransportServer server;
0053 private TransportClientFactory clientFactory;
0054
0055 private StreamManager defaultManager;
0056 private TransportConf conf;
0057
0058
0059 private static final int FOREVER = 60 * 1000;
0060
0061 @Before
0062 public void setUp() throws Exception {
0063 Map<String, String> configMap = new HashMap<>();
0064 configMap.put("spark.shuffle.io.connectionTimeout", "10s");
0065 conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
0066
0067 defaultManager = new StreamManager() {
0068 @Override
0069 public ManagedBuffer getChunk(long streamId, int chunkIndex) {
0070 throw new UnsupportedOperationException();
0071 }
0072 };
0073 }
0074
0075 @After
0076 public void tearDown() {
0077 if (server != null) {
0078 server.close();
0079 }
0080 if (clientFactory != null) {
0081 clientFactory.close();
0082 }
0083 if (context != null) {
0084 context.close();
0085 }
0086 }
0087
0088
0089 @Test
0090 public void timeoutInactiveRequests() throws Exception {
0091 final Semaphore semaphore = new Semaphore(1);
0092 final int responseSize = 16;
0093 RpcHandler handler = new RpcHandler() {
0094 @Override
0095 public void receive(
0096 TransportClient client,
0097 ByteBuffer message,
0098 RpcResponseCallback callback) {
0099 try {
0100 semaphore.acquire();
0101 callback.onSuccess(ByteBuffer.allocate(responseSize));
0102 } catch (InterruptedException e) {
0103
0104 }
0105 }
0106
0107 @Override
0108 public StreamManager getStreamManager() {
0109 return defaultManager;
0110 }
0111 };
0112
0113 context = new TransportContext(conf, handler);
0114 server = context.createServer();
0115 clientFactory = context.createClientFactory();
0116 TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0117
0118
0119 TestCallback callback0 = new TestCallback();
0120 client.sendRpc(ByteBuffer.allocate(0), callback0);
0121 callback0.latch.await();
0122 assertEquals(responseSize, callback0.successLength);
0123
0124
0125 TestCallback callback1 = new TestCallback();
0126 client.sendRpc(ByteBuffer.allocate(0), callback1);
0127 callback1.latch.await(60, TimeUnit.SECONDS);
0128 assertNotNull(callback1.failure);
0129 assertTrue(callback1.failure instanceof IOException);
0130
0131 semaphore.release();
0132 }
0133
0134
0135
0136 @Test
0137 public void timeoutCleanlyClosesClient() throws Exception {
0138 final Semaphore semaphore = new Semaphore(0);
0139 final int responseSize = 16;
0140 RpcHandler handler = new RpcHandler() {
0141 @Override
0142 public void receive(
0143 TransportClient client,
0144 ByteBuffer message,
0145 RpcResponseCallback callback) {
0146 try {
0147 semaphore.acquire();
0148 callback.onSuccess(ByteBuffer.allocate(responseSize));
0149 } catch (InterruptedException e) {
0150
0151 }
0152 }
0153
0154 @Override
0155 public StreamManager getStreamManager() {
0156 return defaultManager;
0157 }
0158 };
0159
0160 context = new TransportContext(conf, handler);
0161 server = context.createServer();
0162 clientFactory = context.createClientFactory();
0163
0164
0165 TransportClient client0 =
0166 clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0167 TestCallback callback0 = new TestCallback();
0168 client0.sendRpc(ByteBuffer.allocate(0), callback0);
0169 callback0.latch.await();
0170 assertTrue(callback0.failure instanceof IOException);
0171 assertFalse(client0.isActive());
0172
0173
0174 semaphore.release(2);
0175 TransportClient client1 =
0176 clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0177 TestCallback callback1 = new TestCallback();
0178 client1.sendRpc(ByteBuffer.allocate(0), callback1);
0179 callback1.latch.await();
0180 assertEquals(responseSize, callback1.successLength);
0181 assertNull(callback1.failure);
0182 }
0183
0184
0185
0186 @Test
0187 public void furtherRequestsDelay() throws Exception {
0188 final byte[] response = new byte[16];
0189 final StreamManager manager = new StreamManager() {
0190 @Override
0191 public ManagedBuffer getChunk(long streamId, int chunkIndex) {
0192 Uninterruptibles.sleepUninterruptibly(FOREVER, TimeUnit.MILLISECONDS);
0193 return new NioManagedBuffer(ByteBuffer.wrap(response));
0194 }
0195 };
0196 RpcHandler handler = new RpcHandler() {
0197 @Override
0198 public void receive(
0199 TransportClient client,
0200 ByteBuffer message,
0201 RpcResponseCallback callback) {
0202 throw new UnsupportedOperationException();
0203 }
0204
0205 @Override
0206 public StreamManager getStreamManager() {
0207 return manager;
0208 }
0209 };
0210
0211 context = new TransportContext(conf, handler);
0212 server = context.createServer();
0213 clientFactory = context.createClientFactory();
0214 TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0215
0216
0217 TestCallback callback0 = new TestCallback();
0218 client.fetchChunk(0, 0, callback0);
0219 Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
0220
0221
0222 TestCallback callback1 = new TestCallback();
0223 client.fetchChunk(0, 1, callback1);
0224 Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
0225
0226
0227 assertEquals(-1, callback0.successLength);
0228 assertNull(callback0.failure);
0229 callback0.latch.await(60, TimeUnit.SECONDS);
0230 assertTrue(callback0.failure instanceof IOException);
0231
0232
0233 callback1.latch.await(60, TimeUnit.SECONDS);
0234
0235 assertTrue(callback1.failure instanceof IOException);
0236 }
0237
0238
0239
0240
0241
0242 static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
0243
0244 int successLength = -1;
0245 Throwable failure;
0246 final CountDownLatch latch = new CountDownLatch(1);
0247
0248 @Override
0249 public void onSuccess(ByteBuffer response) {
0250 successLength = response.remaining();
0251 latch.countDown();
0252 }
0253
0254 @Override
0255 public void onFailure(Throwable e) {
0256 failure = e;
0257 latch.countDown();
0258 }
0259
0260 @Override
0261 public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
0262 try {
0263 successLength = buffer.nioByteBuffer().remaining();
0264 } catch (IOException e) {
0265
0266 } finally {
0267 latch.countDown();
0268 }
0269 }
0270
0271 @Override
0272 public void onFailure(int chunkIndex, Throwable e) {
0273 failure = e;
0274 latch.countDown();
0275 }
0276 }
0277 }