0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle.protocol;
0019
0020 import java.util.*;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import org.apache.commons.lang3.builder.ToStringBuilder;
0024 import org.apache.commons.lang3.builder.ToStringStyle;
0025
0026 import org.apache.spark.network.protocol.Encoders;
0027
0028
0029 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0030
0031
0032 public class LocalDirsForExecutors extends BlockTransferMessage {
0033 private final String[] execIds;
0034 private final int[] numLocalDirsByExec;
0035 private final String[] allLocalDirs;
0036
0037 public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
0038 this.execIds = new String[localDirsByExec.size()];
0039 this.numLocalDirsByExec = new int[localDirsByExec.size()];
0040 ArrayList<String> localDirs = new ArrayList<>();
0041 int index = 0;
0042 for (Map.Entry<String, String[]> e: localDirsByExec.entrySet()) {
0043 execIds[index] = e.getKey();
0044 numLocalDirsByExec[index] = e.getValue().length;
0045 Collections.addAll(localDirs, e.getValue());
0046 index++;
0047 }
0048 this.allLocalDirs = localDirs.toArray(new String[0]);
0049 }
0050
0051 private LocalDirsForExecutors(String[] execIds, int[] numLocalDirsByExec, String[] allLocalDirs) {
0052 this.execIds = execIds;
0053 this.numLocalDirsByExec = numLocalDirsByExec;
0054 this.allLocalDirs = allLocalDirs;
0055 }
0056
0057 @Override
0058 protected Type type() { return Type.LOCAL_DIRS_FOR_EXECUTORS; }
0059
0060 @Override
0061 public int hashCode() {
0062 return Arrays.hashCode(execIds);
0063 }
0064
0065 @Override
0066 public String toString() {
0067 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0068 .append("execIds", Arrays.toString(execIds))
0069 .append("numLocalDirsByExec", Arrays.toString(numLocalDirsByExec))
0070 .append("allLocalDirs", Arrays.toString(allLocalDirs))
0071 .toString();
0072 }
0073
0074 @Override
0075 public boolean equals(Object other) {
0076 if (other instanceof LocalDirsForExecutors) {
0077 LocalDirsForExecutors o = (LocalDirsForExecutors) other;
0078 return Arrays.equals(execIds, o.execIds)
0079 && Arrays.equals(numLocalDirsByExec, o.numLocalDirsByExec)
0080 && Arrays.equals(allLocalDirs, o.allLocalDirs);
0081 }
0082 return false;
0083 }
0084
0085 @Override
0086 public int encodedLength() {
0087 return Encoders.StringArrays.encodedLength(execIds)
0088 + Encoders.IntArrays.encodedLength(numLocalDirsByExec)
0089 + Encoders.StringArrays.encodedLength(allLocalDirs);
0090 }
0091
0092 @Override
0093 public void encode(ByteBuf buf) {
0094 Encoders.StringArrays.encode(buf, execIds);
0095 Encoders.IntArrays.encode(buf, numLocalDirsByExec);
0096 Encoders.StringArrays.encode(buf, allLocalDirs);
0097 }
0098
0099 public static LocalDirsForExecutors decode(ByteBuf buf) {
0100 String[] execIds = Encoders.StringArrays.decode(buf);
0101 int[] numLocalDirsByExec = Encoders.IntArrays.decode(buf);
0102 String[] allLocalDirs = Encoders.StringArrays.decode(buf);
0103 return new LocalDirsForExecutors(execIds, numLocalDirsByExec, allLocalDirs);
0104 }
0105
0106 public Map<String, String[]> getLocalDirsByExec() {
0107 Map<String, String[]> localDirsByExec = new HashMap<>();
0108 int index = 0;
0109 int localDirsIndex = 0;
0110 for (int length: numLocalDirsByExec) {
0111 localDirsByExec.put(execIds[index],
0112 Arrays.copyOfRange(allLocalDirs, localDirsIndex, localDirsIndex + length));
0113 localDirsIndex += length;
0114 index++;
0115 }
0116 return localDirsByExec;
0117 }
0118 }