Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.network;
0019 
0020 import java.io.File;
0021 import java.io.FileOutputStream;
0022 import java.io.IOException;
0023 import java.nio.ByteBuffer;
0024 import java.util.Random;
0025 
0026 import com.google.common.io.Files;
0027 
0028 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
0029 import org.apache.spark.network.buffer.ManagedBuffer;
0030 import org.apache.spark.network.buffer.NioManagedBuffer;
0031 import org.apache.spark.network.util.JavaUtils;
0032 import org.apache.spark.network.util.TransportConf;
0033 
0034 class StreamTestHelper {
0035   static final String[] STREAMS = { "largeBuffer", "smallBuffer", "emptyBuffer", "file" };
0036 
0037   final File testFile;
0038   final File tempDir;
0039 
0040   final ByteBuffer emptyBuffer;
0041   final ByteBuffer smallBuffer;
0042   final ByteBuffer largeBuffer;
0043 
0044   private static ByteBuffer createBuffer(int bufSize) {
0045     ByteBuffer buf = ByteBuffer.allocate(bufSize);
0046     for (int i = 0; i < bufSize; i ++) {
0047       buf.put((byte) i);
0048     }
0049     buf.flip();
0050     return buf;
0051   }
0052 
0053   StreamTestHelper() throws Exception {
0054     tempDir = Files.createTempDir();
0055     emptyBuffer = createBuffer(0);
0056     smallBuffer = createBuffer(100);
0057     largeBuffer = createBuffer(100000);
0058 
0059     testFile = File.createTempFile("stream-test-file", "txt", tempDir);
0060     FileOutputStream fp = new FileOutputStream(testFile);
0061     try {
0062       Random rnd = new Random();
0063       for (int i = 0; i < 512; i++) {
0064         byte[] fileContent = new byte[1024];
0065         rnd.nextBytes(fileContent);
0066         fp.write(fileContent);
0067       }
0068     } finally {
0069       fp.close();
0070     }
0071   }
0072 
0073   public ByteBuffer srcBuffer(String name) {
0074     switch (name) {
0075       case "largeBuffer":
0076         return largeBuffer;
0077       case "smallBuffer":
0078         return smallBuffer;
0079       case "emptyBuffer":
0080         return emptyBuffer;
0081       default:
0082         throw new IllegalArgumentException("Invalid stream: " + name);
0083     }
0084   }
0085 
0086   public ManagedBuffer openStream(TransportConf conf, String streamId) {
0087     switch (streamId) {
0088       case "file":
0089         return new FileSegmentManagedBuffer(conf, testFile, 0, testFile.length());
0090       default:
0091         return new NioManagedBuffer(srcBuffer(streamId));
0092     }
0093   }
0094 
0095   void cleanup() {
0096     if (tempDir != null) {
0097       try {
0098         JavaUtils.deleteRecursively(tempDir);
0099       } catch (IOException io) {
0100         throw new RuntimeException(io);
0101       }
0102     }
0103   }
0104 }