Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.util;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 import java.nio.channels.ClosedChannelException;
0023 import java.nio.channels.ReadableByteChannel;
0024 
0025 import io.netty.buffer.ByteBuf;
0026 
0027 public class ByteArrayReadableChannel implements ReadableByteChannel {
0028   private ByteBuf data;
0029   private boolean closed;
0030 
0031   public void feedData(ByteBuf buf) throws ClosedChannelException {
0032     if (closed) {
0033       throw new ClosedChannelException();
0034     }
0035     data = buf;
0036   }
0037 
0038   @Override
0039   public int read(ByteBuffer dst) throws IOException {
0040     if (closed) {
0041       throw new ClosedChannelException();
0042     }
0043     int totalRead = 0;
0044     while (data.readableBytes() > 0 && dst.remaining() > 0) {
0045       int bytesToRead = Math.min(data.readableBytes(), dst.remaining());
0046       dst.put(data.readSlice(bytesToRead).nioBuffer());
0047       totalRead += bytesToRead;
0048     }
0049 
0050     return totalRead;
0051   }
0052 
0053   @Override
0054   public void close() {
0055     closed = true;
0056   }
0057 
0058   @Override
0059   public boolean isOpen() {
0060     return !closed;
0061   }
0062 }