关于 Spark AQE、CBO、RBO 的简单介绍

在 Apache Spark 中 AQE、CB0 和 RBO 是针对不同的问题和场景设计的

  • AQE 更加关注查询执行计划的动态调整
  • CB0 更加关注广播操作的优化策略
  • RBO 更加关注数据执行流的优化

Spark SQL 核心是 Catalyst
Catalyst 执行流程主要分4个阶段:
① 语句解析; ② 逻辑计划与优化; ③ 物理计划与优化 ;④ 代码生成

前 3 个阶段都由 Catalyst 负责,其中逻辑计划的优化采用 RBO 思路,物理计划的优化采用 CBO 思路

袋鼠云数栈基于CBO在Spark SQL优化上的探索


RBO 基于规则优化 (Rule Based Optimization)

通过一系列预定好的规则 (Rule) 对逻辑计划进行等价转换,以提高查询效率。

两个主要思路

  • 减少参与计算的数据量

  • 降低重复计算的代价

    常用的规则都基于经验指定,可覆盖大部分查询场景,且方便扩展,缺点是不够灵活。

    • 常量折叠 (ConstantFolding)

      • 把纯常量运算表达式预先转化,比如把1+2转化为3.0,消除不必要的重复计算
    • 谓词下推 (PushdownPredicate)

      • 最常见的用于减少参与计算的数据量的方法
        • 谓词, where条件,join on中的过滤条件
        • 将SQL中的谓词逻辑尽量提前执行,参与join的数据量减少,速度提高
    • 列裁剪 (ColumnPruning)

      • 扫表时,只筛选出符合后续逻辑计划的最小列集合,,省掉扫描全部列的资源
      • 如果使用的 Parquet,ORC 等列式存储格式持久化的,效率会更高

CBO 基于代价优化 (Cost Based Optimization)

CBO 优化主要在物理计划层,,原理是计算所有可能的物理计划代价, 并选出代价最小的。

  • 考虑了数据本身的特点(大小, 分布)
  • 考虑了操作算子的特点(中间结果集的分布及大小)及代价

物理执行计划是一个树状结构,其代价等于每个执行节点的代价总和

每个执行节点的代价,分为两个部分

  • 该执行节点对数据集的影响,即该节点输入数据集的大小与分布
    • 初始数据集,其大小与分布可直接通过统计得到
    • 中间节点输出数据集的大小与分布,可由输入数据集的信息和操作本身的特点推算
  • 该执行节点操作算子的代价,相对固定,可用规则来描述

CBO 基于代价的优化,主要有 Build侧选择,优化Join类型,优化多表Join顺序 等

CBO的缺点:

  • 数据统计信息普遍缺失, 统计信息的收集代价较高
  • 储存计算分离的架构, 使得收集到的统计信息可能不再准确
  • Spark部署在某个单一硬件架构上,Cost 很难被估计
  • Spark的UDF简单易用, 种类繁多, 但是对于CBO来说是黑盒, 无法估计 Cost

CBO是一种静态优化策略, 一旦执行计划交付运行, 就不能再对物理计划进行修改
在分布式系统中使用CBO是一个极其复杂的问题,,并且收集和维护一组准确和最新的统计数据是昂贵的


AQE 自适应查询执行 (Adaptive Query Execution)

AQE 是一种动态优化机制,可以在 Spark SQL 优化的过程中动态地调整执行计划。

在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化。

AQE 的统计信息与 CBO 不同, 不是关于表, 而是Shuffle Map阶段输出的中间文件。

AQE 在运行时获取统计信息, 在条件允许时, 优化决策会分别作用到逻辑计划和物理计划。

AQE 主要带来了 3 个方面的改进:

  • 自动分区合并

    • 在Shuffle过后, Reduce Task数据分布参差不齐, AQE将自动合并过小的数据分区
  • 数据倾斜

    • 如果某张表在过滤后, 尺寸小于广播变量阈值, 这张表参与的Join就会从Shuffle Sort Merge Join 降级为执行效率更高的Broadcast Hash Join
  • Join 策略调整

    • 结合配置项,,AQE自动拆分 Reduce 阶段过大的数据分区, 降低单个 Reduce Task 的工作负载。

总结

基于规则优化(Rule-Based Optimization,简称RBO)的优化器,这是一种经验式、启发式的优化思路,优化规则都已经预先定义好,只需要将SQL往这些规则上套就可以。简单的说,RBO就像是一个经验丰富的司机,基本套路全都知道。

然而世界上有一种东西叫做 不按套路来。与其说它不按套路来,倒不如说它本身并没有什么套路。最典型的莫过于复杂 Join 算子优化,对于这些 Join 来说,通常有两个选择要做:

  1. Join 应该选择哪种算法策略来执行?BroadcastJoin or ShuffleHashJoin or SortMergeJoin?不同的执行策略对系统的资源要求不同,执行效率也有天壤之别,同一个SQL,选择到合适的策略执行可能只需要几秒钟,而如果没有选择到合适的执行策略就可能会导致系统OOM
  2. 对于雪花模型或者星型模型来讲,多表Join应该选择什么样的顺序执行?不同的Join顺序意味着不同的执行效率,比如A join B join C,A、B表都很大,C表很小,那A join B很显然需要大量的系统资源来运算,执行时间必然不会短。而如果使用A join C join B的执行顺序,因为C表很小,所以A join C会很快得到结果,而且结果集会很小,再使用小的结果集 join B,性能显而易见会好于前一种方案。

大家想想,这有什么固定的优化规则么?并没有。说白了,你需要知道更多关于表的基础信息(表大小、表记录总条数等),再通过一定规则代价评估才能从中选择一条最优的执行计划。所以,CBO 意为基于代价优化策略,它需要计算所有可能执行计划的代价,并挑选出代价最小的执行计划。

AQE 对于整体的 Spark SQL 的执行过程做了相应的调整和优化,它最大的亮点是可以根据已经完成的计划节点真实且精确地执行统计结果来不停的反馈并重新优化剩下的执行计划。


CBO这么难实现,Spark怎么解决?

CBO 会计算一些和业务数据相关的统计数据,来优化查询,例如行数、去重后的行数、空值、最大最小值等。Spark会根据这些数据,自动选择BHJ或者SMJ,对于多Join场景下的Cost-based Join Reorder,来达到优化执行计划的目的。

但是,由于这些统计数据是需要预先处理的,会过时,所以我们在用过时的数据进行判断,在某些情况下反而会变成负面效果,拉低了SQL执行效率。

Spark3.0的AQE框架用了三招解决这个问题:

  • 动态合并shuffle分区(Dynamically coalescing shuffle partitions)
  • 动态调整Join策略(Dynamically switching join strategies)
  • 动态优化数据倾斜Join(Dynamically optimizing skew joins)

后面文章会具体学习AQE的三大特性。
Spark AQE 的三大特性


Krys