Back to home page

OSCL-LXR

 
 

    


0001 /*
0002  * Licensed to the Apache Software Foundation (ASF) under one or more
0003  * contributor license agreements.  See the NOTICE file distributed with
0004  * this work for additional information regarding copyright ownership.
0005  * The ASF licenses this file to You under the Apache License, Version 2.0
0006  * (the "License"); you may not use this file except in compliance with
0007  * the License.  You may obtain a copy of the License at
0008  *
0009  *    http://www.apache.org/licenses/LICENSE-2.0
0010  *
0011  * Unless required by applicable law or agreed to in writing, software
0012  * distributed under the License is distributed on an "AS IS" BASIS,
0013  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
0014  * See the License for the specific language governing permissions and
0015  * limitations under the License.
0016  */
0017 
0018 package org.apache.spark.util.sketch;
0019 
0020 import java.io.*;
0021 
0022 class BloomFilterImpl extends BloomFilter implements Serializable {
0023 
0024   private int numHashFunctions;
0025 
0026   private BitArray bits;
0027 
0028   BloomFilterImpl(int numHashFunctions, long numBits) {
0029     this(new BitArray(numBits), numHashFunctions);
0030   }
0031 
0032   private BloomFilterImpl(BitArray bits, int numHashFunctions) {
0033     this.bits = bits;
0034     this.numHashFunctions = numHashFunctions;
0035   }
0036 
0037   private BloomFilterImpl() {}
0038 
0039   @Override
0040   public boolean equals(Object other) {
0041     if (other == this) {
0042       return true;
0043     }
0044 
0045     if (other == null || !(other instanceof BloomFilterImpl)) {
0046       return false;
0047     }
0048 
0049     BloomFilterImpl that = (BloomFilterImpl) other;
0050 
0051     return this.numHashFunctions == that.numHashFunctions && this.bits.equals(that.bits);
0052   }
0053 
0054   @Override
0055   public int hashCode() {
0056     return bits.hashCode() * 31 + numHashFunctions;
0057   }
0058 
0059   @Override
0060   public double expectedFpp() {
0061     return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions);
0062   }
0063 
0064   @Override
0065   public long bitSize() {
0066     return bits.bitSize();
0067   }
0068 
0069   @Override
0070   public boolean put(Object item) {
0071     if (item instanceof String) {
0072       return putString((String) item);
0073     } else if (item instanceof byte[]) {
0074       return putBinary((byte[]) item);
0075     } else {
0076       return putLong(Utils.integralToLong(item));
0077     }
0078   }
0079 
0080   @Override
0081   public boolean putString(String item) {
0082     return putBinary(Utils.getBytesFromUTF8String(item));
0083   }
0084 
0085   @Override
0086   public boolean putBinary(byte[] item) {
0087     int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
0088     int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
0089 
0090     long bitSize = bits.bitSize();
0091     boolean bitsChanged = false;
0092     for (int i = 1; i <= numHashFunctions; i++) {
0093       int combinedHash = h1 + (i * h2);
0094       // Flip all the bits if it's negative (guaranteed positive number)
0095       if (combinedHash < 0) {
0096         combinedHash = ~combinedHash;
0097       }
0098       bitsChanged |= bits.set(combinedHash % bitSize);
0099     }
0100     return bitsChanged;
0101   }
0102 
0103   @Override
0104   public boolean mightContainString(String item) {
0105     return mightContainBinary(Utils.getBytesFromUTF8String(item));
0106   }
0107 
0108   @Override
0109   public boolean mightContainBinary(byte[] item) {
0110     int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
0111     int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
0112 
0113     long bitSize = bits.bitSize();
0114     for (int i = 1; i <= numHashFunctions; i++) {
0115       int combinedHash = h1 + (i * h2);
0116       // Flip all the bits if it's negative (guaranteed positive number)
0117       if (combinedHash < 0) {
0118         combinedHash = ~combinedHash;
0119       }
0120       if (!bits.get(combinedHash % bitSize)) {
0121         return false;
0122       }
0123     }
0124     return true;
0125   }
0126 
0127   @Override
0128   public boolean putLong(long item) {
0129     // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n
0130     // hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions.
0131     // Note that `CountMinSketch` use a different strategy, it hash the input long element with
0132     // every i to produce n hash values.
0133     // TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here?
0134     int h1 = Murmur3_x86_32.hashLong(item, 0);
0135     int h2 = Murmur3_x86_32.hashLong(item, h1);
0136 
0137     long bitSize = bits.bitSize();
0138     boolean bitsChanged = false;
0139     for (int i = 1; i <= numHashFunctions; i++) {
0140       int combinedHash = h1 + (i * h2);
0141       // Flip all the bits if it's negative (guaranteed positive number)
0142       if (combinedHash < 0) {
0143         combinedHash = ~combinedHash;
0144       }
0145       bitsChanged |= bits.set(combinedHash % bitSize);
0146     }
0147     return bitsChanged;
0148   }
0149 
0150   @Override
0151   public boolean mightContainLong(long item) {
0152     int h1 = Murmur3_x86_32.hashLong(item, 0);
0153     int h2 = Murmur3_x86_32.hashLong(item, h1);
0154 
0155     long bitSize = bits.bitSize();
0156     for (int i = 1; i <= numHashFunctions; i++) {
0157       int combinedHash = h1 + (i * h2);
0158       // Flip all the bits if it's negative (guaranteed positive number)
0159       if (combinedHash < 0) {
0160         combinedHash = ~combinedHash;
0161       }
0162       if (!bits.get(combinedHash % bitSize)) {
0163         return false;
0164       }
0165     }
0166     return true;
0167   }
0168 
0169   @Override
0170   public boolean mightContain(Object item) {
0171     if (item instanceof String) {
0172       return mightContainString((String) item);
0173     } else if (item instanceof byte[]) {
0174       return mightContainBinary((byte[]) item);
0175     } else {
0176       return mightContainLong(Utils.integralToLong(item));
0177     }
0178   }
0179 
0180   @Override
0181   public boolean isCompatible(BloomFilter other) {
0182     if (other == null) {
0183       return false;
0184     }
0185 
0186     if (!(other instanceof BloomFilterImpl)) {
0187       return false;
0188     }
0189 
0190     BloomFilterImpl that = (BloomFilterImpl) other;
0191     return this.bitSize() == that.bitSize() && this.numHashFunctions == that.numHashFunctions;
0192   }
0193 
0194   @Override
0195   public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
0196     // Duplicates the logic of `isCompatible` here to provide better error message.
0197     if (other == null) {
0198       throw new IncompatibleMergeException("Cannot merge null bloom filter");
0199     }
0200 
0201     if (!(other instanceof BloomFilterImpl)) {
0202       throw new IncompatibleMergeException(
0203         "Cannot merge bloom filter of class " + other.getClass().getName()
0204       );
0205     }
0206 
0207     BloomFilterImpl that = (BloomFilterImpl) other;
0208 
0209     if (this.bitSize() != that.bitSize()) {
0210       throw new IncompatibleMergeException("Cannot merge bloom filters with different bit size");
0211     }
0212 
0213     if (this.numHashFunctions != that.numHashFunctions) {
0214       throw new IncompatibleMergeException(
0215         "Cannot merge bloom filters with different number of hash functions"
0216       );
0217     }
0218 
0219     this.bits.putAll(that.bits);
0220     return this;
0221   }
0222 
0223   @Override
0224   public void writeTo(OutputStream out) throws IOException {
0225     DataOutputStream dos = new DataOutputStream(out);
0226 
0227     dos.writeInt(Version.V1.getVersionNumber());
0228     dos.writeInt(numHashFunctions);
0229     bits.writeTo(dos);
0230   }
0231 
0232   private void readFrom0(InputStream in) throws IOException {
0233     DataInputStream dis = new DataInputStream(in);
0234 
0235     int version = dis.readInt();
0236     if (version != Version.V1.getVersionNumber()) {
0237       throw new IOException("Unexpected Bloom filter version number (" + version + ")");
0238     }
0239 
0240     this.numHashFunctions = dis.readInt();
0241     this.bits = BitArray.readFrom(dis);
0242   }
0243 
0244   public static BloomFilterImpl readFrom(InputStream in) throws IOException {
0245     BloomFilterImpl filter = new BloomFilterImpl();
0246     filter.readFrom0(in);
0247     return filter;
0248   }
0249 
0250   private void writeObject(ObjectOutputStream out) throws IOException {
0251     writeTo(out);
0252   }
0253 
0254   private void readObject(ObjectInputStream in) throws IOException {
0255     readFrom0(in);
0256   }
0257 }