Trino 查询调度和线程池

Table of Contents

1. 总体流程

Pasted-Image-20241202173541.png

这里稍微简单总结一下流程,用户分为两步:

Create Query:

  • 创建query, 返回query id, 并且将query 放在dispatch executor里面去进行调度执行。
  • Query dispatch executor 下发plan成功之后,就在query的exchange data source里面记录be endpoint.
  • Exchange data source内部开始去将http client(be endpoint)放在scheduler executor里面去调度,http client拿到pages之后,缓存在exchange data source里面。

Get Query Result:

  • 根据query id找到Query对象,进而找到exchange data source对象。
  • 从exchange data source对象里面拿到page, 丢到response executor里面变换成为rows返回。
  • 拿到page之后,同时调用exchange data source 的 reschedule接口,让http client去获取更多的pages.

主要有3个线程池:

  1. Dispatcher线程池,这个用来对query进行调度的。
  2. Scheduler线程池,这个用来不断对query task endpoint取page的。
  3. Response线程池,这个用来响应用户查询数据的,主要是处理page->rows的处理。

2. Create Query

QueuedStatementResource

主要过程:

  • 执行registerQuery, 创建query对象,然后放在queryManager里面去,返回query id.
  • 之后我们就可以通过query id去拿到这个query对象。
  • 注意这个时候并没有开始执行,只是创建了query对象。

Pasted-Image-20241202173704.png

3. Execute Query

ExecutingStatementResource

3.1. 触发调度

根据query id 去查询query status, 这里同时会去触发调度。

Pasted-Image-20241202173744.png

Pasted-Image-20241202173752.png

Pasted-Image-20241202173805.png

可以看到这个调度是放在 `dispatchExecutor` 这个对象里面去执行的的,这个东西看上去线程池没有上线,但是有queue size上限,也就是超过这么多query的话其实并不会马上执行。

public static final int DISPATCHER_THREADPOOL_MAX_SIZE = max(50, getRuntime().availableProcessors() * 10);

public DispatchExecutor(QueryManagerConfig config, VersionEmbedder versionEmbedder)
{
    ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed("dispatcher-query-%s"));
    closer.register(coreExecutor::shutdownNow);
    executor = decorateWithVersion(coreExecutor, versionEmbedder);

    ScheduledExecutorService coreScheduledExecutor = newScheduledThreadPool(config.getQueryManagerExecutorPoolSize(), daemonThreadsNamed("dispatch-executor-%s"));
    closer.register(coreScheduledExecutor::shutdownNow);
    scheduledExecutor = listeningDecorator(coreScheduledExecutor);

    mbeans = new DispatchExecutorMBeans(coreExecutor, coreScheduledExecutor);
}

@Inject
public DispatchManager(
        QueryIdGenerator queryIdGenerator,
        QueryPreparer queryPreparer,
        ResourceGroupManager<?> resourceGroupManager,
        DispatchQueryFactory dispatchQueryFactory,
        FailedDispatchQueryFactory failedDispatchQueryFactory,
        AccessControl accessControl,
        SessionSupplier sessionSupplier,
        SessionPropertyDefaults sessionPropertyDefaults,
        SessionPropertyManager sessionPropertyManager,
        Tracer tracer,
        QueryManagerConfig queryManagerConfig,
        DispatchExecutor dispatchExecutor,
        QueryMonitor queryMonitor)
{
    this.queryIdGenerator = requireNonNull(queryIdGenerator, "queryIdGenerator is null");
    this.queryPreparer = requireNonNull(queryPreparer, "queryPreparer is null");
    this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
    this.dispatchQueryFactory = requireNonNull(dispatchQueryFactory, "dispatchQueryFactory is null");
    this.failedDispatchQueryFactory = requireNonNull(failedDispatchQueryFactory, "failedDispatchQueryFactory is null");
    this.accessControl = requireNonNull(accessControl, "accessControl is null");
    this.sessionSupplier = requireNonNull(sessionSupplier, "sessionSupplier is null");
    this.sessionPropertyDefaults = requireNonNull(sessionPropertyDefaults, "sessionPropertyDefaults is null");
    this.sessionPropertyManager = sessionPropertyManager;
    this.tracer = requireNonNull(tracer, "tracer is null");

    this.maxQueryLength = queryManagerConfig.getMaxQueryLength();

    this.dispatchExecutor = new BoundedExecutor(dispatchExecutor.getExecutor(), queryManagerConfig.getDispatcherQueryPoolSize());

    this.queryTracker = new QueryTracker<>(queryManagerConfig, dispatchExecutor.getScheduledExecutor());
    this.queryMonitor = requireNonNull(queryMonitor, "queryMonitor is null");
    this.statsUpdaterExecutor = newSingleThreadScheduledExecutor(daemonThreadsNamed("dispatch-manager-stats-%s"));
}

3.2. 等待Response

这里可以看到触发调度之后,还回等待执行结果,以及检查超时。

  • 检查超时是在 timeoutExecutor
  • 获取结果是在 responseExecutor(实际没有得到任何结果,只是reponse OK)
  • 将结果变为response 是在 directExecutor 执行

trino里面非常多地使用了future特性,并且支持在不同的线程池上执行。 分阶段事件驱动架构 - 维基百科 — Staged event-driven architecture - Wikipedia

Pasted-Image-20241202173837.png

如果是null的话,那么后面流程就不在处理。这里就会返回null. 拿不到任何数据。如果拿不到任何数据,就要等待more response这个部分的流程。

UPDATE: 但是我进入这个函数似乎里面没有设置任何data, 可能只设置了一个token的东西,所以主要获取数据还是在接下来这个部分。

3.3. 等待更多Response

Pasted-Image-20241202173859.png

private void asyncQueryResults(
        Query query,
        long token,
        Duration maxWait,
        DataSize targetResultSize,
        ExternalUriInfo externalUriInfo,
        DisconnectionAwareAsyncResponse asyncResponse)

按照gpt的解释,这个东西可以按需进行获取,并且控制每次targetResultSize. 但是线程池模型和上面一样,也包括responseExecutor和directExecutor这两类。 这种多轮获取还有好处可能就是可以追踪进度了。

用途描述

  1. 异步获取查询结果:
    • 该接口允许客户端通过提供 queryId 和其他标识符(如 slug 和 token),异步拉取查询的结果。
    • 查询结果并不是通过一个请求返回的,而是通过多次调用该接口分批获取。
  2. 支持长轮询机制:
    • maxWait 参数允许客户端指定最长等待时间(例如 1 秒),Coordinator 会在指定时间内等待查询结果可用。
    • 如果结果尚未生成且超时,客户端需再次调用此接口。
  3. 动态控制结果大小:
    • targetResultSize 参数让客户端动态调整期望的结果集大小,用于优化网络传输和结果消费的性能。
  4. 检查查询状态:
    • 如果查询已失败或被取消,该接口可以返回相应状态,让客户端感知查询状态的变化。
    • 接口通过验证 slug 和 token,确保只有正确的客户端能获取对应查询的结果。

4. asyncQueryResults

如果继续看 `asyncQueryResults` 代码的话,主要是调用waitForResults. 这个函数做几件事情:

  • 创建futureStateChange ( `getFutureStateChange` ). 这个表示有数据到来
  • 同时把这个future设置timeout exception避免太长
  • 如果有数据来了的话,那么在 `resultsProcessorExecutor` 这个executor上执行。 (其实这个executor就是上面写的reponseExecutor).
private void asyncQueryResults(
        Query query,
        long token,
        Duration maxWait,
        DataSize targetResultSize,
        ExternalUriInfo externalUriInfo,
        DisconnectionAwareAsyncResponse asyncResponse)
{
    Duration wait = WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait);
    if (targetResultSize == null) {
        targetResultSize = DEFAULT_TARGET_RESULT_SIZE;
    }
    else {
        targetResultSize = Ordering.natural().min(targetResultSize, MAX_TARGET_RESULT_SIZE);
    }
    ListenableFuture<QueryResultsResponse> queryResultsFuture = query.waitForResults(token, externalUriInfo, wait, targetResultSize);

    ListenableFuture<Response> response = Futures.transform(queryResultsFuture, this::toResponse, directExecutor());

    bindDisconnectionAwareAsyncResponse(asyncResponse, response, responseExecutor);
}

public ListenableFuture<QueryResultsResponse> waitForResults(long token, ExternalUriInfo externalUriInfo, Duration wait, DataSize targetResultSize)
{
    ListenableFuture<Void> futureStateChange;
    synchronized (this) {
        // before waiting, check if this request has already been processed and cached
        Optional<QueryResults> cachedResult = getCachedResult(token);
        if (cachedResult.isPresent()) {
            return immediateFuture(toResultsResponse(cachedResult.get()));
        }
        // release the lock eagerly after acquiring the future to avoid contending with callback threads
        futureStateChange = getFutureStateChange();
    }

    // wait for a results data or query to finish, up to the wait timeout
    if (!futureStateChange.isDone()) {
        futureStateChange = addTimeout(futureStateChange, () -> null, wait, timeoutExecutor);
    }
    // when state changes, fetch the next result
    return Futures.transform(futureStateChange, _ -> getNextResult(token, externalUriInfo, targetResultSize), resultsProcessorExecutor);
}

这个ResponseExecutor定义在这里

@Provides
@Singleton
@ForStatementResource
public static BoundedExecutor createStatementResponseExecutor(@ForStatementResource ExecutorService coreExecutor, TaskManagerConfig config)
{
    return new BoundedExecutor(coreExecutor, config.getHttpResponseThreads());
}

4.1. getFutureStateChange

其实这里创建的元素,就是看 `exchangeDataSourceBlocked` 上是否有数据。至于这个future,则是看 `exchangeDataSource` 上是不是还有数据。

private synchronized ListenableFuture<Void> getFutureStateChange()
{
    // if the exchange client is open, wait for data
    if (!exchangeDataSource.isFinished()) {
        if (exchangeDataSourceBlocked != null && !exchangeDataSourceBlocked.isDone()) {
            return exchangeDataSourceBlocked;
        }
        ListenableFuture<Void> blocked = exchangeDataSource.isBlocked();
        if (blocked.isDone()) {
            // not blocked
            return immediateVoidFuture();
        }
        // cache future to avoid accumulation of callbacks on the underlying future
        exchangeDataSourceBlocked = ignoreCancellation(blocked);
        return exchangeDataSourceBlocked;
    }
    exchangeDataSourceBlocked = null;

    if (!resultsConsumed) {
        return immediateVoidFuture();
    }

    // otherwise, wait for the query to finish
    queryManager.recordHeartbeat(queryId);
    try {
        return queryDoneFuture(queryManager.getQueryState(queryId));
    }
    catch (NoSuchElementException e) {
        return immediateVoidFuture();
    }
}

4.2. getNextResult

这个函数里面有个主要的调用

private synchronized QueryResultRows removePagesFromExchange(ResultQueryInfo queryInfo, long targetResultBytes)

调用这个函数的时候,说明exchangeDataSource里面其实是有数据的,那么只需要pollPage就OK了。并且在这里做deserialize Page.

try {
    long bytes = 0;
    while (bytes < targetResultBytes) {
        Slice serializedPage = exchangeDataSource.pollPage();
        if (serializedPage == null) {
            break;
        }

        Page page = deserializer.deserialize(serializedPage);
        bytes += page.getLogicalSizeInBytes();
        resultBuilder.addPage(page);
    }
    if (exchangeDataSource.isFinished()) {
        exchangeDataSource.close();
        deserializer = null; // null to reclaim memory of PagesSerde which does not expose explicit lifecycle
    }
}
catch (Throwable cause) {
    queryManager.failQuery(queryId, cause);
}

5. ExchangeDataSource

ExchangeDataSource 这个对象是个接口,我觉得可以理解为buffer queue. 但是注意这里exchange input其实是task endpoint, 真正在等待获取数据是在poll page这里执行。注意这里exchange data source加入的是task endpoint, 但是出来的其实是page.

那么如何是用be endpoint -> page, 这个被封装在了实现里面,被 `directExchangeClientSupplier` 来处理。这个在PollPage这节有说。

public interface ExchangeDataSource
        extends Closeable
{
    Slice pollPage();

    boolean isFinished();

    ListenableFuture<Void> isBlocked();

    void addInput(ExchangeInput input);

    void noMoreInputs();

    OperatorInfo getInfo();

    @Override
    void close();
}


public class DirectExchangeInput
        implements ExchangeInput
{
    private static final int INSTANCE_SIZE = instanceSize(DirectExchangeInput.class);

    private final TaskId taskId;
    private final String location;

    @JsonCreator
    public DirectExchangeInput(
            @JsonProperty("taskId") TaskId taskId,
            @JsonProperty("location") String location)
    {
        this.taskId = requireNonNull(taskId, "taskId is null");
        this.location = requireNonNull(location, "location is null");
    }
ExchangeDataSource exchangeDataSource = new LazyExchangeDataSource(
        session.getQueryId(),
        new ExchangeId("query-results-exchange-" + session.getQueryId()),
        session.getQuerySpan(),
        directExchangeClientSupplier,
        new SimpleLocalMemoryContext(newSimpleAggregatedMemoryContext(), Query.class.getSimpleName()),
        queryManager::outputTaskFailed,
        getRetryPolicy(session),
        exchangeManagerRegistry);

5.1. AddInput

AddInput是将ExchangeInput加入到exchangeSource里面去,这个通常是在创建query阶段就完成了。

@Override
public void taskCreated(PlanFragmentId fragmentId, RemoteTask task)
{
    URI taskUri = uriBuilderFrom(task.getTaskStatus().getSelf())
            .appendPath("results")
            .appendPath("0").build();
    DirectExchangeInput input = new DirectExchangeInput(task.getTaskId(), taskUri.toString());
    queryStateMachine.updateInputsForQueryResults(ImmutableList.of(input), false);
}

private void fireStateChangedIfReady(Optional<QueryOutputInfo> info, Optional<Consumer<QueryOutputInfo>> listener)
{
    if (info.isEmpty() || listener.isEmpty()) {
        return;
    }
    executor.execute(() -> listener.get().accept(info.get()));
}

private synchronized void setQueryOutputInfo(QueryExecution.QueryOutputInfo outputInfo)
{
    // if first callback, set column names
    if (columns == null) {
        List<String> columnNames = outputInfo.getColumnNames();
        List<Type> columnTypes = outputInfo.getColumnTypes();
        checkArgument(columnNames.size() == columnTypes.size(), "Column names and types size mismatch");

        ImmutableList.Builder<Column> list = ImmutableList.builder();
        for (int i = 0; i < columnNames.size(); i++) {
            list.add(createColumn(columnNames.get(i), columnTypes.get(i), supportsParametricDateTime));
        }
        columns = list.build();
        types = outputInfo.getColumnTypes();
    }

    outputInfo.drainInputs(exchangeDataSource::addInput);
    if (outputInfo.isNoMoreInputs()) {
        exchangeDataSource.noMoreInputs();
    }
}

5.2. PollPage

如果将ExchangeInput变为Page. 里面有个 `DirectExchangeClient` 类来做处理。这个类内部有个方法:相当于会不断地调用client去获取pages, 并且添加到buffer里面。如果buffer里面足够的话,那么就不会进行scheudle.

synchronized int scheduleRequestIfNecessary()
{
    if ((buffer.isFinished() || buffer.isFailed()) && completedClients.size() == allClients.size()) {
        return 0;
    }

    long neededBytes = buffer.getRemainingCapacityInBytes();
    if (neededBytes <= 0) {
        return 0;
    }

    long reservedBytesForScheduledClients = allClients.values().stream()
            .filter(client -> !queuedClients.contains(client) && !completedClients.contains(client))
            .mapToLong(HttpPageBufferClient::getAverageRequestSizeInBytes)
            .sum();
    long projectedBytesToBeRequested = 0;
    int clientCount = 0;

    Iterator<HttpPageBufferClient> clientIterator = queuedClients.iterator();
    while (clientIterator.hasNext()) {
        HttpPageBufferClient client = clientIterator.next();
        if (projectedBytesToBeRequested >= neededBytes * concurrentRequestMultiplier - reservedBytesForScheduledClients) {
            break;
        }
        projectedBytesToBeRequested += client.getAverageRequestSizeInBytes();

        client.scheduleRequest();

        // Remove the client from the queuedClient's set.
        clientIterator.remove();

        clientCount++;
    }

    return clientCount;
}

每个client都会被放在一个schedule executor里面去执行,这个是全局的。

@Provides
@Singleton
@ForStatementResource
public static ScheduledExecutorService createStatementTimeoutExecutor(TaskManagerConfig config)
{
    return newScheduledThreadPool(config.getHttpTimeoutThreads(), daemonThreadsNamed("statement-timeout-%s"));
}