Presto执行计划-词法语法分析

Presto执行计划-词法语法分析

Presto执行计划-词法语法分析

阅读源码时梳理逻辑用,无阅读价值

如果要跟源码,可以从上一节提交查询过程中 client 发送给 coordinator 的提交查询 API(/v1/statement)开始跟。

client初始提交query后,coordinator 创建了一个Query,然后组装nextUri 后直接response了,这时候 query 尚未排队,只有当client 来请求query状态了(/v1/statement/queued/{queryId}/{slug}/{token}),这时才将其加入到调度队列中

(此处为代码高亮插件的行号列,无实际内容)
// Entry point for a new query: POST /v1/statement with the SQL text as the request body.
// Registers a Query locally and immediately responds with QueryResults containing a
// nextUri; the query is NOT dispatched or queued yet at this point.
@ResourceSecurity(AUTHENTICATED_USER)
@POST
@Produces(APPLICATION_JSON)
public Response postStatement(
String statement,
@Context HttpServletRequest servletRequest,
@Context HttpHeaders httpHeaders,
@Context UriInfo uriInfo)
{
if (isNullOrEmpty(statement)) {
throw badRequest(BAD_REQUEST, "SQL statement is empty");
}

String remoteAddress = servletRequest.getRemoteAddr();
Optional<Identity> identity = Optional.ofNullable((Identity) servletRequest.getAttribute(AUTHENTICATED_IDENTITY));
MultivaluedMap<String, String> headers = httpHeaders.getRequestHeaders();

SessionContext sessionContext = new HttpRequestSessionContext(headers, alternateHeaderName, remoteAddress, identity, groupProvider);
// The coordinator creates the Query object here.
Query query = new Query(statement, sessionContext, dispatchManager, queryInfoUrlFactory);
// queries: ConcurrentMap<QueryId, Query> queries = new ConcurrentHashMap<>();
// Used only for ongoing lifecycle tracking, so abandoned queries can be purged promptly.
queries.put(query.getQueryId(), query);

// let authentication filter know that identity lifecycle has been handed off
servletRequest.setAttribute(AUTHENTICATED_IDENTITY, null);
// Responds immediately -- query planning does NOT start at this point.
// query.getQueryResults builds the nextUri, i.e. the next API endpoint the client polls.
return createQueryResultsResponse(query.getQueryResults(query.getLastToken(), uriInfo), compressionEnabled);
}

// GET /v1/statement/queued/{queryId}/{slug}/{token}
// The client polls this endpoint using the nextUri returned by postStatement; the
// FIRST poll is what actually enqueues the query (see waitForDispatched below).
@ResourceSecurity(PUBLIC)
@GET
@Path("queued/{queryId}/{slug}/{token}")
@Produces(APPLICATION_JSON)
public void getStatus(
@PathParam("queryId") QueryId queryId,
@PathParam("slug") String slug,
@PathParam("token") long token,
@QueryParam("maxWait") Duration maxWait,
@Context UriInfo uriInfo,
@Suspended AsyncResponse asyncResponse)
{
// The client calls this endpoint right after receiving nextUri from the previous API.
Query query = getQuery(queryId, slug, token);

// wait for query to be dispatched, up to the wait timeout
// On the first call, waitForDispatched() triggers dispatchManager.createQuery;
// later calls just wait for the dispatch to complete (see the code further down).
ListenableFuture<?> futureStateChange = addTimeout(
query.waitForDispatched(),
() -> null,
WAIT_ORDERING.min(MAX_WAIT_TIME, maxWait),
timeoutExecutor);

// when state changes, fetch the next result
ListenableFuture<QueryResults> queryResultsFuture = Futures.transform(
futureStateChange,
ignored -> query.getQueryResults(token, uriInfo),
responseExecutor);

// transform to Response
ListenableFuture<Response> response = Futures.transform(
queryResultsFuture,
queryResults -> createQueryResultsResponse(queryResults, compressionEnabled),
directExecutor());
bindAsyncResponse(asyncResponse, response, responseExecutor);
}

//----------- Wait for dispatch; the first caller triggers dispatchManager.createQuery -----------
// Lazily submits the query exactly once (guarded by this object's monitor), then
// returns a future for the submission itself while it is in flight, and after
// submission completes, a future for the dispatch to finish.
private ListenableFuture<?> waitForDispatched()
{
// if query submission has not finished, wait for it to finish
synchronized (this) {
if (querySubmissionFuture == null) {
// First poll: this is what actually kicks off query creation.
querySubmissionFuture = dispatchManager.createQuery(queryId, slug, sessionContext, query);
}
if (!querySubmissionFuture.isDone()) {
return querySubmissionFuture;
}
}

// otherwise, wait for the query to finish
return dispatchManager.waitForDispatched(queryId);
}
io.trino.dispatcher.DispatchManager
(此处为代码高亮插件的行号列,无实际内容)
/**
 * Asynchronously registers and creates a new dispatch query.
 * Validates the arguments up front, then runs the heavy lifting
 * (createQueryInternal) on the dispatch executor. The returned future completes
 * once creation has finished -- successfully or not; the finally block
 * guarantees callers never hang on it.
 */
public ListenableFuture<?> createQuery(QueryId queryId, Slug slug, SessionContext sessionContext, String query)
{
requireNonNull(queryId, "queryId is null");
requireNonNull(sessionContext, "sessionContext is null");
requireNonNull(query, "query is null");
checkArgument(!query.isEmpty(), "query must not be empty string");
checkArgument(queryTracker.tryGetQuery(queryId).isEmpty(), "query %s already exists", queryId);

DispatchQueryCreationFuture queryCreationFuture = new DispatchQueryCreationFuture();
dispatchExecutor.execute(() -> {
try {
createQueryInternal(queryId, slug, sessionContext, query, resourceGroupManager);
}
finally {
// Complete the future even when creation threw, so waiters are released.
queryCreationFuture.set(null);
}
});
return queryCreationFuture;
}
// -------------- io.trino.dispatcher.DispatchManager.createQueryInternal() ---------
/**
 * Creates and registers a dispatch query with the query tracker. This method will never fail to register a query with the query
 * tracker. If an error occurs while creating a dispatch query, a failed dispatch will be created and registered.
 */
private <C> void createQueryInternal(QueryId queryId, Slug slug, SessionContext sessionContext, String query, ResourceGroupManager<C> resourceGroupManager)
{
Session session = null;
PreparedQuery preparedQuery = null;
try {
if (query.length() > maxQueryLength) {
int queryLength = query.length();
// Truncate the text retained for the error report before failing.
query = query.substring(0, maxQueryLength);
throw new TrinoException(QUERY_TEXT_TOO_LARGE, format("Query text length (%s) exceeds the maximum length (%s)", queryLength, maxQueryLength));
}

// decode session
session = sessionSupplier.createSession(queryId, sessionContext);

// check query execute permissions
accessControl.checkCanExecuteQuery(sessionContext.getIdentity());

// prepare query
// Pre-processing of the query text: this is where plan generation formally begins,
// starting with lexical/syntactic analysis via SqlParser.createStatement().
// The ANTLR 4-generated parser produces a parse tree that becomes a Node, then a
// Statement, which is finally wrapped into a PreparedQuery.
preparedQuery = queryPreparer.prepareQuery(session, query);

// select resource group
// Choose the SelectionContext based on the queryType.
Optional<String> queryType = getQueryType(preparedQuery.getStatement().getClass()).map(Enum::name);
SelectionContext<C> selectionContext = resourceGroupManager.selectGroup(new SelectionCriteria(
sessionContext.getIdentity().getPrincipal().isPresent(),
sessionContext.getIdentity().getUser(),
sessionContext.getIdentity().getGroups(),
Optional.ofNullable(sessionContext.getSource()),
sessionContext.getClientTags(),
sessionContext.getResourceEstimates(),
queryType));

// apply system default session properties (does not override user set properties)
session = sessionPropertyDefaults.newSessionWithDefaultProperties(session, queryType, selectionContext.getResourceGroupId());

// mark existing transaction as active
transactionManager.activateTransaction(session, isTransactionControlStatement(preparedQuery.getStatement()), accessControl);

// **** This is where the QueryExecution gets created ****
DispatchQuery dispatchQuery = dispatchQueryFactory.createDispatchQuery(
session,
query,
preparedQuery,
slug,
selectionContext.getResourceGroupId());

// Register the query with the queryTracker.
boolean queryAdded = queryCreated(dispatchQuery);
if (queryAdded && !dispatchQuery.isDone()) {
try {
resourceGroupManager.submit(dispatchQuery, selectionContext, dispatchExecutor);
}
catch (Throwable e) {
// dispatch query has already been registered, so just fail it directly
dispatchQuery.fail(e);
}
}
}
catch (Throwable throwable) {
// creation must never fail, so register a failed query in this case
if (session == null) {
// Build a minimal session so the failure can still be attributed and tracked.
session = Session.builder(new SessionPropertyManager())
.setQueryId(queryId)
.setIdentity(sessionContext.getIdentity())
.setSource(sessionContext.getSource())
.build();
}
Optional<String> preparedSql = Optional.ofNullable(preparedQuery).flatMap(PreparedQuery::getPrepareSql);
DispatchQuery failedDispatchQuery = failedDispatchQueryFactory.createFailedDispatchQuery(session, query, preparedSql, Optional.empty(), throwable);
queryCreated(failedDispatchQuery);
}
}
io.trino.execution.QueryPreparer
(此处为代码高亮插件的行号列,无实际内容)
/**
 * Parses the raw SQL text into an AST {@code Statement} (lexical + syntactic
 * analysis via the ANTLR-based SqlParser) and delegates to the Statement-based
 * overload, which handles PREPARE/EXECUTE unwrapping.
 */
public PreparedQuery prepareQuery(Session session, String query)
        throws ParsingException, TrinoException
{
    return prepareQuery(session, sqlParser.createStatement(query, createParsingOptions(session)));
}
io.trino.sql.parser.SqlParser
(此处为代码高亮插件的行号列,无实际内容)
/**
 * Parses a single SQL statement. Drives the generated ANTLR parser starting at
 * the {@code singleStatement} grammar rule and casts the resulting AST node to
 * a {@code Statement}.
 */
public Statement createStatement(String sql, ParsingOptions parsingOptions)
{
    Node root = invokeParser("statement", sql, SqlBaseParser::singleStatement, parsingOptions);
    return (Statement) root;
}


// Core parsing routine: builds the ANTLR lexer/parser pipeline, runs the supplied
// grammar entry rule (SLL first, falling back to LL), and converts the resulting
// parse tree into the Trino AST via AstBuilder.
private Node invokeParser(String name, String sql, Function<SqlBaseParser, ParserRuleContext> parseFunction, ParsingOptions parsingOptions)
{
try {
// The lexer and parser are classes generated from the SqlBase.g4 grammar file.
// SqlBaseLexer: Generated from io/trino/sql/parser/SqlBase.g4 by ANTLR 4.9
// SqlBaseParser: Generated from io/trino/sql/parser/SqlBase.g4 by ANTLR 4.9
SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(CharStreams.fromString(sql)));
CommonTokenStream tokenStream = new CommonTokenStream(lexer);
SqlBaseParser parser = new SqlBaseParser(tokenStream);
initializer.accept(lexer, parser);

// Override the default error strategy to not attempt inserting or deleting a token.
// Otherwise, it messes up error reporting
parser.setErrorHandler(new DefaultErrorStrategy()
{
@Override
public Token recoverInline(Parser recognizer)
throws RecognitionException
{
if (nextTokensContext == null) {
throw new InputMismatchException(recognizer);
}
else {
throw new InputMismatchException(recognizer, nextTokensState, nextTokensContext);
}
}
});

parser.addParseListener(new PostProcessor(Arrays.asList(parser.getRuleNames()), parser));

lexer.removeErrorListeners();
lexer.addErrorListener(LEXER_ERROR_LISTENER);

parser.removeErrorListeners();
parser.addErrorListener(PARSER_ERROR_HANDLER);

// Two prediction modes; both exist only to produce the ParserRuleContext tree.
ParserRuleContext tree;
try {
// first, try parsing with potentially faster SLL mode
parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
tree = parseFunction.apply(parser);
}
catch (ParseCancellationException ex) {
// if we fail, parse with LL mode
// SLL failed: retry with full LL prediction, which is guaranteed to parse
// correctly anything the grammar accepts.
tokenStream.seek(0); // rewind input stream
parser.reset();

parser.getInterpreter().setPredictionMode(PredictionMode.LL);
tree = parseFunction.apply(parser);
}
// Tree-to-AST conversion: AstBuilder visits the ANTLR parse tree and builds Nodes.
return new AstBuilder(parsingOptions).visit(tree);
}
catch (StackOverflowError e) {
throw new ParsingException(name + " is too large (stack overflow while parsing)");
}
}
(此处为代码高亮插件的行号列,无实际内容)
new AstBuilder(parsingOptions).visit(tree);


// ANTLR visitor entry point: double-dispatches to the visitor method matching
// the tree node's concrete context class via ParseTree.accept.
public T visit(ParseTree tree) {
return tree.accept(this);
}
// Following accept leads into the SqlBaseParser class, which is also generated
// from io/trino/sql/parser/SqlBase.g4 and contains many static nested context classes.
// This override belongs to one such context (presumably AddColumnContext, given
// the visitAddColumn call -- confirm against the generated source).
@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitAddColumn(this);
else return visitor.visitChildren(this);
}

// Generated context class for the CREATE MATERIALIZED VIEW grammar rule; its
// accept dispatches to the matching visitCreateMaterializedView visitor method.
public static class CreateMaterializedViewContext extends StatementContext {
// ... (other generated members elided)
@Override
public <T> T accept(ParseTreeVisitor<? extends T> visitor) {
if ( visitor instanceof SqlBaseVisitor ) return ((SqlBaseVisitor<? extends T>)visitor).visitCreateMaterializedView(this);
else return visitor.visitChildren(this);
}
}

Antlr(ANother Tool for Language Recognition)是一个强大的跨语言语法解析器,可以用来读取、处理、执行或翻译结构化文本或二进制文件,被广泛用于构建语言、工具和框架。Antlr 可以根据语法文件生成能够构建和遍历解析树的解析器。

官方地址:https://github.com/antlr/grammars-v4

简明教程:https://iamazy.github.io/2020/02/12/antlr4-jiao-cheng/

经过 Antlr 4 语法解析器进行语法分析后,最终生成了一个 Node,然后转成 Statement,再包装成 PreparedQuery。


Comments

(页面模板残留内容,已省略)