SIMD代码片段分析

过去一直想收集些自己曾经看过的SIMD代码,觉得这些代码写出来都非常有意思。按照我粗浅的理解,SIMD设计初衷有两个:数据并行化以及减少分支操作。数据并行化这个自不必说,在许多代码下面如果分支预测不好的话,CPU的深度流水线就没有什么用途,造成的misprediction penalty是非常高的。所以在设计和使用SIMD指令的时候,脑子里面一定要将程序设计里面的分支跳转全部忘掉,而是想如何能够使用mask, and, or这些bit操作技巧来得到计算结果。

想要知道代码仓库里面是否使用了SIMD,简单的方式就是搜索一些是否使用下面几个头文件


下面两个代码片段来自惠新宸老师的博客,链接如下:

我把代码复制下来了,博客上有关于这段代码的解释,我就不再这里赘述了。在replace函数里面可以计算替换了多少个字符, 不过这个操作代价有点高,相比替换操作来说 "__builtin_popcount" 这个操作可以占到超过1/2的时间。

int StringReplace1(char* s, size_t sz, char x, char y) {
    int ans = 0;
    size_t i = 0;
    __m128i search = _mm_set1_epi8(x);
    __m128i delta = _mm_set1_epi8(y - x);
    // unaligned.
    for(;(i+16)<sz;i+=16) {
        __m128i d = _mm_loadu_si128((__m128i*)(s+i));
        __m128i mask = _mm_cmpeq_epi8(d, search);
        int ret = _mm_movemask_epi8(mask);
        if (ret) {
            __m128i add = _mm_and_si128(mask, delta);
            __m128i res = _mm_add_epi8(add, d);
            _mm_storeu_si128((__m128i*)(s+i), res);
            //            ans += __builtin_popcount(ret);
        }
    }
    for(;i<sz;i++) {
        if(s[i] == x) {
            s[i] = y;
            // ans += 1;
        }
    }
    return ans;
}

测试代码可以看 这里, 粘贴一些运行时间。运行时间分为 sparse 和 dense两种模式,sparse触发替换比较少的,dense则是比较多的。可以预测到,dense模式下面分支预测比较好,所以运行时间相比sparse更短。

YAN007 :: ~/shared » ./a.out
Replace0: mode=sparse, sz=32(0,0) timer=1ms
Replace1: mode=sparse, sz=32(0,0), timer=0ms
Replace0: mode=sparse, sz=128(0,0) timer=5ms
Replace1: mode=sparse, sz=128(0,0), timer=1ms
Replace0: mode=sparse, sz=1024(0,0) timer=42ms
Replace1: mode=sparse, sz=1024(0,0), timer=3ms
Replace0: mode=sparse, sz=10240(0,0) timer=408ms
Replace1: mode=sparse, sz=10240(0,0), timer=28ms
Replace0: mode=sparse, sz=20480(0,0) timer=820ms
Replace1: mode=sparse, sz=20480(0,0), timer=55ms
Replace0: mode=dense, sz=32(0,0) timer=1ms
Replace1: mode=dense, sz=32(0,0), timer=0ms
Replace0: mode=dense, sz=128(0,0) timer=5ms
Replace1: mode=dense, sz=128(0,0), timer=0ms
Replace0: mode=dense, sz=1024(0,0) timer=31ms
Replace1: mode=dense, sz=1024(0,0), timer=3ms
Replace0: mode=dense, sz=10240(0,0) timer=307ms
Replace1: mode=dense, sz=10240(0,0), timer=25ms
Replace0: mode=dense, sz=20480(0,0) timer=617ms
Replace1: mode=dense, sz=20480(0,0), timer=51ms

至于stirng tolowe/upper这个函数,思想就是拿每个字符去和'A', 'Z'比较,如果处于这个中间的话,那么就直接+32. 因为SSE2只有cmpeq和cmpgt, 没有cmpge,cmple,所以在阈值选择上需要+1/-1. 这个函数的效果相比naive的for循环使用,也是好得多的。

void StringLower1(char* s, size_t sz) {
    size_t i = 0;
    __m128i a = _mm_set1_epi8(64);
    __m128i z = _mm_set1_epi8(91);
    __m128i delta = _mm_set1_epi8(32);
    // unaligned.
    for(;(i+16)<sz;i+=16) {
        __m128i d = _mm_loadu_si128((__m128i*)(s+i));
        __m128i x = _mm_cmpgt_epi8(d, a);
        __m128i y = _mm_cmpgt_epi8(z, d);
        __m128i z = _mm_and_si128(x, y);
        __m128i z2 = _mm_and_si128(z, delta);
        __m128i res = _mm_add_epi8(d, z2);
        _mm_storeu_si128((__m128i*)(s+i), res);
    }
    for(;i<sz;i++) {
        if (s[i] >= 65 && s[i] <= 90) {
            s[i] += 32;
        }
    }
}

下面两个代码片段来自RapidJSON:

其中writer中的实现是 `ScanWriteUnescapedString` 复制一个字符串直到出现某些字符,而reader中的实现是 `SkipWhitespace_SIMD` 扫描到最近一个空格。两个函数有点类似,reader中使用了比较高级的 `_mm_cmpistri` 指令,所以这里我不选择reader中的函数,只看看writer中的函数。

下面是这段代码,稍微有点长,可以分为下面几个部分阅读:

处理中间对齐的部分是这样的:

这种操作让人联想到了是否可以使用 SIMD的load/store 来加速memcpy呢?后面可以做做实验。

template<>
inline bool Writer<StringBuffer>::ScanWriteUnescapedString(StringStream& is, size_t length) {
    if (length < 16)
        return RAPIDJSON_LIKELY(is.Tell() < length);

    if (!RAPIDJSON_LIKELY(is.Tell() < length))
        return false;

    const char* p = is.src_;
    const char* end = is.head_ + length;
    const char* nextAligned = reinterpret_cast<const char*>((reinterpret_cast<size_t>(p) + 15) & static_cast<size_t>(~15));
    const char* endAligned = reinterpret_cast<const char*>(reinterpret_cast<size_t>(end) & static_cast<size_t>(~15));
    if (nextAligned > end)
        return true;

    while (p != nextAligned)
        if (*p < 0x20 || *p == '\"' || *p == '\\') {
            is.src_ = p;
            return RAPIDJSON_LIKELY(is.Tell() < length);
        }
        else
            os_->PutUnsafe(*p++);

    // The rest of string using SIMD
    static const char dquote[16] = { '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"', '\"' };
    static const char bslash[16] = { '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\', '\\' };
    static const char space[16]  = { 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F, 0x1F };
    const __m128i dq = _mm_loadu_si128(reinterpret_cast<const __m128i *>(&dquote[0]));
    const __m128i bs = _mm_loadu_si128(reinterpret_cast<const __m128i *>(&bslash[0]));
    const __m128i sp = _mm_loadu_si128(reinterpret_cast<const __m128i *>(&space[0]));

    for (; p != endAligned; p += 16) {
        const __m128i s = _mm_load_si128(reinterpret_cast<const __m128i *>(p));
        const __m128i t1 = _mm_cmpeq_epi8(s, dq);
        const __m128i t2 = _mm_cmpeq_epi8(s, bs);
        const __m128i t3 = _mm_cmpeq_epi8(_mm_max_epu8(s, sp), sp); // s < 0x20 <=> max(s, 0x1F) == 0x1F
        const __m128i x = _mm_or_si128(_mm_or_si128(t1, t2), t3);
        unsigned short r = static_cast<unsigned short>(_mm_movemask_epi8(x));
        if (RAPIDJSON_UNLIKELY(r != 0)) {   // some of characters is escaped
            SizeType len;
#ifdef _MSC_VER         // Find the index of first escaped
            unsigned long offset;
            _BitScanForward(&offset, r);
            len = offset;
#else
            len = static_cast<SizeType>(__builtin_ffs(r) - 1);
#endif
            char* q = reinterpret_cast<char*>(os_->PushUnsafe(len));
            for (size_t i = 0; i < len; i++)
                q[i] = p[i];

            p += len;
            break;
        }
        _mm_storeu_si128(reinterpret_cast<__m128i *>(os_->PushUnsafe(16)), s);
    }

    is.src_ = p;
    return RAPIDJSON_LIKELY(is.Tell() < length);
}

用 SIMD 来加速memcpy是否可行呢? 测试代码可以看 这里, 从结果上看没有 `std::memcpy` 效果好。

Memcopy: mode=dense, sz=32 timer=2ms
std::memcpy: mode=dense, sz=32, timer=0ms
Memcopy: mode=dense, sz=128 timer=3ms
std::memcpy: mode=dense, sz=128, timer=0ms
Memcopy: mode=dense, sz=1024 timer=7ms
std::memcpy: mode=dense, sz=1024, timer=3ms
Memcopy: mode=dense, sz=10240 timer=50ms
std::memcpy: mode=dense, sz=10240, timer=21ms
Memcopy: mode=dense, sz=20480 timer=170ms
std::memcpy: mode=dense, sz=20480, timer=115ms