Presto Query Planning

Presto Execution Plan: Plan Generation

Notes taken to organize the logic while reading the source code; of little standalone reading value.

At the end of dispatch-query creation, a LocalDispatchQuery is returned; the last argument to its constructor, queryManager::createQuery, is what ultimately invokes SqlQueryExecution's start() method.

SqlQueryManager
return new LocalDispatchQuery(
        stateMachine,
        queryExecutionFuture,
        queryMonitor,
        clusterSizeMonitor,
        executor,
        queryManager::createQuery);

@Override
public void createQuery(QueryExecution queryExecution)
{
    requireNonNull(queryExecution, "queryExecution is null");

    if (!queryTracker.addQuery(queryExecution)) {
        throw new TrinoException(GENERIC_INTERNAL_ERROR, format("Query %s already registered", queryExecution.getQueryId()));
    }

    queryExecution.addFinalQueryInfoListener(finalQueryInfo -> {
        // execution MUST be added to the expiration queue or there will be a leak
        queryTracker.expireQuery(queryExecution.getQueryId());
    });

    queryExecution.start();
}

@Override
public void start()
{
    try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
        try {
            if (!stateMachine.transitionToPlanning()) {
                // query already started or finished
                return;
            }

            AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
            stateMachine.getStateChange(PLANNING).addListener(() -> {
                if (stateMachine.getQueryState() == FAILED) {
                    synchronized (this) {
                        Thread thread = planningThread.get();
                        if (thread != null) {
                            thread.interrupt();
                        }
                    }
                }
            }, directExecutor());

            try {
                // entry point of plan generation and analysis
                PlanRoot plan = planQuery();
                // DynamicFilterService needs plan for query to be registered.
                // Query should be registered before dynamic filter suppliers are requested in distribution planning.
                registerDynamicFilteringQuery(plan);
                planDistribution(plan);
            }
            finally {
                synchronized (this) {
                    planningThread.set(null);
                    // Clear the interrupted flag in case there was a race condition where
                    // the planning thread was interrupted right after planning completes above
                    Thread.interrupted();
                }
            }

            if (!stateMachine.transitionToStarting()) {
                // query already started or finished
                return;
            }

            // if query is not finished, start the scheduler, otherwise cancel it
            SqlQueryScheduler scheduler = queryScheduler.get();

            if (!stateMachine.isDone()) {
                scheduler.start();
            }
        }
        catch (Throwable e) {
            fail(e);
            throwIfInstanceOf(e, Error.class);
        }
    }
}

private PlanRoot planQuery()
{
    try {
        return doPlanQuery();
    }
    catch (StackOverflowError e) {
        throw new TrinoException(NOT_SUPPORTED, "statement is too large (stack overflow during analysis)", e);
    }
}

private PlanRoot doPlanQuery()
{
    // plan query
    PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
    LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(),
            planOptimizers,
            idAllocator,
            metadata,
            typeOperators,
            new TypeAnalyzer(sqlParser, metadata),
            statsCalculator,
            costCalculator,
            stateMachine.getWarningCollector());
    // generate the logical plan; `analysis` was created in the SqlQueryExecution constructor
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);

    // fragment the plan
    SubPlan fragmentedPlan = planFragmenter.createSubPlans(stateMachine.getSession(), plan, false, stateMachine.getWarningCollector());

    // extract inputs
    List<Input> inputs = new InputExtractor(metadata, stateMachine.getSession()).extractInputs(fragmentedPlan);
    stateMachine.setInputs(inputs);

    stateMachine.setOutput(analysis.getTarget());

    boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze();
    return new PlanRoot(fragmentedPlan, !explainAnalyze);
}
io.trino.execution.SqlQueryExecution
// analyze query
this.analysis = analyze(preparedQuery, stateMachine, metadata, groupProvider, accessControl, sqlParser, queryExplainer, warningCollector);

private Analysis analyze(
        PreparedQuery preparedQuery,
        QueryStateMachine stateMachine,
        Metadata metadata,
        GroupProvider groupProvider,
        AccessControl accessControl,
        SqlParser sqlParser,
        QueryExplainer queryExplainer,
        WarningCollector warningCollector)
{
    stateMachine.beginAnalysis();

    requireNonNull(preparedQuery, "preparedQuery is null");
    // construct a semantic analyzer
    Analyzer analyzer = new Analyzer(
            stateMachine.getSession(),
            metadata,
            sqlParser,
            groupProvider,
            accessControl,
            Optional.of(queryExplainer),
            preparedQuery.getParameters(),
            parameterExtractor(preparedQuery.getStatement(), preparedQuery.getParameters()),
            warningCollector,
            statsCalculator);
    // semantically analyze the statement, producing an io.trino.sql.analyzer.Analysis
    Analysis analysis = analyzer.analyze(preparedQuery.getStatement());

    stateMachine.setUpdateType(analysis.getUpdateType());
    stateMachine.setReferencedTables(analysis.getReferencedTables());
    stateMachine.setRoutines(analysis.getRoutines());

    stateMachine.endAnalysis();

    return analysis;
}

The analyzer.analyze(statement) method mainly constructs a StatementAnalyzer to analyze the given statement, stores the results in an Analysis object, and returns it.

Statement Semantic Analysis

StatementAnalyzer is the class that performs semantic analysis on a statement; each Statement subclass gets its own analysis logic:

io.trino.sql.analyzer.Analyzer
public Analysis analyze(Statement statement)
{
    return analyze(statement, false);
}

public Analysis analyze(Statement statement, boolean isDescribe)
{
    // StatementRewrite rewrites certain statements into standard SQL
    // e.g., SHOW TABLES becomes: select table_name as Table from <catalog>.<schema> order by table_name
    // five kinds of statements get rewritten:
    // DescribeInputRewrite, DescribeOutputRewrite,
    // ShowQueriesRewrite, ShowStatsRewrite, ExplainRewrite
    Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, parameterLookup, groupProvider, accessControl, warningCollector, statsCalculator);
    // once non-standard statements have been rewritten to standard SQL, StatementAnalyzer takes over
    // Visitor extends AstVisitor
    Analysis analysis = new Analysis(rewrittenStatement, parameterLookup, isDescribe);
    StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, groupProvider, accessControl, session, warningCollector, CorrelationSupport.ALLOWED);
    analyzer.analyze(rewrittenStatement, Optional.empty());

    // check column access permissions for each table
    analysis.getTableColumnReferences().forEach((accessControlInfo, tableColumnReferences) ->
            tableColumnReferences.forEach((tableName, columns) ->
                    accessControlInfo.getAccessControl().checkCanSelectFromColumns(
                            accessControlInfo.getSecurityContext(session.getRequiredTransactionId(), session.getQueryId()),
                            tableName,
                            columns)));
    return analysis;
}

This method first rewrites the statement, converting non-standard commands into plain SQL, then constructs a StatementAnalyzer to analyze the rewritten statement, storing the results in an Analysis object that is returned.

The statement types rewritten up front include (a parser-level example follows the list):

  • The SHOW family: SHOW CATALOGS / SHOW SCHEMAS / SHOW TABLES / SHOW FUNCTIONS / SHOW COLUMNS / SHOW SESSION (configuration of the current session)
  • SHOW STATS FOR table|query: statistics for a table or query, e.g. SHOW STATS FOR ( SELECT * FROM table [ WHERE condition ] )
  • EXPLAIN sql: shows the execution plan, in two flavors, the Logical plan and the Distributed plan
  • DescribeInputRewrite and DescribeOutputRewrite: handle DESCRIBE INPUT / DESCRIBE OUTPUT, which describe the parameters and the output columns of a prepared statement
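
The rewrite happens during analysis, not parsing: the parser still produces the specialized AST node. A small sketch using Trino's SqlParser (io.trino.sql.parser) illustrates the boundary:

import io.trino.sql.parser.ParsingOptions;
import io.trino.sql.parser.SqlParser;
import io.trino.sql.tree.Statement;

public class ParseDemo
{
    public static void main(String[] args)
    {
        SqlParser parser = new SqlParser();
        // SHOW TABLES parses into a ShowTables AST node; only later does
        // ShowQueriesRewrite replace it with a SELECT over information_schema
        Statement statement = parser.createStatement("SHOW TABLES", new ParsingOptions());
        System.out.println(statement.getClass().getSimpleName()); // ShowTables
    }
}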

StatementAnalyzer performs the semantic analysis of a Statement, with per-subclass handling implemented in its inner class Visitor. (The original post includes a class diagram of Visitor here.)

Note that the visitXxx methods return a Scope, whose main contents are:

Scope
public class Scope
{
    private final Optional<Scope> parent; // parent scope
    private final boolean queryBoundary; // query boundary
    private final RelationId relationId;
    private final RelationType relation; // holds the list of Fields
    private final Map<String, WithQuery> namedQueries; // WITH queries
    //......

public class RelationType
{
    private final List<Field> visibleFields; // visible fields
    private final List<Field> allFields; // all fields

    private final Map<Field, Integer> fieldIndexes; // field -> index
    //......

public class Field
{
    private final Optional<QualifiedObjectName> originTable; // source table
    private final Optional<String> originColumnName; // source column name
    private final Optional<QualifiedName> relationAlias; // relation alias
    private final Optional<String> name; // field name
    private final Type type; // field type
    private final boolean hidden; // whether the field is hidden
    private final boolean aliased; // whether the field is aliased

As shown, a Scope contains a list of Fields; each Field describes one column, carrying the source table, source column name, alias, field name, field type, whether the field is hidden, and whether it is aliased.

For SELECT statements (including those produced by StatementRewrite), the returned scope contains one Field per selected column.

For INSERT, DELETE and CREATE TABLE AS SELECT statements, the returned scope has a single column representing the number of rows the statement affects (a sketch of this single-column scope follows).
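
A minimal sketch of assembling that single-column scope, assuming Trino's analyzer classes are on the classpath (Field.newUnqualified and Scope.builder() appear in the source quoted in this post; RelationId.anonymous() and the RelationType varargs constructor are assumptions):

import static io.trino.spi.type.BigintType.BIGINT;

import io.trino.sql.analyzer.Field;
import io.trino.sql.analyzer.RelationId;
import io.trino.sql.analyzer.RelationType;
import io.trino.sql.analyzer.Scope;

public class ScopeDemo
{
    public static void main(String[] args)
    {
        // one unqualified BIGINT field named "rows", as visitInsert returns
        Field rows = Field.newUnqualified("rows", BIGINT);
        Scope scope = Scope.builder()
                .withRelationType(RelationId.anonymous(), new RelationType(rows))
                .build();
        System.out.println(scope.getRelationType().getVisibleFields().size()); // 1
    }
}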

The Visitor's visitXxx methods handle each Statement subclass differently; the main cases are broken down below:

1. Metadata-related command analysis

By the Statement rewrittenStatement = StatementRewrite.rewrite() step, all SHOW TABLES, SHOW SCHEMAS, SHOW CATALOGS, SHOW COLUMNS, SHOW FUNCTIONS, SHOW PARTITIONS and SHOW SESSION statements have already been converted into standard SELECTs, all of which read from the information_schema schema.

On startup, Presto creates an information_schema schema for every catalog, with six tables:

  • __internal_functions__: all functions registered inside Presto (gone in Trino 356)
  • __internal_partitions__: partition information for all partitioned tables in the current catalog (gone in Trino 356)
  • columns: column information for all tables in the current catalog
  • schemata: all schemas in the current catalog
  • tables: all tables in the current catalog
  • views: all views in the current catalog

Trino 356 adds five more tables, though many connectors do not support them:

  • applicable_roles: applicable roles
  • enabled_roles: enabled roles
  • role_authorization_descriptors: role authorization descriptors
  • roles: list of roles
  • table_privileges: table privileges
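
Since SHOW TABLES is just a rewrite over these tables, information_schema can also be queried directly. A sketch using the Trino JDBC driver (host, catalog and schema names here are hypothetical):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class InformationSchemaDemo
{
    public static void main(String[] args) throws Exception
    {
        // hypothetical cluster coordinates; every catalog exposes its own information_schema
        try (Connection conn = DriverManager.getConnection("jdbc:trino://localhost:8080/hive", "demo", null);
                Statement stmt = conn.createStatement();
                // essentially what SHOW TABLES is rewritten to
                ResultSet rs = stmt.executeQuery(
                        "SELECT table_name FROM information_schema.tables " +
                        "WHERE table_schema = 'default' ORDER BY table_name")) {
            while (rs.next()) {
                System.out.println(rs.getString("table_name"));
            }
        }
    }
}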
The USE command

USE switches the current catalog and schema, e.g. use hive.default. It only applies to CLI sessions; JDBC and other client types do not support the syntax. In Visitor it is handled by visitUse().

2. Create View

@Override
protected Scope visitCreateView(CreateView node, Optional<Scope> scope)
{
    QualifiedObjectName viewName = createQualifiedObjectName(session, node, node.getName());

    // analyze the query that creates the view
    StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, groupProvider, accessControl, session, warningCollector, CorrelationSupport.ALLOWED);

    Scope queryScope = analyzer.analyze(node.getQuery(), scope);

    accessControl.checkCanCreateView(session.toSecurityContext(), viewName);
    // ensure every column of the query has a name and that the names are unique
    validateColumns(node, queryScope.getRelationType());
    // set the analysis updateType to "CREATE VIEW"
    analysis.setUpdateType(
            "CREATE VIEW",
            viewName,
            Optional.empty(),
            Optional.of(queryScope.getRelationType().getVisibleFields().stream()
                    .map(this::createOutputColumn)
                    .collect(toImmutableList())));
    // wrap into a scope and return
    return createAndAssignScope(node, scope);
}

Not all connectors support the CREATE VIEW operation.

3. Insert

@Override
protected Scope visitInsert(Insert insert, Optional<Scope> scope)
{
    QualifiedObjectName targetTable = createQualifiedObjectName(session, insert, insert.getTarget());

    if (metadata.getMaterializedView(session, targetTable).isPresent()) {
        throw semanticException(NOT_SUPPORTED, insert, "Inserting into materialized views is not supported");
    }

    if (metadata.getView(session, targetTable).isPresent()) {
        throw semanticException(NOT_SUPPORTED, insert, "Inserting into views is not supported");
    }
    // derive queryScope (per-column information) from the query of this INSERT statement
    // analyze the query that creates the data
    Scope queryScope = analyze(insert.getQuery(), createScope(scope));

    // verify the insert destination columns match the query
    Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable);
    if (targetTableHandle.isEmpty()) {
        throw semanticException(TABLE_NOT_FOUND, insert, "Table '%s' does not exist", targetTable);
    }
    accessControl.checkCanInsertIntoTable(session.toSecurityContext(), targetTable);

    if (!accessControl.getRowFilters(session.toSecurityContext(), targetTable).isEmpty()) {
        throw semanticException(NOT_SUPPORTED, insert, "Insert into table with a row filter is not supported");
    }
    // ====== below: determine the columns being inserted ======
    TableSchema tableSchema = metadata.getTableSchema(session, targetTableHandle.get());
    // get the table's non-hidden column names
    List<ColumnSchema> columns = tableSchema.getColumns().stream()
            .filter(column -> !column.isHidden())
            .collect(toImmutableList());

    for (ColumnSchema column : columns) {
        if (!accessControl.getColumnMasks(session.toSecurityContext(), targetTable, column.getName(), column.getType()).isEmpty()) {
            throw semanticException(NOT_SUPPORTED, insert, "Insert into table with column masks is not supported");
        }
    }

    List<String> tableColumns = columns.stream()
            .map(ColumnSchema::getName)
            .collect(toImmutableList());
    // analyze the target table layout (obtained from the connector's metadata API);
    // the table columns must contain all partition columns
    // analyze target table layout, table columns should contain all partition columns
    Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, targetTableHandle.get());
    newTableLayout.ifPresent(layout -> {
        if (!ImmutableSet.copyOf(tableColumns).containsAll(layout.getPartitionColumns())) {
            throw new TrinoException(NOT_SUPPORTED, "INSERT must write all distribution columns: " + layout.getPartitionColumns());
        }
    });
    // determine the insert columns
    List<String> insertColumns;
    if (insert.getColumns().isPresent()) {
        insertColumns = insert.getColumns().get().stream()
                .map(Identifier::getValue)
                .map(column -> column.toLowerCase(ENGLISH))
                .collect(toImmutableList());

        Set<String> columnNames = new HashSet<>();
        for (String insertColumn : insertColumns) {
            // each insert column must exist in the target table
            if (!tableColumns.contains(insertColumn)) {
                throw semanticException(COLUMN_NOT_FOUND, insert, "Insert column name does not exist in target table: %s", insertColumn);
            }
            // no duplicate columns allowed
            if (!columnNames.add(insertColumn)) {
                throw semanticException(DUPLICATE_COLUMN_NAME, insert, "Insert column name is specified more than once: %s", insertColumn);
            }
        }
    }
    else {
        insertColumns = tableColumns;
    }
    // fetch the columnHandles from metadata
    Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, targetTableHandle.get());
    // record the insert information in the current analysis
    analysis.setInsert(new Analysis.Insert(
            targetTableHandle.get(),
            insertColumns.stream().map(columnHandles::get).collect(toImmutableList()),
            newTableLayout));
    // verify that the query's column types match the target table's;
    // tableTypes comes from the tableSchema, queryTypes from the analysis above
    List<Type> tableTypes = insertColumns.stream()
            .map(insertColumn -> tableSchema.getColumn(insertColumn).getType())
            .collect(toImmutableList());

    List<Type> queryTypes = queryScope.getRelationType().getVisibleFields().stream()
            .map(Field::getType)
            .collect(toImmutableList());

    if (!typesMatchForInsert(tableTypes, queryTypes)) {
        throw semanticException(TYPE_MISMATCH,
                insert,
                "Insert query has mismatched column types: Table: [%s], Query: [%s]",
                Joiner.on(", ").join(tableTypes),
                Joiner.on(", ").join(queryTypes));
    }
    // zip the two streams (column names and column types) into a stream of Columns
    Stream<Column> columnStream = Streams.zip(
            insertColumns.stream(),
            tableTypes.stream()
                    .map(Type::toString),
            Column::new);

    analysis.setUpdateType(
            "INSERT",
            targetTable,
            Optional.empty(),
            Optional.of(Streams.zip( // zip the Column stream with the visible fields into OutputColumns
                    columnStream,
                    queryScope.getRelationType().getVisibleFields().stream(),
                    (column, field) -> new OutputColumn(column, analysis.getSourceColumns(field)))
                    .collect(toImmutableList())));
    // wrap into a Scope and return
    return createAndAssignScope(insert, scope, Field.newUnqualified("rows", BIGINT));
}
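
Stripped of the metadata plumbing, the column-resolution rules above are easy to restate in isolation. A self-contained sketch (plain Java, hypothetical names) mirroring the unknown-column and duplicate-column checks:

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

public class InsertColumnCheck
{
    // mirrors visitInsert: explicit INSERT columns must exist in the target table
    // and must not repeat; with no explicit list, all table columns are used
    static List<String> resolveInsertColumns(List<String> tableColumns, List<String> explicitColumns)
    {
        if (explicitColumns == null) {
            return tableColumns;
        }
        List<String> resolved = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String raw : explicitColumns) {
            String column = raw.toLowerCase(Locale.ENGLISH);
            if (!tableColumns.contains(column)) {
                throw new IllegalArgumentException("Insert column name does not exist in target table: " + column);
            }
            if (!seen.add(column)) {
                throw new IllegalArgumentException("Insert column name is specified more than once: " + column);
            }
            resolved.add(column);
        }
        return resolved;
    }
}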

4. Query

@Override
protected Scope visitQuery(Query node, Optional<Scope> scope)
{
    // analyze the WITH clause
    Scope withScope = analyzeWith(node, scope);
    Scope queryBodyScope = process(node.getQueryBody(), withScope);

    List<Expression> orderByExpressions = emptyList();
    if (node.getOrderBy().isPresent()) {
        // analyze the ORDER BY expressions
        orderByExpressions = analyzeOrderBy(node, getSortItemsFromOrderBy(node.getOrderBy()), queryBodyScope);

        if (queryBodyScope.getOuterQueryParent().isPresent() && node.getLimit().isEmpty() && node.getOffset().isEmpty()) {
            // not the root scope and ORDER BY is ineffective
            analysis.markRedundantOrderBy(node.getOrderBy().get());
            warningCollector.add(new TrinoWarning(REDUNDANT_ORDER_BY, "ORDER BY in subquery may have no effect"));
        }
    }
    analysis.setOrderByExpressions(node, orderByExpressions);

    if (node.getOffset().isPresent()) {
        analyzeOffset(node.getOffset().get(), queryBodyScope);
    }

    if (node.getLimit().isPresent()) {
        boolean requiresOrderBy = analyzeLimit(node.getLimit().get(), queryBodyScope);
        if (requiresOrderBy && node.getOrderBy().isEmpty()) {
            throw semanticException(MISSING_ORDER_BY, node.getLimit().get(), "FETCH FIRST WITH TIES clause requires ORDER BY");
        }
    }

    // Input fields == Output fields
    analysis.setSelectExpressions(
            node,
            descriptorToFields(queryBodyScope).stream()
                    .map(expression -> new SelectExpression(expression, Optional.empty()))
                    .collect(toImmutableList()));

    Scope queryScope = Scope.builder()
            .withParent(withScope)
            .withRelationType(RelationId.of(node), queryBodyScope.getRelationType())
            .build();

    analysis.setScope(node, queryScope);
    return queryScope;
}

For a plain SELECT body, process(node.getQueryBody(), withScope) dispatches to visitQuerySpecification:
@Override
protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope> scope)
{
    // TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions
    // to pass down to analyzeFrom

    Scope sourceScope = analyzeFrom(node, scope);

    analyzeWindowDefinitions(node, sourceScope);
    resolveFunctionCallWindows(node);

    node.getWhere().ifPresent(where -> analyzeWhere(node, sourceScope, where));

    List<Expression> outputExpressions = analyzeSelect(node, sourceScope);
    GroupingSetAnalysis groupByAnalysis = analyzeGroupBy(node, sourceScope, outputExpressions);
    analyzeHaving(node, sourceScope);

    Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope);

    List<Expression> orderByExpressions = emptyList();
    Optional<Scope> orderByScope = Optional.empty();
    if (node.getOrderBy().isPresent()) {
        OrderBy orderBy = node.getOrderBy().get();
        orderByScope = Optional.of(computeAndAssignOrderByScope(orderBy, sourceScope, outputScope));

        orderByExpressions = analyzeOrderBy(node, orderBy.getSortItems(), orderByScope.get());

        if (sourceScope.getOuterQueryParent().isPresent() && node.getLimit().isEmpty() && node.getOffset().isEmpty()) {
            // not the root scope and ORDER BY is ineffective
            analysis.markRedundantOrderBy(orderBy);
            warningCollector.add(new TrinoWarning(REDUNDANT_ORDER_BY, "ORDER BY in subquery may have no effect"));
        }
    }
    analysis.setOrderByExpressions(node, orderByExpressions);

    if (node.getOffset().isPresent()) {
        analyzeOffset(node.getOffset().get(), outputScope);
    }

    if (node.getLimit().isPresent()) {
        boolean requiresOrderBy = analyzeLimit(node.getLimit().get(), outputScope);
        if (requiresOrderBy && node.getOrderBy().isEmpty()) {
            throw semanticException(MISSING_ORDER_BY, node.getLimit().get(), "FETCH FIRST WITH TIES clause requires ORDER BY");
        }
    }

    List<Expression> sourceExpressions = new ArrayList<>();
    analysis.getSelectExpressions(node).stream()
            .map(SelectExpression::getExpression)
            .forEach(sourceExpressions::add);
    node.getHaving().ifPresent(sourceExpressions::add);
    for (WindowDefinition windowDefinition : node.getWindows()) {
        WindowSpecification window = windowDefinition.getWindow();
        sourceExpressions.addAll(window.getPartitionBy());
        getSortItemsFromOrderBy(window.getOrderBy()).stream()
                .map(SortItem::getSortKey)
                .forEach(sourceExpressions::add);
        window.getFrame()
                .map(WindowFrame::getStart)
                .flatMap(FrameBound::getValue)
                .ifPresent(sourceExpressions::add);
        window.getFrame()
                .flatMap(WindowFrame::getEnd)
                .flatMap(FrameBound::getValue)
                .ifPresent(sourceExpressions::add);
    }

    analyzeGroupingOperations(node, sourceExpressions, orderByExpressions);
    analyzeAggregations(node, sourceScope, orderByScope, groupByAnalysis, sourceExpressions, orderByExpressions);
    analyzeWindowFunctions(node, outputExpressions, orderByExpressions);

    if (analysis.isAggregation(node) && node.getOrderBy().isPresent()) {
        ImmutableList.Builder<Expression> aggregates = ImmutableList.<Expression>builder()
                .addAll(groupByAnalysis.getOriginalExpressions())
                .addAll(extractAggregateFunctions(orderByExpressions, metadata))
                .addAll(extractExpressions(orderByExpressions, GroupingOperation.class));

        analysis.setOrderByAggregates(node.getOrderBy().get(), aggregates.build());
    }

    if (node.getOrderBy().isPresent() && node.getSelect().isDistinct()) {
        verifySelectDistinct(node, orderByExpressions, outputExpressions, sourceScope, orderByScope.get());
    }

    return outputScope;
}

Following Plan plan = logicalPlanner.plan(analysis) all the way down, the final plan generation happens in the code below:

public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics)
{
    PlanNode root = planStatement(analysis, analysis.getStatement());

    planSanityChecker.validateIntermediatePlan(root, session, metadata, typeOperators, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);

    if (stage.ordinal() >= OPTIMIZED.ordinal()) {
        for (PlanOptimizer optimizer : planOptimizers) {
            root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator, warningCollector);
            requireNonNull(root, format("%s returned a null plan", optimizer.getClass().getName()));
        }
    }

    if (stage.ordinal() >= OPTIMIZED_AND_VALIDATED.ordinal()) {
        // make sure we produce a valid plan after optimizations run. This is mainly to catch programming errors
        planSanityChecker.validateFinalPlan(root, session, metadata, typeOperators, typeAnalyzer, symbolAllocator.getTypes(), warningCollector);
    }

    TypeProvider types = symbolAllocator.getTypes();

    StatsAndCosts statsAndCosts = StatsAndCosts.empty();
    if (collectPlanStatistics) {
        StatsProvider statsProvider = new CachingStatsProvider(statsCalculator, session, types);
        CostProvider costProvider = new CachingCostProvider(costCalculator, statsProvider, Optional.empty(), session, types);
        statsAndCosts = StatsAndCosts.create(root, statsProvider, costProvider);
    }
    return new Plan(root, types, statsAndCosts);
}

public PlanNode planStatement(Analysis analysis, Statement statement)
{
    if ((statement instanceof CreateTableAsSelect && analysis.getCreate().get().isCreateTableAsSelectNoOp()) ||
            (statement instanceof RefreshMaterializedView && analysis.isSkipMaterializedViewRefresh())) {
        Symbol symbol = symbolAllocator.newSymbol("rows", BIGINT);
        PlanNode source = new ValuesNode(idAllocator.getNextId(), ImmutableList.of(symbol), ImmutableList.of(new Row(ImmutableList.of(new GenericLiteral("BIGINT", "0")))));
        return new OutputNode(idAllocator.getNextId(), source, ImmutableList.of("rows"), ImmutableList.of(symbol));
    }
    return createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);
}
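
The ordinal() comparisons above rely on the declaration order of LogicalPlanner.Stage; its values (CREATED, OPTIMIZED, OPTIMIZED_AND_VALIDATED, as the two checks imply) form a progressive pipeline. A minimal sketch of that gating logic in isolation:

public class StageGate
{
    // assumed to mirror LogicalPlanner.Stage's declaration order
    enum Stage { CREATED, OPTIMIZED, OPTIMIZED_AND_VALIDATED }

    static boolean runsOptimizers(Stage stage)
    {
        // true for OPTIMIZED and OPTIMIZED_AND_VALIDATED,
        // mirroring stage.ordinal() >= OPTIMIZED.ordinal() in plan()
        return stage.ordinal() >= Stage.OPTIMIZED.ordinal();
    }

    static boolean validatesFinalPlan(Stage stage)
    {
        // only the full pipeline runs the final sanity check
        return stage.ordinal() >= Stage.OPTIMIZED_AND_VALIDATED.ordinal();
    }
}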
