Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.protocol;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.WritableByteChannel;
0023 import javax.annotation.Nullable;
0024 
0025 import com.google.common.base.Preconditions;
0026 import io.netty.buffer.ByteBuf;
0027 import io.netty.channel.FileRegion;
0028 import io.netty.util.ReferenceCountUtil;
0029 
0030 import org.apache.spark.network.buffer.ManagedBuffer;
0031 import org.apache.spark.network.util.AbstractFileRegion;
0032 
0033 /**
0034  * A wrapper message that holds two separate pieces (a header and a body).
0035  *
0036  * The header must be a ByteBuf, while the body can be a ByteBuf or a FileRegion.
0037  */
0038 class MessageWithHeader extends AbstractFileRegion {
0039 
0040   @Nullable private final ManagedBuffer managedBuffer;
0041   private final ByteBuf header;
0042   private final int headerLength;
0043   private final Object body;
0044   private final long bodyLength;
0045   private long totalBytesTransferred;
0046 
0047   /**
0048    * When the write buffer size is larger than this limit, I/O will be done in chunks of this size.
0049    * The size should not be too large as it will waste underlying memory copy. e.g. If network
0050    * available buffer is smaller than this limit, the data cannot be sent within one single write
0051    * operation while it still will make memory copy with this size.
0052    */
0053   private static final int NIO_BUFFER_LIMIT = 256 * 1024;
0054 
0055   /**
0056    * Construct a new MessageWithHeader.
0057    *
0058    * @param managedBuffer the {@link ManagedBuffer} that the message body came from. This needs to
0059    *                      be passed in so that the buffer can be freed when this message is
0060    *                      deallocated. Ownership of the caller's reference to this buffer is
0061    *                      transferred to this class, so if the caller wants to continue to use the
0062    *                      ManagedBuffer in other messages then they will need to call retain() on
0063    *                      it before passing it to this constructor. This may be null if and only if
0064    *                      `body` is a {@link FileRegion}.
0065    * @param header the message header.
0066    * @param body the message body. Must be either a {@link ByteBuf} or a {@link FileRegion}.
0067    * @param bodyLength the length of the message body, in bytes.
0068      */
0069   MessageWithHeader(
0070       @Nullable ManagedBuffer managedBuffer,
0071       ByteBuf header,
0072       Object body,
0073       long bodyLength) {
0074     Preconditions.checkArgument(body instanceof ByteBuf || body instanceof FileRegion,
0075       "Body must be a ByteBuf or a FileRegion.");
0076     this.managedBuffer = managedBuffer;
0077     this.header = header;
0078     this.headerLength = header.readableBytes();
0079     this.body = body;
0080     this.bodyLength = bodyLength;
0081   }
0082 
0083   @Override
0084   public long count() {
0085     return headerLength + bodyLength;
0086   }
0087 
0088   @Override
0089   public long position() {
0090     return 0;
0091   }
0092 
0093   @Override
0094   public long transferred() {
0095     return totalBytesTransferred;
0096   }
0097 
0098   /**
0099    * This code is more complicated than you would think because we might require multiple
0100    * transferTo invocations in order to transfer a single MessageWithHeader to avoid busy waiting.
0101    *
0102    * The contract is that the caller will ensure position is properly set to the total number
0103    * of bytes transferred so far (i.e. value returned by transferred()).
0104    */
0105   @Override
0106   public long transferTo(final WritableByteChannel target, final long position) throws IOException {
0107     Preconditions.checkArgument(position == totalBytesTransferred, "Invalid position.");
0108     // Bytes written for header in this call.
0109     long writtenHeader = 0;
0110     if (header.readableBytes() > 0) {
0111       writtenHeader = copyByteBuf(header, target);
0112       totalBytesTransferred += writtenHeader;
0113       if (header.readableBytes() > 0) {
0114         return writtenHeader;
0115       }
0116     }
0117 
0118     // Bytes written for body in this call.
0119     long writtenBody = 0;
0120     if (body instanceof FileRegion) {
0121       writtenBody = ((FileRegion) body).transferTo(target, totalBytesTransferred - headerLength);
0122     } else if (body instanceof ByteBuf) {
0123       writtenBody = copyByteBuf((ByteBuf) body, target);
0124     }
0125     totalBytesTransferred += writtenBody;
0126 
0127     return writtenHeader + writtenBody;
0128   }
0129 
0130   @Override
0131   protected void deallocate() {
0132     header.release();
0133     ReferenceCountUtil.release(body);
0134     if (managedBuffer != null) {
0135       managedBuffer.release();
0136     }
0137   }
0138 
0139   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
0140     // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
0141     // for the case that the passed-in buffer has too many components.
0142     int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
0143     // If the ByteBuf holds more then one ByteBuffer we should better call nioBuffers(...)
0144     // to eliminate extra memory copies.
0145     int written = 0;
0146     if (buf.nioBufferCount() == 1) {
0147       ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
0148       written = target.write(buffer);
0149     } else {
0150       ByteBuffer[] buffers = buf.nioBuffers(buf.readerIndex(), length);
0151       for (ByteBuffer buffer: buffers) {
0152         int remaining = buffer.remaining();
0153         int w = target.write(buffer);
0154         written += w;
0155         if (w < remaining) {
0156           // Could not write all, we need to break now.
0157           break;
0158         }
0159       }
0160     }
0161     buf.skipBytes(written);
0162     return written;
0163   }
0164 
0165   @Override
0166   public MessageWithHeader touch(Object o) {
0167     super.touch(o);
0168     header.touch(o);
0169     ReferenceCountUtil.touch(body, o);
0170     return this;
0171   }
0172 
0173   @Override
0174   public MessageWithHeader retain(int increment) {
0175     super.retain(increment);
0176     header.retain(increment);
0177     ReferenceCountUtil.retain(body, increment);
0178     if (managedBuffer != null) {
0179       for (int i = 0; i < increment; i++) {
0180         managedBuffer.retain();
0181       }
0182     }
0183     return this;
0184   }
0185 
0186   @Override
0187   public boolean release(int decrement) {
0188     header.release(decrement);
0189     ReferenceCountUtil.release(body, decrement);
0190     if (managedBuffer != null) {
0191       for (int i = 0; i < decrement; i++) {
0192         managedBuffer.release();
0193       }
0194     }
0195     return super.release(decrement);
0196   }
0197 }