0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle.mesos;
0019
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.util.concurrent.Executors;
0023 import java.util.concurrent.ScheduledExecutorService;
0024 import java.util.concurrent.TimeUnit;
0025
0026 import com.google.common.util.concurrent.ThreadFactoryBuilder;
0027 import org.apache.spark.network.shuffle.protocol.mesos.ShuffleServiceHeartbeat;
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030
0031 import org.apache.spark.network.client.RpcResponseCallback;
0032 import org.apache.spark.network.client.TransportClient;
0033 import org.apache.spark.network.sasl.SecretKeyHolder;
0034 import org.apache.spark.network.shuffle.ExternalBlockStoreClient;
0035 import org.apache.spark.network.shuffle.protocol.mesos.RegisterDriver;
0036 import org.apache.spark.network.util.TransportConf;
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046 public class MesosExternalBlockStoreClient extends ExternalBlockStoreClient {
0047 private static final Logger logger =
0048 LoggerFactory.getLogger(MesosExternalBlockStoreClient.class);
0049
0050 private final ScheduledExecutorService heartbeaterThread =
0051 Executors.newSingleThreadScheduledExecutor(
0052 new ThreadFactoryBuilder()
0053 .setDaemon(true)
0054 .setNameFormat("mesos-external-shuffle-client-heartbeater")
0055 .build());
0056
0057
0058
0059
0060
0061 public MesosExternalBlockStoreClient(
0062 TransportConf conf,
0063 SecretKeyHolder secretKeyHolder,
0064 boolean authEnabled,
0065 long registrationTimeoutMs) {
0066 super(conf, secretKeyHolder, authEnabled, registrationTimeoutMs);
0067 }
0068
0069 public void registerDriverWithShuffleService(
0070 String host,
0071 int port,
0072 long heartbeatTimeoutMs,
0073 long heartbeatIntervalMs) throws IOException, InterruptedException {
0074
0075 checkInit();
0076 ByteBuffer registerDriver = new RegisterDriver(appId, heartbeatTimeoutMs).toByteBuffer();
0077 TransportClient client = clientFactory.createClient(host, port);
0078 client.sendRpc(registerDriver, new RegisterDriverCallback(client, heartbeatIntervalMs));
0079 }
0080
0081 private class RegisterDriverCallback implements RpcResponseCallback {
0082 private final TransportClient client;
0083 private final long heartbeatIntervalMs;
0084
0085 private RegisterDriverCallback(TransportClient client, long heartbeatIntervalMs) {
0086 this.client = client;
0087 this.heartbeatIntervalMs = heartbeatIntervalMs;
0088 }
0089
0090 @Override
0091 public void onSuccess(ByteBuffer response) {
0092 heartbeaterThread.scheduleAtFixedRate(
0093 new Heartbeater(client), 0, heartbeatIntervalMs, TimeUnit.MILLISECONDS);
0094 logger.info("Successfully registered app " + appId + " with external shuffle service.");
0095 }
0096
0097 @Override
0098 public void onFailure(Throwable e) {
0099 logger.warn("Unable to register app " + appId + " with external shuffle service. " +
0100 "Please manually remove shuffle data after driver exit. Error: " + e);
0101 }
0102 }
0103
0104 @Override
0105 public void close() {
0106 heartbeaterThread.shutdownNow();
0107 super.close();
0108 }
0109
0110 private class Heartbeater implements Runnable {
0111
0112 private final TransportClient client;
0113
0114 private Heartbeater(TransportClient client) {
0115 this.client = client;
0116 }
0117
0118 @Override
0119 public void run() {
0120
0121 client.send(new ShuffleServiceHeartbeat(appId).toByteBuffer());
0122 }
0123 }
0124 }