摘要:导读在《Spark SQL 解析层优化》、《Spark SQL 分析层优化》以及《Spark SQL 表达式优化》三次分享中,分别介绍了从 SQL 解析、 SQL 分析以及 SQL 表达式三个角度, 通过优化框架、提炼代码、优化数据结构等方法来进行优化。以上优
导读在《Spark SQL 解析层优化》、《Spark SQL 分析层优化》以及《Spark SQL 表达式优化》三次分享中,分别介绍了从 SQL 解析、 SQL 分析以及 SQL 表达式三个角度, 通过优化框架、提炼代码、优化数据结构等方法来进行优化。以上优化更多的是从代码设计方向的优化,是广义上的 SQL 优化。业内普遍讨论的 SQL 优化则是 SQL 优化器和执行计划的优化。本文将剖析 Spark SQL 优化器原理,并在此基础上介绍三种较常见的优化思路。这些优化器的执行计划优化范式是可以复用的。
主要内容包括以下几大部分:
1. Spark SQL 优化器原理
2. 下推优化思路及代码实现
3. 算子消除与算子合并
4. 表达式消除与表达式替换
5. 总结
分享嘉宾|耿嘉安 浙江数新网络有限公司 高级架构专家
编辑整理|张龙春
内容校对|李瑶
出品社区|DataFun
01
Spark SQL 优化器原理
Spark SQL 优化器的优化是建立在整个 Spark SQL 原理基础之上。所以我们首先来回顾一下 Spark SQL 的执行流程,如下图所示。
图中的 Optimizer,就是 Spark SQL 优化器。Optimizer 是通用名字,实际上有不同的实现。优化器与之前的分析层类似,需要用到元数据信息协助它做优化。
无论是分析器还是优化器,都是规则执行器的具体实现。优化器中的优化规则与分析器规则有很大的不同,分析器里主要是元数据的绑定从而定位数据的信息;而优化器在分析器基础上,借助分析结果做进一步优化。在 Spark 3.0.0 之前,Spark 优化器是 Spark Optimizer 组件。Spark 3.0.0 之后引入了 AQE(Adaptive Query Execution),它实际上也是规则执行器的实现,复用了很多原来的规则,并额外增加了一些新的优化规则,如小分区合并、大分区拆分、数据倾斜的处理等。但总体上从技术角度而言,代码设计同之前的优化器都是类似的。下面主要围绕经典的优化器进行介绍。
Spark Optimizer 作为规则执行器,其中有一批批规则,每一批规则里有一个或多个优化规则。优化规则最后会应用到经分析器输出的分析后的逻辑计划。经过一批批的优化规则处理后,最终输出优化后逻辑计划树。
下面以 SimplifyCasts 优化规则为例,介绍优化规则的基本用处。并以它为例,与分析器的分析规则进行比较和区分。
以 SimplifyCasts 第一个分支为例,会匹配到 Cast 表达式,该表达式有子表达式,如果发现当前 Cast 的目标数据类型和子表达式数据类型相同,例如把 int 类型 cast 成 int 类型,这一操作是没有意义的,因此可以将其消除,只需要输出子表达式即可。如果不消除,在执行时,该 Cast 会额外耗费 CPU 时钟浪费计算资源。除此之外,别的分支可能会触发别的条件,并对 Cast 进行消除。
通过这一示例可以看到,优化器的主要作用就是优化性能、节省资源。
接下来将介绍三个比较经典的优化范式。之所以称其为范式,是因为它们可以作为模板应用于很多不同的场景中。
02
下推优化思路及代码实现
首先介绍的是下推优化。Spark 在下推优化方面有很多优化规则,如PushProjectionThroughUnion、PushProjectionThroughLimit,PushDownPredicates(谓词下推)。除此之外,还有 limit 下推,包括 LimitPushDown、LimitPushDownThroughWindow 等。它们都是基于同样的优化思路。下面以 PushProjectionThroughUnion 为例,来具体讲解其实现。
Union 操作最后需要进行 Shuffle。如果 Union 的上游 Projection 输出字段非常多,而 Shuffle 之后又进行字段的选择,比如 Shuffle 前有 10 个字段, Shuffle 后实际上只用了 3 个字段,这种情况下,另外 7 个字段仍然需要占用磁盘 IO、网络 IO 等,所以非常低效。如果能够把 Projection 下推到 Union 之前,就可以减少 Shuffle 的数据量,节省存储和计算等方面的资源。其它下推的优化规则也都是类似的思想。
下面介绍 PushProjectionThroughUnion 代码实现,它实际上把 Projection 算子从 Union 的下游推到上游,通过把 Projection 计算提前执行的方式减少数据量。
比如某个 Union,会将三个子逻辑计划进行输出,它们的输出要有统一的 Schema。在 Spark 里 Union 的上游,它们的输出范式或者 Schema 必须统一。比如 A 子计划输出了 int 类型名字为 a 的字段,然后 B 又输出了布尔类型名字为的 a 字段,这种情况在 Spark 里面会严格校验,分析层 Parse 时就会提前把错误抛出,就不可能到优化这一步。如果到了优化这一步,就说明三个子计划输出的 Schema 是一致的。经过 Shuffle 后,最后用 Project 算子进行投影。
在进行 PushProjectionThroughUnion 优化后,Projection 被直接放在 Union 之前,通过投影大量减少 Shuffle 数据量,进而提升性能。
PushProjectionThroughUnion 优化规则实际上是重新构建了一个 Project 算子,把它直接下推到 Union 上游和 Child 中间。
03
算子消除与算子合并
第二个优化的例子是算子消除或者算子合并。SimplifyCasts 对 Cast 的消除,是表达式层面的消除,而这里的算子消除是对物理节点或者物理计划的消除,也就是 Logic plan 层面的消除,以及 Logic plan 层面的合并。在 Spark 优化器里很多规则用到了算子消除的思路,包括 EliminateOffsets、EliminateLimits、EliminateOuterJoin、RemoveRedundantAggregates、RemoveRedundantAliases、CollapseRepartition、CollapseProject、CollapseWindow、CombineUnions 等。其中 Eliminateoffsets 是对 Spark SQL 的 offset 算子进行消除。聚合实际上可以合并。再比如一个属性反复执行别名,中间的别名没有必要,只需最后一个才有意义。
下面以消除 offset 算子为例,来看一下具体的实现。
在分页应用中,offset 起到类似 skip 的作用,或者说是跳过一些行的作用。有三种可能:最左边的场景是 offset 值为 0,实际上一行也不跳,这时 offset 可以消除掉。中间的场景,offset 的值大于它的子表达式的最大输出,比如物理计划只输出了 3 行数据,但 offset 要 10 行。这种情况实际上可以把整个执行计划过滤掉。另外针对最右侧的情况,会出现多个连续的 offset 算子的场景,可能先要跳过 V2 行,再去跳过 V1 行,实际上可以把它们合并为一个 offset,通过合并来减少物理计算。
经过上述优化,三种不同逻辑计划实际上会被优化成上图的三种结果。第一个彻底消除了 offset;第二个实际也消除了 offset,上游的 logical plan,或者子计划全部被消除;第三种其实是算子的合并。
Spark 优化器里面类似上述 offset 的优化规则还有很多,如 EliminateOffsets、EliminateLimits、RemoveRedundantAggregates、RemoveRedundantAliases、CollapseRepartition、CollapseProject、CollapseWindow、CombineUnions 等。每一个不同的优化器的优化规则都会有注释,可以帮助理解优化的思路。
04
表达式消除与表达式替换
1. 表达式消除
前面介绍的 SimplifyCasts 本身是表达式消除的一个例子。除此之外,还有 EliminateWindowPartitions,可以把 Window 窗口中间的 Partition 进行消除;EliminateAggregateFilter 可以聚合 filter 从句;又比如 OptimizeRepartition,既涉及算子消除,又涉及表达式消除;OptimizeRand 是跟 SimplifyCasts 非常类似的一种算子;此外针对最终结果为布尔型的表达式,有 BooleanSimplification 规则;还有关于列的裁剪等其它一些跟表达式消除相关的优化。
以消除聚合 filter 为例进行说明,以上图所示为例。首先这并不是一个 ANSI SQL 标准语法,而是主要数据库厂商以及 Spark SQL 支持的一种语法。这个语法提供一次对数据的遍历过程中,针对不同 filter 条件产生结果的能力,具有减少数据计算量、减少数据 scan 的优化能力。
假设有一张 Student 表,里面存了学生信息。以 count 函数为例,如果想统计班里面所有女生总人数。需要写 count *以及 where 性别等于女。如果要计算男生的总人数,可能又需要执行 count *以及 where 条件性别等于男。如果要同时获取两个结果,就要执行两条 SQL。因为有了 filter 从句,就可以在一条 SQL 里同时得到两个结果。Spark SQL 在执行聚合的时候,肯定需要 scan 过程,如果上述语句分别执行两次,就会对数据进行两次 scan。如果用上述语法,只需要 scan 一遍就可以得到业务上需要的两个结果。
有时候,filter 从句可能是没有意义的,如上图中间的两个 SQL,第一个 filter 的 where 等于 true,第二条 filter 的 where 等于 false。这是两个极端的例子,针对这两种情况可以对整个执行计划进行优化。为 false 的场景可以把filter整个从句消除掉,因为相当于没有任何数据,直接 select 0 了,甚至 from 都可以不要。
实际代码中的物理计划操作,要用符合 Spark SQL 的设计方式去实现。以刚才的分支为例,当 filter 永远为 true 的时候,直接把聚合表达式的 filter 设置为 None。在 Spark 里面 None 就会把该表达式给消除掉。
再看 false 的情况。最后实际上直接生成了结果为 0 的一行数据。通过创建了一个自变量 0,把之前的整个聚合表达式给替换掉了。
Spark 里聚合函数有两种,即声明式聚合和推断型聚合。这两种不同接口的实现稍微有些区别,但功能是一样的。
2. 表达式替换
Optimizer 中有很多优化规则都采用了表达式替换的思路——即在一些情况下可以用更低廉的表达式替换昂贵的表达式。这类优化规则包括:OptimizeWindowFunctions、OptimizeRepartition、NullPropagation、ConstantPropagation、OptimizeJoinCondition 等。
在 Spark SQL 里有 hint 的机制,通过 hint 的语法,如/*+Repartition*/可以提示 Spark SQL 在执行时对于分区进行 Repartition 操作。Spark 也提供了不同的 API 去执行类似的 hint 的机制。用户在写比如 partition 3 时,可能特别主观,跟数据的真正分区情况可能不同,这样就会有优化的空间。
如上图,比如优化 repartition,它最主要的功能就是进行分区。在重新优化时,如果所有的分区表达式本身可以折叠时,再进行 repartition 是没有意义的。效果跟把分区数量设置成1是一样的。这两个 SQL 是等价的,所以这个优化比较简单,直接子表达式替换即可。
05
总结
本文主要介绍了 Spark SQL 优化器的原理,然后分别介绍了三种优化的范式,并分别举了几个例子对代码实现进行了介绍。
优化器规则,一方面通过表达式的消除、替换,降低计算成本;另一方面通过逻辑计划的消除、合并或替换来降低计算成本。实际使用中需要综合应用。
以上就是本次分享的内容,谢谢大家。
来源:DataFunTalk