Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle.protocol;
0019 
0020 import java.util.Arrays;
0021 import java.util.Objects;
0022 
0023 import io.netty.buffer.ByteBuf;
0024 import org.apache.commons.lang3.builder.ToStringBuilder;
0025 import org.apache.commons.lang3.builder.ToStringStyle;
0026 
0027 import org.apache.spark.network.protocol.Encoders;
0028 
0029 // Needed by ScalaDoc. See SPARK-7726
0030 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0031 
0032 /**
0033  * A request to Upload a block, which the destination should receive as a stream.
0034  *
0035  * The actual block data is not contained here.  It will be passed to the StreamCallbackWithID
0036  * that is returned from RpcHandler.receiveStream()
0037  */
0038 public class UploadBlockStream extends BlockTransferMessage {
0039   public final String blockId;
0040   public final byte[] metadata;
0041 
0042   public UploadBlockStream(String blockId, byte[] metadata) {
0043     this.blockId = blockId;
0044     this.metadata = metadata;
0045   }
0046 
0047   @Override
0048   protected Type type() { return Type.UPLOAD_BLOCK_STREAM; }
0049 
0050   @Override
0051   public int hashCode() {
0052     int objectsHashCode = Objects.hashCode(blockId);
0053     return objectsHashCode * 41 + Arrays.hashCode(metadata);
0054   }
0055 
0056   @Override
0057   public String toString() {
0058     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0059       .append("blockId", blockId)
0060       .append("metadata size", metadata.length)
0061       .toString();
0062   }
0063 
0064   @Override
0065   public boolean equals(Object other) {
0066     if (other != null && other instanceof UploadBlockStream) {
0067       UploadBlockStream o = (UploadBlockStream) other;
0068       return Objects.equals(blockId, o.blockId)
0069         && Arrays.equals(metadata, o.metadata);
0070     }
0071     return false;
0072   }
0073 
0074   @Override
0075   public int encodedLength() {
0076     return Encoders.Strings.encodedLength(blockId)
0077       + Encoders.ByteArrays.encodedLength(metadata);
0078   }
0079 
0080   @Override
0081   public void encode(ByteBuf buf) {
0082     Encoders.Strings.encode(buf, blockId);
0083     Encoders.ByteArrays.encode(buf, metadata);
0084   }
0085 
0086   public static UploadBlockStream decode(ByteBuf buf) {
0087     String blockId = Encoders.Strings.decode(buf);
0088     byte[] metadata = Encoders.ByteArrays.decode(buf);
0089     return new UploadBlockStream(blockId, metadata);
0090   }
0091 }