0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle.protocol;
0019
0020 import java.util.Arrays;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import org.apache.commons.lang3.builder.ToStringBuilder;
0024 import org.apache.commons.lang3.builder.ToStringStyle;
0025
0026 import org.apache.spark.network.protocol.Encoders;
0027
0028
0029 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0030
0031
0032 public class FetchShuffleBlocks extends BlockTransferMessage {
0033 public final String appId;
0034 public final String execId;
0035 public final int shuffleId;
0036
0037
0038 public final long[] mapIds;
0039
0040
0041
0042
0043 public final int[][] reduceIds;
0044 public final boolean batchFetchEnabled;
0045
0046 public FetchShuffleBlocks(
0047 String appId,
0048 String execId,
0049 int shuffleId,
0050 long[] mapIds,
0051 int[][] reduceIds,
0052 boolean batchFetchEnabled) {
0053 this.appId = appId;
0054 this.execId = execId;
0055 this.shuffleId = shuffleId;
0056 this.mapIds = mapIds;
0057 this.reduceIds = reduceIds;
0058 assert(mapIds.length == reduceIds.length);
0059 this.batchFetchEnabled = batchFetchEnabled;
0060 if (batchFetchEnabled) {
0061 for (int[] ids: reduceIds) {
0062 assert(ids.length == 2);
0063 }
0064 }
0065 }
0066
0067 @Override
0068 protected Type type() { return Type.FETCH_SHUFFLE_BLOCKS; }
0069
0070 @Override
0071 public String toString() {
0072 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0073 .append("appId", appId)
0074 .append("execId", execId)
0075 .append("shuffleId", shuffleId)
0076 .append("mapIds", Arrays.toString(mapIds))
0077 .append("reduceIds", Arrays.deepToString(reduceIds))
0078 .append("batchFetchEnabled", batchFetchEnabled)
0079 .toString();
0080 }
0081
0082 @Override
0083 public boolean equals(Object o) {
0084 if (this == o) return true;
0085 if (o == null || getClass() != o.getClass()) return false;
0086
0087 FetchShuffleBlocks that = (FetchShuffleBlocks) o;
0088
0089 if (shuffleId != that.shuffleId) return false;
0090 if (batchFetchEnabled != that.batchFetchEnabled) return false;
0091 if (!appId.equals(that.appId)) return false;
0092 if (!execId.equals(that.execId)) return false;
0093 if (!Arrays.equals(mapIds, that.mapIds)) return false;
0094 return Arrays.deepEquals(reduceIds, that.reduceIds);
0095 }
0096
0097 @Override
0098 public int hashCode() {
0099 int result = appId.hashCode();
0100 result = 31 * result + execId.hashCode();
0101 result = 31 * result + shuffleId;
0102 result = 31 * result + Arrays.hashCode(mapIds);
0103 result = 31 * result + Arrays.deepHashCode(reduceIds);
0104 result = 31 * result + (batchFetchEnabled ? 1 : 0);
0105 return result;
0106 }
0107
0108 @Override
0109 public int encodedLength() {
0110 int encodedLengthOfReduceIds = 0;
0111 for (int[] ids: reduceIds) {
0112 encodedLengthOfReduceIds += Encoders.IntArrays.encodedLength(ids);
0113 }
0114 return Encoders.Strings.encodedLength(appId)
0115 + Encoders.Strings.encodedLength(execId)
0116 + 4
0117 + Encoders.LongArrays.encodedLength(mapIds)
0118 + 4
0119 + encodedLengthOfReduceIds
0120 + 1;
0121 }
0122
0123 @Override
0124 public void encode(ByteBuf buf) {
0125 Encoders.Strings.encode(buf, appId);
0126 Encoders.Strings.encode(buf, execId);
0127 buf.writeInt(shuffleId);
0128 Encoders.LongArrays.encode(buf, mapIds);
0129 buf.writeInt(reduceIds.length);
0130 for (int[] ids: reduceIds) {
0131 Encoders.IntArrays.encode(buf, ids);
0132 }
0133 buf.writeBoolean(batchFetchEnabled);
0134 }
0135
0136 public static FetchShuffleBlocks decode(ByteBuf buf) {
0137 String appId = Encoders.Strings.decode(buf);
0138 String execId = Encoders.Strings.decode(buf);
0139 int shuffleId = buf.readInt();
0140 long[] mapIds = Encoders.LongArrays.decode(buf);
0141 int reduceIdsSize = buf.readInt();
0142 int[][] reduceIds = new int[reduceIdsSize][];
0143 for (int i = 0; i < reduceIdsSize; i++) {
0144 reduceIds[i] = Encoders.IntArrays.decode(buf);
0145 }
0146 boolean batchFetchEnabled = buf.readBoolean();
0147 return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds, batchFetchEnabled);
0148 }
0149 }