Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 package org.apache.spark.sql.catalyst.expressions.codegen;
0018 
0019 import org.apache.spark.sql.catalyst.expressions.UnsafeArrayData;
0020 import org.apache.spark.sql.catalyst.expressions.UnsafeMapData;
0021 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
0022 import org.apache.spark.sql.types.Decimal;
0023 import org.apache.spark.unsafe.Platform;
0024 import org.apache.spark.unsafe.array.ByteArrayMethods;
0025 import org.apache.spark.unsafe.bitset.BitSetMethods;
0026 import org.apache.spark.unsafe.types.CalendarInterval;
0027 import org.apache.spark.unsafe.types.UTF8String;
0028 
0029 /**
0030  * Base class for writing Unsafe* structures.
0031  */
0032 public abstract class UnsafeWriter {
0033   // Keep internal buffer holder
0034   protected final BufferHolder holder;
0035 
0036   // The offset of the global buffer where we start to write this structure.
0037   protected int startingOffset;
0038 
0039   protected UnsafeWriter(BufferHolder holder) {
0040     this.holder = holder;
0041   }
0042 
0043   /**
0044    * Accessor methods are delegated from BufferHolder class
0045    */
0046   public final BufferHolder getBufferHolder() {
0047     return holder;
0048   }
0049 
0050   public final byte[] getBuffer() {
0051     return holder.getBuffer();
0052   }
0053 
0054   public final void reset() {
0055     holder.reset();
0056   }
0057 
0058   public final int totalSize() {
0059     return holder.totalSize();
0060   }
0061 
0062   public final void grow(int neededSize) {
0063     holder.grow(neededSize);
0064   }
0065 
0066   public final int cursor() {
0067     return holder.getCursor();
0068   }
0069 
0070   public final void increaseCursor(int val) {
0071     holder.increaseCursor(val);
0072   }
0073 
0074   public final void setOffsetAndSizeFromPreviousCursor(int ordinal, int previousCursor) {
0075     setOffsetAndSize(ordinal, previousCursor, cursor() - previousCursor);
0076   }
0077 
0078   protected void setOffsetAndSize(int ordinal, int size) {
0079     setOffsetAndSize(ordinal, cursor(), size);
0080   }
0081 
0082   protected void setOffsetAndSize(int ordinal, int currentCursor, int size) {
0083     final long relativeOffset = currentCursor - startingOffset;
0084     final long offsetAndSize = (relativeOffset << 32) | (long)size;
0085 
0086     write(ordinal, offsetAndSize);
0087   }
0088 
0089   protected final void zeroOutPaddingBytes(int numBytes) {
0090     if ((numBytes & 0x07) > 0) {
0091       Platform.putLong(getBuffer(), cursor() + ((numBytes >> 3) << 3), 0L);
0092     }
0093   }
0094 
0095   public abstract void setNull1Bytes(int ordinal);
0096   public abstract void setNull2Bytes(int ordinal);
0097   public abstract void setNull4Bytes(int ordinal);
0098   public abstract void setNull8Bytes(int ordinal);
0099 
0100   public abstract void write(int ordinal, boolean value);
0101   public abstract void write(int ordinal, byte value);
0102   public abstract void write(int ordinal, short value);
0103   public abstract void write(int ordinal, int value);
0104   public abstract void write(int ordinal, long value);
0105   public abstract void write(int ordinal, float value);
0106   public abstract void write(int ordinal, double value);
0107   public abstract void write(int ordinal, Decimal input, int precision, int scale);
0108 
0109   public final void write(int ordinal, UTF8String input) {
0110     writeUnalignedBytes(ordinal, input.getBaseObject(), input.getBaseOffset(), input.numBytes());
0111   }
0112 
0113   public final void write(int ordinal, byte[] input) {
0114     write(ordinal, input, 0, input.length);
0115   }
0116 
0117   public final void write(int ordinal, byte[] input, int offset, int numBytes) {
0118     writeUnalignedBytes(ordinal, input, Platform.BYTE_ARRAY_OFFSET + offset, numBytes);
0119   }
0120 
0121   private void writeUnalignedBytes(
0122       int ordinal,
0123       Object baseObject,
0124       long baseOffset,
0125       int numBytes) {
0126     final int roundedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(numBytes);
0127     grow(roundedSize);
0128     zeroOutPaddingBytes(numBytes);
0129     Platform.copyMemory(baseObject, baseOffset, getBuffer(), cursor(), numBytes);
0130     setOffsetAndSize(ordinal, numBytes);
0131     increaseCursor(roundedSize);
0132   }
0133 
0134   public final void write(int ordinal, CalendarInterval input) {
0135     // grow the global buffer before writing data.
0136     grow(16);
0137 
0138     if (input == null) {
0139       BitSetMethods.set(getBuffer(), startingOffset, ordinal);
0140     } else {
0141       // Write the months, days and microseconds fields of interval to the variable length portion.
0142       Platform.putInt(getBuffer(), cursor(), input.months);
0143       Platform.putInt(getBuffer(), cursor() + 4, input.days);
0144       Platform.putLong(getBuffer(), cursor() + 8, input.microseconds);
0145     }
0146     // we need to reserve the space so that we can update it later.
0147     setOffsetAndSize(ordinal, 16);
0148     // move the cursor forward.
0149     increaseCursor(16);
0150   }
0151 
0152   public final void write(int ordinal, UnsafeRow row) {
0153     writeAlignedBytes(ordinal, row.getBaseObject(), row.getBaseOffset(), row.getSizeInBytes());
0154   }
0155 
0156   public final void write(int ordinal, UnsafeMapData map) {
0157     writeAlignedBytes(ordinal, map.getBaseObject(), map.getBaseOffset(), map.getSizeInBytes());
0158   }
0159 
0160   public final void write(UnsafeArrayData array) {
0161     // Unsafe arrays both can be written as a regular array field or as part of a map. This makes
0162     // updating the offset and size dependent on the code path, this is why we currently do not
0163     // provide an method for writing unsafe arrays that also updates the size and offset.
0164     int numBytes = array.getSizeInBytes();
0165     grow(numBytes);
0166     Platform.copyMemory(
0167             array.getBaseObject(),
0168             array.getBaseOffset(),
0169             getBuffer(),
0170             cursor(),
0171             numBytes);
0172     increaseCursor(numBytes);
0173   }
0174 
0175   private void writeAlignedBytes(
0176       int ordinal,
0177       Object baseObject,
0178       long baseOffset,
0179       int numBytes) {
0180     grow(numBytes);
0181     Platform.copyMemory(baseObject, baseOffset, getBuffer(), cursor(), numBytes);
0182     setOffsetAndSize(ordinal, numBytes);
0183     increaseCursor(numBytes);
0184   }
0185 
0186   protected final void writeBoolean(long offset, boolean value) {
0187     Platform.putBoolean(getBuffer(), offset, value);
0188   }
0189 
0190   protected final void writeByte(long offset, byte value) {
0191     Platform.putByte(getBuffer(), offset, value);
0192   }
0193 
0194   protected final void writeShort(long offset, short value) {
0195     Platform.putShort(getBuffer(), offset, value);
0196   }
0197 
0198   protected final void writeInt(long offset, int value) {
0199     Platform.putInt(getBuffer(), offset, value);
0200   }
0201 
0202   protected final void writeLong(long offset, long value) {
0203     Platform.putLong(getBuffer(), offset, value);
0204   }
0205 
0206   protected final void writeFloat(long offset, float value) {
0207     Platform.putFloat(getBuffer(), offset, value);
0208   }
0209 
0210   protected final void writeDouble(long offset, double value) {
0211     Platform.putDouble(getBuffer(), offset, value);
0212   }
0213 }