Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.catalyst.expressions.codegen;
0019 
0020 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0021 import org.apache.spark.sql.types.Decimal;
0022 import org.apache.spark.unsafe.Platform;
0023 import org.apache.spark.unsafe.bitset.BitSetMethods;
0024 
0025 /**
0026  * A helper class to write data into global row buffer using `UnsafeRow` format.
0027  *
0028  * It will remember the offset of row buffer which it starts to write, and move the cursor of row
0029  * buffer while writing.  If new data(can be the input record if this is the outermost writer, or
0030  * nested struct if this is an inner writer) comes, the starting cursor of row buffer may be
0031  * changed, so we need to call `UnsafeRowWriter.resetRowWriter` before writing, to update the
0032  * `startingOffset` and clear out null bits.
0033  *
0034  * Note that if this is the outermost writer, which means we will always write from the very
0035  * beginning of the global row buffer, we don't need to update `startingOffset` and can just call
0036  * `zeroOutNullBytes` before writing new data.
0037  */
0038 public final class UnsafeRowWriter extends UnsafeWriter {
0039 
0040   private final UnsafeRow row;
0041 
0042   private final int nullBitsSize;
0043   private final int fixedSize;
0044 
0045   public UnsafeRowWriter(int numFields) {
0046     this(new UnsafeRow(numFields));
0047   }
0048 
0049   public UnsafeRowWriter(int numFields, int initialBufferSize) {
0050     this(new UnsafeRow(numFields), initialBufferSize);
0051   }
0052 
0053   public UnsafeRowWriter(UnsafeWriter writer, int numFields) {
0054     this(null, writer.getBufferHolder(), numFields);
0055   }
0056 
0057   private UnsafeRowWriter(UnsafeRow row) {
0058     this(row, new BufferHolder(row), row.numFields());
0059   }
0060 
0061   private UnsafeRowWriter(UnsafeRow row, int initialBufferSize) {
0062     this(row, new BufferHolder(row, initialBufferSize), row.numFields());
0063   }
0064 
0065   private UnsafeRowWriter(UnsafeRow row, BufferHolder holder, int numFields) {
0066     super(holder);
0067     this.row = row;
0068     this.nullBitsSize = UnsafeRow.calculateBitSetWidthInBytes(numFields);
0069     this.fixedSize = nullBitsSize + 8 * numFields;
0070     this.startingOffset = cursor();
0071   }
0072 
0073   /**
0074    * Updates total size of the UnsafeRow using the size collected by BufferHolder, and returns
0075    * the UnsafeRow created at a constructor
0076    */
0077   public UnsafeRow getRow() {
0078     row.setTotalSize(totalSize());
0079     return row;
0080   }
0081 
0082   /**
0083    * Resets the `startingOffset` according to the current cursor of row buffer, and clear out null
0084    * bits.  This should be called before we write a new nested struct to the row buffer.
0085    */
0086   public void resetRowWriter() {
0087     this.startingOffset = cursor();
0088 
0089     // grow the global buffer to make sure it has enough space to write fixed-length data.
0090     grow(fixedSize);
0091     increaseCursor(fixedSize);
0092 
0093     zeroOutNullBytes();
0094   }
0095 
0096   /**
0097    * Clears out null bits.  This should be called before we write a new row to row buffer.
0098    */
0099   public void zeroOutNullBytes() {
0100     for (int i = 0; i < nullBitsSize; i += 8) {
0101       Platform.putLong(getBuffer(), startingOffset + i, 0L);
0102     }
0103   }
0104 
0105   public boolean isNullAt(int ordinal) {
0106     return BitSetMethods.isSet(getBuffer(), startingOffset, ordinal);
0107   }
0108 
0109   public void setNullAt(int ordinal) {
0110     BitSetMethods.set(getBuffer(), startingOffset, ordinal);
0111     write(ordinal, 0L);
0112   }
0113 
0114   @Override
0115   public void setNull1Bytes(int ordinal) {
0116     setNullAt(ordinal);
0117   }
0118 
0119   @Override
0120   public void setNull2Bytes(int ordinal) {
0121     setNullAt(ordinal);
0122   }
0123 
0124   @Override
0125   public void setNull4Bytes(int ordinal) {
0126     setNullAt(ordinal);
0127   }
0128 
0129   @Override
0130   public void setNull8Bytes(int ordinal) {
0131     setNullAt(ordinal);
0132   }
0133 
0134   public long getFieldOffset(int ordinal) {
0135     return startingOffset + nullBitsSize + 8L * ordinal;
0136   }
0137 
0138   @Override
0139   public void write(int ordinal, boolean value) {
0140     final long offset = getFieldOffset(ordinal);
0141     writeLong(offset, 0L);
0142     writeBoolean(offset, value);
0143   }
0144 
0145   @Override
0146   public void write(int ordinal, byte value) {
0147     final long offset = getFieldOffset(ordinal);
0148     writeLong(offset, 0L);
0149     writeByte(offset, value);
0150   }
0151 
0152   @Override
0153   public void write(int ordinal, short value) {
0154     final long offset = getFieldOffset(ordinal);
0155     writeLong(offset, 0L);
0156     writeShort(offset, value);
0157   }
0158 
0159   @Override
0160   public void write(int ordinal, int value) {
0161     final long offset = getFieldOffset(ordinal);
0162     writeLong(offset, 0L);
0163     writeInt(offset, value);
0164   }
0165 
0166   @Override
0167   public void write(int ordinal, long value) {
0168     writeLong(getFieldOffset(ordinal), value);
0169   }
0170 
0171   @Override
0172   public void write(int ordinal, float value) {
0173     final long offset = getFieldOffset(ordinal);
0174     writeLong(offset, 0);
0175     writeFloat(offset, value);
0176   }
0177 
0178   @Override
0179   public void write(int ordinal, double value) {
0180     writeDouble(getFieldOffset(ordinal), value);
0181   }
0182 
0183   @Override
0184   public void write(int ordinal, Decimal input, int precision, int scale) {
0185     if (precision <= Decimal.MAX_LONG_DIGITS()) {
0186       // make sure Decimal object has the same scale as DecimalType
0187       if (input != null && input.changePrecision(precision, scale)) {
0188         write(ordinal, input.toUnscaledLong());
0189       } else {
0190         setNullAt(ordinal);
0191       }
0192     } else {
0193       // grow the global buffer before writing data.
0194       holder.grow(16);
0195 
0196       // always zero-out the 16-byte buffer
0197       Platform.putLong(getBuffer(), cursor(), 0L);
0198       Platform.putLong(getBuffer(), cursor() + 8, 0L);
0199 
0200       // Make sure Decimal object has the same scale as DecimalType.
0201       // Note that we may pass in null Decimal object to set null for it.
0202       if (input == null || !input.changePrecision(precision, scale)) {
0203         BitSetMethods.set(getBuffer(), startingOffset, ordinal);
0204         // keep the offset for future update
0205         setOffsetAndSize(ordinal, 0);
0206       } else {
0207         final byte[] bytes = input.toJavaBigDecimal().unscaledValue().toByteArray();
0208         final int numBytes = bytes.length;
0209         assert numBytes <= 16;
0210 
0211         // Write the bytes to the variable length portion.
0212         Platform.copyMemory(
0213           bytes, Platform.BYTE_ARRAY_OFFSET, getBuffer(), cursor(), numBytes);
0214         setOffsetAndSize(ordinal, bytes.length);
0215       }
0216 
0217       // move the cursor forward.
0218       increaseCursor(16);
0219     }
0220   }
0221 }