Executor Nodes
和impala的架构类似,一个sql语句从client输入给Doris,会先经过fe
(frontend)解析并生成若干fragment
,
再分配并传递给be
(backend)执行.
查看执行计划
这里可以使用explain
来查看一个查询的具体执行计划是什么样的.
explain select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;
输出为:
+-------------------------------------------------------------------------+
| Explain String |
+-------------------------------------------------------------------------+
| PLAN FRAGMENT 0 |
| OUTPUT EXPRS:<slot 4> sum(`table1`.`pv`) |
| PARTITION: HASH_PARTITIONED: `default_cluster:test`.`table1`.`siteid` |
| |
| RESULT SINK |
| |
| 3:AGGREGATE (update finalize) |
| | output: sum(`table1`.`pv`) |
| | group by: `table1`.`siteid` |
| | cardinality=-1 |
| | |
| 2:HASH JOIN |
| | join op: INNER JOIN (BROADCAST) |
| | hash predicates: |
| | colocate: false, reason: Tables are not in the same group |
| | equal join conjunct: `table1`.`siteid` = `table2`.`siteid` |
| | runtime filters: RF000[in] <- `table2`.`siteid` |
| | cardinality=0 |
| | |
| |----4:EXCHANGE |
| | |
| 0:OlapScanNode |
| TABLE: table1 |
| PREAGGREGATION: ON |
| runtime filters: RF000[in] -> `table1`.`siteid` |
| partitions=0/1 |
| rollup: null |
| tabletRatio=0/0 |
| tabletList= |
| cardinality=0 |
| avgRowSize=12.0 |
| numNodes=1 |
| |
| PLAN FRAGMENT 1 |
| OUTPUT EXPRS: |
| PARTITION: RANDOM |
| |
| STREAM DATA SINK |
| EXCHANGE ID: 04 |
| UNPARTITIONED |
| |
| 1:OlapScanNode |
| TABLE: table2 |
| PREAGGREGATION: OFF. Reason: null |
| partitions=0/3 |
| rollup: null |
| tabletRatio=0/0 |
| tabletList= |
| cardinality=0 |
| avgRowSize=4.0 |
| numNodes=1 |
+-------------------------------------------------------------------------+
用explain graph
则能看到图形化的树形执行计划.
explain graph select sum(table1.pv) from table1 join table2 on table1.siteid=table2.siteid group by table1.siteid;
+------------------------------------------------------------------------------------------------------------+
| Explain String |
+------------------------------------------------------------------------------------------------------------+
| ┌───────────────┐ |
| │[3: ResultSink]│ |
| │[Fragment: 0] │ |
| │RESULT SINK │ |
| └───────────────┘ |
| │ |
| │ |
| ┌────────────────────────────────┐ |
| │[3: AGGREGATE (update finalize)]│ |
| │[Fragment: 0] │ |
| └────────────────────────────────┘ |
| │ |
| │ |
| ┌───────────────────────────────┐ |
| │[2: HASH JOIN] │ |
| │[Fragment: 0] │ |
| │join op: INNER JOIN (BROADCAST)│ |
| └───────────────────────────────┘ |
| ┌──────────┴─────────┐ |
| │ │ |
| ┌─────────────────┐ ┌─────────────┐ |
| │[0: OlapScanNode]│ │[4: EXCHANGE]│ |
| │[Fragment: 0] │ │[Fragment: 0]│ |
| │TABLE: table1 │ └─────────────┘ |
| └─────────────────┘ │ |
| │ |
| ┌───────────────────┐ |
| │[4: DataStreamSink]│ |
| │[Fragment: 1] │ |
| │STREAM DATA SINK │ |
| │ EXCHANGE ID: 04 │ |
| │ UNPARTITIONED │ |
| └───────────────────┘ |
| │ |
| │ |
| ┌─────────────────┐ |
| │[1: OlapScanNode]│ |
| │[Fragment: 1] │ |
| │TABLE: table2 │ |
| └─────────────────┘ |
+------------------------------------------------------------------------------------------------------------+
执行计划树由exec node
(算子)组成,以上面的查询为例,数据从树的叶子节点往上一直到根节点,经过一系列操作最终得到查询结果.
接下来我们具体分析这个计划树中的各个节点分别起什么作用.
首先,这个查询被规划成了2个fragment
,分别为\(f_{0}\)和\(f_{1}\).
其中\(f_{1}\)为\(f_{0}\)的子树,它最底层的OlapScanNode
从存储层读取table2
的数据,然后经过DataStreamSinkNode
,将数据传递给\(f_{0}\).
在\(f_{0}\)中\(f_{1}\)所代表的子树表示为一个ExchangeNode
,从\(f_{1}\)接受数据.
并且它自身也通过一个OlapScanNode
读取了table1
的数据. table1
和table2
在HashJoinNode
被join算子合并。
然后向上传递给AggregateNode
(sum函数的聚合算子),最终结果通过ResultSinkNode
返回给fe
.
我们可以发现查询规划实际上通过从执行计划树拆分出若干子树的方式,实现Shared Nothing
的分布式执行.
未完待续