Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.catalyst.expressions;
0018 
0019 import org.apache.spark.memory.TaskMemoryManager;
0020 import org.apache.spark.sql.types.*;
0021 import org.apache.spark.unsafe.Platform;
0022 import org.apache.spark.unsafe.UnsafeAlignedOffset;
0023 
0024 /**
0025  * An implementation of `RowBasedKeyValueBatch` in which key-value records have variable lengths.
0026  *
0027  *  The format for each record looks like this (in case of uaoSize = 4):
0028  * [4 bytes total size = (klen + vlen + 4)] [4 bytes key size = klen]
0029  * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen]
0030  * [8 bytes pointer to next]
0031  * Thus, record length = 4 + 4 + klen + vlen + 8
0032  */
0033 public final class VariableLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatch {
0034   // full addresses for key rows and value rows
0035   private final long[] keyOffsets;
0036 
0037   /**
0038    * Append a key value pair.
0039    * It copies data into the backing MemoryBlock.
0040    * Returns an UnsafeRow pointing to the value if succeeds, otherwise returns null.
0041    */
0042   @Override
0043   public UnsafeRow appendRow(Object kbase, long koff, int klen,
0044                              Object vbase, long voff, int vlen) {
0045     int uaoSize = UnsafeAlignedOffset.getUaoSize();
0046     final long recordLength = 2 * uaoSize + klen + vlen + 8L;
0047     // if run out of max supported rows or page size, return null
0048     if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
0049       return null;
0050     }
0051 
0052     long offset = page.getBaseOffset() + pageCursor;
0053     final long recordOffset = offset;
0054     UnsafeAlignedOffset.putSize(base, offset, klen + vlen + uaoSize);
0055     UnsafeAlignedOffset.putSize(base, offset + uaoSize, klen);
0056 
0057     offset += 2 * uaoSize;
0058     Platform.copyMemory(kbase, koff, base, offset, klen);
0059     offset += klen;
0060     Platform.copyMemory(vbase, voff, base, offset, vlen);
0061     offset += vlen;
0062     Platform.putLong(base, offset, 0);
0063 
0064     pageCursor += recordLength;
0065 
0066     keyOffsets[numRows] = recordOffset + 2 * uaoSize;
0067 
0068     keyRowId = numRows;
0069     keyRow.pointTo(base, recordOffset + 2 * uaoSize, klen);
0070     valueRow.pointTo(base, recordOffset + 2 * uaoSize + klen, vlen);
0071     numRows++;
0072     return valueRow;
0073   }
0074 
0075   /**
0076    * Returns the key row in this batch at `rowId`. Returned key row is reused across calls.
0077    */
0078   @Override
0079   public UnsafeRow getKeyRow(int rowId) {
0080     assert(rowId >= 0);
0081     assert(rowId < numRows);
0082     if (keyRowId != rowId) { // if keyRowId == rowId, desired keyRow is already cached
0083       long offset = keyOffsets[rowId];
0084       int klen = UnsafeAlignedOffset.getSize(base, offset - UnsafeAlignedOffset.getUaoSize());
0085       keyRow.pointTo(base, offset, klen);
0086       // set keyRowId so we can check if desired row is cached
0087       keyRowId = rowId;
0088     }
0089     return keyRow;
0090   }
0091 
0092   /**
0093    * Returns the value row by two steps:
0094    * 1) looking up the key row with the same id (skipped if the key row is cached)
0095    * 2) retrieve the value row by reusing the metadata from step 1)
0096    * In most times, 1) is skipped because `getKeyRow(id)` is often called before `getValueRow(id)`.
0097    */
0098   @Override
0099   public UnsafeRow getValueFromKey(int rowId) {
0100     if (keyRowId != rowId) {
0101       getKeyRow(rowId);
0102     }
0103     assert(rowId >= 0);
0104     int uaoSize = UnsafeAlignedOffset.getUaoSize();
0105     long offset = keyRow.getBaseOffset();
0106     int klen = keyRow.getSizeInBytes();
0107     int vlen = UnsafeAlignedOffset.getSize(base, offset - uaoSize * 2) - klen - uaoSize;
0108     valueRow.pointTo(base, offset + klen, vlen);
0109     return valueRow;
0110   }
0111 
0112   /**
0113    * Returns an iterator to go through all rows
0114    */
0115   @Override
0116   public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
0117     return new org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>() {
0118       private final UnsafeRow key = new UnsafeRow(keySchema.length());
0119       private final UnsafeRow value = new UnsafeRow(valueSchema.length());
0120 
0121       private long offsetInPage = 0;
0122       private int recordsInPage = 0;
0123 
0124       private int currentklen;
0125       private int currentvlen;
0126       private int totalLength;
0127 
0128       private boolean initialized = false;
0129 
0130       private void init() {
0131         if (page != null) {
0132           offsetInPage = page.getBaseOffset();
0133           recordsInPage = numRows;
0134         }
0135         initialized = true;
0136       }
0137 
0138       @Override
0139       public boolean next() {
0140         if (!initialized) init();
0141         //searching for the next non empty page is records is now zero
0142         if (recordsInPage == 0) {
0143           freeCurrentPage();
0144           return false;
0145         }
0146 
0147         int uaoSize = UnsafeAlignedOffset.getUaoSize();
0148         totalLength = UnsafeAlignedOffset.getSize(base, offsetInPage) - uaoSize;
0149         currentklen = UnsafeAlignedOffset.getSize(base, offsetInPage + uaoSize);
0150         currentvlen = totalLength - currentklen;
0151 
0152         key.pointTo(base, offsetInPage + 2 * uaoSize, currentklen);
0153         value.pointTo(base, offsetInPage + 2 * uaoSize + currentklen, currentvlen);
0154 
0155         offsetInPage += 2 * uaoSize + totalLength + 8;
0156         recordsInPage -= 1;
0157         return true;
0158       }
0159 
0160       @Override
0161       public UnsafeRow getKey() {
0162         return key;
0163       }
0164 
0165       @Override
0166       public UnsafeRow getValue() {
0167         return value;
0168       }
0169 
0170       @Override
0171       public void close() {
0172         // do nothing
0173       }
0174 
0175       private void freeCurrentPage() {
0176         if (page != null) {
0177           freePage(page);
0178           page = null;
0179         }
0180       }
0181     };
0182   }
0183 
0184   protected VariableLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
0185                                               int maxRows, TaskMemoryManager manager) {
0186     super(keySchema, valueSchema, maxRows, manager);
0187     this.keyOffsets = new long[maxRows];
0188   }
0189 }