Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle.protocol;
0019 
0020 import java.util.*;
0021 
0022 import io.netty.buffer.ByteBuf;
0023 import org.apache.commons.lang3.builder.ToStringBuilder;
0024 import org.apache.commons.lang3.builder.ToStringStyle;
0025 
0026 import org.apache.spark.network.protocol.Encoders;
0027 
0028 // Needed by ScalaDoc. See SPARK-7726
0029 import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
0030 
0031 /** The reply to get local dirs giving back the dirs for each of the requested executors. */
0032 public class LocalDirsForExecutors extends BlockTransferMessage {
0033   private final String[] execIds;
0034   private final int[] numLocalDirsByExec;
0035   private final String[] allLocalDirs;
0036 
0037   public LocalDirsForExecutors(Map<String, String[]> localDirsByExec) {
0038     this.execIds = new String[localDirsByExec.size()];
0039     this.numLocalDirsByExec = new int[localDirsByExec.size()];
0040     ArrayList<String> localDirs = new ArrayList<>();
0041     int index = 0;
0042     for (Map.Entry<String, String[]> e: localDirsByExec.entrySet()) {
0043       execIds[index] = e.getKey();
0044       numLocalDirsByExec[index] = e.getValue().length;
0045       Collections.addAll(localDirs, e.getValue());
0046       index++;
0047     }
0048     this.allLocalDirs = localDirs.toArray(new String[0]);
0049   }
0050 
0051   private LocalDirsForExecutors(String[] execIds, int[] numLocalDirsByExec, String[] allLocalDirs) {
0052     this.execIds = execIds;
0053     this.numLocalDirsByExec = numLocalDirsByExec;
0054     this.allLocalDirs = allLocalDirs;
0055   }
0056 
0057   @Override
0058   protected Type type() { return Type.LOCAL_DIRS_FOR_EXECUTORS; }
0059 
0060   @Override
0061   public int hashCode() {
0062     return Arrays.hashCode(execIds);
0063   }
0064 
0065   @Override
0066   public String toString() {
0067     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
0068       .append("execIds", Arrays.toString(execIds))
0069       .append("numLocalDirsByExec", Arrays.toString(numLocalDirsByExec))
0070       .append("allLocalDirs", Arrays.toString(allLocalDirs))
0071       .toString();
0072   }
0073 
0074   @Override
0075   public boolean equals(Object other) {
0076     if (other instanceof LocalDirsForExecutors) {
0077       LocalDirsForExecutors o = (LocalDirsForExecutors) other;
0078       return Arrays.equals(execIds, o.execIds)
0079         && Arrays.equals(numLocalDirsByExec, o.numLocalDirsByExec)
0080         && Arrays.equals(allLocalDirs, o.allLocalDirs);
0081     }
0082     return false;
0083   }
0084 
0085   @Override
0086   public int encodedLength() {
0087     return Encoders.StringArrays.encodedLength(execIds)
0088       + Encoders.IntArrays.encodedLength(numLocalDirsByExec)
0089       + Encoders.StringArrays.encodedLength(allLocalDirs);
0090   }
0091 
0092   @Override
0093   public void encode(ByteBuf buf) {
0094     Encoders.StringArrays.encode(buf, execIds);
0095     Encoders.IntArrays.encode(buf, numLocalDirsByExec);
0096     Encoders.StringArrays.encode(buf, allLocalDirs);
0097   }
0098 
0099   public static LocalDirsForExecutors decode(ByteBuf buf) {
0100     String[] execIds = Encoders.StringArrays.decode(buf);
0101     int[] numLocalDirsByExec = Encoders.IntArrays.decode(buf);
0102     String[] allLocalDirs = Encoders.StringArrays.decode(buf);
0103     return new LocalDirsForExecutors(execIds, numLocalDirsByExec, allLocalDirs);
0104   }
0105 
0106   public Map<String, String[]> getLocalDirsByExec() {
0107     Map<String, String[]> localDirsByExec = new HashMap<>();
0108     int index = 0;
0109     int localDirsIndex = 0;
0110     for (int length: numLocalDirsByExec) {
0111       localDirsByExec.put(execIds[index],
0112         Arrays.copyOfRange(allLocalDirs, localDirsIndex, localDirsIndex + length));
0113       localDirsIndex += length;
0114       index++;
0115     }
0116     return localDirsByExec;
0117   }
0118 }