0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.unsafe.memory;
0019
0020 import javax.annotation.concurrent.GuardedBy;
0021 import java.lang.ref.WeakReference;
0022 import java.util.HashMap;
0023 import java.util.LinkedList;
0024 import java.util.Map;
0025
0026 import org.apache.spark.unsafe.Platform;
0027
0028
0029
0030
0031 public class HeapMemoryAllocator implements MemoryAllocator {
0032
0033 @GuardedBy("this")
0034 private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
0035
0036 private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
0037
0038
0039
0040
0041
0042 private boolean shouldPool(long size) {
0043
0044 return size >= POOLING_THRESHOLD_BYTES;
0045 }
0046
0047 @Override
0048 public MemoryBlock allocate(long size) throws OutOfMemoryError {
0049 int numWords = (int) ((size + 7) / 8);
0050 long alignedSize = numWords * 8L;
0051 assert (alignedSize >= size);
0052 if (shouldPool(alignedSize)) {
0053 synchronized (this) {
0054 final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
0055 if (pool != null) {
0056 while (!pool.isEmpty()) {
0057 final WeakReference<long[]> arrayReference = pool.pop();
0058 final long[] array = arrayReference.get();
0059 if (array != null) {
0060 assert (array.length * 8L >= size);
0061 MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
0062 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0063 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
0064 }
0065 return memory;
0066 }
0067 }
0068 bufferPoolsBySize.remove(alignedSize);
0069 }
0070 }
0071 }
0072 long[] array = new long[numWords];
0073 MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
0074 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0075 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
0076 }
0077 return memory;
0078 }
0079
0080 @Override
0081 public void free(MemoryBlock memory) {
0082 assert (memory.obj != null) :
0083 "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
0084 assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
0085 "page has already been freed";
0086 assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
0087 || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
0088 "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
0089 "free()";
0090
0091 final long size = memory.size();
0092 if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0093 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
0094 }
0095
0096
0097 memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
0098
0099
0100
0101 long[] array = (long[]) memory.obj;
0102 memory.setObjAndOffset(null, 0);
0103
0104 long alignedSize = ((size + 7) / 8) * 8;
0105 if (shouldPool(alignedSize)) {
0106 synchronized (this) {
0107 LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
0108 if (pool == null) {
0109 pool = new LinkedList<>();
0110 bufferPoolsBySize.put(alignedSize, pool);
0111 }
0112 pool.add(new WeakReference<>(array));
0113 }
0114 } else {
0115
0116 }
0117 }
0118 }