0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.memory;
0019
0020 import org.junit.Assert;
0021 import org.junit.Test;
0022
0023 import org.apache.spark.SparkConf;
0024 import org.apache.spark.unsafe.memory.MemoryAllocator;
0025 import org.apache.spark.unsafe.memory.MemoryBlock;
0026 import org.apache.spark.internal.config.package$;
0027
0028 public class TaskMemoryManagerSuite {
0029
0030 @Test
0031 public void leakedPageMemoryIsDetected() {
0032 final TaskMemoryManager manager = new TaskMemoryManager(
0033 new UnifiedMemoryManager(
0034 new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false),
0035 Long.MAX_VALUE,
0036 Long.MAX_VALUE / 2,
0037 1),
0038 0);
0039 final MemoryConsumer c = new TestMemoryConsumer(manager);
0040 manager.allocatePage(4096, c);
0041 Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
0042 Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
0043 }
0044
0045 @Test
0046 public void encodePageNumberAndOffsetOffHeap() {
0047 final SparkConf conf = new SparkConf()
0048 .set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), true)
0049 .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L);
0050 final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
0051 final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
0052 final MemoryBlock dataPage = manager.allocatePage(256, c);
0053
0054
0055 final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
0056 final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, offset);
0057 Assert.assertEquals(null, manager.getPage(encodedAddress));
0058 Assert.assertEquals(offset, manager.getOffsetInPage(encodedAddress));
0059 manager.freePage(dataPage, c);
0060 }
0061
0062 @Test
0063 public void encodePageNumberAndOffsetOnHeap() {
0064 final TaskMemoryManager manager = new TaskMemoryManager(
0065 new TestMemoryManager(
0066 new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0067 final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0068 final MemoryBlock dataPage = manager.allocatePage(256, c);
0069 final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
0070 Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
0071 Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
0072 }
0073
0074 @Test
0075 public void freeingPageSetsPageNumberToSpecialConstant() {
0076 final TaskMemoryManager manager = new TaskMemoryManager(
0077 new TestMemoryManager(
0078 new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0079 final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0080 final MemoryBlock dataPage = manager.allocatePage(256, c);
0081 c.freePage(dataPage);
0082 Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
0083 }
0084
0085 @Test(expected = AssertionError.class)
0086 public void freeingPageDirectlyInAllocatorTriggersAssertionError() {
0087 final TaskMemoryManager manager = new TaskMemoryManager(
0088 new TestMemoryManager(
0089 new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0090 final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0091 final MemoryBlock dataPage = manager.allocatePage(256, c);
0092 MemoryAllocator.HEAP.free(dataPage);
0093 }
0094
0095 @Test(expected = AssertionError.class)
0096 public void callingFreePageOnDirectlyAllocatedPageTriggersAssertionError() {
0097 final TaskMemoryManager manager = new TaskMemoryManager(
0098 new TestMemoryManager(
0099 new SparkConf().set(package$.MODULE$.MEMORY_OFFHEAP_ENABLED(), false)), 0);
0100 final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0101 final MemoryBlock dataPage = MemoryAllocator.HEAP.allocate(256);
0102 manager.freePage(dataPage, c);
0103 }
0104
0105 @Test
0106 public void cooperativeSpilling() {
0107 final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0108 memoryManager.limit(100);
0109 final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0110
0111 TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
0112 TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
0113 c1.use(100);
0114 Assert.assertEquals(100, c1.getUsed());
0115 c2.use(100);
0116 Assert.assertEquals(100, c2.getUsed());
0117 Assert.assertEquals(0, c1.getUsed());
0118 c1.use(100);
0119 Assert.assertEquals(100, c1.getUsed());
0120 Assert.assertEquals(0, c2.getUsed());
0121
0122 c1.use(50);
0123 Assert.assertEquals(50, c1.getUsed());
0124 Assert.assertEquals(0, c2.getUsed());
0125 c2.use(50);
0126 Assert.assertEquals(50, c1.getUsed());
0127 Assert.assertEquals(50, c2.getUsed());
0128
0129 c1.use(100);
0130 Assert.assertEquals(100, c1.getUsed());
0131 Assert.assertEquals(0, c2.getUsed());
0132
0133 c1.free(20);
0134 Assert.assertEquals(80, c1.getUsed());
0135 c2.use(10);
0136 Assert.assertEquals(80, c1.getUsed());
0137 Assert.assertEquals(10, c2.getUsed());
0138 c2.use(100);
0139 Assert.assertEquals(100, c2.getUsed());
0140 Assert.assertEquals(0, c1.getUsed());
0141
0142 c1.free(0);
0143 c2.free(100);
0144 Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
0145 }
0146
0147 @Test
0148 public void cooperativeSpilling2() {
0149 final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0150 memoryManager.limit(100);
0151 final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0152
0153 TestMemoryConsumer c1 = new TestMemoryConsumer(manager);
0154 TestMemoryConsumer c2 = new TestMemoryConsumer(manager);
0155 TestMemoryConsumer c3 = new TestMemoryConsumer(manager);
0156
0157 c1.use(20);
0158 Assert.assertEquals(20, c1.getUsed());
0159 c2.use(80);
0160 Assert.assertEquals(80, c2.getUsed());
0161 c3.use(80);
0162 Assert.assertEquals(20, c1.getUsed());
0163 Assert.assertEquals(0, c2.getUsed());
0164 Assert.assertEquals(80, c3.getUsed());
0165
0166 c2.use(80);
0167 Assert.assertEquals(20, c1.getUsed());
0168 Assert.assertEquals(0, c3.getUsed());
0169 Assert.assertEquals(80, c2.getUsed());
0170
0171 c3.use(10);
0172 Assert.assertEquals(0, c1.getUsed());
0173 Assert.assertEquals(80, c2.getUsed());
0174 Assert.assertEquals(10, c3.getUsed());
0175
0176 c1.free(0);
0177 c2.free(80);
0178 c3.free(10);
0179 Assert.assertEquals(0, manager.cleanUpAllAllocatedMemory());
0180 }
0181
0182 @Test
0183 public void shouldNotForceSpillingInDifferentModes() {
0184 final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
0185 memoryManager.limit(100);
0186 final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
0187
0188 TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
0189 TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
0190 c1.use(80);
0191 Assert.assertEquals(80, c1.getUsed());
0192 c2.use(80);
0193 Assert.assertEquals(20, c2.getUsed());
0194 Assert.assertEquals(80, c1.getUsed());
0195
0196 c2.use(10);
0197 Assert.assertEquals(10, c2.getUsed());
0198 Assert.assertEquals(80, c1.getUsed());
0199 }
0200
0201 @Test
0202 public void offHeapConfigurationBackwardsCompatibility() {
0203
0204
0205 final SparkConf conf = new SparkConf()
0206 .set("spark.unsafe.offHeap", "true")
0207 .set(package$.MODULE$.MEMORY_OFFHEAP_SIZE(), 1000L);
0208 final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
0209 Assert.assertSame(MemoryMode.OFF_HEAP, manager.tungstenMemoryMode);
0210 }
0211
0212 }