Introducing Druid: Real-Time Analytics at a Billion Rows Per Second


http://metamarkets.com/2011/druid-part-i-real-time-analytics-at-a-billion-rows-per-second/

1. Background

Druid is the distributed, in-memory OLAP data store that resulted. (A distributed, fully in-memory OLAP storage system.) Two terms are worth knowing up front:

  • roll up: reducing dimensionality by aggregating, e.g. with a GROUP BY
  • drill down: the inverse of roll up, expanding an aggregate back into finer-grained dimensions

The article walks through the following concrete example. Suppose we have the data table below, called the alpha data set; this raw, event-level data is stored on disk.

timestamp             publisher          advertiser  gender  country  ..dimensions..  click  price
2011-01-01T01:01:35Z  bieberfever.com    google.com  Male    USA      ...             0      0.65
2011-01-01T01:03:63Z  bieberfever.com    google.com  Male    USA      ...             0      0.62
2011-01-01T01:04:51Z  bieberfever.com    google.com  Male    USA      ...             1      0.45
...
2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Female  UK       ...             0      0.87
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  UK       ...             0      0.99
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Female  UK       ...             1      1.53

To fit the data in memory, we roll up the alpha data set into a beta data set, which is small enough to keep in memory. The roll-up is:

GROUP BY timestamp, publisher, advertiser, gender, country :: impressions = COUNT(1), clicks = SUM(click), revenue = SUM(price)

The resulting beta data set:

timestamp             publisher          advertiser  gender  country  impressions  clicks  revenue
2011-01-01T01:00:00Z  ultratrimfast.com  google.com  Male    USA      1800         25      15.70
2011-01-01T01:00:00Z  bieberfever.com    google.com  Male    USA      2912         42      29.18
2011-01-01T02:00:00Z  ultratrimfast.com  google.com  Male    UK       1953         17      17.31
2011-01-01T02:00:00Z  bieberfever.com    google.com  Male    UK       3194         170     34.01
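
For concreteness, here is a minimal Python sketch of this roll-up (standard library only; it assumes each alpha row is a dict with the column names from the table above, and it truncates timestamps to hourly granularity):

from collections import defaultdict

def roll_up(alpha_rows):
    # One aggregate per (hour, publisher, advertiser, gender, country) combination.
    aggregates = defaultdict(lambda: {"impressions": 0, "clicks": 0, "revenue": 0.0})
    for row in alpha_rows:
        # Truncate to hourly granularity: 2011-01-01T01:01:35Z -> 2011-01-01T01:00:00Z
        hour = row["timestamp"][:13] + ":00:00Z"
        key = (hour, row["publisher"], row["advertiser"], row["gender"], row["country"])
        agg = aggregates[key]
        agg["impressions"] += 1           # impressions = COUNT(1)
        agg["clicks"] += row["click"]     # clicks = SUM(click)
        agg["revenue"] += row["price"]    # revenue = SUM(price)
    return aggregates

Applying this to the alpha rows produces exactly the kind of per-hour aggregate rows shown in the beta data set above.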

The beta data set consists of three kinds of columns:

  • Timestamp column: We treat the timestamp separately because all of our queries center on the time axis. Timestamps are bucketed at varying granularities during roll-up (hourly, in the example above).
  • Dimension columns: Here we have four dimensions: publisher, advertiser, gender, and country. Each represents an axis of the data that we have chosen to slice across; these are what cross-dimensional queries filter and group on.
  • Metric columns: These are impressions, clicks, and revenue. They are values, usually numeric, derived from an aggregation operation such as count, sum, or mean (variance and higher-moment calculations are also run). For example, in the first row, the revenue metric of 15.70 is the sum of 1800 event-level prices.

Typical questions we want to answer over this data set include:

  • “How many impressions from males were on bieberfever.com?”
  • “What is the average cost to advertise to women at ultratrimfast.com?”
  • On top of questions like these, there is a hard requirement: queries over any arbitrary combination of dimensions must come back at sub-second latencies (both example questions are sketched in code after this list).
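
To make the two example questions concrete, here is a rough Python sketch of how they could be answered against the beta data set (the row layout follows the table above; interpreting “average cost” as revenue per impression is an assumption, not something the article specifies):

def impressions_from_males(beta_rows, publisher="bieberfever.com"):
    # "How many impressions from males were on bieberfever.com?"
    return sum(r["impressions"]
               for r in beta_rows
               if r["gender"] == "Male" and r["publisher"] == publisher)

def avg_cost_to_advertise_to_women(beta_rows, publisher="ultratrimfast.com"):
    # "What is the average cost to advertise to women at ultratrimfast.com?"
    rows = [r for r in beta_rows
            if r["gender"] == "Female" and r["publisher"] == publisher]
    revenue = sum(r["revenue"] for r in rows)
    impressions = sum(r["impressions"] for r in rows)
    return revenue / impressions if impressions else 0.0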

In theory the space of dimension-value combinations is enormous, but in practice most combinations are sparse or never occur at all; for example, few Kazakhstanis visit bieberfever.com.

2. Failed Solution I: Dynamic Roll-Ups with an RDBMS

So about a year ago, we fired up an RDBMS instance (specifically, the Greenplum Community Edition, running on an m1.large EC2 box) and started with a relational database approach. It ran into the following problems:

  • We stored the data in a star schema, which meant that there was operational overhead in maintaining dimension and fact tables. TODO(dirlt): I didn't initially understand what this means; the notes below summarize it.
    • For background on star schemas, see Salina & IT Mind: Data Warehouse: Star Schema http://salinaitmind.blogspot.jp/2012/10/data-warehouse-star-schema.html
    • The fact table stores the concrete measurements (the facts), while the dimension tables hold the descriptive attributes, i.e. the dimensions.
    • Each row of the fact table references the dimension tables through foreign keys.
    • One fact table (or several) at the center, linked out to the multiple dimension tables it references, forms the star shape that gives the schema its name.
  • Whenever we needed to do a full table scan, for things like global counts, the queries ran slowly. For example, naive benchmarks showed that scanning 33 million rows took 3 seconds. (dirlt: is this because the queries were not parallelized?)
    • We started materializing all dimensional roll-ups up to a certain depth and began routing queries to these pre-aggregated tables. We also implemented a caching layer in front of our queries (a sketch of this depth-limited materialization follows this list).
    • This approach generally worked and is, I believe, a fairly common strategy in the space. Except, when things weren’t in the cache and a query couldn’t be mapped to a pre-aggregated table, we were back to full scans and slow performance.
    • We tried indexing our way out of it, but given that we allow arbitrary combinations of dimensions, we couldn’t really take advantage of composite indexes.
    • Additionally, index merge strategies are not always implemented, or are only implemented for bitmap indexes, depending on the flavor of RDBMS.
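
The depth-limited materialization mentioned above could look roughly like the following Python sketch (dimension names and row layout are carried over from the example tables; this illustrates the idea, not Metamarkets' actual implementation):

from itertools import combinations

DIMENSIONS = ("publisher", "advertiser", "gender", "country")

def materialize_rollups(beta_rows, max_depth=2):
    # For every subset of dimensions up to max_depth, pre-aggregate the beta rows
    # grouped by that subset.
    tables = {}
    for depth in range(1, max_depth + 1):
        for dims in combinations(DIMENSIONS, depth):
            table = {}
            for row in beta_rows:
                key = tuple(row[d] for d in dims)
                agg = table.setdefault(key, {"impressions": 0, "clicks": 0, "revenue": 0.0})
                agg["impressions"] += row["impressions"]
                agg["clicks"] += row["clicks"]
                agg["revenue"] += row["revenue"]
            tables[dims] = table
    return tables

def route_query(tables, group_dims, key):
    # If the queried dimensions match a materialized table, the answer is a cheap
    # lookup; otherwise the caller falls back to a full scan.
    table = tables.get(tuple(group_dims))
    return table.get(key) if table is not None else None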

We also benchmarked plain Postgres, MySQL, and InfoBright, but did not observe dramatically better performance.

3. Failed Solution II: Pre-compute the World in NoSQL

  • In short, we took all of our data and pre-computed aggregates for every combination of dimensions. At query time we need only locate the specific pre-computed aggregate and return it: an O(1) key-value lookup. This made things fast and worked wonderfully when we had a six-dimension beta data set.
  • But when we added five more dimensions, giving us 11 dimensions total, the time to pre-compute all aggregates became unmanageably large (we never waited the more than 24 hours required to see it finish). See the sketch after this list for why the cost grows so quickly.
  • Lesson learned: massively scalable counter systems like Rainbird are intended for high-cardinality data sets with pre-defined hierarchical drill-downs. They break down when asked to support arbitrary drill-downs across all dimensions; pre-computation only pays off for low-dimensional, predictable aggregations.
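
A quick back-of-the-envelope illustration of why this explodes (hypothetical Python, not the actual pipeline): every subset of the dimensions becomes its own group-by set, so the number of pre-computed tables doubles with each added dimension, before even multiplying by the value cardinality of each dimension.

from itertools import combinations

def count_groupby_sets(n_dimensions):
    # Number of dimension subsets = sum over k of C(n, k) = 2 ** n.
    return sum(1 for k in range(n_dimensions + 1)
               for _ in combinations(range(n_dimensions), k))

print(count_groupby_sets(6))   # 64 group-by sets for the 6-dimension data set
print(count_groupby_sets(11))  # 2048 group-by sets after adding 5 more dimensions

# At query time, each pre-computed aggregate is an O(1) key-value lookup, e.g.
# precomputed[("publisher", "gender")][("bieberfever.com", "Male")]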

4. Introducing Druid: A Distributed, In-Memory OLAP Store

To recap, here are the problems with the two approaches above:

  • Relational Database Architectures
    • Full table scans were slow, regardless of the storage engine used
    • Maintaining proper dimension tables, indexes and aggregate tables was painful
    • Parallelization of queries was not always supported or non-trivial
  • Massive NoSQL with Pre-Computation
    • Supporting high dimensional OLAP requires pre-computing an exponentially large amount of data

Keeping everything in memory provides fast scans, but it does introduce a new problem: machine memory is limited. The corollary thus was: distribute the data over multiple machines.

Thus, our requirements were:

  • Ability to load up, store, and query data sets in memory
  • Parallelized architecture that allows us to add more machines in order to relieve memory pressure

And then we threw in a couple more that seemed like good ideas:

  • Parallelized queries to speed up full-scan processing
  • No dimensional tables to manage

These are the requirements we used to implement Druid. The system makes a number of simplifying assumptions that fit our use case (namely, that all analytics are time-based) and integrates access to real-time and historical data for a configurable amount of time into the past.
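
As a closing illustration of the last two requirements, here is a conceptual Python sketch (not Druid's actual API) of partitioning the rolled-up data by time across nodes and running a scatter/gather query: each node scans its own in-memory partition in parallel, and the partial aggregates are merged. Node layout and row format are assumptions carried over from the beta data set example.

from concurrent.futures import ThreadPoolExecutor

def scan_partition(rows, predicate):
    # Each node scans only its own in-memory partition.
    partial = {"impressions": 0, "clicks": 0, "revenue": 0.0}
    for row in rows:
        if predicate(row):
            partial["impressions"] += row["impressions"]
            partial["clicks"] += row["clicks"]
            partial["revenue"] += row["revenue"]
    return partial

def parallel_query(partitions, predicate):
    # Scatter the scan across partitions, then gather and merge the partial results.
    with ThreadPoolExecutor() as pool:
        partials = list(pool.map(lambda rows: scan_partition(rows, predicate), partitions))
    merged = {"impressions": 0, "clicks": 0, "revenue": 0.0}
    for partial in partials:
        for key in merged:
            merged[key] += partial[key]
    return merged

# Example: total impressions for Male visitors on bieberfever.com across all nodes.
# result = parallel_query(time_partitions,
#                         lambda r: r["gender"] == "Male" and r["publisher"] == "bieberfever.com")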