/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
0017 package org.apache.spark.sql.catalyst.expressions;
0018
0019 import org.apache.spark.memory.TaskMemoryManager;
0020 import org.apache.spark.sql.types.*;
0021 import org.apache.spark.unsafe.Platform;
0022
0023
0024
0025
0026
0027
0028
0029
0030
/**
 * An implementation of {@code RowBasedKeyValueBatch} in which every key-value record has the
 * same fixed length, so a record's offset in the page can be computed directly from its row id.
 *
 * The layout of each record in the page is:
 *   [UnsafeRow bytes for key, klen bytes][UnsafeRow bytes for value, vlen bytes][8 bytes]
 * hence {@code recordLength = klen + vlen + 8}. The trailing 8 bytes are zeroed on append;
 * presumably a "pointer to next" slot consumed by code outside this file — TODO confirm
 * against RowBasedKeyValueBatch's contract.
 *
 * Not thread-safe: the returned key/value {@code UnsafeRow}s are shared, reused instances.
 */
public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatch {
  // Fixed byte length of every key row (8 bytes per field plus the null bitset, see ctor).
  private final int klen;
  // Fixed byte length of every value row (8 bytes per field plus the null bitset, see ctor).
  private final int vlen;
  // Total bytes occupied by one record: klen + vlen + 8 trailing bytes.
  private final int recordLength;

  /** Offset of row {@code rowId}'s key: records are laid out back to back from recordStartOffset. */
  private long getKeyOffsetForFixedLengthRecords(int rowId) {
    return recordStartOffset + rowId * (long) recordLength;
  }

  /**
   * Copies one key-value record into the page and returns the shared value row pointing at the
   * copied bytes, or {@code null} when the batch cannot take another record (row capacity
   * reached, no page allocated, or not enough space left in the page).
   *
   * NOTE(review): the {@code klen}/{@code vlen} parameters shadow the fields of the same name;
   * the record is advanced by the field {@code recordLength}, so callers are expected to pass
   * lengths matching the fixed layout — confirm callers guarantee this.
   */
  @Override
  public UnsafeRow appendRow(Object kbase, long koff, int klen,
      Object vbase, long voff, int vlen) {
    // If we have run out of rows or page space, signal the caller to stop appending.
    if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
      return null;
    }

    long offset = page.getBaseOffset() + pageCursor;
    final long recordOffset = offset;
    Platform.copyMemory(kbase, koff, base, offset, klen);
    offset += klen;
    Platform.copyMemory(vbase, voff, base, offset, vlen);
    offset += vlen;
    // Zero the trailing 8 bytes of the record (see class comment).
    Platform.putLong(base, offset, 0);

    pageCursor += recordLength;

    // Cache the appended row so an immediate getKeyRow/getValueFromKey for this id is free.
    keyRowId = numRows;
    keyRow.pointTo(base, recordOffset, klen);
    valueRow.pointTo(base, recordOffset + klen, vlen);
    numRows++;
    return valueRow;
  }

  /**
   * Returns the key row at {@code rowId}. The returned UnsafeRow is a shared instance reused
   * across calls; (keyRowId, keyRow) acts as a one-entry cache of the last looked-up row.
   */
  @Override
  public UnsafeRow getKeyRow(int rowId) {
    assert(rowId >= 0);
    assert(rowId < numRows);
    if (keyRowId != rowId) {
      // Fixed-length records allow computing the offset directly from the row id.
      long offset = getKeyOffsetForFixedLengthRecords(rowId);
      keyRow.pointTo(base, offset, klen);
      // Update the cache so the matching getValueFromKey(rowId) can reuse keyRow's offset.
      keyRowId = rowId;
    }
    return keyRow;
  }

  /**
   * Returns the value row at {@code rowId}, first positioning {@code keyRow} (via getKeyRow)
   * when the cached keyRowId does not already match. The value bytes immediately follow the
   * key bytes, so the value offset is keyRow's base offset plus klen.
   */
  @Override
  protected UnsafeRow getValueFromKey(int rowId) {
    if (keyRowId != rowId) {
      getKeyRow(rowId);
    }
    assert(rowId >= 0);
    valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
    return valueRow;
  }

  /**
   * Returns a forward-only iterator over all appended records. The rows returned by
   * getKey()/getValue() are shared instances repositioned on every next() call. When the
   * iterator is exhausted it frees the underlying page, so the batch must not be read
   * after iteration completes.
   */
  @Override
  public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
    return new org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>() {
      private final UnsafeRow key = new UnsafeRow(keySchema.length());
      private final UnsafeRow value = new UnsafeRow(valueSchema.length());

      // Offset of the next record to read, and how many records remain.
      private long offsetInPage = 0;
      private int recordsInPage = 0;

      private boolean initialized = false;

      // Lazily capture the page's base offset and row count on the first next() call,
      // so rows appended before iteration starts are all visible.
      private void init() {
        if (page != null) {
          offsetInPage = page.getBaseOffset();
          recordsInPage = numRows;
        }
        initialized = true;
      }

      @Override
      public boolean next() {
        if (!initialized) init();

        if (recordsInPage == 0) {
          // Iterator exhausted: release the page back to the task memory manager.
          freeCurrentPage();
          return false;
        }

        // Point the reusable rows at the current record, then advance past it.
        key.pointTo(base, offsetInPage, klen);
        value.pointTo(base, offsetInPage + klen, vlen);

        offsetInPage += recordLength;
        recordsInPage -= 1;
        return true;
      }

      @Override
      public UnsafeRow getKey() {
        return key;
      }

      @Override
      public UnsafeRow getValue() {
        return value;
      }

      @Override
      public void close() {
        // Nothing to do: the page is freed by next() when iteration is exhausted.
      }

      // Frees the page at most once (idempotent: page is nulled after freeing).
      private void freeCurrentPage() {
        if (page != null) {
          freePage(page);
          page = null;
        }
      }
    };
  }

  /**
   * Builds a fixed-length batch for the given schemas. Each field occupies an 8-byte word;
   * the row length adds UnsafeRow's null bitset on top of the field words.
   *
   * NOTE(review): keySize uses keySchema.size() while the bitset uses keySchema.length();
   * presumably these are equivalent for StructType — confirm.
   */
  protected FixedLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
      int maxRows, TaskMemoryManager manager) {
    super(keySchema, valueSchema, maxRows, manager);
    int keySize = keySchema.size() * 8;
    int valueSize = valueSchema.size() * 8;
    klen = keySize + UnsafeRow.calculateBitSetWidthInBytes(keySchema.length());
    vlen = valueSize + UnsafeRow.calculateBitSetWidthInBytes(valueSchema.length());
    recordLength = klen + vlen + 8;
  }
}