xv6: synchronization

Posted by aphasiayc on Wed 29 May 2019

xv6其实并不支持现代操作系统中的“多线程”。但由于kernel部分的地址空间是各个进程共享的,kernel本身还是需要处理进程之间同步的问题。运行在多处理器系统上,各个进程可能同时操作一个内存地址;即使在单处理器的情况下,在中断机制开启,允许切换进程时,也可能发生多个进程交替操作某个内存地址的情况。为此xv6实现了两种基本的进程间同步的模型:mutual exclusion和producer/comsumer。

这是xv6系列的第8篇。其他包括:

  1. minimal assembly
  2. how system boots
  3. address space
  4. interrupts
  5. system calls
  6. process
  7. context switch
  8. synchronization
  9. system initialization

mutual exclusion

自旋锁

自旋锁是实现mutual exclusion最简单的一种方法。xv6中定义的spinlock包括了一个关键的标志位locked

struct spinlock {
  uint locked;       // Is the lock held?
  ...
};

加锁的过程acquire

// spinlock.c
void acquire(struct spinlock *lk) {
  pushcli(); // disable interrupts to avoid deadlock.
  ...
  while(xchg(&lk->locked, 1) != 0)   // The xchg is atomic.
    ;
  ...
}

其中xchg是一个原子操作,它原子性地将数值写入某个内存位置,并返回此位置原先的值。acquire首先暂时禁止中断机制,然后通过xchglocked标志位置1。xchg如果返回值为1,表示已经有其他进程占用了这个锁,当前进程加锁失败,于是进入不断重试直至成功为止的流程。相反如果xchg返回0,则表示当前进程已经成功获得了这个锁,可以愉快的继续执行下面的指令。

在竞争激烈的情况下,获取自旋锁的过程中CPU时间可能都消耗在不断自旋等待之中,效率比较低下。

acquire镜像对称,release首先将locked标志位置零,然后恢复中断机制。

void release(struct spinlock *lk) {
  ...
  lk->locked = 0;
  popcli();
}

xv6在加锁之前首先会通过pushcli来禁止中断,相应在解锁之后通过popcli重启中断。pushclipopcli都是对当前CPU的操作,它们除了调用clisti来操作%eflags中的FL_IF标志位之外,还记录了操作的次数。只有当pushclipopcli的次数相等时,中断才会被开启。所以当程序中有两个锁处于锁定状态,只释放其中的一个,中断不会被开启,于是也不会发生进程切换。

void pushcli(void) {
  int eflags;

  eflags = readeflags();
  cli();
  if(cpu->ncli == 0)
    cpu->intena = eflags & FL_IF;
  cpu->ncli += 1;
}

void
popcli(void)
{
  if(readeflags()&FL_IF)
    panic("popcli - interruptible");
  if(--cpu->ncli < 0)
    panic("popcli");
  if(cpu->ncli == 0 && cpu->intena)
    sti();
}

producer/comsumer

sleep和wakeup

除了自旋锁处理的mutual exclusion之外,另一种需要进程间协调的关系是producer/consumer(例如pipe左右的两个进程)。这种关系中,consumer必须等producer完成(部分)工作之后才有事可做,否则处于空转消耗CPU时间的状态。xv6中用sleepwakeup两个system call处理这个问题。sleep让进程进入休眠状态,让出CPU;wakeup通过进程结构体proc中的信号量chan唤醒休眠的进程。

sleep除了要求一个信号量chan之外,还要求进程持有一个锁lk。进入sleep之后,锁定ptable,释放其他的锁(与sched的要求一致)。然后设置chan,将进程的状态设置为SLEEPING,然后进入sched,让出CPU去运行另一个进程。ptable锁将由切换后的进程负责释放。

void sleep(void *chan, struct spinlock *lk) {
  if(proc == 0)
    panic("sleep");
  if(lk == 0)
    panic("sleep without lk");

  if(lk != &ptable.lock){
    acquire(&ptable.lock);
    release(lk);
  }

  proc->chan = chan;
  proc->state = SLEEPING;
  sched();

  proc->chan = 0;
  if(lk != &ptable.lock){
    release(&ptable.lock);
    acquire(lk);
  }
}

walkup首先锁定ptable。然后遍历ptable,找到所有匹配信号量chan的休眠状态的进程,将它们的状态改为RUNNABLE。最后释放ptable锁。被唤醒的进程将在未来某个时刻由scheduler载入CPU,最终进入RUNNING状态。

void wakeup(void *chan) {
  struct proc *p;

  acquire(&ptable.lock);

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;

  release(&ptable.lock);
}

一个栗子:send/receive

sendrecv是关于如何使用sleepwakeup的例子。二者之间通过结构体q传递一个指针ptr

send的流程是:

  1. 首先对q加锁
  2. 然后检查q中的ptr是否为空,如果为空,则将p写入。
  3. 如果ptr不为空,说明recv还没有将之前的值取出,通道堵塞,send不能继续工作,所以通过sleep让它进入休眠。sleep将负责对q解锁,以便让recv可以工作。
  4. send将处于SLEEPING状态,直至recv将信号取走,且将它的状态改为RUNNABLE
  5. 某个时刻send被重新载入CPU执行,它从sleep返回,并且返回的过程已经对q再次加锁,此时send可以将p写入
  6. wakeup所有等待从q接收信号的进程
  7. 最后对q解锁

recv的流程几乎与之对称。

struct q {
  struct spinlock lock;
  void* ptr;
};

void* send(struct q *q, void *p) {
  acquire(&q->lock);
  while(q->ptr != 0)
    sleep(q, &q->lock);
  q->ptr = p;
  wakeup(q);
  release(&q->lock);
}

void* recv(struct q *q) {
  void*p;

  acquire(&q->lock);
  while((p = q->ptr) == 0)
    sleep(q, &q->lock);
  q->ptr = 0;
  wakeup(q)
  release(&q->lock);
  return p;
}

一个细节是sleep函数总是处于while循环之中,每当从sleep返回之后都要重新检查条件是否满足,以避免进程被意外唤醒。

另一个细节是q为何需要包括一个锁。这是因为while验证条件和调用sleep是两个操作,如果在这两步之间发生进程切换的话,可能会发生wakeupsleep之前执行的情况,这样当切换回原进程并最终进入休眠之后,将不能再被唤起。

一个更完整的栗子:pipe

管道是unix系统的经典设计。它是一种inter-process communication(IPC)机制,通过一个内存中的buffer在两个进程之间传递数据。

xv6中pipe的定义:

struct pipe {
  struct spinlock lock;
  char data[PIPESIZE];
  uint nread;     // number of bytes read
  uint nwrite;    // number of bytes written
  ...
};

管道连接的两个进程,一个负责写(pipewrite),另一个负责读(piperead),流程与sendrecv非常接近(sendrecv可以看成是buffer大小为0的管道)。

int pipewrite(struct pipe *p, char *addr, int n) {
  int i;

  acquire(&p->lock);
  for(i = 0; i < n; i++){
    while(p->nwrite == p->nread + PIPESIZE){ 
      ...
      wakeup(&p->nread);
      sleep(&p->nwrite, &p->lock);
    }
    p->data[p->nwrite++ % PIPESIZE] = addr[i];
  }
  wakeup(&p->nread);
  release(&p->lock);
  return n;
}

int piperead(struct pipe *p, char *addr, int n) {
  int i;

  acquire(&p->lock);
  while(p->nread == p->nwrite && p->writeopen){
    ...
    sleep(&p->nread, &p->lock); 
  }
  for(i = 0; i < n; i++){
    if(p->nread == p->nwrite)
      break;
    addr[i] = p->data[p->nread++ % PIPESIZE];
  }
  wakeup(&p->nwrite);
  release(&p->lock);
  return i;
}

参考

  1. UCI course on synchronization
  2. xv6 Book