0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.protocol;
0019
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022
0023 import io.netty.buffer.ByteBuf;
0024 import org.apache.commons.lang3.builder.ToStringBuilder;
0025 import org.apache.commons.lang3.builder.ToStringStyle;
0026
0027 import org.apache.spark.network.buffer.ManagedBuffer;
0028 import org.apache.spark.network.buffer.NettyManagedBuffer;
0029
0030
0031
0032
0033 public final class UploadStream extends AbstractMessage implements RequestMessage {
0034
0035 public final long requestId;
0036 public final ManagedBuffer meta;
0037 public final long bodyByteCount;
0038
0039 public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
0040 super(body, false);
0041 this.requestId = requestId;
0042 this.meta = meta;
0043 bodyByteCount = body.size();
0044 }
0045
0046
0047
0048 private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
0049 super(null, false);
0050 this.requestId = requestId;
0051 this.meta = meta;
0052 this.bodyByteCount = bodyByteCount;
0053 }
0054
0055 @Override
0056 public Message.Type type() { return Type.UploadStream; }
0057
0058 @Override
0059 public int encodedLength() {
0060
0061 return 8 + 4 + ((int) meta.size()) + 8;
0062 }
0063
0064 @Override
0065 public void encode(ByteBuf buf) {
0066 buf.writeLong(requestId);
0067 try {
0068 ByteBuffer metaBuf = meta.nioByteBuffer();
0069 buf.writeInt(metaBuf.remaining());
0070 buf.writeBytes(metaBuf);
0071 } catch (IOException io) {
0072 throw new RuntimeException(io);
0073 }
0074 buf.writeLong(bodyByteCount);
0075 }
0076
0077 public static UploadStream decode(ByteBuf buf) {
0078 long requestId = buf.readLong();
0079 int metaSize = buf.readInt();
0080 ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
0081 long bodyByteCount = buf.readLong();
0082
0083
0084 return new UploadStream(requestId, meta, bodyByteCount);
0085 }
0086
0087 @Override
0088 public int hashCode() {
0089 return Long.hashCode(requestId);
0090 }
0091
0092 @Override
0093 public boolean equals(Object other) {
0094 if (other instanceof UploadStream) {
0095 UploadStream o = (UploadStream) other;
0096 return requestId == o.requestId && super.equals(o);
0097 }
0098 return false;
0099 }
0100
0101 @Override
0102 public String toString() {
0103 return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0104 .append("requestId", requestId)
0105 .append("body", body())
0106 .toString();
0107 }
0108 }