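mace notes -- a multithreading bug
While testing after reworking the recovery code, the multithreaded MVCC test started crashing at arbitrary locations. After ruling out the other modules one by one, I narrowed the problem down to the buffer module. This change merged multiple page files into one and modified part of the buffer code. Testing the buffer with the other modules mocked out produced the following log: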
18:48:56.989247 338925 [DEBUG] src/map/buffer.rs:312 install data 0x7f5bb9ff7010
18:48:56.989247 338927 [DEBUG] src/map/buffer.rs:105 (1, 4194304) 0x7f5bb9ff7010 off 0 size 88
18:48:56.989247 338925 [DEBUG] src/map/buffer.rs:105 (1, 4194304) 0x7f5bb9ff7010 off 0 size 17536
18:48:56.989258 338925 [DEBUG] src/map/buffer.rs:251 duplicated allocation (1, 4194304) 0x7f5bb9ff7010 size 17536
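This is the part of the buffer module that manages memory. The log clearly shows the allocator handing out the same address (0x7f5bb9ff7010, off 0) to two different threads. Simplified, the code looks like this: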
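use std::collections::HashSet;
use std::ops::Deref;
use std::ptr::null_mut;
use std::sync::RwLock;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
use std::sync::atomic::{AtomicBool, AtomicPtr, AtomicU64, AtomicUsize};
// Assumption: ArrayQueue comes from the crossbeam-queue crate; the original does not name it.
use crossbeam_queue::ArrayQueue;

const LIMIT: u64 = 4 << 20;
static QUIT: AtomicBool = AtomicBool::new(false);

enum Code {
    Again,
    NoSpace,
}

struct Junk {
    idx: usize,
    state: AtomicU64,
    offset: AtomicU64,
    cnt: AtomicU64,
}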
impl Junk {
    const FREE: u64 = 3;
    const SEALED: u64 = 7;

    fn new(idx: usize) -> Self {
        Self {
            idx,
            state: AtomicU64::new(Self::FREE),
            // offset is 0 only until the first reset
            offset: AtomicU64::new(0),
            cnt: AtomicU64::new(0),
        }
    }

    fn reset(&self, off: u64) {
        assert_ne!(off, 0);
        self.state.store(Self::FREE, Release);
        self.offset.store(off, Release);
        self.cnt.fetch_add(1, Release);
    }

    fn inc(&self, size: u64) -> Result<(usize, u64), Code> {
        debug_assert!(size < LIMIT);
        let mut cur = self.offset.load(Acquire);
        loop {
            if self.state.load(Acquire) == Self::SEALED {
                return Err(Code::Again);
            }
            let new = cur + size;
            if new > LIMIT {
                return Err(Code::NoSpace);
            }
            match self.offset.compare_exchange(cur, new, AcqRel, Acquire) {
                Ok(_) => return Ok((self.idx, cur)),
                Err(e) => cur = e,
            }
        }
    }

    fn seal(&self) {
        self.state.store(Self::SEALED, Release);
    }

    fn cnt(&self) -> u64 {
        self.cnt.load(Acquire)
    }
}

#[derive(Debug, Clone, Copy)]
struct Handle {
    raw: *mut Junk,
}

unsafe impl Send for Handle {}
unsafe impl Sync for Handle {}

impl From<*mut Junk> for Handle {
    fn from(value: *mut Junk) -> Self {
        Self { raw: value }
    }
}

impl Deref for Handle {
    type Target = Junk;

    fn deref(&self) -> &Self::Target {
        unsafe { &*self.raw }
    }
}

struct Pool {
    buf: Vec<Handle>,
    freeq: ArrayQueue<Handle>,
    cur: AtomicPtr<Junk>,
    set: RwLock<HashSet<u64>>,
    next: AtomicU64,
    cnt: AtomicUsize,
}

impl Drop for Pool {
    fn drop(&mut self) {
        for h in &self.buf {
            unsafe {
                let x = Box::from_raw(h.raw);
                drop(x);
            }
        }
    }
}

impl Pool {
    const N: usize = 8;

    fn new() -> Self {
        let freeq = ArrayQueue::new(Self::N);
        let mut buf = Vec::with_capacity(Self::N);
        for i in 0..Self::N {
            let raw = Box::into_raw(Box::new(Junk::new(i)));
            let h = Handle { raw };
            buf.push(h);
            freeq.push(h).unwrap();
        }
        Self {
            buf,
            freeq,
            cur: AtomicPtr::new(null_mut()),
            set: RwLock::new(HashSet::new()),
            next: AtomicU64::new(4 << 20),
            cnt: AtomicUsize::new(0),
        }
    }

    fn current(&self) -> Option<Handle> {
        let cur = self.cur.load(Relaxed);
        if cur.is_null() {
            self.install(cur)?;
        }
        Some(self.cur.load(Acquire).into())
    }

    fn process(&self, size: u64) -> Option<()> {
        while !QUIT.load(Relaxed) {
            let cur = self.current()?;
            match cur.inc(size) {
                Err(Code::Again) => continue,
                Err(Code::NoSpace) => {
                    cur.seal();
                    self.install(cur.raw)?;
                }
                Ok((idx, off)) => {
                    // every offset handed out must be unique
                    let mut lk = self.set.write().unwrap();
                    if !lk.insert(off) {
                        log::debug!("duplicated idx {idx} off {off} cnt {}", cur.cnt());
                        panic!("");
                    }
                    return Some(());
                }
            }
        }
        None
    }

    fn install(&self, cur: *mut Junk) -> Option<()> {
        if self.cnt.load(Acquire) >= Self::N {
            QUIT.store(true, Relaxed);
            return None;
        }
        let new = self.freeq.pop()?;
        match self.cur.compare_exchange(cur, new.raw, AcqRel, Acquire) {
            Ok(_) => {
                // BUG: new is already visible to other threads here, but not yet reset
                new.reset(self.next());
                log::debug!(
                    "install {} off {} cnt {}",
                    new.idx,
                    new.offset.load(Relaxed),
                    new.cnt()
                );
                self.cnt.fetch_add(1, Relaxed);
                Some(())
            }
            Err(_) => {
                self.freeq.push(new).expect("no space");
                None
            }
        }
    }

    fn next(&self) -> u64 {
        self.next.fetch_add(LIMIT, Release)
    }

    fn is_full(&self) -> bool {
        self.cnt.load(Acquire) >= Self::N
    }
}
The code above reproduces the problem reliably:
12:29:38.456084 820556 [DEBUG] src/lib.rs:189 install 0 off 4194304 cnt 1
12:29:38.456175 820556 [DEBUG] src/lib.rs:189 install 3 off 16777216 cnt 1
12:29:38.456107 820554 [DEBUG] src/lib.rs:189 install 2 off 12582912 cnt 1
12:29:38.456100 820555 [DEBUG] src/lib.rs:189 install 1 off 8388608 cnt 1
12:29:38.456191 820554 [DEBUG] src/lib.rs:189 install 5 off 25165824 cnt 1
12:29:38.456186 820556 [DEBUG] src/lib.rs:189 install 4 off 20971520 cnt 1
12:29:38.456204 820555 [DEBUG] src/lib.rs:189 install 6 off 29360128 cnt 1
12:29:38.456206 820554 [DEBUG] src/lib.rs:169 duplicated idx 6 off 0 cnt 1
12:29:38.456210 820556 [DEBUG] src/lib.rs:189 install 7 off 33554432 cnt 1
12:29:38.456218 820555 [DEBUG] src/lib.rs:240 thread 2 exit
12:29:38.456221 820556 [DEBUG] src/lib.rs:246 thread 3 exit
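The log shows that whenever the bug fires, an install has just completed, and before the installing thread gets to use the new Handle, another thread has already called inc on it. Every duplicate occurs at off 0 (and off is 0 only at initialization; it can never be 0 afterwards).
Since allocation is capped and never wraps around, off 0 can only show up the first time a Handle is used, and that use must happen before reset. This matches the log (mace's log) exactly, namely: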
1. thread 1: install a
2. thread 2: a.inc
3. thread 1: a.reset
4. thread 1: a.inc
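The key to triggering the bug is step 2: an earlier Handle has already inserted 0 into set, and step 2 runs before off is reset, so 0 is inserted a second time and the bug fires. We expect install and reset to behave as one atomic operation, but in practice they are two separate steps.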
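The four-step interleaving above can be replayed deterministically on a single thread. The following is only a minimal sketch: Slot is a hypothetical stand-in for Junk that keeps just the bump offset, replay_buggy_order is an illustrative name, and the sizes 88 and 17536 are borrowed from the mace log. It is not the mace code.

```rust
use std::ptr::null_mut;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::{AtomicPtr, AtomicU64};

/// Hypothetical stand-in for `Junk`: only the bump offset matters here.
struct Slot {
    offset: AtomicU64,
}

impl Slot {
    /// Hands out the current offset and bumps it by `size`.
    fn inc(&self, size: u64) -> u64 {
        self.offset.fetch_add(size, AcqRel)
    }

    fn reset(&self, off: u64) {
        self.offset.store(off, Release);
    }
}

/// Replays the four steps in order and returns the offsets
/// handed out by step 2 and step 4.
fn replay_buggy_order() -> (u64, u64) {
    let slot = Slot { offset: AtomicU64::new(0) };
    let cur: AtomicPtr<Slot> = AtomicPtr::new(null_mut());
    let raw = &slot as *const Slot as *mut Slot;

    // Step 1, "thread 1": the CAS publishes the slot before it has been reset.
    cur.compare_exchange(null_mut(), raw, AcqRel, Acquire)
        .expect("no other installer in this replay");

    // Step 2, "thread 2": sees the published slot and allocates from it.
    // Safety: `slot` outlives every use of the pointer in this function.
    let seen = unsafe { &*cur.load(Acquire) };
    let early = seen.inc(88);

    // Step 3, "thread 1": reset arrives too late.
    slot.reset(4 << 20);

    // Step 4, "thread 1": allocates from the reset slot.
    let late = slot.inc(17536);

    (early, late)
}
```

In this replay step 2 receives offset 0, the initial value of the slot, while step 4 receives the base written by reset. With several slots in rotation, each one can leak offset 0 this way once, which is what shows up as the duplicate.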
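With the cause found, the fix is simple. Since the CAS is a synchronization point, we can split Pool::next into two steps, a load and a store: the load goes before the CAS, and the store (Pool::update_next) goes in the branch where the CAS succeeds. reset moves before the CAS as well, so a Handle is fully initialized by the time other threads can see it.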
fn install(&self, cur: *mut Junk) -> Option<()> {
    if self.cnt.load(Acquire) >= Self::N {
        QUIT.store(true, Relaxed);
        return None;
    }
    let new = self.freeq.pop()?;
    new.reset(self.next()); // <-- reset now happens before the CAS
    match self.cur.compare_exchange(cur, new.raw, AcqRel, Acquire) {
        Ok(_) => {
            log::debug!(
                "install {} off {} cnt {}",
                new.idx,
                new.offset.load(Relaxed),
                new.cnt()
            );
            self.update_next(); // <-- advance next only after the CAS succeeds
            self.cnt.fetch_add(1, Relaxed);
            Some(())
        }
        Err(_) => {
            self.freeq.push(new).expect("no space");
            None
        }
    }
}

fn next(&self) -> u64 {
    self.next.load(Relaxed)
}

fn update_next(&self) {
    self.next.fetch_add(LIMIT, Release);
}
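Checking the mace code the same way confirms that it has the same problem, and the same change fixes it.
Note: the split is only needed because multiple threads may call install concurrently; without it, next could end up non-contiguous. Conversely, if install is never called concurrently, no split is needed, and it is enough to make sure reset completes before the CAS.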
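As a rough illustration of the unsplit variant, the sketch below claims the base offset with a single fetch_add and finishes initialization before the CAS publishes the slot. It assumes a simplified pool that uses only std: slots are addressed by index instead of raw pointers, a Mutex<Vec<usize>> stands in for the free queue, and published slots are never recycled. Slot, Pool, stress, and the tiny LIMIT are hypothetical names and values, not mace's. When a CAS loses, the claimed range is simply skipped, which is the non-contiguous next mentioned above.

```rust
use std::collections::HashSet;
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::{AtomicU64, AtomicUsize};
use std::sync::Mutex;
use std::thread;

const LIMIT: u64 = 256; // bytes per slot; tiny so slots roll over often
const NONE: usize = usize::MAX; // "no slot published yet"

struct Slot {
    offset: AtomicU64, // next free absolute offset
    end: AtomicU64,    // one past the last usable offset
}

struct Pool {
    slots: Vec<Slot>,
    free: Mutex<Vec<usize>>, // indices of slots that were never published
    cur: AtomicUsize,        // index of the published slot, or NONE
    next: AtomicU64,         // next unclaimed base offset
}

impl Pool {
    fn new(n: usize) -> Self {
        Pool {
            slots: (0..n)
                .map(|_| Slot { offset: AtomicU64::new(0), end: AtomicU64::new(0) })
                .collect(),
            free: Mutex::new((0..n).collect()),
            cur: AtomicUsize::new(NONE),
            next: AtomicU64::new(LIMIT),
        }
    }

    /// Initialize first, publish second. Returns false once no slot is free.
    fn install(&self, seen: usize) -> bool {
        let popped = self.free.lock().unwrap().pop();
        let idx = match popped {
            Some(i) => i,
            None => return false,
        };
        // The base is claimed for good, even if the CAS below loses.
        let base = self.next.fetch_add(LIMIT, AcqRel);
        self.slots[idx].end.store(base + LIMIT, Release);
        self.slots[idx].offset.store(base, Release);
        if self.cur.compare_exchange(seen, idx, AcqRel, Acquire).is_err() {
            // Lost the race: return the slot; its range stays a gap.
            self.free.lock().unwrap().push(idx);
        }
        true
    }

    fn alloc(&self, size: u64) -> Option<u64> {
        loop {
            let seen = self.cur.load(Acquire);
            if seen != NONE {
                let slot = &self.slots[seen];
                let off = slot.offset.fetch_add(size, AcqRel);
                if off + size <= slot.end.load(Acquire) {
                    return Some(off);
                }
            }
            if !self.install(seen) {
                return None;
            }
        }
    }
}

/// Hammers the pool from several threads; returns (allocations, distinct offsets).
fn stress(threads: usize, slots: usize) -> (usize, usize) {
    let pool = Pool::new(slots);
    let all = Mutex::new(Vec::new());
    thread::scope(|s| {
        for _ in 0..threads {
            s.spawn(|| {
                let mut mine = Vec::new();
                while let Some(off) = pool.alloc(16) {
                    mine.push(off);
                }
                all.lock().unwrap().extend(mine);
            });
        }
    });
    let all = all.into_inner().unwrap();
    let distinct: HashSet<u64> = all.iter().copied().collect();
    (all.len(), distinct.len())
}
```

Calling stress(4, 8) should report as many distinct offsets as allocations. Because every slot is fully initialized before the CAS makes it visible, no thread can observe the placeholder offset 0.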
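To sum up: the code was correct before the buffer change, the bug was introduced while adding the new feature, and review did not catch it. Because the bug is also hard to trigger, it only surfaced in testing after a large batch of changes, by which point debugging was difficult. So changes to old code should be made carefully and in several steps, with all related tests run after each step. The buffer in this case, for example, can be pulled out and run on its own, which in turn requires the modules to be decoupled.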