Submitting a Query in Presto

Notes for untangling the logic while reading the source code; of little value as reading material.

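The query submission flow

The Presto/Trino client submits a query in three steps:

  1. Obtain the SQL to execute from the given file, a command-line argument, or the CLI window.
  2. Wrap the SQL in a RESTful request, send it to the Coordinator, and receive the response.
  3. The client then loops, fetching the query results batch by batch and handing them to the terminal for display (or back to the JDBC connection), until the full result has been shown.

The CLI entry point for submitting a query is in the trino-cli project under the client directory; the entry class is io.trino.cli.Trino.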

    public static void main(String[] args)
    {
        System.exit(createCommandLine(new Console()).execute(args));
    }

    public static CommandLine createCommandLine(Object command)
    {
        CommandLine commandLine = new CommandLine(command)
                .registerConverter(ClientResourceEstimate.class, ClientResourceEstimate::new)
                .registerConverter(ClientSessionProperty.class, ClientSessionProperty::new)
                .registerConverter(ClientExtraCredential.class, ClientExtraCredential::new)
                .registerConverter(HostAndPort.class, HostAndPort::fromString)
                .registerConverter(Duration.class, Duration::valueOf);

        getConfigFile().ifPresent(file -> ValidatingPropertiesDefaultProvider.attach(commandLine, file));
        return commandLine;
    }

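Console implements Callable<Integer>, so the command-line framework invokes it once the arguments are parsed; the method to look at is run(). Based on the command-line arguments, run() checks whether there is SQL to execute (taken from --execute or --file). If there is, it executes the SQL directly; if not, it starts an interactive CLI terminal.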

io.trino.cli.Console.run()
    // Excerpt; see io.trino.cli.Console.run() for the full code
    // Create a new ClientSession from the user-supplied options: server URI, catalog, schema, authentication properties, and so on
    // Command used to start trino-cli: ./trino-cli --server presto1:8285 --catalog hive --schema default
    ClientSession session = clientOptions.toClientSession();
    // ... omitted: obtaining the input command/SQL ...
    // Build a QueryRunner; its main fields are a ClientSession and an OkHttpClient
    // QueryRunner appears to take many clientOptions, but most of them only serve authentication during HTTP communication with the coordinator
    try (QueryRunner queryRunner = new QueryRunner(
            session,
            clientOptions.debug,
            clientOptions.socksProxy,
            clientOptions.httpProxy,
            clientOptions.keystorePath,
            clientOptions.keystorePassword,
            clientOptions.keystoreType,
            clientOptions.truststorePath,
            clientOptions.truststorePassword,
            clientOptions.truststoreType,
            clientOptions.insecure,
            clientOptions.accessToken,
            clientOptions.user,
            clientOptions.password ? Optional.of(getPassword()) : Optional.empty(),
            clientOptions.krb5Principal,
            clientOptions.krb5ServicePrincipalPattern,
            clientOptions.krb5RemoteServiceName,
            clientOptions.krb5ConfigPath,
            clientOptions.krb5KeytabPath,
            clientOptions.krb5CredentialCachePath,
            !clientOptions.krb5DisableRemoteServiceHostnameCanonicalization,
            clientOptions.externalAuthentication)) {
        // If a command/SQL was supplied, execute it
        if (hasQuery) {
            return executeCommand(
                    queryRunner,
                    exiting,
                    query,
                    clientOptions.outputFormat,
                    clientOptions.ignoreErrors,
                    clientOptions.progress);
        }
        // Otherwise start an interactive terminal
        runConsole(queryRunner, exiting);
        return true;
    }
    finally {
        exited.countDown();
        interruptor.close();
    }

Next, executeCommand():

io.trino.cli.Console.executeCommand()
    private static boolean executeCommand(
            QueryRunner queryRunner,
            AtomicBoolean exiting,
            String query,
            OutputFormat outputFormat,
            boolean ignoreErrors,
            boolean showProgress)
    {
        boolean success = true;
        StatementSplitter splitter = new StatementSplitter(query);
        for (Statement split : splitter.getCompleteStatements()) {
            if (!isEmptyStatement(split.statement())) {
                // Every split statement is handled in process(); if showProgress was not requested, results are printed without progress updates
                if (!process(queryRunner, split.statement(), outputFormat, () -> {}, false, showProgress, getTerminal(), System.out, System.err)) {
                    if (!ignoreErrors) {
                        return false;
                    }
                    success = false;
                }
            }
            if (exiting.get()) {
                return success;
            }
        }
        if (!isEmptyStatement(splitter.getPartialStatement())) {
            System.err.println("Non-terminated statement: " + splitter.getPartialStatement());
            return false;
        }
        return success;
    }

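As the code above shows, the core logic splits the SQL on ";" into statements and hands each one to process(). Statement here is an inner class with two fields, String statement and String terminator; it is not the Statement in the io.trino.sql.tree package that lexing and parsing produce.

Now runConsole(): how the console handles a SQL command once it receives one:

runConsole() (excerpt)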
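To make the splitting step concrete, here is a minimal sketch, not the real StatementSplitter. The class SplitSketch and its fields are hypothetical names, and it assumes that only ";" terminates a statement and that only single-quoted literals need protecting; the real splitter works on a lexer and also handles comments and other delimiters.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the CLI's StatementSplitter.
final class SplitSketch
{
    // statements that were terminated by ';'
    final List<String> complete = new ArrayList<>();
    // trailing text that has no terminator yet
    final String partial;

    SplitSketch(String sql)
    {
        StringBuilder current = new StringBuilder();
        boolean inString = false;
        for (char c : sql.toCharArray()) {
            if (c == '\'') {
                // entering or leaving a single-quoted literal; a doubled '' toggles twice
                inString = !inString;
            }
            if (c == ';' && !inString) {
                String statement = current.toString().trim();
                if (!statement.isEmpty()) {
                    complete.add(statement);
                }
                current.setLength(0);
            }
            else {
                current.append(c);
            }
        }
        partial = current.toString().trim();
    }

    public static void main(String[] args)
    {
        SplitSketch split = new SplitSketch("select 1; select ';' as semi; select 2");
        System.out.println(split.complete);
        System.out.println(split.partial);
    }
}
```

In this sketch the unterminated tail ends up in partial, which mirrors how executeCommand() reports a "Non-terminated statement" when getPartialStatement() is not empty.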
    // read a line of input from user
    String line;
    try {
        line = reader.readLine(commandPrompt, remaining);
    }
    catch (UserInterruptException e) {
        if (!e.getPartialLine().isEmpty()) {
            reader.getHistory().add(e.getPartialLine());
        }
        remaining = "";
        continue;
    }
    catch (EndOfFileException e) {
        System.out.println();
        return;
    }
    // check for special commands -- must match InputParser
    String command = CharMatcher.is(';').or(whitespace()).trimTrailingFrom(line);
    switch (command.toLowerCase(ENGLISH)) {
        case "exit":
        case "quit":
            return;
        case "history":
            for (History.Entry entry : reader.getHistory()) {
                System.out.println(new AttributedStringBuilder()
                        .style(DEFAULT.foreground(CYAN))
                        .append(format("%5d", entry.index() + 1))
                        .style(DEFAULT)
                        .append("  ")
                        .append(entry.line())
                        .toAnsi(reader.getTerminal()));
            }
            continue;
        case "help":
            System.out.println();
            System.out.println(getHelpText());
            continue;
    }

    // execute any complete statements
    StatementSplitter splitter = new StatementSplitter(line, STATEMENT_DELIMITERS);
    for (Statement split : splitter.getCompleteStatements()) {
        OutputFormat outputFormat = OutputFormat.ALIGNED;
        if (split.terminator().equals("\\G")) {
            outputFormat = OutputFormat.VERTICAL;
        }
        // Hand off to process(); showProgress is true here, so progress is printed while the results are fetched in batches
        process(queryRunner, split.statement(), outputFormat, tableNameCompleter::populateCache, true, true, reader.getTerminal(), System.out, System.out);
    }

    // replace remaining with trailing partial statement
    remaining = whitespace().trimTrailingFrom(splitter.getPartialStatement());

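Input is read line by line, and a semicolon ends a statement. Apart from the special commands (exit, quit, history, help), everything else is also split and handed to process().

So whether the SQL is supplied directly or typed into the terminal, it ends up being handled by io.trino.cli.Console.process():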
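The special-command check can be sketched with the standard library alone. This is an illustration under assumptions: SpecialCommandSketch, normalize and isSpecial are hypothetical names, and Character.isWhitespace stands in for Guava's CharMatcher.whitespace(), which does not classify every character identically.

```java
import java.util.Locale;

// Hypothetical helper that mimics the trailing-trim and switch in runConsole().
final class SpecialCommandSketch
{
    // Strip trailing semicolons and whitespace, then lower-case the rest.
    static String normalize(String line)
    {
        int end = line.length();
        while (end > 0 && (line.charAt(end - 1) == ';' || Character.isWhitespace(line.charAt(end - 1)))) {
            end--;
        }
        return line.substring(0, end).toLowerCase(Locale.ENGLISH);
    }

    // True for the commands the console handles itself instead of sending to process().
    static boolean isSpecial(String line)
    {
        switch (normalize(line)) {
            case "exit":
            case "quit":
            case "history":
            case "help":
                return true;
            default:
                return false;
        }
    }

    public static void main(String[] args)
    {
        System.out.println(isSpecial("EXIT; "));
        System.out.println(isSpecial("select 1;"));
    }
}
```

Only the tail of the line is trimmed, as in the excerpt, so a line with leading spaces before "exit" is not treated as a special command in this sketch.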

io.trino.cli.Console.process()
    private static boolean process(
            QueryRunner queryRunner,
            String sql,
            OutputFormat outputFormat,
            Runnable schemaChanged,
            boolean usePager,
            boolean showProgress,
            Terminal terminal,
            PrintStream out,
            PrintStream errorChannel)
    {
        String finalSql;
        try {
            // Preprocess the statement
            // The preprocessor command is read with: String preprocessorCommand = System.getenv(ENV_PREPROCESSOR);
            finalSql = preprocessQuery(
                    terminal,
                    Optional.ofNullable(queryRunner.getSession().getCatalog()),
                    Optional.ofNullable(queryRunner.getSession().getSchema()),
                    sql);
        }
        catch (QueryPreprocessorException e) {
            System.err.println(e.getMessage());
            if (queryRunner.isDebug()) {
                e.printStackTrace(System.err);
            }
            return false;
        }
        // startQuery mainly builds new StatementClientV1(httpClient, session, query); the HTTP communication with the coordinator and the response handling all live in StatementClientV1
        // The first request is sent as part of new StatementClientV1()
        try (Query query = queryRunner.startQuery(finalSql)) {
            // The loop that fetches results and execution status is in renderOutput()
            // It leads to renderQueryOutput(), then processInitialStatusUpdates(), and finally client.advance()
            boolean success = query.renderOutput(terminal, out, errorChannel, outputFormat, usePager, showProgress);

            ClientSession session = queryRunner.getSession();
            // If the catalog or schema changed (for example after a "use other_schema" command), rebuild the ClientSession
            // The query's setCatalog, setSchema and setPath values are updated in processResponse() of StatementClientV1, the StatementClient implementation
            // update catalog and schema if present
            if (query.getSetCatalog().isPresent() || query.getSetSchema().isPresent()) {
                session = ClientSession.builder(session)
                        .withCatalog(query.getSetCatalog().orElse(session.getCatalog()))
                        .withSchema(query.getSetSchema().orElse(session.getSchema()))
                        .build();
            }

            // update transaction ID if necessary
            if (query.isClearTransactionId()) {
                session = stripTransactionId(session);
            }

            ClientSession.Builder builder = ClientSession.builder(session);

            if (query.getStartedTransactionId() != null) {
                builder = builder.withTransactionId(query.getStartedTransactionId());
            }

            // update path if present
            if (query.getSetPath().isPresent()) {
                builder = builder.withPath(query.getSetPath().get());
            }

            // update session properties if present
            if (!query.getSetSessionProperties().isEmpty() || !query.getResetSessionProperties().isEmpty()) {
                Map<String, String> sessionProperties = new HashMap<>(session.getProperties());
                sessionProperties.putAll(query.getSetSessionProperties());
                sessionProperties.keySet().removeAll(query.getResetSessionProperties());
                builder = builder.withProperties(sessionProperties);
            }

            // update session roles
            if (!query.getSetRoles().isEmpty()) {
                Map<String, ClientSelectedRole> roles = new HashMap<>(session.getRoles());
                roles.putAll(query.getSetRoles());
                builder = builder.withRoles(roles);
            }

            // update prepared statements if present
            if (!query.getAddedPreparedStatements().isEmpty() || !query.getDeallocatedPreparedStatements().isEmpty()) {
                Map<String, String> preparedStatements = new HashMap<>(session.getPreparedStatements());
                preparedStatements.putAll(query.getAddedPreparedStatements());
                preparedStatements.keySet().removeAll(query.getDeallocatedPreparedStatements());
                builder = builder.withPreparedStatements(preparedStatements);
            }

            session = builder.build();
            queryRunner.setSession(session);

            if (query.getSetCatalog().isPresent() || query.getSetSchema().isPresent()) {
                schemaChanged.run();
            }

            return success;
        }
        catch (RuntimeException e) {
            System.err.println("Error running command: " + e.getMessage());
            if (queryRunner.isDebug()) {
                e.printStackTrace(System.err);
            }
            return false;
        }
    }
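The session-property and prepared-statement updates in process() share one pattern: copy the current map, apply the additions, then drop the removed keys. A small sketch with plain maps (SessionMergeSketch and merge are hypothetical names, and the property names are only sample values):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration of the copy / putAll / removeAll pattern used in process().
final class SessionMergeSketch
{
    static Map<String, String> merge(Map<String, String> current, Map<String, String> set, Set<String> reset)
    {
        // copy first so the existing session map is left untouched
        Map<String, String> merged = new HashMap<>(current);
        merged.putAll(set);
        // removal runs last, so a key that is both set and reset ends up removed
        merged.keySet().removeAll(reset);
        return merged;
    }

    public static void main(String[] args)
    {
        Map<String, String> current = new HashMap<>();
        current.put("query_max_run_time", "1h");
        current.put("join_distribution_type", "AUTOMATIC");

        Map<String, String> set = new HashMap<>();
        set.put("join_distribution_type", "BROADCAST");

        System.out.println(merge(current, set, Collections.singleton("query_max_run_time")));
    }
}
```

Copying before modifying matches the excerpt, where a new HashMap is built from session.getProperties() and the result is passed to the builder rather than mutating the session in place.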

Within process(), the two calls worth a closer look are Query query = queryRunner.startQuery(finalSql) and boolean success = query.renderOutput(..).

queryRunner.startQuery()
    public Query startQuery(String query)
    {
        return new Query(startInternalQuery(session.get(), query), debug);
    }

    public StatementClient startInternalQuery(String query)
    {
        return startInternalQuery(stripTransactionId(session.get()), query);
    }

    private StatementClient startInternalQuery(ClientSession session, String query)
    {
        // Build the OkHttpClient that this query will use
        OkHttpClient.Builder builder = httpClient.newBuilder();
        sslSetup.accept(builder);
        OkHttpClient client = builder.build();
        // The end goal: construct a StatementClientV1
        return newStatementClient(client, session, query);
    }

    // -------- StatementClientFactory.newStatementClient() ------------
    public static StatementClient newStatementClient(OkHttpClient httpClient, ClientSession session, String query)
    {
        // The end goal: construct a StatementClientV1
        return new StatementClientV1(httpClient, session, query);
    }

    // -------- constructor of StatementClientV1 ------------
    public StatementClientV1(OkHttpClient httpClient, ClientSession session, String query)
    {
        requireNonNull(httpClient, "httpClient is null");
        requireNonNull(session, "session is null");
        requireNonNull(query, "query is null");

        this.httpClient = httpClient;
        this.timeZone = session.getTimeZone();
        this.query = query;
        this.requestTimeoutNanos = session.getClientRequestTimeout();
        this.user = session.getUser().orElse(session.getPrincipal());
        this.clientCapabilities = Joiner.on(",").join(ClientCapabilities.values());
        this.compressionDisabled = session.isCompressionDisabled();
        // Build the request
        Request request = buildQueryRequest(session, query);
        // Submit the first HTTP request to the coordinator
        JsonResponse<QueryResults> response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
        if ((response.getStatusCode() != HTTP_OK) || !response.hasValue()) {
            state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
            throw requestFailedException("starting query", request, response);
        }
        // Handle the response
        processResponse(response.getHeaders(), response.getValue());
    }

queryRunner.startQuery() mainly instantiates a StatementClientV1 object, which submits the first HTTP request to the coordinator. Next, query.renderOutput(..):
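The body of buildQueryRequest() is not shown in these notes. As a rough picture of the first request, the sketch below builds (without sending) a POST to /v1/statement with the SQL text as the body and a few X-Trino-* headers, which is how the Trino client protocol starts a query. It uses the JDK's java.net.http in place of OkHttp, and InitialRequestSketch, the user "alice" and the other argument values are assumptions for illustration; the real client sets more headers than these.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the first request; it only builds the request and never sends it.
final class InitialRequestSketch
{
    static HttpRequest build(URI server, String user, String catalog, String schema, String sql)
    {
        return HttpRequest.newBuilder(server.resolve("/v1/statement"))
                .header("X-Trino-User", user)
                .header("X-Trino-Catalog", catalog)
                .header("X-Trino-Schema", schema)
                // the SQL text itself is the request body
                .POST(HttpRequest.BodyPublishers.ofString(sql, StandardCharsets.UTF_8))
                .build();
    }

    public static void main(String[] args)
    {
        HttpRequest request = build(URI.create("http://presto1:8285"), "alice", "hive", "default", "select 1");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().map());
    }
}
```

The server address, catalog and schema reuse the values from the trino-cli command line quoted earlier in these notes.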

query.renderOutput(..)
    // -------- Query.renderOutput() ------------
    public boolean renderOutput(Terminal terminal, PrintStream out, PrintStream errorChannel, OutputFormat outputFormat, boolean usePager, boolean showProgress)
    {
        Thread clientThread = Thread.currentThread();
        SignalHandler oldHandler = terminal.handle(Signal.INT, signal -> {
            if (ignoreUserInterrupt.get() || client.isClientAborted()) {
                return;
            }
            client.close();
            // The query can be interrupted at any time
            clientThread.interrupt();
        });
        try {
            // Continues in renderQueryOutput()
            return renderQueryOutput(terminal, out, errorChannel, outputFormat, usePager, showProgress);
        }
        finally {
            terminal.handle(Signal.INT, oldHandler);
            Thread.interrupted(); // clear interrupt status
        }
    }

    // -------- Query.renderQueryOutput() ------------
    private boolean renderQueryOutput(Terminal terminal, PrintStream out, PrintStream errorChannel, OutputFormat outputFormat, boolean usePager, boolean showProgress)
    {
        StatusPrinter statusPrinter = null;
        WarningsPrinter warningsPrinter = new PrintStreamWarningsPrinter(errorChannel);

        if (showProgress) {
            // Show progress
            statusPrinter = new StatusPrinter(client, errorChannel, debug);
            statusPrinter.printInitialStatusUpdates(terminal);
        }
        else {
            // Do not show progress
            processInitialStatusUpdates(warningsPrinter);
        }
        // ... rest omitted

    // -------- StatusPrinter.printInitialStatusUpdates() ------------
    /*

    Query 16, RUNNING, 1 node, 855 splits
    http://my.server:8080/v1/query/16?pretty
    Splits:   646 queued, 34 running, 175 done
    CPU Time: 33.7s total,  191K rows/s, 16.6MB/s, 22% active
    Per Node: 2.5 parallelism,  473K rows/s, 41.1MB/s
    Parallelism: 2.5
    Peak Memory: 1.97GB
    Spilled: 20GB
    0:13 [6.45M rows,  560MB] [ 473K rows/s, 41.1MB/s] [=========>>           ] 20%

         STAGES   ROWS  ROWS/s  BYTES  BYTES/s   PEND    RUN   DONE
    0.........R  13.8M    336K  1.99G    49.5M      0      1    706
      1.......R   666K   41.5K  82.1M    5.12M    563     65     79
        2.....R  4.58M    234K   620M    31.6M    406     65    236

    */
    public void printInitialStatusUpdates(Terminal terminal)
    {
        Attributes originalAttributes = terminal.enterRawMode();
        long lastPrint = System.nanoTime();
        try {
            WarningsPrinter warningsPrinter = new ConsoleWarningsPrinter(console);
            while (client.isRunning()) {
                try {
                    // update screen
                    if (update) {
                        updateScreen(warningsPrinter);
                        lastPrint = System.nanoTime();
                    }

                    // fetch next results (server will wait for a while if no data)
                    client.advance();
                    // ... rest omitted
    }

    // -------- Query.processInitialStatusUpdates() ------------
    private void processInitialStatusUpdates(WarningsPrinter warningsPrinter)
    {
        while (client.isRunning() && (client.currentData().getData() == null)) {
            warningsPrinter.print(client.currentStatusInfo().getWarnings(), true, false);
            try {
                client.advance();
            }
            catch (RuntimeException e) {
                log.debug(e, "error printing status");
            }
        }
        List<Warning> warnings;
        if (client.isRunning()) {
            warnings = client.currentStatusInfo().getWarnings();
        }
        else {
            warnings = client.finalStatusInfo().getWarnings();
        }
        warningsPrinter.print(warnings, false, true);
    }

Whether or not progress is displayed, both paths end up going through client.advance():

StatementClientV1.advance()
    // -------- StatementClientV1.advance() ------------
    // advance() is called repeatedly from inside a while (client.isRunning()) loop
    @Override
    public boolean advance()
    {
        if (!isRunning()) {
            return false;
        }

        URI nextUri = currentStatusInfo().getNextUri();
        // If the response carries no nextUri, the query is considered finished
        if (nextUri == null) {
            state.compareAndSet(State.RUNNING, State.FINISHED);
            return false;
        }
        // Build a new HTTP request from nextUri
        Request request = prepareRequest(HttpUrl.get(nextUri)).build();

        Exception cause = null;
        long start = System.nanoTime();
        long attempts = 0;

        while (true) {
            if (isClientAborted()) {
                return false;
            }

            Duration sinceStart = Duration.nanosSince(start);
            if (attempts > 0 && sinceStart.compareTo(requestTimeoutNanos) > 0) {
                state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
                throw new RuntimeException(format("Error fetching next (attempts: %s, duration: %s)", attempts, sinceStart), cause);
            }

            if (attempts > 0) {
                // back-off on retry
                try {
                    MILLISECONDS.sleep(attempts * 100);
                }
                catch (InterruptedException e) {
                    try {
                        close();
                    }
                    finally {
                        Thread.currentThread().interrupt();
                    }
                    state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
                    throw new RuntimeException("StatementClient thread was interrupted");
                }
            }
            attempts++;

            JsonResponse<QueryResults> response;
            try {
                // Send the HTTP request; the result is wrapped in a QueryResults (see its constructor further down)
                response = JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request);
            }
            catch (RuntimeException e) {
                cause = e;
                continue;
            }

            if ((response.getStatusCode() == HTTP_OK) && response.hasValue()) {
                // Handle the response
                processResponse(response.getHeaders(), response.getValue());
                return true;
            }

            if (response.getStatusCode() != HTTP_UNAVAILABLE) {
                state.compareAndSet(State.RUNNING, State.CLIENT_ERROR);
                throw requestFailedException("fetching next", request, response);
            }
        }
    }

    // -------- StatementClientV1.processResponse() ------------
    // Mostly header handling; the changed values are consumed in io.trino.cli.Console.process(), after the line boolean success = query.renderOutput(...)
    private void processResponse(Headers headers, QueryResults results)
    {
        // Set when the submitted statement changed the catalog or schema, for example: use kudu.default
        setCatalog.set(headers.get(TRINO_HEADERS.responseSetCatalog()));
        setSchema.set(headers.get(TRINO_HEADERS.responseSetSchema()));
        setPath.set(headers.get(TRINO_HEADERS.responseSetPath()));
        // Session properties that were set or reset
        for (String setSession : headers.values(TRINO_HEADERS.responseSetSession())) {
            List<String> keyValue = SESSION_HEADER_SPLITTER.splitToList(setSession);
            if (keyValue.size() != 2) {
                continue;
            }
            setSessionProperties.put(keyValue.get(0), urlDecode(keyValue.get(1)));
        }
        resetSessionProperties.addAll(headers.values(TRINO_HEADERS.responseClearSession()));

        for (String setRole : headers.values(TRINO_HEADERS.responseSetRole())) {
            List<String> keyValue = SESSION_HEADER_SPLITTER.splitToList(setRole);
            if (keyValue.size() != 2) {
                continue;
            }
            setRoles.put(keyValue.get(0), ClientSelectedRole.valueOf(urlDecode(keyValue.get(1))));
        }

        for (String entry : headers.values(TRINO_HEADERS.responseAddedPrepare())) {
            List<String> keyValue = SESSION_HEADER_SPLITTER.splitToList(entry);
            if (keyValue.size() != 2) {
                continue;
            }
            addedPreparedStatements.put(urlDecode(keyValue.get(0)), urlDecode(keyValue.get(1)));
        }
        for (String entry : headers.values(TRINO_HEADERS.responseDeallocatedPrepare())) {
            deallocatedPreparedStatements.add(urlDecode(entry));
        }

        String startedTransactionId = headers.get(TRINO_HEADERS.responseStartedTransactionId());
        if (startedTransactionId != null) {
            this.startedTransactionId.set(startedTransactionId);
        }
        if (headers.get(TRINO_HEADERS.responseClearTransactionId()) != null) {
            clearTransactionId.set(true);
        }
        // Store the results in currentResults
        currentResults.set(results);
    }

    // -------- constructor of io.trino.client.QueryResults ------------
    // nextUri is deserialized straight from the JSON response, i.e. the value returned by
    // JsonResponse.execute(QUERY_RESULTS_CODEC, httpClient, request) in advance() above
    @JsonCreator
    public QueryResults(
            @JsonProperty("id") String id,
            @JsonProperty("infoUri") URI infoUri,
            @JsonProperty("partialCancelUri") URI partialCancelUri,
            @JsonProperty("nextUri") URI nextUri,
            @JsonProperty("columns") List<Column> columns,
            @JsonProperty("data") List<List<Object>> data,
            @JsonProperty("stats") StatementStats stats,
            @JsonProperty("error") QueryError error,
            @JsonProperty("warnings") List<Warning> warnings,
            @JsonProperty("updateType") String updateType,
            @JsonProperty("updateCount") Long updateCount)
    {
        this(
                id,
                infoUri,
                partialCancelUri,
                nextUri,
                columns,
                fixData(columns, data),
                stats,
                error,
                firstNonNull(warnings, ImmutableList.of()),
                updateType,
                updateCount);
    }
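The overall shape of the polling protocol (keep following nextUri until a response arrives without one) can be shown without any HTTP at all. In the sketch below everything is hypothetical: Page is a miniature stand-in for QueryResults, and a Map plays the coordinator, so each lookup corresponds to one advance() call. Retries, back-off and error states are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of the nextUri loop; no network involved.
final class NextUriLoopSketch
{
    // Miniature of QueryResults: a batch of rows plus an optional nextUri.
    static final class Page
    {
        final List<String> rows;
        final String nextUri; // null means the query is finished

        Page(List<String> rows, String nextUri)
        {
            this.rows = rows;
            this.nextUri = nextUri;
        }
    }

    // A fake coordinator: the first page has no data yet, later pages carry rows.
    static Map<String, Page> demoServer()
    {
        Map<String, Page> server = new HashMap<>();
        server.put("/1", new Page(Collections.<String>emptyList(), "/2"));
        server.put("/2", new Page(Arrays.asList("a", "b"), "/3"));
        server.put("/3", new Page(Collections.singletonList("c"), null));
        return server;
    }

    static List<String> fetchAll(Map<String, Page> server, String firstUri)
    {
        List<String> rows = new ArrayList<>();
        String nextUri = firstUri;
        while (nextUri != null) {
            // stands in for the HTTP request that advance() sends to nextUri
            Page page = server.get(nextUri);
            rows.addAll(page.rows);
            nextUri = page.nextUri;
        }
        return rows;
    }

    public static void main(String[] args)
    {
        System.out.println(fetchAll(demoServer(), "/1"));
    }
}
```

The empty first page reflects what processInitialStatusUpdates() waits through: responses that carry status but no data yet.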

Finally, the status shown while a query is running is printed by io.trino.cli.StatusPrinter.

The results are printed in io.trino.cli.Query: renderResults() -> doRenderResults() -> pageOutput() & sendOutput() -> processRows(StatementClient client)

