Back to home page




0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0018 package org.apache.spark.unsafe.memory;
0020 import javax.annotation.concurrent.GuardedBy;
0021 import java.lang.ref.WeakReference;
0022 import java.util.HashMap;
0023 import java.util.LinkedList;
0024 import java.util.Map;
0026 import org.apache.spark.unsafe.Platform;
0028 /**
0029  * A simple {@link MemoryAllocator} that can allocate up to 16GB using a JVM long primitive array.
0030  */
0031 public class HeapMemoryAllocator implements MemoryAllocator {
0033   @GuardedBy("this")
0034   private final Map<Long, LinkedList<WeakReference<long[]>>> bufferPoolsBySize = new HashMap<>();
0036   private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
0038   /**
0039    * Returns true if allocations of the given size should go through the pooling mechanism and
0040    * false otherwise.
0041    */
0042   private boolean shouldPool(long size) {
0043     // Very small allocations are less likely to benefit from pooling.
0044     return size >= POOLING_THRESHOLD_BYTES;
0045   }
0047   @Override
0048   public MemoryBlock allocate(long size) throws OutOfMemoryError {
0049     int numWords = (int) ((size + 7) / 8);
0050     long alignedSize = numWords * 8L;
0051     assert (alignedSize >= size);
0052     if (shouldPool(alignedSize)) {
0053       synchronized (this) {
0054         final LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
0055         if (pool != null) {
0056           while (!pool.isEmpty()) {
0057             final WeakReference<long[]> arrayReference = pool.pop();
0058             final long[] array = arrayReference.get();
0059             if (array != null) {
0060               assert (array.length * 8L >= size);
0061               MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
0062               if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0063                 memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
0064               }
0065               return memory;
0066             }
0067           }
0068           bufferPoolsBySize.remove(alignedSize);
0069         }
0070       }
0071     }
0072     long[] array = new long[numWords];
0073     MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);
0074     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0075       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_CLEAN_VALUE);
0076     }
0077     return memory;
0078   }
0080   @Override
0081   public void free(MemoryBlock memory) {
0082     assert (memory.obj != null) :
0083       "baseObject was null; are you trying to use the on-heap allocator to free off-heap memory?";
0084     assert (memory.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
0085       "page has already been freed";
0086     assert ((memory.pageNumber == MemoryBlock.NO_PAGE_NUMBER)
0087             || (memory.pageNumber == MemoryBlock.FREED_IN_TMM_PAGE_NUMBER)) :
0088       "TMM-allocated pages must first be freed via TMM.freePage(), not directly in allocator " +
0089         "free()";
0091     final long size = memory.size();
0092     if (MemoryAllocator.MEMORY_DEBUG_FILL_ENABLED) {
0093       memory.fill(MemoryAllocator.MEMORY_DEBUG_FILL_FREED_VALUE);
0094     }
0096     // Mark the page as freed (so we can detect double-frees).
0097     memory.pageNumber = MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER;
0099     // As an additional layer of defense against use-after-free bugs, we mutate the
0100     // MemoryBlock to null out its reference to the long[] array.
0101     long[] array = (long[]) memory.obj;
0102     memory.setObjAndOffset(null, 0);
0104     long alignedSize = ((size + 7) / 8) * 8;
0105     if (shouldPool(alignedSize)) {
0106       synchronized (this) {
0107         LinkedList<WeakReference<long[]>> pool = bufferPoolsBySize.get(alignedSize);
0108         if (pool == null) {
0109           pool = new LinkedList<>();
0110           bufferPoolsBySize.put(alignedSize, pool);
0111         }
0112         pool.add(new WeakReference<>(array));
0113       }
0114     } else {
0115       // Do nothing
0116     }
0117   }
0118 }