Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.shuffle.sort;
0019 
0020 import java.io.IOException;
0021 
0022 import org.junit.Test;
0023 
0024 import org.apache.spark.SparkConf;
0025 import org.apache.spark.internal.config.package$;
0026 import org.apache.spark.memory.*;
0027 import org.apache.spark.unsafe.memory.MemoryBlock;
0028 
0029 import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
0030 import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PARTITION_ID;
0031 import static org.junit.Assert.assertEquals;
0032 import static org.junit.Assert.assertFalse;
0033 
0034 public class PackedRecordPointerSuite {
0035 
0036   @Test
0037   public void heap() throws IOException {
0038     final SparkConf conf = new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false);
0039     final TaskMemoryManager memoryManager =
0040       new TaskMemoryManager(new TestMemoryManager(conf), 0);
0041     final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP);
0042     final MemoryBlock page0 = memoryManager.allocatePage(128, c);
0043     final MemoryBlock page1 = memoryManager.allocatePage(128, c);
0044     final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
0045       page1.getBaseOffset() + 42);
0046     PackedRecordPointer packedPointer = new PackedRecordPointer();
0047     packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
0048     assertEquals(360, packedPointer.getPartitionId());
0049     final long recordPointer = packedPointer.getRecordPointer();
0050     assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
0051     assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
0052     assertEquals(addressInPage1, recordPointer);
0053     memoryManager.cleanUpAllAllocatedMemory();
0054   }
0055 
0056   @Test
0057   public void offHeap() throws IOException {
0058     final SparkConf conf = new SparkConf()
0059       .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
0060       .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 10000L);
0061     final TaskMemoryManager memoryManager =
0062       new TaskMemoryManager(new TestMemoryManager(conf), 0);
0063     final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP);
0064     final MemoryBlock page0 = memoryManager.allocatePage(128, c);
0065     final MemoryBlock page1 = memoryManager.allocatePage(128, c);
0066     final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
0067       page1.getBaseOffset() + 42);
0068     PackedRecordPointer packedPointer = new PackedRecordPointer();
0069     packedPointer.set(PackedRecordPointer.packPointer(addressInPage1, 360));
0070     assertEquals(360, packedPointer.getPartitionId());
0071     final long recordPointer = packedPointer.getRecordPointer();
0072     assertEquals(1, TaskMemoryManager.decodePageNumber(recordPointer));
0073     assertEquals(page1.getBaseOffset() + 42, memoryManager.getOffsetInPage(recordPointer));
0074     assertEquals(addressInPage1, recordPointer);
0075     memoryManager.cleanUpAllAllocatedMemory();
0076   }
0077 
0078   @Test
0079   public void maximumPartitionIdCanBeEncoded() {
0080     PackedRecordPointer packedPointer = new PackedRecordPointer();
0081     packedPointer.set(PackedRecordPointer.packPointer(0, MAXIMUM_PARTITION_ID));
0082     assertEquals(MAXIMUM_PARTITION_ID, packedPointer.getPartitionId());
0083   }
0084 
0085   @Test
0086   public void partitionIdsGreaterThanMaximumPartitionIdWillOverflowOrTriggerError() {
0087     PackedRecordPointer packedPointer = new PackedRecordPointer();
0088     try {
0089       // Pointers greater than the maximum partition ID will overflow or trigger an assertion error
0090       packedPointer.set(PackedRecordPointer.packPointer(0, MAXIMUM_PARTITION_ID + 1));
0091       assertFalse(MAXIMUM_PARTITION_ID  + 1 == packedPointer.getPartitionId());
0092     } catch (AssertionError e ) {
0093       // pass
0094     }
0095   }
0096 
0097   @Test
0098   public void maximumOffsetInPageCanBeEncoded() {
0099     PackedRecordPointer packedPointer = new PackedRecordPointer();
0100     long address = TaskMemoryManager.encodePageNumberAndOffset(0, MAXIMUM_PAGE_SIZE_BYTES - 1);
0101     packedPointer.set(PackedRecordPointer.packPointer(address, 0));
0102     assertEquals(address, packedPointer.getRecordPointer());
0103   }
0104 
0105   @Test
0106   public void offsetsPastMaxOffsetInPageWillOverflow() {
0107     PackedRecordPointer packedPointer = new PackedRecordPointer();
0108     long address = TaskMemoryManager.encodePageNumberAndOffset(0, MAXIMUM_PAGE_SIZE_BYTES);
0109     packedPointer.set(PackedRecordPointer.packPointer(address, 0));
0110     assertEquals(0, packedPointer.getRecordPointer());
0111   }
0112 }