Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.catalyst.expressions;
0018 
0019 import org.apache.spark.memory.TaskMemoryManager;
0020 import org.apache.spark.sql.types.*;
0021 import org.apache.spark.unsafe.Platform;
0022 
0023 /**
0024  * An implementation of `RowBasedKeyValueBatch` in which all key-value records have same length.
0025  *
0026  * The format for each record looks like this:
0027  * [UnsafeRow for key of length klen] [UnsafeRow for Value of length vlen]
0028  * [8 bytes pointer to next]
0029  * Thus, record length = klen + vlen + 8
0030  */
0031 public final class FixedLengthRowBasedKeyValueBatch extends RowBasedKeyValueBatch {
0032   private final int klen;
0033   private final int vlen;
0034   private final int recordLength;
0035 
0036   private long getKeyOffsetForFixedLengthRecords(int rowId) {
0037     return recordStartOffset + rowId * (long) recordLength;
0038   }
0039 
0040   /**
0041    * Append a key value pair.
0042    * It copies data into the backing MemoryBlock.
0043    * Returns an UnsafeRow pointing to the value if succeeds, otherwise returns null.
0044    */
0045   @Override
0046   public UnsafeRow appendRow(Object kbase, long koff, int klen,
0047                              Object vbase, long voff, int vlen) {
0048     // if run out of max supported rows or page size, return null
0049     if (numRows >= capacity || page == null || page.size() - pageCursor < recordLength) {
0050       return null;
0051     }
0052 
0053     long offset = page.getBaseOffset() + pageCursor;
0054     final long recordOffset = offset;
0055     Platform.copyMemory(kbase, koff, base, offset, klen);
0056     offset += klen;
0057     Platform.copyMemory(vbase, voff, base, offset, vlen);
0058     offset += vlen;
0059     Platform.putLong(base, offset, 0);
0060 
0061     pageCursor += recordLength;
0062 
0063     keyRowId = numRows;
0064     keyRow.pointTo(base, recordOffset, klen);
0065     valueRow.pointTo(base, recordOffset + klen, vlen);
0066     numRows++;
0067     return valueRow;
0068   }
0069 
0070   /**
0071    * Returns the key row in this batch at `rowId`. Returned key row is reused across calls.
0072    */
0073   @Override
0074   public UnsafeRow getKeyRow(int rowId) {
0075     assert(rowId >= 0);
0076     assert(rowId < numRows);
0077     if (keyRowId != rowId) { // if keyRowId == rowId, desired keyRow is already cached
0078       long offset = getKeyOffsetForFixedLengthRecords(rowId);
0079       keyRow.pointTo(base, offset, klen);
0080       // set keyRowId so we can check if desired row is cached
0081       keyRowId = rowId;
0082     }
0083     return keyRow;
0084   }
0085 
0086   /**
0087    * Returns the value row by two steps:
0088    * 1) looking up the key row with the same id (skipped if the key row is cached)
0089    * 2) retrieve the value row by reusing the metadata from step 1)
0090    * In most times, 1) is skipped because `getKeyRow(id)` is often called before `getValueRow(id)`.
0091    */
0092   @Override
0093   protected UnsafeRow getValueFromKey(int rowId) {
0094     if (keyRowId != rowId) {
0095       getKeyRow(rowId);
0096     }
0097     assert(rowId >= 0);
0098     valueRow.pointTo(base, keyRow.getBaseOffset() + klen, vlen);
0099     return valueRow;
0100   }
0101 
0102   /**
0103    * Returns an iterator to go through all rows
0104    */
0105   @Override
0106   public org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator() {
0107     return new org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow>() {
0108       private final UnsafeRow key = new UnsafeRow(keySchema.length());
0109       private final UnsafeRow value = new UnsafeRow(valueSchema.length());
0110 
0111       private long offsetInPage = 0;
0112       private int recordsInPage = 0;
0113 
0114       private boolean initialized = false;
0115 
0116       private void init() {
0117         if (page != null) {
0118           offsetInPage = page.getBaseOffset();
0119           recordsInPage = numRows;
0120         }
0121         initialized = true;
0122       }
0123 
0124       @Override
0125       public boolean next() {
0126         if (!initialized) init();
0127         //searching for the next non empty page is records is now zero
0128         if (recordsInPage == 0) {
0129           freeCurrentPage();
0130           return false;
0131         }
0132 
0133         key.pointTo(base, offsetInPage, klen);
0134         value.pointTo(base, offsetInPage + klen, vlen);
0135 
0136         offsetInPage += recordLength;
0137         recordsInPage -= 1;
0138         return true;
0139       }
0140 
0141       @Override
0142       public UnsafeRow getKey() {
0143         return key;
0144       }
0145 
0146       @Override
0147       public UnsafeRow getValue() {
0148         return value;
0149       }
0150 
0151       @Override
0152       public void close() {
0153         // do nothing
0154       }
0155 
0156       private void freeCurrentPage() {
0157         if (page != null) {
0158           freePage(page);
0159           page = null;
0160         }
0161       }
0162     };
0163   }
0164 
0165   protected FixedLengthRowBasedKeyValueBatch(StructType keySchema, StructType valueSchema,
0166                                              int maxRows, TaskMemoryManager manager) {
0167     super(keySchema, valueSchema, maxRows, manager);
0168     int keySize = keySchema.size() * 8; // each fixed-length field is stored in a 8-byte word
0169     int valueSize = valueSchema.size() * 8;
0170     klen = keySize + UnsafeRow.calculateBitSetWidthInBytes(keySchema.length());
0171     vlen = valueSize + UnsafeRow.calculateBitSetWidthInBytes(valueSchema.length());
0172     recordLength = klen + vlen + 8;
0173   }
0174 }