不管是传统数据库或者基于sql的分布式大数据分析工具,基本原理都是把一个sql转换成sql语法树(AST),通过对语法树的分析转换成执行计划。传统数据库会根据执行计划通过执行引擎并返回结果;而大数据sql分析工具,由于针对更大数据量而生,为了更好的扩展性、容错性和高可用,会把执行计划分成逻辑执行计划和物理执行计划,并且根据查询sql的特点切分逻辑计划,这样可以把分块的逻辑计划分配到更具扩展性的并行节点,最后根据逻辑执行计划转成物理执行计划进行查询。
本文档以当前流行的分布式大数据查询引擎Presto为切入点,分析一个query语句怎么生成为一个分段的逻辑计划。下图是当前流行大数据sql查询引擎(包括hive/sparksql),生成逻辑计划的过程:
从图中可以看到,当用户通过presto-cli或者jdbc接口提交了一个query请求到Presto的Coordinator节点,首先会被解析器(Parser)转换成一颗sql语法树,这一步只是通过预定的分词规则把一个词组结构(List)转换成了树结构(Tree),但是这时候不能理解这颗树代表的含义是什么?所以被称作Unresovled AST,这时候需要再通过分析器(Analyzer)来绑定元数据(metaData)。
数据结构和编译原理知识知道,Tree这种结构或者说AST这种结构有一个非常重要的特性就是可以等价变换,这个特性在其做分析元数据及优化查询时非常有用。在通过等价变换成Unresovled AST后,称为UnOptimized AST,这时候通过这棵AST可以基本分析出提交了一个什么样的语句,其中关联了什么表,这些表的基本结构是怎样的,其中又使用了什么函数等等。绑定元数据的AST后还需针对具体的操作(主要是join)节点进行优化,使用优化器(Optimizer)进行优化并最终转换成Optimized AST。最后把优化后AST进行逻辑分段,变成可供分布式分析的分段逻辑执行计划。
下面以Presto为例具体实际分析怎么实施。
Parser
Parser的过程实际是一个把sql语句根据分词规则及语法规则再组装成基本AST的过程。当前大部分都是使用的Antlr4工具。从源码的角度看:
presto-main模块的execution包中SqlQueryManager的createQuery发起了Query操作,
Antlr4工具具体分为lexer和parser,lexer叫做词法分析器,而parser叫做语法分析器。举个小例子,以下面这个定义chars sp =100来说,会先根据定义好的tokens进行分词,再语法分析成AST:
而presto它的lexer是在presto-parser中定义,其中分词器:
由于Antlr4是业内使用最多也是最成熟的方案,所以资料也非常多,这里就不赘述了,工具更多内容可参考:https://legacy.gitbook.com/book/dohkoos/antlr4-short-course/details
https://github.com/antlr/antlr4
Analyzer
分析器Analyzer也叫做语义分析器(Semantic Analysis),主要是用于绑定元数据。SqlQuery的数据也即是DQL的数据通过SqlQueryExecution执行器被拉起。真正实现是doAnalyzeQuery方法中。
语义分析可以看作包括了语句(statement)分析和表达式(expression)分析。
其分析的实现是以典型的visitor模式使用元数据和会话(sesssion,presto在每个session中有自己的Catalog和Schema)信息遍历Unresolved AST来实现的。
Scope是其递归遍历时列描述符集:
对查询的select和showXXX语句返回了包含渠道的每一列,每一个filed代表一列。而insert /delete/create table as select返回只有一列表示操作的行数。
针对不同的statement将使用不用的statement实现类进行处理,在analyzer后将得到一个Analysis类的实例。其中除了statement为root的AST以外还有为了构建执行计划树所添加的信息。
LogicalPlanner
在AST绑定相应元数据后,将把AST转换成逻辑计划树。
public PlanNode planStatement(Analysis analysis, Statement statement)
{
if (statement instanceof CreateTableAsSelect && analysis.isCreateTableAsSelectNoOp()) {
checkState(analysis.getCreateTableDestination().isPresent(), "Table destination is missing");
Symbol symbol = symbolAllocator.newSymbol("rows", BIGINT);
PlanNode source = new ValuesNode(idAllocator.getNextId(), ImmutableList.of(symbol), ImmutableList.of(ImmutableList.of(new LongLiteral("0"))));
return new OutputNode(idAllocator.getNextId(), source, ImmutableList.of("rows"), ImmutableList.of(symbol));
}
//分成两步:1.planStatementWithoutOutput 根据不同sql语句生成不同relationPlan
// 2.createOutputPlan 输出得到LogicalPlan
return createOutputPlan(planStatementWithoutOutput(analysis, statement), analysis);
}
//依据statement划分成creat、insert、delete、query、explain五类logicalPlan,并执行不同的plan生成函数
//其中,create、insert、delete的逻辑计划可直接生成,create/insert会生成TableWriterPlan,delete生成Plan,
//而query将由依旧是根据visitor模式 RelationPlanner 生成RelationPlan后,在visitQuery中将使用QueryPlanner使用visitor模式来生成QueryPlan
private RelationPlan planStatementWithoutOutput(Analysis analysis, Statement statement)
{
if (statement instanceof CreateTableAsSelect) {
if (analysis.isCreateTableAsSelectNoOp()) {
throw new PrestoException(NOT_SUPPORTED, "CREATE TABLE IF NOT EXISTS is not supported in this context " + statement.getClass().getSimpleName());
}
return createTableCreationPlan(analysis, ((CreateTableAsSelect) statement).getQuery());
}
else if (statement instanceof Insert) {
checkState(analysis.getInsert().isPresent(), "Insert handle is missing");
return createInsertPlan(analysis, (Insert) statement);
}
else if (statement instanceof Delete) {
return createDeletePlan(analysis, (Delete) statement);
}
else if (statement instanceof Query) {
return createRelationPlan(analysis, (Query) statement);
}
else if (statement instanceof Explain && ((Explain) statement).isAnalyze()) {
return createExplainAnalyzePlan(analysis, (Explain) statement);
}
else {
throw new PrestoException(NOT_SUPPORTED, "Unsupported statement type " + statement.getClass().getSimpleName());
}
}
insert和createTableAsSelec语句t会通过LoggiclaPlanner的createTableWriteerPlan方法 生成CreateTableWriteerPlan:
TableCommitNode可以防止数据写入失败导致的中间状态,确保成功后再进行commit。QueryPlan是指insert/creat table as select后面生成的执行计划树。
同理,Delete会生成DeletePlan:
Relation类型SQL语句会生成QueryPlan,由LoggiclaPlanner委托RelationPlanner进行分析。
public Plan plan(Analysis analysis, Stage stage)
{
//生成逻辑计划树,返回的为planNode子类的实例
PlanNode root = planStatement(analysis, analysis.getStatement());
PlanSanityChecker.validateIntermediatePlan(root, session, metadata, sqlParser, symbolAllocator.getTypes());
//使用针对的优化器optimizers,在presto1.90前,planOptimizers被初始化为一个list,顺序执行基于ruler的优化器。
if (stage.ordinal() >= Stage.OPTIMIZED.ordinal()) {
for (PlanOptimizer optimizer : planOptimizers) {
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
requireNonNull(root, format("%s returned a null plan", optimizer.getClass().getName()));
}
}
if (stage.ordinal() >= Stage.OPTIMIZED_AND_VALIDATED.ordinal()) {
// make sure we produce a valid plan after optimizations run. This is mainly to catch programming errors
PlanSanityChecker.validateFinalPlan(root, session, metadata, sqlParser, symbolAllocator.getTypes());
}
Map<PlanNodeId, PlanNodeCost> planNodeCosts = costCalculator.calculateCostForPlan(session, symbolAllocator.getTypes(), root);
return new Plan(root, symbolAllocator.getTypes(), planNodeCosts);
}
而Query和QuerySpecification由RelationPlanner委托QueryPlanner来分析。
private RelationPlan createRelationPlan(Analysis analysis, Query query)
{
return new RelationPlanner(analysis, symbolAllocator, idAllocator, buildLambdaDeclarationToSymbolMap(analysis, symbolAllocator), metadata, session)
.process(query, null);
}
@Override
protected RelationPlan visitQuery(Query node, Void context)
{
return new QueryPlanner(analysis, symbolAllocator, idAllocator, lambdaDeclarationToSymbolMap, metadata, session)
.plan(node);
}
Optimizer
在sql的优化思路上最基本的分为基于规则和基于代价(rbo和cbo),基于规则是传统数据库积累的一套经验,指定一些规则,然后遍历逻辑执行树模式符合规则时则等价转换(AST转换)进行优化,比如谓词下推(Predicate Pushdown),常量累加(Constant Folding)等;而基于代价是计算所有执行路径的代价,并挑选代价最小的执行路径,这种思路当前针对分布式的执行引擎很流行但目前都做的都还不够好,大部分cbo都认为代价是以mem为主,但如何确定路径上代价就有很多思路。
更多讨论可参考:
- Orca: A Modular Query Optimizer Architecture for Big Data
- Optimization of Common Table Expressions in MPP Database Systems
- Multi Query Optimization
presto 0.190前只支持rbo,在0.190后也开始支持cbo优化器伪代码:
start:
- break up expression into single-assignment expression
- add each assignment to the memo in a separate equivalence class
- optimize(root class, unbounded cost, no physical reqs)
optimize(equivalence class, cost bound, requirements):
- initialize exploration queue (rule + top operator in equivalence class)
- find potential match candidates and add them to queue
- while queue is not empty
- enumerate bindings for each named argument (by iterating over all expressions
in each equivalence class that's part of the match)
- if binding + physical requirements can be handled by rule
- apply rule
- for each expression generated by rule
- add to memo
- if top function is physical
- determine cost bound for children
- for each input
- derive required physical properties & cost upper bound
- optimize corresponding equivalence class
with required properties and upper bound
- update max bound for remaining children
- find additional potential matches and enqueue
一个分布式引擎执行的快不快,很大程度就来自于其优化器,本文档暂不讨论更多,presto优化器可参考:
(https://github.com/prestodb/presto/wiki/New-Optimizer)
Plan Fragmenter
把逻辑执行计划分段的最重要目的就是能够以分片(splited)方式运输(shipped)和执行在分布式节点上。分布式sql引擎相比于传统数据库引擎最大的区别之一就是并发度理论上可以无限横向扩展,presto也不例外,presto切分的目的就是为了更好的分发到各个woker节点,但是sql执行的时候难免会被一些操作阻塞,比如join,aggregation,sort等,那么一个执行计划就在这些点切分(fragment)成多个子执行计划(SubPlan)。在相同的SubPlan(执行逻辑一样,数据split不通)中可以多个节点的task中并发执行。
下面我们还是以presto举例说明:
presto支持的阶段为Source、Fixed、Single和Coordinator_only。其中Source即是分片从数据源读数据;Fixed则是将读取的数据分散到分布式节点上进行处理,包括局部聚合、局部join及局部数据写入等;Single则是将所有结果进行汇总处理,并返回结果,只在单个节点上执行。Coordinator_onlye也是在单节点对insert和createtable的commitNode是这种类型。可以看到,不同subPlan间有明显层级关系,一般来说是SourceStage->FixedStage->SingleStage。
在presto中的划分是依据logicalPlan逻辑执行计划树的PlanNode来决定的。通过PlanFragMenter深度优先遍历逻辑执行树,使用visitor模式遍历到需要分段的节点则加入不同的subPlan。 Exchange PlanNode即是其presto分段点,表示不同Stage之间交换数据,也即是常说的shuffle,因为需要等待分布式节点的数据的传输。在分段成subPlan后Exchangge PlanNode会转成多个RemoteSourceNode节点。
public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context)
{
if (exchange.getScope() != REMOTE) {
return context.defaultRewrite(exchange, context.get());
}
PartitioningScheme partitioningScheme = exchange.getPartitioningScheme();
if (exchange.getType() == ExchangeNode.Type.GATHER) {
context.get().setSingleNodeDistribution();
}
else if (exchange.getType() == ExchangeNode.Type.REPARTITION) {
context.get().setDistribution(partitioningScheme.getPartitioning().getHandle());
}
ImmutableList.Builder<SubPlan> builder = ImmutableList.builder();
for (int sourceIndex = 0; sourceIndex < exchange.getSources().size(); sourceIndex++) {
FragmentProperties childProperties = new FragmentProperties(partitioningScheme.translateOutputLayout(exchange.getInputs().get(sourceIndex)));
builder.add(buildSubPlan(exchange.getSources().get(sourceIndex), childProperties, context));
}
List<SubPlan> children = builder.build();
context.get().addChildren(children);
List<PlanFragmentId> childrenIds = children.stream()
.map(SubPlan::getFragment)
.map(PlanFragment::getId)
.collect(toImmutableList());
return new RemoteSourceNode(exchange.getId(), childrenIds, exchange.getOutputSymbols());
}
后续
在生成分段的逻辑执行计划后,是不能直接放到执行引擎中执行的,因为这里还是抽象的概念,比如Aggregation还是抽象的,其代表的是相同id进行合并,而实现方法具体到引擎比如mr需要hash shuffle来实现。所以需要根据不同执行引擎(presto/spark/mr/tez等)生成对应的物理执行计划,虽然不同执行引擎各有差异,但大体逻辑还是1.由分段逻辑计划生成task执行图;2.以及task的执行图转换成基于Operator的最小执行单元执行图。与生成逻辑计划都在master节点不同,1.和2.一般都会在worker节点中生成并运算。在生成物理计划时还需考虑执行引擎本身的特性,来确定最终的物理计划。比较重要的有几点:1.如何确保数据划分(source和parition)均匀;2.stage内并发度怎么提高同时又有比较高的效率;3.如何做数据交换,保证传输效率高同时容灾又有保障等。更多有关分析,请关注下一篇分析:分布式sql引擎–生成物理计划分布式执行。
本文为从大数据到人工智能博主「bajiebajie2333」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://lrting.top/backend/11569/