Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.sql.catalyst.expressions;
0019 
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import org.apache.spark.sql.catalyst.util.MapData;
import org.apache.spark.unsafe.Platform;

import static org.apache.spark.unsafe.Platform.BYTE_ARRAY_OFFSET;
0035 
0036 /**
0037  * An Unsafe implementation of Map which is backed by raw memory instead of Java objects.
0038  *
0039  * Currently we just use 2 UnsafeArrayData to represent UnsafeMapData, with extra 8 bytes at head
0040  * to indicate the number of bytes of the unsafe key array.
0041  * [unsafe key array numBytes] [unsafe key array] [unsafe value array]
0042  *
0043  * Note that, user is responsible to guarantee that the key array does not have duplicated
0044  * elements, otherwise the behavior is undefined.
0045  */
0046 // TODO: Use a more efficient format which doesn't depend on unsafe array.
0047 public final class UnsafeMapData extends MapData implements Externalizable, KryoSerializable {
0048 
0049   private Object baseObject;
0050   private long baseOffset;
0051 
0052   // The size of this map's backing data, in bytes.
0053   // The 4-bytes header of key array `numBytes` is also included, so it's actually equal to
0054   // 4 + key array numBytes + value array numBytes.
0055   private int sizeInBytes;
0056 
0057   public Object getBaseObject() { return baseObject; }
0058   public long getBaseOffset() { return baseOffset; }
0059   public int getSizeInBytes() { return sizeInBytes; }
0060 
0061   private final UnsafeArrayData keys;
0062   private final UnsafeArrayData values;
0063 
0064   /**
0065    * Construct a new UnsafeMapData. The resulting UnsafeMapData won't be usable until
0066    * `pointTo()` has been called, since the value returned by this constructor is equivalent
0067    * to a null pointer.
0068    */
0069   public UnsafeMapData() {
0070     keys = new UnsafeArrayData();
0071     values = new UnsafeArrayData();
0072   }
0073 
0074   /**
0075    * Update this UnsafeMapData to point to different backing data.
0076    *
0077    * @param baseObject the base object
0078    * @param baseOffset the offset within the base object
0079    * @param sizeInBytes the size of this map's backing data, in bytes
0080    */
0081   public void pointTo(Object baseObject, long baseOffset, int sizeInBytes) {
0082     // Read the numBytes of key array from the first 8 bytes.
0083     final long keyArraySize = Platform.getLong(baseObject, baseOffset);
0084     assert keyArraySize >= 0 : "keyArraySize (" + keyArraySize + ") should >= 0";
0085     assert keyArraySize <= Integer.MAX_VALUE :
0086       "keyArraySize (" + keyArraySize + ") should <= Integer.MAX_VALUE";
0087     final int valueArraySize = sizeInBytes - (int)keyArraySize - 8;
0088     assert valueArraySize >= 0 : "valueArraySize (" + valueArraySize + ") should >= 0";
0089 
0090     keys.pointTo(baseObject, baseOffset + 8, (int)keyArraySize);
0091     values.pointTo(baseObject, baseOffset + 8 + keyArraySize, valueArraySize);
0092 
0093     assert keys.numElements() == values.numElements();
0094 
0095     this.baseObject = baseObject;
0096     this.baseOffset = baseOffset;
0097     this.sizeInBytes = sizeInBytes;
0098   }
0099 
0100   @Override
0101   public int numElements() {
0102     return keys.numElements();
0103   }
0104 
0105   @Override
0106   public UnsafeArrayData keyArray() {
0107     return keys;
0108   }
0109 
0110   @Override
0111   public UnsafeArrayData valueArray() {
0112     return values;
0113   }
0114 
0115   public void writeToMemory(Object target, long targetOffset) {
0116     Platform.copyMemory(baseObject, baseOffset, target, targetOffset, sizeInBytes);
0117   }
0118 
0119   public void writeTo(ByteBuffer buffer) {
0120     assert(buffer.hasArray());
0121     byte[] target = buffer.array();
0122     int offset = buffer.arrayOffset();
0123     int pos = buffer.position();
0124     writeToMemory(target, Platform.BYTE_ARRAY_OFFSET + offset + pos);
0125     buffer.position(pos + sizeInBytes);
0126   }
0127 
0128   @Override
0129   public UnsafeMapData copy() {
0130     UnsafeMapData mapCopy = new UnsafeMapData();
0131     final byte[] mapDataCopy = new byte[sizeInBytes];
0132     Platform.copyMemory(
0133       baseObject, baseOffset, mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0134     mapCopy.pointTo(mapDataCopy, Platform.BYTE_ARRAY_OFFSET, sizeInBytes);
0135     return mapCopy;
0136   }
0137 
0138   @Override
0139   public void writeExternal(ObjectOutput out) throws IOException {
0140     byte[] bytes = UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0141     out.writeInt(bytes.length);
0142     out.write(bytes);
0143   }
0144 
0145   @Override
0146   public void readExternal(ObjectInput in) throws IOException {
0147     this.baseOffset = BYTE_ARRAY_OFFSET;
0148     this.sizeInBytes = in.readInt();
0149     this.baseObject = new byte[sizeInBytes];
0150     in.readFully((byte[]) baseObject);
0151     pointTo(baseObject, baseOffset, sizeInBytes);
0152   }
0153 
0154   @Override
0155   public void write(Kryo kryo, Output output) {
0156     byte[] bytes = UnsafeDataUtils.getBytes(baseObject, baseOffset, sizeInBytes);
0157     output.writeInt(bytes.length);
0158     output.write(bytes);
0159   }
0160 
0161   @Override
0162   public void read(Kryo kryo, Input input) {
0163     this.baseOffset = BYTE_ARRAY_OFFSET;
0164     this.sizeInBytes = input.readInt();
0165     this.baseObject = new byte[sizeInBytes];
0166     input.read((byte[]) baseObject);
0167     pointTo(baseObject, baseOffset, sizeInBytes);
0168   }
0169 }