0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.protocol;
0019
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.WritableByteChannel;
0023 import javax.annotation.Nullable;
0024
0025 import com.google.common.base.Preconditions;
0026 import io.netty.buffer.ByteBuf;
0027 import io.netty.channel.FileRegion;
0028 import io.netty.util.ReferenceCountUtil;
0029
0030 import org.apache.spark.network.buffer.ManagedBuffer;
0031 import org.apache.spark.network.util.AbstractFileRegion;
0032
0033
0034
0035
0036
0037
0038 class MessageWithHeader extends AbstractFileRegion {
0039
0040 @Nullable private final ManagedBuffer managedBuffer;
0041 private final ByteBuf header;
0042 private final int headerLength;
0043 private final Object body;
0044 private final long bodyLength;
0045 private long totalBytesTransferred;
0046
0047
0048
0049
0050
0051
0052
0053 private static final int NIO_BUFFER_LIMIT = 256 * 1024;
0054
0055
0056
0057
0058
0059
0060
0061
0062
0063
0064
0065
0066
0067
0068
0069 MessageWithHeader(
0070 @Nullable ManagedBuffer managedBuffer,
0071 ByteBuf header,
0072 Object body,
0073 long bodyLength) {
0074 Preconditions.checkArgument(body instanceof ByteBuf || body instanceof FileRegion,
0075 "Body must be a ByteBuf or a FileRegion.");
0076 this.managedBuffer = managedBuffer;
0077 this.header = header;
0078 this.headerLength = header.readableBytes();
0079 this.body = body;
0080 this.bodyLength = bodyLength;
0081 }
0082
0083 @Override
0084 public long count() {
0085 return headerLength + bodyLength;
0086 }
0087
0088 @Override
0089 public long position() {
0090 return 0;
0091 }
0092
0093 @Override
0094 public long transferred() {
0095 return totalBytesTransferred;
0096 }
0097
0098
0099
0100
0101
0102
0103
0104
0105 @Override
0106 public long transferTo(final WritableByteChannel target, final long position) throws IOException {
0107 Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
0108
0109 long writtenHeader = 0;
0110 if (header.readableBytes() > 0) {
0111 writtenHeader = copyByteBuf(header, target);
0112 totalBytesTransferred += writtenHeader;
0113 if (header.readableBytes() > 0) {
0114 return writtenHeader;
0115 }
0116 }
0117
0118
0119 long writtenBody = 0;
0120 if (body instanceof FileRegion) {
0121 writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
0122 } else if (body instanceof ByteBuf) {
0123 writtenBody = copyByteBuf((ByteBuf) body, target);
0124 }
0125 totalBytesTransferred += writtenBody;
0126
0127 return writtenHeader + writtenBody;
0128 }
0129
0130 @Override
0131 protected void deallocate() {
0132 header.release();
0133 ReferenceCountUtil.release(body);
0134 if (managedBuffer != null) {
0135 managedBuffer.release();
0136 }
0137 }
0138
0139 private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
0140
0141
0142 int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
0143
0144
0145 int written = 0;
0146 if (buf.nioBufferCount() == 1) {
0147 ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
0148 written = target.write(buffer);
0149 } else {
0150 ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
0151 for (ByteBuffer buffer: buffers) {
0152 int remaining = buffer.remaining();
0153 int w = target.write(buffer);
0154 written += w;
0155 if (w < remaining) {
0156
0157 break;
0158 }
0159 }
0160 }
0161 buf.skipBytes(written);
0162 return written;
0163 }
0164
0165 @Override
0166 public MessageWithHeader touch(Object o) {
0167 super.touch(o);
0168 header.touch(o);
0169 ReferenceCountUtil.touch(body, o);
0170 return this;
0171 }
0172
0173 @Override
0174 public MessageWithHeader retain(int increment) {
0175 super.retain(increment);
0176 header.retain(increment);
0177 ReferenceCountUtil.retain(body, increment);
0178 if (managedBuffer != null) {
0179 for (int i = 0; i < increment; i++) {
0180 managedBuffer.retain();
0181 }
0182 }
0183 return this;
0184 }
0185
0186 @Override
0187 public boolean release(int decrement) {
0188 header.release(decrement);
0189 ReferenceCountUtil.release(body, decrement);
0190 if (managedBuffer != null) {
0191 for (int i = 0; i < decrement; i++) {
0192 managedBuffer.release();
0193 }
0194 }
0195 return super.release(decrement);
0196 }
0197 }