Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.memory;
0019 
0020 import org.junit.Assert;
0021 import org.junit.Test;
0022 
0023 import org.apache.spark.SparkConf;
0024 import org.apache.spark.unsafe.memory.MemoryAllocator;
0025 import org.apache.spark.unsafe.memory.MemoryBlock;
0026 import org.apache.spark.internal.config.package$;
0027 
0028 public class TaskMemoryManagerSuite {
0029 
0030   @Test
0031   public void leakedPageMemoryIsDetected() {
0032     final TaskMemoryManager manager = new TaskMemoryManager(
0033       new UnifiedMemoryManager(
0034         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false),
0035         Long.MAX_VALUE,
0036         Long.MAX_VALUE / 2,
0037         1),
0038       0);
0039     final MemoryConsumer c = new TestMemoryConsumer(manager);
0040     manager.allocatePage(4096, c);  // leak memory
0041     Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
0042     Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
0043   }
0044 
0045   @Test
0046   public void encodePageNumberAndOffsetOffHeap() {
0047     final SparkConf conf = new SparkConf()
0048       .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
0049       .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L);
0050     final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
0051     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
0052     final MemoryBlock dataPage = manager.allocatePage(256, c);
0053     // In off-heap mode, an offset is an absolute address that may require more than 51 bits to
0054     // encode. This test exercises that corner-case:
0055     final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
0056     final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, offset);
0057     Assert.assertEquals(null, manager.getPage(encodedAddress));
0058     Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress));
0059     manager.freePage(dataPage, c);
0060   }
0061 
0062   @Test
0063   public void encodePageNumberAndOffsetOnHeap() {
0064     final TaskMemoryManager manager = new TaskMemoryManager(
0065       new TestMemoryManager(
0066         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0067     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0068     final MemoryBlock dataPage = manager.allocatePage(256, c);
0069     final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
0070     Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
0071     Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
0072   }
0073 
0074   @Test
0075   public void freeingPageSetsPageNumberToSpecialConstant() {
0076     final TaskMemoryManager manager = new TaskMemoryManager(
0077       new TestMemoryManager(
0078         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0079     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0080     final MemoryBlock dataPage = manager.allocatePage(256, c);
0081     c.freePage(dataPage);
0082     Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
0083   }
0084 
0085   @Test(expected = AssertionError.class)
0086   public void freeingPageDirectlyInAllocatorTriggersAssertionError() {
0087     final TaskMemoryManager manager = new TaskMemoryManager(
0088       new TestMemoryManager(
0089         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0090     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0091     final MemoryBlock dataPage = manager.allocatePage(256, c);
0092     MemoryAllocator.HEAP.free(dataPage);
0093   }
0094 
0095   @Test(expected = AssertionError.class)
0096   public void callingFreePageOnDirectlyAllocatedPageTriggersAssertionError() {
0097     final TaskMemoryManager manager = new TaskMemoryManager(
0098       new TestMemoryManager(
0099         new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0100     final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0101     final MemoryBlock dataPage = MemoryAllocator.HEAP.allocate(256);
0102     manager.freePage(dataPage, c);
0103   }
0104 
0105   @Test
0106   public void cooperativeSpilling() {
0107     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0108     memoryManager.limit(100);
0109     final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0110 
0111     TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
0112     TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
0113     c1.use(100);
0114     Assert.assertEquals(100, c1.getUsed());
0115     c2.use(100);
0116     Assert.assertEquals(100, c2.getUsed());
0117     Assert.assertEquals(0, c1.getUsed());  // spilled
0118     c1.use(100);
0119     Assert.assertEquals(100, c1.getUsed());
0120     Assert.assertEquals(0, c2.getUsed());  // spilled
0121 
0122     c1.use(50);
0123     Assert.assertEquals(50, c1.getUsed());  // spilled
0124     Assert.assertEquals(0, c2.getUsed());
0125     c2.use(50);
0126     Assert.assertEquals(50, c1.getUsed());
0127     Assert.assertEquals(50, c2.getUsed());
0128 
0129     c1.use(100);
0130     Assert.assertEquals(100, c1.getUsed());
0131     Assert.assertEquals(0, c2.getUsed());  // spilled
0132 
0133     c1.free(20);
0134     Assert.assertEquals(80, c1.getUsed());
0135     c2.use(10);
0136     Assert.assertEquals(80, c1.getUsed());
0137     Assert.assertEquals(10, c2.getUsed());
0138     c2.use(100);
0139     Assert.assertEquals(100, c2.getUsed());
0140     Assert.assertEquals(0, c1.getUsed());  // spilled
0141 
0142     c1.free(0);
0143     c2.free(100);
0144     Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
0145   }
0146 
0147   @Test
0148   public void cooperativeSpilling2() {
0149     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0150     memoryManager.limit(100);
0151     final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0152 
0153     TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
0154     TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
0155     TestMemoryConsumer c3 = new TestMemoryConsumer(manager);
0156 
0157     c1.use(20);
0158     Assert.assertEquals(20, c1.getUsed());
0159     c2.use(80);
0160     Assert.assertEquals(80, c2.getUsed());
0161     c3.use(80);
0162     Assert.assertEquals(20, c1.getUsed());  // c1: not spilled
0163     Assert.assertEquals(0, c2.getUsed());   // c2: spilled as it has required size of memory
0164     Assert.assertEquals(80, c3.getUsed());
0165 
0166     c2.use(80);
0167     Assert.assertEquals(20, c1.getUsed());  // c1: not spilled
0168     Assert.assertEquals(0, c3.getUsed());   // c3: spilled as it has required size of memory
0169     Assert.assertEquals(80, c2.getUsed());
0170 
0171     c3.use(10);
0172     Assert.assertEquals(0, c1.getUsed());   // c1: spilled as it has required size of memory
0173     Assert.assertEquals(80, c2.getUsed());  // c2: not spilled as spilling c1 already satisfies c3
0174     Assert.assertEquals(10, c3.getUsed());
0175 
0176     c1.free(0);
0177     c2.free(80);
0178     c3.free(10);
0179     Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
0180   }
0181 
0182   @Test
0183   public void shouldNotForceSpillingInDifferentModes() {
0184     final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0185     memoryManager.limit(100);
0186     final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0187 
0188     TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0189     TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
0190     c1.use(80);
0191     Assert.assertEquals(80, c1.getUsed());
0192     c2.use(80);
0193     Assert.assertEquals(20, c2.getUsed());  // not enough memory
0194     Assert.assertEquals(80, c1.getUsed());  // not spilled
0195 
0196     c2.use(10);
0197     Assert.assertEquals(10, c2.getUsed());  // spilled
0198     Assert.assertEquals(80, c1.getUsed());  // not spilled
0199   }
0200 
0201   @Test
0202   public void offHeapConfigurationBackwardsCompatibility() {
0203     // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
0204     // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
0205     final SparkConf conf = new SparkConf()
0206       .set("spark.unsafe.offHeap", "true")
0207       .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L);
0208     final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
0209     Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode);
0210   }
0211 
0212 }