优化定长二进制串到整数变换

Table of Contents

在parquet格式的Decimal解析中,需要将fixed length binary转换成为整数,这个整数就是Decimal中的unscaledValue部分。这个过程可以看 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md 里面的 <DECIMAL> 一节

The primitive type stores an unscaled integer value. For byte arrays, binary and fixed, the unscaled number must be encoded as two's complement using big-endian byte order (the most significant byte is the zeroth element). The scale stores the number of digits of that value that are to the right of the decimal point, and the precision stores the maximum number of digits supported in the unscaled value.

fixed length binary有两个特点:

  1. 字符串是定长的
  2. 所有字符串是连续存放的

1 基本实现

最初的实现是下面这样的,其中Slice是 `(const char* data, size_t size)` 的tuple, 解析一下过程

  • 先根据 `s.data[0]` 最高位判断是否需要使用1进行填充
  • 然后将 `s.data` 拷贝到 `value` 的高地址部分
  • 然后使用 int128_t 的字节翻转
  • 最后存储在 `dst_data` 里面
void binary_to_int128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int128_t value = s.data[0] & 0x80 ? -1 : 0;
        memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - s.size, s.data, s.size);
        value = ToHost128(value);
        dst_data[i] = value;
    }
    return;
}

2 模板处理常量

因为整个二进制串都是定长,所以使用模板来做处理常量。

void binary_to_int128_fixed(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int128_t value = s.data[0] & 0x80 ? -1 : 0;
        memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - 7, s.data, 7);
        value = ToHost128(value);
        dst_data[i] = value;
    }
    return;
}

这个优化有改进但是不是特别大

-----------------------------------------------------------------------------
Benchmark                                   Time             CPU   Iterations
-----------------------------------------------------------------------------
run_binary_to_int128/1000000          8655761 ns      8654883 ns           80
run_binary_to_int128_fixed/1000000    8215534 ns      8214657 ns           87

3 非规则大小的memcpy

如果将上面代码放在 https://gcc.godbolt.org/ 里面,可以看到这个部分对应的汇编如下:

  • rcx/rsi 是 int128_t 的高低两个部分
  • rcx/rsi 这两个部分分别进行了填充
  • 因为memcpy 7bytes, 所以这里填充非常不规则, 使用了比较多的指令来处理
.L9:
        mov     rdx, rax
        xor     r9d, r9d
        sal     rdx, 4
        mov     r11, r9
        mov     rsi, QWORD PTR [rbx+rdx]
        movsx   rcx, BYTE PTR [rsi]
        shr     rcx, 63
        mov     r10, rcx
        neg     r10
        adc     r11, 0
        mov     rcx, r10
        mov     QWORD PTR [rsp-24], r10
        mov     r10d, DWORD PTR [rsi]
        mov     esi, DWORD PTR [rsi+3]
        neg     r11
        add     rax, 1
        bswap   rcx
        mov     QWORD PTR [rsp-16], r11
        mov     DWORD PTR [rsp-15], r10d
        mov     DWORD PTR [rsp-12], esi
        mov     rsi, QWORD PTR [rsp-16]
        mov     QWORD PTR [rdi+8+rdx], rcx
        bswap   rsi
        mov     QWORD PTR [rdi+rdx], rsi
        cmp     rax, rbp
        jb      .L9

4 使用int64_t来处理

如果使用规则大小(比如8 bytes)这样的memcpy, 那么我们可以使用一个mov就可以将内存值加载到寄存器里面。如果之后的操作也可以在寄存器里面完成的话,那么就能快不少了,实际上也的确可以做到。

假设7个字节分别是 [a7 a6 a5 a4 a3 a2 a1], 这个时候是按照big endian来存储的。之前整个过程如下:

  • 我们先按照a7的高位进行填充,假设最高位是1。 value = [1,1,1,1,1,1,1,1]
  • 然后按照7字节进行拷贝。 value = [1,a7,a6,a5,a4,a3,a2,a1]
  • 然后按照8字节进行swap. value = [a1,a2,a3,a4,a5,a6,a7,1]

其实我们也可以做个改进,使得整个过程可以在寄存器内完成。

  • 先对7字节进行拷贝,并且从低字节开始。 value = [a7,a6,a5,a4,a3,a2,a1,0]
  • 然后按照8字节进行swap. value = [0,a1,a2,a3,a4,a5,a6,a7]
  • 然后算术左移8bit. value = [a1,a2,a3,a4,a5,a6,a7,1]

我们多copy一些数据没有关系,因为最后都会被left shift给清除出去的。下面的实现有点简化,我们需要在尾部做些判断确保不会出现内存越界访问。

void binary_to_int128_ex(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int64_t value = 0;
        memcpy((char*)&value, s.data, 8);
        value = ToHost64(value);
        value = value >> ((8 - BYTE_SIZE) * 8);
        dst_data[i] = value;
    }
    return;
}

得到的汇编就会是下面这个样子的,看上去的确比上面指令少了许多。

.L14:
        movq    (%r8,%rdx), %rax
        addq    $1, %rcx
        movq    (%rax), %rax
        bswap   %rax
        movq    %rax, %rdi
        sarq    $63, %rax
        sarq    $8, %rdi
        movq    %rax, 8(%rsi,%rdx)
        movq    %rdi, (%rsi,%rdx)
        addq    $16, %rdx
        cmpq    %r9, %rcx
        jb      .L14

结果大约提升了3倍左右

-----------------------------------------------------------------------------
Benchmark                                   Time             CPU   Iterations
-----------------------------------------------------------------------------
run_binary_to_int128/1000000          8667702 ns      8666784 ns           82
run_binary_to_int128_fixed/1000000    8182932 ns      8182026 ns           86
run_binary_to_int128_ex/1000000       2261388 ns      2261088 ns          353

5 使用int128_t来处理

同理我们可以使用int128_t来处理更大范围的整数

void binary_to_int128_ex_128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int128_t value = 0;
        memcpy((char*)&value, s.data, 16);
        value = ToHost128(value);
        value = value >> ((16 - BYTE_SIZE) * 8);
        dst_data[i] = value;
    }
    return;
}

得到的汇编也一样很精简

.L14:
        movq    (%r10,%rax), %rcx
        addq    $1, %rdx
        movq    (%rcx), %rsi
        movq    8(%rcx), %rcx
        bswap   %rsi
        movq    %rsi, %r9
        bswap   %rcx
        movq    %rcx, %rsi
        movq    %r9, %rdi
        shrdq   $40, %r9, %rsi
        sarq    $40, %rdi
        movq    %rsi, (%rbx,%rax)
        movq    %rdi, 8(%rbx,%rax)
        addq    $16, %rax
        cmpq    %r11, %rdx
        jb      .L14

结果如下,速度看上去比int64要稍微差些,但是也比之前的实现要快很多。

------------------------------------------------------------------------------
Benchmark                                    Time             CPU   Iterations
------------------------------------------------------------------------------
run_binary_to_int128/1000000           8619666 ns      8618856 ns           81
run_binary_to_int128_fixed/1000000     8057064 ns      8056318 ns           87
run_binary_to_int128_ex/1000000        1952846 ns      1952639 ns          349
run_binary_to_int128_ex_128/1000000    2184613 ns      2184084 ns          354

6 完整代码

github https://github.com/dirtysalt/codes/blob/master/cc/sr-test/b2i_perf.cpp

#include <benchmark/benchmark.h>
#include <emmintrin.h>
#include <immintrin.h>

#include <cmath>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <random>

typedef __int128 int128_t;

struct Slice {
    const char* data;
    size_t size;
};
static constexpr int BYTE_SIZE = 11;
static constexpr int GAP_SIZE = 0;
static constexpr bool verify = false;

#define bswap_64(x) __bswap_64(x)

inline unsigned __int128 bswap_128(unsigned __int128 host_int) {
    return static_cast<unsigned __int128>(bswap_64(static_cast<uint64_t>(host_int >> 64))) |
           (static_cast<unsigned __int128>(bswap_64(static_cast<uint64_t>(host_int))) << 64);
}

static unsigned __int128 ToHost128(unsigned __int128 x) {
    return bswap_128(x);
}

static uint64_t ToHost64(uint64_t x) {
    return __bswap_64(x);
}

void make_src_data(size_t size, std::string* blob, std::vector<Slice>* src_data) {
    // assume each data is 7 bytes
    // and bewteen each data there is 4 bytes.

    // add some extra padding bytes.
    size_t bytes = (BYTE_SIZE + GAP_SIZE) * (size) + 16;
    std::mt19937_64 gen64;
    blob->resize(bytes);
    for (size_t i = 0; i < bytes; i++) {
        (*blob)[i] = gen64() & 0xff;
    }

    // construct src data.
    const char* p = blob->data();
    for (size_t i = 0; i < size; i++) {
        src_data->emplace_back(Slice{.data = p, .size = BYTE_SIZE});
        p += (BYTE_SIZE + GAP_SIZE);
    }
}

void binary_to_int128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int128_t value = s.data[0] & 0x80 ? -1 : 0;
        memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - s.size, s.data, s.size);
        value = ToHost128(value);
        dst_data[i] = value;
    }
    return;
}

static void run_binary_to_int128(benchmark::State& state) {
    // Code inside this loop is measured repeatedly
    std::string blob;
    size_t size = state.range(0);
    std::vector<Slice> src_data;
    std::vector<int128_t> dst_data(size);
    make_src_data(size, &blob, &src_data);

    for (auto _ : state) {
        // state.PauseTiming();
        // state.ResumeTiming();
        binary_to_int128(src_data, dst_data);
    }
}

void binary_to_int128_fixed(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];
        int128_t value = s.data[0] & 0x80 ? -1 : 0;
        memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - 7, s.data, 7);
        value = ToHost128(value);
        dst_data[i] = value;
    }
    return;
}

static void run_binary_to_int128_fixed(benchmark::State& state) {
    // Code inside this loop is measured repeatedly
    std::string blob;
    size_t size = state.range(0);
    std::vector<Slice> src_data;
    std::vector<int128_t> dst_data(size);
    make_src_data(size, &blob, &src_data);

    for (auto _ : state) {
        // state.PauseTiming();
        // state.ResumeTiming();
        binary_to_int128_fixed(src_data, dst_data);
    }
}

template <typename TYPE>
void binary_to_int128_ex(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) {
    size_t size = src_data.size();
    for (size_t i = 0; i < size; i++) {
        const Slice& s = src_data[i];

        TYPE value = 0;
        memcpy((char*)&value, s.data, sizeof(TYPE));
        if constexpr (std::is_same_v<TYPE, int64_t>) {
            value = ToHost64(value);
        } else {
            value = ToHost128(value);
        }
        value = value >> ((sizeof(TYPE) - BYTE_SIZE) * 8);

        if constexpr ((BYTE_SIZE <= sizeof(TYPE)) && verify) {
            TYPE value2 = s.data[0] & 0x80 ? -1 : 0;
            memcpy(reinterpret_cast<char*>(&value2) + sizeof(value2) - BYTE_SIZE, s.data, BYTE_SIZE);
            if constexpr (std::is_same_v<TYPE, int64_t>) {
                value2 = ToHost64(value2);
            } else {
                value2 = ToHost128(value2);
            }
            if (value != value2) {
                printf("FAILED at %s. v = %p, v2 = %p, raw = ", __func__, value, value2);
                for (int j = 0; j < BYTE_SIZE; j++) {
                    printf("%x ", s.data[j]);
                }
                printf("\n");
                exit(-1);
            }
        }
        dst_data[i] = value;
    }
    return;
}

static void run_binary_to_int128_ex(benchmark::State& state) {
    // Code inside this loop is measured repeatedly
    std::string blob;
    size_t size = state.range(0);
    std::vector<Slice> src_data;
    std::vector<int128_t> dst_data(size);
    make_src_data(size, &blob, &src_data);

    for (auto _ : state) {
        // state.PauseTiming();
        // state.ResumeTiming();
        binary_to_int128_ex<int64_t>(src_data, dst_data);
    }
}

static void run_binary_to_int128_ex_128(benchmark::State& state) {
    // Code inside this loop is measured repeatedly
    std::string blob;
    size_t size = state.range(0);
    std::vector<Slice> src_data;
    std::vector<int128_t> dst_data(size);
    make_src_data(size, &blob, &src_data);

    for (auto _ : state) {
        // state.PauseTiming();
        // state.ResumeTiming();
        binary_to_int128_ex<int128_t>(src_data, dst_data);
    }
}

static constexpr size_t N = 1000000;
BENCHMARK(run_binary_to_int128)->Args({N});
BENCHMARK(run_binary_to_int128_fixed)->Args({N});
BENCHMARK(run_binary_to_int128_ex)->Args({N});
BENCHMARK(run_binary_to_int128_ex_128)->Args({N});