Presto: SQL on Everything

Presto adoption: the ecosystem around it looks quite healthy:

Presto is an open-source distributed SQL query engine that has run in production at Facebook since 2013 and is used today by several large companies, including Uber, Netflix, Airbnb, Bloomberg, and LinkedIn. Organizations such as Qubole, Treasure Data, and Starburst Data have commercial offerings based on Presto. The Amazon Athena interactive querying service is built on Presto. With over a hundred contributors on GitHub, Presto has a strong open source community.

Presto is designed to be adaptive, flexible, and extensible. It provides an ANSI SQL interface to query data stored in Hadoop environments, open-source and proprietary RDBMSs, NoSQL systems, and stream processing systems such as Kafka. A ‘Generic RPC’ connector makes adding a SQL interface to proprietary systems as easy as implementing a half dozen RPC endpoints. Presto exposes an open HTTP API, ships with JDBC support, and is compatible with several industry-standard business intelligence (BI) and query authoring tools. The built-in Hive connector can natively read from and write to distributed file systems such as HDFS and Amazon S3, and supports several popular open-source file formats including ORC, Parquet, and Avro.


By late 2018, Presto was already in large-scale use inside Facebook, with workloads falling into the following categories:

As of late 2018, Presto is responsible for supporting much of the SQL analytic workload at Facebook, including interactive/BI queries and long-running batch extract-transform-load (ETL) jobs. In addition, Presto powers several end-user facing analytics tools, serves high performance dashboards, provides a SQL interface to multiple internal NoSQL systems, and supports Facebook’s A/B testing infrastructure. In aggregate, Presto processes hundreds of petabytes of data and quadrillions of rows per day at Facebook.

The four main use cases:

Interactive Analytics: multi-tenant interactive queries

Batch ETL: long-running batch ETL jobs

A/B Testing: computing A/B test results

Developer/Advertiser Analytics: analytics for external developers and advertisers


Pasted-Image-20231225103214.png

The execution flow: a client submits SQL over HTTP to the coordinator, which parses, plans, and optimizes the query and distributes tasks to the workers; workers pull data through connectors, process it, and stream results back.

Each kind of data source is defined as a Connector, whose API consists of four parts: the Metadata API, the Data Location API, the Data Source API, and the Data Sink API.
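As a rough sketch of how those four parts fit together (interface and type names here are illustrative, not Presto's actual SPI):

```java
import java.util.List;

// Hypothetical sketch of a connector's four API surfaces; the real Presto
// SPI (ConnectorMetadata, split managers, page sources/sinks) is richer.
interface Connector {
    Metadata metadata();      // schemas, tables, column types, statistics
    DataLocation location();  // where the data lives, as schedulable splits
    DataSource source();      // read path: turn a split into pages of data
    DataSink sink();          // write path: accept pages to persist
}

interface Metadata {
    List<String> listTables(String schema);
}

interface DataLocation {
    List<Split> getSplits(String table);  // batches of work with host hints
}

interface DataSource {
    Page nextPage(Split split);           // columnar batch, or null when drained
}

interface DataSink {
    void appendPage(String table, Page page);
}

record Split(String table, List<String> preferredHosts) {}

record Page(int rowCount) {}  // placeholder for a columnar batch of rows
```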

Query optimization uses a Cascades-style cost-based (CBO) framework, combining table and column statistics to choose the join strategy and the join order.

We are in the process of enhancing the optimizer to perform a more comprehensive exploration of the search space using a cost-based evaluation of plans based on the techniques introduced by the Cascades framework [13]. However, Presto already supports two cost-based optimizations that take table and column statistics into account - join strategy selection and join re-ordering.
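A toy illustration of what statistics-driven join strategy selection might look like; the cost model and constants below are invented for illustration and are not Presto's actual costing:

```java
// Toy cost-based choice between a broadcast join (replicate the build side
// to every worker) and a partitioned join (shuffle both sides on the key).
final class JoinStrategyChooser {
    enum Strategy { BROADCAST, PARTITIONED }

    static Strategy choose(long probeBytes, long buildBytes, int workers,
                           long perNodeMemoryBudget) {
        // Broadcasting copies the build side to every worker, and the build
        // side must fit in a single node's memory budget.
        long broadcastCost = buildBytes * workers;
        boolean broadcastFits = buildBytes <= perNodeMemoryBudget;
        // A partitioned join shuffles both inputs once.
        long partitionedCost = probeBytes + buildBytes;
        return (broadcastFits && broadcastCost < partitionedCost)
                ? Strategy.BROADCAST
                : Strategy.PARTITIONED;
    }

    public static void main(String[] args) {
        // Small dimension table joined to a large fact table: broadcast wins.
        System.out.println(choose(1_000_000_000L, 5_000_000L, 100, 1_000_000_000L));
    }
}
```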

The Data Layout API lets a connector report physical data distribution properties: partitioning, sorting, grouping, and indices. The optimizer uses these properties to pick the best execution plan; if the plan cannot use the data's natural layout, the required properties must be enforced on top of it, which implies operations such as data shuffles, and those enforcement costs are also accounted for within the CBO framework.

The optimizer can take advantage of the physical layout of the data when it is provided by the connector Data Layout API. Connectors report locations and other data properties such as partitioning, sorting, grouping, and indices. Connectors can return multiple layouts for a single table, each with different properties, and the optimizer can select the most efficient layout for the query [15] [19]. This functionality is used by administrators operating clusters for the Developer/Advertiser Analytics use case; it enables them to optimize new query shapes simply by adding physical layouts. We will see some of the ways the engine can take advantage of these properties in the subsequent sections.

The plan is actually divided into multiple stages, and each stage runs as multiple tasks; successive stages are pipelined together. Data inside a task moves between operators in memory, while transfers across stage boundaries go over the network as data shuffles, which carry substantial memory and CPU overhead. The planner uses data layout properties and node properties to minimize the number of shuffles. And if the connector exposes, via the Data Layout API, that a column has an index, and that column is used in a join, an index nested loop join can be used. (Can we basically assume the table this connector provides is the fact table?)

Part of the optimization process involves identifying parts of the plan that can be executed in parallel across workers. These parts are known as ‘stages’, and every stage is distributed to one or more tasks, each of which execute the same computation on different sets of input data. The engine inserts buffered in-memory data transfers (shuffles) between stages to enable data exchange. Shuffles add latency, use up buffer memory, and have high CPU overhead. Therefore, the optimizer must reason carefully about the total number of shuffles introduced into the plan. Figure 3 shows how a naïve implementation would partition a plan into stages and connect them using shuffles.

Data Layout Properties : The physical data layout can be used by the optimizer to minimize the number of shuffles in the plan. This is very useful in the A/B Testing use case, where almost every query requires a large join to produce experiment details or population information. The engine takes advantage of the fact that both tables participating in the join are partitioned on the same column, and uses a co-located join strategy to eliminate a resource-intensive shuffle.

If connectors expose a data layout in which join columns are marked as indices, the optimizer is able to determine if using an index nested loop join would be an appropriate strategy. This can make it extremely efficient to operate on normalized data stored in a data warehouse by joining against production data stores (key-value or otherwise). This is a commonly used feature in the Interactive Analytics use case.
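A minimal sketch of the index nested loop join idea, with the connector-side index modeled as a simple map lookup; types are simplified placeholders:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stream the outer side and, for each row, probe the connector-provided
// index instead of shuffling both inputs across the cluster.
final class IndexNestedLoopJoin {
    record Row(long key, String payload) {}

    static List<Row[]> join(Iterable<Row> outer, Map<Long, Row> index) {
        List<Row[]> joined = new ArrayList<>();
        for (Row o : outer) {
            Row match = index.get(o.key());  // one index probe per outer row
            if (match != null) {
                joined.add(new Row[] {o, match});
            }
        }
        return joined;
    }
}
```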

Node Properties : Like connectors, nodes in the plan tree can express properties of their outputs (i.e. the partitioning, sorting, bucketing, and grouping characteristics of the data) [24]. These nodes have the ability to also express required and preferred properties, which are taken into account when introducing shuffles. Redundant shuffles are simply elided, but in other cases the properties of the shuffle can be changed to reduce the number of shuffles required. Presto greedily selects partitioning that will satisfy as many required properties as possible to reduce shuffles. This means that the optimizer may choose to partition on fewer columns, which in some cases can result in greater partition skew.
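A small sketch of property-driven shuffle planning along these lines; the `satisfies` rule and names are simplified illustrations, not Presto's planner:

```java
import java.util.Set;

// Insert a shuffle only when the input's partitioning does not already
// satisfy what a plan node requires. A co-located join falls out naturally:
// both inputs arrive partitioned on the join key, so the shuffle is elided.
final class ShufflePlanner {
    record Partitioning(Set<String> columns) {
        // Partitioning on a subset of the required columns is sufficient:
        // equal values on fewer columns still land on the same node (this is
        // also why greedily picking fewer columns can increase skew).
        boolean satisfies(Set<String> requiredColumns) {
            return !columns.isEmpty() && requiredColumns.containsAll(columns);
        }
    }

    static String planJoinInput(Partitioning actual, Set<String> joinColumns) {
        return actual.satisfies(joinColumns)
                ? "use input as-is (co-located)"
                : "insert shuffle on " + joinColumns;
    }

    public static void main(String[] args) {
        Partitioning byUser = new Partitioning(Set.of("user_id"));
        System.out.println(planJoinInput(byUser, Set.of("user_id", "day")));
        System.out.println(planJoinInput(byUser, Set.of("ad_id")));
    }
}
```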


Presto does scheduling at several levels: stage scheduling, task scheduling, and split scheduling.

Stage scheduling has two policies: all-at-once and phased. All-at-once schedules every stage of the query up front and hands them all to the workers, while phased schedules stages incrementally. All-at-once minimizes latency; phased suits batch jobs that are not latency-sensitive but could otherwise hold large amounts of resources.
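A simplified model of the two policies, assuming stages and their input edges are known (this is not Presto's actual scheduler):

```java
import java.util.ArrayList;
import java.util.List;

// All-at-once launches every stage up front to minimize latency; phased
// launches a stage only once the stages feeding it are running, so long
// batch queries don't hold idle resources.
final class StageScheduler {
    enum Policy { ALL_AT_ONCE, PHASED }

    record Stage(String name, List<Stage> inputs) {}

    static List<Stage> initialBatch(Policy policy, List<Stage> stages) {
        if (policy == Policy.ALL_AT_ONCE) {
            return stages;              // everything starts immediately
        }
        List<Stage> ready = new ArrayList<>();
        for (Stage s : stages) {
            if (s.inputs().isEmpty()) { // phased: start at the leaves; the
                ready.add(s);           // rest start as their inputs start
            }
        }
        return ready;
    }
}
```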

Presto breaks each task down into a chain of operators and schedules them within its own pipeline framework; the benefit is that the scheduling policy stays under Presto's control and resource isolation becomes possible. Tasks also divide into leaf-stage and non-leaf-stage tasks: leaf-stage tasks should be placed as close to the data source as possible, while non-leaf-stage tasks are placed based only on network topology.

Split scheduling decides how splits are handed down to leaf tasks. After the coordinator divides a data source into splits, it does not hand them all to tasks at once; delivery is itself pipelined: the coordinator fetches a batch of splits from the connector, assigns them to tasks over RPC, and keeps feeding more splits as the query runs. I think this is a rather nice idea.
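A sketch of that lazy feeding loop; `SplitSource` and `LeafTask` are hypothetical stand-ins for the connector and RPC interfaces:

```java
import java.util.List;

// The coordinator pulls splits from the connector in batches and keeps each
// leaf task topped up to a small target, rather than materializing and
// pushing the full split list up front.
final class SplitFeeder {
    record Split(String path) {}

    interface SplitSource {
        List<Split> nextBatch(int maxSize);
        boolean isFinished();
    }

    interface LeafTask {
        int queuedSplits();
        void addSplits(List<Split> splits);   // an RPC in the real system
    }

    static void feed(SplitSource source, List<LeafTask> tasks, int targetQueued) {
        while (!source.isFinished()) {
            for (LeafTask task : tasks) {
                int want = targetQueued - task.queuedSplits();
                if (want > 0) {
                    task.addSplits(source.nextBatch(want));
                }
            }
            // In the real system this loop is event-driven, not busy-polling.
        }
    }
}
```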


Query Execution

Each unit of data returned through the Data Source API is called a Page: a batch of rows encoded column by column, where each column's data within the page is a Block. Data exchange between nodes is done over long-polling HTTP, which keeps the pipeline full and reduces latency.
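A bare-bones sketch of the Page/Block shape; real Blocks support richer encodings such as dictionary and run-length, and this shows only a flat layout:

```java
// A Page is a batch of rows encoded column by column; each column slice is
// a Block.
final class Columnar {
    interface Block {
        int positionCount();          // number of rows in this column slice
        long getLong(int position);
    }

    record LongArrayBlock(long[] values) implements Block {
        public int positionCount() { return values.length; }
        public long getLong(int position) { return values[position]; }
    }

    record Page(Block[] blocks) {
        int channelCount() { return blocks.length; }
        int positionCount() { return blocks.length == 0 ? 0 : blocks[0].positionCount(); }
    }
}
```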

The flow-control mechanism for data transfer is interesting: the backpressure ultimately throttles how fast splits are allowed to run on tasks, and the engine closes this control loop by monitoring buffer utilization.

The engine tunes parallelism to maintain target utilization rates for output and input buffers. Full output buffers cause split execution to stall and use up valuable memory, while underutilized input buffers add unnecessary processing overhead.

The engine continuously monitors the output buffer utilization. When utilization is consistently high, it lowers effective concurrency by reducing the number of splits eligible to be run. This has the effect of increasing fairness in sharing of network resources. It is also an important efficiency optimization when dealing with clients (either end-users or other workers) that are unable to consume data at the rate it is being produced. Without this functionality, slow clients running complex multi-stage queries could hold tens of gigabytes worth of buffer memory for long periods of time. This scenario is common even when a small amount of result data (∼10-50MB) is being downloaded by a BI or query authoring tool over slow connections in the Interactive Analytics use case.

On the receiver side, the engine monitors the moving average of data transferred per request to compute a target HTTP request concurrency that keeps the input buffers populated while not exceeding their capacity. This backpressure causes upstream tasks to slow down as their buffers fill up.
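A sketch of the receiver-side calculation, assuming an exponential moving average; the constants are invented:

```java
// Keep an EMA of bytes per HTTP response and size the number of concurrent
// requests so the input buffer stays populated without overflowing.
final class ExchangeClient {
    private double avgBytesPerRequest = 1 << 20;  // optimistic 1MB first guess
    private static final double ALPHA = 0.2;      // EMA smoothing factor

    void recordResponse(long bytes) {
        avgBytesPerRequest = ALPHA * bytes + (1 - ALPHA) * avgBytesPerRequest;
    }

    int targetConcurrency(long bufferCapacityBytes, long bufferedBytes) {
        long headroom = Math.max(0, bufferCapacityBytes - bufferedBytes);
        // Enough in-flight requests to fill the headroom; zero when the
        // buffer is full, which is the backpressure on upstream tasks.
        return (int) Math.min(64, headroom / (long) Math.max(1, avgBytesPerRequest));
    }
}
```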


Resource Management

Resource isolation is achieved almost entirely through CPU scheduling; in practice, memory and I/O are hard to control precisely. Presto records each task's accumulated run time and switches a task out on certain events (for example when its output buffer is full, its input buffer is empty, or its time slice expires). Presto maintains a multilevel feedback queue: accumulated CPU time is divided into levels, each task sits in the queue matching its level, each level is granted a different share of CPU, and tasks with little accumulated execution time get higher priority.
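A simplified multilevel feedback queue along these lines; the thresholds are illustrative, and the real scheduler splits CPU between levels rather than using the strict priority shown here:

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;

// Tasks are bucketed by accumulated CPU time so short-running (typically
// interactive) work gets CPU quickly; long-running work sinks to deeper,
// lower-priority levels.
final class MultilevelFeedbackQueue {
    static final long[] LEVEL_THRESHOLDS_MS = {1_000, 30_000, 300_000, Long.MAX_VALUE};

    static final class Task {
        long cpuMillisUsed;

        int level() {
            for (int i = 0; i < LEVEL_THRESHOLDS_MS.length; i++) {
                if (cpuMillisUsed < LEVEL_THRESHOLDS_MS[i]) return i;
            }
            return LEVEL_THRESHOLDS_MS.length - 1;
        }
    }

    private final List<Queue<Task>> levels = List.of(
            new ArrayDeque<>(), new ArrayDeque<>(), new ArrayDeque<>(), new ArrayDeque<>());

    void enqueue(Task task) { levels.get(task.level()).add(task); }

    // Pick from the lowest non-empty level; after its quantum a task is
    // re-enqueued, possibly into a deeper level.
    Task next() {
        for (Queue<Task> q : levels) {
            Task t = q.poll();
            if (t != null) return t;
        }
        return null;
    }
}
```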

Memory is divided so that total memory = user memory + system memory. User memory is what user code (query operators) consumes, while system memory holds state the engine maintains, such as shuffle buffers. User memory has both a per-node limit and a global limit (across nodes). When a node's memory use approaches its limit, the reserved pool comes into play: only the query using the most memory is allowed to keep running to completion, while all other queries are suspended; once that query finishes and its memory is freed, the suspended queries resume. Spilling to disk when close to OOM is tempting, and Presto implements it, but Facebook's deployments do not enable it. My take: people can tolerate a query failing early, but not a query taking far longer than usual; latency predictability matters too.
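A single-node sketch of the general/reserved pool behavior described above; real Presto coordinates the promotion decision across the whole cluster:

```java
import java.util.HashMap;
import java.util.Map;

// Queries reserve from the general pool; when it is exhausted, the single
// biggest query is promoted to the reserved pool so it can finish, and
// every other query blocks until memory frees up.
final class MemoryPools {
    private final long generalLimit;
    private final Map<String, Long> reservations = new HashMap<>();
    private String reservedPoolOwner;   // at most one query at a time

    MemoryPools(long generalLimit) { this.generalLimit = generalLimit; }

    /** Returns true if the query may proceed, false if it must block. */
    synchronized boolean tryReserve(String queryId, long bytes) {
        if (queryId.equals(reservedPoolOwner)) {
            return true;   // reserved pool: always allowed to make progress
        }
        long total = reservations.values().stream().mapToLong(Long::longValue).sum();
        if (total + bytes > generalLimit) {
            promoteBiggestQuery();
            return queryId.equals(reservedPoolOwner);   // everyone else blocks
        }
        reservations.merge(queryId, bytes, Long::sum);
        return true;
    }

    private void promoteBiggestQuery() {
        if (reservedPoolOwner != null) return;
        reservedPoolOwner = reservations.entrySet().stream()
                .max(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey)
                .orElse(null);
    }

    synchronized void finish(String queryId) {
        reservations.remove(queryId);
        if (queryId.equals(reservedPoolOwner)) reservedPoolOwner = null;
    }
}
```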

Fault tolerance is only provided at the cluster level; at the query level, partial recovery is hard to justify, since the bookkeeping cost is too high and simply retrying/recomputing the query is cheaper.


Query Processing Optimizations

Java & JVM:

Code Generation: Presto generates JVM bytecode directly, for expression evaluation and for the inner loops of operators, instead of interpreting expression trees row by row.
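To see why this pays off, compare an interpreted expression tree with what the generated code effectively looks like; plain Java stands in here for the emitted bytecode:

```java
// Interpreted evaluation walks a tree of nodes with virtual calls per row;
// generated code collapses the whole expression into one monomorphic
// method that the JIT can inline and optimize aggressively.
final class CodegenSketch {
    // Interpreted form: a tree of nodes.
    interface Expr { long eval(long[] row); }

    record Col(int index) implements Expr {
        public long eval(long[] row) { return row[index]; }
    }

    record Add(Expr left, Expr right) implements Expr {
        public long eval(long[] row) { return left.eval(row) + right.eval(row); }
    }

    // What generated code for the filter "col0 + col1 > 10" effectively
    // looks like: a single tight method, no tree walk, no virtual dispatch.
    static boolean compiledFilter(long[] row) {
        return row[0] + row[1] > 10;
    }
}
```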


Performance

TPC-DS at the 30TB scale factor, on a 100-node cluster; each node has 28 cores at 2.4GHz, 1.6TB of SSD, and 256GB of RAM:

Every query is run with three settings on a 100-node test cluster: (1) Data stored in Raptor with table shards randomly distributed between nodes. (2) Data stored in Hive/HDFS with no statistics. (3) Data stored in Hive/HDFS along with table and column statistics. Presto’s optimizer can make cost-based decisions about join order and join strategy when these statistics are available. Every node is configured with a 28-core Intel™ Xeon™ E5-2680 v4 CPU running at 2.40GHz, 1.6TB of flash storage and 256GB of DDR4 RAM.

Pasted-Image-20231225103555.png

Latency distributions for the different use cases, taking the 75th percentile as the reference point:

Pasted-Image-20231225104147.png

Pasted-Image-20231225104754.png

On resource management: the figure below shows four hours of resource usage on the Interactive Analytics cluster. Peak query concurrency is 44, and CPU utilization stays around 90% throughout. (Purely in terms of CPU utilization this looks great, but only if the CPU is doing genuinely useful work; if a lot of it is waste, the number doesn't reflect reality.)

Pasted-Image-20231225103237.png