0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017 package org.apache.spark.sql.catalyst.expressions;
0018
0019 import org.apache.spark.memory.TaskMemoryManager;
0020 import org.apache.spark.sql.types.*;
0021 import org.apache.spark.unsafe.Platform;
0022 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0023
0024
0025
0026
0027
0028
0029
0030
0031
0032
0033 public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatch {
0034
0035 private final long[] keyOffsets;
0036
0037
0038
0039
0040
0041
0042 @Override
0043 public UnsafeRow appendRow(Object kbase, long koff, int klen,
0044 Object vbase, long voff, int vlen) {
0045 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0046 final long recordLength = 2 * uaoSize + klen + vlen + 8L;
0047
0048 if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
0049 return null;
0050 }
0051
0052 long offset = page.getBaseOffset() + pageCursor;
0053 final long recordOffset = offset;
0054 UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
0055 UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
0056
0057 offset += 2 * uaoSize;
0058 Platform.copyMemory(kbase, koff, base, offset, klen);
0059 offset += klen;
0060 Platform.copyMemory(vbase, voff, base, offset, vlen);
0061 offset += vlen;
0062 Platform.putLong(base, offset, 0);
0063
0064 pageCursor += recordLength;
0065
0066 keyOffsets[numRows] = recordOffset + 2 * uaoSize;
0067
0068 keyRowId = numRows;
0069 keyRow.pointTo(base, recordOffset + 2 * uaoSize, klen);
0070 valueRow.pointTo(base, recordOffset + 2 * uaoSize + klen, vlen);
0071 numRows++;
0072 return valueRow;
0073 }
0074
0075
0076
0077
0078 @Override
0079 public UnsafeRow getKeyRow(int rowId) {
0080 assert(rowId >= 0);
0081 assert(rowId < numRows);
0082 if (keyRowId != rowId) {
0083 long offset = keyOffsets[rowId];
0084 int klen = UnsafeAlignedOffset.getSize(base, offset - UnsafeAlignedOffset.getUaoSize());
0085 keyRow.pointTo(base, offset, klen);
0086
0087 keyRowId = rowId;
0088 }
0089 return keyRow;
0090 }
0091
0092
0093
0094
0095
0096
0097
0098 @Override
0099 public UnsafeRow getValueFromKey(int rowId) {
0100 if (keyRowId != rowId) {
0101 getKeyRow(rowId);
0102 }
0103 assert(rowId >= 0);
0104 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0105 long offset = keyRow.getBaseOffset();
0106 int klen = keyRow.getSizeInBytes();
0107 int vlen = UnsafeAlignedOffset.getSize(base, offset - uaoSize * 2) - klen - uaoSize;
0108 valueRow.pointTo(base, offset + klen, vlen);
0109 return valueRow;
0110 }
0111
0112
0113
0114
0115 @Override
0116 public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
0117 return new org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>() {
0118 private final UnsafeRow key = new UnsafeRow(keySchema.length());
0119 private final UnsafeRow value = new UnsafeRow(valueSchema.length());
0120
0121 private long offsetInPage = 0;
0122 private int recordsInPage = 0;
0123
0124 private int currentklen;
0125 private int currentvlen;
0126 private int totalLength;
0127
0128 private boolean initialized = false;
0129
0130 private void init() {
0131 if (page != null) {
0132 offsetInPage = page.getBaseOffset();
0133 recordsInPage = numRows;
0134 }
0135 initialized = true;
0136 }
0137
0138 @Override
0139 public boolean next() {
0140 if (!initialized) init();
0141
0142 if (recordsInPage == 0) {
0143 freeCurrentPage();
0144 return false;
0145 }
0146
0147 int uaoSize = UnsafeAlignedOffset.getUaoSize();
0148 totalLength = UnsafeAlignedOffset.getSize(base, offsetInPage) - uaoSize;
0149 currentklen = UnsafeAlignedOffset.getSize(base, offsetInPage + uaoSize);
0150 currentvlen = totalLength - currentklen;
0151
0152 key.pointTo(base, offsetInPage + 2 * uaoSize, currentklen);
0153 value.pointTo(base, offsetInPage + 2 * uaoSize + currentklen, currentvlen);
0154
0155 offsetInPage += 2 * uaoSize + totalLength + 8;
0156 recordsInPage -= 1;
0157 return true;
0158 }
0159
0160 @Override
0161 public UnsafeRow getKey() {
0162 return key;
0163 }
0164
0165 @Override
0166 public UnsafeRow getValue() {
0167 return value;
0168 }
0169
0170 @Override
0171 public void close() {
0172
0173 }
0174
0175 private void freeCurrentPage() {
0176 if (page != null) {
0177 freePage(page);
0178 page = null;
0179 }
0180 }
0181 };
0182 }
0183
0184 protected VariableLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
0185 int maxRows, TaskMemoryManager manager) {
0186 super(keySchema, valueSchema, maxRows, manager);
0187 this.keyOffsets = new long[maxRows];
0188 }
0189 }