Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle.mesos;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.util.concurrent.Executors;
0023 import java.util.concurrent.ScheduledExecutorService;
0024 import java.util.concurrent.TimeUnit;
0025 
0026 import com.google.common.util.concurrent.ThreadFactoryBuilder;
0027 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030 
0031 import org.apache.spark.network.client.RpcResponseCallback;
0032 import org.apache.spark.network.client.TransportClient;
0033 import org.apache.spark.network.sasl.SecretKeyHolder;
0034 import org.apache.spark.network.shuffle.ExternalBlockStoreClient;
0035 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
0036 import org.apache.spark.network.util.TransportConf;
0037 
0038 /**
0039  * A client for talking to the external shuffle service in Mesos coarse-grained mode.
0040  *
0041  * This is used by the Spark driver to register with each external shuffle service on the cluster.
0042  * The reason why the driver has to talk to the service is for cleaning up shuffle files reliably
0043  * after the application exits. Mesos does not provide a great alternative to do this, so Spark
0044  * has to detect this itself.
0045  */
0046 public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {
0047   private static final Logger logger =
0048       LoggerFactory.getLogger(MesosExternalBlockStoreClient.class);
0049 
0050   private final ScheduledExecutorService heartbeaterThread =
0051       Executors.newSingleThreadScheduledExecutor(
0052         new ThreadFactoryBuilder()
0053           .setDaemon(true)
0054           .setNameFormat("mesos-external-shuffle-client-heartbeater")
0055           .build());
0056 
0057   /**
0058    * Creates an Mesos external shuffle client that wraps the {@link ExternalBlockStoreClient}.
0059    * Please refer to docs on {@link ExternalBlockStoreClient} for more information.
0060    */
0061   public MesosExternalBlockStoreClient(
0062       TransportConf conf,
0063       SecretKeyHolder secretKeyHolder,
0064       boolean authEnabled,
0065       long registrationTimeoutMs) {
0066     super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
0067   }
0068 
0069   public void registerDriverWithShuffleService(
0070       String host,
0071       int port,
0072       long heartbeatTimeoutMs,
0073       long heartbeatIntervalMs) throws IOException, InterruptedException {
0074 
0075     checkInit();
0076     ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
0077     TransportClient client = clientFactory.createClient(host, port);
0078     client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
0079   }
0080 
0081   private class RegisterDriverCallback implements RpcResponseCallback {
0082     private final TransportClient client;
0083     private final long heartbeatIntervalMs;
0084 
0085     private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
0086       this.client = client;
0087       this.heartbeatIntervalMs = heartbeatIntervalMs;
0088     }
0089 
0090     @Override
0091     public void onSuccess(ByteBuffer response) {
0092       heartbeaterThread.scheduleAtFixedRate(
0093           new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
0094       logger.info("Successfully registered app " + appId + " with external shuffle service.");
0095     }
0096 
0097     @Override
0098     public void onFailure(Throwable e) {
0099       logger.warn("Unable to register app " + appId + " with external shuffle service. " +
0100           "Please manually remove shuffle data after driver exit. Error: " + e);
0101     }
0102   }
0103 
0104   @Override
0105   public void close() {
0106     heartbeaterThread.shutdownNow();
0107     super.close();
0108   }
0109 
0110   private class Heartbeater implements Runnable {
0111 
0112     private final TransportClient client;
0113 
0114     private Heartbeater(TransportClient client) {
0115       this.client = client;
0116     }
0117 
0118     @Override
0119     public void run() {
0120       // TODO: Stop sending heartbeats if the shuffle service has lost the app due to timeout
0121       client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
0122     }
0123   }
0124 }