0001
0002
0003
0004
0005
0006
0007
0008
0009
0010
0011
0012
0013
0014
0015
0016
0017
0018 package org.apache.spark.util.sketch;
0019
0020 import java.io.*;
0021
0022 class BloomFilterImpl extends BloomFilter implements Serializable {
0023
0024 private int numHashFunctions;
0025
0026 private BitArray bits;
0027
0028 BloomFilterImpl(int numHashFunctions, long numBits) {
0029 this(new BitArray(numBits), numHashFunctions);
0030 }
0031
0032 private BloomFilterImpl(BitArray bits, int numHashFunctions) {
0033 this.bits = bits;
0034 this.numHashFunctions = numHashFunctions;
0035 }
0036
0037 private BloomFilterImpl() {}
0038
0039 @Override
0040 public boolean equals(Object other) {
0041 if (other == this) {
0042 return true;
0043 }
0044
0045 if (other == null || !(other instanceof BloomFilterImpl)) {
0046 return false;
0047 }
0048
0049 BloomFilterImpl that = (BloomFilterImpl) other;
0050
0051 return this.numHashFunctions == that.numHashFunctions && this.bits.equals(that.bits);
0052 }
0053
0054 @Override
0055 public int hashCode() {
0056 return bits.hashCode() * 31 + numHashFunctions;
0057 }
0058
0059 @Override
0060 public double expectedFpp() {
0061 return Math.pow((double) bits.cardinality() / bits.bitSize(), numHashFunctions);
0062 }
0063
0064 @Override
0065 public long bitSize() {
0066 return bits.bitSize();
0067 }
0068
0069 @Override
0070 public boolean put(Object item) {
0071 if (item instanceof String) {
0072 return putString((String) item);
0073 } else if (item instanceof byte[]) {
0074 return putBinary((byte[]) item);
0075 } else {
0076 return putLong(Utils.integralToLong(item));
0077 }
0078 }
0079
0080 @Override
0081 public boolean putString(String item) {
0082 return putBinary(Utils.getBytesFromUTF8String(item));
0083 }
0084
0085 @Override
0086 public boolean putBinary(byte[] item) {
0087 int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
0088 int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
0089
0090 long bitSize = bits.bitSize();
0091 boolean bitsChanged = false;
0092 for (int i = 1; i <= numHashFunctions; i++) {
0093 int combinedHash = h1 + (i * h2);
0094
0095 if (combinedHash < 0) {
0096 combinedHash = ~combinedHash;
0097 }
0098 bitsChanged |= bits.set(combinedHash % bitSize);
0099 }
0100 return bitsChanged;
0101 }
0102
0103 @Override
0104 public boolean mightContainString(String item) {
0105 return mightContainBinary(Utils.getBytesFromUTF8String(item));
0106 }
0107
0108 @Override
0109 public boolean mightContainBinary(byte[] item) {
0110 int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0);
0111 int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1);
0112
0113 long bitSize = bits.bitSize();
0114 for (int i = 1; i <= numHashFunctions; i++) {
0115 int combinedHash = h1 + (i * h2);
0116
0117 if (combinedHash < 0) {
0118 combinedHash = ~combinedHash;
0119 }
0120 if (!bits.get(combinedHash % bitSize)) {
0121 return false;
0122 }
0123 }
0124 return true;
0125 }
0126
0127 @Override
0128 public boolean putLong(long item) {
0129
0130
0131
0132
0133
0134 int h1 = Murmur3_x86_32.hashLong(item, 0);
0135 int h2 = Murmur3_x86_32.hashLong(item, h1);
0136
0137 long bitSize = bits.bitSize();
0138 boolean bitsChanged = false;
0139 for (int i = 1; i <= numHashFunctions; i++) {
0140 int combinedHash = h1 + (i * h2);
0141
0142 if (combinedHash < 0) {
0143 combinedHash = ~combinedHash;
0144 }
0145 bitsChanged |= bits.set(combinedHash % bitSize);
0146 }
0147 return bitsChanged;
0148 }
0149
0150 @Override
0151 public boolean mightContainLong(long item) {
0152 int h1 = Murmur3_x86_32.hashLong(item, 0);
0153 int h2 = Murmur3_x86_32.hashLong(item, h1);
0154
0155 long bitSize = bits.bitSize();
0156 for (int i = 1; i <= numHashFunctions; i++) {
0157 int combinedHash = h1 + (i * h2);
0158
0159 if (combinedHash < 0) {
0160 combinedHash = ~combinedHash;
0161 }
0162 if (!bits.get(combinedHash % bitSize)) {
0163 return false;
0164 }
0165 }
0166 return true;
0167 }
0168
0169 @Override
0170 public boolean mightContain(Object item) {
0171 if (item instanceof String) {
0172 return mightContainString((String) item);
0173 } else if (item instanceof byte[]) {
0174 return mightContainBinary((byte[]) item);
0175 } else {
0176 return mightContainLong(Utils.integralToLong(item));
0177 }
0178 }
0179
0180 @Override
0181 public boolean isCompatible(BloomFilter other) {
0182 if (other == null) {
0183 return false;
0184 }
0185
0186 if (!(other instanceof BloomFilterImpl)) {
0187 return false;
0188 }
0189
0190 BloomFilterImpl that = (BloomFilterImpl) other;
0191 return this.bitSize() == that.bitSize() && this.numHashFunctions == that.numHashFunctions;
0192 }
0193
0194 @Override
0195 public BloomFilter mergeInPlace(BloomFilter other) throws IncompatibleMergeException {
0196
0197 if (other == null) {
0198 throw new IncompatibleMergeException("Cannot merge null bloom filter");
0199 }
0200
0201 if (!(other instanceof BloomFilterImpl)) {
0202 throw new IncompatibleMergeException(
0203 "Cannot merge bloom filter of class " + other.getClass().getName()
0204 );
0205 }
0206
0207 BloomFilterImpl that = (BloomFilterImpl) other;
0208
0209 if (this.bitSize() != that.bitSize()) {
0210 throw new IncompatibleMergeException("Cannot merge bloom filters with different bit size");
0211 }
0212
0213 if (this.numHashFunctions != that.numHashFunctions) {
0214 throw new IncompatibleMergeException(
0215 "Cannot merge bloom filters with different number of hash functions"
0216 );
0217 }
0218
0219 this.bits.putAll(that.bits);
0220 return this;
0221 }
0222
0223 @Override
0224 public void writeTo(OutputStream out) throws IOException {
0225 DataOutputStream dos = new DataOutputStream(out);
0226
0227 dos.writeInt(Version.V1.getVersionNumber());
0228 dos.writeInt(numHashFunctions);
0229 bits.writeTo(dos);
0230 }
0231
0232 private void readFrom0(InputStream in) throws IOException {
0233 DataInputStream dis = new DataInputStream(in);
0234
0235 int version = dis.readInt();
0236 if (version != Version.V1.getVersionNumber()) {
0237 throw new IOException("Unexpected Bloom filter version number (" + version + ")");
0238 }
0239
0240 this.numHashFunctions = dis.readInt();
0241 this.bits = BitArray.readFrom(dis);
0242 }
0243
0244 public static BloomFilterImpl readFrom(InputStream in) throws IOException {
0245 BloomFilterImpl filter = new BloomFilterImpl();
0246 filter.readFrom0(in);
0247 return filter;
0248 }
0249
0250 private void writeObject(ObjectOutputStream out) throws IOException {
0251 writeTo(out);
0252 }
0253
0254 private void readObject(ObjectInputStream in) throws IOException {
0255 readFrom0(in);
0256 }
0257 }