Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.protocol;
0019 
0020 import io.netty.buffer.ByteBuf;
0021 
0022 import org.apache.spark.network.buffer.ManagedBuffer;
0023 
0024 /** An on-the-wire transmittable message. */
0025 public interface Message extends Encodable {
0026   /** Used to identify this request type. */
0027   Type type();
0028 
0029   /** An optional body for the message. */
0030   ManagedBuffer body();
0031 
0032   /** Whether to include the body of the message in the same frame as the message. */
0033   boolean isBodyInFrame();
0034 
0035   /** Preceding every serialized Message is its type, which allows us to deserialize it. */
0036   enum Type implements Encodable {
0037     ChunkFetchRequest(0), ChunkFetchSuccess(1), ChunkFetchFailure(2),
0038     RpcRequest(3), RpcResponse(4), RpcFailure(5),
0039     StreamRequest(6), StreamResponse(7), StreamFailure(8),
0040     OneWayMessage(9), UploadStream(10), User(-1);
0041 
0042     private final byte id;
0043 
0044     Type(int id) {
0045       assert id < 128 : "Cannot have more than 128 message types";
0046       this.id = (byte) id;
0047     }
0048 
0049     public byte id() { return id; }
0050 
0051     @Override public int encodedLength() { return 1; }
0052 
0053     @Override public void encode(ByteBuf buf) { buf.writeByte(id); }
0054 
0055     public static Type decode(ByteBuf buf) {
0056       byte id = buf.readByte();
0057       switch (id) {
0058         case 0: return ChunkFetchRequest;
0059         case 1: return ChunkFetchSuccess;
0060         case 2: return ChunkFetchFailure;
0061         case 3: return RpcRequest;
0062         case 4: return RpcResponse;
0063         case 5: return RpcFailure;
0064         case 6: return StreamRequest;
0065         case 7: return StreamResponse;
0066         case 8: return StreamFailure;
0067         case 9: return OneWayMessage;
0068         case 10: return UploadStream;
0069         case -1: throw new IllegalArgumentException("User type messages cannot be decoded.");
0070         default: throw new IllegalArgumentException("Unknown message type: " + id);
0071       }
0072     }
0073   }
0074 }