xv6其实并不支持现代操作系统中的“多线程”。但由于kernel部分的地址空间是各个进程共享的,kernel本身还是需要处理进程之间同步的问题。运行在多处理器系统上,各个进程可能同时操作一个内存地址;即使在单处理器的情况下,在中断机制开启,允许切换进程时,也可能发生多个进程交替操作某个内存地址的情况。为此xv6实现了两种基本的进程间同步的模型:mutual exclusion和producer/comsumer。
这是xv6系列的第8篇。其他包括:
- minimal assembly
- how system boots
- address space
- interrupts
- system calls
- process
- context switch
- synchronization
- system initialization
mutual exclusion
自旋锁
自旋锁是实现mutual exclusion最简单的一种方法。xv6中定义的spinlock包括了一个关键的标志位locked:
struct spinlock {
uint locked; // Is the lock held?
...
};
加锁的过程acquire:
// spinlock.c
void acquire(struct spinlock *lk) {
pushcli(); // disable interrupts to avoid deadlock.
...
while(xchg(&lk->locked, 1) != 0) // The xchg is atomic.
;
...
}
其中xchg是一个原子操作,它原子性地将数值写入某个内存位置,并返回此位置原先的值。acquire首先暂时禁止中断机制,然后通过xchg将locked标志位置1。xchg如果返回值为1,表示已经有其他进程占用了这个锁,当前进程加锁失败,于是进入不断重试直至成功为止的流程。相反如果xchg返回0,则表示当前进程已经成功获得了这个锁,可以愉快的继续执行下面的指令。
在竞争激烈的情况下,获取自旋锁的过程中CPU时间可能都消耗在不断自旋等待之中,效率比较低下。
与acquire镜像对称,release首先将locked标志位置零,然后恢复中断机制。
void release(struct spinlock *lk) {
...
lk->locked = 0;
popcli();
}
xv6在加锁之前首先会通过pushcli来禁止中断,相应在解锁之后通过popcli重启中断。pushcli和popcli都是对当前CPU的操作,它们除了调用cli和sti来操作%eflags中的FL_IF标志位之外,还记录了操作的次数。只有当pushcli和popcli的次数相等时,中断才会被开启。所以当程序中有两个锁处于锁定状态,只释放其中的一个,中断不会被开启,于是也不会发生进程切换。
void pushcli(void) {
int eflags;
eflags = readeflags();
cli();
if(cpu->ncli == 0)
cpu->intena = eflags & FL_IF;
cpu->ncli += 1;
}
void
popcli(void)
{
if(readeflags()&FL_IF)
panic("popcli - interruptible");
if(--cpu->ncli < 0)
panic("popcli");
if(cpu->ncli == 0 && cpu->intena)
sti();
}
producer/comsumer
sleep和wakeup
除了自旋锁处理的mutual exclusion之外,另一种需要进程间协调的关系是producer/consumer(例如pipe左右的两个进程)。这种关系中,consumer必须等producer完成(部分)工作之后才有事可做,否则处于空转消耗CPU时间的状态。xv6中用sleep和wakeup两个system call处理这个问题。sleep让进程进入休眠状态,让出CPU;wakeup通过进程结构体proc中的信号量chan唤醒休眠的进程。
sleep除了要求一个信号量chan之外,还要求进程持有一个锁lk。进入sleep之后,锁定ptable,释放其他的锁(与sched的要求一致)。然后设置chan,将进程的状态设置为SLEEPING,然后进入sched,让出CPU去运行另一个进程。ptable锁将由切换后的进程负责释放。
void sleep(void *chan, struct spinlock *lk) {
if(proc == 0)
panic("sleep");
if(lk == 0)
panic("sleep without lk");
if(lk != &ptable.lock){
acquire(&ptable.lock);
release(lk);
}
proc->chan = chan;
proc->state = SLEEPING;
sched();
proc->chan = 0;
if(lk != &ptable.lock){
release(&ptable.lock);
acquire(lk);
}
}
walkup首先锁定ptable。然后遍历ptable,找到所有匹配信号量chan的休眠状态的进程,将它们的状态改为RUNNABLE。最后释放ptable锁。被唤醒的进程将在未来某个时刻由scheduler载入CPU,最终进入RUNNING状态。
void wakeup(void *chan) {
struct proc *p;
acquire(&ptable.lock);
for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
if(p->state == SLEEPING && p->chan == chan)
p->state = RUNNABLE;
release(&ptable.lock);
}
一个栗子:send/receive
send和recv是关于如何使用sleep和wakeup的例子。二者之间通过结构体q传递一个指针ptr。
send的流程是:
- 首先对
q加锁 - 然后检查
q中的ptr是否为空,如果为空,则将p写入。 - 如果
ptr不为空,说明recv还没有将之前的值取出,通道堵塞,send不能继续工作,所以通过sleep让它进入休眠。sleep将负责对q解锁,以便让recv可以工作。 send将处于SLEEPING状态,直至recv将信号取走,且将它的状态改为RUNNABLE- 某个时刻
send被重新载入CPU执行,它从sleep返回,并且返回的过程已经对q再次加锁,此时send可以将p写入 wakeup所有等待从q接收信号的进程- 最后对
q解锁
recv的流程几乎与之对称。
struct q {
struct spinlock lock;
void* ptr;
};
void* send(struct q *q, void *p) {
acquire(&q->lock);
while(q->ptr != 0)
sleep(q, &q->lock);
q->ptr = p;
wakeup(q);
release(&q->lock);
}
void* recv(struct q *q) {
void*p;
acquire(&q->lock);
while((p = q->ptr) == 0)
sleep(q, &q->lock);
q->ptr = 0;
wakeup(q)
release(&q->lock);
return p;
}
一个细节是sleep函数总是处于while循环之中,每当从sleep返回之后都要重新检查条件是否满足,以避免进程被意外唤醒。
另一个细节是q为何需要包括一个锁。这是因为while验证条件和调用sleep是两个操作,如果在这两步之间发生进程切换的话,可能会发生wakeup在sleep之前执行的情况,这样当切换回原进程并最终进入休眠之后,将不能再被唤起。
一个更完整的栗子:pipe
管道是unix系统的经典设计。它是一种inter-process communication(IPC)机制,通过一个内存中的buffer在两个进程之间传递数据。
xv6中pipe的定义:
struct pipe {
struct spinlock lock;
char data[PIPESIZE];
uint nread; // number of bytes read
uint nwrite; // number of bytes written
...
};
管道连接的两个进程,一个负责写(pipewrite),另一个负责读(piperead),流程与send和recv非常接近(send和recv可以看成是buffer大小为0的管道)。
int pipewrite(struct pipe *p, char *addr, int n) {
int i;
acquire(&p->lock);
for(i = 0; i < n; i++){
while(p->nwrite == p->nread + PIPESIZE){
...
wakeup(&p->nread);
sleep(&p->nwrite, &p->lock);
}
p->data[p->nwrite++ % PIPESIZE] = addr[i];
}
wakeup(&p->nread);
release(&p->lock);
return n;
}
int piperead(struct pipe *p, char *addr, int n) {
int i;
acquire(&p->lock);
while(p->nread == p->nwrite && p->writeopen){
...
sleep(&p->nread, &p->lock);
}
for(i = 0; i < n; i++){
if(p->nread == p->nwrite)
break;
addr[i] = p->data[p->nread++ % PIPESIZE];
}
wakeup(&p->nwrite);
release(&p->lock);
return i;
}