Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network.shuffle;
0019 
0020 import java.io.DataOutputStream;
0021 import java.io.File;
0022 import java.io.FileOutputStream;
0023 import java.io.IOException;
0024 import java.io.OutputStream;
0025 
0026 import com.google.common.io.Closeables;
0027 import com.google.common.io.Files;
0028 
0029 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0030 import org.apache.spark.network.util.JavaUtils;
0031 import org.junit.Assert;
0032 import org.slf4j.Logger;
0033 import org.slf4j.LoggerFactory;
0034 
0035 /**
0036  * Manages some sort-shuffle data, including the creation
0037  * and cleanup of directories that can be read by the {@link ExternalShuffleBlockResolver}.
0038  */
0039 public class TestShuffleDataContext {
0040   private static final Logger logger = LoggerFactory.getLogger(TestShuffleDataContext.class);
0041 
0042   public final String[] localDirs;
0043   public final int subDirsPerLocalDir;
0044 
0045   public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
0046     this.localDirs = new String[numLocalDirs];
0047     this.subDirsPerLocalDir = subDirsPerLocalDir;
0048   }
0049 
0050   public void create() {
0051     for (int i = 0; i < localDirs.length; i ++) {
0052       localDirs[i] = Files.createTempDir().getAbsolutePath();
0053 
0054       for (int p = 0; p < subDirsPerLocalDir; p ++) {
0055         new File(localDirs[i], String.format("%02x", p)).mkdirs();
0056       }
0057     }
0058   }
0059 
0060   public void cleanup() {
0061     for (String localDir : localDirs) {
0062       try {
0063         JavaUtils.deleteRecursively(new File(localDir));
0064       } catch (IOException e) {
0065         logger.warn("Unable to cleanup localDir = " + localDir, e);
0066       }
0067     }
0068   }
0069 
0070   /** Creates reducer blocks in a sort-based data format within our local dirs. */
0071   public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
0072     String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
0073 
0074     OutputStream dataStream = null;
0075     DataOutputStream indexStream = null;
0076     boolean suppressExceptionsDuringClose = true;
0077 
0078     try {
0079       dataStream = new FileOutputStream(
0080         ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
0081       indexStream = new DataOutputStream(new FileOutputStream(
0082         ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
0083 
0084       long offset = 0;
0085       indexStream.writeLong(offset);
0086       for (byte[] block : blocks) {
0087         offset += block.length;
0088         dataStream.write(block);
0089         indexStream.writeLong(offset);
0090       }
0091       suppressExceptionsDuringClose = false;
0092     } finally {
0093       Closeables.close(dataStream, suppressExceptionsDuringClose);
0094       Closeables.close(indexStream, suppressExceptionsDuringClose);
0095     }
0096   }
0097 
0098   /** Creates spill file(s) within the local dirs. */
0099   public void insertSpillData() throws IOException {
0100     String filename = "temp_local_uuid";
0101     insertFile(filename);
0102   }
0103 
0104   public void insertBroadcastData() throws IOException {
0105     String filename = "broadcast_12_uuid";
0106     insertFile(filename);
0107   }
0108 
0109   public void insertTempShuffleData() throws IOException {
0110     String filename = "temp_shuffle_uuid";
0111     insertFile(filename);
0112   }
0113 
0114   public void insertCachedRddData(int rddId, int splitId, byte[] block) throws IOException {
0115     String blockId = "rdd_" + rddId + "_" + splitId;
0116     insertFile(blockId, block);
0117   }
0118 
0119   private void insertFile(String filename) throws IOException {
0120     insertFile(filename, new byte[] { 42 });
0121   }
0122 
0123   private void insertFile(String filename, byte[] block) throws IOException {
0124     OutputStream dataStream = null;
0125     File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
0126     Assert.assertFalse("this test file has been already generated", file.exists());
0127     try {
0128       dataStream = new FileOutputStream(
0129         ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
0130       dataStream.write(block);
0131     } finally {
0132       Closeables.close(dataStream, false);
0133     }
0134   }
0135 
0136   /**
0137    * Creates an ExecutorShuffleInfo object based on the given shuffle manager which targets this
0138    * context's directories.
0139    */
0140   public ExecutorShuffleInfo createExecutorInfo(String shuffleManager) {
0141     return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
0142   }
0143 }