概述

布隆过滤器(Bloom Filter)是 1970 年由布隆提出来的,它可以用于判断一个 key 是否在某一个集合中。

原理

Bloom Filter 实际上是一个 bitmap,每一位是一个二进制位,0 表示 key 不在集合中,1 表示 key 在集合中。当插入一个 key 时,通过 hash 函数计算出一个 hash 值,然后向 bitmap 中对应的位置 1。当查找一个 key 时,如果对应为为 0,则表示集合中没有该 key。但是当对应位位 1 时,我们并不能断定集合中存在该 key,因为 hash 冲突可能造成 Bloom Filter 假阳性。因此,如何选择 hash 函数就成为提高 Bloom Filter 性能的因素之一了。

最简单的想法是使用一个 Hash 函数会造成冲突过大,那么我们就使用多个 Hash 函数,将多个 bit 位置为 1。查找时我们需要判断多个 bit 位,只有当这些 bit 位全为 1 时,才到集合中去查找,否则就可能判断该 key 不存在于集合中了。

img

但是这样又会将一个问题放大:就是如果数组太小了,插入若干个 key 后可能将整个 bitmap 全置为 1 了。因此又引出了第二个需要考虑的问题:bitmap 的大小

推导

假设 Hash 函数以等概率选择 bitmap 中的某一位,该 bitmap 的大小为 m。同时我们选择 k 个 Hash 函数进行计算。那么在插入时,进行一次 Hash 函数计算后,将 bitmap 中的某一个特定位置为 1 的概率为1m\frac{1}{m}

那么在经过 k 次计算后,bitmap 中某一位没有被置为 1 的概率为:

(11m)k(1 - \frac{1}{m})^k

如果我们插入了 n 个元素,那么插入后,某一位仍然为 0 的概率为:

(11m)kn(1-\frac{1}{m})^{kn}

下面我们就可以计算经过 n 次插入后,查询一个 key,判断该 key 在集合中的概率为:

(1(11m)kn)k( 1 - (1-\frac{1}{m})^{kn})^{k}

该概率可以看作是布隆过滤器假阳性的概率。由于

limx0(1+x)1x=e\begin{equation} \lim_{x \to 0} (1+x)^{\frac{1}{x}} = e \end{equation}

lim1m0(11m)kn=eknm\begin{equation} \lim_{- \frac{1}{m}\to 0}(1-\frac{1}{m}) ^ {kn} = e^{- \frac{kn}{m}} \end{equation}

所以上面的公式趋向于:

(1eknm)k( 1 - e^{\frac{-kn}{m} } )^{k}

可以看出,当 m 越大,假阳性概率越低;当 n 越小时,假阳性概率越低。

根据给定的 m 和 n,我们可以计算出 Hash 函数:

f(k)=(1eknm)kf(k) = ( 1 - e^{\frac{-kn}{m} } )^{k}

设 $$ b = e^{\frac{n}{m}}$$,则可以简化为

f(k)=(1bk)kf(k) = (1-b^{-k})^{k}

两边取对数的

lnf(k)=kln(1bk)ln_{}{f(k}) = k \cdot ln_{}{ ( 1 - b^{-k}) }

两边对 k 求导得

1f(k)f(k)=ln(1bk)+kbklnb1bk\begin{equation} \frac{1}{f(k)} \cdot {f(k)}' = ln{(1-b^{-k})} + k \cdot \frac{b^{-k} \cdot ln{b}}{1 - b^{-k}} \end{equation}

当上面公式求最值,可得,当 k=ln2mnk = ln2 \cdot \frac{m}{n} 时,假阳性率最低。

对于给定的假阳性率 p,我们可以计算 bitmap 的大小 m:

m=nlnp(ln2)2\begin{equation}m = - \frac{nln{p}}{(ln2)^{2}} \end{equation}

实现

首先我们需要根据给定的假阳性率和数据量,计算出需要的 bitmap 每个 key 需要多少位,即 m/nm/n,可以用来计算 k

1
2
3
4
5
6
7
// m = - nlnp / (ln2)^2,return m / n
func BloomBitsPerKey(numEntries int, fp float64) int {
size := -1 * float64(numEntries) * math.Log(fp) /
math.Pow(math.Log(2), 2)
locs := math.Ceil(size / float64(numEntries))
return int(locs)
}

然后我们需要计算出需要 Hash 函数的数量

1
2
3
4
5
6
7
8
// k = m/n * ln2
func calcHashNum(bitsPerKey int) (k uint32) {
k = uint32(float64(bitsPerKey) * math.Log(2))
if k < 1 {
k = 1
}
return k
}

下面我们就可以将 key 插入 布隆过滤器了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func appendFilter(keys []uint32, bitsPerKey int) []byte {
if bitsPerKey < 0 {
bitsPerKey = 0
}

k := calcHashNum(bitsPerKey)

nBits := len(keys) * int(bitsPerKey) // n * m/n
nBytes := (nBits + 7) / 8
nBits = nBytes * 8
filter := make([]byte, nBytes+1)

for _, h := range keys {
// 对于每一个 hash 函数,计算哈希值
// 将哈希值对应的 位 设置为 1
}
filter[nBytes] = uint8(k)
return filter
}