Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network;
0019 
0020 import com.google.common.util.concurrent.Uninterruptibles;
0021 import org.apache.spark.network.buffer.ManagedBuffer;
0022 import org.apache.spark.network.buffer.NioManagedBuffer;
0023 import org.apache.spark.network.client.ChunkReceivedCallback;
0024 import org.apache.spark.network.client.RpcResponseCallback;
0025 import org.apache.spark.network.client.TransportClient;
0026 import org.apache.spark.network.client.TransportClientFactory;
0027 import org.apache.spark.network.server.RpcHandler;
0028 import org.apache.spark.network.server.StreamManager;
0029 import org.apache.spark.network.server.TransportServer;
0030 import org.apache.spark.network.util.MapConfigProvider;
0031 import org.apache.spark.network.util.TransportConf;
0032 import org.junit.*;
0033 import static org.junit.Assert.*;
0034 
0035 import java.io.IOException;
0036 import java.nio.ByteBuffer;
0037 import java.util.*;
0038 import java.util.concurrent.CountDownLatch;
0039 import java.util.concurrent.Semaphore;
0040 import java.util.concurrent.TimeUnit;
0041 
0042 /**
0043  * Suite which ensures that requests that go without a response for the network timeout period are
0044  * failed, and the connection closed.
0045  *
0046  * In this suite, we use 10 seconds as the connection timeout, with some slack given in the tests,
0047  * to ensure stability in different test environments.
0048  */
0049 public class RequestTimeoutIntegrationSuite {
0050 
0051   private TransportContext context;
0052   private TransportServer server;
0053   private TransportClientFactory clientFactory;
0054 
0055   private StreamManager defaultManager;
0056   private TransportConf conf;
0057 
0058   // A large timeout that "shouldn't happen", for the sake of faulty tests not hanging forever.
0059   private static final int FOREVER = 60 * 1000;
0060 
0061   @Before
0062   public void setUp() throws Exception {
0063     Map<String, String> configMap = new HashMap<>();
0064     configMap.put("spark.shuffle.io.connectionTimeout", "10s");
0065     conf = new TransportConf("shuffle", new MapConfigProvider(configMap));
0066 
0067     defaultManager = new StreamManager() {
0068       @Override
0069       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
0070         throw new UnsupportedOperationException();
0071       }
0072     };
0073   }
0074 
0075   @After
0076   public void tearDown() {
0077     if (server != null) {
0078       server.close();
0079     }
0080     if (clientFactory != null) {
0081       clientFactory.close();
0082     }
0083     if (context !=  null) {
0084       context.close();
0085     }
0086   }
0087 
0088   // Basic suite: First request completes quickly, and second waits for longer than network timeout.
0089   @Test
0090   public void timeoutInactiveRequests() throws Exception {
0091     final Semaphore semaphore = new Semaphore(1);
0092     final int responseSize = 16;
0093     RpcHandler handler = new RpcHandler() {
0094       @Override
0095       public void receive(
0096           TransportClient client,
0097           ByteBuffer message,
0098           RpcResponseCallback callback) {
0099         try {
0100           semaphore.acquire();
0101           callback.onSuccess(ByteBuffer.allocate(responseSize));
0102         } catch (InterruptedException e) {
0103           // do nothing
0104         }
0105       }
0106 
0107       @Override
0108       public StreamManager getStreamManager() {
0109         return defaultManager;
0110       }
0111     };
0112 
0113     context = new TransportContext(conf, handler);
0114     server = context.createServer();
0115     clientFactory = context.createClientFactory();
0116     TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0117 
0118     // First completes quickly (semaphore starts at 1).
0119     TestCallback callback0 = new TestCallback();
0120     client.sendRpc(ByteBuffer.allocate(0), callback0);
0121     callback0.latch.await();
0122     assertEquals(responseSize, callback0.successLength);
0123 
0124     // Second times out after 10 seconds, with slack. Must be IOException.
0125     TestCallback callback1 = new TestCallback();
0126     client.sendRpc(ByteBuffer.allocate(0), callback1);
0127     callback1.latch.await(60, TimeUnit.SECONDS);
0128     assertNotNull(callback1.failure);
0129     assertTrue(callback1.failure instanceof IOException);
0130 
0131     semaphore.release();
0132   }
0133 
0134   // A timeout will cause the connection to be closed, invalidating the current TransportClient.
0135   // It should be the case that requesting a client from the factory produces a new, valid one.
0136   @Test
0137   public void timeoutCleanlyClosesClient() throws Exception {
0138     final Semaphore semaphore = new Semaphore(0);
0139     final int responseSize = 16;
0140     RpcHandler handler = new RpcHandler() {
0141       @Override
0142       public void receive(
0143           TransportClient client,
0144           ByteBuffer message,
0145           RpcResponseCallback callback) {
0146         try {
0147           semaphore.acquire();
0148           callback.onSuccess(ByteBuffer.allocate(responseSize));
0149         } catch (InterruptedException e) {
0150           // do nothing
0151         }
0152       }
0153 
0154       @Override
0155       public StreamManager getStreamManager() {
0156         return defaultManager;
0157       }
0158     };
0159 
0160     context = new TransportContext(conf, handler);
0161     server = context.createServer();
0162     clientFactory = context.createClientFactory();
0163 
0164     // First request should eventually fail.
0165     TransportClient client0 =
0166       clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0167     TestCallback callback0 = new TestCallback();
0168     client0.sendRpc(ByteBuffer.allocate(0), callback0);
0169     callback0.latch.await();
0170     assertTrue(callback0.failure instanceof IOException);
0171     assertFalse(client0.isActive());
0172 
0173     // Increment the semaphore and the second request should succeed quickly.
0174     semaphore.release(2);
0175     TransportClient client1 =
0176       clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0177     TestCallback callback1 = new TestCallback();
0178     client1.sendRpc(ByteBuffer.allocate(0), callback1);
0179     callback1.latch.await();
0180     assertEquals(responseSize, callback1.successLength);
0181     assertNull(callback1.failure);
0182   }
0183 
0184   // The timeout is relative to the LAST request sent, which is kinda weird, but still.
0185   // This test also makes sure the timeout works for Fetch requests as well as RPCs.
0186   @Test
0187   public void furtherRequestsDelay() throws Exception {
0188     final byte[] response = new byte[16];
0189     final StreamManager manager = new StreamManager() {
0190       @Override
0191       public ManagedBuffer getChunk(long streamId, int chunkIndex) {
0192         Uninterruptibles.sleepUninterruptibly(FOREVER, TimeUnit.MILLISECONDS);
0193         return new NioManagedBuffer(ByteBuffer.wrap(response));
0194       }
0195     };
0196     RpcHandler handler = new RpcHandler() {
0197       @Override
0198       public void receive(
0199           TransportClient client,
0200           ByteBuffer message,
0201           RpcResponseCallback callback) {
0202         throw new UnsupportedOperationException();
0203       }
0204 
0205       @Override
0206       public StreamManager getStreamManager() {
0207         return manager;
0208       }
0209     };
0210 
0211     context = new TransportContext(conf, handler);
0212     server = context.createServer();
0213     clientFactory = context.createClientFactory();
0214     TransportClient client = clientFactory.createClient(TestUtils.getLocalHost(), server.getPort());
0215 
0216     // Send one request, which will eventually fail.
0217     TestCallback callback0 = new TestCallback();
0218     client.fetchChunk(0, 0, callback0);
0219     Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
0220 
0221     // Send a second request before the first has failed.
0222     TestCallback callback1 = new TestCallback();
0223     client.fetchChunk(0, 1, callback1);
0224     Uninterruptibles.sleepUninterruptibly(1200, TimeUnit.MILLISECONDS);
0225 
0226     // not complete yet, but should complete soon
0227     assertEquals(-1, callback0.successLength);
0228     assertNull(callback0.failure);
0229     callback0.latch.await(60, TimeUnit.SECONDS);
0230     assertTrue(callback0.failure instanceof IOException);
0231 
0232     // make sure callback1 is called.
0233     callback1.latch.await(60, TimeUnit.SECONDS);
0234     // failed at same time as previous
0235     assertTrue(callback1.failure instanceof IOException);
0236   }
0237 
0238   /**
0239    * Callback which sets 'success' or 'failure' on completion.
0240    * Additionally notifies all waiters on this callback when invoked.
0241    */
0242   static class TestCallback implements RpcResponseCallback, ChunkReceivedCallback {
0243 
0244     int successLength = -1;
0245     Throwable failure;
0246     final CountDownLatch latch = new CountDownLatch(1);
0247 
0248     @Override
0249     public void onSuccess(ByteBuffer response) {
0250       successLength = response.remaining();
0251       latch.countDown();
0252     }
0253 
0254     @Override
0255     public void onFailure(Throwable e) {
0256       failure = e;
0257       latch.countDown();
0258     }
0259 
0260     @Override
0261     public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
0262       try {
0263         successLength = buffer.nioByteBuffer().remaining();
0264       } catch (IOException e) {
0265         // weird
0266       } finally {
0267         latch.countDown();
0268       }
0269     }
0270 
0271     @Override
0272     public void onFailure(int chunkIndex, Throwable e) {
0273       failure = e;
0274       latch.countDown();
0275     }
0276   }
0277 }