0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle.protocol;
0019
0020 import java.util.Objects;
0021
0022 import io.netty.buffer.ByteBuf;
0023 import org.apache.commons.lang3.builder.ToStringBuilder;
0024 import org.apache.commons.lang3.builder.ToStringStyle;
0025
0026
0027 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0028
0029
0030
0031
0032
0033 public class StreamHandle extends BlockTransferMessage {
0034 public final long streamId;
0035 public final int numChunks;
0036
0037 public StreamHandle(long streamId, int numChunks) {
0038 this.streamId = streamId;
0039 this.numChunks = numChunks;
0040 }
0041
0042 @Override
0043 protected Type type() { return Type.STREAM_HANDLE; }
0044
0045 @Override
0046 public int hashCode() {
0047 return Objects.hash(streamId, numChunks);
0048 }
0049
0050 @Override
0051 public String toString() {
0052 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0053 .append("streamId", streamId)
0054 .append("numChunks", numChunks)
0055 .toString();
0056 }
0057
0058 @Override
0059 public boolean equals(Object other) {
0060 if (other != null && other instanceof StreamHandle) {
0061 StreamHandle o = (StreamHandle) other;
0062 return Objects.equals(streamId, o.streamId)
0063 && Objects.equals(numChunks, o.numChunks);
0064 }
0065 return false;
0066 }
0067
0068 @Override
0069 public int encodedLength() {
0070 return 8 + 4;
0071 }
0072
0073 @Override
0074 public void encode(ByteBuf buf) {
0075 buf.writeLong(streamId);
0076 buf.writeInt(numChunks);
0077 }
0078
0079 public static StreamHandle decode(ByteBuf buf) {
0080 long streamId = buf.readLong();
0081 int numChunks = buf.readInt();
0082 return new StreamHandle(streamId, numChunks);
0083 }
0084 }