Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle.protocol;
0019 
0020 import java.util.Arrays;
0021 
0022 import io.netty.buffer.ByteBuf;
0023 import org.apache.commons.lang3.builder.ToStringBuilder;
0024 import org.apache.commons.lang3.builder.ToStringStyle;
0025 
0026 import org.apache.spark.network.protocol.Encoders;
0027 
0028 // Needed by ScalaDoc. See SPARK-7726
0029 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0030 
0031 /** Request to read a set of blocks. Returns {@link StreamHandle}. */
0032 public class FetchShuffleBlocks extends BlockTransferMessage {
0033   public final String appId;
0034   public final String execId;
0035   public final int shuffleId;
0036   // The length of mapIds must equal to reduceIds.size(), for the i-th mapId in mapIds,
0037   // it corresponds to the i-th int[] in reduceIds, which contains all reduce id for this map id.
0038   public final long[] mapIds;
0039   // When batchFetchEnabled=true, reduceIds[i] contains 2 elements: startReduceId (inclusive) and
0040   // endReduceId (exclusive) for the mapper mapIds[i].
0041   // When batchFetchEnabled=false, reduceIds[i] contains all the reduce IDs that mapper mapIds[i]
0042   // needs to fetch.
0043   public final int[][] reduceIds;
0044   public final boolean batchFetchEnabled;
0045 
0046   public FetchShuffleBlocks(
0047       String appId,
0048       String execId,
0049       int shuffleId,
0050       long[] mapIds,
0051       int[][] reduceIds,
0052       boolean batchFetchEnabled) {
0053     this.appId = appId;
0054     this.execId = execId;
0055     this.shuffleId = shuffleId;
0056     this.mapIds = mapIds;
0057     this.reduceIds = reduceIds;
0058     assert(mapIds.length == reduceIds.length);
0059     this.batchFetchEnabled = batchFetchEnabled;
0060     if (batchFetchEnabled) {
0061       for (int[] ids: reduceIds) {
0062         assert(ids.length == 2);
0063       }
0064     }
0065   }
0066 
0067   @Override
0068   protected Type type() { return Type.FETCH_SHUFFLE_BLOCKS; }
0069 
0070   @Override
0071   public String toString() {
0072     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0073       .append("appId", appId)
0074       .append("execId", execId)
0075       .append("shuffleId", shuffleId)
0076       .append("mapIds", Arrays.toString(mapIds))
0077       .append("reduceIds", Arrays.deepToString(reduceIds))
0078       .append("batchFetchEnabled", batchFetchEnabled)
0079       .toString();
0080   }
0081 
0082   @Override
0083   public boolean equals(Object o) {
0084     if (this == o) return true;
0085     if (o == null || getClass() != o.getClass()) return false;
0086 
0087     FetchShuffleBlocks that = (FetchShuffleBlocks) o;
0088 
0089     if (shuffleId != that.shuffleId) return false;
0090     if (batchFetchEnabled != that.batchFetchEnabled) return false;
0091     if (!appId.equals(that.appId)) return false;
0092     if (!execId.equals(that.execId)) return false;
0093     if (!Arrays.equals(mapIds, that.mapIds)) return false;
0094     return Arrays.deepEquals(reduceIds, that.reduceIds);
0095   }
0096 
0097   @Override
0098   public int hashCode() {
0099     int result = appId.hashCode();
0100     result = 31 * result + execId.hashCode();
0101     result = 31 * result + shuffleId;
0102     result = 31 * result + Arrays.hashCode(mapIds);
0103     result = 31 * result + Arrays.deepHashCode(reduceIds);
0104     result = 31 * result + (batchFetchEnabled ? 1 : 0);
0105     return result;
0106   }
0107 
0108   @Override
0109   public int encodedLength() {
0110     int encodedLengthOfReduceIds = 0;
0111     for (int[] ids: reduceIds) {
0112       encodedLengthOfReduceIds += Encoders.IntArrays.encodedLength(ids);
0113     }
0114     return Encoders.Strings.encodedLength(appId)
0115       + Encoders.Strings.encodedLength(execId)
0116       + 4 /* encoded length of shuffleId */
0117       + Encoders.LongArrays.encodedLength(mapIds)
0118       + 4 /* encoded length of reduceIds.size() */
0119       + encodedLengthOfReduceIds
0120       + 1; /* encoded length of batchFetchEnabled */
0121   }
0122 
0123   @Override
0124   public void encode(ByteBuf buf) {
0125     Encoders.Strings.encode(buf, appId);
0126     Encoders.Strings.encode(buf, execId);
0127     buf.writeInt(shuffleId);
0128     Encoders.LongArrays.encode(buf, mapIds);
0129     buf.writeInt(reduceIds.length);
0130     for (int[] ids: reduceIds) {
0131       Encoders.IntArrays.encode(buf, ids);
0132     }
0133     buf.writeBoolean(batchFetchEnabled);
0134   }
0135 
0136   public static FetchShuffleBlocks decode(ByteBuf buf) {
0137     String appId = Encoders.Strings.decode(buf);
0138     String execId = Encoders.Strings.decode(buf);
0139     int shuffleId = buf.readInt();
0140     long[] mapIds = Encoders.LongArrays.decode(buf);
0141     int reduceIdsSize = buf.readInt();
0142     int[][] reduceIds = new int[reduceIdsSize][];
0143     for (int i = 0; i < reduceIdsSize; i++) {
0144       reduceIds[i] = Encoders.IntArrays.decode(buf);
0145     }
0146     boolean batchFetchEnabled = buf.readBoolean();
0147     return new FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIds, batchFetchEnabled);
0148   }
0149 }