In-Stream Big Data Processing

Table of Contents

In recent years, this idea got a lot of traction and a whole bunch of solutions like Twitter’s Storm, Yahoo’s S4, Cloudera’s Impala, Apache Spark, and Apache Tez appeared and joined the army of Big Data and NoSQL systems. This article is an effort to

At Grid Dynamics, we recently faced a necessity to build an in-stream data processing system that aimed to crunch about 8 billion events daily providing fault-tolerance and strict transactioanlity i.e. none of these events can be lost or duplicated. This system has been designed to supplement and succeed the existing Hadoop-based system that had too high latency of data processing and too high maintenance costs.(使用Hadoop系统来处理数据会产生比较高的延迟,是流式系统出现的重要原因) A high-level overview of the environment we worked with is shown in the figure below:


Our goal was to equip all facilities with a new in-stream engine (shown in the bottom of the figure) that processes most intensive data flows and ships the pre-aggregated data to the central facility, thus decreasing the amount of raw data and heavy batch jobs in Hadoop. The design of the in-stream processing engine itself was driven by the following requirements: 流式处理系统通常比较重要的特征是:

1. Basics of Distributed Query Processing

It is clear that distributed in-stream data processing has something to do with query processing in distributed relational databases. Many standard query processing techniques can be employed by in-stream processing engine, so it is extremely useful to understand classical algorithms of distributed query processing and see how it all relates to in-stream processing and other popular paradigms like MapReduce.

1.1. Partitioning and Shuffling

Although optimal partitioning for selection and projection operations can be tricky (e.g. for range queries), we can assume that for in-stream data filtering it is practically enough to distribute data among the processors using a hash-based partitioning.

Processing of distributed joins is not so easy and requires a more thorough examination. There are two main data partitioning techniques that can be employed by distributed join processing:

  • Disjoint data partitioning
  • Divide and broadcast join

In a distributed database, division typically is not a part of the query processing itself because data sets are initially distributed among multiple nodes.

Disjoint data partitioning technique shuffles the data into several partitions in such a way that join keys in different partitions do not overlap.


The divide and broadcast join algorithm is illustrated in the figure below. This method divides the first data set into multiple disjoint partitions (R1, R2, and R3 in the figure) and replicates the second data set to all processors.


1.2. Pipelining

all operators in a query should be chained in such a way that the data flows smoothly through the entire pipeline i.e. neither operation should block processing by waiting for a large piece of input data without producing any output or by writing intermediate results on disk. Some operations like sorting are inherently incompatible with this concept (obviously, a sorting block cannot produce any output until the entire input is ingested), but in many cases pipelining algorithms are applicable. (虽然一些算法需要等待到全量数据到达,但是大部分算法还是可以in-stream处理的。对于这类需要全量数据的算法其实应该考虑提供时间窗口,比如10s内数据作为一个batch来处理这样,maybe) A typical example of pipelining is shown below: (下面以hash join为例)


In-stream processing naturally employs this technique to join a data stream with the static data (admixtures). 但是上面这种方式通常是和静态数据来做join的。

In relational databases, join operation can take advantage of pipelining by using the symmetric hash join algorithm or some of its advanced variants. Symmetric hash join is a generalization of hash join. Whereas a normal hash join requires at least one of its inputs to be completely available to produce first results (the input is needed to build a hash table), symmetric hash join is able to produce first results immediately. In contrast to the normal hash join, it maintains hash tables for both inputs and populates these tables as tuples arrive:


As a tuple comes in, the joiner first looks it up in the hash table of the other stream. If match is found, an output tuple is produced. Then the tuple is inserted in its own hash table. However, it does not make a lot of sense to perform a complete join of infinite streams. In many cases join is performed on a finite time window or other type of buffer e.g. LFU cache that contains most frequent tuples in the stream. Symmetric hash join can be employed if the buffer is large comparing to the stream rate or buffer is flushed frequently according to some application logic or buffer eviction strategy is not predictable. (sym hash join有一定的限制,依然没有获得全量的数据,但是如果每次到达的数据足够多的话,那么实际上这种方法还是可行的。这里的Hash数据也是需要不断evict的)

It is worth noting that in-stream processing often deals with sophisticated stream correlation algorithms where records are matched based on scoring metrics, not on field equality condition. A more complex system of buffers can be required for both streams in such cases. (在online或者是in-stream上面算法相对都比较fancy)

2. In-Stream Processing Patterns

In this section, we discuss a number of techniques that are used by streaming systems to provide message delivery guarantees and some other patterns that are not typical for standard query processing.

2.1. Stream Replay


2.2. Lineage Tracking

In a streaming system, events flow through a chain of processors until the result reaches the final destination (like an external database). Each input event produces a directed graph of descendant events (lineage) that ends by the final results. To guarantee reliable data processing, it is necessary to ensure that the entire graph was processed successfully and to restart processing in case of failures. (所谓lineage tracking就是,追踪output对应的input以及对应的pipelines. 如果output失败的话那么可以从input开始重新构建这个output)

  • twitter storm lineage tracking
  • apache spark lineage tracking

2.2.1. storm lineage tracking


  • 每个消息分配一个ID和签名sig
  • 对于发送节点来说,会有一张表来做[ID,sig']映射关系,初始时候这个表是[ID,0]
  • 对于处理节点(包括发送节点)来说,假设接收消息[ID,sig],(对于发送节点来说,接受消息就是[ID,0])
    • 输出消息使用和接受消息相同的ID,但是不同的签名比如[ID,sig1],[ID,sig2],…
    • 处理完成之后,将输出消息和接收消息一起发送给我发送节点
    • 发送节点以这个ID为key,value更新为 sig XOR sig1 XOR sig2 …

举个例子,假设发送端产生数据[0,01111], 处理节点1产生数据[0,10000], 处理节点2直接输出到数据库不产生数据

  • 发送端因为只发送给处理节点1数据,所以表更新为[0, 01111 XOR 0] = [0,011111]
  • 处理节点1处理完成之后,将表更新为[0, 01111 XOR 01111 XOR 10000] = [0,10000]
  • 处理节点2处理完成之后,将表更新为[0, 10000 XOR 10000] = [0,0]

因此一个正常的逻辑下来,某个ID对应的sig最终应该是0的。然后发送端从发送完成数据之后,就可以定期检查ID是否都为0. 如果超过一定时间不为0的话,那么说明数据中途丢失,也可能是处理超时,那么发送端就会触发重传的逻辑。

上面这个算法有个好处就是和更新顺序没有任何关系,并且非常高效。但是有一定的概率在处理节点中间的时候就更新成为0。One can note that the algorithm above is not strictly reliable – the signature could turn into zero accidentally due to unfortunate combination of IDs. However, 64-bit IDs are sufficient to guarantee a very low probability of error, about 2^(-64), that is acceptable in almost all practical applications. As result, the table of signatures could have a small memory footprint.


2.2.2. spark lineage tracking

The idea is to consider the final result as a function of the incoming data. To simplify lineage tracking, the framework processes events in batches, so the result is a sequence of batches where each batch is a function of the input batches. Resulting batches can be computed in parallel and if some computation fails, the framework simply reruns it. The framework considers the incoming streams not as streams, but as set of batches. Each batch has an ID and the framework can fetch it by the ID at any moment of time. So, stream processing can be represented as a bunch of transactions where each transaction takes a group of input batches, transforms them using a processing function, and persists a result. 这点和Spark本身有关系。spark使用function来描述dataflow,batch作为input, 所以很自然lineage就可以使用这种function来描述。


This simple but powerful paradigm enables centralized transaction management and inherently provides exactly-once message processing semantics. It is worth noting that this technique can be used both for batch processing and for stream processing because it treats the input data as a set of batches regardless to their streaming of static nature.

2.3. State Checkpointing


2.4. Additive State and Sketches

  • Additivity of intermediate and final computational results is an important property that drastically simplifies design, implementation, maintenance, and recovery of in-stream data processing systems. Additivity means that the computational result for a larger time range or a larger data partition can be calculated as a combination of results for smaller time ranges or smaller partitions.
  • Sketches is a very efficient way to transform non-additive values into additive. Sketches are very popular in certain areas like internet advertising and can be considered as an independent pattern of in-stream processing.

2.5. Logical Time Tracking

It is very common for in-stream computations to depend on time: aggregations and joins are often performed on sliding time windows; processing logic often depends on a time interval between events and so on. Obviously, the in-stream processing system should have a notion of application’s view of time, instead of CPU wall-clock. However, proper time tracking is not trivial because data streams and particular events can be replayed in case of failures. Each processor in a pipeline tracks the maximal timestamp it has seen in a stream and updates a global persistent clock by this timestamp if the global clock is behind. All other processors synchronize their time with the global clock.#todo: 为什么需要同步这个global clock?

2.6. Aggregation in a Persistent Store

Instead of maintaining in-memory event buffers, one can simply save all incoming events from all data streams to Casandra using a join key as row key, as it shown in the figure below. On the other side, the second process traverses the records periodically, assembles and emits joined events, and evicts the events that fell out of the time window. Cassandra even can facilitate this activity by sorting events according to their timestamps.


It is important to understand that such techniques can defeat the whole purpose of in-stream data processing if implemented incorrectly – writing individual events to the data store can introduce a serious performance bottleneck even for fast stores like Cassandra or Redis. On the other hand, this approach provides perfect persistence of the computational state and different performance optimizations – say, batch writes – can help to achieve acceptable performance in many use cases. #note: 不要当作通用技术使用

2.7. Aggregation on a Sliding Window

Incremental computations over sliding windows is a group of techniques that are widely used in digital signal processing, in both software and hardware.


3. Query Processing Pipeline: Storm, Cassandra, Kafka

4. Towards Unified Big Data Processing

The key observation is that relational query processing, MapReduce, and in-stream processing could be implemented using exactly the same concepts and techniques like shuffling and pipelining. At the same time:

  • In-stream processing could require strict data delivery guarantees and persistence of the intermediate state. These properties are not crucial for batch processing where computations can be easily restarted.
  • In-stream processing is inseparable from pipelining. For batch processing, pipelining is not so crucial and even inapplicable in certain cases. Systems like Apache Hive are based on staged MapReduce with materialization of the intermediate state and do not take full advantage of pipelining.

#note: stream和batch之间的差异主要还是在数据量上。spark被称为stream系统但是内部还是按照batch来做处理的,只不过在实时性上spark和mapreduce区分开来。实时性一方面来源于input/output性质, 一方面来源于pipelining的中间状态是否需要保存以及如何保存

The two statement above imply that tunable persistence (in-memory message passing versus on-disk materialization) and reliability are the distinctive features of the imaginary query engine that provides a set of processing primitives and interfaces to the high-level frameworks:


Among the emerging technologies, the following two are especially notable in the context of this discussion:

  • Apache Tez, a part of the Stinger Initiative. Apache Tez is designed to succeed the MapReduce framework introducing a set of fine-grained query processing primitives. The goal is to enable frameworks like Apache Pig and Apache Hive to decompose their queries and

scripts into efficient query processing pipelines instead of sequences of MapReduce jobs that are generally slow due to materialization of intermediate results.

  • Apache Spark. This project is probably the most advanced and promising technology for unified Big Data processing that already includes a batch processing framework, SQL query engine, and a stream processing framework.

5. References

  • A. Wilschut and P. Apers, “Dataflow Query Execution in a Parallel Main-Memory Environment “
  • T. Urhan and M. Franklin, “XJoin: A Reactively-Scheduled Pipelined Join Operator“
  • M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica, “Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters”
  • E. Jacobsen and R. Lyons, “The Sliding DFT“
  • A. Elmagarmid, Data Streams Models and Algorithms
  • N. Marz, “Big Data Lambda Architecture”
  • J. Kinley, “The Lambda architecture: principles for architecting realtime Big Data systems”