0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.network;
0019
0020 import java.io.File;
0021 import java.io.FileOutputStream;
0022 import java.io.IOException;
0023 import java.nio.ByteBuffer;
0024 import java.util.Random;
0025
0026 import com.google.common.io.Files;
0027
0028 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0029 import org.apache.spark.network.buffer.ManagedBuffer;
0030 import org.apache.spark.network.buffer.NioManagedBuffer;
0031 import org.apache.spark.network.util.JavaUtils;
0032 import org.apache.spark.network.util.TransportConf;
0033
0034 class StreamTestHelper {
0035 static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
0036
0037 final File testFile;
0038 final File tempDir;
0039
0040 final ByteBuffer emptyBuffer;
0041 final ByteBuffer smallBuffer;
0042 final ByteBuffer largeBuffer;
0043
0044 private static ByteBuffer createBuffer(int bufSize) {
0045 ByteBuffer buf = ByteBuffer.allocate(bufSize);
0046 for (int i = 0; i < bufSize; i ++) {
0047 buf.put((byte) i);
0048 }
0049 buf.flip();
0050 return buf;
0051 }
0052
0053 StreamTestHelper() throws Exception {
0054 tempDir = Files.createTempDir();
0055 emptyBuffer = createBuffer(0);
0056 smallBuffer = createBuffer(100);
0057 largeBuffer = createBuffer(100000);
0058
0059 testFile = File.createTempFile("stream-test-file", "txt", tempDir);
0060 FileOutputStream fp = new FileOutputStream(testFile);
0061 try {
0062 Random rnd = new Random();
0063 for (int i = 0; i < 512; i++) {
0064 byte[] fileContent = new byte[1024];
0065 rnd.nextBytes(fileContent);
0066 fp.write(fileContent);
0067 }
0068 } finally {
0069 fp.close();
0070 }
0071 }
0072
0073 public ByteBuffer srcBuffer(String name) {
0074 switch (name) {
0075 case "largeBuffer":
0076 return largeBuffer;
0077 case "smallBuffer":
0078 return smallBuffer;
0079 case "emptyBuffer":
0080 return emptyBuffer;
0081 default:
0082 throw new IllegalArgumentException("Invalid stream: " + name);
0083 }
0084 }
0085
0086 public ManagedBuffer openStream(TransportConf conf, String streamId) {
0087 switch (streamId) {
0088 case "file":
0089 return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
0090 default:
0091 return new NioManagedBuffer(srcBuffer(streamId));
0092 }
0093 }
0094
0095 void cleanup() {
0096 if (tempDir != null) {
0097 try {
0098 JavaUtils.deleteRecursively(tempDir);
0099 } catch (IOException io) {
0100 throw new RuntimeException(io);
0101 }
0102 }
0103 }
0104 }