Trino Query Scheduling and Thread Pools
1. Overall Flow
A quick summary of the flow first. From the user's perspective there are two steps:
Create Query:
- Create the query, return the query id, and put the query into the dispatch executor to be scheduled and executed.
- Once the query dispatch executor has dispatched the plan successfully, the task endpoints are recorded in the query's exchange data source.
- The exchange data source then puts HTTP clients (one per task endpoint) onto the scheduler executor; when an HTTP client receives pages, they are buffered inside the exchange data source.
Get Query Result:
- Look up the Query object by query id, and through it the exchange data source object.
- Take pages out of the exchange data source and hand them to the response executor, which converts them into rows and returns them.
- After taking a page, also call the exchange data source's reschedule interface so that the HTTP clients go fetch more pages.
There are three main thread pools (a minimal sketch of this three-stage pipeline follows the list):
- The dispatcher pool, which schedules queries.
- The scheduler pool, which keeps fetching pages from the query's task endpoints.
- The response pool, which serves query results back to the user, mainly doing the page-to-rows conversion.
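To make the division of labor concrete, here is a minimal, self-contained sketch of such a three-stage pipeline. This is illustrative code, not Trino's: the executors, the page buffer, and the fake "fetch" are all made up, but the handoff pattern (dispatch, keep fetching into a buffer, drain the buffer into rows) is the one described above.
import java.util.concurrent.*;

public class ThreePoolSketch
{
    public static void main(String[] args) throws Exception
    {
        ExecutorService dispatcher = Executors.newCachedThreadPool();             // dispatches queries
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4); // fetches pages from task endpoints
        ExecutorService response = Executors.newFixedThreadPool(4);               // turns pages into rows for the client

        BlockingQueue<String> pageBuffer = new LinkedBlockingQueue<>();           // stands in for the exchange data source

        // "Create Query": dispatch the query, then let the scheduler pool keep pulling pages into the buffer
        dispatcher.submit(() -> scheduler.scheduleWithFixedDelay(
                () -> pageBuffer.offer("page@" + System.nanoTime()),              // pretend HTTP fetch of one page
                0, 10, TimeUnit.MILLISECONDS));

        // "Get Query Result": the response pool drains the buffer and converts pages into rows
        Future<String> rows = response.submit(() -> {
            String page = pageBuffer.take();
            return "rows decoded from " + page;
        });
        System.out.println(rows.get());

        scheduler.shutdownNow();
        dispatcher.shutdownNow();
        response.shutdownNow();
    }
}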
2. Create Query
QueuedStatementResource
Main steps:
- registerQuery is executed: it creates the query object, puts it into the queryManager, and returns the query id (a simplified sketch of this "register now, run later" idea follows the list).
- The query object can then be looked up by that query id.
- Note that nothing has started executing at this point; only the query object has been created.
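As a mental model (a deliberately simplified analogue, not Trino's QueryManager or DispatchManager), registerQuery boils down to: allocate an id, build a query object, remember it in a map, return the id, and run nothing yet.
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, minimal analogue of "create query": register now, execute later.
class QueryRegistrySketch
{
    record QueryHandle(String queryId, String sql) {}

    private final Map<String, QueryHandle> queries = new ConcurrentHashMap<>();
    private final AtomicLong nextId = new AtomicLong();

    // registerQuery: create the query object and return its id; nothing executes yet
    String registerQuery(String sql)
    {
        String queryId = "query_" + nextId.incrementAndGet();
        queries.put(queryId, new QueryHandle(queryId, sql));
        return queryId;
    }

    // later requests look the query up by id
    QueryHandle getQuery(String queryId)
    {
        return queries.get(queryId);
    }
}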
3. Execute Query
ExecutingStatementResource
3.1. Triggering Scheduling
Fetching the query status by query id also triggers scheduling.
The scheduling runs on the `dispatchExecutor` object. The underlying thread pool looks unbounded (a cached thread pool), but DispatchManager wraps it in a BoundedExecutor with a concurrency limit, so once that many queries are being dispatched, additional ones will not start executing right away (see the sketch after the DispatchManager constructor below).
public static final int DISPATCHER_THREADPOOL_MAX_SIZE = max(50, getRuntime().availableProcessors() * 10);
public DispatchExecutor(QueryManagerConfig config, VersionEmbedder versionEmbedder)
{
ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("dispatcher-query-%s"));
closer.register(coreExecutor::shutdownNow);
executor = decorateWithVersion(coreExecutor, versionEmbedder);
ScheduledExecutorService coreScheduledExecutor = newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), daemonThreadsNamed("dispatch-executor-%s"));
closer.register(coreScheduledExecutor::shutdownNow);
scheduledExecutor = listeningDecorator(coreScheduledExecutor);
mbeans = new DispatchExecutorMBeans(coreExecutor, coreScheduledExecutor);
}
@Inject
public DispatchManager(
QueryIdGenerator queryIdGenerator,
QueryPreparer queryPreparer,
ResourceGroupManager<?> resourceGroupManager,
DispatchQueryFactory dispatchQueryFactory,
FailedDispatchQueryFactory failedDispatchQueryFactory,
AccessControl accessControl,
SessionSupplier sessionSupplier,
SessionPropertyDefaults sessionPropertyDefaults,
SessionPropertyManager sessionPropertyManager,
Tracer tracer,
QueryManagerConfig queryManagerConfig,
DispatchExecutor dispatchExecutor,
QueryMonitor queryMonitor)
{
this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
this.queryPreparer = requireNonNull(queryPreparer, "queryPreparer is null");
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
this.dispatchQueryFactory = requireNonNull(dispatchQueryFactory, "dispatchQueryFactory is null");
this.failedDispatchQueryFactory = requireNonNull(failedDispatchQueryFactory, "failedDispatchQueryFactory is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null");
this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
this.sessionPropertyManager = sessionPropertyManager;
this.tracer = requireNonNull(tracer, "tracer is null");
this.maxQueryLength = queryManagerConfig.getMaxQueryLength();
this.dispatchExecutor = new BoundedExecutor(dispatchExecutor.getExecutor(), queryManagerConfig.getDispatcherQueryPoolSize());
this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor());
this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
this.statsUpdaterExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("dispatch-manager-stats-%s"));
}
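The interesting line is the BoundedExecutor wrapping at the end: the cached thread pool underneath is unbounded, but the wrapper only lets a fixed number of dispatch tasks run at a time and queues the rest. The sketch below compresses that counting trick into a few lines; it follows the same idea as airlift's BoundedExecutor but is not the real class, and error handling is reduced to a bare catch.
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a bounded executor: tasks go into a queue, and at most maxThreads drain
// loops run on the delegate at any time, so even an unbounded cached pool never runs
// more than maxThreads of these tasks concurrently.
class BoundedExecutorSketch
        implements Executor
{
    private final Executor delegate;
    private final int maxThreads;
    private final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger queueSize = new AtomicInteger();

    BoundedExecutorSketch(Executor delegate, int maxThreads)
    {
        this.delegate = delegate;
        this.maxThreads = maxThreads;
    }

    @Override
    public void execute(Runnable task)
    {
        queue.add(task);
        // start another drain loop only while fewer than maxThreads are running
        if (queueSize.incrementAndGet() <= maxThreads) {
            delegate.execute(this::drainQueue);
        }
    }

    private void drainQueue()
    {
        do {
            try {
                queue.poll().run();
            }
            catch (RuntimeException e) {
                // a real implementation would log the failure
            }
        }
        while (queueSize.getAndDecrement() > maxThreads);
    }
}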
3.2. Waiting for the Response
After scheduling is triggered, the code also waits for the execution result and checks for a timeout.
- The timeout check runs on the timeoutExecutor.
- Fetching the result runs on the responseExecutor (no actual result data is produced here; it just responds OK).
- Turning the result into a response runs on the directExecutor.
Trino makes heavy use of futures, running each stage on a different thread pool; see Staged event-driven architecture - Wikipedia. A minimal sketch of this "futures hopping between executors" style follows at the end of this subsection.
If the result is null, the rest of the flow does not process it further and null is returned, i.e. no data comes back. In that case the "wait for more responses" flow below takes over.
UPDATE: stepping into this function, it does not seem to set any data; it may only set something like a token, so the data is mainly fetched in the next part.
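Here is a minimal sketch of that "futures hopping between executors" style, assuming Guava is on the classpath. The executor names are illustrative, not the actual Trino ones; the point is only that the producing stage and the transformation stage can run on different pools, or on no extra pool at all via directExecutor.
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.Executors;

public class FutureStagesSketch
{
    public static void main(String[] args) throws Exception
    {
        ListeningExecutorService responseExecutor =
                MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(4));

        // stage 1: "produce a page" on the response pool
        ListenableFuture<String> page = responseExecutor.submit(() -> "serialized page");

        // stage 2: a cheap transformation can use directExecutor, i.e. run on whatever
        // thread completed the previous stage, without another thread handoff
        ListenableFuture<String> rows = Futures.transform(
                page,
                p -> "rows decoded from " + p,
                MoreExecutors.directExecutor());

        System.out.println(rows.get());
        responseExecutor.shutdown();
    }
}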
3.3. Waiting for More Responses
private void asyncQueryResults(
Query query,
long token,
Duration maxWait,
DataSize targetResultSize,
ExternalUriInfo externalUriInfo,
DisconnectionAwareAsyncResponse asyncResponse)
As GPT explains it, this lets the client fetch results on demand and control targetResultSize per round; the thread-pool model is the same as above, again involving the responseExecutor and the directExecutor. Another benefit of multi-round fetching is that progress can be tracked. The purpose of the endpoint is summarized below, followed by a sketch of the client-side polling loop.
Purpose:
- Asynchronously fetch query results:
  - The endpoint lets the client pull query results asynchronously by supplying the queryId plus other identifiers (slug and token).
  - Results are not returned in a single request; they are fetched in batches across multiple calls to this endpoint.
- Long polling:
  - The maxWait parameter lets the client specify a maximum wait time (for example 1 second); the coordinator waits up to that long for results to become available.
  - If no results are ready before the timeout, the client simply calls the endpoint again.
- Dynamic control of the result size:
  - The targetResultSize parameter lets the client tune the desired batch size, trading off network transfer against result consumption.
- Query status checks:
  - If the query has failed or been cancelled, the endpoint returns the corresponding status so the client can observe the change.
  - The slug and token are validated so that only the right client can fetch the results of a given query.
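From the client side this is just a long-poll loop: keep calling the next URI, each call returning whatever rows are available (possibly none) plus a pointer to the next call, until there is no next URI. The sketch below only shows the control flow; the endpoint shape and the StatementApi/ResultsPage types are hypothetical stand-ins, not the real trino-client code.
import java.util.List;
import java.util.Optional;

public class LongPollSketch
{
    record ResultsPage(List<String> rows, Optional<String> nextUri) {}

    interface StatementApi
    {
        // the server blocks up to maxWaitMillis before answering, possibly with zero rows
        ResultsPage fetch(String uri, long maxWaitMillis, long targetResultBytes);
    }

    static void consume(StatementApi api, String firstUri)
    {
        Optional<String> next = Optional.of(firstUri);
        while (next.isPresent()) {
            ResultsPage page = api.fetch(next.get(), 1000, 1 << 20);
            page.rows().forEach(System.out::println);   // rows may be empty on a timed-out round
            next = page.nextUri();                      // an absent nextUri means the query is done
        }
    }

    public static void main(String[] args)
    {
        // toy in-memory "server" that returns two batches and then finishes
        StatementApi fake = new StatementApi()
        {
            private int calls;

            @Override
            public ResultsPage fetch(String uri, long maxWaitMillis, long targetResultBytes)
            {
                calls++;
                if (calls == 1) {
                    return new ResultsPage(List.of("row1", "row2"), Optional.of(uri + "/2"));
                }
                return new ResultsPage(List.of("row3"), Optional.empty());
            }
        };
        consume(fake, "http://coordinator/v1/statement/executing/query_1/slug/1");
    }
}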
4. asyncQueryResults
Continuing into the `asyncQueryResults` code, it mainly calls waitForResults, which does a few things:
- Creates futureStateChange (via `getFutureStateChange`), a future that completes when data arrives.
- Puts a timeout on that future so the wait does not drag on too long (a sketch of this timeout-with-default pattern appears after this section's code).
- When data arrives, the continuation runs on the `resultsProcessorExecutor` (which is in fact the responseExecutor described above).
private void asyncQueryResults(
Query query,
long token,
Duration maxWait,
DataSize targetResultSize,
ExternalUriInfo externalUriInfo,
DisconnectionAwareAsyncResponse asyncResponse)
{
Duration wait = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait);
if (targetResultSize == null) {
targetResultSize = DEFAULT_TARGET_RESULT_SIZE;
}
else {
targetResultSize = Ordering.natural().min(targetResultSize, MAX_TARGET_RESULT_SIZE);
}
ListenableFuture<QueryResultsResponse> queryResultsFuture = query.waitForResults(token, externalUriInfo, wait, targetResultSize);
ListenableFuture<Response> response = Futures.transform(queryResultsFuture, this::toResponse, directExecutor());
bindDisconnectionAwareAsyncResponse(asyncResponse, response, responseExecutor);
}
public ListenableFuture<QueryResultsResponse> waitForResults(long token, ExternalUriInfo externalUriInfo, Duration wait, DataSize targetResultSize)
{
ListenableFuture<Void> futureStateChange;
synchronized (this) {
// before waiting, check if this request has already been processed and cached
Optional<QueryResults> cachedResult = getCachedResult(token);
if (cachedResult.isPresent()) {
return immediateFuture(toResultsResponse(cachedResult.get()));
}
// release the lock eagerly after acquiring the future to avoid contending with callback threads
futureStateChange = getFutureStateChange();
}
// wait for a results data or query to finish, up to the wait timeout
if (!futureStateChange.isDone()) {
futureStateChange = addTimeout(futureStateChange, () -> null, wait, timeoutExecutor);
}
// when state changes, fetch the next result
return Futures.transform(futureStateChange, _ -> getNextResult(token, externalUriInfo, targetResultSize), resultsProcessorExecutor);
}
This responseExecutor is defined here:
@Provides
@Singleton
@ForStatementResource
public static BoundedExecutor createStatementResponseExecutor(@ForStatementResource ExecutorService coreExecutor, TaskManagerConfig config)
{
return new BoundedExecutor(coreExecutor, config.getHttpResponseThreads());
}
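The addTimeout call in waitForResults (from airlift) completes the future with a default value instead of failing it when maxWait runs out. Roughly the same behavior can be sketched with plain Guava, assuming it is on the classpath; this is an approximation of the pattern, not airlift's MoreFutures (for one thing, Futures.withTimeout also cancels the delegate on timeout).
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class TimeoutSketch
{
    private TimeoutSketch() {}

    // Complete with defaultValue.get() instead of failing if the future is not done in time.
    static <T> ListenableFuture<T> addTimeoutSketch(
            ListenableFuture<T> future,
            Supplier<T> defaultValue,
            long timeout,
            TimeUnit unit,
            ScheduledExecutorService timeoutExecutor)
    {
        ListenableFuture<T> withTimeout = Futures.withTimeout(future, timeout, unit, timeoutExecutor);
        return Futures.catching(
                withTimeout,
                TimeoutException.class,
                ignored -> defaultValue.get(),
                MoreExecutors.directExecutor());
    }
}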
4.1. getFutureStateChange
The future created here essentially tracks whether `exchangeDataSourceBlocked` has been signalled, which in turn reflects whether `exchangeDataSource` still has data on the way. Note how the blocked future is cached (a sketch of this caching pattern follows the code).
private synchronized ListenableFuture<Void> getFutureStateChange()
{
// if the exchange client is open, wait for data
if (!exchangeDataSource.isFinished()) {
if (exchangeDataSourceBlocked != null && !exchangeDataSourceBlocked.isDone()) {
return exchangeDataSourceBlocked;
}
ListenableFuture<Void> blocked = exchangeDataSource.isBlocked();
if (blocked.isDone()) {
// not blocked
return immediateVoidFuture();
}
// cache future to avoid accumulation of callbacks on the underlying future
exchangeDataSourceBlocked = ignoreCancellation(blocked);
return exchangeDataSourceBlocked;
}
exchangeDataSourceBlocked = null;
if (!resultsConsumed) {
return immediateVoidFuture();
}
// otherwise, wait for the query to finish
queryManager.recordHeartbeat(queryId);
try {
return queryDoneFuture(queryManager.getQueryState(queryId));
}
catch (NoSuchElementException e) {
return immediateVoidFuture();
}
}
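The comment about caching is the interesting design choice here: without it, every poll of getFutureStateChange would wrap isBlocked() again and register one more callback on the same underlying future. A small sketch of that caching pattern, using Guava's SettableFuture and nonCancellationPropagating in place of airlift's ignoreCancellation:
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

// Sketch: repeated "am I blocked?" calls during the same blocking period reuse one
// wrapped future instead of stacking a new wrapper (and a new callback on the
// underlying future) per call.
class BlockedFutureCacheSketch
{
    private final SettableFuture<Void> underlying = SettableFuture.create();  // completes when data arrives
    private ListenableFuture<Void> cached;

    synchronized ListenableFuture<Void> getBlocked()
    {
        if (cached != null && !cached.isDone()) {
            return cached;                                   // reuse the wrapper for this blocking period
        }
        // wrap so that a caller cancelling its copy does not cancel the shared underlying future
        cached = Futures.nonCancellationPropagating(underlying);
        return cached;
    }

    synchronized void onDataArrived()
    {
        underlying.set(null);                                // releases every cached waiter
    }
}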
4.2. getNextResult
The main call inside this function is:
private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo queryInfo, long targetResultBytes)
By the time this function is called, exchangeDataSource actually has data, so it only needs to pollPage; the pages are also deserialized here (a stripped-down version of this drain loop follows the code).
try {
long bytes = 0;
while (bytes < targetResultBytes) {
Slice serializedPage = exchangeDataSource.pollPage();
if (serializedPage == null) {
break;
}
Page page = deserializer.deserialize(serializedPage);
bytes += page.getLogicalSizeInBytes();
resultBuilder.addPage(page);
}
if (exchangeDataSource.isFinished()) {
exchangeDataSource.close();
deserializer = null; // null to reclaim memory of PagesSerde which does not expose explicit lifecycle
}
}
catch (Throwable cause) {
queryManager.failQuery(queryId, cause);
}
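Stripped of Trino types, the loop above is just "drain a buffered queue until a byte budget is used up or the buffer is empty". A simplified analogue, with byte[] standing in for serialized pages and no deserialization:
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PageDrainSketch
{
    // Poll buffered pages until roughly targetResultBytes have been collected.
    static List<byte[]> drain(Queue<byte[]> buffer, long targetResultBytes)
    {
        List<byte[]> result = new ArrayList<>();
        long bytes = 0;
        while (bytes < targetResultBytes) {
            byte[] page = buffer.poll();
            if (page == null) {
                break;                 // nothing buffered right now; return what we have
            }
            bytes += page.length;
            result.add(page);
        }
        return result;
    }

    public static void main(String[] args)
    {
        Queue<byte[]> buffer = new ArrayDeque<>();
        buffer.add(new byte[700]);
        buffer.add(new byte[700]);
        buffer.add(new byte[700]);
        // budget of 1000 bytes is crossed after the second page, so two pages come back
        System.out.println(drain(buffer, 1000).size() + " pages returned");
    }
}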
5. ExchangeDataSource
ExchangeDataSource is an interface; I think of it as a buffer queue. Note, however, that the exchange inputs are task endpoints, and the actual waiting for data happens in pollPage: task endpoints go into the exchange data source, but what comes out are pages.
How a task endpoint turns into pages is encapsulated in the implementation and handled by the `directExchangeClientSupplier`; this is covered in the PollPage section. A simplified analogue of the "endpoints in, pages out" shape follows the code below.
public interface ExchangeDataSource
extends Closeable
{
Slice pollPage();
boolean isFinished();
ListenableFuture<Void> isBlocked();
void addInput(ExchangeInput input);
void noMoreInputs();
OperatorInfo getInfo();
@Override
void close();
}
public class DirectExchangeInput
implements ExchangeInput
{
private static final int INSTANCE_SIZE = instanceSize(DirectExchangeInput.class);
private final TaskId taskId;
private final String location;
@JsonCreator
public DirectExchangeInput(
@JsonProperty("taskId") TaskId taskId,
@JsonProperty("location") String location)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.location = requireNonNull(location, "location is null");
}
ExchangeDataSource exchangeDataSource = new LazyExchangeDataSource(
session.getQueryId(),
new ExchangeId("query-results-exchange-" + session.getQueryId()),
session.getQuerySpan(),
directExchangeClientSupplier,
new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), Query.class.getSimpleName()),
queryManager::outputTaskFailed,
getRetryPolicy(session),
exchangeManagerRegistry);
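As a mental model (a deliberately simplified analogue, not LazyExchangeDataSource or DirectExchangeClient), the data source is "task endpoints in, pages out, plus an isBlocked future that callers can wait on". Coarse synchronization keeps the sketch obviously correct; the real classes are lazier and more concurrent.
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class ExchangeBufferSketch
{
    private final Queue<String> taskEndpoints = new ArrayDeque<>();
    private final Queue<byte[]> pages = new ArrayDeque<>();
    private boolean noMoreInputs;
    private CompletableFuture<Void> blocked = CompletableFuture.completedFuture(null);

    synchronized void addInput(String taskEndpoint)
    {
        // the "results/0" URI of a task; an HTTP client per endpoint would feed onPageReceived()
        taskEndpoints.add(taskEndpoint);
    }

    synchronized void noMoreInputs()
    {
        noMoreInputs = true;
        blocked.complete(null);
    }

    synchronized void onPageReceived(byte[] serializedPage)
    {
        pages.add(serializedPage);
        blocked.complete(null);                    // wake up anyone waiting in isBlocked()
    }

    synchronized byte[] pollPage()
    {
        return pages.poll();                       // null means "nothing buffered right now"
    }

    synchronized CompletableFuture<Void> isBlocked()
    {
        if (!pages.isEmpty() || isFinished()) {
            return CompletableFuture.completedFuture(null);
        }
        if (blocked.isDone()) {
            blocked = new CompletableFuture<>();   // start a new blocking period
        }
        return blocked;
    }

    synchronized boolean isFinished()
    {
        return noMoreInputs && pages.isEmpty();    // sketch: ignores fetches still in flight
    }
}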
5.1. AddInput
addInput puts an ExchangeInput into the exchange data source; this usually happens during the query creation phase.
@Override
public void taskCreated(PlanFragmentId fragmentId, RemoteTask task)
{
URI taskUri = uriBuilderFrom(task.getTaskStatus().getSelf())
.appendPath("results")
.appendPath("0").build();
DirectExchangeInput input = new DirectExchangeInput(task.getTaskId(), taskUri.toString());
queryStateMachine.updateInputsForQueryResults(ImmutableList.of(input), false);
}
private void fireStateChangedIfReady(Optional<QueryOutputInfo> info, Optional<Consumer<QueryOutputInfo>> listener)
{
if (info.isEmpty() || listener.isEmpty()) {
return;
}
executor.execute(() -> listener.get().accept(info.get()));
}
private synchronized void setQueryOutputInfo(QueryExecution.QueryOutputInfo outputInfo)
{
// if first callback, set column names
if (columns == null) {
List<String> columnNames = outputInfo.getColumnNames();
List<Type> columnTypes = outputInfo.getColumnTypes();
checkArgument(columnNames.size() == columnTypes.size(), "Column names and types size mismatch");
ImmutableList.Builder<Column> list = ImmutableList.builder();
for (int i = 0; i < columnNames.size(); i++) {
list.add(createColumn(columnNames.get(i), columnTypes.get(i), supportsParametricDateTime));
}
columns = list.build();
types = outputInfo.getColumnTypes();
}
outputInfo.drainInputs(exchangeDataSource::addInput);
if (outputInfo.isNoMoreInputs()) {
exchangeDataSource.noMoreInputs();
}
}
5.2. PollPage
How is an ExchangeInput turned into Pages? A `DirectExchangeClient` class does this work. Internally it has a method that keeps scheduling clients to fetch pages and add them to a buffer; if the buffer already holds enough data, no further requests are scheduled (a standalone sketch of this heuristic follows the code).
synchronized int scheduleRequestIfNecessary()
{
if ((buffer.isFinished() || buffer.isFailed()) && completedClients.size() == allClients.size()) {
return 0;
}
long neededBytes = buffer.getRemainingCapacityInBytes();
if (neededBytes <= 0) {
return 0;
}
long reservedBytesForScheduledClients = allClients.values().stream()
.filter(client -> !queuedClients.contains(client) && !completedClients.contains(client))
.mapToLong(HttpPageBufferClient::getAverageRequestSizeInBytes)
.sum();
long projectedBytesToBeRequested = 0;
int clientCount = 0;
Iterator<HttpPageBufferClient> clientIterator = queuedClients.iterator();
while (clientIterator.hasNext()) {
HttpPageBufferClient client = clientIterator.next();
if (projectedBytesToBeRequested >= neededBytes * concurrentRequestMultiplier - reservedBytesForScheduledClients) {
break;
}
projectedBytesToBeRequested += client.getAverageRequestSizeInBytes();
client.scheduleRequest();
// Remove the client from the queuedClient's set.
clientIterator.remove();
clientCount++;
}
return clientCount;
}
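The decision boils down to: how much buffer space is free, how many bytes are already expected from requests in flight, and how many more clients need to be started so that the projected bytes cover the gap (scaled by concurrentRequestMultiplier). A standalone sketch of just that arithmetic, with illustrative names rather than the real class:
import java.util.ArrayDeque;
import java.util.Queue;

class ScheduleHeuristicSketch
{
    record QueuedClient(String endpoint, long averageRequestSizeInBytes) {}

    // Decide how many queued clients to schedule so that the bytes they are expected to
    // return, plus bytes already reserved by in-flight requests, fill the free buffer space.
    static int clientsToSchedule(
            Queue<QueuedClient> queuedClients,
            long remainingBufferCapacityInBytes,
            long reservedBytesForInFlightRequests,
            double concurrentRequestMultiplier)
    {
        if (remainingBufferCapacityInBytes <= 0) {
            return 0;                              // buffer is full: fetch nothing
        }
        long projectedBytes = 0;
        int scheduled = 0;
        while (!queuedClients.isEmpty()) {
            if (projectedBytes >= remainingBufferCapacityInBytes * concurrentRequestMultiplier
                    - reservedBytesForInFlightRequests) {
                break;                             // enough data is already on the way
            }
            QueuedClient client = queuedClients.poll();
            projectedBytes += client.averageRequestSizeInBytes();
            scheduled++;                           // client.scheduleRequest() in the real code
        }
        return scheduled;
    }

    public static void main(String[] args)
    {
        Queue<QueuedClient> queued = new ArrayDeque<>();
        queued.add(new QueuedClient("worker-1", 4 << 20));
        queued.add(new QueuedClient("worker-2", 4 << 20));
        queued.add(new QueuedClient("worker-3", 4 << 20));
        // 8 MB free, 4 MB already in flight, multiplier 3: keep scheduling until 20 MB are projected
        System.out.println(clientsToSchedule(queued, 8 << 20, 4 << 20, 3.0));
    }
}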
Each client is scheduled on a scheduled executor, which is a global singleton.
@Provides
@Singleton
@ForStatementResource
public static ScheduledExecutorService createStatementTimeoutExecutor(TaskManagerConfig config)
{
return newScheduledThreadPool(config.getHttpTimeoutThreads(), daemonThreadsNamed("statement-timeout-%s"));
}