0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.util;
0019
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.ClosedChannelException;
0023 import java.nio.channels.ReadableByteChannel;
0024
0025 import io.netty.buffer.ByteBuf;
0026
0027 public class ByteArrayReadableChannel implements ReadableByteChannel {
0028 private ByteBuf data;
0029 private boolean closed;
0030
0031 public void feedData(ByteBuf buf) throws ClosedChannelException {
0032 if (closed) {
0033 throw new ClosedChannelException();
0034 }
0035 data = buf;
0036 }
0037
0038 @Override
0039 public int read(ByteBuffer dst) throws IOException {
0040 if (closed) {
0041 throw new ClosedChannelException();
0042 }
0043 int totalRead = 0;
0044 while (data.readableBytes() > 0 && dst.remaining() > 0) {
0045 int bytesToRead = Math.min(data.readableBytes(), dst.remaining());
0046 dst.put(data.readSlice(bytesToRead).nioBuffer());
0047 totalRead += bytesToRead;
0048 }
0049
0050 return totalRead;
0051 }
0052
0053 @Override
0054 public void close() {
0055 closed = true;
0056 }
0057
0058 @Override
0059 public boolean isOpen() {
0060 return !closed;
0061 }
0062 }