Trino Hive Partition Loading Process
1. Overall Flow
Hive partition loading happens in two steps:
- Synchronously fetch the partition names.
- Asynchronously / on demand, fetch the partition values for batches of partition names (a minimal sketch follows this list).
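A minimal sketch of this two-step pattern, assuming a hypothetical MetastoreClient whose methods are stand-ins rather than real Trino APIs: the names are fetched eagerly, while the values for each batch are resolved only when the iterator is actually consumed.

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class TwoStepPartitionLoading
{
    record Partition(String name, List<String> values) {}

    interface MetastoreClient
    {
        // step 1: cheap call that returns only the partition name strings
        List<String> getPartitionNames(String database, String table);

        // step 2: expensive call that resolves full partition objects for one batch
        List<Partition> getPartitionsByNames(String database, String table, List<String> names);
    }

    static Iterator<List<Partition>> loadLazily(MetastoreClient client, String database, String table, int batchSize)
    {
        // step 1 runs eagerly, on the caller's thread
        List<String> names = client.getPartitionNames(database, table);

        // step 2 runs lazily: a batch is resolved only when next() is called
        return new Iterator<>()
        {
            private int position;

            @Override
            public boolean hasNext()
            {
                return position < names.size();
            }

            @Override
            public List<Partition> next()
            {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int end = Math.min(position + batchSize, names.size());
                List<String> batch = names.subList(position, end);
                position = end;
                return client.getPartitionsByNames(database, table, batch);
            }
        };
    }
}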
2. GetPartitionNames
The partition names are fetched during the optimizer phase. They are also stored in a cache and then attached to the table object; this whole step runs synchronously on the calling thread.
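As a minimal sketch of this "fetch once, then serve from cache" behavior (Trino's real cache lives in CachingHiveMetastore; the loader function below is a hypothetical stand-in for the metastore call):

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

public class PartitionNameCache
{
    private final ConcurrentMap<String, List<String>> cache = new ConcurrentHashMap<>();
    private final BiFunction<String, String, List<String>> loader; // hypothetical metastore call

    public PartitionNameCache(BiFunction<String, String, List<String>> loader)
    {
        this.loader = loader;
    }

    public List<String> getPartitionNames(String database, String table)
    {
        // the first call fetches synchronously and populates the cache;
        // later lookups during planning reuse the cached names
        return cache.computeIfAbsent(database + "." + table, key -> loader.apply(database, table));
    }
}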
3. GetPartitionValues
The function below turns each HivePartition into a Partition object (wrapped in HivePartitionMetadata). Roughly, the process is:
- Split the partition names into exponentially growing batches, capped at 100, and again expose them as an iterator (sketched after the transform snippet below).
- Then resolve the partition values batch by batch from the partition names; a cache is used here as well to reduce metastore calls.
Iterator<HivePartitionMetadata> hivePartitions = getPartitionMetadata(
session,
metastore,
table,
peekingIterator(partitions),
bucketHandle.map(HiveBucketHandle::toTableBucketProperty),
neededColumnNames);
public class HivePartitionMetadata
{
    // empty for an unpartitioned table
    private final Optional<Partition> partition;
    private final HivePartition hivePartition;
    // per-column type coercions, keyed by column index
    private final Map<Integer, HiveTypeName> hiveColumnCoercions;
}
Iterator<List<HivePartition>> partitionNameBatches = partitionExponentially(hivePartitions, minPartitionBatchSize, maxPartitionBatchSize);
Iterator<List<HivePartitionMetadata>> partitionBatches = transform(partitionNameBatches, partitionBatch -> {
    // body elided: a batched metastore lookup that resolves each
    // HivePartition in the batch into a HivePartitionMetadata
});
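To make partitionExponentially concrete, here is a sketch of the batching strategy (an illustration, not the Trino source): the batch size starts at minPartitionBatchSize and doubles after every batch, capped at maxPartitionBatchSize (the 100 mentioned above).

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public final class ExponentialBatches
{
    static <T> Iterator<List<T>> partitionExponentially(Iterator<T> source, int minBatchSize, int maxBatchSize)
    {
        return new Iterator<>()
        {
            private int currentSize = minBatchSize;

            @Override
            public boolean hasNext()
            {
                return source.hasNext();
            }

            @Override
            public List<T> next()
            {
                if (!source.hasNext()) {
                    throw new NoSuchElementException();
                }
                List<T> batch = new ArrayList<>(currentSize);
                while (batch.size() < currentSize && source.hasNext()) {
                    batch.add(source.next());
                }
                // double the batch size for the next call, up to the cap
                currentSize = Math.min(currentSize * 2, maxBatchSize);
                return batch;
            }
        };
    }
}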
Finally, this iterator is handed to the background hive split loader, so its consumption happens on a separate thread pool (the executor). That pool has 1000 threads:
private int queryExecutorPoolSize = 1000;
HiveSplitLoader hiveSplitLoader = new BackgroundHiveSplitLoader(
table,
hivePartitions,
hiveTable.getCompactEffectivePredicate(),
dynamicFilter,
getDynamicFilteringWaitTimeout(session),
typeManager,
createBucketSplitInfo(bucketHandle, bucketFilter),
session,
fileSystemFactory,
transactionalMetadata.getDirectoryLister(),
executor,
splitLoaderConcurrency,
recursiveDfsWalkerEnabled,
!hiveTable.getPartitionColumns().isEmpty() && isIgnoreAbsentPartitions(session),
metastore.getValidWriteIds(session, hiveTable)
.map(value -> value.getTableValidWriteIdList(table.getDatabaseName() + "." + table.getTableName())),
hiveTable.getMaxScannedFileSize(),
maxPartitionsPerScan);
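A sketch of why the iterator's laziness matters (drainOnExecutor is a hypothetical helper; the real BackgroundHiveSplitLoader logic such as split generation and dynamic filtering is omitted): every next() call may trigger a batched metastore round trip, but it runs on a pool thread rather than on the thread that built the iterator.

import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;

public class BackgroundConsumption
{
    static <T> void drainOnExecutor(Iterator<List<T>> partitionBatches, ExecutorService executor)
    {
        executor.submit(() -> {
            while (partitionBatches.hasNext()) {
                // this next() may fetch a batch of partitions from the metastore,
                // but the call happens here, on an executor thread
                List<T> batch = partitionBatches.next();
                // ... generate splits for each partition in the batch ...
            }
        });
    }
}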