0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network.shuffle;
0019
0020 import java.io.DataOutputStream;
0021 import java.io.File;
0022 import java.io.FileOutputStream;
0023 import java.io.IOException;
0024 import java.io.OutputStream;
0025
0026 import com.google.common.io.Closeables;
0027 import com.google.common.io.Files;
0028
0029 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
0030 import org.apache.spark.network.util.JavaUtils;
0031 import org.junit.Assert;
0032 import org.slf4j.Logger;
0033 import org.slf4j.LoggerFactory;
0034
0035
0036
0037
0038
0039 public class TestShuffleDataContext {
0040 private static final Logger logger = LoggerFactory.getLogger(TestShuffleDataContext.class);
0041
0042 public final String[] localDirs;
0043 public final int subDirsPerLocalDir;
0044
0045 public TestShuffleDataContext(int numLocalDirs, int subDirsPerLocalDir) {
0046 this.localDirs = new String[numLocalDirs];
0047 this.subDirsPerLocalDir = subDirsPerLocalDir;
0048 }
0049
0050 public void create() {
0051 for (int i = 0; i < localDirs.length; i ++) {
0052 localDirs[i] = Files.createTempDir().getAbsolutePath();
0053
0054 for (int p = 0; p < subDirsPerLocalDir; p ++) {
0055 new File(localDirs[i], String.format("%02x", p)).mkdirs();
0056 }
0057 }
0058 }
0059
0060 public void cleanup() {
0061 for (String localDir : localDirs) {
0062 try {
0063 JavaUtils.deleteRecursively(new File(localDir));
0064 } catch (IOException e) {
0065 logger.warn("Unable to cleanup localDir = " + localDir, e);
0066 }
0067 }
0068 }
0069
0070
0071 public void insertSortShuffleData(int shuffleId, int mapId, byte[][] blocks) throws IOException {
0072 String blockId = "shuffle_" + shuffleId + "_" + mapId + "_0";
0073
0074 OutputStream dataStream = null;
0075 DataOutputStream indexStream = null;
0076 boolean suppressExceptionsDuringClose = true;
0077
0078 try {
0079 dataStream = new FileOutputStream(
0080 ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".data"));
0081 indexStream = new DataOutputStream(new FileOutputStream(
0082 ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, blockId + ".index")));
0083
0084 long offset = 0;
0085 indexStream.writeLong(offset);
0086 for (byte[] block : blocks) {
0087 offset += block.length;
0088 dataStream.write(block);
0089 indexStream.writeLong(offset);
0090 }
0091 suppressExceptionsDuringClose = false;
0092 } finally {
0093 Closeables.close(dataStream, suppressExceptionsDuringClose);
0094 Closeables.close(indexStream, suppressExceptionsDuringClose);
0095 }
0096 }
0097
0098
0099 public void insertSpillData() throws IOException {
0100 String filename = "temp_local_uuid";
0101 insertFile(filename);
0102 }
0103
0104 public void insertBroadcastData() throws IOException {
0105 String filename = "broadcast_12_uuid";
0106 insertFile(filename);
0107 }
0108
0109 public void insertTempShuffleData() throws IOException {
0110 String filename = "temp_shuffle_uuid";
0111 insertFile(filename);
0112 }
0113
0114 public void insertCachedRddData(int rddId, int splitId, byte[] block) throws IOException {
0115 String blockId = "rdd_" + rddId + "_" + splitId;
0116 insertFile(blockId, block);
0117 }
0118
0119 private void insertFile(String filename) throws IOException {
0120 insertFile(filename, new byte[] { 42 });
0121 }
0122
0123 private void insertFile(String filename, byte[] block) throws IOException {
0124 OutputStream dataStream = null;
0125 File file = ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename);
0126 Assert.assertFalse("this test file has been already generated", file.exists());
0127 try {
0128 dataStream = new FileOutputStream(
0129 ExecutorDiskUtils.getFile(localDirs, subDirsPerLocalDir, filename));
0130 dataStream.write(block);
0131 } finally {
0132 Closeables.close(dataStream, false);
0133 }
0134 }
0135
0136
0137
0138
0139
0140 public ExecutorShuffleInfo createExecutorInfo(String shuffleManager) {
0141 return new ExecutorShuffleInfo(localDirs, subDirsPerLocalDir, shuffleManager);
0142 }
0143 }