OSTEP / Concurrency-Lock

[OSTEP](http://pages.cs.wisc.edu/~remzi/OSTEP/)

使用xchg/cmpxchg等原子指令就可以实现spinlock原型。但是spinlock有两个主要问题: 1. fairness(公平性) 2. performance(性能). 公平性是因为,如果有多个线程同时竞争一个spinlock的话,有可能部分线程一直抢不到锁,或者是某些线程相比另外一些线程有更低概率拿到锁。通常地,低优先级线程可能一直就拿不到锁。性能原因是因为,其他线程在争抢这个锁时是空转的,一直耗费着CPU. 所以会看到CPU util很高,但是程序运行却很慢。

ticket lock是一种spinlock实现,解决了公平性问题。可以确保即使低优先级线程可以拿到锁。原理上是,为每个争抢者分配一个token, 而这个spinlock有一个ticket。这些争抢者拿自己token和ticket比较,如果相等就抢到了,等到释放锁时ticket+1, 交给下一个争抢者。

typedef struct __lock_t {
  int ticket;
  int turn;
} lock_t;

void lock_init(lock_t *lock) {
  lock->ticket = 0;
  lock->turn = 0;
}

void lock(lock_t *lock) {
  int myturn = FetchAndAdd(&lock->ticket);
  while (lock->turn != myturn)
    ; // spin
}

void unlock(lock_t *lock) {
  FetchAndAdd(&lock->turn);
}

如果争抢者在一定时间内抢占不到的话,不如放弃CPU让其他进程/线程处理,这样可以在一定程度上改进性能,避免CPU空转。实现上只要在spin部分加上sched_yield即可,非常简单。可是这没有完全解决性能问题,设想如果有100个线程抢占同一个spinlock的话,那么99个线程都会调用sched_yield,然后把控制权交出,造成99次cs(context switch), 而这些cs是完全无用的。

为了彻底解决性能问题,正确做法应该是将这些线程挂起进入block状态。下面是书中给出的实现。每一个lock上会有一个Q, 上面挂着有哪些因为竞争这个lock而block住的线程。注意在lock的park()和unlock的unpark()会有race condition,这个和park/unpark实现相关,或者在m->guard=0之前加上set_park()设置park标记。

typedef struct __lock_t {
    int flag;
    int guard;
    queue_t *q;
} lock_t;

void lock_init(lock_t *m) {
    m->flag = 0;
    m->guard = 0;
    queue_init(m->q);
}

void lock(lock_t *m) {
    while (TestAndSet(&m->guard, 1) == 1)
        ; //acquire guard lock by spinning
    if (m->flag == 0) {
        m->flag = 1; // lock is acquired
        m->guard = 0;
    } else {
        queue_add(m->q, gettid());
        m->guard = 0;
        park();
    }
}

void unlock(lock_t *m) {
    while (TestAndSet(&m->guard, 1) == 1)
        ; //acquire guard lock by spinning
    if (queue_empty(m->q))
        m->flag = 0; // let go of lock; no one wants it
    else
        unpark(queue_remove(m->q)); // hold lock (for next thread!)
    m->guard = 0;
}

linux下lock实现依赖于futex. 这里介绍两个调用:

书中给了一个使用futex实现mutex的例子。这个mutex最高位表示是否lock, 而剩余31位表示有多少个waiter.

void mutex_lock (int *mutex) {
    int v;
    /* Bit 31 was clear, we got the mutex (this is the fastpath) */
    if (atomic_bit_test_set (mutex, 31) == 0)
        return;
    atomic_increment (mutex);
    while (1) {
        if (atomic_bit_test_set (mutex, 31) == 0) {
            atomic_decrement (mutex);
            return;
        }
        /* We have to wait now. First make sure the futex value
           we are monitoring is truly negative (i.e. locked). */
        v = *mutex;
        if (v >= 0)
            continue;
        futex_wait (mutex, v);
    }
}

void mutex_unlock (int *mutex) {
    /* Adding 0x80000000 to the counter results in 0 if and only if
       there are not other interested threads */
    if (atomic_add_zero (mutex, 0x80000000))
        return;

    /* There are other threads waiting for this mutex,
       wake one of them up. */
    futex_wake (mutex);
}