Kudu: Storage for Fast Analytics on Fast Data

在Apache生态下面,比较适合数据分析的文件结构有Avro/Parquet. 但是这些文件结构如果放在HDFS上的话,就没有办法做到快速更新和随机查询。HBase/C*倒是适合快速更新和随机查询,但是存储结构又不太适合用于数据分析(不知道SQL on HBase性能如何,估计做某些复杂查询的时候可能力不从心)所以需要这么一个存储系统: 分布式的,允许快速更新和查询,能提供大吞吐量的,适合数据分析的。Kudu就是这么一个东西,可以认为是改良版的Avro/Parquet on HDFS,但绝对不是简单的组合。

在用户接口上,类似RDBMS的Table. Schema定义包括columns以及types, 以及primary key. 现在只有primary key作为索引,没有二级索引。Write Operation操作必须带上PK, 保证single row上的原子性。Read Operationw有几个限制:1. 必须带上projections 就是不能 select * 2. 条件语句只支持 column 和 constant value 比较,或者是PK range查询. 此外Kudu还提供其他一些辅助API比如查询data locality的(data range在哪些servers上面)。


Kudu的分布式有点类似BigTable,master + tablet servers结构,大约有下面节点不同:

  1. Tablet Server之间是通过Raft选举出leader的,而不是master来指派的。(500ms heartbeat, 1500ms election timeout)
  2. Partition方式由用户进行制定:hash parition or range partition(需要用户指定范围)

table按照partition方式拆分成为了多个tablets. 每个tablet会被多个tablet server管理,之间通过raft达成共识。建议是每个tablet大小在10GB左右(这样做full snapshot transfer时间也不会太长),然后每个tablet server管理10-100个tablet(大小上大约就是1TB的样子,可能再多的话会影响到共识的效率)。

Master大致有这么几个用途:

  1. 管理table/tablet信息,比如table的schema, partition信息等
  2. 管理tablets的raft configuration信息,这样client可以直接去和tablet server通信

如果tablet server的重新进行raft选举的话,会生成新的commit index. client去和tablet server通信的时候,如果commit index不一致估计就会被拒绝了。tablet server会定期将这个commit index同步给master. 这样client发现自己cache的信息不对的时候,可以重新和master进行纠正。这里的client和GFS client非常类似,会缓存tablet以及tablet servers的信息(GFS client会缓存chunk master信息)


每个tablet是由多个RowSets组成的,一个MemRowSets以及多个DiskRowSets. 对于一个没有删除的PK来说只能存在于一个RowSet中,但是多个RowSet之间的PK interval[min PK, max PK]是可能存在重合的。这个设计非常类似leveldb结构,MemRS会定期刷新到DiskRS,然胡DiskRS之间也会定期做Compaction.

MemRS是一个基于行的内存Btree结构使用乐观锁解决并发问题,参考了MassTree实现,但是不允许做delete操作。为了提高性能,刻意增大了leaf node size到256bytes(4 cache-line size). 在MemRS上面的CPU操作做了几个优化:1. prefetch btree 下一个节点 2. 使用LLVM将projection等操作代码JIT. 我的理解是在MemRS上的操作没有DiskRS那样容易受到disk latency的影响,所以CPU操作时间就相对较多需要专门优化。

将MemRS刷新出来的时候,每32MB IO就会产生一个DiskRowSet文件。一个DiskRowSet包括两个部分:base(on disk) 和 delta (mem + disk files). 其中DeltaMemStores结构和MemRS非常类似,然后DeltaFiles则是DeltaMemStores刷新下来的结果。但是这个delta里面的key不再是primary key, 而是row offset,这个row offset是row在BaseFile里面的记录号(第一个row就是0,第二个row就是1)。delta里面的value一个change operation. 可以想象,当从base里面读取到某个row之后,还需要来delta这边看看,把这个change operation apply上去,才算是最终的结果。delta也定义会和base进行compaction, 不过只需要重写有修改的column.

Base的文件格式大致是这样的:

  1. 每个column下面的values会被存储在一起组成个block. block里面是多个pages, 然后有个embedded btree结构记录每个page的初始row offset.
  2. primary key index column对应的values也会被存在一起,方便根据PK查找到row offset.
  3. 一个Base还会存在MinPK和MaxPK,以及一个4KB的bloom filter,用于快速判断某个PK是否存在这个RowSet里面

在进行Query的时候,当我们要进行PK range query的时候,因为多个RowSet之间的PK interval是会重现重合的,所以理论上我们需要去touch多个RowSet. 为了减少touch的RowSet,我们就需要做RowSet Compaction,因此做RowSet Compaction的依据就是:尽可能地挑选PK Interval重合比较大的两个RowSets进行合并。一个可用的优化是 "Lazy Materialization", 大致就是我们可以选择某个顺序读column期间更新predicate,而不是一次把所有的columns全部读上来做predicate. 有点类似短路求值的意思。