Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.memory;
0019 
0020 import javax.annotation.concurrent.GuardedBy;
0021 import java.io.IOException;
0022 import java.nio.channels.ClosedByInterruptException;
0023 import java.util.Arrays;
0024 import java.util.ArrayList;
0025 import java.util.BitSet;
0026 import java.util.HashSet;
0027 import java.util.List;
0028 import java.util.Map;
0029 import java.util.TreeMap;
0030 
0031 import com.google.common.annotations.VisibleForTesting;
0032 import org.slf4j.Logger;
0033 import org.slf4j.LoggerFactory;
0034 
0035 import org.apache.spark.unsafe.memory.MemoryBlock;
0036 import org.apache.spark.util.Utils;
0037 
0038 /**
0039  * Manages the memory allocated by an individual task.
0040  * <p>
0041  * Most of the complexity in this class deals with encoding of off-heap addresses into 64-bit longs.
0042  * In off-heap mode, memory can be directly addressed with 64-bit longs. In on-heap mode, memory is
0043  * addressed by the combination of a base Object reference and a 64-bit offset within that object.
0044  * This is a problem when we want to store pointers to data structures inside of other structures,
0045  * such as record pointers inside hashmaps or sorting buffers. Even if we decided to use 128 bits
0046  * to address memory, we can't just store the address of the base object since it's not guaranteed
0047  * to remain stable as the heap gets reorganized due to GC.
0048  * <p>
0049  * Instead, we use the following approach to encode record pointers in 64-bit longs: for off-heap
0050  * mode, just store the raw address, and for on-heap mode use the upper 13 bits of the address to
0051  * store a "page number" and the lower 51 bits to store an offset within this page. These page
0052  * numbers are used to index into a "page table" array inside of the MemoryManager in order to
0053  * retrieve the base object.
0054  * <p>
0055  * This allows us to address 8192 pages. In on-heap mode, the maximum page size is limited by the
0056  * maximum size of a long[] array, allowing us to address 8192 * (2^31 - 1) * 8 bytes, which is
0057  * approximately 140 terabytes of memory.
0058  */
0059 public class TaskMemoryManager {
0060 
0061   private static final Logger logger = LoggerFactory.getLogger(TaskMemoryManager.class);
0062 
0063   /** The number of bits used to address the page table. */
0064   private static final int PAGE_NUMBER_BITS = 13;
0065 
0066   /** The number of bits used to encode offsets in data pages. */
0067   @VisibleForTesting
0068   static final int OFFSET_BITS = 64 - PAGE_NUMBER_BITS;  // 51
0069 
0070   /** The number of entries in the page table. */
0071   private static final int PAGE_TABLE_SIZE = 1 << PAGE_NUMBER_BITS;
0072 
0073   /**
0074    * Maximum supported data page size (in bytes). In principle, the maximum addressable page size is
0075    * (1L &lt;&lt; OFFSET_BITS) bytes, which is 2+ petabytes. However, the on-heap allocator's
0076    * maximum page size is limited by the maximum amount of data that can be stored in a long[]
0077    * array, which is (2^31 - 1) * 8 bytes (or about 17 gigabytes). Therefore, we cap this at 17
0078    * gigabytes.
0079    */
0080   public static final long MAXIMUM_PAGE_SIZE_BYTES = ((1L << 31) - 1) * 8L;
0081 
0082   /** Bit mask for the lower 51 bits of a long. */
0083   private static final long MASK_LONG_LOWER_51_BITS = 0x7FFFFFFFFFFFFL;
0084 
0085   /**
0086    * Similar to an operating system's page table, this array maps page numbers into base object
0087    * pointers, allowing us to translate between the hashtable's internal 64-bit address
0088    * representation and the baseObject+offset representation which we use to support both on- and
0089    * off-heap addresses. When using an off-heap allocator, every entry in this map will be `null`.
0090    * When using an on-heap allocator, the entries in this map will point to pages' base objects.
0091    * Entries are added to this map as new data pages are allocated.
0092    */
0093   private final MemoryBlock[] pageTable = new MemoryBlock[PAGE_TABLE_SIZE];
0094 
0095   /**
0096    * Bitmap for tracking free pages.
0097    */
0098   private final BitSet allocatedPages = new BitSet(PAGE_TABLE_SIZE);
0099 
0100   private final MemoryManager memoryManager;
0101 
0102   private final long taskAttemptId;
0103 
0104   /**
0105    * Tracks whether we're on-heap or off-heap. For off-heap, we short-circuit most of these methods
0106    * without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
0107    * this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
0108    */
0109   final MemoryMode tungstenMemoryMode;
0110 
0111   /**
0112    * Tracks spillable memory consumers.
0113    */
0114   @GuardedBy("this")
0115   private final HashSet<MemoryConsumer> consumers;
0116 
0117   /**
0118    * The amount of memory that is acquired but not used.
0119    */
0120   private volatile long acquiredButNotUsed = 0L;
0121 
0122   /**
0123    * Construct a new TaskMemoryManager.
0124    */
0125   public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
0126     this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
0127     this.memoryManager = memoryManager;
0128     this.taskAttemptId = taskAttemptId;
0129     this.consumers = new HashSet<>();
0130   }
0131 
0132   /**
0133    * Acquire N bytes of memory for a consumer. If there is no enough memory, it will call
0134    * spill() of consumers to release more memory.
0135    *
0136    * @return number of bytes successfully granted (<= N).
0137    */
0138   public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
0139     assert(required >= 0);
0140     assert(consumer != null);
0141     MemoryMode mode = consumer.getMode();
0142     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
0143     // memory here, then it may not make sense to spill since that would only end up freeing
0144     // off-heap memory. This is subject to change, though, so it may be risky to make this
0145     // optimization now in case we forget to undo it late when making changes.
0146     synchronized (this) {
0147       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
0148 
0149       // Try to release memory from other consumers first, then we can reduce the frequency of
0150       // spilling, avoid to have too many spilled files.
0151       if (got < required) {
0152         // Call spill() on other consumers to release memory
0153         // Sort the consumers according their memory usage. So we avoid spilling the same consumer
0154         // which is just spilled in last few times and re-spilling on it will produce many small
0155         // spill files.
0156         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
0157         for (MemoryConsumer c: consumers) {
0158           if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
0159             long key = c.getUsed();
0160             List<MemoryConsumer> list =
0161                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
0162             list.add(c);
0163           }
0164         }
0165         while (!sortedConsumers.isEmpty()) {
0166           // Get the consumer using the least memory more than the remaining required memory.
0167           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
0168             sortedConsumers.ceilingEntry(required - got);
0169           // No consumer has used memory more than the remaining required memory.
0170           // Get the consumer of largest used memory.
0171           if (currentEntry == null) {
0172             currentEntry = sortedConsumers.lastEntry();
0173           }
0174           List<MemoryConsumer> cList = currentEntry.getValue();
0175           MemoryConsumer c = cList.get(cList.size() - 1);
0176           try {
0177             long released = c.spill(required - got, consumer);
0178             if (released > 0) {
0179               logger.debug("Task {} released {} from {} for {}", taskAttemptId,
0180                 Utils.bytesToString(released), c, consumer);
0181               got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
0182               if (got >= required) {
0183                 break;
0184               }
0185             } else {
0186               cList.remove(cList.size() - 1);
0187               if (cList.isEmpty()) {
0188                 sortedConsumers.remove(currentEntry.getKey());
0189               }
0190             }
0191           } catch (ClosedByInterruptException e) {
0192             // This called by user to kill a task (e.g: speculative task).
0193             logger.error("error while calling spill() on " + c, e);
0194             throw new RuntimeException(e.getMessage());
0195           } catch (IOException e) {
0196             logger.error("error while calling spill() on " + c, e);
0197             // checkstyle.off: RegexpSinglelineJava
0198             throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
0199               + e.getMessage());
0200             // checkstyle.on: RegexpSinglelineJava
0201           }
0202         }
0203       }
0204 
0205       // call spill() on itself
0206       if (got < required) {
0207         try {
0208           long released = consumer.spill(required - got, consumer);
0209           if (released > 0) {
0210             logger.debug("Task {} released {} from itself ({})", taskAttemptId,
0211               Utils.bytesToString(released), consumer);
0212             got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
0213           }
0214         } catch (ClosedByInterruptException e) {
0215           // This called by user to kill a task (e.g: speculative task).
0216           logger.error("error while calling spill() on " + consumer, e);
0217           throw new RuntimeException(e.getMessage());
0218         } catch (IOException e) {
0219           logger.error("error while calling spill() on " + consumer, e);
0220           // checkstyle.off: RegexpSinglelineJava
0221           throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
0222             + e.getMessage());
0223           // checkstyle.on: RegexpSinglelineJava
0224         }
0225       }
0226 
0227       consumers.add(consumer);
0228       logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
0229       return got;
0230     }
0231   }
0232 
0233   /**
0234    * Release N bytes of execution memory for a MemoryConsumer.
0235    */
0236   public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
0237     logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
0238     memoryManager.releaseExecutionMemory(size, taskAttemptId, consumer.getMode());
0239   }
0240 
0241   /**
0242    * Dump the memory usage of all consumers.
0243    */
0244   public void showMemoryUsage() {
0245     logger.info("Memory used in task " + taskAttemptId);
0246     synchronized (this) {
0247       long memoryAccountedForByConsumers = 0;
0248       for (MemoryConsumer c: consumers) {
0249         long totalMemUsage = c.getUsed();
0250         memoryAccountedForByConsumers += totalMemUsage;
0251         if (totalMemUsage > 0) {
0252           logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
0253         }
0254       }
0255       long memoryNotAccountedFor =
0256         memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
0257       logger.info(
0258         "{} bytes of memory were used by task {} but are not associated with specific consumers",
0259         memoryNotAccountedFor, taskAttemptId);
0260       logger.info(
0261         "{} bytes of memory are used for execution and {} bytes of memory are used for storage",
0262         memoryManager.executionMemoryUsed(), memoryManager.storageMemoryUsed());
0263     }
0264   }
0265 
0266   /**
0267    * Return the page size in bytes.
0268    */
0269   public long pageSizeBytes() {
0270     return memoryManager.pageSizeBytes();
0271   }
0272 
0273   /**
0274    * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
0275    * intended for allocating large blocks of Tungsten memory that will be shared between operators.
0276    *
0277    * Returns `null` if there was not enough memory to allocate the page. May return a page that
0278    * contains fewer bytes than requested, so callers should verify the size of returned pages.
0279    *
0280    * @throws TooLargePageException
0281    */
0282   public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
0283     assert(consumer != null);
0284     assert(consumer.getMode() == tungstenMemoryMode);
0285     if (size > MAXIMUM_PAGE_SIZE_BYTES) {
0286       throw new TooLargePageException(size);
0287     }
0288 
0289     long acquired = acquireExecutionMemory(size, consumer);
0290     if (acquired <= 0) {
0291       return null;
0292     }
0293 
0294     final int pageNumber;
0295     synchronized (this) {
0296       pageNumber = allocatedPages.nextClearBit(0);
0297       if (pageNumber >= PAGE_TABLE_SIZE) {
0298         releaseExecutionMemory(acquired, consumer);
0299         throw new IllegalStateException(
0300           "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
0301       }
0302       allocatedPages.set(pageNumber);
0303     }
0304     MemoryBlock page = null;
0305     try {
0306       page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
0307     } catch (OutOfMemoryError e) {
0308       logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
0309       // there is no enough memory actually, it means the actual free memory is smaller than
0310       // MemoryManager thought, we should keep the acquired memory.
0311       synchronized (this) {
0312         acquiredButNotUsed += acquired;
0313         allocatedPages.clear(pageNumber);
0314       }
0315       // this could trigger spilling to free some pages.
0316       return allocatePage(size, consumer);
0317     }
0318     page.pageNumber = pageNumber;
0319     pageTable[pageNumber] = page;
0320     if (logger.isTraceEnabled()) {
0321       logger.trace("Allocate page number {} ({} bytes)", pageNumber, acquired);
0322     }
0323     return page;
0324   }
0325 
0326   /**
0327    * Free a block of memory allocated via {@link TaskMemoryManager#allocatePage}.
0328    */
0329   public void freePage(MemoryBlock page, MemoryConsumer consumer) {
0330     assert (page.pageNumber != MemoryBlock.NO_PAGE_NUMBER) :
0331       "Called freePage() on memory that wasn't allocated with allocatePage()";
0332     assert (page.pageNumber != MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER) :
0333       "Called freePage() on a memory block that has already been freed";
0334     assert (page.pageNumber != MemoryBlock.FREED_IN_TMM_PAGE_NUMBER) :
0335             "Called freePage() on a memory block that has already been freed";
0336     assert(allocatedPages.get(page.pageNumber));
0337     pageTable[page.pageNumber] = null;
0338     synchronized (this) {
0339       allocatedPages.clear(page.pageNumber);
0340     }
0341     if (logger.isTraceEnabled()) {
0342       logger.trace("Freed page number {} ({} bytes)", page.pageNumber, page.size());
0343     }
0344     long pageSize = page.size();
0345     // Clear the page number before passing the block to the MemoryAllocator's free().
0346     // Doing this allows the MemoryAllocator to detect when a TaskMemoryManager-managed
0347     // page has been inappropriately directly freed without calling TMM.freePage().
0348     page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
0349     memoryManager.tungstenMemoryAllocator().free(page);
0350     releaseExecutionMemory(pageSize, consumer);
0351   }
0352 
0353   /**
0354    * Given a memory page and offset within that page, encode this address into a 64-bit long.
0355    * This address will remain valid as long as the corresponding page has not been freed.
0356    *
0357    * @param page a data page allocated by {@link TaskMemoryManager#allocatePage}/
0358    * @param offsetInPage an offset in this page which incorporates the base offset. In other words,
0359    *                     this should be the value that you would pass as the base offset into an
0360    *                     UNSAFE call (e.g. page.baseOffset() + something).
0361    * @return an encoded page address.
0362    */
0363   public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
0364     if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
0365       // In off-heap mode, an offset is an absolute address that may require a full 64 bits to
0366       // encode. Due to our page size limitation, though, we can convert this into an offset that's
0367       // relative to the page's base offset; this relative offset will fit in 51 bits.
0368       offsetInPage -= page.getBaseOffset();
0369     }
0370     return encodePageNumberAndOffset(page.pageNumber, offsetInPage);
0371   }
0372 
0373   @VisibleForTesting
0374   public static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {
0375     assert (pageNumber >= 0) : "encodePageNumberAndOffset called with invalid page";
0376     return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & MASK_LONG_LOWER_51_BITS);
0377   }
0378 
0379   @VisibleForTesting
0380   public static int decodePageNumber(long pagePlusOffsetAddress) {
0381     return (int) (pagePlusOffsetAddress >>> OFFSET_BITS);
0382   }
0383 
0384   private static long decodeOffset(long pagePlusOffsetAddress) {
0385     return (pagePlusOffsetAddress & MASK_LONG_LOWER_51_BITS);
0386   }
0387 
0388   /**
0389    * Get the page associated with an address encoded by
0390    * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
0391    */
0392   public Object getPage(long pagePlusOffsetAddress) {
0393     if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
0394       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
0395       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
0396       final MemoryBlock page = pageTable[pageNumber];
0397       assert (page != null);
0398       assert (page.getBaseObject() != null);
0399       return page.getBaseObject();
0400     } else {
0401       return null;
0402     }
0403   }
0404 
0405   /**
0406    * Get the offset associated with an address encoded by
0407    * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
0408    */
0409   public long getOffsetInPage(long pagePlusOffsetAddress) {
0410     final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
0411     if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
0412       return offsetInPage;
0413     } else {
0414       // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
0415       // converted the absolute address into a relative address. Here, we invert that operation:
0416       final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
0417       assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
0418       final MemoryBlock page = pageTable[pageNumber];
0419       assert (page != null);
0420       return page.getBaseOffset() + offsetInPage;
0421     }
0422   }
0423 
0424   /**
0425    * Clean up all allocated memory and pages. Returns the number of bytes freed. A non-zero return
0426    * value can be used to detect memory leaks.
0427    */
0428   public long cleanUpAllAllocatedMemory() {
0429     synchronized (this) {
0430       for (MemoryConsumer c: consumers) {
0431         if (c != null && c.getUsed() > 0) {
0432           // In case of failed task, it's normal to see leaked memory
0433           logger.debug("unreleased " + Utils.bytesToString(c.getUsed()) + " memory from " + c);
0434         }
0435       }
0436       consumers.clear();
0437 
0438       for (MemoryBlock page : pageTable) {
0439         if (page != null) {
0440           logger.debug("unreleased page: " + page + " in task " + taskAttemptId);
0441           page.pageNumber = MemoryBlock.FREED_IN_TMM_PAGE_NUMBER;
0442           memoryManager.tungstenMemoryAllocator().free(page);
0443         }
0444       }
0445       Arrays.fill(pageTable, null);
0446     }
0447 
0448     // release the memory that is not used by any consumer (acquired for pages in tungsten mode).
0449     memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
0450 
0451     return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
0452   }
0453 
0454   /**
0455    * Returns the memory consumption, in bytes, for the current task.
0456    */
0457   public long getMemoryConsumptionForThisTask() {
0458     return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
0459   }
0460 
0461   /**
0462    * Returns Tungsten memory mode
0463    */
0464   public MemoryMode getTungstenMemoryMode() {
0465     return tungstenMemoryMode;
0466   }
0467 }