0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.memory;
0019
0020 import java.io.IOException;
0021
0022 import org.apache.spark.unsafe.array.LongArray;
0023 import org.apache.spark.unsafe.memory.MemoryBlock;
0024
0025
0026
0027
0028
0029
0030 public abstract class MemoryConsumer {
0031
0032 protected final TaskMemoryManager taskMemoryManager;
0033 private final long pageSize;
0034 private final MemoryMode mode;
0035 protected long used;
0036
0037 protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize, MemoryMode mode) {
0038 this.taskMemoryManager = taskMemoryManager;
0039 this.pageSize = pageSize;
0040 this.mode = mode;
0041 }
0042
0043 protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
0044 this(taskMemoryManager, taskMemoryManager.pageSizeBytes(), MemoryMode.ON_HEAP);
0045 }
0046
0047
0048
0049
0050 public MemoryMode getMode() {
0051 return mode;
0052 }
0053
0054
0055
0056
0057 public long getUsed() {
0058 return used;
0059 }
0060
0061
0062
0063
0064 public void spill() throws IOException {
0065 spill(Long.MAX_VALUE, this);
0066 }
0067
0068
0069
0070
0071
0072
0073
0074
0075
0076
0077
0078
0079
0080
0081
0082 public abstract long spill(long size, MemoryConsumer trigger) throws IOException;
0083
0084
0085
0086
0087
0088
0089
0090
0091
0092
0093 public LongArray allocateArray(long size) {
0094 long required = size * 8L;
0095 MemoryBlock page = taskMemoryManager.allocatePage(required, this);
0096 if (page == null || page.size() < required) {
0097 throwOom(page, required);
0098 }
0099 used += required;
0100 return new LongArray(page);
0101 }
0102
0103
0104
0105
0106 public void freeArray(LongArray array) {
0107 freePage(array.memoryBlock());
0108 }
0109
0110
0111
0112
0113
0114
0115 protected MemoryBlock allocatePage(long required) {
0116 MemoryBlock page = taskMemoryManager.allocatePage(Math.max(pageSize, required), this);
0117 if (page == null || page.size() < required) {
0118 throwOom(page, required);
0119 }
0120 used += page.size();
0121 return page;
0122 }
0123
0124
0125
0126
0127 protected void freePage(MemoryBlock page) {
0128 used -= page.size();
0129 taskMemoryManager.freePage(page, this);
0130 }
0131
0132
0133
0134
0135 public long acquireMemory(long size) {
0136 long granted = taskMemoryManager.acquireExecutionMemory(size, this);
0137 used += granted;
0138 return granted;
0139 }
0140
0141
0142
0143
0144 public void freeMemory(long size) {
0145 taskMemoryManager.releaseExecutionMemory(size, this);
0146 used -= size;
0147 }
0148
0149 private void throwOom(final MemoryBlock page, final long required) {
0150 long got = 0;
0151 if (page != null) {
0152 got = page.size();
0153 taskMemoryManager.freePage(page, this);
0154 }
0155 taskMemoryManager.showMemoryUsage();
0156
0157 throw new SparkOutOfMemoryError("Unable to acquire " + required + " bytes of memory, got " +
0158 got);
0159
0160 }
0161 }