Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.shuffle.sort;
0019 
/**
 * Wrapper around an 8-byte word that holds a 24-bit partition number and 40-bit record pointer.
 * <p>
 * Within the long, the data is laid out as follows:
 * <pre>
 *   [24 bit partition number][13 bit memory page number][27 bit offset in page]
 * </pre>
 * This implies that the maximum addressable page size is 2^27 bytes = 128 megabytes, assuming that
 * our offsets in pages are not 8-byte-word-aligned. Since we have 2^13 pages (based off the
 * 13-bit page numbers assigned by {@link org.apache.spark.memory.TaskMemoryManager}), this
 * implies that we can address 2^13 * 128 megabytes = 1 terabyte of RAM per task.
 * <p>
 * Assuming word-alignment would allow for a 1 gigabyte maximum page size, but we leave this
 * optimization to future work as it will require more careful design to ensure that addresses are
 * properly aligned (e.g. by padding records).
 */
final class PackedRecordPointer {

  static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes

  /**
   * The maximum partition identifier that can be encoded. Note that partition ids start from 0.
   */
  static final int MAXIMUM_PARTITION_ID = (1 << 24) - 1;  // 16777215

  /**
   * The index of the first byte of the partition id, counting from the least significant byte.
   */
  static final int PARTITION_ID_START_BYTE_INDEX = 5;

  /**
   * The index of the last byte of the partition id, counting from the least significant byte.
   */
  static final int PARTITION_ID_END_BYTE_INDEX = 7;

  /** Bit mask for the lower 40 bits of a long. */
  private static final long MASK_LONG_LOWER_40_BITS = (1L << 40) - 1;

  /** Bit mask for the upper 24 bits of a long. */
  private static final long MASK_LONG_UPPER_24_BITS = ~MASK_LONG_LOWER_40_BITS;

  /** Bit mask for the lower 27 bits of a long. */
  private static final long MASK_LONG_LOWER_27_BITS = (1L << 27) - 1;

  /** Bit mask for the lower 51 bits of a long. */
  private static final long MASK_LONG_LOWER_51_BITS = (1L << 51) - 1;

  /** Bit mask for the upper 13 bits of a long. */
  private static final long MASK_LONG_UPPER_13_BITS = ~MASK_LONG_LOWER_51_BITS;

  /**
   * Pack a record address and partition id into a single word.
   * <p>
   * The input pointer is read as a 13-bit page number in its upper bits followed by a 51-bit
   * offset. Only the lower 27 bits of that offset are kept; bits 27 through 50 are discarded, so
   * the offset must be smaller than {@link #MAXIMUM_PAGE_SIZE_BYTES} for the pointer to survive a
   * round trip through {@link #getRecordPointer()}.
   *
   * @param recordPointer a record pointer encoded by TaskMemoryManager.
   * @param partitionId a shuffle partition id, between 0 and {@link #MAXIMUM_PARTITION_ID}
   *                    (2^24 - 1) inclusive. The range is checked only when assertions are
   *                    enabled.
   * @return a packed pointer that can be decoded using the {@link PackedRecordPointer} class.
   */
  public static long packPointer(long recordPointer, int partitionId) {
    // A negative id would otherwise be silently truncated to its low 24 bits and decode as a
    // different (valid-looking) partition, so both bounds are checked.
    assert (partitionId >= 0 && partitionId <= MAXIMUM_PARTITION_ID)
      : "partitionId " + partitionId + " is outside [0, " + MAXIMUM_PARTITION_ID + "]";
    // Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.
    // Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.
    // Move the 13-bit page number from bits 51-63 down to bits 27-39.
    final long pageNumber = (recordPointer & MASK_LONG_UPPER_13_BITS) >>> 24;
    final long compressedAddress = pageNumber | (recordPointer & MASK_LONG_LOWER_27_BITS);
    return (((long) partitionId) << 40) | compressedAddress;
  }

  /** The packed word most recently supplied through {@link #set(long)}. */
  private long packedRecordPointer;

  /**
   * Point this wrapper at a packed word so that it can be decoded by the getters.
   *
   * @param packedRecordPointer a word in the layout produced by {@link #packPointer(long, int)}.
   */
  public void set(long packedRecordPointer) {
    this.packedRecordPointer = packedRecordPointer;
  }

  /**
   * @return the partition id stored in the upper 24 bits of the current packed word.
   */
  public int getPartitionId() {
    return (int) ((packedRecordPointer & MASK_LONG_UPPER_24_BITS) >>> 40);
  }

  /**
   * Rebuild the uncompressed record pointer from the current packed word.
   *
   * @return a pointer with the 13-bit page number restored to the upper bits and the 27-bit
   *         in-page offset in the lower bits; all bits in between are zero.
   */
  public long getRecordPointer() {
    // Shift the page number from bits 27-39 back up to bits 51-63; the mask drops the partition
    // id bits that the shift pushed past it as well as everything below bit 51.
    final long pageNumber = (packedRecordPointer << 24) & MASK_LONG_UPPER_13_BITS;
    final long offsetInPage = packedRecordPointer & MASK_LONG_LOWER_27_BITS;
    return pageNumber | offsetInPage;
  }

}