Large-scale Incremental Processing Using Distributed Transactions and Notifications

Percolator

Motivation

MapReduce 无法单独处理增量数据更新,更新延迟与整体的数据量成正比

DBMS 可以利用事务进行增量更新,但是无法处理 Google web index 这个数量级的数据

已有的分布式存储系统(比如 Bigtable)可以处理这个量级的数据,但是无法保证并发更新时数据的不变性

Percolator 的目标是并发处理增量更新的同时,保证数据的不变性(事务),并且可以跟踪那些更新已经被处理了(Observe 机制)

Design

Percolator 是在 Bigtable 的基础上构建的。Bigtable 可以看做是一个多维的 map (row, column, timestamp),提供了在 row 上查找并保证 row 事务的能力。但是 Bigtable 不支持跨行事务。因此 Percolator 的一个目标就是利用 Bigtable 实现跨行、跨表事务。

Percolator 中没有单独的 lock manager,因此所有节点都可以发送请求修改数据,因此所有的锁都必须持久化,Bigtable 保证了即使 crash,lock 也不会丢失。

Percolator 实现了快照隔离级别,每个数据都有多个版本,当发生 W-W 冲突时,会保证最多只有一个事务会被提交

Observe:允许用于指定某些 observe column,当这些了column 被修改时,会调用 observe code。如果一个 column 关联了多个 observe code,那么只有一个会被执行。一个 column 修改多次可能只会执行一次 observe code(使用 acknowledge column来判断是否已经执行过了)

算法

类似于 2PC,将提交分为两个阶段

1. prewrite

  • 锁住所有相关的 cells(先锁 primary key,primary,secondary 都写 primary 的锁信息 + start_ts)
  • 检查冲突,如果有冲突就回滚
  • 如果没有冲突,写入 lock 和 data,时间戳为 start_ts

两种冲突:

  • 发现 write record 的时间戳大于自己的时间戳,不能写未来的数据,回滚(commit 才会写 write record,所以该写记录一定已经提交了)
  • 发现数据被锁住了,回滚(说明有写冲突)

2. commit

  • 申请提交时间戳 commitTs
  • 释放锁,用 write 记录替换 lock 记录(先替换 primary, 时间戳为 commitTs,内容为 startTs,表明数据的最新版本是 startTs 对应的数据)

正在提交时宕机failure,lock还没释放怎么办?如何清理这些锁

Percolator 采取 lazy 的方法清理锁,即只有用在 Get() 的时候才会去清理,这就会造成延迟可能会比较高,所以不适合那些要求低延迟的系统。

在写入锁时会指定第一个 lock 为 primary lock,其余的 lock 为 secondary lock,secondary lock 会记录下 primary 的位置。Percolator 保证 primary 提交成功后,该事务所做的修改就对外部可见了,所以可以查看 primary lock 或 primary write 是否存在来判断该进行什么操作

  • 如果 primary lock 不存在,说明事务已经提交了,需要恢复事务
  • 如果 primary lock 存在,说明事务还没有提交,可以回滚

Get

  • 检查 [0, start_timestamp] 是否有锁
    • 如果有锁,表示有其他事务正在占用,如果超时,查看是否能够清理,不能清理就 wait
    • 如果没有锁,读取能够看到最新的数据([0, start_ts])

BackoffAndMaybeClean 对应了清理 lock 的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
fn back_off_maybe_clean_up_lock(&self, start_ts: u64, key: Vec<u8>) {
let mut table = self.data.lock().unwrap();
// get lock first, to find primary
if let Some((lock_k, lock_v)) = table.read(key.clone(), Column::Lock, None, Some(start_ts)) {
let lock_start_ts = parseTs(lock_key);
// find primary lock
let primary_key = get_primary(primary);
// determine rollback or roll-forward
match table.read(primary_key.to_vec(),Column::Write,lock_start_ts,u64::MAX) {
Some((primary_write_key, primary_write_value)) => {
if let Value::Timestamp(ts) = primary_write_value {
if *ts == lock_start_ts {
// roll-forward
let commit_ts = primary_write_key.1;
table.write(key,Column::Write,commit_ts,lock_start_ts);
table.erase(key, Column::Lock, lock_start_ts);
}
}
}
None => {
// rollback
table.erase(key, Column::Lock, lock_start_ts);
}
}
}
}
  • 拿到写记录对应的 lock
  • 根据 lock 中的 primary lock 信息拿到 primary lock(也可以拿到 primary 的写记录,如果有说明已经提交了)
    • 如果没有 primary lock(或者如果有 primary 的写记录),说明已经提交了,不能回滚,可以帮助提交(写 write 记录 with commit_ts, 释放锁)
    • 如果有 primary lock(或者如果没有 primary 的写记录),说明还没有提交,可以回滚(直接释放锁)

talent plan 有个练习,是对 percolator 的复现,完成这个练习可以加深对 percolator 的理解。我的实现