Presto核心设计

presto 核心设计

1 RESTFUL架构

在presto中几乎所有的操作都是依赖于AirLift构造的RESTful服务来完成的,包括worker节点的管理、查询语句的提交、查询状态的显示、各个task之间数据的传递等。因此presto中的RESTful服务是presto集群的基础。

presto中提供了四种类型的RESTful接口,分别是statement服务接口、query服务接口、stage服务接口、task服务接口

1.statement服务接口

与sql语句相关的请求均由该服务接口处理,包括接收提交的sql语句、获取查询执行结果的语句、取消查询语句等。statement服务接口的实现类为StatementResource。

2.query服务接口

与查询相关 的RESTful请求均由query服务接口处理,包裹sql语句的提交、获取查询执行的结果、取消查询等。query服务接口实现类为QueryResource。

3.stage服务接口

与stage相关的RESTful请求均由stage服务接口处理,其实该接口只提供了一个功能,就是取消或者结束一个指定的stage。stage服务接口的实现类为StageResource。

4.task服务接口

与task相关的RESTful请求均由ask服务接口处理,包括task的创建、更新、状态查询和结果查询等。task服务接口的实现类为TaskResource

presto集群中的数据传输、节点通信、心跳感应、计算监控、计算调度和计算分布全部都是基于RESTful服务实现的,因此presto中的RESTful服务就是presto所有服务的基石。

2 提交查询

终端用户可以用过jdbc或者cli提交查询语句,也可以通过第三方机构或者个人使用python、c语言开发的驱动提交查询。presto客户端对查询语句的提交主要分为三个步骤。

  • 从指定的文件、命令行参数或者cli窗口中获取需要执行的sql语句。

  • 将得到的sql语句组装成一个RESTful请求,发送给Coordinator,并处理返回的response。

  • cli会不停的循环分批读取查询结果并在屏幕进行动态展示,直到查询结果完全显示完毕。

查询流程图

3 生成查询计划

本章主要讲述presto对一个传入的sql语句如何进行解析并生成最终的执行计划。

从上图可以看到,生成查询计划分成语法分析、词法分析、语义分析、执行计划生成、执行计划优化、执行计划分阶段执行。

1基本概念

11node

查询语句经过词法和语法分析之后,会生成抽象语法树(AST),该语法树中的每一个节点都是一个Node(SQL语句的一部分,如select、where、group by等)。Node是一个抽象类,实现类如下:

  1. approximate 用于近似查询
  2. explainOption 标识explain语句的可选参数,有explainFormat和explainType两类。explainFormat标识输出结果的格式,有text和graphviz两种类型。explainType标识explain语句的类型,有logical和distributed两类,分别标识生成逻辑执行计划与生成分布式执行计划。
  3. expression 标识sql语句中出现的表达式。
  4. frame bound 用于窗口函数中滑动窗口的可选参数。
  5. relation 是一个抽象类,标识多个节点之间的关系,如join、union等。
  6. select 标识查询语句中的select部分。
  7. select item 标识select语句中的列类型,有allcolumns和singlecolumns两种类型。
  8. sort item 标识排序的某一列及其类型。
  9. statement 标识presto能使用的sql类型的sql语句。
  10. table element 标识建表语句描述表的每一列,包括列名与类型。
  11. window 表示一个窗口函数。
  12. window Frame 表示窗口函数中欢动窗口的可选参数。
  13. with 表示一个查询中所有的with语句,主要元素有recursive、querys。
  14. with query 表示一个with语句,主要元素有name、query、columnNames。

12metadata API

metadata API即是matadata接口,其提供了对源数据进行各种操作的接口,列如列出所有的数据库名、表名等。这些接口在对sql进行语义分宜以及某些ddl操作(如create table)的执行过程中会用到。
metadata api将不同Connector对其元数据的各种啊哦做抽象成一了统一的接口,使得在使用这些接口时无需考虑具体的底层connector实现。
metadata api除了提供对元数据操作的接口,还提供了一些通用的与connector无关的方法,例如列出presto支持的自定义函数等。

2 词法和语法分析

presto的此法于语法分析是封装在SQLQuerymanager的createQuery方法中。

21语法规则

presto使用ANTLR4编写sql语法,语法规则的定义在presto-parse项目的sqlbase.g4文件中,通过ANTLR4查看该文件的语法图。

22词法分析

SQLParse的createStatement方法调用其内部方法invokeParser。

23语法分析

presto使用visitor模式对sql语句进行语法分析。

3获取查询执行引擎

queryexecution表示一次查询执行,用于启动、停止与管理一个查询,以及统计这个查询的相关信息。

3.1 获取queryExecutionFactory

根据statement类型获取相对应的QueryExecutionFactory。QueryExecutionFactory是一个接口,其实现类有DataDefinitionExecutionFactory以及SqlQueryExecutionFactory。 executionFactories则是一个Map,存储了不同的Statement类型与QueryExecutionFactory实现类的对应关系,该map的初始化实在CoordinatorModule中进行的,对应关系如表:

总结:create table 、rename table 等ddl操作的sql语句对应了DataDefinitionExecutionFactory,而非ddl操作的sql语句。例如select、insert等对应了SqlQueryExecutionFactory。

3.2 创建QueryExecution

当以上的词法与语法分析出错,照着找不到statement实现类与QueryExecutionFactory实现类的对应关系时,将创建一个FailedQueryExecution,并封装错误信息,最后返回给用户。

调用之前获取的QueryExecutionFactory的createQueryExecution方法,获取对应的QueryExecution。DataDefinitionExecutionFactory创建的是DataDefinitionExecution,而 SqlQueryExecutionFactory创建的是SqlQueryExecution。

在DataDefinitionExecutionFactory创建DataDefinitionExecution时,根据statement类型将对应的 DataDefinitionExecutionTask实现类与DataDefinitionExecution绑定。

3.3启动QueryExecution

获取QueryExecution之后,SqlQueryQueueManager方法将QueryExecution与配置的查询队列规则进行匹配,如匹配成功且队列未满,则将QueryExecution加入匹配队列。

查询队列按照 FIFO规则调度查询。最后启动QueryExecution。

DataDefinitionExecution启动直接调用其绑定的DataDefinitionTask实现类的execute方法即可。以dropTable为例,由于DropTable与dropTableTask绑定,就会执行DropTableTask 的execute方法。 SqlQueryExecution启动比较复杂,需要执行查询计划、优化查询计划、分阶段执行查询计划。

语义分析

由于DataDefinitionExecution的执行直接调用DataDefinitionTask实现类的execute方法,并未经过执行计划生成的步骤,故以下的内容只针对SqlQueryExecutionFactory。

statement分析

statementAnalyzer是对statement进行予以分析的类,针对不同的statement实现类进行语义分析。

relation分析

TupleAnalyzer类是对Query中的Relation进行分析的类。

表达式分析

ExpressionAnalyzer类对sql语句中的表达式进行分析,主要功能如下:

  1. 获取表达式的类型
  2. 获取需要进行类型转换的表达式及其转换的目的类型。
  3. 获取表达式中存在的函数信息。
  4. 获取表达式中所有合法的列名及对应列的编号。
  5. 获取表达式中In语句中的子查询。

执行计划生成

LogicalPlanner类会根据以上针对SQL语句分析所得的结果,生成逻辑执行计划。

执行计划节点

执行计划树中的节点类型

节点 名称
AggregationNode 聚合操作的节点,聚合的类型有Final、Partial、Single三种,分别表示最终聚合、局部聚合和单点聚合,其中执行计划在进行优化之前,聚合的类型都是单点聚合,在执行计划优化器中会对其进行拆分成局部聚合和最终聚合。
DeleteNode 用于Delete操作的节点
DistinctLimitNode
ExchangeNode 用于在执行计划中不同stage之间交换数据的节点,出现在逻辑执行计划中
FilterNode 进行过滤操作的节点
IndexJoinNode 用于对Index Join操作的节点
IndexSorceNode 与Index join配合使用的执行数据源读取操作的节点
JoinNode 执行Join操作的节点
LimitNode 执行limit操作的节点
MarkDistinctNode 用于处理一下outputNode、projectNode的sql语句的节点
OutputNode 输出最终结果的节点
project 用于进行列映射的节点,用于将ProjectNode下层节点输出的列映射到Project上层节点输入的列
RemoteSourceNode 类似于ExchangeNode,用于分布式执行计划中不同的stage之间交换数据,出现在分布式执行计划中
RowNumberNode 用于处理窗口函数row_number
SampleNode 用于处理抽样函数
SemiJoinnode 用于处理执行计划生成过程中产生的SemiJoin
SortNode 用于排序操作
TableCommitNode 用于对create table as select语句、insert语句、delete语句的操作执行commit
TableScanNode 用于读取表的数据
TableWriterNode 用于向目的的表写入数据
TopNNode 用于取数据排序后的前N条结果,使用效率更高的TopN算法,而不是对所有数据进行全局派去在取前N条
TopNRowNumberNode 用于处理窗口函数row_number中排序前N条记录,使用效率更高的TopN算法。
UnionNode 用于处理Union操作
UnnestNode 用于处理Unnest操作
ValuesNode 用于处理Values语句
WindowNode 用于处理窗口函数

sql执行计划

RelationPlanner用于针对Relation类型的sql语句生成执行计划。

1、table

visitTable对table进行分析主要分为以下两步。如果该table是with所定义的表明,或者该table实际是一个view,则处理其所关联的查询生成执行计划。如果该table是普通的表,则构建TableScanNode。

2、AliasedRelation

visitAliasedRelation处理AliasedRelation所关联的relation,并生成执行计划。

3、SampledRelation

visitSampledRelation处理SampledRelation分为处理其关联的relation,生成执行计划树。构建一个sampleNode,添加到以上的执行计划树之上。

4、join

visitjoin处理join分为处理join左侧的relation,生成左侧执行计划树。如果join右侧是unnest且join类型为cross join或者Implicit join,则根据unnest构造一个UnnestNode以及一个ProjectNode,添加到左侧 执行计划树智商并返回。

query执行计划

queryplanner用于处理query和querySpecification。

执行计划优化

生成执行计划之后,会对所生成的执行计划进行优化,目前presto只支持基于规则的优化器。现有的优化器包括如下几种。

ImplementSampleAsFilter

将bernoulli抽样的samplenode改写为filternode,filternode的过滤条件为 rand() < SampleRatio

CannonicalizeExpressions

将执行计划中设计的表达式进行标准化,标准化的主要工作有。

1
2
3
is not null 改写为 not(is null)
if 语句改写为case when语句
处理时间函数

SimplifyExpressions

对执行计划中设计的表达式进行简化和优化处理,具体可查看ExpressionInterpreter。

UnaliaseSymbolReferences

用于去除执行计划中projectnode中的无异议映射。

执行计划分段

经过执行计划生成与执行计划优化之后,最后对执行计划进行分段。

执行计划分段 描述
source source阶段是从数据源的表中读取数据的阶段,一般包括tableScanNode和projectNode,以及可能存在的filterNode等。
fixed fixed阶段位于source阶段之后,该阶段将source阶段读取的数据分散到多个节点上进行处理,主要处理的操作有局部聚合、局部join、局部数据写入表等。
single single阶段位于fixed阶段之后,只在单个节点上执行,用于汇总所有的处理结果,例如针对局部聚合的数据进行最终聚合,并将结果传输给coordinator。
Coordinator_only Coordinator_only阶段只在coordinator上执行,对insert和create table操作进行commit的tableCommitNode属于Coordinator_only阶段。

举例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
Fragment 0 [SINGLE]                                                                                                                                                          
Output layout: [count]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[rc]
│ Layout: [count:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ rc := count
└─ Aggregate(FINAL)
│ Layout: [count:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ count := count("count_49")
└─ LocalExchange[SINGLE] ()
│ Layout: [count_49:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[1]
Layout: [count_49:bigint]

Fragment 1 [HASH]
Output layout: [count_49]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Aggregate(PARTIAL)
│ Layout: [count_49:bigint]
│ count_49 := count("item_id_30")
└─ LeftJoin[("item_id" = "item_id_30") AND ("ship_from_org_id" = "expr_43")][$hashvalue_59, $hashvalue_60]
│ Layout: [item_id_30:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ Distribution: REPLICATED
├─ Project[]
│ │ Layout: [item_id:bigint, ship_from_org_id:bigint, $hashvalue_59:bigint]
│ │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ │ $hashvalue_59 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id"), 0)), COALESCE("$operator$hash_code"("ship_from_org_id"), 0))
│ └─ InnerJoin[("order_source_line_id" = "order_line_id")][$hashvalue, $hashvalue_51]
│ │ Layout: [item_id:bigint, ship_from_org_id:bigint]
│ │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ │ Distribution: PARTITIONED
│ │ dynamicFilterAssignments = {order_line_id -> df_557}
│ ├─ RemoteSource[2]
│ │ Layout: [order_source_line_id:bigint, item_id:bigint, $hashvalue:bigint]
│ └─ LocalExchange[HASH][$hashvalue_51] ("order_line_id")
│ │ Layout: [order_line_id:bigint, ship_from_org_id:bigint, $hashvalue_51:bigint]
│ │ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ └─ RemoteSource[3]
│ Layout: [order_line_id:bigint, ship_from_org_id:bigint, $hashvalue_52:bigint]
└─ LocalExchange[HASH][$hashvalue_60] ("item_id_30", "expr_43")
│ Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_60:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[6]
Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_61:bigint]

Fragment 2 [SOURCE]
Output layout: [order_source_line_id, item_id, $hashvalue_50]
Output partitioning: HASH [order_source_line_id][$hashvalue_50]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:edw:ods_cux_sino_app_ftb1, grouped = false, filterPredicate = true, dynamicFilter = {df_557 -> "order_source_line_id"}]
Layout: [order_source_line_id:bigint, item_id:bigint, $hashvalue_50:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_50 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_source_line_id"), 0))
item_id := item_id:bigint:REGULAR
order_source_line_id := order_source_line_id:bigint:REGULAR

Fragment 3 [HASH]
Output layout: [order_line_id, ship_from_org_id, $hashvalue_58]
Output partitioning: HASH [order_line_id][$hashvalue_58]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [order_line_id:bigint, ship_from_org_id:bigint, $hashvalue_58:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ $hashvalue_58 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_line_id"), 0))
└─ InnerJoin[("order_id" = "order_id_11")][$hashvalue_53, $hashvalue_55]
│ Layout: [order_line_id:bigint, ship_from_org_id:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {order_id_11 -> df_559}
├─ RemoteSource[4]
│ Layout: [order_id:bigint, order_line_id:bigint, $hashvalue_53:bigint]
└─ LocalExchange[HASH][$hashvalue_55] ("order_id_11")
│ Layout: [order_id_11:bigint, ship_from_org_id:bigint, $hashvalue_55:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[5]
Layout: [order_id_11:bigint, ship_from_org_id:bigint, $hashvalue_56:bigint]

Fragment 4 [SOURCE]
Output layout: [order_id, order_line_id, $hashvalue_54]
Output partitioning: HASH [order_id][$hashvalue_54]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:edw:dwd_erp_sale_order_line, grouped = false, filterPredicate = true, dynamicFilter = {df_559 -> "order_id"}]
Layout: [order_id:bigint, order_line_id:bigint, $hashvalue_54:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_54 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_id"), 0))
order_line_id := order_line_id:bigint:REGULAR
order_id := order_id:bigint:REGULAR

Fragment 5 [SOURCE]
Output layout: [order_id_11, ship_from_org_id, $hashvalue_57]
Output partitioning: HASH [order_id_11][$hashvalue_57]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = hive:edw:dwd_erp_sale_order_header, grouped = false]
Layout: [order_id_11:bigint, ship_from_org_id:bigint, $hashvalue_57:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_57 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_id_11"), 0))
ship_from_org_id := ship_from_org_id:bigint:REGULAR
order_id_11 := order_id:bigint:REGULAR

Fragment 6 [SOURCE]
Output layout: [expr_43, item_id_30, $hashvalue_62]
Output partitioning: BROADCAST []
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_62:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ $hashvalue_62 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("item_id_30"), 0)), COALESCE("$operator$hash_code"("expr_43"), 0))
└─ ScanProject[table = hive:edw:dim_materiel, grouped = false]
Layout: [expr_43:bigint, item_id_30:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
expr_43 := CAST("organization_id" AS bigint)
organization_id := organization_id:int:REGULAR
item_id_30 := item_id:bigint:REGULAR
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
 Fragment 0 [SINGLE]                                                                                                                                                             
Output layout: [count]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[rc]
│ Layout: [count:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ rc := count
└─ Aggregate(FINAL)
│ Layout: [count:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
│ count := count("count_49")
└─ LocalExchange[SINGLE] ()
│ Layout: [count_49:bigint]
│ Estimates: {rows: ? (?), cpu: ?, memory: ?, network: ?}
└─ RemoteSource[1]
Layout: [count_49:bigint]

Fragment 1 [HASH]
Output layout: [count_49]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Aggregate(PARTIAL)
│ Layout: [count_49:bigint]
│ count_49 := count("item_id")
└─ InnerJoin[("order_source_line_id" = "order_line_id") AND ("item_id" = "item_id_30")][$hashvalue, $hashvalue_51]
│ Layout: [item_id:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {order_line_id -> df_576, item_id_30 -> df_577}
├─ RemoteSource[2]
│ Layout: [order_source_line_id:bigint, item_id:bigint, $hashvalue:bigint]
└─ LocalExchange[HASH][$hashvalue_51] ("order_line_id", "item_id_30")
│ Layout: [order_line_id:bigint, item_id_30:bigint, $hashvalue_51:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[3]
Layout: [order_line_id:bigint, item_id_30:bigint, $hashvalue_52:bigint]

Fragment 2 [SOURCE]
Output layout: [order_source_line_id, item_id, $hashvalue_50]
Output partitioning: HASH [order_source_line_id, item_id][$hashvalue_50]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:edw:ods_cux_sino_app_ftb1, grouped = false, filterPredicate = true, dynamicFilter = {df_576 -> "order_source_line_id", df_577 -> "item_id"}]
Layout: [order_source_line_id:bigint, item_id:bigint, $hashvalue_50:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_50 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_source_line_id"), 0)), COALESCE("$operator$hash_code"("item_id"), 0))
item_id := item_id:bigint:REGULAR
order_source_line_id := order_source_line_id:bigint:REGULAR

Fragment 3 [HASH]
Output layout: [order_line_id, item_id_30, $hashvalue_63]
Output partitioning: HASH [order_line_id, item_id_30][$hashvalue_63]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [order_line_id:bigint, item_id_30:bigint, $hashvalue_63:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ $hashvalue_63 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_line_id"), 0)), COALESCE("$operator$hash_code"("item_id_30"), 0))
└─ InnerJoin[("order_id" = "order_id_11")][$hashvalue_53, $hashvalue_55]
│ Layout: [order_line_id:bigint, item_id_30:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {order_id_11 -> df_579}
├─ RemoteSource[4]
│ Layout: [order_id:bigint, order_line_id:bigint, $hashvalue_53:bigint]
└─ LocalExchange[HASH][$hashvalue_55] ("order_id_11")
│ Layout: [order_id_11:bigint, item_id_30:bigint, $hashvalue_55:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[5]
Layout: [order_id_11:bigint, item_id_30:bigint, $hashvalue_56:bigint]

Fragment 4 [SOURCE]
Output layout: [order_id, order_line_id, $hashvalue_54]
Output partitioning: HASH [order_id][$hashvalue_54]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:edw:dwd_erp_sale_order_line, grouped = false, filterPredicate = true, dynamicFilter = {df_579 -> "order_id"}]
Layout: [order_id:bigint, order_line_id:bigint, $hashvalue_54:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_54 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_id"), 0))
order_line_id := order_line_id:bigint:REGULAR
order_id := order_id:bigint:REGULAR

Fragment 5 [HASH]
Output layout: [order_id_11, item_id_30, $hashvalue_62]
Output partitioning: HASH [order_id_11][$hashvalue_62]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [order_id_11:bigint, item_id_30:bigint, $hashvalue_62:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ $hashvalue_62 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("order_id_11"), 0))
└─ InnerJoin[("ship_from_org_id" = "expr_43")][$hashvalue_57, $hashvalue_59]
│ Layout: [order_id_11:bigint, item_id_30:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {expr_43 -> df_581}
├─ RemoteSource[6]
│ Layout: [order_id_11:bigint, ship_from_org_id:bigint, $hashvalue_57:bigint]
└─ LocalExchange[HASH][$hashvalue_59] ("expr_43")
│ Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_59:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
└─ RemoteSource[7]
Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_60:bigint]

Fragment 6 [SOURCE]
Output layout: [order_id_11, ship_from_org_id, $hashvalue_58]
Output partitioning: HASH [ship_from_org_id][$hashvalue_58]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = hive:edw:dwd_erp_sale_order_header, grouped = false, filterPredicate = true, dynamicFilter = {df_581 -> "ship_from_org_id"}]
Layout: [order_id_11:bigint, ship_from_org_id:bigint, $hashvalue_58:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
$hashvalue_58 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ship_from_org_id"), 0))
ship_from_org_id := ship_from_org_id:bigint:REGULAR
order_id_11 := order_id:bigint:REGULAR

Fragment 7 [SOURCE]
Output layout: [expr_43, item_id_30, $hashvalue_61]
Output partitioning: HASH [expr_43][$hashvalue_61]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [expr_43:bigint, item_id_30:bigint, $hashvalue_61:bigint]
│ Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
│ $hashvalue_61 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_43"), 0))
└─ ScanProject[table = hive:edw:dim_materiel, grouped = false]
Layout: [expr_43:bigint, item_id_30:bigint]
Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
expr_43 := CAST("organization_id" AS bigint)
organization_id := organization_id:int:REGULAR
item_id_30 := item_id:bigint:REGULAR



select count(mib.item_id) rc
from edw.ods_cux_sino_app_ftb1 cf
inner join edw.dwd_erp_sale_order_line ola
on cf.order_source_line_id = ola.order_line_id
inner join edw.dwd_erp_sale_order_header oha
on ola.order_id = oha.order_id
inner join edw.dim_materiel mib
on cf.item_id = mib.item_id and oha.ship_from_org_id=mib.organization_id;

Comments

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×