Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.memory;
0019 
0020 import java.io.IOException;
0021 
0022 import org.apache.spark.unsafe.array.LongArray;
0023 import org.apache.spark.unsafe.memory.MemoryBlock;
0024 
0025 /**
0026  * A memory consumer of {@link TaskMemoryManager} that supports spilling.
0027  *
0028  * Note: this only supports allocation / spilling of Tungsten memory.
0029  */
0030 public abstract class MemoryConsumer {
0031 
0032   protected final TaskMemoryManager taskMemoryManager;
0033   private final long pageSize;
0034   private final MemoryMode mode;
0035   protected long used;
0036 
0037   protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {
0038     this.taskMemoryManager = taskMemoryManager;
0039     this.pageSize = pageSize;
0040     this.mode = mode;
0041   }
0042 
0043   protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
0044     this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
0045   }
0046 
0047   /**
0048    * Returns the memory mode, {@link MemoryMode#ON_HEAP} or {@link MemoryMode#OFF_HEAP}.
0049    */
0050   public MemoryMode getMode() {
0051     return mode;
0052   }
0053 
0054   /**
0055    * Returns the size of used memory in bytes.
0056    */
0057   public long getUsed() {
0058     return used;
0059   }
0060 
0061   /**
0062    * Force spill during building.
0063    */
0064   public void spill() throws IOException {
0065     spill(Long.MAX_VALUE, this);
0066   }
0067 
0068   /**
0069    * Spill some data to disk to release memory, which will be called by TaskMemoryManager
0070    * when there is not enough memory for the task.
0071    *
0072    * This should be implemented by subclass.
0073    *
0074    * Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
0075    *
0076    * Note: today, this only frees Tungsten-managed pages.
0077    *
0078    * @param size the amount of memory should be released
0079    * @param trigger the MemoryConsumer that trigger this spilling
0080    * @return the amount of released memory in bytes
0081    */
0082   public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
0083 
0084   /**
0085    * Allocates a LongArray of `size`. Note that this method may throw `SparkOutOfMemoryError`
0086    * if Spark doesn't have enough memory for this allocation, or throw `TooLargePageException`
0087    * if this `LongArray` is too large to fit in a single page. The caller side should take care of
0088    * these two exceptions, or make sure the `size` is small enough that won't trigger exceptions.
0089    *
0090    * @throws SparkOutOfMemoryError
0091    * @throws TooLargePageException
0092    */
0093   public LongArray allocateArray(long size) {
0094     long required = size * 8L;
0095     MemoryBlock page = taskMemoryManager.allocatePage(required, this);
0096     if (page == null || page.size() < required) {
0097       throwOom(page, required);
0098     }
0099     used += required;
0100     return new LongArray(page);
0101   }
0102 
0103   /**
0104    * Frees a LongArray.
0105    */
0106   public void freeArray(LongArray array) {
0107     freePage(array.memoryBlock());
0108   }
0109 
0110   /**
0111    * Allocate a memory block with at least `required` bytes.
0112    *
0113    * @throws SparkOutOfMemoryError
0114    */
0115   protected MemoryBlock allocatePage(long required) {
0116     MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
0117     if (page == null || page.size() < required) {
0118       throwOom(page, required);
0119     }
0120     used += page.size();
0121     return page;
0122   }
0123 
0124   /**
0125    * Free a memory block.
0126    */
0127   protected void freePage(MemoryBlock page) {
0128     used -= page.size();
0129     taskMemoryManager.freePage(page, this);
0130   }
0131 
0132   /**
0133    * Allocates memory of `size`.
0134    */
0135   public long acquireMemory(long size) {
0136     long granted = taskMemoryManager.acquireExecutionMemory(size, this);
0137     used += granted;
0138     return granted;
0139   }
0140 
0141   /**
0142    * Release N bytes of memory.
0143    */
0144   public void freeMemory(long size) {
0145     taskMemoryManager.releaseExecutionMemory(size, this);
0146     used -= size;
0147   }
0148 
0149   private void throwOom(final MemoryBlock page, final long required) {
0150     long got = 0;
0151     if (page != null) {
0152       got = page.size();
0153       taskMemoryManager.freePage(page, this);
0154     }
0155     taskMemoryManager.showMemoryUsage();
0156     // checkstyle.off: RegexpSinglelineJava
0157     throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " +
0158       got);
0159     // checkstyle.on: RegexpSinglelineJava
0160   }
0161 }