Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.catalyst.expressions;
0018 
0019 import java.io.Closeable;
0020 import java.io.IOException;
0021 
0022 import org.apache.spark.memory.MemoryConsumer;
0023 import org.apache.spark.memory.SparkOutOfMemoryError;
0024 import org.apache.spark.memory.TaskMemoryManager;
0025 import org.apache.spark.sql.types.*;
0026 import org.apache.spark.unsafe.memory.MemoryBlock;
0027 
0028 import org.slf4j.Logger;
0029 import org.slf4j.LoggerFactory;
0030 
0031 
0032 /**
0033  * RowBasedKeyValueBatch stores key value pairs in contiguous memory region.
0034  *
0035  * Each key or value is stored as a single UnsafeRow. Each record contains one key and one value
0036  * and some auxiliary data, which differs based on implementation:
0037  * i.e., `FixedLengthRowBasedKeyValueBatch` and `VariableLengthRowBasedKeyValueBatch`.
0038  *
0039  * We use `FixedLengthRowBasedKeyValueBatch` if all fields in the key and the value are fixed-length
0040  * data types. Otherwise we use `VariableLengthRowBasedKeyValueBatch`.
0041  *
0042  * RowBasedKeyValueBatch is backed by a single page / MemoryBlock (ranges from 1 to 64MB depending
0043  * on the system configuration). If the page is full, the aggregate logic should fallback to a
0044  * second level, larger hash map. We intentionally use the single-page design because it simplifies
0045  * memory address encoding & decoding for each key-value pair. Because the maximum capacity for
0046  * RowBasedKeyValueBatch is only 2^16, it is unlikely we need a second page anyway. Filling the
0047  * page requires an average size for key value pairs to be larger than 1024 bytes.
0048  *
0049  */
0050 public abstract class RowBasedKeyValueBatch extends MemoryConsumer implements Closeable {
0051   protected final Logger logger = LoggerFactory.getLogger(RowBasedKeyValueBatch.class);
0052 
0053   private static final int DEFAULT_CAPACITY = 1 << 16;
0054 
0055   protected final StructType keySchema;
0056   protected final StructType valueSchema;
0057   protected final int capacity;
0058   protected int numRows = 0;
0059 
0060   // ids for current key row and value row being retrieved
0061   protected int keyRowId = -1;
0062 
0063   // placeholder for key and value corresponding to keyRowId.
0064   protected final UnsafeRow keyRow;
0065   protected final UnsafeRow valueRow;
0066 
0067   protected MemoryBlock page = null;
0068   protected Object base = null;
0069   protected final long recordStartOffset;
0070   protected long pageCursor = 0;
0071 
0072   public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema,
0073                                                TaskMemoryManager manager) {
0074     return allocate(keySchema, valueSchema, manager, DEFAULT_CAPACITY);
0075   }
0076 
0077   public static RowBasedKeyValueBatch allocate(StructType keySchema, StructType valueSchema,
0078                                                TaskMemoryManager manager, int maxRows) {
0079     boolean allFixedLength = true;
0080     // checking if there is any variable length fields
0081     // there is probably a more succinct impl of this
0082     for (String name : keySchema.fieldNames()) {
0083       allFixedLength = allFixedLength
0084               && UnsafeRow.isFixedLength(keySchema.apply(name).dataType());
0085     }
0086     for (String name : valueSchema.fieldNames()) {
0087       allFixedLength = allFixedLength
0088               && UnsafeRow.isFixedLength(valueSchema.apply(name).dataType());
0089     }
0090 
0091     if (allFixedLength) {
0092       return new FixedLengthRowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager);
0093     } else {
0094       return new VariableLengthRowBasedKeyValueBatch(keySchema, valueSchema, maxRows, manager);
0095     }
0096   }
0097 
0098   protected RowBasedKeyValueBatch(StructType keySchema, StructType valueSchema, int maxRows,
0099                                 TaskMemoryManager manager) {
0100     super(manager, manager.pageSizeBytes(), manager.getTungstenMemoryMode());
0101 
0102     this.keySchema = keySchema;
0103     this.valueSchema = valueSchema;
0104     this.capacity = maxRows;
0105 
0106     this.keyRow = new UnsafeRow(keySchema.length());
0107     this.valueRow = new UnsafeRow(valueSchema.length());
0108 
0109     if (!acquirePage(manager.pageSizeBytes())) {
0110       page = null;
0111       recordStartOffset = 0;
0112     } else {
0113       base = page.getBaseObject();
0114       recordStartOffset = page.getBaseOffset();
0115     }
0116   }
0117 
0118   public final int numRows() { return numRows; }
0119 
0120   public final void close() {
0121     if (page != null) {
0122       freePage(page);
0123       page = null;
0124     }
0125   }
0126 
0127   private boolean acquirePage(long requiredSize) {
0128     try {
0129       page = allocatePage(requiredSize);
0130     } catch (SparkOutOfMemoryError e) {
0131       logger.warn("Failed to allocate page ({} bytes).", requiredSize);
0132       return false;
0133     }
0134     base = page.getBaseObject();
0135     pageCursor = 0;
0136     return true;
0137   }
0138 
0139   /**
0140    * Append a key value pair.
0141    * It copies data into the backing MemoryBlock.
0142    * Returns an UnsafeRow pointing to the value if succeeds, otherwise returns null.
0143    */
0144   public abstract UnsafeRow appendRow(Object kbase, long koff, int klen,
0145                                       Object vbase, long voff, int vlen);
0146 
0147   /**
0148    * Returns the key row in this batch at `rowId`. Returned key row is reused across calls.
0149    */
0150   public abstract UnsafeRow getKeyRow(int rowId);
0151 
0152   /**
0153    * Returns the value row in this batch at `rowId`. Returned value row is reused across calls.
0154    * Because `getValueRow(id)` is always called after `getKeyRow(id)` with the same id, we use
0155    * `getValueFromKey(id) to retrieve value row, which reuses metadata from the cached key.
0156    */
0157   public final UnsafeRow getValueRow(int rowId) {
0158     return getValueFromKey(rowId);
0159   }
0160 
0161   /**
0162    * Returns the value row by two steps:
0163    * 1) looking up the key row with the same id (skipped if the key row is cached)
0164    * 2) retrieve the value row by reusing the metadata from step 1)
0165    * In most times, 1) is skipped because `getKeyRow(id)` is often called before `getValueRow(id)`.
0166    */
0167   protected abstract UnsafeRow getValueFromKey(int rowId);
0168 
0169   /**
0170    * Sometimes the TaskMemoryManager may call spill() on its associated MemoryConsumers to make
0171    * space for new consumers. For RowBasedKeyValueBatch, we do not actually spill and return 0.
0172    * We should not throw OutOfMemory exception here because other associated consumers might spill
0173    */
0174   public final long spill(long size, MemoryConsumer trigger) throws IOException {
0175     logger.warn("Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.");
0176     return 0;
0177   }
0178 
0179   /**
0180    * Returns an iterator to go through all rows
0181    */
0182   public abstract org.apache.spark.unsafe.KVIterator<UnsafeRow, UnsafeRow> rowIterator();
0183 }