Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.catalyst.expressions.codegen;
0019 
0020 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0021 import org.apache.spark.unsafe.Platform;
0022 import org.apache.spark.unsafe.array.ByteArrayMethods;
0023 
0024 /**
0025  * A helper class to manage the data buffer for an unsafe row.  The data buffer can grow and
0026  * automatically re-point the unsafe row to it.
0027  *
0028  * This class can be used to build a one-pass unsafe row writing program, i.e. data will be written
0029  * to the data buffer directly and no extra copy is needed.  There should be only one instance of
0030  * this class per writing program, so that the memory segment/data buffer can be reused.  Note that
0031  * for each incoming record, we should call `reset` of BufferHolder instance before write the record
0032  * and reuse the data buffer.
0033  */
0034 final class BufferHolder {
0035 
0036   private static final int ARRAY_MAX = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH;
0037 
0038   // buffer is guarantee to be word-aligned since UnsafeRow assumes each field is word-aligned.
0039   private byte[] buffer;
0040   private int cursor = Platform.BYTE_ARRAY_OFFSET;
0041   private final UnsafeRow row;
0042   private final int fixedSize;
0043 
0044   BufferHolder(UnsafeRow row) {
0045     this(row, 64);
0046   }
0047 
0048   BufferHolder(UnsafeRow row, int initialSize) {
0049     int bitsetWidthInBytes = UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
0050     if (row.numFields() > (ARRAY_MAX - initialSize - bitsetWidthInBytes) / 8) {
0051       throw new UnsupportedOperationException(
0052         "Cannot create BufferHolder for input UnsafeRow because there are " +
0053           "too many fields (number of fields: " + row.numFields() + ")");
0054     }
0055     this.fixedSize = bitsetWidthInBytes + 8 * row.numFields();
0056     int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(fixedSize + initialSize);
0057     this.buffer = new byte[roundedSize];
0058     this.row = row;
0059     this.row.pointTo(buffer, buffer.length);
0060   }
0061 
0062   /**
0063    * Grows the buffer by at least neededSize and points the row to the buffer.
0064    */
0065   void grow(int neededSize) {
0066     if (neededSize < 0) {
0067       throw new IllegalArgumentException(
0068         "Cannot grow BufferHolder by size " + neededSize + " because the size is negative");
0069     }
0070     if (neededSize > ARRAY_MAX - totalSize()) {
0071       throw new IllegalArgumentException(
0072         "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
0073           "exceeds size limitation " + ARRAY_MAX);
0074     }
0075     final int length = totalSize() + neededSize;
0076     if (buffer.length < length) {
0077       // This will not happen frequently, because the buffer is re-used.
0078       int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
0079       int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(newLength);
0080       final byte[] tmp = new byte[roundedSize];
0081       Platform.copyMemory(
0082         buffer,
0083         Platform.BYTE_ARRAY_OFFSET,
0084         tmp,
0085         Platform.BYTE_ARRAY_OFFSET,
0086         totalSize());
0087       buffer = tmp;
0088       row.pointTo(buffer, buffer.length);
0089     }
0090   }
0091 
0092   byte[] getBuffer() {
0093     return buffer;
0094   }
0095 
0096   int getCursor() {
0097     return cursor;
0098   }
0099 
0100   void increaseCursor(int val) {
0101     cursor += val;
0102   }
0103 
0104   void reset() {
0105     cursor = Platform.BYTE_ARRAY_OFFSET + fixedSize;
0106   }
0107 
0108   int totalSize() {
0109     return cursor - Platform.BYTE_ARRAY_OFFSET;
0110   }
0111 }