观察Memory Ordering效果

代码 参考了这篇文章 https://preshing.com/20120515/memory-reordering-caught-in-the-act/

这件事情比较重要的原因是,我们需要有个实验平台才能验证自己的理论是否正确,尤其是做性能优化(memory barrier). 如果性能提升上去了,但是发现结果会出错,即便是以比较小的概率出错,那么我觉得这个性能提升也是不值得的。

这个小程序是这样的,三个线程:

  1. t0. 对 x=1, r1=y 进行操作
  2. t1. 对 y=1, r1=x 进行操作
  3. control. 控制t0, t1同时执行上面的操作

在原文里面是在Win32上使用信号量机制来做latch的,我这里改做了CAS. 使用随机数是希望等待一定时间,好让t0,t1可以同时开始进行操作,提高出现问题的几率。 三个线程没有任何blocking的操作,所以可能会比较费CPU,使用率可能能到300%.

doris-sandbox04 :: ~ » g++ WatchMemoryOrdering.cpp -O2 -lpthread
doris-sandbox04 :: ~ » ./a.out
1 reorders detected after 366 iterations
2 reorders detected after 413 iterations
3 reorders detected after 2495 iterations
4 reorders detected after 4144 iterations
5 reorders detected after 4362 iterations
6 reorders detected after 5493 iterations
7 reorders detected after 5902 iterations

doris-sandbox04 :: ~ » g++ WatchMemoryOrdering.cpp -O2 -lpthread -DUSE_FENCE
doris-sandbox04 :: ~ » time ./a.out
^C
./a.out  12.31s user 3.66s system 295% cpu 5.408 total

这是一个很不错的实验。过去我一直以为复现这个问题比较困难,现在发现也不需要太多代码就可以做到,关键还是在设计精巧的实验上。

/* coding:utf-8
 * Copyright (C) dirlt
 */

#include <atomic>
#include <condition_variable>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// https://preshing.com/20120515/memory-reordering-caught-in-the-act/

using namespace std;

atomic<int> t0, t1, ctl;
int r0, r1;
int X, Y;
const int waiting = 20;

#define WAIT_AND_SET(t, exp, act)                     \
    do {                                              \
        for (;;) {                                    \
            int _exp = exp;                           \
            if (t.compare_exchange_strong(_exp, 0)) { \
                break;                                \
            }                                         \
        }                                             \
    } while (0)

#ifdef USE_FENCE
#define FENCE() atomic_thread_fence(memory_order_seq_cst)
#else
#define FENCE()
#endif

void thread0() {
    for (;;) {
        WAIT_AND_SET(t0, 1, 0);
        while ((rand() % waiting) != 0) {
        }

        X = 1;
        FENCE();
        r0 = Y;

        ctl.fetch_add(1);
    }
}

void thread1() {
    for (;;) {
        WAIT_AND_SET(t1, 1, 0);
        while ((rand() % waiting) != 0) {
        }

        Y = 1;
        FENCE();
        r1 = X;

        ctl.fetch_add(1);
    }
}

void control() {
    int detected = 0;
    int iterations = 0;

    for (;;) {
        X = 0;
        Y = 0;
        iterations++;
        t0.store(1);
        t1.store(1);

        WAIT_AND_SET(ctl, 2, 0);
        if (r1 == 0 && r0 == 0) {
            detected++;
            printf("%d reorders detected after %d iterations\n", detected,
                   iterations);
        }
    }
}

int main() {
    // initialization.
    r0 = 1;
    r1 = 1;
    t0 = 0;
    t1 = 0;
    ctl = 0;
    X = 0;
    Y = 0;
    // start thread.
    thread _t0(thread0);
    thread _t1(thread1);
    thread _ctl(control);
    _t0.join();
    _t1.join();
    _ctl.join();
    return 0;
}