Pregel: A System for Large-Scale Graph Processing
Table of Contents
http://kowshik.github.com/JPregel/pregel_paper.pdf @ 2010
The name honors Leonhard Euler. The Bridges of Konigsberg, which inspired his famous theorem, spanned the Pregel river(Euler回路起源就是Konigsberg桥问题,而这个桥就在Pregel River上面)
1. Miscellaneous
- JPregel @ GitHub http://kowshik.github.com/JPregel/
- Welcome To Apache Incubator Giraph http://incubator.apache.org/giraph/
- Bagel Programming Guide https://github.com/mesos/spark/wiki/Bagel-Programming-Guide
- Pregel: Google’s other data-processing infrastructure http://www.royans.net/arch/pregel-googles-other-data-processing-infrastructure/
- Pregel: A System for Large-Scale Graph Processing http://www.slideshare.net/shatteredNirvana/pregel-a-system-for-largescale-graph-processing
2. Abstract
- Programs are expressed as a sequence of iterations, in each of which a vertex can receive messages sent in the previous iteration, send mes-sages to other vertices, and modify its own state and that of its outgoing edges or mutate graph topology. (主要是为了解决大规模图算法的问题。程序分为多次迭代地执行,每次迭代在顶点上面处理上次迭代接受到的消息,产生消息发送到其他节点,并且修改自身的状态,或者修改其出边,或者是修改整个图的拓扑结构)
3. Introduction
- The Internet made the Web graph a popular object of analysis and research. Web 2.0 fueled interest in social net-works. Other large graphs—for example induced by trans-portation routes, similarity of newspaper articles, paths of disease outbreaks, or citation relationships among published scientific work—have been processed for decades. (相当数量大规模图算法的应用)
- Efficient processing of large graphs is challenging. (图算法本身特点,这些在设计框架的时候需要考虑)
- Graph algorithms often exhibit poor locality of memory access, (locality非常差,需要访问其他节点产生的数据)
- very little work per vertex, and (每个节点计算量并不大,主要还是多轮大量节点的迭代效果)
- a changing degree of parallelism over the course of execution [31, 39]. (在执行过程中需要不断地修改并行度/图的拓扑结构)
- Distribution over many machines exacerbates the locality issue, and increases the probability that a machine will fail during computation.(分布式需要考虑locality问题,并且还需要failover的情况)
- MapReduce [14,], for example, is a very good fit for a wide array of large-scale computing problems. It is sometimes used to mine large graphs [11, 30], but this can lead to sub-optimal performance and usability issues. The basic models for processing data have been extended to fa-cilitate aggregation [41,] and SQL-like queries [40, 47], but these extensions are usually not ideal for graph al-gorithms that often better fit a message passing model.(对于MR来说非常适合大规模的计算问题,并且有时候能够用来最一些挖掘工作,但是并不是最优的解决方案,并且还需要考虑一些可用性问题。本质上对于图算法来说的话,使用消息传递机制会更加合适。/另外MR计算方式和图算法似乎还需要一个转换过程,而并不是intuitive的)
- Pregel computations consist of a sequence of iterations, called su-persteps. During a superstep the framework invokes a user-defined function for each vertex, conceptually in parallel. (每次迭代称为superstep,在每个superstep框架会调用每个vertex的自定义函数,这个过程是并行都的)
- The function specifies behavior at a single vertex V and a single superstep S. It can read messages sent to V in su-perstep S − 1, send messages to other vertices that will be received at superstep S + 1, and modify the state of V and its outgoing edges. (每个节点会读取S-1时候其他节点发送给它的小心,处理完成之后就可以向其他节点发送消息,让这些节点在S+1时候处理。同时还可以修改整个图的拓扑结构)
- Messages are typically sent along outgo-ing edges, but a message may be sent to any vertex whose identifier is known.(message并不一定是要随着outgoing edges,而是可以发送到任何节点)。
- The vertex-centric approach is reminiscent of MapReduce in that users focus on a local action, processing each item independently, and the system composes these actions to lift computation to a large dataset. By design the model is well suited for distributed implementations: it doesn’t expose any mechanism for detecting order of execution within a superstep, and all communication is from superstep S to superstep S + 1.(以节点为中心的设计方式和MR非常像,用户只需要关于一个本地计算,对于每个item单独处理然后合并在一起形成一个大数据。这种设计是非常适合分布式的,对于一个superstep内部来说没有定义执行顺序,所有通信都是从S->S+1之间的)
- The synchronicity of this model makes it easier to reason about program semantics when implementing algorithms, and ensures that Pregel programs are inherently free of dead-locks and data races common in asynchronous systems.(以同步的方式可以更好地对program进行reason,并且可以使得整个系统避免死锁)
- In principle the performance of Pregel programs should be com-petitive with that of asynchronous systems given enough parallel slack [28, 34]. Because typical graph computations have many more vertices than machines, one should be able to balance the machine loads so that the synchronization between supersteps does not add excessive latency. (因为节点数目远大于机器,因此整个分布式非常均匀的,因此采用同步的方式并不会带来太多的延迟,大部分节点完成时间是相同的,不会存在某个节点计算时间过长,否则采用异步方式会使得延迟更小。) #note: 异步方式会使得程序更加难懂,因为缺少一个barrier,V1可能计算到了Step3,而V2可能只是计算到Step1。此外也比较难确定一个节点什么时候需要计算下一轮,需要很多额外的描述
4. Model of Computation
- The input to a Pregel computation is a directed graph in which each vertex is uniquely identified by a string vertex identifier. Each vertex is associated with a modifiable, user defined value. The directed edges are associated with their source vertices, and each edge consists of a modifiable, user defined value and a target vertex identifier.(pregel的输入就是一个有向图,每个节点都有一个唯一标识,并且有一个相应的自定义可修改数据。edge对应了source和target vertex,并且上面也有自定义数据) 。 但是注意 Edges are not first-class citizens in this model, having no associated computation. 对于edge来说并不是first-class,pregel是vertex-centric模式,edge上面没有对应任何计算。
- A typical Pregel computation consists of input, when the graph is initialized, followed by a sequence of supersteps sep-arated by global synchronization points until the algorithm terminates, and finishing with output.(有一个全局的同步点机制,每轮迭代都会进行同步)
- Within each superstep the vertices compute in parallel, each executing the same user-defined function that expresses the logic of a given algorithm. A vertex can modify its state or that of its outgoing edges, receive messages sent to it in the previous superstep, send messages to other vertices (to be received in the next superstep), or even mutate the topology of the graph.(在每一个superstep里面每个节点都会指定自定义函数,节点可以修改自身状态以及出口,接受上一步产生的信息并且写新的消息到其他节点,甚至修改全局拓扑)。 #note: 修改全局拓扑似乎是一个比较难实现的东西
- Algorithm termination is based on every vertex voting to halt. In superstep 0, every vertex is in the active state; all active vertices participate in the computation of any given superstep. A vertex deactivates itself by voting to halt. This means that the vertex has no further work to do unless trig-gered externally, and the Pregel framework will not execute that vertex in subsequent supersteps unless it receives a mes-sage. If reactivated by a message, a vertex must explicitly deactivate itself again. The algorithm as a whole terminates when all vertices are simultaneously inactive and there are no messages in transit.(起初每个节点都是active的,节点变为inactive需要自己vote to halt,这样节点下次就不会参与计算了。但是如果其他节点向这个节点发送消息的话,那么又会从inactive变为active状态。因此pregel的终止状态是所有节点都已经vote to halt并且没有任何message流动)
- The output of a Pregel program is the set of values ex-plicitly output by the vertices. It is often a directed graph isomorphic to the input, but this is not a necessary prop-erty of the system because vertices and edges can be added and removed during computation. A clustering algorithm, for example, might generate a small set of disconnected ver-tices selected from a large graph. A graph mining algorithm might simply output aggregated statistics mined from the graph.(通常pregel的输出是是一个图,每个节点上面都有对应的value,图和原图是同构的。但是也不排除一些算法会修改图的结构,比如簇集算法或者图挖掘算法等)
- We chose a pure message passing model, omitting remote reads and other ways of emulating shared memory, for two reasons. (选择消息传递模型而不是模拟共享内存的方式,主要有下面两点原因)
- First, message passing is sufficiently expressive that there is no need for remote reads. We have not found any graph algorithms for which message passing is insufficient. (够用)
- Second, this choice is better for performance. In a cluster environment, reading a value from a remote machine in-curs high latency that can’t easily be hidden. Our message passing model allows us to amortize latency by delivering messages asynchronously in batches.(能够隐藏一些延迟,比如能够通过batch方式,并且进行异步发送)
- Graph algorithms can be written as a series of chained MapReduce invocations [11, 30]. We chose a different model for reasons of usability and performance. (虽然pregel可以使用系列的MR完成,但是考虑可用性以及性能使用另外模型):
- Pregel keeps ver-tices and edges on the machine that performs computation, and uses network transfers only for messages. (所有节点和边都是保存好的)
- MapReduce, however, is essentially functional, so expressing a graph algo-rithm as a chained MapReduce requires passing the entire state of the graph from one stage to the next—in general requiring much more communication and associated serial-ization overhead(虽然MR是函数式的,但是每次都需要传递整个graph数据结构,这样会给通信以及序列话带来很多额外的开销)
- In addition, the need to coordinate the steps of a chained MapReduce adds programming complex-ity that is avoided by Pregel’s iteration over supersteps.(使用MR还需要其他协调机制来保证同步,而pregel可以通过基于superstep的iteration来解决)
5. The C++ API
template <typename VertexValue, typename EdgeValue, typename MessageValue> class Vertex { public: virtual void Compute(MessageIterator* msgs) = 0; // 可以迭代发送给这个节点的消息 const string& vertex_id() const; // 自身 int64 superstep() const; // 迭代次数 const VertexValue& GetValue(); // 自身hold value,custom。 VertexValue* MutableValue(); OutEdgeIterator GetOutEdgeIterator(); // out going vertices。这个也算是vertex hold value,builtin。 void SendMessageTo(const string& dest_vertex, // 给某个节点发送消息 const MessageValue& message); void VoteToHalt(); // vote to halt };
- The values associated with the vertex and its edges are the only per-vertex state that persists across supersteps. Lim-iting the graph state managed by the framework to a single value per vertex or edge simplifies the main computation cycle, graph distribution, and failure recovery.(一个节点持有的value以及对应的edges是每次superstep需要保存的东西,这样可以简化框架所需要完成的工作,包括计算,图分布以及故障恢复)
5.1. Message Passing
When the destination vertex of any message does not ex-ist, we execute user-defined handlers. A handler could, for example, create the missing vertex or remove the dangling edge from its source vertex.
5.2. Combiners
- 这个概念和MR里面非常类似,能够有效地减少数据传输量。我猜想combiner工作集合应该是节点发送给某个节点所有消息。
- 其实对于incoming messages也可以进行combine,虽然这样减少不了传输大小,但是可以减少保存消息的大小。
- If the user has provided a Combiner (Section 3.2), it is applied when messages are added to the outgoing message queue and when they are received at the incoming message queue. The latter does not reduce network usage, but does reduce the space needed to store messages.
- There are no guarantees about which (if any) messages are combined, the groupings presented to the combiner, or the order of combining, so combiners should only be enabled for commutative and associative operations.(对于combiner不应该有太多的假设,包括消息个数,顺序以及如何进行group的,因此combiner本身的计算应该满足交换律和结合律)
5.3. Aggregators
- Pregel aggregators are a mechanism for global communica-tion, monitoring, and data. Each vertex can provide a value to an aggregator in superstep S, the system combines those values using a reduction operator, and the resulting value is made available to all vertices in superstep S + 1. (通过将S的所有节点数据全部聚合起来,然后在S+1散播给所有的节点)
- Aggregators can be used for statistics. For instance, a sum aggregator applied to the out-degree of each vertex yields the total number of edges in the graph. More complex reduction operators can generate histograms of a statistic.(统计使用,比如计算所有节点的出度,或者是更加复杂的聚合操作可以产生统计指标的直方图等)
- Aggregators can also be used for global coordination. For instance, one branch of Compute() can be executed for the supersteps until an and aggregator determines that all ver-tices satisfy some condition, and then another branch can be executed until termination. A min or max aggregator, ap-plied to the vertex ID, can be used to select a vertex to play a distinguished role in an algorithm.(全局协调作用。一个节点可以单独选举出来作为其他作用,比如在Vertex id上面做min/max操作来选择一个节点)
- To define a new aggregator, a user subclasses the pre-defined Aggregator class, and specifies how the aggregated value is initialized from the first input value and how mul-tiple partially aggregated values are reduced to one. Aggre-gation operators should be commutative and associative.(定义一个聚合类非常简单,但是需要注意的是所有input的顺序以及group方式都不确定,和combiner类似,应该满足结合律和交换律)
- By default an aggregator only reduces input values from a single superstep, but it is also possible to define a sticky aggregator that uses input values from all supersteps. This is useful, for example, for maintaining a global edge count that is adjusted only when edges are added or removed.(默认的聚合操作是针对一个superstep完成的,如何设置成为sticky bit的话,那么这个聚合操作就可以一直存在收集所有superstep的输入)
5.4. Topology Mutations
- Multiple vertices may issue conflicting requests in the same superstep (e.g., two requests to add a vertex V , with dif-ferent initial values). We use two mechanisms to achieve determinism: partial ordering and handlers.(拓扑结构的修改在同一个superstep可能会出现冲突,比如用一个value创建两个不同的节点),我们通过下面两个机制达到确定性:偏序和handlers
- 所谓偏序就是定义所有操作的顺序:
- As with messages, mutations become effective in the su-perstep after the requests were issued. #todo: 对于所有的mutations操作都是在request发起之后都会立刻生效?
- Within that super-step removals are performed first, with edge removal before vertex removal, since removing a vertex implicitly removes all of its out-edges. Additions follow removals, with ver-tex addition before edge addition,(在一个superstep里面来说,remove首先执行,首先是edge removal,然后是vertex removal,addition后执行,首先是vertex addition,然后是edge addition)
- and all mutations precede calls to Compute(). This partial ordering yields determinis-tic results for most conflicts. #todo: 本次的修改会在下次的compute之前生效? 因为按照我们写程序的习惯来说,肯定是一边compute一边计算需要删除和增加哪些顶点和边的
- The remaining conflicts are resolved by user-defined han-dlers. If there are multiple requests to create the same vertex in the same superstep, then by default the system just picks one arbitrarily, but users with special needs may specify a better conflict resolution policy by defining an appropriate handler method in their Vertex subclass. The same handler mechanism is used to resolve conflicts caused by multiple vertex removal requests, or by multiple edge addition or re-moval requests. We delegate the resolution to handlers to keep the code of Compute() simple, which limits the inter-action between a handler and Compute(), but has not been an issue in practice.(如果依然出现冲突的话,那么系统会选择任意节点处理。但是如果用户指定了handler的话,那么可以由用户自己选择一个节点来处理这个冲突问题。)
- Our coordination mechanism is lazy: global mutations do not require coordination until the point when they are ap-plied. This design choice facilitates stream processing. The intuition is that conflicts involving modification of a vertex V are handled by V itself. (使用懒协调机制,大部分全局修改不需要协调机制只有当交集的时候。这样适合流处理。直觉上面告诉我们,如果对于V修改出现冲突的话,那么应该是V自身进行处理)
- Pregel also supports purely local mutations, i.e., a vertex adding or removing its own outgoing edges or removing it-self. Local mutations cannot introduce conflicts and making them immediately effective simplifies distributed program-ming by using an easier sequential programming semantics(对于local修改比如增加出边和减少出边,或者是直接删除自身,因为没有牵扯到冲突所有修改起来非常简单。本地修改也是立刻生效)
#todo:修改到底是立刻生效,还是仅仅是通知master节点,然后下论生效?partial order是强制还是建议的?
5.5. Input and Output
定义了常用的输入输出格式,也提供了读写接口来进行扩展。
6. Implementation
6.1. Basic architecture
- The Pregel library divides a graph into partitions, each consisting of a set of vertices and all of those vertices’ out-going edges. Assignment of a vertex to a partition depends solely on the vertex ID, which implies it is possible to know which partition a given vertex belongs to even if the vertex is owned by a different machine, or even if the vertex does not yet exist. The default partitioning function is just hash(ID) mod N , where N is the number of partitions, but users can replace it.(pregel library首先将输入切割成为多份称为partition,每个partition应该对应每个调度单位可能对应进程,这样多个节点的处理就分摊到一个进程上面执行了。但是parition算法仅仅是以来与vertex id,因此如何分布事先就可以知道。默认的算法就是取模,但是用户可以进行简单的替换)
- In the absence of faults, the execution of a Pregel program consists of several stages:(如果不考虑出错的情况,那么一个pregel执行过程如下):
- 首先一个单独的进程起来作为master存在,其他worker使用name service来发现master并且汇报自己。(worker的数量就是进程数目,应该也就是partition number)
- master了解到整个partition情况之后,将输入按照partition分布到不同的worker上面去(注意每个worker可能会分配到多个partition)。每个worker都会一些节点的状态以及让这些节点进行compute,同时每个worker也会知道整个图是如何分配的(这个通过partition function也可以知道)。 #note: 应该是为了解决底层通信问题,但是还需要考虑failover的情况。
- 因为输入不一定能够正好分割,因此如果这个输入是remote worker的话,那么这个worker还需要通过消息通过给remote worker。
- 一旦load complete之后,所有的节点都标记为active。
- 然后开始计算,每轮计算的结果都是通过batch聚合并且异步消息传递的,但是每个superstep之间必须同步。每个superstep完成之后,worker都会通知下论有多少激活点。
- 计算完成之后,master会通知worker将结果输出。可能是GFS,也可能是BigTable。
6.2. Fault tolerance
- Fault tolerance is achieved through checkpointing. At the beginning of a superstep, the master instructs the workers to save the state of their partitions to persistent storage, including vertex values, edge values, and incoming messages; the master separately saves the aggregator values.(通过chkp来完成容错的。在superstep之前,master会协调所有的worker将他们的状态进行持久化,包括节点,边以及将要处理的消息等,而master单独保存聚合内容)
- Worker failures are detected using regular “ping” messages that the master issues to workers. If a worker does not receive a ping message after a specified interval, the worker process terminates. If the master does not hear back from a worker, the master marks that worker process as failed.(master通过和worker发送ping心跳来检测worker是否正在正常工作。如果没有检测到的话,那么就认为这个worker失败)
- When one or more workers fail, the current state of the partitions assigned to these workers is lost. The master reas-signs graph partitions to the currently available set of work-ers, and they all reload their partition state from the most recent available checkpoint at the beginning of a superstep S. That checkpoint may be several supersteps earlier than the latest superstep S completed by any partition before the failure, requiring that recovery repeat the missing su-persteps. We select checkpoint frequency based on a mean time to failure model [13, ], balancing checkpoint cost against expected recovery cost. (一旦检测到worker失败的话,那么master会将整个集群回滚。重新对graph进行partition,然后每个节点重新读取chkp,然后从那个superstep开始计算。可能这个superstep早于出现鼓掌时候的superstep) #note: 这点似乎是个比较大的问题,因为只要有单个worker出现问题的话,那么整个集群就要进行回滚
- Confined recovery is under development to improve the cost and latency of recovery. In addition to the basic check-points, the workers also log outgoing messages from their as-signed partitions during graph loading and supersteps. Re-covery is then confined to the lost partitions, which are re-covered from checkpoints. (限制性恢复则是对于上面情况的一个改善,能够改善恢复代价和延迟。在这个情况下面,worker会记录在graph load以及每个superstep出去的信息。这样故障恢复可以仅仅限于挂掉的部分,减少恢复的代价)
- The system recomputes the miss-ing supersteps up to S using logged messages from healthy partitions and recalculated ones from recovering partitions.(挂掉部分在恢复的时候,可以从其他节点读取每个superstep需要处理的消息)
- This approach saves compute resources during recovery by only recomputing lost partitions, and can improve the la-tency of recovery since each worker may be recovering fewer partitions. (只是针对挂掉的部分的错误恢复,并且因为恢复区域面积减少所以故障恢复有更小的延迟)
- Saving the outgoing messages adds overhead, but a typical machine has adequate disk bandwidth to ensure that I/O does not become the bottleneck.(尽管保存输出消息会带来额外的开销,但是只要disk有相对可以的带宽那么IO不是问题)
- Confined recovery requires the user algorithm to be deter-ministic, to avoid inconsistencies due to mixing saved mes-sages from the original execution with new messages from the recovery. Randomized algorithms can be made deter-ministic by seeding a pseudorandom number generator de-terministically based on the superstep and the partition. Nondeterministic algorithms can disable confined recovery and fall back to the basic recovery mechanism.(对于限制恢复的话仅仅适用于确定性的算法,对于随机算法的话可以保存其seed来获得确定性。而对于非确定性算法的话那么只能够使用基本的故障恢复方法)
6.3. Worker implementation
- A worker machine maintains the state of its portion of the graph in memory. Conceptually this can be thought of as a map from vertex ID to the state of each vertex, where the state of each vertex consists of its current value, a list of its outgoing edges (the vertex ID for the edge’s target, and the edge’s current value), a queue containing incoming messages, and a flag specifying whether the vertex is active. (本质上来说一个worker保存的内容就是map,key为vertex id,而value为这个vertex需要保存的状态,包括value,edges,message queue以及active flag)
- When the worker performs a superstep it loops through all vertices and calls Compute(), passing it the current value, an iterator to the incoming messages, and an iterator to the outgoing edges. (在计算的时候,worker只需要遍历每个vertex并且调用相应的compute方法即可)
- For performance reasons, the active vertex flags are stored separately from the incoming message queues. Furthermore, while only a single copy of the vertex and edge values ex-ists, two copies of the active vertex flags and the incoming message queue exist: one for the current superstep and one for the next superstep.(对于active flag以及message queue是分开管理的,并且这两个变量存在两份内容,一份是表示本次superstep结果,一份是表示下次superstep结果)
- While a worker processes its ver-tices in superstep S it is simultaneously, in another thread, receiving messages from other workers executing the same superstep. Since vertices receive messages that were sent in the previous superstep (see Section 2), messages for super-steps S and S + 1 must be kept separate.(在执行superstep S的时候,其他worker也在为下次superstep发送消息,这两个消息必须是区分开来的,因此使用了两个queue来保存)
- Similarly, arrival of a message for a vertex V means that V will be active in the next superstep, not necessarily the current one.(同样因为这个原因,active flag也是分开存储的)
- When Compute() requests sending a message to another vertex, the worker process first determines whether the des-tination vertex is owned by a remote worker machine, or by the same worker that owns the sender. In the remote case the message is buffered for delivery to the destination worker. When the buffer sizes reach a threshold, the largest buffers are asynchronously flushed, delivering each to its des-tination worker as a single network message. In the local case an optimization is possible: the message is placed di-rectly in the destination vertex’s incoming message queue.(发送消息的时候,pregel会确定dest vertex是否在同样一个worker上面,如果是在同一个worker的话,那么直接将消息放到队列里面即可,如果是远程节点的话,那么会进行batch然后异步发送)
- #note: 因为vertex id分配到partiion算法是固定的,而partition分配到那个worker却未知,因此需要有一个服务或者是master知道某个partition是在哪个worker上面的。
6.4. Master implementation
- The master is primarily responsible for coordinating the activities of workers. Each worker is assigned a unique iden-tifier at the time of its registration. The master maintains a list of all workers currently known to be alive, including the worker’s unique identifier, its addressing information, and which portion of the graph it has been assigned.(master为每个worker分配了id,并且保存了那些alive的worker节点信息,包括id,地址信息,以及哪些partition分配在上面)
- The size of the master’s data structures is proportional to the number of partitions, not the number of vertices or edges, so a sin-gle master can coordinate computation for even a very large graph.(因此master数据量大小只是和parition number成比率,因此实际上可以处理非常大的图)
- Most master operations, including input, output, compu-tation, and saving and resuming from checkpoints, are ter-minated at barriers: the master sends the same request to every worker that was known to be alive at the time the op-eration begins, and waits for a response from every worker. If any worker fails, the master enters recovery mode as de-scribed in section 4.2. If the barrier synchronization suc-ceeds, the master proceeds to the next stage. In the case of a computation barrier, for example, the master increments the global superstep index and proceeds to the next super-step.(master在每个superstep之间通过barrier进行协调。在每个superstep之间会向所有的worker发送开始信息,然后等待结束信息,如果worker出现问题的话那么就需要进行回滚。如果同步OK的话,那么增加superstep index进入下论迭代)
- The master also maintains statistics about the progress of computation and the state of the graph, such as the total size of the graph, a histogram of its distribution of out-degrees, the number of active vertices, the timing and message traf-fic of recent supersteps, and the values of all user-defined aggregators. To enable user monitoring, the master runs an HTTP server that displays this information.(master还会保存一些统计信息,比如计算进度,图大小,出度的直方图统计,活跃节点,以及在每个superstep的耗时以及消息传送,以及用户自定义的聚合等。master也提供了HTTP Server来展示这些信息)
6.5. Aggregators
- Each worker maintains a collection of ag-gregator instances, identified by a type name and instance name. When a worker executes a superstep for any partition of the graph, the worker combines all of the values supplied to an aggregator instance into a single local value: an ag-gregator that is partially reduced over all of the worker’s vertices in the partition. (每个worker上面都会维护聚合操作实例。在一个superstep执行的时候会将节点的值进行聚合/考虑到不确定哪一个vertex先完成,因此聚合操作必须满足交换律。而最后聚合操作会聚合所有worker上的value,因此也必须满足结合律)
- reduced over all of the worker’s vertices in the partition. At the end of the superstep work-ers form a tree to reduce partially reduced aggregators into global values and deliver them to the master. We use a tree-based reduction—rather than pipelining with a chain of workers—to parallelize the use of CPU during reduction. (使用tree-based reduction方式而不是pipelining方式可以减少CPU开销以及延迟) #todo: 为什么不直接将所有的value汇报给master然后让master进行聚合呢?为了减少master压力?这个tree-based reduction应该是master协调完成的
- The master sends the global values to all workers at the beginning of the next superstep.(master得到这个global value之后,重新传递给所有的workers,然后进行下轮迭代)
7. Applications
7.1. PageRank
class PageRankVertex : public Vertex<double, void, double> { public: virtual void Compute(MessageIterator* msgs) { if (superstep() >= 1) { double sum = 0; for (; !msgs->Done(); msgs->Next()) sum += msgs->Value(); *MutableValue() = 0.15 / NumVertices() + 0.85 * sum; } if (superstep() < 30) { const int64 n = GetOutEdgeIterator().size(); SendMessageToAllNeighbors(GetValue() / n); } else { VoteToHalt(); } } };
使用pregel描述page-rank算法还是相当直观的:
- 初始权值为 0.15 / NumVertices()
- 另外0.85来自于其他节点的贡献
- 下轮迭代将自己的权值叠加到自己的link上面
- 迭代30轮
8. Experiments
9. Related Work
10. Conclusion and Future Work
- Other usability aspects of Pregel motivated by user experience include a set of status pages with detailed information about the progress of Pregel programs, a unittesting framework, and a single-machine mode which helps with rapid prototyping and debugging.(其他可用性上面还有整个pregel的详细运行信息 via HTTP Server,单测框架,用来方便进行原型开发以及debug的单机框架)
- The performance, scalability, and fault-tolerance of Pregel are already satisfactory for graphs with billions of vertices. (能够应对10亿规模的节点)
- We are investigating techniques for scaling to even larger graphs, such as relaxing the synchronicity of the model to avoid the cost of faster workers having to wait frequently at inter-superstep barriers.(尝试扩展到更大的图计算模型。对于更大的图计算模型的话会在每个superstep耗时过多,可以通过放松强同步这个性质,这样可以使得某些运行很快的worker不必等待)
- Currently the entire computation state resides in RAM. We already spill some data to local disk, and will continue in this direction to enable computations on large graphs when terabytes of main memory are not available.(现在每个节点的内容都存放在RAM里面,后续对于更大的数据会通过溢出到磁盘解决)
- Assigning vertices to machines to minimize inter-machine communication is a challenge. Partitioning of the input graph based on topology may suffice if the topology cor-responds to the message traffic, but it may not. We would like to devise dynamic re-partitioning mechanisms.(有效减少机器之间进行通信是必要的。虽然通过拓扑来进行partition在消息通信热度正好和拓扑类似的情况下面运行良好,但是也存在一些其他情况不那么匹配。如果允许动态进行re-partition的话或许会有更好的性能)
- Pregel is designed for sparse graphs where communica-tion occurs mainly over edges, and we do not expect that focus to change. Although care has been taken to support high fan-out and fan-in traffic, performance will suffer when most vertices continuously send messages to most other ver-tices. However, realistic dense graphs are rare, as are al-gorithms with dense communication over a sparse graph. Some such algorithms can be transformed into more Pregel-friendly variants, for example by using combiners, aggrega-tors, or topology mutations, and of course such computa-tions are difficult for any highly distributed system.(pregel主要关注的是稀疏图算法的处理,而起这个关注点应该是不改变的。虽然考虑到稠密图的处理,但是实际上运行稠密图算法因为消息通信量过大性能依然会存在影响,但是在现实中稠密图以及对应的算法是非常少见的。这些算法通常可以转换成为适合于pregel运行的程序,比如使用combiner以及aggregator,以及拓扑修改,但是无论如何这类算法都是不适合高度分布式的系统)