/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.unsafe.map;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.Closeables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.spark.SparkEnv;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.SparkOutOfMemoryError;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.unsafe.UnsafeAlignedOffset;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.hash.Murmur3_x86_32;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader;
import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter;

/**
 * An append-only hash map where keys and values are contiguous regions of bytes.
 *
 * This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
 * which is guaranteed to exhaust the space.
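 *
 * For illustration only, a sketch of the probe sequence (assuming a table of size 8, so mask = 7,
 * and an initial slot of 5); successive probes are offset from the initial slot by the triangular
 * numbers 0, 1, 3, 6, 10, ...:
 * <pre>
 *   probe 0: (5 + 0)  &amp; 7 = 5
 *   probe 1: (5 + 1)  &amp; 7 = 6
 *   probe 2: (5 + 3)  &amp; 7 = 0
 *   probe 3: (5 + 6)  &amp; 7 = 3
 *   probe 4: (5 + 10) &amp; 7 = 7
 * </pre>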
 *
 * The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
 * probably be using sorting instead of hashing for better cache locality.
 *
 * The key and values under the hood are stored together, in the following format:
 *   First uaoSize bytes: len(k) (key length in bytes) + len(v) (value length in bytes) + uaoSize
 *   Next uaoSize bytes: len(k)
 *   Next len(k) bytes: key data
 *   Next len(v) bytes: value data
 *   Last 8 bytes: pointer to next pair
 *
 * This means that the first uaoSize bytes store the length of the entire record
 * (key + value + uaoSize). This format is compatible with
 * {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
 * so we can pass records from this map directly into the sorter to sort records in place.
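 *
 * As a worked example (an illustrative sketch, assuming uaoSize = 4, an 8-byte key and a
 * 16-byte value), a record occupies 4 + 4 + 8 + 16 + 8 = 40 bytes, laid out as:
 * <pre>
 *   bytes  0..3  : total length = 8 + 16 + 4 = 28
 *   bytes  4..7  : key length = 8
 *   bytes  8..15 : key data
 *   bytes 16..31 : value data
 *   bytes 32..39 : pointer to the next pair with the same key (0 if none)
 * </pre>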
 */
public final class BytesToBytesMap extends MemoryConsumer {

  private static final Logger logger = LoggerFactory.getLogger(BytesToBytesMap.class);

  private static final HashMapGrowthStrategy growthStrategy = HashMapGrowthStrategy.DOUBLING;

  private final TaskMemoryManager taskMemoryManager;

  /**
   * A linked list for tracking all allocated data pages so that we can free all of our memory.
   */
  private final LinkedList<MemoryBlock> dataPages = new LinkedList<>();

  /**
   * The data page that will be used to store keys and values for new hashtable entries. When this
   * page becomes full, a new page will be allocated and this pointer will change to point to that
   * new page.
   */
  private MemoryBlock currentPage = null;

  /**
   * Offset into `currentPage` that points to the location where new data can be inserted into
   * the page. This does not incorporate the page's base offset.
   */
  private long pageCursor = 0;

  /**
   * The maximum number of keys that BytesToBytesMap supports. The hash table has to be
   * power-of-2-sized and its backing Java array can contain at most (1 &lt;&lt; 30) elements,
   * since that's the largest power-of-2 that's less than Integer.MAX_VALUE. We need two long array
   * entries per key, giving us a maximum capacity of (1 &lt;&lt; 29).
   */
  public static final int MAX_CAPACITY = (1 << 29);

  // This choice of page table size and page size means that we can address up to 500 gigabytes
  // of memory.
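  // (A hedged arithmetic note, assuming TaskMemoryManager's 13-bit page numbers, i.e. a page
  // table of 8192 entries, and pages of up to roughly 64 megabytes each:
  // 8192 * 64 MB = 512 GB, hence "up to 500 gigabytes".)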

  /**
   * A single array to store the key and value.
   *
   * Position {@code 2 * i} in the array is used to track a pointer to the key at index {@code i},
   * while position {@code 2 * i + 1} in the array holds the key's full 32-bit hashcode.
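   *
   * A sketch of the layout (illustrative only, assuming key a hashed to slot 0 and key b to
   * slot 1):
   * <pre>
   *   longArray[0] = encoded address of a's record    longArray[1] = hash(a)
   *   longArray[2] = encoded address of b's record    longArray[3] = hash(b)
   * </pre>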
   */
  @Nullable private LongArray longArray;
  // TODO: we're wasting 32 bits of space here; we can probably store fewer bits of the hashcode
  // and exploit word-alignment to use fewer bits to hold the address.  This might let us store
  // only one long per map entry, increasing the chance that this array will fit in cache at the
  // expense of maybe performing more lookups if we have hash collisions.  Say that we stored only
  // 27 bits of the hashcode and 37 bits of the address.  37 bits is enough to address 1 terabyte
  // of RAM given word-alignment.  If we use 13 bits of this for our page table, that gives us a
  // maximum page size of 2^24 * 8 = ~134 megabytes per page. This change will require us to store
  // full base addresses in the page table for off-heap mode so that we can reconstruct the full
  // absolute memory addresses.

  /**
   * Whether or not the longArray can grow. We will not insert more elements if it's false.
   */
  private boolean canGrowArray = true;

  private final double loadFactor;

  /**
   * The size of the data pages that hold key and value data. Map entries cannot span multiple
   * pages, so this limits the maximum entry size.
   */
  private final long pageSizeBytes;

  /**
   * Number of keys defined in the map.
   */
  private int numKeys;

  /**
   * Number of values defined in the map. A key could have multiple values.
   */
  private int numValues;

  /**
   * The map will be expanded once the number of keys exceeds this threshold.
   */
  private int growthThreshold;

  /**
   * Mask for truncating hashcodes so that they do not exceed the long array's size.
   * This is a strength reduction optimization; we're essentially performing a modulus operation,
   * but doing so with a bitmask because this is a power-of-2-sized hash map.
   */
  private int mask;

  /**
   * Return value of {@link BytesToBytesMap#lookup(Object, long, int)}.
   */
  private final Location loc;

  private long numProbes = 0L;

  private long numKeyLookups = 0L;

  private long peakMemoryUsedBytes = 0L;

  private final int initialCapacity;

  private final BlockManager blockManager;
  private final SerializerManager serializerManager;
  private volatile MapIterator destructiveIterator = null;
  private LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();

  public BytesToBytesMap(
      TaskMemoryManager taskMemoryManager,
      BlockManager blockManager,
      SerializerManager serializerManager,
      int initialCapacity,
      double loadFactor,
      long pageSizeBytes) {
    super(taskMemoryManager, pageSizeBytes, taskMemoryManager.getTungstenMemoryMode());
    this.taskMemoryManager = taskMemoryManager;
    this.blockManager = blockManager;
    this.serializerManager = serializerManager;
    this.loadFactor = loadFactor;
    this.loc = new Location();
    this.pageSizeBytes = pageSizeBytes;
    if (initialCapacity <= 0) {
      throw new IllegalArgumentException("Initial capacity must be greater than 0");
    }
    if (initialCapacity > MAX_CAPACITY) {
      throw new IllegalArgumentException(
        "Initial capacity " + initialCapacity + " exceeds maximum capacity of " + MAX_CAPACITY);
    }
    if (pageSizeBytes > TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES) {
      throw new IllegalArgumentException("Page size " + pageSizeBytes + " cannot exceed " +
        TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
    }
    this.initialCapacity = initialCapacity;
    allocate(initialCapacity);
  }

  public BytesToBytesMap(
      TaskMemoryManager taskMemoryManager,
      int initialCapacity,
      long pageSizeBytes) {
    this(
      taskMemoryManager,
      SparkEnv.get() != null ? SparkEnv.get().blockManager() : null,
      SparkEnv.get() != null ? SparkEnv.get().serializerManager() : null,
      initialCapacity,
      // In order to re-use the longArray for sorting, the load factor cannot be larger than 0.5.
      0.5,
      pageSizeBytes);
  }

  /**
   * Returns the number of keys defined in the map.
   */
  public int numKeys() { return numKeys; }

  /**
   * Returns the number of values defined in the map. A key could have multiple values.
   */
  public int numValues() { return numValues; }

  public final class MapIterator implements Iterator<Location> {

    private int numRecords;
    private final Location loc;

    private MemoryBlock currentPage = null;
    private int recordsInPage = 0;
    private Object pageBaseObject;
    private long offsetInPage;

    // Whether this iterator is destructive. When true, it frees each page as it moves on to the
    // next one.
    private boolean destructive = false;
    private UnsafeSorterSpillReader reader = null;

    private MapIterator(int numRecords, Location loc, boolean destructive) {
      this.numRecords = numRecords;
      this.loc = loc;
      this.destructive = destructive;
      if (destructive) {
        destructiveIterator = this;
        // The longArray will not be used anymore if this iterator is destructive, so release it
        // now.
        if (longArray != null) {
          freeArray(longArray);
          longArray = null;
        }
      }
    }

    private void advanceToNextPage() {
      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
      // to free a memory page by calling `freePage`. At the same time, it is possible that another
      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
      // a reference to the page to free and free it after releasing the lock of `MapIterator`.
      MemoryBlock pageToFree = null;

      try {
        synchronized (this) {
          int nextIdx = dataPages.indexOf(currentPage) + 1;
          if (destructive && currentPage != null) {
            dataPages.remove(currentPage);
            pageToFree = currentPage;
            nextIdx--;
          }
          if (dataPages.size() > nextIdx) {
            currentPage = dataPages.get(nextIdx);
            pageBaseObject = currentPage.getBaseObject();
            offsetInPage = currentPage.getBaseOffset();
            recordsInPage = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
            offsetInPage += UnsafeAlignedOffset.getUaoSize();
          } else {
            currentPage = null;
            if (reader != null) {
              handleFailedDelete();
            }
            try {
              Closeables.close(reader, /* swallowIOException = */ false);
              reader = spillWriters.getFirst().getReader(serializerManager);
              recordsInPage = -1;
            } catch (IOException e) {
              // Scala iterators do not handle exceptions
              Platform.throwException(e);
            }
          }
        }
      } finally {
        if (pageToFree != null) {
          freePage(pageToFree);
        }
      }
    }

    @Override
    public boolean hasNext() {
      if (numRecords == 0) {
        if (reader != null) {
          handleFailedDelete();
        }
      }
      return numRecords > 0;
    }

    @Override
    public Location next() {
      if (recordsInPage == 0) {
        advanceToNextPage();
      }
      numRecords--;
      if (currentPage != null) {
        int totalLength = UnsafeAlignedOffset.getSize(pageBaseObject, offsetInPage);
        loc.with(currentPage, offsetInPage);
        // [total size] [key size] [key] [value] [pointer to next]
        offsetInPage += UnsafeAlignedOffset.getUaoSize() + totalLength + 8;
        recordsInPage--;
        return loc;
      } else {
        assert(reader != null);
        if (!reader.hasNext()) {
          advanceToNextPage();
        }
        try {
          reader.loadNext();
        } catch (IOException e) {
          try {
            reader.close();
          } catch (IOException e2) {
            logger.error("Error while closing spill reader", e2);
          }
          // Scala iterators do not handle exceptions
          Platform.throwException(e);
        }
        loc.with(reader.getBaseObject(), reader.getBaseOffset(), reader.getRecordLength());
        return loc;
      }
    }

    public synchronized long spill(long numBytes) throws IOException {
      if (!destructive || dataPages.size() == 1) {
        return 0L;
      }

      updatePeakMemoryUsed();

      // TODO: use existing ShuffleWriteMetrics
      ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();

      long released = 0L;
      while (dataPages.size() > 0) {
        MemoryBlock block = dataPages.getLast();
        // The currentPage is still in use, so it cannot be released.
        if (block == currentPage) {
          break;
        }

        Object base = block.getBaseObject();
        long offset = block.getBaseOffset();
        int numRecords = UnsafeAlignedOffset.getSize(base, offset);
        int uaoSize = UnsafeAlignedOffset.getUaoSize();
        offset += uaoSize;
        final UnsafeSorterSpillWriter writer =
                new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
        while (numRecords > 0) {
          int length = UnsafeAlignedOffset.getSize(base, offset);
          writer.write(base, offset + uaoSize, length, 0);
          offset += uaoSize + length + 8;
          numRecords--;
        }
        writer.close();
        spillWriters.add(writer);

        dataPages.removeLast();
        released += block.size();
        freePage(block);

        if (released >= numBytes) {
          break;
        }
      }

      return released;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }

    private void handleFailedDelete() {
      // Remove the spill file from disk.
      File file = spillWriters.removeFirst().getFile();
      if (file != null && file.exists() && !file.delete()) {
        logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
      }
    }
  }

  /**
   * Returns an iterator for iterating over the entries of this map.
   *
   * For efficiency, all calls to `next()` will return the same {@link Location} object.
   *
   * If any other lookups or operations are performed on this map while iterating over it, including
   * `lookup()`, the behavior of the returned iterator is undefined.
   */
  public MapIterator iterator() {
    return new MapIterator(numValues, loc, false);
  }

  /**
   * Returns a thread-safe iterator that iterates over the entries of this map.
   */
  public MapIterator safeIterator() {
    return new MapIterator(numValues, new Location(), false);
  }

  /**
   * Returns a destructive iterator for iterating over the entries of this map. It frees each page
   * as it moves on to the next one. Note that it is illegal to call any method on the map after
   * `destructiveIterator()` has been called.
   *
   * For efficiency, all calls to `next()` will return the same {@link Location} object.
   *
   * If any other lookups or operations are performed on this map while iterating over it, including
   * `lookup()`, the behavior of the returned iterator is undefined.
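   *
   * A sketch of the intended usage (illustrative only):
   * <pre>
   *   BytesToBytesMap.MapIterator iter = map.destructiveIterator();
   *   while (iter.hasNext()) {
   *     BytesToBytesMap.Location loc = iter.next();
   *     // consume the key/value via loc's accessors; pages are freed as the
   *     // iterator advances past them
   *   }
   * </pre>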
   */
  public MapIterator destructiveIterator() {
    updatePeakMemoryUsed();
    return new MapIterator(numValues, loc, true);
  }

  /**
   * Looks up a key, and returns a {@link Location} handle that can be used to test existence
   * and read/write values.
   *
   * This function always returns the same {@link Location} instance to avoid object allocation.
   */
  public Location lookup(Object keyBase, long keyOffset, int keyLength) {
    safeLookup(keyBase, keyOffset, keyLength, loc,
      Murmur3_x86_32.hashUnsafeWords(keyBase, keyOffset, keyLength, 42));
    return loc;
  }

  /**
   * Looks up a key, and returns a {@link Location} handle that can be used to test existence
   * and read/write values.
   *
   * This function always returns the same {@link Location} instance to avoid object allocation.
   */
  public Location lookup(Object keyBase, long keyOffset, int keyLength, int hash) {
    safeLookup(keyBase, keyOffset, keyLength, loc, hash);
    return loc;
  }

  /**
   * Looks up a key, and saves the result in the provided `loc`.
   *
   * This is a thread-safe version of `lookup`, and can be used by multiple threads.
   */
  public void safeLookup(Object keyBase, long keyOffset, int keyLength, Location loc, int hash) {
    assert(longArray != null);

    numKeyLookups++;

    int pos = hash & mask;
    int step = 1;
    while (true) {
      numProbes++;
      if (longArray.get(pos * 2) == 0) {
        // This is a new key.
        loc.with(pos, hash, false);
        return;
      } else {
        long stored = longArray.get(pos * 2 + 1);
        if ((int) (stored) == hash) {
          // Full hash code matches.  Let's compare the keys for equality.
          loc.with(pos, hash, true);
          if (loc.getKeyLength() == keyLength) {
            final boolean areEqual = ByteArrayMethods.arrayEquals(
              keyBase,
              keyOffset,
              loc.getKeyBase(),
              loc.getKeyOffset(),
              keyLength
            );
            if (areEqual) {
              return;
            }
          }
        }
      }
      pos = (pos + step) & mask;
      step++;
    }
  }
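
  // When callers precompute hashes (for example, to probe repeatedly with the same key), the hash
  // passed to `safeLookup` or `lookup(keyBase, keyOffset, keyLength, hash)` must match what
  // `lookup(keyBase, keyOffset, keyLength)` computes internally. A minimal sketch, assuming the
  // caller already holds `keyBase`/`keyOffset`/`keyLength` (hypothetical caller-side code):
  //
  //   int hash = Murmur3_x86_32.hashUnsafeWords(keyBase, keyOffset, keyLength, 42);
  //   map.safeLookup(keyBase, keyOffset, keyLength, location, hash);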

  /**
   * Handle returned by the {@link BytesToBytesMap#lookup(Object, long, int)} function.
   */
  public final class Location {
    /** An index into the hash map's long array. */
    private int pos;
    /** True if this location points to a position where a key is defined, false otherwise. */
    private boolean isDefined;
    /**
     * The hashcode of the most recent key passed to
     * {@link BytesToBytesMap#lookup(Object, long, int, int)}. Caching this hashcode here allows us
     * to avoid re-hashing the key when storing a value for that key.
     */
    private int keyHashcode;
    private Object baseObject;  // the base object for key and value
    private long keyOffset;
    private int keyLength;
    private long valueOffset;
    private int valueLength;

    /**
     * Memory page containing the record. Only set if created by {@link BytesToBytesMap#iterator()}.
     */
    @Nullable private MemoryBlock memoryPage;

    private void updateAddressesAndSizes(long fullKeyAddress) {
      updateAddressesAndSizes(
        taskMemoryManager.getPage(fullKeyAddress),
        taskMemoryManager.getOffsetInPage(fullKeyAddress));
    }

    private void updateAddressesAndSizes(final Object base, long offset) {
      baseObject = base;
      final int totalLength = UnsafeAlignedOffset.getSize(base, offset);
      int uaoSize = UnsafeAlignedOffset.getUaoSize();
      offset += uaoSize;
      keyLength = UnsafeAlignedOffset.getSize(base, offset);
      offset += uaoSize;
      keyOffset = offset;
      valueOffset = offset + keyLength;
      valueLength = totalLength - keyLength - uaoSize;
    }

    private Location with(int pos, int keyHashcode, boolean isDefined) {
      assert(longArray != null);
      this.pos = pos;
      this.isDefined = isDefined;
      this.keyHashcode = keyHashcode;
      if (isDefined) {
        final long fullKeyAddress = longArray.get(pos * 2);
        updateAddressesAndSizes(fullKeyAddress);
      }
      return this;
    }

    private Location with(MemoryBlock page, long offsetInPage) {
      this.isDefined = true;
      this.memoryPage = page;
      updateAddressesAndSizes(page.getBaseObject(), offsetInPage);
      return this;
    }

    /**
     * This is only used for spilling.
     */
    private Location with(Object base, long offset, int length) {
      this.isDefined = true;
      this.memoryPage = null;
      baseObject = base;
      int uaoSize = UnsafeAlignedOffset.getUaoSize();
      keyOffset = offset + uaoSize;
      keyLength = UnsafeAlignedOffset.getSize(base, offset);
      valueOffset = offset + uaoSize + keyLength;
      valueLength = length - uaoSize - keyLength;
      return this;
    }

    /**
     * Finds the next pair that has the same key as the current one.
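     *
     * A sketch of iterating over every value stored for one key (illustrative only):
     * <pre>
     *   Location loc = map.lookup(keyBase, keyOffset, keyLength);
     *   if (loc.isDefined()) {
     *     do {
     *       // read loc.getValueBase(), loc.getValueOffset(), loc.getValueLength()
     *     } while (loc.nextValue());
     *   }
     * </pre>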
     */
    public boolean nextValue() {
      assert isDefined;
      long nextAddr = Platform.getLong(baseObject, valueOffset + valueLength);
      if (nextAddr == 0) {
        return false;
      } else {
        updateAddressesAndSizes(nextAddr);
        return true;
      }
    }

    /**
     * Returns the memory page that contains the current record.
     * This is only valid if this location was returned by {@link BytesToBytesMap#iterator()}.
     */
    public MemoryBlock getMemoryPage() {
      return this.memoryPage;
    }

    /**
     * Returns true if the key is defined at this position, and false otherwise.
     */
    public boolean isDefined() {
      return isDefined;
    }

    /**
     * Returns the base object for the key.
     */
    public Object getKeyBase() {
      assert (isDefined);
      return baseObject;
    }

    /**
     * Returns the offset for the key.
     */
    public long getKeyOffset() {
      assert (isDefined);
      return keyOffset;
    }

    /**
     * Returns the base object for the value.
     */
    public Object getValueBase() {
      assert (isDefined);
      return baseObject;
    }

    /**
     * Returns the offset for the value.
     */
    public long getValueOffset() {
      assert (isDefined);
      return valueOffset;
    }

    /**
     * Returns the length of the key defined at this position.
     * Unspecified behavior if the key is not defined.
     */
    public int getKeyLength() {
      assert (isDefined);
      return keyLength;
    }

    /**
     * Returns the length of the value defined at this position.
     * Unspecified behavior if the key is not defined.
     */
    public int getValueLength() {
      assert (isDefined);
      return valueLength;
    }

    /**
     * Append a new value for the key. This method could be called multiple times for a given key.
     * The return value indicates whether the put succeeded or whether it failed because additional
     * memory could not be acquired.
     * <p>
     * It is only valid to call this method immediately after calling `lookup()` using the same key.
     * </p>
     * <p>
     * The key and value must be word-aligned (that is, their sizes must be a multiple of 8).
     * </p>
     * <p>
     * After calling this method, calls to `get[Key|Value]Address()` and `get[Key|Value]Length`
     * will return information on the data stored by this `append` call.
     * </p>
     * <p>
     * As an example usage, here's the proper way to store a new key:
     * </p>
     * <pre>
     *   Location loc = map.lookup(keyBase, keyOffset, keyLength);
     *   if (!loc.isDefined()) {
     *     if (!loc.append(keyBase, keyOffset, keyLength, ...)) {
     *       // handle failure to grow map (by spilling, for example)
     *     }
     *   }
     * </pre>
     * <p>
     * Unspecified behavior if the key is not defined.
     * </p>
     *
     * @return true if the put() was successful and false if the put() failed because memory could
     *         not be acquired.
     */
    public boolean append(Object kbase, long koff, int klen, Object vbase, long voff, int vlen) {
      assert (klen % 8 == 0);
      assert (vlen % 8 == 0);
      assert (longArray != null);

      // We should not let the number of keys reach MAX_CAPACITY. The usage pattern of this map is
      // lookup + append. If the number of keys were allowed to reach MAX_CAPACITY, the next call
      // to lookup would loop forever because it could never find an empty slot.
      if (numKeys == MAX_CAPACITY - 1
        // The map could be reused after the last spill (because there was not enough memory to
        // grow), in which case we don't try to grow again once we hit the `growthThreshold`.
        || !canGrowArray && numKeys >= growthThreshold) {
        return false;
      }

      // Here, we'll copy the data into our data pages. Because we only store a relative offset from
      // the key address instead of storing the absolute address of the value, the key and value
      // must be stored in the same memory page.
      // (total length) (key length) (key) (value) (8 byte pointer to next value)
      int uaoSize = UnsafeAlignedOffset.getUaoSize();
      final long recordLength = (2L * uaoSize) + klen + vlen + 8;
      if (currentPage == null || currentPage.size() - pageCursor < recordLength) {
        if (!acquireNewPage(recordLength + uaoSize)) {
          return false;
        }
      }

      // --- Append the key and value data to the current data page --------------------------------
      final Object base = currentPage.getBaseObject();
      long offset = currentPage.getBaseOffset() + pageCursor;
      final long recordOffset = offset;
      UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
      UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
      offset += (2L * uaoSize);
      Platform.copyMemory(kbase, koff, base, offset, klen);
      offset += klen;
      Platform.copyMemory(vbase, voff, base, offset, vlen);
      offset += vlen;
      // Put this value at the beginning of the list.
      Platform.putLong(base, offset, isDefined ? longArray.get(pos * 2) : 0);

      // --- Update bookkeeping data structures ----------------------------------------------------
      offset = currentPage.getBaseOffset();
      UnsafeAlignedOffset.putSize(base, offset, UnsafeAlignedOffset.getSize(base, offset) + 1);
      pageCursor += recordLength;
      final long storedKeyAddress = taskMemoryManager.encodePageNumberAndOffset(
        currentPage, recordOffset);
      longArray.set(pos * 2, storedKeyAddress);
      updateAddressesAndSizes(storedKeyAddress);
      numValues++;
      if (!isDefined) {
        numKeys++;
        longArray.set(pos * 2 + 1, keyHashcode);
        isDefined = true;

        // We use two array entries per key, so the array size is twice the capacity.
        // We should compare the current capacity of the array, instead of its size.
        if (numKeys >= growthThreshold && longArray.size() / 2 < MAX_CAPACITY) {
          try {
            growAndRehash();
          } catch (SparkOutOfMemoryError oom) {
            canGrowArray = false;
          }
        }
      }
      return true;
    }
  }

  /**
   * Acquire a new page from the memory manager.
   * @return whether there is enough space to allocate the new page.
   */
  private boolean acquireNewPage(long required) {
    try {
      currentPage = allocatePage(required);
    } catch (SparkOutOfMemoryError e) {
      return false;
    }
    dataPages.add(currentPage);
    UnsafeAlignedOffset.putSize(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
    pageCursor = UnsafeAlignedOffset.getUaoSize();
    return true;
  }

  @Override
  public long spill(long size, MemoryConsumer trigger) throws IOException {
    if (trigger != this && destructiveIterator != null) {
      return destructiveIterator.spill(size);
    }
    return 0L;
  }

  /**
   * Allocate new data structures for this map. When calling this outside of the constructor,
   * make sure to keep references to the old data structures so that you can free them.
   *
   * @param capacity the new map capacity
   */
  private void allocate(int capacity) {
    assert (capacity >= 0);
    capacity = Math.max((int) Math.min(MAX_CAPACITY, ByteArrayMethods.nextPowerOf2(capacity)), 64);
    assert (capacity <= MAX_CAPACITY);
    longArray = allocateArray(capacity * 2L);
    longArray.zeroOut();

    this.growthThreshold = (int) (capacity * loadFactor);
    this.mask = capacity - 1;
  }
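
  // A worked example of the sizing arithmetic above (illustrative only): allocate(100) rounds the
  // capacity up to the next power of two, 128, clamps it to [64, MAX_CAPACITY], and allocates a
  // 256-entry long array (two entries per slot). With the default load factor of 0.5 this yields
  // growthThreshold = 64 and mask = 127.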

  /**
   * Free all allocated memory associated with this map, including the storage for keys and values
   * as well as the hash map array itself.
   *
   * This method is idempotent and can be called multiple times.
   */
  public void free() {
    updatePeakMemoryUsed();
    if (longArray != null) {
      freeArray(longArray);
      longArray = null;
    }
    Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
    while (dataPagesIterator.hasNext()) {
      MemoryBlock dataPage = dataPagesIterator.next();
      dataPagesIterator.remove();
      freePage(dataPage);
    }
    assert(dataPages.isEmpty());

    while (!spillWriters.isEmpty()) {
      File file = spillWriters.removeFirst().getFile();
      if (file != null && file.exists()) {
        if (!file.delete()) {
          logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
        }
      }
    }
  }

  public TaskMemoryManager getTaskMemoryManager() {
    return taskMemoryManager;
  }

  public long getPageSizeBytes() {
    return pageSizeBytes;
  }

  /**
   * Returns the total amount of memory, in bytes, consumed by this map's managed structures.
   */
  public long getTotalMemoryConsumption() {
    long totalDataPagesSize = 0L;
    for (MemoryBlock dataPage : dataPages) {
      totalDataPagesSize += dataPage.size();
    }
    return totalDataPagesSize + ((longArray != null) ? longArray.memoryBlock().size() : 0L);
  }

  private void updatePeakMemoryUsed() {
    long mem = getTotalMemoryConsumption();
    if (mem > peakMemoryUsedBytes) {
      peakMemoryUsedBytes = mem;
    }
  }

  /**
   * Returns the peak memory used so far, in bytes.
   */
  public long getPeakMemoryUsedBytes() {
    updatePeakMemoryUsed();
    return peakMemoryUsedBytes;
  }

  /**
   * Returns the average number of probes per key lookup.
   */
  public double getAvgHashProbeBucketListIterations() {
    return (1.0 * numProbes) / numKeyLookups;
  }

  @VisibleForTesting
  public int getNumDataPages() {
    return dataPages.size();
  }

  /**
   * Returns the underlying LongArray.
   */
  public LongArray getArray() {
    assert(longArray != null);
    return longArray;
  }

  /**
   * Resets this map to its initial state.
   */
  public void reset() {
    updatePeakMemoryUsed();
    numKeys = 0;
    numValues = 0;
    freeArray(longArray);
    longArray = null;
    while (dataPages.size() > 0) {
      MemoryBlock dataPage = dataPages.removeLast();
      freePage(dataPage);
    }
    allocate(initialCapacity);
    canGrowArray = true;
    currentPage = null;
    pageCursor = 0;
  }

  /**
   * Grows the size of the hash table and re-hashes everything.
   */
  @VisibleForTesting
  void growAndRehash() {
    assert(longArray != null);

    // Store references to the old data structures to be used when we re-hash
    final LongArray oldLongArray = longArray;
    final int oldCapacity = (int) oldLongArray.size() / 2;

    // Allocate the new data structures
    allocate(Math.min(growthStrategy.nextCapacity(oldCapacity), MAX_CAPACITY));

    // Re-mask (we don't recompute the hashcode because we stored all 32 bits of it)
    for (int i = 0; i < oldLongArray.size(); i += 2) {
      final long keyPointer = oldLongArray.get(i);
      if (keyPointer == 0) {
        continue;
      }
      final int hashcode = (int) oldLongArray.get(i + 1);
      int newPos = hashcode & mask;
      int step = 1;
      while (longArray.get(newPos * 2) != 0) {
        newPos = (newPos + step) & mask;
        step++;
      }
      longArray.set(newPos * 2, keyPointer);
      longArray.set(newPos * 2 + 1, hashcode);
    }
    freeArray(oldLongArray);
  }
}