0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.shuffle.sort;
0019
0020 import java.io.IOException;
0021
0022 import org.junit.Test;
0023
0024 import org.apache.spark.SparkConf;
0025 import org.apache.spark.internal.config.package$;
0026 import org.apache.spark.memory.*;
0027 import org.apache.spark.unsafe.memory.MemoryBlock;
0028
0029 import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
0030 import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PARTITION_ID;
0031 import static org.junit.Assert.assertEquals;
0032 import static org.junit.Assert.assertFalse;
0033
0034 public class PackedRecordPointerSuite {
0035
0036 @Test
0037 public void heap() throws IOException {
0038 final SparkConf conf = new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false);
0039 final TaskMemoryManager memoryManager =
0040 new TaskMemoryManager(new TestMemoryManager(conf), 0);
0041 final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP);
0042 final MemoryBlock page0 = memoryManager.allocatePage(128, c);
0043 final MemoryBlock page1 = memoryManager.allocatePage(128, c);
0044 final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
0045 page1.getBaseOffset() + 42);
0046 PackedRecordPointer packedPointer = new PackedRecordPointer();
0047 packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
0048 assertEquals(360, packedPointer.getPartitionId());
0049 final long recordPointer = packedPointer.getRecordPointer();
0050 assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
0051 assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
0052 assertEquals(addressInPage1, recordPointer);
0053 memoryManager.cleanUpAllAllocatedMemory();
0054 }
0055
0056 @Test
0057 public void offHeap() throws IOException {
0058 final SparkConf conf = new SparkConf()
0059 .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
0060 .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 10000L);
0061 final TaskMemoryManager memoryManager =
0062 new TaskMemoryManager(new TestMemoryManager(conf), 0);
0063 final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP);
0064 final MemoryBlock page0 = memoryManager.allocatePage(128, c);
0065 final MemoryBlock page1 = memoryManager.allocatePage(128, c);
0066 final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
0067 page1.getBaseOffset() + 42);
0068 PackedRecordPointer packedPointer = new PackedRecordPointer();
0069 packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
0070 assertEquals(360, packedPointer.getPartitionId());
0071 final long recordPointer = packedPointer.getRecordPointer();
0072 assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
0073 assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
0074 assertEquals(addressInPage1, recordPointer);
0075 memoryManager.cleanUpAllAllocatedMemory();
0076 }
0077
0078 @Test
0079 public void maximumPartitionIdCanBeEncoded() {
0080 PackedRecordPointer packedPointer = new PackedRecordPointer();
0081 packedPointer.set(PackedRecordPointer.packPointer(0, MAXIMUM_PARTITION_ID));
0082 assertEquals(MAXIMUM_PARTITION_ID, packedPointer.getPartitionId());
0083 }
0084
0085 @Test
0086 public void partitionIdsGreaterThanMaximumPartitionIdWillOverflowOrTriggerError() {
0087 PackedRecordPointer packedPointer = new PackedRecordPointer();
0088 try {
0089
0090 packedPointer.set(PackedRecordPointer.packPointer(0, MAXIMUM_PARTITION_ID + 1));
0091 assertFalse(MAXIMUM_PARTITION_ID + 1 == packedPointer.getPartitionId());
0092 } catch (AssertionError e ) {
0093
0094 }
0095 }
0096
0097 @Test
0098 public void maximumOffsetInPageCanBeEncoded() {
0099 PackedRecordPointer packedPointer = new PackedRecordPointer();
0100 long address = TaskMemoryManager.encodePageNumberAndOffset(0, MAXIMUM_PAGE_SIZE_BYTES - 1);
0101 packedPointer.set(PackedRecordPointer.packPointer(address, 0));
0102 assertEquals(address, packedPointer.getRecordPointer());
0103 }
0104
0105 @Test
0106 public void offsetsPastMaxOffsetInPageWillOverflow() {
0107 PackedRecordPointer packedPointer = new PackedRecordPointer();
0108 long address = TaskMemoryManager.encodePageNumberAndOffset(0, MAXIMUM_PAGE_SIZE_BYTES);
0109 packedPointer.set(PackedRecordPointer.packPointer(address, 0));
0110 assertEquals(0, packedPointer.getRecordPointer());
0111 }
0112 }