Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.protocol;
0019 
0020 import java.io.IOException;
0021 import java.nio.ByteBuffer;
0022 
0023 import io.netty.buffer.ByteBuf;
0024 import org.apache.commons.lang3.builder.ToStringBuilder;
0025 import org.apache.commons.lang3.builder.ToStringStyle;
0026 
0027 import org.apache.spark.network.buffer.ManagedBuffer;
0028 import org.apache.spark.network.buffer.NettyManagedBuffer;
0029 
0030 /**
0031  * An RPC with data that is sent outside of the frame, so it can be read as a stream.
0032  */
0033 public final class UploadStream extends AbstractMessage implements RequestMessage {
0034   /** Used to link an RPC request with its response. */
0035   public final long requestId;
0036   public final ManagedBuffer meta;
0037   public final long bodyByteCount;
0038 
0039   public UploadStream(long requestId, ManagedBuffer meta, ManagedBuffer body) {
0040     super(body, false); // body is *not* included in the frame
0041     this.requestId = requestId;
0042     this.meta = meta;
0043     bodyByteCount = body.size();
0044   }
0045 
0046   // this version is called when decoding the bytes on the receiving end.  The body is handled
0047   // separately.
0048   private UploadStream(long requestId, ManagedBuffer meta, long bodyByteCount) {
0049     super(null, false);
0050     this.requestId = requestId;
0051     this.meta = meta;
0052     this.bodyByteCount = bodyByteCount;
0053   }
0054 
0055   @Override
0056   public Message.Type type() { return Type.UploadStream; }
0057 
0058   @Override
0059   public int encodedLength() {
0060     // the requestId, meta size, meta and bodyByteCount (body is not included)
0061     return 8 + 4 + ((int) meta.size()) + 8;
0062   }
0063 
0064   @Override
0065   public void encode(ByteBuf buf) {
0066     buf.writeLong(requestId);
0067     try {
0068       ByteBuffer metaBuf = meta.nioByteBuffer();
0069       buf.writeInt(metaBuf.remaining());
0070       buf.writeBytes(metaBuf);
0071     } catch (IOException io) {
0072       throw new RuntimeException(io);
0073     }
0074     buf.writeLong(bodyByteCount);
0075   }
0076 
0077   public static UploadStream decode(ByteBuf buf) {
0078     long requestId = buf.readLong();
0079     int metaSize = buf.readInt();
0080     ManagedBuffer meta = new NettyManagedBuffer(buf.readRetainedSlice(metaSize));
0081     long bodyByteCount = buf.readLong();
0082     // This is called by the frame decoder, so the data is still null.  We need a StreamInterceptor
0083     // to read the data.
0084     return new UploadStream(requestId, meta, bodyByteCount);
0085   }
0086 
0087   @Override
0088   public int hashCode() {
0089     return Long.hashCode(requestId);
0090   }
0091 
0092   @Override
0093   public boolean equals(Object other) {
0094     if (other instanceof UploadStream) {
0095       UploadStream o = (UploadStream) other;
0096       return requestId == o.requestId && super.equals(o);
0097     }
0098     return false;
0099   }
0100 
0101   @Override
0102   public String toString() {
0103     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0104       .append("requestId", requestId)
0105       .append("body", body())
0106       .toString();
0107   }
0108 }