Apache Hadoop Goes Realtime at Facebook

  • At Facebook, Hadoop has traditionally been used in conjunction with Hive for storage and analysis of large data sets. Most of this analysis occurs in offline batch jobs and the emphasis has been on maximizing throughput and efficiency. These workloads typically read and write large amounts of data from disk sequentially. As such, there has been less emphasis on making Hadoop performant for random access workloads by providing low latency access to HDFS. (Early on, Hadoop was used only for offline analysis with sequential disk reads and writes; little attention went to improving random access performance by lowering HDFS access latency.) Instead, we have used a combination of large clusters of MySQL databases and caching tiers built using memcached. In many cases, results from Hadoop are uploaded into MySQL or memcached for consumption by the web tier. (Online data access was served by uploading Hadoop results into MySQL and fronting it with memcached.)
  • Recently, a new generation of applications has arisen at Facebook that require very high write throughput and cheap and elastic storage, while simultaneously requiring low latency and disk efficient sequential and random read performance. MySQL storage engines are proven and have very good random read performance, but typically suffer from low random write throughput. It is difficult to scale up our MySQL clusters rapidly while maintaining good load balancing and high uptime. Administration of MySQL clusters requires a relatively high management overhead and they typically use more expensive hardware. Given our high confidence in the reliability and scalability of HDFS, we began to explore Hadoop and HBase for such applications. (MySQL has good random read performance but very poor random write performance, and it does not scale well. Managing the clusters carries a high overhead, and they run on high-end hardware.)
    • The first set of applications requires realtime concurrent, but sequential, read access to a very large stream of realtime data being stored in HDFS. (HDFS must support highly concurrent realtime reads and writes.) An example system generating and storing such data is Scribe, an open source distributed log aggregation service created by and used extensively at Facebook. Previously, data generated by Scribe was stored in expensive and hard to manage NFS servers.
    • The second generation of non-MapReduce Hadoop applications needed to dynamically index a rapidly growing data set for fast random lookups. In addition, this new application had to be suited for production use by more than 500 million people immediately after launch and needed to scale to many petabytes of data with stringent uptime requirements. We decided to use HBase for this project. HBase in turn leverages HDFS for scalable and fault tolerant storage and ZooKeeper for distributed consensus. (HBase stores the index and serves realtime random lookups.)


3.1 Facebook Messaging


3.1.1 High Write Throughput

3.1.2 Large Tables

3.1.3 Data Migration

3.2 Facebook Insights


3.2.1 Realtime Analytics

The insights teams wanted to make statistics available to their users within seconds of user actions rather than the hours previously supported. This would require a large-scale, asynchronous queuing system for user actions as well as systems to process, aggregate, and persist these events. All of these systems need to be fault-tolerant and support more than a million events per second.

3.2.2 High Throughput Increments

3.3 Facebook Metrics System (ODS)


3.3.1 Automatic Sharding

3.3.2 Fast Reads of Recent Data and Table Scans


The requirements for the storage system from the workloads presented above can be summarized as follows (in no particular order):

  1. Elasticity: We need to be able to add incremental capacity to our storage systems with minimal overhead and no downtime. In some cases we may want to add capacity rapidly and the system should automatically balance load and utilization across new hardware. (Good scalability.)
  2. High write throughput: Most of the applications store (and optionally index) tremendous amounts of data and require high aggregate write throughput. (High-throughput writes.)
  3. Efficient and low-latency strong consistency semantics within a data center: While a globally distributed strongly consistent system is practically impossible, a system that could at least provide strong consistency within a data center would make it possible to provide a good user experience. (Strong consistency achieved efficiently and with low latency within a single data center.)
  4. Efficient random reads from disk: In spite of the widespread use of application level caches (whether embedded or via memcached), at Facebook scale, a lot of accesses miss the cache and hit the back-end storage system. MySQL is very efficient at performing random reads from disk and any new system would have to be comparable. (Random reads from disk must be efficient.)
  5. High Availability and Disaster Recovery: We need to provide a service with very high uptime to users that covers both planned and unplanned events (examples of the former being events like software upgrades and addition of hardware/capacity and the latter exemplified by failures of hardware components). We also need to be able to tolerate the loss of a data center with minimal data loss and be able to serve data out of another data center in a reasonable time frame. (High availability and disaster recovery.)
  6. Fault Isolation: Our long experience running large farms of MySQL databases has shown us that fault isolation is critical. Individual databases can and do go down, but only a small fraction of users are affected by any such event. Similarly, in our warehouse usage of Hadoop, individual disk failures affect only a small part of the data and the system quickly recovers from such faults. (Fault isolation: when something fails, only part of the data is affected, and the rest can still be read and written normally.)
  7. Atomic read-modify-write primitives: Atomic increments and compare-and-swap APIs have been very useful in building lockless concurrent applications and are a must-have from the underlying storage system. (This is probably useful for counters.)
  8. Range Scans: Several applications require efficient retrieval of a set of rows in a particular range, for example the last 100 messages for a given user or the hourly impression counts over the last 24 hours for a given advertiser. (Range scans are useful when, for example, the last 100 messages are needed.)
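
Requirements 7 and 8 can be illustrated with a small in-memory sketch. The `TinyStore` class and its method names are hypothetical and are not HBase APIs; the sketch only shows how an atomic increment can be built from a compare-and-swap retry loop, and how a range scan works when row keys are kept sorted.

```python
import bisect
import threading


class TinyStore:
    """Hypothetical in-memory store used for illustration; not an HBase API."""

    def __init__(self):
        self._lock = threading.Lock()
        self._keys = []   # row keys kept in sorted order for range scans
        self._data = {}

    def get(self, key, default=None):
        with self._lock:
            return self._data.get(key, default)

    def compare_and_swap(self, key, expected, new):
        """Set key to new only if its current value equals expected."""
        with self._lock:
            if self._data.get(key) != expected:
                return False
            if key not in self._data:
                bisect.insort(self._keys, key)
            self._data[key] = new
            return True

    def increment(self, key, delta=1):
        """Atomic increment built as a compare-and-swap retry loop."""
        while True:
            current = self.get(key)
            new = (current or 0) + delta
            if self.compare_and_swap(key, current, new):
                return new

    def scan(self, start, stop):
        """Return (key, value) pairs with start <= key < stop, in key order."""
        with self._lock:
            lo = bisect.bisect_left(self._keys, start)
            hi = bisect.bisect_left(self._keys, stop)
            return [(k, self._data[k]) for k in self._keys[lo:hi]]
```

With row keys such as `advertiser:hour`, the hourly impression counts for one advertiser become a single contiguous scan over a key prefix.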

It is also worth pointing out non-requirements:

  1. Tolerance of network partitions within a single data center: Different system components are often inherently centralized. For example, MySQL servers may all be located within a few racks, and network partitions within a data center would cause major loss in serving capabilities therein. Hence every effort is made to eliminate the possibility of such events at the hardware level by having a highly redundant network design. (Network partitions are handled at the hardware level, for example with a highly redundant network design.)
  2. Zero Downtime in case of individual data center failure: In our experience such failures are very rare, though not impossible. In a less than ideal world where the choice of system design boils down to the choice of compromises that are acceptable, this is one compromise that we are willing to make given the low occurrence rate of such events.
  3. Active-active serving capability across different data centers: As mentioned before, we were comfortable making the assumption that user data could be federated across different data centers (based ideally on user locality). Latency (when user and data locality did not match up) could be masked by using an application cache close to the user.


5.1 High Availability - AvatarNode

Hot standby for the NameNode (NN).

  • The edit log for the fsimage is shared via NFS.
  • The AvatarNodes elect the primary node through ZooKeeper.
  • DataNodes send block reports to all AvatarNodes.

5.2 Hadoop RPC compatibility


5.3 Block Availability: Placement Policy

  • The default HDFS block placement policy, while rack aware, is still minimally constrained. The placement decision for non-local replicas is random: a replica can land on any rack and on any node within that rack.
  • To reduce the probability of data loss when multiple nodes fail simultaneously, we implemented a pluggable block placement policy that constrains the placement of block replicas into smaller, configurable node groups. (A pluggable policy that groups nodes to decide where replicas are placed.) This allows us to reduce the probability of data loss by orders of magnitude, depending on the size chosen for the groups.
  • Our strategy is to define a window of racks and machines where replicas can be placed around the original block, using a logical ring of racks, each one containing a logical ring of machines. More details, the math, and the scripts used to calculate these numbers can be found at HDFS-1094. (My inference about this strategy: first number all the racks, then number the machines inside each rack. Then, based on the position of the original block, the first replica should go on the same rack but a different machine, within some window size of the original block's machine. The same applies at the rack level.)
  • We found that the probability of losing a random block increases with the size of the node group. In our clusters, we started to use a node group of (2, 5), i.e. a rack window size of 2 and a machine window size of 5. We picked this choice because the probability of data loss is about a hundred times lower than with the default block placement policy.
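
The window idea above can be sketched as follows. This is one plausible reading of the description, not the HDFS-1094 implementation: the function name, the cluster layout, and the assumption that the window starts at the original replica's position and extends forward around each ring are all illustrative.

```python
def candidate_nodes(racks, rack_idx, machine_idx, rack_window=2, machine_window=5):
    """Return the nodes eligible to hold further replicas of a block whose
    first replica lives at racks[rack_idx][machine_idx].

    Assumption: the window starts at the original's position and extends
    forward around each logical ring; the real policy may position it
    differently.
    """
    origin = racks[rack_idx][machine_idx]
    candidates = []
    for r in range(min(rack_window, len(racks))):
        rack = racks[(rack_idx + r) % len(racks)]          # ring of racks
        for m in range(min(machine_window, len(rack))):
            node = rack[(machine_idx + m) % len(rack)]     # ring of machines
            if node != origin:
                candidates.append(node)
    return candidates


# Hypothetical cluster: 4 racks of 8 machines each.
racks = [["r%dm%d" % (r, m) for m in range(8)] for r in range(4)]
print(candidate_nodes(racks, 3, 6))
```

In this toy cluster the (2, 5) window leaves 9 candidate nodes for the remaining replicas instead of all 31 other machines, so fewer combinations of simultaneous node failures can destroy every replica of a block.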

5.4 Performance Improvements for a Realtime Workload

HDFS was originally designed for high-throughput systems like MapReduce. Many of its original design principles aim to improve throughput but do not focus much on response time. For example, when dealing with errors, it favors retrying or waiting over failing fast. To support realtime applications, offering reasonable response time even in case of errors becomes the major challenge for HDFS.

5.4.1 RPC Timeout

  • When an RPC client detects a TCP socket timeout, instead of declaring an RPC timeout, it sends a ping to the RPC server. If the server is still alive, the client continues to wait for a response. (The original RPC implementation sends a ping when it detects a timeout to check whether the RPC server is alive; if it is, the client keeps waiting for the response.)
    • The idea is that if an RPC server is experiencing a communication burst, a temporary high load, or a stop-the-world GC, the client should wait and throttle its traffic to the server. (The reasoning is that the DataNode may be under peak load or in a GC pause, so the client waits and throttles its traffic to the server.)
    • On the contrary, throwing a timeout exception or retrying the RPC request causes tasks to fail unnecessarily or adds additional load to an RPC server. (Conversely, returning an exception immediately or retrying could make tasks fail unnecessarily or put more load on the RPC server.)
  • However, infinite wait adversely impacts any application that has a realtime requirement. An HDFS client occasionally makes an RPC to some DataNode, and it is bad when the DataNode fails to respond in time and the client is stuck in an RPC. (But this strategy can leave a client stuck in an RPC, which is unacceptable for a realtime system.)
    • A better strategy is to fail fast and try a different DataNode for either reading or writing. (A better approach is to switch to another DataNode when a timeout is detected.)
    • Hence, we added the ability to specify an RPC timeout when starting an RPC session with a server.
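
The fail-fast strategy can be sketched as a loop over replicas with a per-call timeout. The function names and the simulated latencies below are hypothetical stand-ins, not the Hadoop RPC API; latency is simulated with a lookup table so the example runs instantly.

```python
# Hypothetical per-node response times in seconds; dn1 is stuck (e.g. in a long GC pause).
SIMULATED_LATENCY_S = {"dn1": 30.0, "dn2": 0.02, "dn3": 0.01}


def fake_read_rpc(node, timeout_s):
    """Stand-in for a DataNode RPC: raises if the simulated latency exceeds the timeout."""
    if SIMULATED_LATENCY_S[node] > timeout_s:
        raise TimeoutError("%s did not answer within %.3fs" % (node, timeout_s))
    return "block data from " + node


def read_with_failover(replicas, rpc, timeout_s):
    """Try each replica in turn, giving each at most timeout_s seconds."""
    errors = []
    for node in replicas:
        try:
            return node, rpc(node, timeout_s)
        except TimeoutError as exc:
            errors.append((node, str(exc)))   # fail fast and move on to the next replica
    raise RuntimeError("all replicas timed out: %r" % errors)


print(read_with_failover(["dn1", "dn2", "dn3"], fake_read_rpc, timeout_s=1.0))
```

Without the timeout, the client would stay blocked on `dn1` for as long as that node keeps answering pings; with it, the read completes from `dn2` after one bounded wait.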

5.4.2 Recover File Lease

  • Another enhancement is to revoke a writer's lease quickly. (Speed up revoking the writer's lease.)
    • HDFS supports only a single writer to a file and the NameNode maintains leases to enforce this semantic. (HDFS allows only one writer per file.)
    • There are many cases when an application wants to open a file to read but it was not closed cleanly earlier. (If the previous writer did not clean up properly and a reader arrives, the reader has to wait for that writer's lease to be released.)
    • Previously this was done by repeatedly calling HDFS-append on the log file until the call succeeded. The append operation triggers a file's soft lease to expire. So the application had to wait for a minimum of the soft lease period (with a default value of one minute) before the HDFS NameNode revoked the log file's lease. (Originally, append was called repeatedly to trigger soft lease expiry, but the expiry itself still takes time.)
    • Secondly, the HDFS-append operation has additional unneeded cost, as establishing a write pipeline usually involves more than one DataNode. When an error occurs, a pipeline establishment might take up to 10 minutes. (append itself also carries unnecessary overhead, since it has to establish a write pipeline.)
  • To avoid the HDFS-append overhead, we added a lightweight HDFS API called recoverLease that revokes a file's lease explicitly. (A new API, with matching semantics, speeds up lease expiry and recovery.)
    • When the NameNode receives a recoverLease request, it immediately changes the file's lease holder to be itself. It then starts the lease recovery process.
    • The recoverLease RPC returns a status indicating whether the lease recovery was complete. The application waits for a success return code from recoverLease before attempting to read from the file.
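
The wait-for-success pattern described for recoverLease can be sketched as a simple polling loop. `wait_for_lease_recovery`, its parameters, and the `FakeNameNode` test double are hypothetical names for illustration; the loop only assumes a callable that returns a true value once recovery has completed.

```python
import time


def wait_for_lease_recovery(recover_lease, path, poll_interval_s=1.0,
                            max_attempts=60, sleep=time.sleep):
    """Call recover_lease(path) until it reports success.

    Returns the number of calls made; raises TimeoutError if recovery
    has not completed after max_attempts calls.
    """
    for attempt in range(1, max_attempts + 1):
        if recover_lease(path):
            return attempt
        sleep(poll_interval_s)
    raise TimeoutError("lease on %s not recovered after %d attempts" % (path, max_attempts))


class FakeNameNode:
    """Test double: lease recovery completes on the third request."""

    def __init__(self, holder="crashed-writer"):
        self.holder = holder
        self.calls = 0

    def recover_lease(self, path):
        self.calls += 1
        self.holder = "NameNode"   # the NameNode takes over the lease immediately
        return self.calls >= 3


nn = FakeNameNode()
attempts = wait_for_lease_recovery(nn.recover_lease, "/logs/part-0", sleep=lambda s: None)
print(attempts, nn.holder)
```

Only after the loop returns would the application open the file for reading.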

5.4.3 Reads from Local Replicas

  • There are times when an application wants to store data in HDFS for scalability and performance reasons. However, the latency of reads and writes to an HDFS file is an order of magnitude greater than reading or writing to a local file on the machine.
  • To alleviate this problem, we implemented an enhancement to the HDFS client that detects that there is a local replica of the data and then transparently reads data from the local replica without transferring the data via the DataNode. This has resulted in doubling the performance profile of a certain workload that uses HBase. (If a local copy of the HDFS block exists, read it directly from local disk rather than going through the DataNode layer.)

5.5 New Features

5.5.1 HDFS sync

  • Hflush/sync is an important operation for both HBase and Scribe. It pushes the written data buffered at the client side to the write pipeline, making the data visible to any new reader and increasing the data durability when either the client or any DataNode on the pipeline fails. (After hflush/sync is called, everything written so far should have reached the DataNodes' disks and be readable by all readers.) #todo: the semantics of hflush/sync are still unclear to me, mainly because the semantics of this API have been changed over time.
  • Hflush/sync is a synchronous operation, meaning that it does not return until an acknowledgement from the write pipeline is received. Since the operation is frequently invoked, increasing its efficiency is important. (But the operation is synchronous.)
  • One optimization we have is to allow subsequent writes to proceed while an Hflush/sync operation is waiting for a reply. This greatly increases the write throughput in both HBase and Scribe where a designated thread invokes Hflush/sync periodically. (One optimization is to let writes continue while an hflush/sync call is waiting for its reply, which increases write throughput.)

5.5.2 Concurrent Readers

  • We have an application that requires the ability to read a file while it is being written to. The reader first talks to the NameNode to get the meta information of the file. Since the NameNode does not have the most up-to-date information about the length of the file's last block, the client fetches that information from one of the DataNodes where a replica of the block resides. It then starts to read the file.
  • The challenge with concurrent readers and a writer is how to provision the last chunk of data when its data content and checksum are dynamically changing. We solve the problem by recomputing the checksum of the last chunk of data on demand.
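
A minimal sketch of the on-demand checksum idea follows. The chunk size, the use of CRC32 from `zlib`, and the function name are assumptions for illustration and do not describe the DataNode's actual code path: full chunks reuse the checksum stored at write time, while the trailing partial chunk gets a checksum computed from exactly the bytes being served.

```python
import zlib

CHUNK_SIZE = 512  # illustrative checksum chunk size


def read_with_checksums(data, stored_crcs, visible_length):
    """Return [(chunk_bytes, crc)] covering the first visible_length bytes.

    Full chunks reuse the checksum stored at write time. The trailing
    partial chunk is still being appended to, so its stored checksum may
    be stale; it is recomputed from the bytes actually returned.
    """
    out = []
    for start in range(0, visible_length, CHUNK_SIZE):
        chunk = bytes(data[start:min(start + CHUNK_SIZE, visible_length)])
        index = start // CHUNK_SIZE
        if len(chunk) == CHUNK_SIZE and index < len(stored_crcs):
            crc = stored_crcs[index]
        else:
            crc = zlib.crc32(chunk)   # recomputed on demand
        out.append((chunk, crc))
    return out


# A block with two complete chunks on disk and a third one still being written.
block = bytes(range(256)) * 5                      # 1280 bytes
stored = [zlib.crc32(block[0:512]), zlib.crc32(block[512:1024])]
served = read_with_checksums(block, stored, visible_length=1100)
print([len(chunk) for chunk, _ in served])
```

The reader can verify every chunk it receives, including the partial one, because the checksum always matches the bytes that were sent.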


6.1 ACID Compliance

6.2 Availability Improvements

6.2.1 HBase Master Rewrite

  • We originally uncovered numerous issues during kill testing where HBase regions would go offline. We soon identified the problem: the transient state of the cluster is stored in the memory of the currently active HBase master only. Upon losing the master, this state is lost.
  • We undertook a large HBase master rewrite effort. The critical component of this rewrite was moving region assignment information from the master's in-memory state to ZooKeeper. Since ZooKeeper writes go to a quorum (a majority of nodes), this transient state is not lost on master failover and can survive multiple server outages. (Transient state such as region assignment information is written to ZooKeeper, so it can be recovered when a failed master restarts.)

6.2.2 Online Upgrades

  • The largest cause of cluster downtime was not random server deaths, but rather system maintenance. We had a number of problems to solve to minimize this downtime. (System maintenance was the main cause of service downtime.)
  • First, we discovered over time that RegionServers would intermittently require minutes to shut down after a stop request was issued. This intermittent problem was caused by long compaction cycles. To address this, we made compactions interruptible to favor responsiveness over completion. This reduced RegionServer downtime to seconds and gave us a reasonable bound on cluster shutdown time. (A compaction usually runs before the stop completes, so there can be a long pause between issuing the stop and the server actually going down. Compactions were made interruptible so that the stop time becomes controllable.)
  • Another availability improvement was rolling restarts. Originally, HBase only supported full cluster stop and start for upgrades. We added a rolling restart script to perform software upgrades one server at a time. Since the master automatically reassigns regions on a RegionServer stop, this minimizes the amount of downtime that our users experience. (The startup script was changed so the cluster can be restarted in a rolling fashion; because HBase rebalances regions whenever a RegionServer goes down, the whole process is fast.)
    • We fixed numerous edge case issues that resulted from this new restart. Incidentally, numerous bugs during rolling restarts were related to region offlining and reassignment, so our master rewrite with ZooKeeper integration helped address a number of issues here as well. (Many edge cases came up during offlining and reassignment; storing master state in ZooKeeper helped locate and fix them.)

6.2.3 Distributed Log Splitting

6.3 Performance Improvements

6.3.1 Compaction

  • The next task was improving the compaction algorithm. We discovered a pathological case where a 1 GB file would be regularly compacted with three 5 MB files to produce a slightly larger file. This network IO waste would continue until the compaction queue started to backlog. This problem occurred because the existing algorithm would unconditionally minor compact the first four HFiles, while triggering a minor compaction after 3 HFiles had been reached. The solution was to stop unconditionally compacting files above a certain size and skip compactions if enough candidate files could not be found. Afterwards, our put latency dropped from 25 milliseconds to 3 milliseconds.
  • We also worked on improving the size ratio decision of the compaction algorithm. Originally, the compaction algorithm would sort by file age and compare adjacent files. If the older file was less than 2x the size of the newer file, the compaction algorithm would include this file and iterate. However, this algorithm had suboptimal behavior as the number and size of HFiles increased significantly. To improve it, we now include an older file if it is within 2x the aggregate size of all newer HFiles. This transforms the steady state so that an old HFile will be roughly 4x the size of the next newer file, and we consequently have a steeper curve while still maintaining a 50% compaction ratio.
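
The two size-ratio rules can be compared with a small sketch. This is one reading of the description above, walking from the newest file to the oldest; the function names, the `min_files` threshold, and the file sizes (in MB) are assumptions, and the real HBase selection code may iterate differently.

```python
def select_adjacent(sizes_newest_first, ratio=2.0):
    """Original rule: include an older file while it is smaller than
    ratio x the size of the adjacent newer file."""
    selected = sizes_newest_first[:1]
    for older in sizes_newest_first[1:]:
        if older < ratio * selected[-1]:
            selected.append(older)
        else:
            break
    return selected


def select_aggregate(sizes_newest_first, ratio=2.0, min_files=3):
    """Revised rule: include an older file while it is within ratio x the
    combined size of all newer files already selected. The compaction is
    skipped (empty result) if too few candidate files qualify."""
    selected = sizes_newest_first[:1]
    for older in sizes_newest_first[1:]:
        if older <= ratio * sum(selected):
            selected.append(older)
        else:
            break
    return selected if len(selected) >= min_files else []


print(select_aggregate([5, 5, 5, 1024]))   # the 1 GB file is left out
print(select_adjacent([10, 15, 40, 60]))
print(select_aggregate([10, 15, 40, 60]))
```

In the pathological case from the first bullet, the three 5 MB files are compacted together and the 1 GB file is no longer rewritten. On the second input the adjacent rule stops at the 40 MB file, while the aggregate rule keeps going because each older file is compared against the combined size of everything newer.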

6.3.2 Read Optimizations

  • As discussed, read performance hinges on keeping the number of files in a region low, thus reducing random IO operations. In addition to utilizing compactions to keep the number of files on disk low, it is also possible to skip certain files for some queries, similarly reducing IO operations. (Reduce the number of SSTable files.)
  • Bloom filters provide a space-efficient and constant-time method for checking if a given row or row and column exists in a given HFile.
  • For data stored in HBase that is time-series or contains a specific, known timestamp, a special timestamp file selection algorithm was added. Since time moves forward and data is rarely inserted at a significantly later time than its timestamp, each HFile will generally contain values for a fixed range of time. This information is stored as metadata in each HFile and queries that ask for a specific timestamp or range of timestamps will check if the request intersects with the ranges of each file, skipping those which do not overlap. (Timestamp information is added directly to the HFile and used for filtering.)
  • As read performance improved significantly with HDFS local file reads, it is critical that regions are hosted on the same physical nodes as their files. Changes have been made to retain the assignment of regions across cluster and node restarts to ensure that locality is maintained.
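
The timestamp-based file selection amounts to an interval intersection test against per-file metadata. The sketch below uses a hypothetical `HFileMeta` record and treats both ranges as inclusive; it is not the HBase implementation.

```python
from collections import namedtuple

# Hypothetical per-file metadata: the minimum and maximum timestamps the file contains.
HFileMeta = namedtuple("HFileMeta", "name min_ts max_ts")


def files_to_read(hfiles, query_min_ts, query_max_ts):
    """Keep only files whose [min_ts, max_ts] range intersects the query range."""
    return [f.name for f in hfiles
            if f.min_ts <= query_max_ts and query_min_ts <= f.max_ts]


hfiles = [
    HFileMeta("hfile-a", 0, 99),
    HFileMeta("hfile-b", 100, 199),
    HFileMeta("hfile-c", 200, 299),
]
print(files_to_read(hfiles, 150, 220))
```

A query for timestamps 150 to 220 opens only `hfile-b` and `hfile-c`; `hfile-a` is skipped without any disk IO.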


7.1 Testing

  • From early on in our design of an HBase solution, we were worried about code stability. We first needed to test the stability and durability of the open source HBase code and additionally ensure the stability of our future changes. To this end, we wrote an HBase testing program. The testing program generated data to write into HBase, both deterministically and randomly. The tester writes data into the HBase cluster and simultaneously reads and verifies all the data it has added. (Correctness verification of HBase reads and writes.) We further enhanced the tester to randomly select and kill processes in the cluster and verify that successfully returned database transactions were indeed written. This helped catch a lot of issues, and is still our first method of testing changes. (Processes are also killed at random to verify that the data is still correct.)
  • Although our common cluster contains many servers operating in a distributed fashion, our local development verification commonly consists of unit tests and single-server setups. We were concerned about discrepancies between single-server setups and truly distributed scenarios. We created a utility called HBase Verify to run simple CRUD workloads on a live server. This allows us to exercise simple API calls and run load tests in a couple of minutes. This utility is even more important for our dark launch clusters, where algorithms are first evaluated at a large scale. (Single-server setups are covered by unit tests, and CRUD operations are verified on a live server.)

7.2 Monitoring and Tools

  • As we gained more experience with production usage of HBase, it became clear that our primary problem was in consistent assignment of regions to RegionServers. Two RegionServers could end up serving the same region, or a region may be left unassigned. (Inconsistent region assignment can leave a region served by two RegionServers at once, or served by none.)
    • These problems are characterized by inconsistencies in metadata about the state of the regions that are stored in different places: the META region in HBase, ZooKeeper, files corresponding to a region in HDFS and the in-memory state of the RegionServers.
    • To that end, we created HBCK as a database-level FSCK utility to verify the consistency between these different sources of metadata. For the common inconsistencies, we added an HBCK "fix" option to clear the in-memory state and have the HMaster reassign the inconsistent region. Nowadays we run HBCK almost continuously against our production clusters to catch problems as early as possible.
  • A critical component for cluster monitoring is operational metrics. In particular, RegionServer metrics are far more useful for evaluating the health of the cluster than HMaster or ZooKeeper metrics. HBase already had a number of metrics exported through JMX. However, all the metrics were for short-running operations such as log writes and RPC requests. We needed to add metrics to monitor long-running events such as compactions, flushes, and log splits. A seemingly innocuous metric that ended up being critical for monitoring was version information. We have multiple clusters that often have divergent versions. (Version is an important metric to monitor, because features differ between versions and many different versions may be running across the clusters.)
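
The kind of cross-check that HBCK performs on region assignment can be sketched by comparing two of the metadata sources. The function and its inputs are hypothetical simplifications: it only compares the regions listed in META against what each RegionServer reports serving, and ignores ZooKeeper and HDFS.

```python
def check_assignments(meta_regions, serving):
    """Compare META against the in-memory state of the RegionServers.

    meta_regions: iterable of region names listed in META.
    serving: dict mapping RegionServer name -> set of regions it serves.
    Returns (unassigned, multiply_assigned), where multiply_assigned maps
    each doubly served region to the servers that claim it.
    """
    owners = {region: [] for region in meta_regions}
    for server, regions in sorted(serving.items()):
        for region in regions:
            owners.setdefault(region, []).append(server)
    unassigned = sorted(r for r, servers in owners.items() if not servers)
    multiply_assigned = {r: s for r, s in owners.items() if len(s) > 1}
    return unassigned, multiply_assigned


meta = ["region-1", "region-2", "region-3"]
serving = {"rs1": {"region-1", "region-2"}, "rs2": {"region-2"}}
print(check_assignments(meta, serving))
```

Here `region-3` is reported as unassigned and `region-2` as served by both `rs1` and `rs2`, the two failure modes described in the first bullet.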

7.3 Manual versus Automatic Splitting

#note: preferring manual splitting over automatic splitting rests on certain assumptions, but it does bring many benefits in terms of control.

  • Since our data grows roughly uniformly across all regions, it's easy for automatic splitting to cause split and compaction storms as the regions all hit roughly the same data size at the same time. With manual splits, we can stagger splits across time and thereby spread out the network IO load typically generated by the splitting process. This minimizes impact to production workload.
  • Since the number of regions is known at any given point in time, long-term debugging and profiling is much easier. It is hard to trace the logs to understand region level problems if regions keep splitting and getting renamed.

7.4 Dark Launch

  • Migrating from a legacy messaging system offered one major advantage: real-world testing capability. At Facebook, we widely use a testing/rollout process called "Dark Launch" where critical back-end functionality is exercised by a subset of the user base without exposing any UI changes to them. We used this facility to double-write messaging traffic for some users to both the legacy infrastructure and HBase. (A copy of some users' requests is also written to the new cluster.)
    • This allowed us to do useful performance benchmarks and find practical HBase bottlenecks instead of relying purely on artificial benchmarks and estimations. (Performance is tested with real traffic.)
    • Even after product launch, we still found many uses for Dark Launch clusters. All code changes normally spend a week running on Dark Launch before a production push is considered. (A change goes to production only after running normally on Dark Launch for a week.)
    • Additionally, Dark Launch normally handles at least 2x the load that we expect our production clusters to handle. Long term testing at 2x load allows us to weather multiple traffic spikes and verify that HBase can handle outlier peak conditions before we vertically scale.
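
The double-write pattern behind Dark Launch can be sketched as below. Everything here is a hypothetical illustration, not Facebook's implementation: users are bucketed deterministically with CRC32, the legacy path stays authoritative, and failures on the shadow path are recorded rather than surfaced to the user. Plain lists stand in for the two storage back ends.

```python
import zlib


def in_dark_launch(user_id, percent):
    """Deterministically place a user in the dark-launch subset."""
    return zlib.crc32(str(user_id).encode("utf-8")) % 100 < percent


def write_message(user_id, message, legacy_store, shadow_store, percent=10, errors=None):
    """Write to the legacy store, and also to the shadow store for selected users."""
    legacy_store.append((user_id, message))        # the legacy path stays authoritative
    if in_dark_launch(user_id, percent):
        try:
            shadow_store.append((user_id, message))
        except Exception as exc:                   # shadow failures must not reach the user
            if errors is not None:
                errors.append(exc)


legacy, shadow = [], []
for uid in range(1000):
    write_message(uid, "hello", legacy, shadow, percent=10)
print(len(legacy), len(shadow))
```

Because the bucketing is deterministic, the same users are always double-written, which makes it possible to compare the contents of the two stores afterwards.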

7.5 Dashboards/ODS integration

7.6 Backups at the Application layer

7.7 Schema Changes

7.8 Importing Data

7.9 Reducing Network IO

  • After running in production for a couple of months, we quickly realized from our dashboards that we were network IO bound. We needed some way to analyze where our network IO traffic was coming from.
  • We utilized a combination of JMX statistics and log scraping to estimate total network IO on a single RegionServer for a 24-hour period. We broke down the network traffic across the MemStore flush (15%), size-based minor compactions (38%), and time-based major compactions (47%). (JMX was used to observe where network traffic was spent, which guided the optimizations.) We found a lot of low-hanging optimizations by observing these ratios.
    • We were able to get a 40% network IO reduction by simply increasing our major compaction interval from every day to every week.
    • We also got big gains by excluding certain column families from being logged to the HLog. Best effort durability sufficed for data stored in these column families.
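
The 40% figure is consistent with the breakdown above under a simple model. The calculation below is a back-of-the-envelope check, not a measurement from the source: it assumes major compaction traffic is inversely proportional to how often major compactions run and that the other two components are unchanged.

```python
# Share of RegionServer network IO per source, from the breakdown above.
SHARES = {"memstore_flush": 0.15, "minor_compaction": 0.38, "major_compaction": 0.47}


def io_reduction(shares, interval_factor):
    """Fraction of total network IO saved when major compactions run
    interval_factor times less often.

    Assumption: major compaction traffic scales with the number of runs,
    and flush and minor compaction traffic stay the same.
    """
    saved = shares["major_compaction"] * (1 - 1 / interval_factor)
    return saved / sum(shares.values())


print(round(io_reduction(SHARES, 7), 3))  # daily -> weekly
```

Going from daily to weekly removes six out of seven major compactions, so the saving is roughly 0.47 × 6/7, or about 40% of total network IO.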


  • Adding support for maintenance of secondary indices and summary views in HBase. In many use cases, such derived data and views can be maintained asynchronously. (Maintain secondary indices, summaries, and similar derived data; these can usually be maintained asynchronously.)
  • Many use cases benefit from storing a large amount of data in HBase's cache and improvements to HBase are required to exploit very large physical memory. The current limitations in this area arise from issues with using an extremely large heap in Java and we are evaluating several proposals like writing a slab allocator in Java or managing memory via JNI. (Use JNI to rewrite the slab allocator.)
  • A related topic is exploiting flash memory to extend the HBase cache and we are exploring various ways to utilize it, including FlashCache.
  • Finally, as we try to use Hadoop and HBase for applications that are built to serve the same data in an active-active manner across different data centers, we are exploring approaches to deal with multi-data-center replication and conflict resolution. (Multi-data-center deployment and conflict resolution.)