优化定长二进制串到整数变换
Table of Contents
在parquet格式的Decimal解析中,需要将fixed length binary转换成为整数,这个整数就是Decimal中的unscaledValue部分。这个过程可以看 https://github.com/apache/parquet-format/blob/master/LogicalTypes.md 里面的 <DECIMAL> 一节
The primitive type stores an unscaled integer value. For byte arrays, binary and fixed, the unscaled number must be encoded as two's complement using big-endian byte order (the most significant byte is the zeroth element). The scale stores the number of digits of that value that are to the right of the decimal point, and the precision stores the maximum number of digits supported in the unscaled value.
fixed length binary有两个特点:
- 字符串是定长的
- 所有字符串是连续存放的
1. 基本实现
最初的实现是下面这样的,其中Slice是 `(const char* data, size_t size)` 的tuple, 解析一下过程
- 先根据 `s.data[0]` 最高位判断是否需要使用1进行填充
- 然后将 `s.data` 拷贝到 `value` 的高地址部分
- 然后使用 int128_t 的字节翻转
- 最后存储在 `dst_data` 里面
void binary_to_int128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int128_t value = s.data[0] & 0x80 ? -1 : 0; memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - s.size, s.data, s.size); value = ToHost128(value); dst_data[i] = value; } return; }
2. 模板处理常量
因为整个二进制串都是定长,所以使用模板来做处理常量。
void binary_to_int128_fixed(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int128_t value = s.data[0] & 0x80 ? -1 : 0; memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - 7, s.data, 7); value = ToHost128(value); dst_data[i] = value; } return; }
这个优化有改进但是不是特别大
----------------------------------------------------------------------------- Benchmark Time CPU Iterations ----------------------------------------------------------------------------- run_binary_to_int128/1000000 8655761 ns 8654883 ns 80 run_binary_to_int128_fixed/1000000 8215534 ns 8214657 ns 87
3. 非规则大小的memcpy
如果将上面代码放在 https://gcc.godbolt.org/ 里面,可以看到这个部分对应的汇编如下:
- rcx/rsi 是 int128_t 的高低两个部分
- rcx/rsi 这两个部分分别进行了填充
- 因为memcpy 7bytes, 所以这里填充非常不规则, 使用了比较多的指令来处理
.L9: mov rdx, rax xor r9d, r9d sal rdx, 4 mov r11, r9 mov rsi, QWORD PTR [rbx+rdx] movsx rcx, BYTE PTR [rsi] shr rcx, 63 mov r10, rcx neg r10 adc r11, 0 mov rcx, r10 mov QWORD PTR [rsp-24], r10 mov r10d, DWORD PTR [rsi] mov esi, DWORD PTR [rsi+3] neg r11 add rax, 1 bswap rcx mov QWORD PTR [rsp-16], r11 mov DWORD PTR [rsp-15], r10d mov DWORD PTR [rsp-12], esi mov rsi, QWORD PTR [rsp-16] mov QWORD PTR [rdi+8+rdx], rcx bswap rsi mov QWORD PTR [rdi+rdx], rsi cmp rax, rbp jb .L9
4. 使用int64_t来处理
如果使用规则大小(比如8 bytes)这样的memcpy, 那么我们可以使用一个mov就可以将内存值加载到寄存器里面。如果之后的操作也可以在寄存器里面完成的话,那么就能快不少了,实际上也的确可以做到。
假设7个字节分别是 [a7 a6 a5 a4 a3 a2 a1], 这个时候是按照big endian来存储的。之前整个过程如下:
- 我们先按照a7的高位进行填充,假设最高位是1。 value = [1,1,1,1,1,1,1,1]
- 然后按照7字节进行拷贝。 value = [1,a7,a6,a5,a4,a3,a2,a1]
- 然后按照8字节进行swap. value = [a1,a2,a3,a4,a5,a6,a7,1]
其实我们也可以做个改进,使得整个过程可以在寄存器内完成。
- 先对7字节进行拷贝,并且从低字节开始。 value = [a7,a6,a5,a4,a3,a2,a1,0]
- 然后按照8字节进行swap. value = [0,a1,a2,a3,a4,a5,a6,a7]
- 然后算术左移8bit. value = [a1,a2,a3,a4,a5,a6,a7,1]
我们多copy一些数据没有关系,因为最后都会被left shift给清除出去的。下面的实现有点简化,我们需要在尾部做些判断确保不会出现内存越界访问。
void binary_to_int128_ex(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int64_t value = 0; memcpy((char*)&value, s.data, 8); value = ToHost64(value); value = value >> ((8 - BYTE_SIZE) * 8); dst_data[i] = value; } return; }
得到的汇编就会是下面这个样子的,看上去的确比上面指令少了许多。
.L14: movq (%r8,%rdx), %rax addq $1, %rcx movq (%rax), %rax bswap %rax movq %rax, %rdi sarq $63, %rax sarq $8, %rdi movq %rax, 8(%rsi,%rdx) movq %rdi, (%rsi,%rdx) addq $16, %rdx cmpq %r9, %rcx jb .L14
结果大约提升了3倍左右
----------------------------------------------------------------------------- Benchmark Time CPU Iterations ----------------------------------------------------------------------------- run_binary_to_int128/1000000 8667702 ns 8666784 ns 82 run_binary_to_int128_fixed/1000000 8182932 ns 8182026 ns 86 run_binary_to_int128_ex/1000000 2261388 ns 2261088 ns 353
5. 使用int128_t来处理
同理我们可以使用int128_t来处理更大范围的整数
void binary_to_int128_ex_128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int128_t value = 0; memcpy((char*)&value, s.data, 16); value = ToHost128(value); value = value >> ((16 - BYTE_SIZE) * 8); dst_data[i] = value; } return; }
得到的汇编也一样很精简
.L14: movq (%r10,%rax), %rcx addq $1, %rdx movq (%rcx), %rsi movq 8(%rcx), %rcx bswap %rsi movq %rsi, %r9 bswap %rcx movq %rcx, %rsi movq %r9, %rdi shrdq $40, %r9, %rsi sarq $40, %rdi movq %rsi, (%rbx,%rax) movq %rdi, 8(%rbx,%rax) addq $16, %rax cmpq %r11, %rdx jb .L14
结果如下,速度看上去比int64要稍微差些,但是也比之前的实现要快很多。
------------------------------------------------------------------------------ Benchmark Time CPU Iterations ------------------------------------------------------------------------------ run_binary_to_int128/1000000 8619666 ns 8618856 ns 81 run_binary_to_int128_fixed/1000000 8057064 ns 8056318 ns 87 run_binary_to_int128_ex/1000000 1952846 ns 1952639 ns 349 run_binary_to_int128_ex_128/1000000 2184613 ns 2184084 ns 354
6. 完整代码
github https://github.com/dirtysalt/codes/blob/master/cc/sr-test/b2i_perf.cpp
#include <benchmark/benchmark.h> #include <emmintrin.h> #include <immintrin.h> #include <cmath> #include <cstdlib> #include <cstring> #include <functional> #include <iostream> #include <random> typedef __int128 int128_t; struct Slice { const char* data; size_t size; }; static constexpr int BYTE_SIZE = 11; static constexpr int GAP_SIZE = 0; static constexpr bool verify = false; #define bswap_64(x) __bswap_64(x) inline unsigned __int128 bswap_128(unsigned __int128 host_int) { return static_cast<unsigned __int128>(bswap_64(static_cast<uint64_t>(host_int >> 64))) | (static_cast<unsigned __int128>(bswap_64(static_cast<uint64_t>(host_int))) << 64); } static unsigned __int128 ToHost128(unsigned __int128 x) { return bswap_128(x); } static uint64_t ToHost64(uint64_t x) { return __bswap_64(x); } void make_src_data(size_t size, std::string* blob, std::vector<Slice>* src_data) { // assume each data is 7 bytes // and bewteen each data there is 4 bytes. // add some extra padding bytes. size_t bytes = (BYTE_SIZE + GAP_SIZE) * (size) + 16; std::mt19937_64 gen64; blob->resize(bytes); for (size_t i = 0; i < bytes; i++) { (*blob)[i] = gen64() & 0xff; } // construct src data. const char* p = blob->data(); for (size_t i = 0; i < size; i++) { src_data->emplace_back(Slice{.data = p, .size = BYTE_SIZE}); p += (BYTE_SIZE + GAP_SIZE); } } void binary_to_int128(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int128_t value = s.data[0] & 0x80 ? -1 : 0; memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - s.size, s.data, s.size); value = ToHost128(value); dst_data[i] = value; } return; } static void run_binary_to_int128(benchmark::State& state) { // Code inside this loop is measured repeatedly std::string blob; size_t size = state.range(0); std::vector<Slice> src_data; std::vector<int128_t> dst_data(size); make_src_data(size, &blob, &src_data); for (auto _ : state) { // state.PauseTiming(); // state.ResumeTiming(); binary_to_int128(src_data, dst_data); } } void binary_to_int128_fixed(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; int128_t value = s.data[0] & 0x80 ? -1 : 0; memcpy(reinterpret_cast<char*>(&value) + sizeof(value) - 7, s.data, 7); value = ToHost128(value); dst_data[i] = value; } return; } static void run_binary_to_int128_fixed(benchmark::State& state) { // Code inside this loop is measured repeatedly std::string blob; size_t size = state.range(0); std::vector<Slice> src_data; std::vector<int128_t> dst_data(size); make_src_data(size, &blob, &src_data); for (auto _ : state) { // state.PauseTiming(); // state.ResumeTiming(); binary_to_int128_fixed(src_data, dst_data); } } template <typename TYPE> void binary_to_int128_ex(const std::vector<Slice>& src_data, std::vector<int128_t>& dst_data) { size_t size = src_data.size(); for (size_t i = 0; i < size; i++) { const Slice& s = src_data[i]; TYPE value = 0; memcpy((char*)&value, s.data, sizeof(TYPE)); if constexpr (std::is_same_v<TYPE, int64_t>) { value = ToHost64(value); } else { value = ToHost128(value); } value = value >> ((sizeof(TYPE) - BYTE_SIZE) * 8); if constexpr ((BYTE_SIZE <= sizeof(TYPE)) && verify) { TYPE value2 = s.data[0] & 0x80 ? -1 : 0; memcpy(reinterpret_cast<char*>(&value2) + sizeof(value2) - BYTE_SIZE, s.data, BYTE_SIZE); if constexpr (std::is_same_v<TYPE, int64_t>) { value2 = ToHost64(value2); } else { value2 = ToHost128(value2); } if (value != value2) { printf("FAILED at %s. v = %p, v2 = %p, raw = ", __func__, value, value2); for (int j = 0; j < BYTE_SIZE; j++) { printf("%x ", s.data[j]); } printf("\n"); exit(-1); } } dst_data[i] = value; } return; } static void run_binary_to_int128_ex(benchmark::State& state) { // Code inside this loop is measured repeatedly std::string blob; size_t size = state.range(0); std::vector<Slice> src_data; std::vector<int128_t> dst_data(size); make_src_data(size, &blob, &src_data); for (auto _ : state) { // state.PauseTiming(); // state.ResumeTiming(); binary_to_int128_ex<int64_t>(src_data, dst_data); } } static void run_binary_to_int128_ex_128(benchmark::State& state) { // Code inside this loop is measured repeatedly std::string blob; size_t size = state.range(0); std::vector<Slice> src_data; std::vector<int128_t> dst_data(size); make_src_data(size, &blob, &src_data); for (auto _ : state) { // state.PauseTiming(); // state.ResumeTiming(); binary_to_int128_ex<int128_t>(src_data, dst_data); } } static constexpr size_t N = 1000000; BENCHMARK(run_binary_to_int128)->Args({N}); BENCHMARK(run_binary_to_int128_fixed)->Args({N}); BENCHMARK(run_binary_to_int128_ex)->Args({N}); BENCHMARK(run_binary_to_int128_ex_128)->Args({N});