Original source of this article: http://www.fullstackyang.com/... When reposting, please credit that address or the segmentfault address. Thank you!
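Several open-source libraries already implement Bloom filters, including Google's Guava, Twitter's Algebird, and ScalaNLP breeze. Guava added the BloomFilter class in version 11.0. It is built around the Funnel and Sink design, which generalizes it to any data type, and it uses murmur3 hash as its hash function. Internally it does not use the traditional java.util.BitSet as the bit array. Instead it wraps a long array, and because most operations are plain bit arithmetic, it performs very well.

Below we first analyze the Guava source code that implements the Bloom filter in detail. Then, for the sake of flexibility and decoupling, we move the Bloom filter out of the JVM and rebuild it with Redis Bitmaps as the underlying bit array. In addition, as more and more elements are inserted and the actual count far exceeds the expected count given at creation time, the false positive rate of a Bloom filter keeps rising, so the rewrite adds automatic expansion. Finally, a test verifies that the result is correct.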
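To make the point about overfilling concrete, the standard approximation for the false positive rate, p ≈ (1 - e^(-kn/m))^k, can be evaluated for a filter that is sized for 10,000 elements and then loaded beyond that. This is only a sketch: the class name `FppGrowth` and the sample loads are illustrative assumptions, and m = 158,202 bits with k = 11 hash functions is roughly what a target of n = 10,000 and p = 0.0005 requires.

```java
public class FppGrowth {
    /** Approximate false positive rate after n insertions into m bits with k hash functions. */
    public static double expectedFpp(long n, long m, int k) {
        return Math.pow(1 - Math.exp(-(double) k * n / m), k);
    }

    public static void main(String[] args) {
        long m = 158_202; // bits: assumed size for n = 10,000, p = 0.0005
        int k = 11;       // hash functions: assumed for that size
        for (long n : new long[] {10_000, 20_000, 30_000}) {
            System.out.printf("n=%d -> fpp ~ %.4f%n", n, expectedFpp(n, m, k));
        }
    }
}
```

At the design load the rate stays close to 0.0005. At two and three times the load it climbs to roughly 4% and 23%, which is why a filter that cannot grow becomes unreliable once it is overfilled.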
/** The bit set of the BloomFilter (not necessarily power of 2!) */
private final BitArray bits;
/** Number of hashes per element */
private final int numHashFunctions;
/** The funnel to translate Ts to bytes */
private final Funnel<? super T> funnel;
/** The strategy we employ to map an element T to {@code numHashFunctions} bit indexes. */
private final Strategy strategy;
interface Strategy extends java.io.Serializable {
  /**
   * Sets {@code numHashFunctions} bits of the given bit array, by hashing a user element.
   *
   * <p>Returns whether any bits changed as a result of this operation.
   */
  <T> boolean put(T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);

  /**
   * Queries {@code numHashFunctions} bits of the given bit array, by hashing a user element;
   * returns {@code true} if and only if all selected bits are set.
   */
  <T> boolean mightContain(
      T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits);

  /**
   * Identifier used to encode this strategy, when marshalled as part of a BloomFilter. Only
   * values in the [-128, 127] range are valid for the compact serial form. Non-negative values
   * are reserved for enums defined in BloomFilterStrategies; negative values are reserved for any
   * custom, stateful strategy we may define (e.g. any kind of strategy that would depend on user
   * input).
   */
  int ordinal();
}
static <T> BloomFilter<T> create(
    Funnel<? super T> funnel, long expectedInsertions, double fpp, Strategy strategy) {
  checkArgument(
      expectedInsertions >= 0, "Expected insertions (%s) must be >= 0", expectedInsertions);
  checkArgument(fpp > 0.0, "False positive probability (%s) must be > 0.0", fpp);
  checkArgument(fpp < 1.0, "False positive probability (%s) must be < 1.0", fpp);

  if (expectedInsertions == 0) {
    expectedInsertions = 1;
  }
  /*
   * TODO(user): Put a warning in the javadoc about tiny fpp values, since the resulting size
   * is proportional to -log(p), but there is not much of a point after all, e.g.
   * optimalM(1000, 0.0000000000000001) = 76680 which is less than 10kb. Who cares!
   */
  long numBits = optimalNumOfBits(expectedInsertions, fpp);
  int numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, numBits);
  try {
    return new BloomFilter<T>(new BitArray(numBits), numHashFunctions, funnel, strategy);
  } catch (IllegalArgumentException e) {
    throw new IllegalArgumentException("Could not create BloomFilter of " + numBits + " bits", e);
  }
}
static long optimalNumOfBits(long n, double p) {
  if (p == 0) {
    p = Double.MIN_VALUE;
  }
  return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}

static int optimalNumOfHashFunctions(long n, long m) {
  // (m / n) * log(2), but avoid truncation due to division!
  return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
}
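As a worked example of the two sizing formulas, m = -n ln p / (ln 2)^2 and k = (m / n) ln 2, the sketch below copies both methods into a standalone class and evaluates them for n = 10,000 and p = 0.0005, the values used by the test at the end of this article. The class name `BloomSizing` and the `main` method are illustrative assumptions and not part of Guava.

```java
public class BloomSizing {
    /** Number of bits m for n expected insertions and target false positive probability p. */
    public static long optimalNumOfBits(long n, double p) {
        if (p == 0) {
            p = Double.MIN_VALUE;
        }
        return (long) (-n * Math.log(p) / (Math.log(2) * Math.log(2)));
    }

    /** Number of hash functions k for n expected insertions and m bits, never less than 1. */
    public static int optimalNumOfHashFunctions(long n, long m) {
        return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
    }

    public static void main(String[] args) {
        long n = 10_000;
        double p = 0.0005;
        long m = optimalNumOfBits(n, p);
        int k = optimalNumOfHashFunctions(n, m);
        System.out.println("bits = " + m + " (" + m / 8 + " bytes), hash functions = " + k);
    }
}
```

For these inputs the filter needs a little under 20 KB of bits and 11 hash functions, so even a strict false positive target is cheap in memory.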
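Next, look at the BloomFilterStrategies class. It is an enum that implements the BloomFilter.Strategy interface, and it has two values, MURMUR128_MITZ_32 and MURMUR128_MITZ_64, which correspond to a 32-bit and a 64-bit hash mapping respectively. The latter uses all 128 bits produced by murmur3 hash and therefore has a larger index space, but the principle is the same, so we analyze the default, MURMUR128_MITZ_64: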
MURMUR128_MITZ_64() {
  public <T> boolean put(
      T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
    long bitSize = bits.bitSize();
    byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
    long hash1 = lowerEight(bytes);
    long hash2 = upperEight(bytes);

    boolean bitsChanged = false;
    long combinedHash = hash1;
    for (int i = 0; i < numHashFunctions; i++) {
      // Make the combined hash positive and indexable
      bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
      combinedHash += hash2;
    }
    return bitsChanged;
  }

  public <T> boolean mightContain(
      T object, Funnel<? super T> funnel, int numHashFunctions, BitArray bits) {
    long bitSize = bits.bitSize();
    byte[] bytes = Hashing.murmur3_128().hashObject(object, funnel).getBytesInternal();
    long hash1 = lowerEight(bytes);
    long hash2 = upperEight(bytes);

    long combinedHash = hash1;
    for (int i = 0; i < numHashFunctions; i++) {
      // Make the combined hash positive and indexable
      if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
        return false;
      }
      combinedHash += hash2;
    }
    return true;
  }
  // lowerEight(bytes) and upperEight(bytes) are not shown in this excerpt.
};
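Viewed abstractly, put is the write path and mightContain is the read path, and the two methods look alike. Both first run murmur3 hash over the input object (fed through its funnel) to obtain a 128-bit byte array, then take the low and the high 8 bytes (64 bits each) to build two long values, hash1 and hash2. The loop body applies the idea of simulating further hash functions with just two, that is, the gi(x) = h1(x) + ih2(x) mentioned earlier: hash2 is added on every iteration, and the running value is reduced modulo bitSize to obtain an index into the bit array.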
x mod y = x - y⌊x/y⌋ (rounded down), so -5 mod 3 = -5 - 3 × (-2) = 1
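The index derivation and the sign handling can be tried out with the JDK alone. The sketch below is not Guava's code: MD5 stands in for murmur3 only because it is a 128-bit hash that ships with the JDK, and the class name `DoubleHashingDemo`, the method `indexes`, and the sample bit size are illustrative assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;

public class DoubleHashingDemo {
    /** Derives k bit indexes in [0, bitSize) from a single 128-bit digest of the input. */
    public static long[] indexes(String value, int k, long bitSize) {
        byte[] bytes;
        try {
            // Assumption: MD5 replaces murmur3_128 here purely to avoid external dependencies.
            bytes = MessageDigest.getInstance("MD5").digest(value.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        ByteBuffer buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN);
        long hash1 = buf.getLong(0); // lower eight bytes
        long hash2 = buf.getLong(8); // upper eight bytes

        long[] result = new long[k];
        long combinedHash = hash1;
        for (int i = 0; i < k; i++) {
            // Clearing the sign bit keeps the left operand of % non-negative.
            result[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
            combinedHash += hash2; // g_i(x) = h1(x) + i * h2(x); overflow simply wraps
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(-5 % 3);               // -2: Java's % is a remainder, truncating toward zero
        System.out.println(Math.floorMod(-5, 3)); // 1: the floored modulo defined above
        System.out.println(Arrays.toString(indexes("hello", 11, 158_208)));
    }
}
```

Because Java's % can return a negative remainder, a negative combined hash would otherwise produce a negative array index. Masking with Long.MAX_VALUE avoids that without an extra branch.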
static final class BitArray {
  final long[] data;
  long bitCount;

  BitArray(long bits) {
    this(new long[Ints.checkedCast(LongMath.divide(bits, 64, RoundingMode.CEILING))]);
  }

  // Used by serialization
  BitArray(long[] data) {
    checkArgument(data.length > 0, "data length is zero!");
    this.data = data;
    long bitCount = 0;
    for (long value : data) {
      bitCount += Long.bitCount(value);
    }
    this.bitCount = bitCount;
  }

  /** Returns true if the bit changed value. */
  boolean set(long index) {
    if (!get(index)) {
      // index >>> 6 picks the long; 1L << index uses only the low 6 bits of index
      data[(int) (index >>> 6)] |= (1L << index);
      bitCount++; // keep the cached count of set bits in sync
      return true;
    }
    return false;
  }

  boolean get(long index) {
    return (data[(int) (index >>> 6)] & (1L << index)) != 0;
  }

  /** Number of bits */
  long bitSize() {
    return (long) data.length * Long.SIZE;
  }
}
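The two bit tricks in set and get are easier to follow on a tiny example. The following sketch is a stripped-down, hypothetical `LongBitArrayDemo` (not the Guava class) that keeps only the long array and shows which word and which bit a given index touches.

```java
public class LongBitArrayDemo {
    public final long[] data;

    public LongBitArrayDemo(long bits) {
        // Round up to a whole number of 64-bit words.
        data = new long[(int) ((bits + 63) / 64)];
    }

    /** Sets the bit and returns true if it was previously clear. */
    public boolean set(long index) {
        if (get(index)) {
            return false;
        }
        // index >>> 6 is index / 64 and selects the word.
        // For a long left operand, << uses only the low 6 bits of the shift count,
        // so 1L << index addresses bit (index % 64) inside that word.
        data[(int) (index >>> 6)] |= (1L << index);
        return true;
    }

    public boolean get(long index) {
        return (data[(int) (index >>> 6)] & (1L << index)) != 0;
    }

    public static void main(String[] args) {
        LongBitArrayDemo bits = new LongBitArrayDemo(100);     // two words, 128 bits
        System.out.println(bits.set(70));                      // true: the bit was clear
        System.out.println(bits.set(70));                      // false: already set
        System.out.println(Long.toBinaryString(bits.data[1])); // 1000000, i.e. bit 6 of word 1
    }
}
```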
public interface JedisExecutor<T> {
    T execute(Jedis jedis);
}

public interface PipelineExecutor {
    void load(Pipeline pipeline);
}

public class JedisUtils {
    private static final GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
    private JedisPool jedisPool;

    public JedisUtils() {
        jedisPool = new JedisPool(poolConfig, "localhost", 6379);
    }

    public <T> T execute(JedisExecutor<T> jedisExecutor) {
        // try-with-resources returns the connection to the pool
        try (Jedis jedis = jedisPool.getResource()) {
            return jedisExecutor.execute(jedis);
        }
    }

    public List<Object> pipeline(List<PipelineExecutor> pipelineExecutors) {
        try (Jedis jedis = jedisPool.getResource()) {
            Pipeline pipeline = jedis.pipelined();
            for (PipelineExecutor executor : pipelineExecutors) {
                executor.load(pipeline); // queue this executor's commands on the pipeline
            }
            return pipeline.syncAndReturnAll();
        }
    }
}
public boolean put(String string, int numHashFunctions, RedisBitmaps bits) {
    long bitSize = bits.bitSize();
    byte[] bytes = Hashing.murmur3_128().hashString(string, Charsets.UTF_8).asBytes();
    long hash1 = lowerEight(bytes);
    long hash2 = upperEight(bytes);

    boolean bitsChanged = false;
    long combinedHash = hash1;
    // for (int i = 0; i < numHashFunctions; i++) {
    //     bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize);
    //     combinedHash += hash2;
    // }
    long[] offsets = new long[numHashFunctions];
    for (int i = 0; i < numHashFunctions; i++) {
        offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
        combinedHash += hash2;
    }
    bitsChanged = bits.set(offsets);
    return bitsChanged;
}

public boolean mightContain(String object, int numHashFunctions, RedisBitmaps bits) {
    long bitSize = bits.bitSize();
    byte[] bytes = Hashing.murmur3_128().hashString(object, Charsets.UTF_8).asBytes();
    long hash1 = lowerEight(bytes);
    long hash2 = upperEight(bytes);

    long combinedHash = hash1;
    // for (int i = 0; i < numHashFunctions; i++) {
    //     if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) {
    //         return false;
    //     }
    //     combinedHash += hash2;
    // }
    // return true;
    long[] offsets = new long[numHashFunctions];
    for (int i = 0; i < numHashFunctions; i++) {
        offsets[i] = (combinedHash & Long.MAX_VALUE) % bitSize;
        combinedHash += hash2;
    }
    return bits.get(offsets);
}
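Last, and most important, is RedisBitmaps. Its bitSize is computed the same way the Guava Bloom filter computes the length of its long array. Once bitSize is known, the setbit command initializes a bit array consisting entirely of zeros. get(long offset) and set(long offset) follow the same logic as their counterparts in the Guava Bloom filter, so they are not repeated here. In get(long[] offsets), all offsets are checked against the Bitmaps behind each cursor in turn. If every offset hits in one of them, the element may exist in that Bitmaps. If no Bitmaps is hit completely, the element does not exist in any of them, and in that case set(long[] offsets) can insert it into the Bitmaps of the current key.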
class RedisBitmaps {
    private static final String BASE_KEY = "bloomfilter";
    private static final String CURSOR = "cursor";

    private JedisUtils jedisUtils;
    private long bitSize;

    RedisBitmaps(long bits) {
        this.jedisUtils = new JedisUtils();
        // Length of the bit array, rounded up to the size of n longs
        this.bitSize = LongMath.divide(bits, 64, RoundingMode.CEILING) * Long.SIZE;
        if (bitCount() == 0) {
            jedisUtils.execute((jedis -> jedis.setbit(currentKey(), bitSize - 1, false)));
        }
    }

    boolean get(long[] offsets) {
        for (long i = 0; i < cursor() + 1; i++) {
            final long cursor = i;
            // A match requires every offset to be set in one and the same bitmap
            boolean match = Arrays.stream(offsets).boxed()
                    .map(offset -> jedisUtils.execute(jedis -> jedis.getbit(genkey(cursor), offset)))
                    .allMatch(b -> (Boolean) b);
            if (match)
                return true;
        }
        return false;
    }

    boolean get(final long offset) {
        return jedisUtils.execute(jedis -> jedis.getbit(currentKey(), offset));
    }

    boolean set(long[] offsets) {
        // Skip the insert if an existing bitmap already matches all offsets
        if (cursor() > 0 && get(offsets)) {
            return false;
        }
        boolean bitsChanged = false;
        for (long offset : offsets)
            bitsChanged |= set(offset);
        return bitsChanged;
    }

    boolean set(long offset) {
        if (!get(offset)) {
            jedisUtils.execute(jedis -> jedis.setbit(currentKey(), offset, true));
            return true;
        }
        return false;
    }

    long bitCount() {
        return jedisUtils.execute(jedis -> jedis.bitcount(currentKey()));
    }

    long bitSize() {
        return this.bitSize;
    }

    private String currentKey() {
        return genkey(cursor());
    }

    private String genkey(long cursor) {
        return BASE_KEY + "-" + cursor;
    }

    private Long cursor() {
        String cursor = jedisUtils.execute(jedis -> jedis.get(CURSOR));
        return cursor == null ? 0 : Longs.tryParse(cursor);
    }

    void ensureCapacityInternal() {
        // Grow once more than half of the bits in the current bitmap are set
        if (bitCount() * 2 > bitSize())
            grow();
    }

    void grow() {
        Long cursor = jedisUtils.execute(jedis -> jedis.incr(CURSOR));
        jedisUtils.execute((jedis -> jedis.setbit(genkey(cursor), bitSize - 1, false)));
    }

    void reset() {
        String[] keys = LongStream.range(0, cursor() + 1).boxed().map(this::genkey).toArray(String[]::new);
        jedisUtils.execute(jedis -> jedis.del(keys));
        jedisUtils.execute(jedis -> jedis.set(CURSOR, "0"));
        jedisUtils.execute(jedis -> jedis.setbit(currentKey(), bitSize - 1, false));
    }

    private PipelineExecutor apply(PipelineExecutor executor) {
        return executor;
    }
}
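The lookup and growth logic of RedisBitmaps can be modelled without Redis, which makes it easy to experiment with. The sketch below is an in-memory approximation under stated assumptions: a list of java.util.BitSet stands in for the Redis keys, the list index plays the role of the cursor, and the class name `GrowingBitmaps` and its methods are hypothetical. In particular, calling the capacity check right before each insert is an assumption, since the call site of ensureCapacityInternal is not shown above.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class GrowingBitmaps {
    private final int bitSize;
    private final List<BitSet> bitmaps = new ArrayList<>();

    public GrowingBitmaps(int bitSize) {
        this.bitSize = bitSize;
        bitmaps.add(new BitSet(bitSize));
    }

    /** True if some bitmap has every offset set, i.e. the element might be present. */
    public boolean get(long[] offsets) {
        for (BitSet bitmap : bitmaps) {
            boolean all = true;
            for (long offset : offsets) {
                if (!bitmap.get((int) offset)) {
                    all = false;
                    break;
                }
            }
            if (all) {
                return true;
            }
        }
        return false;
    }

    /** Sets the offsets in the newest bitmap unless an existing bitmap already matches. */
    public boolean set(long[] offsets) {
        if (bitmaps.size() > 1 && get(offsets)) {
            return false;
        }
        ensureCapacity(); // assumption: capacity is checked before every insert
        BitSet current = bitmaps.get(bitmaps.size() - 1);
        boolean changed = false;
        for (long offset : offsets) {
            changed |= !current.get((int) offset);
            current.set((int) offset);
        }
        return changed;
    }

    /** Starts a new bitmap once more than half of the current one is set. */
    private void ensureCapacity() {
        BitSet current = bitmaps.get(bitmaps.size() - 1);
        if (current.cardinality() * 2 > bitSize) {
            bitmaps.add(new BitSet(bitSize));
        }
    }

    public int bitmapCount() {
        return bitmaps.size();
    }
}
```

The 50% threshold mirrors the `bitCount() * 2 > bitSize()` check: with the optimal number of hash functions, about half of the bits are set when a filter reaches its design capacity, so passing that mark is a reasonable signal that the current bitmap is full.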
public class TestRedisBloomFilter {
    private static final int TOTAL = 10000;
    private static final double FPP = 0.0005;

    public void test() {
        RedisBloomFilter redisBloomFilter = RedisBloomFilter.create(TOTAL, FPP);
        BloomFilter<String> bloomFilter = BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8), TOTAL, FPP);

        IntStream.range(0, /* 3* */TOTAL).boxed()
                .map(i -> Hashing.md5().hashInt(i).toString())
                .collect(toList()).forEach(s -> {
                    // The lambda body is missing from the source; presumably each
                    // string is inserted into both filters, as assumed here.
                    redisBloomFilter.put(s);
                    bloomFilter.put(s);
                });

        String str1 = Hashing.md5().hashInt(99999).toString();
        String str2 = Hashing.md5().hashInt(9999).toString();
        String str3 = "abcdefghijklmnopqrstuvwxyz123456";
        System.out.println(redisBloomFilter.mightContain(str1) + ":" + bloomFilter.mightContain(str1));
        System.out.println(redisBloomFilter.mightContain(str2) + ":" + bloomFilter.mightContain(str2));
        System.out.println(redisBloomFilter.mightContain(str3) + ":" + bloomFilter.mightContain(str3));
    }
}
grow bloomfilter-1
grow bloomfilter-1
grow bloomfilter-2
grow bloomfilter-3
