Trino Hive Partition Loading Process

1. Overall Flow

Loading Hive partitions happens in two steps:

  • Synchronously fetch the partition names.
  • Asynchronously / on demand, fetch the partition values for batches of partition names (a sketch of this shape follows the list).
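
To make this shape concrete, here is a minimal sketch of the two-step pattern. The helper names (loadPartitions, listPartitionNames, getPartitionsByNames) and the Map-of-strings representation of partition values are assumptions for illustration, not Trino's actual API; the point is only that step 1 blocks on the metastore, while step 2 is deferred into a lazy iterator.

import static com.google.common.collect.Iterators.partition;
import static com.google.common.collect.Iterators.transform;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class TwoStepPartitionLoading
{
    /**
     * Step 1 runs now, on the calling thread; step 2 is wrapped in a lazy iterator,
     * so each batch of partition values is only fetched when that batch is consumed.
     */
    public static Iterator<List<Map<String, String>>> loadPartitions(
            Function<String, List<String>> listPartitionNames,                      // step 1: synchronous metastore call
            Function<List<String>, List<Map<String, String>>> getPartitionsByNames, // step 2: called once per batch
            String table,
            int batchSize)
    {
        List<String> names = listPartitionNames.apply(table);                // blocks here
        Iterator<List<String>> nameBatches = partition(names.iterator(), batchSize);
        return transform(nameBatches, getPartitionsByNames::apply);          // deferred, batch by batch
    }
}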

Pasted-Image-20241204105245.png

2. GetPartitionNames

The partition names are fetched during the Optimizer phase; the result is also stored in a cache and then attached to the table object. This whole step is executed synchronously.
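
As a rough illustration of the caching part, the sketch below puts a Guava LoadingCache in front of the "list partition names" call. The class name, key type, TTL, and size bound are all assumptions; Trino's real CachingHiveMetastore is considerably more elaborate (invalidation, statistics, weighers).

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class PartitionNameCache
{
    private final LoadingCache<String, List<String>> cache;

    public PartitionNameCache(Function<String, List<String>> listPartitionNames)
    {
        this.cache = CacheBuilder.newBuilder()
                .expireAfterWrite(10, TimeUnit.MINUTES)   // assumed TTL, configurable in the real metastore cache
                .maximumSize(10_000)                      // assumed size bound
                .build(CacheLoader.from(listPartitionNames::apply));
    }

    public List<String> getPartitionNames(String schemaTableName)
    {
        // the first call per table hits the metastore; subsequent calls are served from the cache
        return cache.getUnchecked(schemaTableName);
    }
}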

Pasted-Image-20241204105307.png

Pasted-Image-20241204105319.png

3. GetPartitionValues

Pasted-Image-20241204105341.png

The function below turns each HivePartition into a Partition object (wrapped in a HivePartitionMetadata). Roughly, it works like this:

  • The partition names are split into exponentially growing batches (the batch size is capped at 100), and the result is again exposed as an iterator; see the sketch after the partitionExponentially snippet below.
  • The partition values are then resolved from the partition names batch by batch; a cache is used here as well to cut down on metastore calls.
Iterator<HivePartitionMetadata> hivePartitions = getPartitionMetadata(
        session,
        metastore,
        table,
        peekingIterator(partitions),
        bucketHandle.map(HiveBucketHandle::toTableBucketProperty),
        neededColumnNames);

public class HivePartitionMetadata
{
    private final Optional<Partition> partition;
    private final HivePartition hivePartition;
    private final Map<Integer, HiveTypeName> hiveColumnCoercions;
}


Iterator<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
Iterator<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
        // for each batch: look up the Partition objects by name (cached) and wrap them in HivePartitionMetadata
        ...
});
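
partitionExponentially splits the partition-name iterator into batches whose size starts at minPartitionBatchSize and doubles until it reaches maxPartitionBatchSize (100, per the note above). Below is a minimal sketch of that batching idea, not the real implementation:

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public final class ExponentialBatching
{
    public static <T> Iterator<List<T>> partitionExponentially(Iterator<T> items, int minBatchSize, int maxBatchSize)
    {
        return new Iterator<List<T>>()
        {
            private int currentBatchSize = minBatchSize;

            @Override
            public boolean hasNext()
            {
                return items.hasNext();
            }

            @Override
            public List<T> next()
            {
                List<T> batch = new ArrayList<>(currentBatchSize);
                while (batch.size() < currentBatchSize && items.hasNext()) {
                    batch.add(items.next());
                }
                // the next batch doubles in size until it hits the cap
                currentBatchSize = Math.min(currentBatchSize * 2, maxBatchSize);
                return batch;
            }
        };
    }
}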

Finally, this iterator is handed to the background hive split loader, so when it is consumed the work runs on a separate thread pool (executor). That pool has 1000 threads:

private int queryExecutorPoolSize = 1000;

HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
        table,
        hivePartitions,
        hiveTable.getCompactEffectivePredicate(),
        dynamicFilter,
        getDynamicFilteringWaitTimeout(session),
        typeManager,
        createBucketSplitInfo(bucketHandle, bucketFilter),
        session,
        fileSystemFactory,
        transactionalMetadata.getDirectoryLister(),
        executor,
        splitLoaderConcurrency,
        recursiveDfsWalkerEnabled,
        !hiveTable.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session),
        metastore.getValidWriteIds(session, hiveTable)
                .map(value -> value.getTableValidWriteIdList(table.getDatabaseName() + "." + table.getTableName())),
        hiveTable.getMaxScannedFileSize(),
        maxPartitionsPerScan);
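
Why the metastore lookups end up on the executor rather than on the calling thread: the partition iterator is lazy, and BackgroundHiveSplitLoader only advances it from tasks submitted to that executor. The sketch below is a simplified, hypothetical drainer (class and method names are made up) showing this hand-off; the real loader additionally bounds concurrency with splitLoaderConcurrency and feeds the resulting splits into the split source's queue.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class BackgroundPartitionDrainer
{
    public static <T> void drainInBackground(Iterator<List<T>> partitionBatches, Consumer<List<T>> splitProducer, int threads)
    {
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        executor.submit(() -> {
            try {
                while (partitionBatches.hasNext()) {
                    // advancing the lazy iterator triggers the (cached) metastore batch lookup on this pool thread
                    splitProducer.accept(partitionBatches.next());
                }
            }
            finally {
                executor.shutdown();
            }
        });
    }
}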