0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.catalyst.expressions;
0018
0019 import java.io.Closeable;
0020 import java.io.IOException;
0021
0022 import org.apache.spark.memory.MemoryConsumer;
0023 import org.apache.spark.memory.SparkOutOfMemoryError;
0024 import org.apache.spark.memory.TaskMemoryManager;
0025 import org.apache.spark.sql.types.*;
0026 import org.apache.spark.unsafe.memory.MemoryBlock;
0027
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030
0031
0032
0033
0034
0035
0036
0037
0038
0039
0040
0041
0042
0043
0044
0045
0046
0047
0048
0049
0050 public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable {
0051 protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
0052
0053 private static final int DEFAULT_CAPACITY = 1 << 16;
0054
0055 protected final StructType keySchema;
0056 protected final StructType valueSchema;
0057 protected final int capacity;
0058 protected int numRows = 0;
0059
0060
0061 protected int keyRowId = -1;
0062
0063
0064 protected final UnsafeRow keyRow;
0065 protected final UnsafeRow valueRow;
0066
0067 protected MemoryBlock page = null;
0068 protected Object base = null;
0069 protected final long recordStartOffset;
0070 protected long pageCursor = 0;
0071
0072 public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema,
0073 TaskMemoryManager manager) {
0074 return allocate(keySchema, valueSchema, manager, DEFAULT_CAPACITY);
0075 }
0076
0077 public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema,
0078 TaskMemoryManager manager, int maxRows) {
0079 boolean allFixedLength = true;
0080
0081
0082 for (String name : keySchema.fieldNames()) {
0083 allFixedLength = allFixedLength
0084 && UnsafeRow.isFixedLength(keySchema.apply(name).dataType());
0085 }
0086 for (String name : valueSchema.fieldNames()) {
0087 allFixedLength = allFixedLength
0088 && UnsafeRow.isFixedLength(valueSchema.apply(name).dataType());
0089 }
0090
0091 if (allFixedLength) {
0092 return new FixedLengthRowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager);
0093 } else {
0094 return new VariableLengthRowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager);
0095 }
0096 }
0097
0098 protected RowBasedKeyValueBatch(StructType keySchema, StructType valueSchema, int maxRows,
0099 TaskMemoryManager manager) {
0100 super(manager, manager.pageSizeBytes(), manager.getTungstenMemoryMode());
0101
0102 this.keySchema = keySchema;
0103 this.valueSchema = valueSchema;
0104 this.capacity = maxRows;
0105
0106 this.keyRow = new UnsafeRow(keySchema.length());
0107 this.valueRow = new UnsafeRow(valueSchema.length());
0108
0109 if (!acquirePage(manager.pageSizeBytes())) {
0110 page = null;
0111 recordStartOffset = 0;
0112 } else {
0113 base = page.getBaseObject();
0114 recordStartOffset = page.getBaseOffset();
0115 }
0116 }
0117
0118 public final int numRows() { return numRows; }
0119
0120 public final void close() {
0121 if (page != null) {
0122 freePage(page);
0123 page = null;
0124 }
0125 }
0126
0127 private boolean acquirePage(long requiredSize) {
0128 try {
0129 page = allocatePage(requiredSize);
0130 } catch (SparkOutOfMemoryError e) {
0131 logger.warn("Failed to allocate page ({} bytes).", requiredSize);
0132 return false;
0133 }
0134 base = page.getBaseObject();
0135 pageCursor = 0;
0136 return true;
0137 }
0138
0139
0140
0141
0142
0143
0144 public abstract UnsafeRow appendRow(Object kbase, long koff, int klen,
0145 Object vbase, long voff, int vlen);
0146
0147
0148
0149
0150 public abstract UnsafeRow getKeyRow(int rowId);
0151
0152
0153
0154
0155
0156
0157 public final UnsafeRow getValueRow(int rowId) {
0158 return getValueFromKey(rowId);
0159 }
0160
0161
0162
0163
0164
0165
0166
0167 protected abstract UnsafeRow getValueFromKey(int rowId);
0168
0169
0170
0171
0172
0173
0174 public final long spill(long size, MemoryConsumer trigger) throws IOException {
0175 logger.warn("Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.");
0176 return 0;
0177 }
0178
0179
0180
0181
0182 public abstract org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator();
0183 }