lock

Table of Contents

1. spinlock

update@201509: 通过xchg/cmpxchg这种原子指令可以很容易实现一个spinlock原型. 如果critical section非常短小的话,spinlock相比mutex lock效率会好很多。

2. lock-free

上面这篇文章写得相当不错,至少阅读完成之后对于lock-free思想就比较清楚了。lock-free本质思想无非就是copy出一个复本出来(需要注意代价),然后在复本上修改,然后merge回去。如果这个原副本没有发生变化的话,那么就提交(原子操作),如果发生变化的话,那么这个需要重新执行直到成功。当然这里问题就在于如何判断原副本没有变化,一种方式是判断值,但是这个会出现ABA问题。所谓ABA问题就是,如果值首先是A,然后被修改为B,然后再修改回A,虽然原副本看上去没有变化,但是对于内部有一个变化的过程了。另外一种方式就是使用版本号version这么一个方式。实现lock-free首先需要确保正确性,其次需要注意性能(和加锁算法进行性能对比)。不过根据我的经验,lock-free实现正确比较困难(内核层面实现是有保证的,但是应用层面实现lock-free的话因为和应用相关,所以正确性不一定有保证),另外相比加锁算法不一定好,如果加锁可以满足那就应该使用加锁算法。


An Introduction to Lock-Free Programming 这篇文章作为lock-free programming入门也也不错,什么是lock-free,以及lock-free编程会使用到的技术。

lock-free顾名思义就是在多个执行体访问共享变量的时候,不使用任何锁相关的技术:任何一个执行体在访问这个共享变量的时候,都不会阻碍其他执行体的访问,它们之间是完全相互隔离的。这也意味着,lock-free通常意味着以较低的成本来处理冲突。

文章里面提到了几个技术:

  1. atomic read-modify-write. 比如C++11里面的std::atomic类型的 fetch_add, fetch_or 等等。
  2. CAS. compare-and-swap 指令来原子性地修改值,但是要注意ABA问题。
  3. sequential consistency. 顺序一致性,比如 std::atomic 类型的 load/store操作,以及Java里面的volatile关键字。
  4. memory ordering. 在std::atomic 类型里面, load/store 操作可以指定不同级别的 memory ordering.

3. linux futex

futex(fast userspace mutex)快速用户态互斥锁,需要用户态和内核态配合来完成。首先用户态判断是否存在contented case(比如判断当前资源是否足够),如果没有的话就没有必要陷入内核态,用户态就可以搞定。如果出现conteted case,那么这个需要等待资源可用,那么调用futex(FUTEX_WAIT)陷入内核态挂起,并且等待唤醒。如果一旦有资源可用的话,首先会判断用户态是否有等待资源的请求,如果没有的话同样没有必要陷入内核态,用户态就可以搞定。如果出现有等待资源并且挂起的请求的话,那么调用futex(FUTEX_WAKE)就可以将请求唤醒。上面那个链接可以有代码参考, 此外还可以阅读glibc/ntpl下面的pthread_mutex的部分。不过大部分时候我们没有必要接触这个东西,pthread_mutex已经做得足够高效并且屏蔽了操作系统的细节了。

4. 使用自选锁来实现信号量

typedef struct sema
{
  lock_t lock;
  int count;
  queue q;
} sema_t;

void init_sema(sema_t* sema, int init_cnt)
{
  init_lock(&sema->lock);
  init_queue(&sema->q);
  sema->count=init_cnt;
}

void p(sema_t* sema)
{
  lock(&sema->lock);
  sema->count--;
  if (sema->count < 0) {
    q.push(current_process_context());
    unlock(&sema->lock);
    swtch(); // switch to another process.
    return;
  }
  unlock(&sema->lock);
}

void v(sema_t* sema)
{
  lock(&sema->lock);
  sema->count++;
  if (sema->count <= 0) {
    pcs_ctx* ctx = q.pop();
    unlock(&sema->lock);
    enqueue(&running_queue, ctx);
    return ;
  }
  unlock(&sema->lock);
}