snappy

http://code.google.com/p/snappy/

1 Overview

Snappy is a compression/decompression library. It does not aim for maximum compression, or compatibility with any other compression library; instead, it aims for very high speeds and reasonable compression. For instance, compared to the fastest mode of zlib, Snappy is an order of magnitude faster for most inputs, but the resulting compressed files are anywhere from 20% to 100% bigger. (For more information, see "Performance", below.)

Snappy is intended to be fast. On a single core of a Core i7 processor in 64-bit mode, it compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more. (These numbers are for the slowest inputs in our benchmark suite; others are much faster.) In our tests, Snappy usually is faster than algorithms in the same class (e.g. LZO, LZF, FastLZ, QuickLZ, etc.) while achieving comparable compression ratios.

Typical compression ratios (based on the benchmark suite) are about 1.5-1.7x for plain text, about 2-4x for HTML, and of course 1.0x for JPEGs, PNGs and other already-compressed data. Similar numbers for zlib in its fastest mode are 2.6-2.8x, 3-7x and 1.0x, respectively. More sophisticated algorithms are capable of achieving yet higher compression rates, although usually at the expense of speed. Of course, compression ratio will vary significantly with the input.

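Most of Snappy's operations work on 32- or 64-bit values, and it assumes that unaligned 32/64-bit loads are cheap. It also assumes a little-endian CPU; a big-endian path exists but is slower. This makes it a very good fit for Intel processors.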

Although Snappy should be fairly portable, it is primarily optimized for 64-bit x86-compatible processors, and may run slower in other environments. In particular:

  • Snappy uses 64-bit operations in several places to process more data at once than would otherwise be possible.
  • Snappy assumes unaligned 32- and 64-bit loads and stores are cheap. On some platforms, these must be emulated with single-byte loads and stores, which is much slower.
  • Snappy assumes little-endian throughout, and needs to byte-swap data in several places if running on a big-endian platform.

2 Format

Snappy is a LZ77-type compressor with a fixed, byte-oriented encoding.

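The compressed stream begins with the length of the original data, encoded as a variable-length integer (the same varint scheme that google::protobuf uses).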

The stream starts with the uncompressed length (up to a maximum of 2^32 - 1), stored as a little-endian varint. Varints consist of a series of bytes, where the lower 7 bits are data and the upper bit is set iff there are more bytes to be read. In other words, an uncompressed length of 64 would be stored as 0x40, and an uncompressed length of 2097151 (0x1FFFFF) would be stored as 0xFF 0xFF 0x7F.
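
The two byte sequences quoted above can be reproduced with a few lines of C++. This is only a sketch of the varint scheme as described in the paragraph; the names EncodeVarint32 and DecodeVarint32 are made up for this illustration and are not snappy functions.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Encode v as a little-endian base-128 varint: 7 data bits per byte,
// with the high bit set on every byte except the last one.
std::vector<uint8_t> EncodeVarint32(uint32_t v) {
  std::vector<uint8_t> out;
  while (v >= 0x80) {
    out.push_back(static_cast<uint8_t>(v | 0x80));  // low 7 bits + "more" flag
    v >>= 7;
  }
  out.push_back(static_cast<uint8_t>(v));
  return out;
}

// Decode a varint from the start of `in`. Returns false if the input is
// truncated or does not fit in 32 bits.
bool DecodeVarint32(const std::vector<uint8_t>& in, uint32_t* value) {
  assert(value != nullptr);
  uint32_t result = 0;
  for (size_t i = 0; i < in.size() && i < 5; ++i) {
    if (i == 4 && in[i] > 15) return false;  // would overflow 32 bits
    result |= static_cast<uint32_t>(in[i] & 0x7F) << (7 * i);
    if ((in[i] & 0x80) == 0) {
      *value = result;
      return true;
    }
  }
  return false;
}
```

With these helpers, 64 encodes to the single byte 0x40 and 2097151 encodes to 0xFF 0xFF 0x7F, matching the text.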

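An LZ77-type compressed stream is made up of two kinds of elements:

  • literal: raw bytes of the original content.
  • copy: a reference back to content that has already been produced.

For an LZ77-type compressor a copy is nothing more than an (offset, length) pair, written out with a compact encoding. The encoded pair is usually smaller than the bytes it stands for, which is where the compression comes from. Literals and copies are told apart by the low two bits of the first (tag) byte: 00 means literal, while 01, 10 and 11 denote the different kinds of copy.

The Snappy implementation is very simple. The byte stream is first split into blocks (32KB in the current implementation; the last block may be smaller). Each block is compressed into a sequence of roughly the form (literal+ copy+)* literal*, where the trailing literal may be absent. Decompression is just as simple: copy each literal to the output byte stream, and resolve each copy against the output produced so far.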

2.1 Literal

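A literal of length len (len >= 1) is encoded as follows:

  • If len <= 60, the header is a single tag byte: (len-1) << 2 | 0x0 (1 byte).
  • If len-1 fits in 1 byte, the header is [60 << 2 | 0x0, len-1] (2 bytes).
  • If len-1 fits in 2 bytes, the header is [61 << 2 | 0x0, len-1] (3 bytes).
  • If len-1 fits in 3 bytes, the header is [62 << 2 | 0x0, len-1] (4 bytes).
  • If len-1 fits in 4 bytes, the header is [63 << 2 | 0x0, len-1] (5 bytes).

Note that len-1 is always written in little-endian order. The literal bytes themselves follow the header.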
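
To make the header rules concrete, here is a small sketch that builds only the header bytes for a literal of a given length. LiteralHeader is a hypothetical helper written for this note; it follows the same logic as EmitLiteral, shown later, but returns a vector instead of writing through a pointer.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Build the header bytes that precede a literal of length len (len >= 1).
// The low two bits of the tag byte are 00 for a literal.
std::vector<uint8_t> LiteralHeader(uint32_t len) {
  assert(len >= 1);
  uint32_t n = len - 1;  // the format stores len-1, not len
  std::vector<uint8_t> out;
  if (n < 60) {
    out.push_back(static_cast<uint8_t>(n << 2));  // fits in the tag byte
    return out;
  }
  out.push_back(0);  // placeholder for the tag byte
  int count = 0;
  while (n > 0) {    // len-1 in little-endian order, 1 to 4 bytes
    out.push_back(static_cast<uint8_t>(n & 0xFF));
    n >>= 8;
    ++count;
  }
  out[0] = static_cast<uint8_t>((59 + count) << 2);  // 60..63 in the upper 6 bits
  return out;
}
```

For example, a 60-byte literal gets the one-byte header 0xEC, a 61-byte literal gets 0xF0 0x3C, and a 257-byte literal gets 0xF4 0x00 0x01.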

2.2 Copy

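For a copy, assume the content is (offset, length), with offset >= 1 and length >= 1.

  • If length is in [4,11] and offset is in [0,2047], it is encoded as [(offset >> 8) << 5 | (length-4) << 2 | 0x01, offset & 0xff] (2 bytes).
  • If length is in [1,64] and offset is in [0,65535], it is encoded as [(length-1) << 2 | 0x02, offset] (3 bytes).
  • If length is in [1,64] and offset is in [0,2^32-1], it is encoded as [(length-1) << 2 | 0x03, offset] (5 bytes).

In the last two forms the offset is stored in little-endian order. Note that length always lies within [1,64]; a copy longer than 64 bytes is split into several copies, as the code shows later.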
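
Putting the literal and copy encodings together, a toy decoder for the body of a stream (everything after the varint length header) might look like the sketch below. DecodeBody is a hypothetical function written for this note, not snappy's decompressor: it favours clarity over speed and uses a std::string as the output buffer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Decode literal/copy elements from `in` and append the result to *out.
// Returns false on malformed input.
bool DecodeBody(const std::vector<uint8_t>& in, std::string* out) {
  assert(out != nullptr);
  size_t i = 0;
  while (i < in.size()) {
    const uint8_t tag = in[i++];
    size_t len = 0, offset = 0, extra = 0;
    switch (tag & 3) {
      case 0: {  // literal
        len = static_cast<size_t>(tag >> 2) + 1;
        if (len > 60) {  // len-1 is stored in the next 1..4 bytes
          extra = len - 60;
          if (i + extra > in.size()) return false;
          len = 0;
          for (size_t k = 0; k < extra; ++k)
            len |= static_cast<size_t>(in[i + k]) << (8 * k);
          len += 1;
          i += extra;
        }
        if (i + len > in.size()) return false;
        out->append(reinterpret_cast<const char*>(in.data() + i), len);
        i += len;
        continue;
      }
      case 1:  // copy, 1-byte offset: 3 length bits, 11 offset bits
        len = ((tag >> 2) & 7) + 4;
        offset = static_cast<size_t>(tag >> 5) << 8;
        extra = 1;
        break;
      case 2:  // copy, 2-byte offset
        len = static_cast<size_t>(tag >> 2) + 1;
        extra = 2;
        break;
      case 3:  // copy, 4-byte offset
        len = static_cast<size_t>(tag >> 2) + 1;
        extra = 4;
        break;
    }
    if (i + extra > in.size()) return false;
    for (size_t k = 0; k < extra; ++k)
      offset |= static_cast<size_t>(in[i + k]) << (8 * k);
    i += extra;
    if (offset == 0 || offset > out->size()) return false;
    // Byte by byte, so an overlapping copy repeats the pattern.
    for (size_t k = 0; k < len; ++k)
      out->push_back((*out)[out->size() - offset]);
  }
  return true;
}
```

Feeding it the bytes 0x04 'a' 'b' 0x4E 0x02 0x00 (a 2-byte literal followed by a copy with offset 2 and length 20) produces "ab" repeated eleven times.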

3 Source & Sink

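Here Source is the input and Sink is the output. One slightly unfortunate design point is that Compress uses both abstractions, whereas Uncompress uses only Source; its output goes through a separate internal class, Writer.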

size_t Compress(Source* source, Sink* sink);
bool RawUncompress(Source* compressed, char* uncompressed); // an internal Writer wraps the writes to uncompressed

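Let us first look at the interface that Sink defines. It is the clear comments that make this interface understandable. In short:

  • Append writes the bytes bytes[0,n-1] to the sink.
  • GetAppendBuffer hands out a writable buffer of length bytes. The buffer must stay valid until Append is called. An implementation may also simply return scratch (memory allocated by the caller).
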
// A Sink is an interface that consumes a sequence of bytes.
class Sink {
 public:
  Sink() { }
  virtual ~Sink();

  // Append "bytes[0,n-1]" to this.
  virtual void Append(const char* bytes, size_t n) = 0;

  // Returns a writable buffer of the specified length for appending.
  // May return a pointer to the caller-owned scratch buffer which
  // must have at least the indicated length.  The returned buffer is
  // only valid until the next operation on this Sink.
  //
  // After writing at most "length" bytes, call Append() with the
  // pointer returned from this function and the number of bytes
  // written.  Many Append() implementations will avoid copying
  // bytes if this function returned an internal buffer.
  //
  // If a non-scratch buffer is returned, the caller may only pass a
  // prefix of it to Append().  That is, it is not correct to pass an
  // interior pointer of the returned array to Append().
  //
  // The default implementation always returns the scratch buffer.
  virtual char* GetAppendBuffer(size_t length, char* scratch);

 private:
  // No copying
  Sink(const Sink&);
  void operator=(const Sink&);
};

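Next, snappy's default implementation of Sink. GetAppendBuffer ignores scratch and returns dest_ directly, which saves one copy: had it returned scratch, the caller would first write into scratch and Append would then have to copy from scratch into dest_. That is why Append checks (data != dest_).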

// A Sink implementation that writes to a flat array without any bound checks.
class UncheckedByteArraySink : public Sink {
 public:
  explicit UncheckedByteArraySink(char* dest) : dest_(dest) { }
  virtual ~UncheckedByteArraySink();
  virtual void Append(const char* data, size_t n);
  virtual char* GetAppendBuffer(size_t len, char* scratch);

  // Return the current output pointer so that a caller can see how
  // many bytes were produced.
  // Note: this is not a Sink method.
  char* CurrentDestination() const { return dest_; }
 private:
  char* dest_;
};

void UncheckedByteArraySink::Append(const char* data, size_t n) {
  // Do no copying if the caller filled in the result of GetAppendBuffer()
  if (data != dest_) {
    memcpy(dest_, data, n);
  }
  dest_ += n;
}

char* UncheckedByteArraySink::GetAppendBuffer(size_t len, char* scratch) {
  return dest_;
}

The Sink interface can easily be redirected to other destinations such as a file; the Example section gives an example.
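
As a rough illustration of that kind of redirection, the sketch below writes everything it receives to a std::ostream, which could be a std::ofstream. To keep the example compilable on its own, MiniSink is a stand-in that only mirrors the two virtual methods of the Sink class shown above; it is not a snappy class, and OstreamSink is likewise a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <ostream>
#include <sstream>

// Stand-in for the Sink interface above (assumption: same two methods,
// default GetAppendBuffer returns the caller's scratch buffer).
class MiniSink {
 public:
  virtual ~MiniSink() {}
  virtual void Append(const char* bytes, size_t n) = 0;
  virtual char* GetAppendBuffer(size_t length, char* scratch) {
    (void)length;
    return scratch;
  }
};

// A sink that forwards every appended byte range to an output stream.
class OstreamSink : public MiniSink {
 public:
  explicit OstreamSink(std::ostream* out) : out_(out), written_(0) {
    assert(out != nullptr);
  }
  virtual void Append(const char* bytes, size_t n) {
    out_->write(bytes, static_cast<std::streamsize>(n));
    written_ += n;
  }
  size_t written() const { return written_; }

 private:
  std::ostream* out_;
  size_t written_;
};
```

Because this sink has no internal buffer to hand out, it keeps the default GetAppendBuffer, so every chunk goes through the caller's scratch buffer and is copied once by the stream.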

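Next, the interface that Source defines. Again, the clear comments are what make it understandable. In short:

  • Available returns how many bytes remain.
  • Peek returns a pointer to the next readable bytes and stores their length. The returned buffer must remain valid until Skip is called.
  • Skip tells the Source that the next n bytes are no longer needed.
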
// A Source is an interface that yields a sequence of bytes
class Source {
 public:
  Source() { }
  virtual ~Source();

  // Return the number of bytes left to read from the source
  virtual size_t Available() const = 0;

  // Peek at the next flat region of the source.  Does not reposition
  // the source.  The returned region is empty iff Available()==0.
  //
  // Returns a pointer to the beginning of the region and store its
  // length in *len.
  //
  // The returned region is valid until the next call to Skip() or
  // until this object is destroyed, whichever occurs first.
  //
  // The returned region may be larger than Available() (for example
  // if this ByteSource is a view on a substring of a larger source).
  // The caller is responsible for ensuring that it only reads the
  // Available() bytes.
  virtual const char* Peek(size_t* len) = 0;

  // Skip the next n bytes.  Invalidates any buffer returned by
  // a previous call to Peek().
  // REQUIRES: Available() >= n
  virtual void Skip(size_t n) = 0;

 private:
  // No copying
  Source(const Source&);
  void operator=(const Source&);
};

snappy's default implementation is very simple:

// A Source implementation that yields the contents of a flat array
class ByteArraySource : public Source {
 public:
  ByteArraySource(const char* p, size_t n) : ptr_(p), left_(n) { }
  virtual ~ByteArraySource();
  virtual size_t Available() const;
  virtual const char* Peek(size_t* len);
  virtual void Skip(size_t n);
 private:
  const char* ptr_;
  size_t left_;
};

size_t ByteArraySource::Available() const { return left_; }

const char* ByteArraySource::Peek(size_t* len) {
  *len = left_;
  return ptr_;
}

void ByteArraySource::Skip(size_t n) {
  left_ -= n;
  ptr_ += n;
}

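Judging from its interface, Source is not easy to extend: the total length must be known up front, and from the code Snappy does not look like something that compresses incrementally. So my personal impression is that Source can only wrap an in-memory buffer, not a disk file or a network stream.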

4 Snippet

Quite a few code fragments in Snappy are very clever (and, to a degree, hard to follow), so it is worth going through these functions first.

4.1 Bits

Defines a few bit operations, all built on gcc builtin functions.

// Some bit-manipulation functions.
class Bits {
 public:
  // Return floor(log2(n)) for positive integer n.  Returns -1 iff n == 0.
  static int Log2Floor(uint32 n);

  // Return the first set least / most significant bit, 0-indexed.  Returns an
  // undefined value if n == 0.  FindLSBSetNonZero() is similar to ffs() except
  // that it's 0-indexed.
  static int FindLSBSetNonZero(uint32 n);
  static int FindLSBSetNonZero64(uint64 n);

 private:
  DISALLOW_COPY_AND_ASSIGN(Bits);
};

inline int Bits::Log2Floor(uint32 n) {
  return n == 0 ? -1 : 31 ^ __builtin_clz(n); // clz is in [0,31], so 31 ^ clz equals 31 - clz
}

inline int Bits::FindLSBSetNonZero(uint32 n) {
  return __builtin_ctz(n);
}

inline int Bits::FindLSBSetNonZero64(uint64 n) {
  return __builtin_ctzll(n);
}

For convenience, here is what the three builtins used above do:

  • int __builtin_clz (unsigned int x) // Returns the number of leading 0-bits in x, starting at the most significant bit position. If x is 0, the result is undefined.
  • int __builtin_ctz (unsigned int x) // Returns the number of trailing 0-bits in x, starting at the least significant bit position. If x is 0, the result is undefined.
  • int __builtin_ctzll (unsigned long long) // Similar to __builtin_ctz, except the argument type is unsigned long long.
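
A few concrete values help when reading these helpers. The sketch below restates two of them as free functions (so it does not depend on the Bits class or on snappy's uint32/uint64 typedefs) and assumes a GCC- or Clang-compatible compiler for the builtins.

```cpp
#include <cassert>
#include <cstdint>

// floor(log2(n)) for n > 0, and -1 for n == 0.
inline int Log2Floor(uint32_t n) {
  return n == 0 ? -1 : 31 ^ __builtin_clz(n);
}

// Index of the lowest set bit, 0-based. Undefined for n == 0, hence the assert.
inline int FindLSBSetNonZero64(uint64_t n) {
  assert(n != 0);
  return __builtin_ctzll(n);
}
```

For instance, Log2Floor(1024) and Log2Floor(1025) are both 10, and FindLSBSetNonZero64(12) is 2 because 12 is 0b1100.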

4.2 Varint

Defines how a 32-bit integer is encoded and decoded. For details on this encoding, see the protobuf documentation: http://code.google.com/intl/zh-CN/apis/protocolbuffers/docs/encoding.html#varints

// Variable-length integer encoding.
class Varint {
 public:
  // Maximum lengths of varint encoding of uint32.
  static const int kMax32 = 5;

  // Attempts to parse a varint32 from a prefix of the bytes in [ptr,limit-1].
  // Never reads a character at or beyond limit.  If a valid/terminated varint32
  // was found in the range, stores it in *OUTPUT and returns a pointer just
  // past the last byte of the varint32. Else returns NULL.  On success,
  // "result <= limit".
  static const char* Parse32WithLimit(const char* ptr, const char* limit,
                                      uint32* OUTPUT);

  // REQUIRES   "ptr" points to a buffer of length sufficient to hold "v".
  // EFFECTS    Encodes "v" into "ptr" and returns a pointer to the
  //            byte just past the last encoded byte.
  static char* Encode32(char* ptr, uint32 v);

  // EFFECTS    Appends the varint representation of "value" to "*s".
  static void Append32(string* s, uint32 value);
};

inline const char* Varint::Parse32WithLimit(const char* p,
                                            const char* l,
                                            uint32* OUTPUT) {
  const unsigned char* ptr = reinterpret_cast<const unsigned char*>(p);
  const unsigned char* limit = reinterpret_cast<const unsigned char*>(l);
  uint32 b, result;
  if (ptr >= limit) return NULL;
  b = *(ptr++); result = b & 127;          if (b < 128) goto done;
  if (ptr >= limit) return NULL;
  b = *(ptr++); result |= (b & 127) <<  7; if (b < 128) goto done;
  if (ptr >= limit) return NULL;
  b = *(ptr++); result |= (b & 127) << 14; if (b < 128) goto done;
  if (ptr >= limit) return NULL;
  b = *(ptr++); result |= (b & 127) << 21; if (b < 128) goto done;
  if (ptr >= limit) return NULL;
  b = *(ptr++); result |= (b & 127) << 28; if (b < 16) goto done;
  return NULL;       // Value is too long to be a varint32
 done:
  *OUTPUT = result;
  return reinterpret_cast<const char*>(ptr);
}

inline char* Varint::Encode32(char* sptr, uint32 v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(sptr);
  static const int B = 128;
  if (v < (1<<7)) {
    *(ptr++) = v;
  } else if (v < (1<<14)) {
    *(ptr++) = v | B;
    *(ptr++) = v>>7;
  } else if (v < (1<<21)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

void Varint::Append32(string* s, uint32 value) {
  char buf[Varint::kMax32];
  const char* p = Varint::Encode32(buf, value);
  s->append(buf, p - buf);
}

4.3 GetUint32AtOffset

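Extracts a uint32 at any byte offset from a uint64. A single uint64 load therefore yields five overlapping uint32 values (offsets 0 through 4), which is very efficient.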

// For 0 <= offset <= 4, GetUint32AtOffset(UNALIGNED_LOAD64(p), offset) will
// equal UNALIGNED_LOAD32(p + offset).  Motivation: On x86-64 hardware we have
// empirically found that overlapping loads such as
//  UNALIGNED_LOAD32(p) ... UNALIGNED_LOAD32(p+1) ... UNALIGNED_LOAD32(p+2)
// are slower than UNALIGNED_LOAD64(p) followed by shifts and casts to uint32.
static inline uint32 GetUint32AtOffset(uint64 v, int offset) {
  DCHECK(0 <= offset && offset <= 4) << offset;
  return v >> (LittleEndian::IsLittleEndian() ? 8 * offset : 32 - 8 * offset);
}
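
The claim in the comment can be checked with a small experiment. The sketch below covers only the little-endian case and, to stay independent of the host byte order, assembles the 64-bit and 32-bit values from individual bytes instead of using unaligned loads. LoadLE64, LoadLE32, GetUint32AtOffsetLE and AllOffsetsAgree are names invented for this illustration.

```cpp
#include <cassert>
#include <cstdint>

// Interpret 8 bytes at p as a little-endian 64-bit value.
uint64_t LoadLE64(const unsigned char* p) {
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) v |= static_cast<uint64_t>(p[i]) << (8 * i);
  return v;
}

// Interpret 4 bytes at p as a little-endian 32-bit value.
uint32_t LoadLE32(const unsigned char* p) {
  uint32_t v = 0;
  for (int i = 0; i < 4; ++i) v |= static_cast<uint32_t>(p[i]) << (8 * i);
  return v;
}

// Little-endian branch of GetUint32AtOffset: shift, then truncate.
uint32_t GetUint32AtOffsetLE(uint64_t v, int offset) {
  assert(0 <= offset && offset <= 4);
  return static_cast<uint32_t>(v >> (8 * offset));
}

// True if one 64-bit load plus shifts gives the same five values as
// five separate 32-bit loads at p, p+1, ..., p+4.
bool AllOffsetsAgree(const unsigned char* p) {
  const uint64_t v = LoadLE64(p);
  for (int offset = 0; offset <= 4; ++offset) {
    if (GetUint32AtOffsetLE(v, offset) != LoadLE32(p + offset)) return false;
  }
  return true;
}
```

With the bytes 0x10, 0x11, ..., 0x17, offset 1 yields 0x14131211, the same value a 32-bit little-endian load at p + 1 would give.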

4.4 GetHashTable

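During Compress a hash table is needed to find the places where a copy can be emitted. To be honest there is not much to learn from GetHashTable, but let us look at it anyway. What each uint16 entry of the hash table represents is covered with the Compress function.

// Compress works chunk by chunk; input_size is the size of the current chunk,
// which is at most 32KB at the moment. table_size receives the number of hash table buckets.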

uint16* WorkingMemory::GetHashTable(size_t input_size, int* table_size) {
  // Use smaller hash table when input.size() is smaller, since we
  // fill the table, incurring O(hash table size) overhead for
  // compression, and if the input is short, we won't need that
  // many hash table entries anyway.
  assert(kMaxHashTableSize >= 256);
  // Start with 256 buckets and keep doubling until htsize >= input_size
  // or htsize reaches kMaxHashTableSize.
  int htsize = 256;
  while (htsize < kMaxHashTableSize && htsize < input_size) {
    htsize <<= 1;
  }
  CHECK_EQ(0, htsize & (htsize - 1)) << ": must be power of two";
  CHECK_LE(htsize, kMaxHashTableSize) << ": hash table too large";

  uint16* table;
  // WorkingMemory embeds a small_table_[1<<10], which avoids an allocation for small inputs.
  if (htsize <= ARRAYSIZE(small_table_)) {
    table = small_table_;
  } else {
    if (large_table_ == NULL) {
      // A large table has to be allocated with new.
      large_table_ = new uint16[kMaxHashTableSize];
    }
    table = large_table_;
  }

  *table_size = htsize;
  // The table must be zeroed; the Compress section shows why.
  memset(table, 0, htsize * sizeof(*table));
  return table;
}
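
The sizing rule can be isolated into a few lines. In the sketch below, HashTableSizeFor is a hypothetical helper, and the cap is passed in as a parameter because the value of kMaxHashTableSize is not shown in this section; the 1 << 14 mentioned afterwards is an assumption used for illustration only.

```cpp
#include <cassert>
#include <cstddef>

// Smallest power of two that is >= 256 and >= input_size, capped at
// max_table_size (which must itself be a power of two >= 256).
size_t HashTableSizeFor(size_t input_size, size_t max_table_size) {
  assert(max_table_size >= 256);
  assert((max_table_size & (max_table_size - 1)) == 0);
  size_t htsize = 256;
  while (htsize < max_table_size && htsize < input_size) {
    htsize <<= 1;
  }
  return htsize;
}
```

Assuming a cap of 1 << 14, a 100-byte input gets 256 buckets, a 1000-byte input gets 1024, and a full 32KB chunk is capped at 16384.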

4.5 FindMatchLength

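Quickly finds the length of the longest common prefix of two strings (s1, s2), where s2_limit marks the end of s2, and returns that length. In practice s1 and s2 point into the same buffer with s1 < s2, so there is no need to check s1 for overrun.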

// Return the largest n such that
//
//   s1[0,n-1] == s2[0,n-1]
//   and n <= (s2_limit - s2).
//
// Does not read *s2_limit or beyond.
// Does not read *(s1 + (s2_limit - s2)) or beyond.
// Requires that s2_limit >= s2.

static inline int FindMatchLength(const char* s1,
                                  const char* s2,
                                  const char* s2_limit) {
  DCHECK_GE(s2_limit, s2);
  int matched = 0;

  // Find out how long the match is. We loop over the data 64 bits at a
  // time until we find a 64-bit block that doesn't match; then we find
  // the first non-matching bit and use that to calculate the total
  // length of the match.
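  // While s2 still has at least 8 bytes left:
  while (PREDICT_TRUE(s2 <= s2_limit - 8)) {
    // Compare 8 bytes at a time; on a full match advance by 8.
    if (PREDICT_FALSE(UNALIGNED_LOAD64(s2) == UNALIGNED_LOAD64(s1 + matched))) {
      s2 += 8;
      matched += 8;
    } else {
      // On a mismatch, xor the two words: equal bits become 0. Counting the
      // zero bits from the LSB and shifting the count right by 3 gives the
      // number of matching bytes. Very neat.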
      // On current (mid-2008) Opteron models there is a 3% more
      // efficient code sequence to find the first non-matching byte.
      // However, what follows is ~10% better on Intel Core 2 and newer,
      // and we expect AMD's bsf instruction to improve.
      uint64 x = UNALIGNED_LOAD64(s2) ^ UNALIGNED_LOAD64(s1 + matched);
      int matching_bits = Bits::FindLSBSetNonZero64(x);
      matched += matching_bits >> 3;
      return matched;
    }
  }
  // Fewer than 8 bytes left: fall back to matching one byte at a time.
  while (PREDICT_TRUE(s2 < s2_limit)) {
    if (PREDICT_TRUE(s1[matched] == *s2)) {
      ++s2;
      ++matched;
    } else {
      return matched;
    }
  }
  return matched;
}
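
The xor-and-count trick in the mismatch branch is easiest to see on a single 8-byte window. The sketch below is an illustration only: MatchingBytesIn8 and LoadLE64 are invented names, the words are assembled byte by byte in little-endian order so that the lowest-addressed byte ends up in the lowest bits, and __builtin_ctzll assumes GCC or Clang.

```cpp
#include <cassert>
#include <cstdint>

// Interpret 8 bytes at p as a little-endian 64-bit value.
uint64_t LoadLE64(const char* p) {
  uint64_t v = 0;
  for (int i = 0; i < 8; ++i) {
    v |= static_cast<uint64_t>(static_cast<unsigned char>(p[i])) << (8 * i);
  }
  return v;
}

// Number of leading bytes (0..8) on which the two 8-byte windows agree.
int MatchingBytesIn8(const char* a, const char* b) {
  assert(a != nullptr && b != nullptr);
  const uint64_t x = LoadLE64(a) ^ LoadLE64(b);  // equal bits become 0
  if (x == 0) return 8;                          // all 8 bytes match
  return __builtin_ctzll(x) >> 3;                // first differing bit / 8
}
```

Comparing "abcdefgh" with "abcXefgh" gives 3: the first three bytes xor to zero, and the lowest set bit of the xor falls inside the fourth byte.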

4.6 MaxCompressedLength

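A large part of why snappy's interface is so simple is that an upper bound on the compressed length can be estimated from the input alone. The output can therefore be preallocated before compressing, and snappy does no bounds checking on the output while it compresses. How the bound is derived follows from the comment below, together with the analysis of the expansion caused by copies and literals.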

size_t MaxCompressedLength(size_t source_len) {
  // Compressed data can be defined as:
  //    compressed := item* literal*
  //    item       := literal* copy
  //
  // The trailing literal sequence has a space blowup of at most 62/60
  // since a literal of length 60 needs one tag byte + one extra byte
  // for length information.
  //
  // Item blowup is trickier to measure.  Suppose the "copy" op copies
  // 4 bytes of data.  Because of a special check in the encoding code,
  // we produce a 4-byte copy only if the offset is < 65536.  Therefore
  // the copy op takes 3 bytes to encode, and this type of item leads
  // to at most the 62/60 blowup for representing literals.
  //
  // Suppose the "copy" op copies 5 bytes of data.  If the offset is big
  // enough, it will take 5 bytes to encode the copy op.  Therefore the
  // worst case here is a one-byte literal followed by a five-byte copy.
  // I.e., 6 bytes of input turn into 7 bytes of "compressed" data.
  //
  // This last factor dominates the blowup, so the final estimate is:
  return 32 + source_len + source_len/6;
}
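
A quick calculation shows what the bound means in practice. The sketch below simply restates the formula from the function above under a different name (MaxCompressedLengthSketch, chosen for this note) so that it compiles on its own; the assert reflects the 2^32 - 1 limit on the uncompressed length stated in the Format section.

```cpp
#include <cassert>
#include <cstddef>

// Upper bound on the compressed size: 32 spare bytes plus a worst-case
// expansion of 7/6 (6 input bytes turning into 7 output bytes).
size_t MaxCompressedLengthSketch(size_t source_len) {
  assert(source_len <= 0xFFFFFFFFu);
  return 32 + source_len + source_len / 6;
}
```

For a full 32KB chunk this gives 32 + 32768 + 5461 = 38261 bytes, so a caller that allocates this much up front never needs a bounds check while compressing that chunk.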

4.7 IncrementalCopy

First, the prototype and semantics of IncrementalCopy:

// Copy "len" bytes from "src" to "op", one byte at a time.  Used for
// handling COPY operations where the input and output regions may
// overlap.  For example, suppose:
//    src    == "ab"
//    op     == src + 2
//    len    == 20
// After IncrementalCopy(src, op, len), the result will have
// eleven copies of "ab"
//    ababababababababababab
// Note that this does not match the semantics of either memcpy()
// or memmove();
static inline void IncrementalCopy(const char* src, char* op, int len) {
  DCHECK_GT(len, 0);
  do {
    *op++ = *src++;
  } while (--len > 0);
}

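It copies src to op one byte at a time, and it really must be byte by byte, because src and op may well overlap: later bytes written at op can depend on earlier bytes written at op. These semantics must be clear; they differ from both memcpy and memmove. The simple implementation above just copies byte by byte. This is very useful during decompression, because the literals have already been decoded into the output and a copy may well overlap the bytes it is producing, which requires exactly this IncrementalCopy behaviour.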
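
The "ab" example from the comment can be run directly. In the sketch below, IncrementalCopySketch repeats the byte-by-byte loop shown above with an assert in place of DCHECK_GT, and ExpandPattern is a hypothetical wrapper that sets up a buffer holding the seed followed by room for the copy.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Byte-by-byte forward copy; src and op may overlap with src < op.
void IncrementalCopySketch(const char* src, char* op, int len) {
  assert(len > 0);
  do {
    *op++ = *src++;
  } while (--len > 0);
}

// Place `seed` at the start of a buffer, then copy `len` bytes from the
// start of the buffer to the position right after the seed.
std::string ExpandPattern(const std::string& seed, int len) {
  assert(!seed.empty() && len > 0);
  std::vector<char> buf(seed.size() + len);
  std::memcpy(buf.data(), seed.data(), seed.size());
  IncrementalCopySketch(buf.data(), buf.data() + seed.size(), len);
  return std::string(buf.begin(), buf.end());
}
```

ExpandPattern("ab", 20) returns eleven copies of "ab", exactly as the comment describes, because each byte that is read was written earlier by the same loop.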

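But is there a faster way? Below is a faster implementation. It first widens the gap between op and src to at least 8 bytes; once that holds, it can copy 8 bytes at a time. To widen the gap, it repeatedly does an 8-byte copy and advances op.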

const int kMaxIncrementCopyOverflow = 10;

static inline void IncrementalCopyFastPath(const char* src, char* op, int len) {
  while (op - src < 8) {
    UNALIGNED_STORE64(op, UNALIGNED_LOAD64(src));
    len -= op - src;
    op += op - src;
  }
  while (len > 0) {
    UNALIGNED_STORE64(op, UNALIGNED_LOAD64(src));
    src += 8;
    op += 8;
    len -= 8;
  }
}

This code may be hard to follow on a first read; once again the author's good comments help.

// Equivalent to IncrementalCopy except that it can write up to ten extra
// bytes after the end of the copy, and that it is faster.
//
// The main part of this loop is a simple copy of eight bytes at a time until
// we've copied (at least) the requested amount of bytes.  However, if op and
// src are less than eight bytes apart (indicating a repeating pattern of
// length < 8), we first need to expand the pattern in order to get the correct
// results. For instance, if the buffer looks like this, with the eight-byte
// <src> and <op> patterns marked as intervals:
//
//    abxxxxxxxxxxxx
//    [------]           src
//      [------]         op
//
// a single eight-byte copy from <src> to <op> will repeat the pattern once,
// after which we can move <op> two bytes without moving <src>:
//
//    ababxxxxxxxxxx
//    [------]           src
//        [------]       op
//
// and repeat the exercise until the two no longer overlap.
//
// This allows us to do very well in the special case of one single byte
// repeated many times, without taking a big hit for more general cases.
//
// The worst case of extra writing past the end of the match occurs when
// op - src == 1 and len == 1; the last copy will read from byte positions
// [0..7] and write to [4..11], whereas it was only supposed to write to
// position 1. Thus, ten excess bytes.

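Even without the comment above it is clear that this approach copies between overlapping regions and may touch a few bytes beyond the end. The comment's analysis says that it is safe as long as 10 spare bytes follow op+len, hence kMaxIncrementCopyOverflow == 10. In use, the fast path may only be taken once at least 10 spare bytes are known to be available after the end of the copy.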
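
The fast path can be tried out in isolation, provided the buffer really has the spare bytes. The sketch below mirrors the loop structure shown above, but Copy8 uses memcpy through a temporary in place of the UNALIGNED_LOAD64/UNALIGNED_STORE64 macros, and ExpandPatternFast is a hypothetical wrapper that allocates the 10 bytes of slack the analysis asks for.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

const int kMaxIncrementCopyOverflow = 10;

// Load 8 bytes, then store 8 bytes (the load completes before the store).
inline void Copy8(const char* src, char* dst) {
  uint64_t tmp;
  std::memcpy(&tmp, src, 8);
  std::memcpy(dst, &tmp, 8);
}

// May write up to kMaxIncrementCopyOverflow bytes past op + len.
void IncrementalCopyFastPathSketch(const char* src, char* op, int len) {
  while (op - src < 8) {  // expand the pattern until the gap is >= 8
    Copy8(src, op);
    len -= static_cast<int>(op - src);
    op += op - src;
  }
  while (len > 0) {       // plain 8-bytes-at-a-time copy
    Copy8(src, op);
    src += 8;
    op += 8;
    len -= 8;
  }
}

// Same contract as a byte-by-byte expansion, with slack in the buffer.
std::string ExpandPatternFast(const std::string& seed, int len) {
  assert(!seed.empty() && len > 0);
  std::vector<char> buf(seed.size() + len + kMaxIncrementCopyOverflow, 0);
  std::memcpy(buf.data(), seed.data(), seed.size());
  IncrementalCopyFastPathSketch(buf.data(), buf.data() + seed.size(), len);
  return std::string(buf.data(), seed.size() + len);
}
```

The result is the same as with the byte-by-byte version, but only because the buffer was allocated with the extra slack; dropping the slack would let the last 8-byte store run past the end of the buffer.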

4.8 EmitLiteral

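EmitLiteral outputs a literal. The Format section makes it obvious what a literal should look like, but there is still a trick, namely allow_fast_path. It applies when up to 15 extra bytes can safely be read and written, so that a short literal can be copied with two 8-byte copies. As the CompressFragment code shows, the caller only sets allow_fast_path == true once those 15 spare bytes are guaranteed. Every other case is an ordinary copy and takes the memcpy branch.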

static inline char* EmitLiteral(char* op,
                                const char* literal,
                                int len,
                                bool allow_fast_path) {
  int n = len - 1;      // Zero-length literals are disallowed
  if (n < 60) {
    // Fits in tag byte
    *op++ = LITERAL | (n << 2);

    // The vast majority of copies are below 16 bytes, for which a
    // call to memcpy is overkill. This fast path can sometimes
    // copy up to 15 bytes too much, but that is okay in the
    // main loop, since we have a bit to go on for both sides:
    //
    //   - The input will always have kInputMarginBytes = 15 extra
    //     available bytes, as long as we're in the main loop, and
    //     if not, allow_fast_path = false.
    //   - The output will always have 32 spare bytes (see
    //     MaxCompressedLength).
    if (allow_fast_path && len <= 16) {
      UNALIGNED_STORE64(op, UNALIGNED_LOAD64(literal));
      UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(literal + 8));
      return op + len;
    }
  } else {
    // Encode in upcoming bytes
    char* base = op;
    int count = 0;
    op++;
    while (n > 0) {
      *op++ = n & 0xff;
      n >>= 8;
      count++;
    }
    assert(count >= 1);
    assert(count <= 4);
    *base = LITERAL | ((59+count) << 2);
  }
  memcpy(op, literal, len);
  return op + len;
}

4.9 EmitCopy

EmitCopy is very simple and involves no memcpy of data; it only writes the two components (offset, length). As mentioned earlier, though, a copy with length > 64 has to be split.

static inline char* EmitCopyLessThan64(char* op, int offset, int len) {
  DCHECK_LE(len, 64);
  DCHECK_GE(len, 4);
  DCHECK_LT(offset, 65536);

  if ((len < 12) && (offset < 2048)) {
    int len_minus_4 = len - 4;
    assert(len_minus_4 < 8);            // Must fit in 3 bits
    *op++ = COPY_1_BYTE_OFFSET | ((len_minus_4) << 2) | ((offset >> 8) << 5);
    *op++ = offset & 0xff;
  } else {
    *op++ = COPY_2_BYTE_OFFSET | ((len-1) << 2);
    LittleEndian::Store16(op, offset);
    op += 2;
  }
  // The format for offset >= 65536 is not handled here: with the current
  // 32K chunk size an offset can never reach 64K.
  return op;
}

static inline char* EmitCopy(char* op, int offset, int len) {
  // Emit 64 byte copies but make sure to keep at least four bytes reserved
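  // The threshold has to be 68, not 64: otherwise the remainder could be
  // shorter than 4 bytes, which EmitCopyLessThan64 cannot emit (the
  // 1-byte-offset form used for offset < 2048 needs len >= 4).
  while (len >= 68) {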
    op = EmitCopyLessThan64(op, offset, 64);
    len -= 64;
  }

  // Emit an extra 60 byte copy if have too much data to fit in one copy
  if (len > 64) {
    op = EmitCopyLessThan64(op, offset, 60);
    len -= 60;
  }

  // Emit remainder
  op = EmitCopyLessThan64(op, offset, len);
  return op;
}
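
The splitting rule is easier to follow with the emitted lengths written out. The sketch below keeps only the length arithmetic of EmitCopy and records the pieces in a vector instead of writing tag bytes; SplitCopyLength is a name made up for this illustration.

```cpp
#include <cassert>
#include <vector>

// Lengths of the individual copies that a match of `len` bytes is split into.
// Every piece is between 4 and 64 bytes long.
std::vector<int> SplitCopyLength(int len) {
  assert(len >= 4);
  std::vector<int> parts;
  while (len >= 68) {  // 64-byte pieces while at least 4 bytes would remain
    parts.push_back(64);
    len -= 64;
  }
  if (len > 64) {      // 65..67: emit 60 so that the remainder is 5..7
    parts.push_back(60);
    len -= 60;
  }
  parts.push_back(len);  // remainder, 4..64
  return parts;
}
```

A 130-byte match is split into 64 + 60 + 6, a 68-byte match into 64 + 4, and a 67-byte match into 60 + 7 instead of 64 + 3.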

5 Compress

Finally, the main topic: Compress. Here is the overall framework.

size_t Compress(Source* reader, Sink* writer) {

  // First encode the uncompressed length of the input and write it at the
  // very start of the output.
  size_t written = 0;
  int N = reader->Available();
  char ulength[Varint::kMax32];
  char* dest = writer->GetAppendBuffer(Varint::kMax32, ulength);
  char* p = Varint::Encode32(dest, N);
  writer->Append(dest, p - dest);
  written += (p - dest);

  // Equivalent version that encodes into the local buffer and appends it:
  //   size_t written = 0;
  //   int N = reader->Available();
  //   char ulength[Varint::kMax32];
  //   char* p = Varint::Encode32(ulength, N);
  //   writer->Append(ulength, p-ulength);
  //   written += (p - ulength);

  internal::WorkingMemory wmem;
  char* scratch = NULL;
  char* scratch_output = NULL;

  while (N > 0) {
    // num_to_read below is the size of the chunk compressed in this iteration.

    // Get next block to compress (without copying if possible)
    size_t fragment_size;
    const char* fragment = reader->Peek(&fragment_size);
    DCHECK_NE(fragment_size, 0) << ": premature end of input";
    const int num_to_read = min(N, kBlockSize);
    size_t bytes_read = fragment_size;

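    // Several reads may be attempted here, but with Snappy's default Source
    // everything arrives in one step and the else branch is never taken.
    // Even that branch is simple: it keeps reading and gathers the data in scratch.
    // After this if/else, the address and size of the chunk are in
    // fragment and fragment_size.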

    int pending_advance = 0;
    if (bytes_read >= num_to_read) {
      // Buffer returned by reader is large enough
      pending_advance = num_to_read;
      fragment_size = num_to_read;
    } else {
      // Read into scratch buffer
      if (scratch == NULL) {
        // If this is the last iteration, we want to allocate N bytes
        // of space, otherwise the max possible kBlockSize space.
        // num_to_read contains exactly the correct value
        scratch = new char[num_to_read];
      }
      memcpy(scratch, fragment, bytes_read);
      reader->Skip(bytes_read);

      while (bytes_read < num_to_read) {
        fragment = reader->Peek(&fragment_size);
        size_t n = min<size_t>(fragment_size, num_to_read - bytes_read);
        memcpy(scratch + bytes_read, fragment, n);
        bytes_read += n;
        reader->Skip(n);
      }
      DCHECK_EQ(bytes_read, num_to_read);
      fragment = scratch;
      fragment_size = num_to_read;
    }
    DCHECK_EQ(fragment_size, num_to_read);

    // Ready to compress the fragment; first set up a hash table.
    // Get encoding table for compression
    int table_size;
    uint16* table = wmem.GetHashTable(num_to_read, &table_size);

    // How much temporary output space this iteration needs.
    // Compress input_fragment and append to dest
    const int max_output = MaxCompressedLength(num_to_read);

    // Need a scratch buffer for the output, in case the byte sink doesn't
    // have room for us directly.
    if (scratch_output == NULL) {
      scratch_output = new char[max_output];
    } else {
      // Since we encode kBlockSize regions followed by a region
      // which is <= kBlockSize in length, a previously allocated
      // scratch_output[] region is big enough for this iteration.
    }
    // Call CompressFragment to compress this fragment.
    char* dest = writer->GetAppendBuffer(max_output, scratch_output);
    char* end = internal::CompressFragment(fragment, fragment_size,
                                           dest, table, table_size);
    writer->Append(dest, end - dest);
    written += (end - dest);

    N -= num_to_read;
    reader->Skip(pending_advance);
  }

  delete[] scratch;
  delete[] scratch_output;

  return written;
}
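
Stripped of buffer management, the loop above just walks the input in blocks. The sketch below lists the fragment sizes that such a loop would hand to CompressFragment; FragmentSizes is a hypothetical helper, and the 32768 used afterwards is the 32KB block size mentioned in the Format section.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Sizes of the fragments an input of n bytes is cut into when each
// fragment holds at most block_size bytes.
std::vector<size_t> FragmentSizes(size_t n, size_t block_size) {
  assert(block_size > 0);
  std::vector<size_t> sizes;
  while (n > 0) {
    const size_t num_to_read = std::min(n, block_size);
    sizes.push_back(num_to_read);
    n -= num_to_read;
  }
  return sizes;
}
```

An input of 70000 bytes with 32768-byte blocks yields fragments of 32768, 32768 and 4464 bytes; only the last fragment can be shorter than a full block, which is why a scratch_output buffer sized for the first iteration is large enough for all later ones.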

The whole process is not complicated: cut out a chunk, initialize the hash table, and hand both to CompressFragment.

char* CompressFragment(const char* const input,
                       const size_t input_size,
                       char* op,
                       uint16* table,
                       const int table_size) {
  // "ip" is the input pointer, and "op" is the output pointer.
  const char* ip = input;
  CHECK_LE(input_size, kBlockSize);
  CHECK_EQ(table_size & (table_size - 1), 0) << ": table must be power of two";
  const int shift = 32 - Bits::Log2Floor(table_size);
  DCHECK_EQ(kuint32max >> shift, table_size - 1);
  const char* ip_end = input + input_size;
  const char* base_ip = ip;
  // Bytes in [next_emit, ip) will be emitted as literal bytes.  Or
  // [next_emit, ip_end) after the main loop.
  const char* next_emit = ip;

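  // Recall that EmitLiteral needs 15 spare bytes before allow_fast_path may
  // be used. If the last 15 bytes of the input are only ever emitted by the
  // final literal, at least 15 bytes always follow any literal emitted
  // earlier, so every EmitLiteral call in the main loop can use allow_fast_path.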

  const int kInputMarginBytes = 15;
  if (PREDICT_TRUE(input_size >= kInputMarginBytes)) {
    const char* ip_limit = input + input_size - kInputMarginBytes;

    for (uint32 next_hash = Hash(++ip, shift); ; ) {
      DCHECK_LT(next_emit, ip);
      // The body of this loop calls EmitLiteral once and then EmitCopy one or
      // more times.  (The exception is that when we're close to exhausting
      // the input we goto emit_remainder.)
      //
      // In the first iteration of this loop we're just starting, so
      // there's nothing to copy, so calling EmitLiteral once is
      // necessary.  And we only start a new iteration when the
      // current iteration has determined that a call to EmitLiteral will
      // precede the next call to EmitCopy (if any).
      //
      // Step 1: Scan forward in the input looking for a 4-byte-long match.
      // If we get close to exhausting the input then goto emit_remainder.
      //
      // Heuristic match skipping: If 32 bytes are scanned with no matches
      // found, start looking only at every other byte. If 32 more bytes are
      // scanned, look at every third byte, etc.. When a match is found,
      // immediately go back to looking at every byte. This is a small loss
      // (~5% performance, ~0.1% density) for compressible data due to more
      // bookkeeping, but for non-compressible data (such as JPEG) it's a huge
      // win since the compressor quickly "realizes" the data is incompressible
      // and doesn't bother looking for matches everywhere.
      //
      // The "skip" variable keeps track of how many bytes there are since the
      // last match; dividing it by 32 (ie. right-shifting by five) gives the
      // number of bytes to move ahead for each iteration.
      uint32 skip = 32;

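      // First look for a 4-byte match.
      // Here hash == Hash(Load32(ip)) and table[hash] = ip - base_ip,
      // so each bucket holds an offset relative to base_ip, the start of the input.

      // Written as an expression:
      // table[Hash(Load32(ip))] = ip - base_ip
      // A match is found by first matching the hash and then checking that
      // the bytes really are equal.
      // Note that on a hash collision the table entry is simply overwritten.

      // The other interesting thing is skip.
      // The first 32 lookups advance by 1 byte (skip goes from 32 to 63),
      // the next 32 advance by 2 bytes (skip from 64 to 95), then by 3, and so on.
      // This is a heuristic matching strategy.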
      const char* next_ip = ip;
      const char* candidate;
      do {
        ip = next_ip;
        uint32 hash = next_hash;
        DCHECK_EQ(hash, Hash(ip, shift));
        uint32 bytes_between_hash_lookups = skip++ >> 5;
        next_ip = ip + bytes_between_hash_lookups;
        if (PREDICT_FALSE(next_ip > ip_limit)) {
          goto emit_remainder;
        }
        next_hash = Hash(next_ip, shift);

        candidate = base_ip + table[hash];
        DCHECK_GE(candidate, base_ip);
        DCHECK_LT(candidate, ip);

        table[hash] = ip - base_ip;
      } while (PREDICT_TRUE(UNALIGNED_LOAD32(ip) !=
                            UNALIGNED_LOAD32(candidate)));


      // At this point the 4 bytes at ip and at candidate match,
      // so [next_emit, ip-1] can be emitted as a literal.

      // Step 2: A 4-byte match has been found.  We'll later see if more
      // than 4 bytes match.  But, prior to the match, input
      // bytes [next_emit, ip) are unmatched.  Emit them as "literal bytes."
      DCHECK_LE(next_emit + 16, ip_end); // including margin bytes.
      op = EmitLiteral(op, next_emit, ip - next_emit, true);

      // Then emit the copy.

      // Step 3: Call EmitCopy, and then see if another EmitCopy could
      // be our next move.  Repeat until we find no match for the
      // input immediately after what was consumed by the last EmitCopy call.
      //
      // If we exit this loop normally then we need to call EmitLiteral next,
      // though we don't yet know how big the literal will be.  We handle that
      // by proceeding to the next iteration of the main loop.  We also can exit
      // this loop via goto if we get close to exhausting the input.
      uint64 input_bytes = 0;
      uint32 candidate_bytes = 0;

      do {
        // We have a 4-byte match at ip, and no need to emit any
        // "literal bytes" prior to ip.
        const char* base = ip;

        // ip and candidate match, so try to extend this into a longer copy.

        int matched = 4 + FindMatchLength(candidate + 4, ip + 4, ip_end);
        ip += matched;
        int offset = base - candidate;
        DCHECK_EQ(0, memcmp(base, candidate, matched));
        op = EmitCopy(op, offset, matched);

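        // What follows is an optimization: update the table entry for Hash(ip - 1),
        // then check whether the bytes at ip immediately match another candidate.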
        // We could immediately start working at ip now, but to improve
        // compression we first update table[Hash(ip - 1, ...)].
        const char* insert_tail = ip - 1;
        next_emit = ip;
        if (PREDICT_FALSE(ip >= ip_limit)) {
          goto emit_remainder;
        }
        input_bytes = UNALIGNED_LOAD64(insert_tail);
        uint32 prev_hash = HashBytes(GetUint32AtOffset(input_bytes, 0), shift);
        table[prev_hash] = ip - base_ip - 1;
        uint32 cur_hash = HashBytes(GetUint32AtOffset(input_bytes, 1), shift);
        candidate = base_ip + table[cur_hash];
        candidate_bytes = UNALIGNED_LOAD32(candidate);
        table[cur_hash] = ip - base_ip;
      } while (GetUint32AtOffset(input_bytes, 1) == candidate_bytes);

      next_hash = HashBytes(GetUint32AtOffset(input_bytes, 2), shift);
      ++ip;
    }
  }

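  // If fewer than 15 bytes remain at the end, they are emitted directly as a literal.
  // allow_fast_path must be false here, since the fast path may read past the end of the input.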
 emit_remainder:
  // Emit the remaining bytes as a literal
  if (next_emit < ip_end) {
    op = EmitLiteral(op, next_emit, ip_end - next_emit, false);
  }

  return op;
}
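
The probe stride in Step 1 grows with the number of consecutive misses. Below is a minimal sketch of that schedule. It assumes `skip` starts at 32, as in the snappy source this walk-through follows (the initializer lies outside the excerpt above). `BytesScannedAfterProbes` is a hypothetical helper for illustration, not part of snappy.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of snappy): total number of bytes the scan
// pointer advances after `probes` consecutive failed hash lookups, assuming
// `skip` starts at 32 and the stride is `skip++ >> 5` as in the loop above.
uint64_t BytesScannedAfterProbes(uint32_t probes) {
  uint32_t skip = 32;  // assumed initial value
  uint64_t advanced = 0;
  for (uint32_t i = 0; i < probes; ++i) {
    const uint32_t bytes_between_hash_lookups = skip++ >> 5;
    advanced += bytes_between_hash_lookups;
  }
  return advanced;
}
```

Under this assumption the first 32 misses advance one byte each, the next 32 advance two bytes each, and so on, so data that does not compress is skimmed faster and faster.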

6 Uncompress

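Uncompress eventually ends up in the code below. The important pieces are SnappyDecompressor and the Writer interface. The logic itself is simple: ReadUncompressedLength reads the decoded size, and the writer is told how large the decoded output should be. The decompressor then processes the tags one by one. Finally, the decompressor checks that it has consumed all of the input, and the writer checks that the length it produced equals uncompressed_len.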

template <typename Writer>
static bool InternalUncompress(Source* r,
                               Writer* writer,
                               uint32 max_len) {
  // Read the uncompressed length from the front of the compressed input
  SnappyDecompressor decompressor(r);
  uint32 uncompressed_len = 0;
  if (!decompressor.ReadUncompressedLength(&uncompressed_len)) return false;
  // Protect against possible DoS attack
  if (static_cast<uint64>(uncompressed_len) > max_len) {
    return false;
  }

  writer->SetExpectedLength(uncompressed_len);

  // Process the entire input
  decompressor.DecompressAllTags(writer);
  return (decompressor.eof() && writer->CheckLength());
}
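
The length prefix that `ReadUncompressedLength` parses is a base-128 varint, least-significant group first. Below is a minimal sketch of such a parser over an in-memory buffer. `ParseVarint32` is a hypothetical helper for illustration; snappy's own implementation reads through the `Source` interface instead.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical helper: decode a base-128 varint (7 payload bits per byte,
// least-significant group first, high bit set on every byte but the last).
// Returns the number of bytes consumed, or 0 on truncated or oversized input.
size_t ParseVarint32(const unsigned char* p, size_t n, uint32_t* result) {
  uint32_t value = 0;
  for (size_t i = 0; i < n && i < 5; ++i) {
    const uint32_t group = p[i] & 0x7f;
    // The fifth byte may only contribute the top 4 bits of a uint32.
    if (i == 4 && group > 0x0f) return 0;
    value |= group << static_cast<unsigned>(7 * i);
    if ((p[i] & 0x80) == 0) {  // last byte of the varint
      *result = value;
      return i + 1;
    }
  }
  return 0;  // ran out of input, or more than 5 bytes
}
```

For example, the two bytes `0xAC 0x02` decode to 300: the low seven bits of `0xAC` give 44, and `0x02` shifted left by 7 adds 256.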

6.1 Writer

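Let's first look at one implementation, SnappyArrayWriter. Its two key functions are Append (copies a literal) and AppendFromSelf (copies from output that has already been decoded). Once SnappyDecompressor has parsed a literal or a copy, it calls one of these two functions to carry out the decompression.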

class SnappyArrayWriter {
 private:
  char* base_; // start of the output buffer
  char* op_;  // current write position
  char* op_limit_; // end of the expected output

 public:
  inline explicit SnappyArrayWriter(char* dst)
      : base_(dst),
        op_(dst) {
  }

  inline void SetExpectedLength(size_t len) {
    op_limit_ = op_ + len; // set the limit
  }

  inline bool CheckLength() const {
    return op_ == op_limit_; // check that the produced length matches
  }

  inline bool Append(const char* ip, uint32 len, bool allow_fast_path) {
    char* op = op_;
    const int space_left = op_limit_ - op;
    if (allow_fast_path && len <= 16 && space_left >= 16) {
      // Fast path, used for the majority (about 90%) of dynamic invocations.
      UNALIGNED_STORE64(op, UNALIGNED_LOAD64(ip));
      UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(ip + 8));
    } else {
      if (space_left < len) {
        return false;
      }
      memcpy(op, ip, len);
    }
    op_ = op + len;
    return true;
  }

  inline bool AppendFromSelf(uint32 offset, uint32 len) { // copy from our own output: go back offset bytes, then copy len bytes
    char* op = op_;
    const int space_left = op_limit_ - op;

    if (op - base_ <= offset - 1u) {  // -1u catches offset==0
      return false;
    }
    if (len <= 16 && offset >= 8 && space_left >= 16) { // direct 8-byte stores are only safe when offset >= 8
      // Fast path, used for the majority (70-80%) of dynamic invocations.
      UNALIGNED_STORE64(op, UNALIGNED_LOAD64(op - offset));
      UNALIGNED_STORE64(op + 8, UNALIGNED_LOAD64(op - offset + 8));
    } else {
      if (space_left >= len + kMaxIncrementCopyOverflow) { // enough slack for the fast path
        IncrementalCopyFastPath(op - offset, op, len);
      } else {
        if (space_left < len) {
          return false;
        }
        IncrementalCopy(op - offset, op, len);
      }
    }

    op_ = op + len;
    return true;
  }
};
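
The source range of AppendFromSelf may overlap the bytes being written, which is why the two 8-byte stores are only used when offset >= 8. Below is a minimal sketch of the byte-by-byte semantics, using a hypothetical `SelfCopy` helper on a `std::string` rather than snappy's `IncrementalCopy`.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical helper: append `len` bytes to `out`, reading from `offset`
// bytes before the current end. Copying one byte at a time lets the source
// range overlap the bytes being produced, so a short pattern repeats.
bool SelfCopy(std::string* out, size_t offset, size_t len) {
  if (offset == 0 || offset > out->size()) return false;  // invalid back-reference
  for (size_t i = 0; i < len; ++i) {
    out->push_back((*out)[out->size() - offset]);
  }
  return true;
}
```

Starting from "ab", `SelfCopy(&s, 2, 6)` yields "abababab". An 8-byte load at that position would instead read bytes that have not been written yet.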

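It is easy to see that a small variation of this implementation gives us a checker that only verifies whether the compressed data is well-formed.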

// A Writer that drops everything on the floor and just does validation
class SnappyDecompressionValidator {
 private:
  size_t expected_;
  size_t produced_;

 public:
  inline SnappyDecompressionValidator() : produced_(0) { }
  inline void SetExpectedLength(size_t len) {
    expected_ = len;
  }
  inline bool CheckLength() const {
    return expected_ == produced_;
  }
  inline bool Append(const char* ip, uint32 len, bool allow_fast_path) {
    produced_ += len;
    return produced_ <= expected_;
  }
  inline bool AppendFromSelf(uint32 offset, uint32 len) {
    if (produced_ <= offset - 1u) return false;  // -1u catches offset==0
    produced_ += len;
    return produced_ <= expected_;
  }
};
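
The `offset - 1u` comparison folds two checks into one. Below is a minimal sketch of why that works with unsigned arithmetic. `IsBadOffset` is a hypothetical name, and `produced` is taken as a `uint32_t` here for simplicity.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper mirroring the check in AppendFromSelf: a back-reference
// is invalid when offset == 0 or offset > produced. With unsigned arithmetic,
// offset == 0 wraps around to 0xFFFFFFFF, so one comparison covers both cases.
bool IsBadOffset(uint32_t produced, uint32_t offset) {
  return produced <= offset - 1u;
}
```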

6.2 SnappyDecompressor

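SnappyDecompressor has a few important methods:

  • ReadUncompressedLength. Decodes the varint at the start of the compressed data. Nothing tricky here.
  • DecompressAllTags. Decodes every tag. Its input is the scratch[] buffer filled by RefillTag.
  • RefillTag. Loads the bytes needed for the next tag into scratch[]. Nothing tricky here either.

Before reading the code, consider how we would extract the tag information once we have scratch. The best approach is a lookup table. The first byte of a tag determines everything about the tag's layout, so snappy keeps a table, char_table, indexed by the value of that first byte.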

// Data stored per entry in lookup table:
//      Range   Bits-used       Description
//      ------------------------------------
//      1..64   0..7            Literal/copy length encoded in opcode byte
//      0..7    8..10           Copy offset encoded in opcode byte / 256
//      0..4    11..13          Extra bytes after opcode
//
// We use eight bits for the length even though 7 would have sufficed
// because of efficiency reasons:
//      (1) Extracting a byte is faster than a bit-field
//      (2) It properly aligns copy offset so we do not need a <<8
static const uint16 char_table[256] = {
  0x0001, 0x0804, 0x1001, 0x2001, 0x0002, 0x0805, 0x1002, 0x2002,
  0x0003, 0x0806, 0x1003, 0x2003, 0x0004, 0x0807, 0x1004, 0x2004,
  // ...
};

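This table can be computed.

  • Bits [0..7] hold the literal/copy length. For a copy this is the full length. For a long literal, the rest of the length is read from the extra bytes that follow.
  • Bits [8..10] hold the copy offset / 256. The rest of the offset is carried in the extra bytes.
  • Bits [11..13] hold the number of extra bytes that follow. Their contents are extracted with wordmask.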
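
Since each entry depends only on the tag byte, the table can be generated instead of typed in. Below is a sketch of one way to do that. The field packing follows the comment above char_table. The tag-byte layout (type in the low two bits, length and high offset bits above it) is assumed from the snappy format description, and `MakeEntry` and `ComputeEntry` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>

// Pack the three fields described above into one 16-bit entry.
uint16_t MakeEntry(unsigned extra, unsigned len, unsigned copy_offset_hi) {
  assert(extra <= 4 && len <= 64 && copy_offset_hi <= 7);
  return static_cast<uint16_t>(len | (copy_offset_hi << 8) | (extra << 11));
}

// Hypothetical helper: derive the table entry for tag byte c.
uint16_t ComputeEntry(uint8_t c) {
  const unsigned type = c & 0x3;  // low two bits select the tag type
  const unsigned upper = c >> 2;  // remaining six bits
  switch (type) {
    case 0:  // LITERAL
      if (upper < 60) return MakeEntry(0, upper + 1, 0);  // length stored inline
      return MakeEntry(upper - 59, 1, 0);  // 1..4 extra bytes hold length - 1
    case 1:  // COPY with 1-byte offset: 3 length bits, 3 high offset bits
      return MakeEntry(1, (upper & 0x7) + 4, upper >> 3);
    case 2:  // COPY with 2-byte offset
      return MakeEntry(2, upper + 1, 0);
    default:  // COPY with 4-byte offset
      return MakeEntry(4, upper + 1, 0);
  }
}
```

Under these assumptions, `ComputeEntry` reproduces the sixteen entries listed above, for instance `ComputeEntry(1) == 0x0804` (copy of length 4 with one extra byte).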

Here is wordmask:

static const uint32 wordmask[] = {
  0u, 0xffu, 0xffffu, 0xffffffu, 0xffffffffu
};

So if extra bytes == 1, masking with & 0xffu yields exactly one byte. This shows up in the decompression code. Quite neat.
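
A minimal sketch of the masking step. `LoadLE32` is a hypothetical stand-in for `LittleEndian::Load32`, and `Trailer` is a hypothetical helper. The mask values are the ones from wordmask.

```cpp
#include <cassert>
#include <cstdint>

static const uint32_t kWordMask[] = {0u, 0xffu, 0xffffu, 0xffffffu, 0xffffffffu};

// Hypothetical stand-in for LittleEndian::Load32: assemble 4 bytes, low byte first.
uint32_t LoadLE32(const unsigned char* p) {
  return static_cast<uint32_t>(p[0]) | (static_cast<uint32_t>(p[1]) << 8) |
         (static_cast<uint32_t>(p[2]) << 16) | (static_cast<uint32_t>(p[3]) << 24);
}

// Keep only the first `extra_bytes` (0..4) bytes that follow the tag byte.
uint32_t Trailer(const unsigned char* after_tag, unsigned extra_bytes) {
  assert(extra_bytes <= 4);
  return LoadLE32(after_tag) & kWordMask[extra_bytes];
}
```

The load always touches four bytes regardless of `extra_bytes`, and the mask discards the ones that belong to the next tag. This avoids a branch on the number of extra bytes.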

Now let's look at the implementation of DecompressAllTags.

// Process the next item found in the input.
// Returns true if successful, false on error or end of input.
template <class Writer>
void DecompressAllTags(Writer* writer) {
  const char* ip = ip_;
  for ( ;; ) {
    if (ip_limit_ - ip < 5) { // fewer than 5 bytes available: refill, although the refill may still leave fewer than 5 bytes
      ip_ = ip;
      if (!RefillTag()) return;
      ip = ip_;
    }

    // Read the first byte of the tag into c and look up its entry.
    const unsigned char c = *(reinterpret_cast<const unsigned char*>(ip++));
    const uint32 entry = char_table[c];
    // Load the extra bytes as a uint32 and mask with wordmask. The result, called
    // trailer, is the information stored in the extra bytes rather than in c itself.
    const uint32 trailer = LittleEndian::Load32(ip) & wordmask[entry >> 11];
    // Skip over the extra bytes.
    ip += entry >> 11;
    // The length stored in c itself.
    const uint32 length = entry & 0xff;

    // If this tag is a LITERAL
    if ((c & 0x3) == LITERAL) {
      uint32 literal_length = length + trailer; // for a literal, trailer contributes to the length
      // The rest of this branch writes out the LITERAL.
      uint32 avail = ip_limit_ - ip;
      while (avail < literal_length) {
        bool allow_fast_path = (avail >= 16);
        if (!writer->Append(ip, avail, allow_fast_path)) return;
        literal_length -= avail;
        reader_->Skip(peeked_);
        size_t n;
        ip = reader_->Peek(&n);
        avail = n;
        peeked_ = avail;
        if (avail == 0) return;  // Premature end of input
        ip_limit_ = ip + avail;
      }
      bool allow_fast_path = (avail >= 16);
      if (!writer->Append(ip, literal_length, allow_fast_path)) {
        return;
      }
      ip += literal_length;
    } else {
      // Otherwise it is a COPY.
      // copy_offset/256 is encoded in bits 8..10.  By just fetching
      // those bits, we get copy_offset (since the bit-field starts at
      // bit 8).
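      const uint32 copy_offset = (entry & 0x700); // already multiplied by 256, so no shift is needed
      // Recall the (offset >> 8) << 5 in the COPY(01) tag byte: the table stores
      // those bits starting at bit 8, so masking gives the result directly.
      // For a copy, trailer holds offset bits rather than length.
      // Then write out the COPY.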
      if (!writer->AppendFromSelf(copy_offset + trailer, length)) {
        return;
      }
    }
  }
}
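
To tie the pieces together, here is a deliberately small decoder sketch. It is not snappy's implementation: it works on an in-memory buffer, skips the lookup table, and handles only literals of length <= 60 and copies with a 1-byte offset. `TinyDecode` is a hypothetical name, and the tag-byte layout is assumed from the snappy format description.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical, simplified decoder for illustration only. `in` holds the tag
// stream *after* the varint length prefix. Supports literal tags with
// length <= 60 and copy tags with a 1-byte offset; anything else is rejected.
bool TinyDecode(const std::string& in, std::string* out) {
  size_t i = 0;
  while (i < in.size()) {
    const unsigned char c = static_cast<unsigned char>(in[i++]);
    const unsigned type = c & 0x3;
    if (type == 0) {  // LITERAL: (length - 1) in the upper six bits
      const size_t len = static_cast<size_t>(c >> 2) + 1;
      if (len > 60 || in.size() - i < len) return false;
      out->append(in, i, len);
      i += len;
    } else if (type == 1) {  // COPY with 1-byte offset
      if (i >= in.size()) return false;
      const size_t len = ((c >> 2) & 0x7) + 4;
      const size_t offset = (static_cast<size_t>(c >> 5) << 8) |
                            static_cast<unsigned char>(in[i++]);
      if (offset == 0 || offset > out->size()) return false;
      for (size_t k = 0; k < len; ++k) {
        out->push_back((*out)[out->size() - offset]);  // source may overlap
      }
    } else {
      return false;  // 2- and 4-byte-offset copies are omitted in this sketch
    }
  }
  return true;
}
```

Feeding it the six bytes `0x08 'a' 'b' 'c' 0x09 0x03` (a 3-byte literal followed by a copy of length 6 at offset 3) produces "abcabcabc".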

7 Example

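The task is simple: read the contents of main.cc, compress them into the file main.cc.compress, then read that file back, decompress it, and check that the result matches the original. The example also shows how to wrap a Sink.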

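#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h>

// Standard headers needed for assert, FILE/fopen/fread, memcmp and std::string.
#include <cassert>
#include <cstdio>
#include <cstring>
#include <string>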

#include <snappy.h>
#include <snappy-sinksource.h>

static const char* IN_NAME="./main.cc";
static const char* OUT_NAME="./main.cc.compress";

class FileSink:public snappy::Sink{
  public:
    FileSink(int fd):fd_(fd){
    }
    virtual ~FileSink(){}
    virtual char* GetAppendBuffer(size_t length,char* scratch){
        return scratch;
    }
    virtual void Append(const char* bytes,size_t n){
        write(fd_,bytes,n);
    }
  private:
    int fd_;
};

int main(){
    struct stat stbuf;
    stat(IN_NAME,&stbuf);
    size_t in_fsize=stbuf.st_size;
    size_t compress_fsize=0;

    // do compress
    char* in_buf=new char[in_fsize];
    {
        FILE* fin=fopen(IN_NAME,"rb");
        fread(in_buf,1,in_fsize,fin);
        fclose(fin);
        snappy::ByteArraySource source(in_buf,in_fsize);

        // O_TRUNC discards stale bytes from a previous, larger output file.
        int fd=open(OUT_NAME,O_CREAT | O_WRONLY | O_TRUNC,0666);
        FileSink sink(fd);
        compress_fsize=snappy::Compress(&source,&sink);
        close(fd);
    }

    // do decompress
    stat(OUT_NAME,&stbuf);
    size_t out_fsize=stbuf.st_size;
    assert(out_fsize==compress_fsize);
    char* out_buf=new char[out_fsize];
    std::string out;
    {
        FILE* fin=fopen(OUT_NAME,"rb");
        fread(out_buf,1,out_fsize,fin);
        fclose(fin);
        // Keep the call outside assert so it still runs when NDEBUG is defined.
        bool ok=snappy::Uncompress(out_buf,out_fsize,&out);
        assert(ok);
    }

    // do validate
    assert(out.size()==in_fsize);
    assert(memcmp(out.data(),in_buf,in_fsize)==0);
    delete [] in_buf;
    delete [] out_buf;
    return 0;
}