Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.catalyst.expressions;
0019 
0020 import java.io.*;
0021 import java.math.BigDecimal;
0022 import java.math.BigInteger;
0023 import java.nio.ByteBuffer;
0024 import java.util.Arrays;
0025 import java.util.Collections;
0026 import java.util.HashSet;
0027 import java.util.Set;
0028 
0029 import com.esotericsoftware.kryo.Kryo;
0030 import com.esotericsoftware.kryo.KryoSerializable;
0031 import com.esotericsoftware.kryo.io.Input;
0032 import com.esotericsoftware.kryo.io.Output;
0033 
0034 import org.apache.spark.sql.catalyst.InternalRow;
0035 import org.apache.spark.sql.types.*;
0036 import org.apache.spark.unsafe.Platform;
0037 import org.apache.spark.unsafe.array.ByteArrayMethods;
0038 import org.apache.spark.unsafe.bitset.BitSetMethods;
0039 import org.apache.spark.unsafe.hash.Murmur3_x86_32;
0040 import org.apache.spark.unsafe.types.CalendarInterval;
0041 import org.apache.spark.unsafe.types.UTF8String;
0042 
0043 import static org.apache.spark.sql.types.DataTypes.*;
0044 import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
0045 
0046 /**
0047  * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
0048  *
0049  * Each tuple has three parts: [null bit set] [values] [variable length portion]
0050  *
0051  * The bit set is used for null tracking and is aligned to 8-byte word boundaries.  It stores
0052  * one bit per field.
0053  *
0054  * In the `values` region, we store one 8-byte word per field. For fields that hold fixed-length
0055  * primitive types, such as long, double, or int, we store the value directly in the word. For
0056  * fields with non-primitive or variable-length values, we store a relative offset (w.r.t. the
0057  * base address of the row) that points to the beginning of the variable-length field, and length
0058  * (they are combined into a long).
0059  *
0060  * Instances of `UnsafeRow` act as pointers to row data stored in this format.
0061  */
0062 public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable {
0063 
0064   public static final int WORD_SIZE = 8;
0065 
0066   //////////////////////////////////////////////////////////////////////////////
0067   // Static methods
0068   //////////////////////////////////////////////////////////////////////////////
0069 
0070   public static int calculateBitSetWidthInBytes(int numFields) {
0071     return ((numFields + 63)/ 64) * 8;
0072   }
0073 
0074   /**
0075    * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
0076    */
0077   public static final Set<DataType> mutableFieldTypes;
0078 
0079   // DecimalType is also mutable
0080   static {
0081     mutableFieldTypes = Collections.unmodifiableSet(
0082       new HashSet<>(
0083         Arrays.asList(new DataType[] {
0084           NullType,
0085           BooleanType,
0086           ByteType,
0087           ShortType,
0088           IntegerType,
0089           LongType,
0090           FloatType,
0091           DoubleType,
0092           DateType,
0093           TimestampType
0094         })));
0095   }
0096 
0097   public static boolean isFixedLength(DataType dt) {
0098     if (dt instanceof UserDefinedType) {
0099       return isFixedLength(((UserDefinedType) dt).sqlType());
0100     }
0101 
0102     if (dt instanceof DecimalType) {
0103       return ((DecimalType) dt).precision() <= Decimal.MAX_LONG_DIGITS();
0104     } else {
0105       return mutableFieldTypes.contains(dt);
0106     }
0107   }
0108 
0109   public static boolean isMutable(DataType dt) {
0110     if (dt instanceof UserDefinedType) {
0111       return isMutable(((UserDefinedType) dt).sqlType());
0112     }
0113 
0114     return mutableFieldTypes.contains(dt) || dt instanceof DecimalType ||
0115       dt instanceof CalendarIntervalType;
0116   }
0117 
0118   //////////////////////////////////////////////////////////////////////////////
0119   // Private fields and methods
0120   //////////////////////////////////////////////////////////////////////////////
0121 
0122   private Object baseObject;
0123   private long baseOffset;
0124 
0125   /** The number of fields in this row, used for calculating the bitset width (and in assertions) */
0126   private int numFields;
0127 
0128   /** The size of this row's backing data, in bytes) */
0129   private int sizeInBytes;
0130 
0131   /** The width of the null tracking bit set, in bytes */
0132   private int bitSetWidthInBytes;
0133 
0134   private long getFieldOffset(int ordinal) {
0135     return baseOffset + bitSetWidthInBytes + ordinal * 8L;
0136   }
0137 
0138   private void assertIndexIsValid(int index) {
0139     assert index >= 0 : "index (" + index + ") should >= 0";
0140     assert index < numFields : "index (" + index + ") should < " + numFields;
0141   }
0142 
0143   //////////////////////////////////////////////////////////////////////////////
0144   // Public methods
0145   //////////////////////////////////////////////////////////////////////////////
0146 
0147   /**
0148    * Construct a new UnsafeRow. The resulting row won't be usable until `pointTo()` has been called,
0149    * since the value returned by this constructor is equivalent to a null pointer.
0150    *
0151    * @param numFields the number of fields in this row
0152    */
0153   public UnsafeRow(int numFields) {
0154     this.numFields = numFields;
0155     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
0156   }
0157 
0158   // for serializer
0159   public UnsafeRow() {}
0160 
0161   public Object getBaseObject() { return baseObject; }
0162   public long getBaseOffset() { return baseOffset; }
0163   public int getSizeInBytes() { return sizeInBytes; }
0164 
0165   @Override
0166   public int numFields() { return numFields; }
0167 
0168   /**
0169    * Update this UnsafeRow to point to different backing data.
0170    *
0171    * @param baseObject the base object
0172    * @param baseOffset the offset within the base object
0173    * @param sizeInBytes the size of this row's backing data, in bytes
0174    */
0175   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
0176     assert numFields >= 0 : "numFields (" + numFields + ") should >= 0";
0177     assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
0178     this.baseObject = baseObject;
0179     this.baseOffset = baseOffset;
0180     this.sizeInBytes = sizeInBytes;
0181   }
0182 
0183   /**
0184    * Update this UnsafeRow to point to the underlying byte array.
0185    *
0186    * @param buf byte array to point to
0187    * @param sizeInBytes the number of bytes valid in the byte array
0188    */
0189   public void pointTo(byte[] buf, int sizeInBytes) {
0190     pointTo(buf, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0191   }
0192 
0193   public void setTotalSize(int sizeInBytes) {
0194     assert sizeInBytes % 8 == 0 : "sizeInBytes (" + sizeInBytes + ") should be a multiple of 8";
0195     this.sizeInBytes = sizeInBytes;
0196   }
0197 
0198   public void setNotNullAt(int i) {
0199     assertIndexIsValid(i);
0200     BitSetMethods.unset(baseObject, baseOffset, i);
0201   }
0202 
0203   @Override
0204   public void setNullAt(int i) {
0205     assertIndexIsValid(i);
0206     BitSetMethods.set(baseObject, baseOffset, i);
0207     // To preserve row equality, zero out the value when setting the column to null.
0208     // Since this row does not currently support updates to variable-length values, we don't
0209     // have to worry about zeroing out that data.
0210     Platform.putLong(baseObject, getFieldOffset(i), 0);
0211   }
0212 
0213   @Override
0214   public void update(int ordinal, Object value) {
0215     throw new UnsupportedOperationException();
0216   }
0217 
0218   @Override
0219   public void setInt(int ordinal, int value) {
0220     assertIndexIsValid(ordinal);
0221     setNotNullAt(ordinal);
0222     Platform.putInt(baseObject, getFieldOffset(ordinal), value);
0223   }
0224 
0225   @Override
0226   public void setLong(int ordinal, long value) {
0227     assertIndexIsValid(ordinal);
0228     setNotNullAt(ordinal);
0229     Platform.putLong(baseObject, getFieldOffset(ordinal), value);
0230   }
0231 
0232   @Override
0233   public void setDouble(int ordinal, double value) {
0234     assertIndexIsValid(ordinal);
0235     setNotNullAt(ordinal);
0236     Platform.putDouble(baseObject, getFieldOffset(ordinal), value);
0237   }
0238 
0239   @Override
0240   public void setBoolean(int ordinal, boolean value) {
0241     assertIndexIsValid(ordinal);
0242     setNotNullAt(ordinal);
0243     Platform.putBoolean(baseObject, getFieldOffset(ordinal), value);
0244   }
0245 
0246   @Override
0247   public void setShort(int ordinal, short value) {
0248     assertIndexIsValid(ordinal);
0249     setNotNullAt(ordinal);
0250     Platform.putShort(baseObject, getFieldOffset(ordinal), value);
0251   }
0252 
0253   @Override
0254   public void setByte(int ordinal, byte value) {
0255     assertIndexIsValid(ordinal);
0256     setNotNullAt(ordinal);
0257     Platform.putByte(baseObject, getFieldOffset(ordinal), value);
0258   }
0259 
0260   @Override
0261   public void setFloat(int ordinal, float value) {
0262     assertIndexIsValid(ordinal);
0263     setNotNullAt(ordinal);
0264     Platform.putFloat(baseObject, getFieldOffset(ordinal), value);
0265   }
0266 
0267   /**
0268    * Updates the decimal column.
0269    *
0270    * Note: In order to support update a decimal with precision > 18, CAN NOT call
0271    * setNullAt() for this column.
0272    */
0273   @Override
0274   public void setDecimal(int ordinal, Decimal value, int precision) {
0275     assertIndexIsValid(ordinal);
0276     if (precision <= Decimal.MAX_LONG_DIGITS()) {
0277       // compact format
0278       if (value == null) {
0279         setNullAt(ordinal);
0280       } else {
0281         setLong(ordinal, value.toUnscaledLong());
0282       }
0283     } else {
0284       // fixed length
0285       long cursor = getLong(ordinal) >>> 32;
0286       assert cursor > 0 : "invalid cursor " + cursor;
0287       // zero-out the bytes
0288       Platform.putLong(baseObject, baseOffset + cursor, 0L);
0289       Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
0290 
0291       if (value == null) {
0292         setNullAt(ordinal);
0293         // keep the offset for future update
0294         Platform.putLong(baseObject, getFieldOffset(ordinal), cursor << 32);
0295       } else {
0296 
0297         final BigInteger integer = value.toJavaBigDecimal().unscaledValue();
0298         byte[] bytes = integer.toByteArray();
0299         assert(bytes.length <= 16);
0300 
0301         // Write the bytes to the variable length portion.
0302         Platform.copyMemory(
0303           bytes, Platform.BYTE_ARRAY_OFFSET, baseObject, baseOffset + cursor, bytes.length);
0304         setLong(ordinal, (cursor << 32) | ((long) bytes.length));
0305       }
0306     }
0307   }
0308 
0309   @Override
0310   public void setInterval(int ordinal, CalendarInterval value) {
0311     assertIndexIsValid(ordinal);
0312     long cursor = getLong(ordinal) >>> 32;
0313     assert cursor > 0 : "invalid cursor " + cursor;
0314     if (value == null) {
0315       setNullAt(ordinal);
0316       // zero-out the bytes
0317       Platform.putLong(baseObject, baseOffset + cursor, 0L);
0318       Platform.putLong(baseObject, baseOffset + cursor + 8, 0L);
0319       // keep the offset for future update
0320       Platform.putLong(baseObject, getFieldOffset(ordinal), (cursor << 32) | 16L);
0321     } else {
0322       Platform.putInt(baseObject, baseOffset + cursor, value.months);
0323       Platform.putInt(baseObject, baseOffset + cursor + 4, value.days);
0324       Platform.putLong(baseObject, baseOffset + cursor + 8, value.microseconds);
0325       setLong(ordinal, (cursor << 32) | 16L);
0326     }
0327   }
0328 
0329   @Override
0330   public Object get(int ordinal, DataType dataType) {
0331     return SpecializedGettersReader.read(this, ordinal, dataType, true, true);
0332   }
0333 
0334   @Override
0335   public boolean isNullAt(int ordinal) {
0336     assertIndexIsValid(ordinal);
0337     return BitSetMethods.isSet(baseObject, baseOffset, ordinal);
0338   }
0339 
0340   @Override
0341   public boolean getBoolean(int ordinal) {
0342     assertIndexIsValid(ordinal);
0343     return Platform.getBoolean(baseObject, getFieldOffset(ordinal));
0344   }
0345 
0346   @Override
0347   public byte getByte(int ordinal) {
0348     assertIndexIsValid(ordinal);
0349     return Platform.getByte(baseObject, getFieldOffset(ordinal));
0350   }
0351 
0352   @Override
0353   public short getShort(int ordinal) {
0354     assertIndexIsValid(ordinal);
0355     return Platform.getShort(baseObject, getFieldOffset(ordinal));
0356   }
0357 
0358   @Override
0359   public int getInt(int ordinal) {
0360     assertIndexIsValid(ordinal);
0361     return Platform.getInt(baseObject, getFieldOffset(ordinal));
0362   }
0363 
0364   @Override
0365   public long getLong(int ordinal) {
0366     assertIndexIsValid(ordinal);
0367     return Platform.getLong(baseObject, getFieldOffset(ordinal));
0368   }
0369 
0370   @Override
0371   public float getFloat(int ordinal) {
0372     assertIndexIsValid(ordinal);
0373     return Platform.getFloat(baseObject, getFieldOffset(ordinal));
0374   }
0375 
0376   @Override
0377   public double getDouble(int ordinal) {
0378     assertIndexIsValid(ordinal);
0379     return Platform.getDouble(baseObject, getFieldOffset(ordinal));
0380   }
0381 
0382   @Override
0383   public Decimal getDecimal(int ordinal, int precision, int scale) {
0384     if (isNullAt(ordinal)) {
0385       return null;
0386     }
0387     if (precision <= Decimal.MAX_LONG_DIGITS()) {
0388       return Decimal.createUnsafe(getLong(ordinal), precision, scale);
0389     } else {
0390       byte[] bytes = getBinary(ordinal);
0391       BigInteger bigInteger = new BigInteger(bytes);
0392       BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
0393       return Decimal.apply(javaDecimal, precision, scale);
0394     }
0395   }
0396 
0397   @Override
0398   public UTF8String getUTF8String(int ordinal) {
0399     if (isNullAt(ordinal)) return null;
0400     final long offsetAndSize = getLong(ordinal);
0401     final int offset = (int) (offsetAndSize >> 32);
0402     final int size = (int) offsetAndSize;
0403     return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
0404   }
0405 
0406   @Override
0407   public byte[] getBinary(int ordinal) {
0408     if (isNullAt(ordinal)) {
0409       return null;
0410     } else {
0411       final long offsetAndSize = getLong(ordinal);
0412       final int offset = (int) (offsetAndSize >> 32);
0413       final int size = (int) offsetAndSize;
0414       final byte[] bytes = new byte[size];
0415       Platform.copyMemory(
0416         baseObject,
0417         baseOffset + offset,
0418         bytes,
0419         Platform.BYTE_ARRAY_OFFSET,
0420         size
0421       );
0422       return bytes;
0423     }
0424   }
0425 
0426   @Override
0427   public CalendarInterval getInterval(int ordinal) {
0428     if (isNullAt(ordinal)) {
0429       return null;
0430     } else {
0431       final long offsetAndSize = getLong(ordinal);
0432       final int offset = (int) (offsetAndSize >> 32);
0433       final int months = Platform.getInt(baseObject, baseOffset + offset);
0434       final int days = Platform.getInt(baseObject, baseOffset + offset + 4);
0435       final long microseconds = Platform.getLong(baseObject, baseOffset + offset + 8);
0436       return new CalendarInterval(months, days, microseconds);
0437     }
0438   }
0439 
0440   @Override
0441   public UnsafeRow getStruct(int ordinal, int numFields) {
0442     if (isNullAt(ordinal)) {
0443       return null;
0444     } else {
0445       final long offsetAndSize = getLong(ordinal);
0446       final int offset = (int) (offsetAndSize >> 32);
0447       final int size = (int) offsetAndSize;
0448       final UnsafeRow row = new UnsafeRow(numFields);
0449       row.pointTo(baseObject, baseOffset + offset, size);
0450       return row;
0451     }
0452   }
0453 
0454   @Override
0455   public UnsafeArrayData getArray(int ordinal) {
0456     if (isNullAt(ordinal)) {
0457       return null;
0458     } else {
0459       final long offsetAndSize = getLong(ordinal);
0460       final int offset = (int) (offsetAndSize >> 32);
0461       final int size = (int) offsetAndSize;
0462       final UnsafeArrayData array = new UnsafeArrayData();
0463       array.pointTo(baseObject, baseOffset + offset, size);
0464       return array;
0465     }
0466   }
0467 
0468   @Override
0469   public UnsafeMapData getMap(int ordinal) {
0470     if (isNullAt(ordinal)) {
0471       return null;
0472     } else {
0473       final long offsetAndSize = getLong(ordinal);
0474       final int offset = (int) (offsetAndSize >> 32);
0475       final int size = (int) offsetAndSize;
0476       final UnsafeMapData map = new UnsafeMapData();
0477       map.pointTo(baseObject, baseOffset + offset, size);
0478       return map;
0479     }
0480   }
0481 
0482   /**
0483    * Copies this row, returning a self-contained UnsafeRow that stores its data in an internal
0484    * byte array rather than referencing data stored in a data page.
0485    */
0486   @Override
0487   public UnsafeRow copy() {
0488     UnsafeRow rowCopy = new UnsafeRow(numFields);
0489     final byte[] rowDataCopy = new byte[sizeInBytes];
0490     Platform.copyMemory(
0491       baseObject,
0492       baseOffset,
0493       rowDataCopy,
0494       Platform.BYTE_ARRAY_OFFSET,
0495       sizeInBytes
0496     );
0497     rowCopy.pointTo(rowDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0498     return rowCopy;
0499   }
0500 
0501   /**
0502    * Creates an empty UnsafeRow from a byte array with specified numBytes and numFields.
0503    * The returned row is invalid until we call copyFrom on it.
0504    */
0505   public static UnsafeRow createFromByteArray(int numBytes, int numFields) {
0506     final UnsafeRow row = new UnsafeRow(numFields);
0507     row.pointTo(new byte[numBytes], numBytes);
0508     return row;
0509   }
0510 
0511   /**
0512    * Copies the input UnsafeRow to this UnsafeRow, and resize the underlying byte[] when the
0513    * input row is larger than this row.
0514    */
0515   public void copyFrom(UnsafeRow row) {
0516     // copyFrom is only available for UnsafeRow created from byte array.
0517     assert (baseObject instanceof byte[]) && baseOffset == Platform.BYTE_ARRAY_OFFSET;
0518     if (row.sizeInBytes > this.sizeInBytes) {
0519       // resize the underlying byte[] if it's not large enough.
0520       this.baseObject = new byte[row.sizeInBytes];
0521     }
0522     Platform.copyMemory(
0523       row.baseObject, row.baseOffset, this.baseObject, this.baseOffset, row.sizeInBytes);
0524     // update the sizeInBytes.
0525     this.sizeInBytes = row.sizeInBytes;
0526   }
0527 
0528   /**
0529    * Write this UnsafeRow's underlying bytes to the given OutputStream.
0530    *
0531    * @param out the stream to write to.
0532    * @param writeBuffer a byte array for buffering chunks of off-heap data while writing to the
0533    *                    output stream. If this row is backed by an on-heap byte array, then this
0534    *                    buffer will not be used and may be null.
0535    */
0536   public void writeToStream(OutputStream out, byte[] writeBuffer) throws IOException {
0537     if (baseObject instanceof byte[]) {
0538       int offsetInByteArray = (int) (baseOffset - Platform.BYTE_ARRAY_OFFSET);
0539       out.write((byte[]) baseObject, offsetInByteArray, sizeInBytes);
0540     } else {
0541       int dataRemaining = sizeInBytes;
0542       long rowReadPosition = baseOffset;
0543       while (dataRemaining > 0) {
0544         int toTransfer = Math.min(writeBuffer.length, dataRemaining);
0545         Platform.copyMemory(
0546           baseObject, rowReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
0547         out.write(writeBuffer, 0, toTransfer);
0548         rowReadPosition += toTransfer;
0549         dataRemaining -= toTransfer;
0550       }
0551     }
0552   }
0553 
0554   @Override
0555   public int hashCode() {
0556     return Murmur3_x86_32.hashUnsafeWords(baseObject, baseOffset, sizeInBytes, 42);
0557   }
0558 
0559   @Override
0560   public boolean equals(Object other) {
0561     if (other instanceof UnsafeRow) {
0562       UnsafeRow o = (UnsafeRow) other;
0563       return (sizeInBytes == o.sizeInBytes) &&
0564         ByteArrayMethods.arrayEquals(baseObject, baseOffset, o.baseObject, o.baseOffset,
0565           sizeInBytes);
0566     }
0567     return false;
0568   }
0569 
0570   /**
0571    * Returns the underlying bytes for this UnsafeRow.
0572    */
0573   public byte[] getBytes() {
0574     return UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0575   }
0576 
0577   // This is for debugging
0578   @Override
0579   public String toString() {
0580     StringBuilder build = new StringBuilder("[");
0581     for (int i = 0; i < sizeInBytes; i += 8) {
0582       if (i != 0) build.append(',');
0583       build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
0584     }
0585     build.append(']');
0586     return build.toString();
0587   }
0588 
0589   @Override
0590   public boolean anyNull() {
0591     return BitSetMethods.anySet(baseObject, baseOffset, bitSetWidthInBytes / 8);
0592   }
0593 
0594   /**
0595    * Writes the content of this row into a memory address, identified by an object and an offset.
0596    * The target memory address must already been allocated, and have enough space to hold all the
0597    * bytes in this string.
0598    */
0599   public void writeToMemory(Object target, long targetOffset) {
0600     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
0601   }
0602 
0603   public void writeTo(ByteBuffer buffer) {
0604     assert (buffer.hasArray());
0605     byte[] target = buffer.array();
0606     int offset = buffer.arrayOffset();
0607     int pos = buffer.position();
0608     writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
0609     buffer.position(pos + sizeInBytes);
0610   }
0611 
0612   /**
0613    * Write the bytes of var-length field into ByteBuffer
0614    *
0615    * Note: only work with HeapByteBuffer
0616    */
0617   public void writeFieldTo(int ordinal, ByteBuffer buffer) {
0618     final long offsetAndSize = getLong(ordinal);
0619     final int offset = (int) (offsetAndSize >> 32);
0620     final int size = (int) offsetAndSize;
0621 
0622     buffer.putInt(size);
0623     int pos = buffer.position();
0624     buffer.position(pos + size);
0625     Platform.copyMemory(
0626       baseObject,
0627       baseOffset + offset,
0628       buffer.array(),
0629       Platform.BYTE_ARRAY_OFFSET + buffer.arrayOffset() + pos,
0630       size);
0631   }
0632 
0633   @Override
0634   public void writeExternal(ObjectOutput out) throws IOException {
0635     byte[] bytes = getBytes();
0636     out.writeInt(bytes.length);
0637     out.writeInt(this.numFields);
0638     out.write(bytes);
0639   }
0640 
0641   @Override
0642   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
0643     this.baseOffset = BYTE_ARRAY_OFFSET;
0644     this.sizeInBytes = in.readInt();
0645     this.numFields = in.readInt();
0646     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
0647     this.baseObject = new byte[sizeInBytes];
0648     in.readFully((byte[]) baseObject);
0649   }
0650 
0651   @Override
0652   public void write(Kryo kryo, Output out) {
0653     byte[] bytes = getBytes();
0654     out.writeInt(bytes.length);
0655     out.writeInt(this.numFields);
0656     out.write(bytes);
0657   }
0658 
0659   @Override
0660   public void read(Kryo kryo, Input in) {
0661     this.baseOffset = BYTE_ARRAY_OFFSET;
0662     this.sizeInBytes = in.readInt();
0663     this.numFields = in.readInt();
0664     this.bitSetWidthInBytes = calculateBitSetWidthInBytes(numFields);
0665     this.baseObject = new byte[sizeInBytes];
0666     in.read((byte[]) baseObject);
0667   }
0668 }