searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享

《F1 Query: Declarative Querying at Scale》论文阅读分享

2023-08-02 08:14:03
17
0

1. 介绍

  • 数据碎片化:对于不同的底层数据(例如存储在Spaner,bigTable,各种文件系统)F1 Query可以提供一个统一的视图进行查询。

  • 数据中心架构:对于存储与计算分离的数据中心当中 F1 Query 可以作为一个查询交互引擎。以下几点作为支撑:

    • google 网速快,主要指本地访问远程数据的延迟比较小。

    • 磁盘的 I/O 也没问题,因为他们有自己的文件系统可以保证。

    • 远程的数据管理服务(spaner)可以均衡的访问数据

  • 伸缩性(针对于查询的大小和类型):单点短查询、分布式中性查询和大的分布式查询(在mapReduce框架下)。F1 Query 通过增加用于查询处理的计算并行性来缓解大数据量的高延迟。

  • 扩展性(针对于复杂的自定义查询):F1 Query 可以满足不容易用SQL表达的查询需求,例如用户定义函数(UDF)、用户定义聚合函数(UDA)、表值函数(TVF)。

2. 概述

2.1 F1 Query 架构

在单个数据中心内组件之间的基本架构和通信。

F1 Master负责监控和维护所有F1 Server;F1 Server负责接收客户端的小型查询和处理小型查询;当查询的数据量比较大时,F1 Server会将任务分配给F1 Worker池,利用F1 Worker池中动态的分配执行线程去处理查询请求。

F1 Server和F1 Worker都是无状态的,因此,扩容这两个组件不会引发数据重新分配的代价。

2.2 Query执行过程:

  • 单点模式:当客户端发送一条 SQL 查询语句到F1,F1 Server 首先解析和分析 SQL,然后遍历下层 Data Source 的列表,如果这个本地数据中心没有这条 SQL 查询的数据,那么就找就近并且会返回 SQL 所需要数据的 F1 Server。然后客户端重新发 SQL 给这个 F1 Server 执行。虽然google服务器之间网速快,IO快。但是上面这个过程(选择就近的并且带有SQL所需数据的F1 Server,对延迟会有很大影响)。

F1 Server中的优化器将SQL转化为抽象语法树在转化为关系代数运算符DAG,在物理和逻辑和物理上优化,生成执行计划。再根据客户端判断是进行交互式执行还是批量式执行。

  • 交互式:优化器会选择在单点执行还是分布式执行,单点执行,接收到请求的F1 Server会立即分析、查询返回结果。如果是分布式执行,那么接收到请求的Server只是充当查询协调器。该Server在一个单独的worker中工作,然后一起执行查询。交互执行在中小型查询中有良好的性能和较高的效率。

  • 批量模式:有更高的可靠性,查询计划单独存储,调度逻辑使用MapReduce异步运行查询。可以容忍服务器重启和故障。

  • 数据源:F1 Server和F1Worke可以访问其他数据中心的数据;而且通过计算存储分离,访问时也可以兼容各种数据格式。F1 Query将每种数据源抽象成类似关系表的模式,可以通过SQL查询各种数据源类型。

  • 数据接收器:查询的输出可以指定成各种类型的数据格式返回,可以返回给客户端,也可以保存到其他存储。

  • 查询语言:兼容SQL 2011标准。

3. 交互式执行

F1 Query 交互查询支持集中式和分布式,这根据优化器分析查询语句在确定。

  • 若是集中式:也就是单点查询,F1 Server使用单个执行线程立即执行查询计划。

  • 若是分布式:也就是分布式查询,则当前F1 Server充当查询协调器,安排 F1 Worker一起执行查询。

3.1 集中式单线程执行

 

# 用UserID作为Join key 联合查询Users表和Activity表,在以UserID进行排序
SELECT * FROM Users JOIN Activity USING (UserID) ORDER BY UserID;

F1 Query支持各种链接查找,包括lookup Join、hash join、merge join 和array join。

文中的描述是:Lookup Join 操作符首先从Activity表中分批次读取join key为UserId的行到内存,在从Users表中用索引查找key为UserID的行。然后对左右两个输入进行merge,最后进行排序。

F1 Query对spaner提供了一个集成的 scan/join方法。

除了链接:F1 Query也支持投影、聚合额、排序、unioning和分析窗口功能。这些都支持过滤谓词以及LIMIT 和 OFFSET。

3.2 分布式执行

  • 每个分片包含一个或多个运算符(例如聚合)以及有关如何读取分片输入和重新分配分片输出的信息。

  • 每个分片有不同的并行度。

  • 分片可以将数据发送到多个目的地(分叉)执行计划可以有多个输出。

# 用AdId 作为Join key联合查询Ads表和Clicks表;
# 然后过滤出 开始日期是2018年5月14号以后 并且 使用Chrome浏览器搜索 的数据;
# 然后以地区分组,最后以点击数量降序排序。
SELECT Clicks.Region, COUNT(*) ClickCount
FROM Ads JOIN Clicks USING (AdId)
WHERE Ads.StartDate > ‘2018-05-14AND Clicks.OS = ‘Chrome OS’
GROUP BY Clicks.Region
ORDER BY ClickCount DESC;

当优化器发现分布式执行比较合理时,会生成一个分布式执行计划。查询计划会被拆分成一个个分片。每个分片都在一组F1 Worker节点上,F1 Worker是多线程的,每个分片同时执行。

Scan(Clicks) 被分配给 1000 个 F1 Worker 上并行拉取满足 Clicks.OS=Chrome OS条件的数据,每个worker根据每一行数据的 Hash(AdID) 发送给对应的 HashJoin 分片。

同样,Scan(Ads) 被分配给 200 个 F1 Worker 上并行拉取满足 Ads.StartDate > ‘2018-05-14’ 条件的数据,每个worker根据每一行数据的 Hash(AdID) 发送给对应的 HashJoin 分片。

因为做hash Join的分片(最右侧蓝色分片)有1000个F1 Workers,所以下面两个Scan F1 Worker发送数据给对应的Hash Join分片时也分成1000个并行发送任务。

HashJoin 及 PartialAggregation:根据 Join Key 分成了 1000 个并行任务,各自做 Join 计算,并做一次聚合。

最后,F1 Server把各个分片的聚合结果再汇总起来,返回给客户端。

3.3 分片策略

分布式执行模式中,F1 Query并行执行多个分片,整个执行的数据流可以看做一个有向无环图。

数据从Fragment流出的时候会被Exchange operate 利用一个分区函数计算流出的目的地(partition number),这个目的地就是分片里面的一个worker。

论文中说Exchange操作是通过RPC实现的可以很好的扩展到每个分片和上千个分区。google的Jupiter网络速度很快,每个机器间通信都可以达到10GB/S。

为了提高查询效率,Table Scan 会将底层的数据分成N个分片描述,调度N个worker,每个worker执行一个分片描述。因为分片描述可以让worker获取相应数据的子集,所以每个worker扫描元数据后获取的数据不重叠。

聚合可以被优化,之前提到数据从Fragment流出的时候会被Exchange operate 利用一个分区函数计算流出的目的地(partition number),图5中,当Aggregation(partial)流出到Aggregation(final)之前会被Exchange operate尽量在内存中聚合。 这就减少了数据的传输量,降低了hot grouping keys 的影响。

总结:什么样的算子会归到一个Fragment里?

Buttom-up策略进行汇聚

对数据分布有共同需求的算子,数据是否按照某个字段进行分片(Hash join、Group by Fields、部分的聚合算子,例如Partail Count)

3.4 性能考量

F1 Query中的性能问题主要是由数据倾斜和读取数据源方式不合理导致的。hash join对两边的热点key非常敏感:接收方可能数据量太大溢出到磁盘上,发送方可能消耗太多cpu和网络资源。如果其中一方数据量很少,F1 Query支持broadcast hash join,将它的拷贝发送给所有参与的worker。

最基本的lookup join的实现是一个一个key做lookup,考虑到延时,这么做就太慢了。F1 Query可以缓存若干行之后批量做lookup,这样还可以减少对相同key的重复的lookup。scan算子也可以用类似的方法来提速,即将对相同数据源分片的多次访问合并为一次。如果对数据源分片的请求数量超过了它能服务的并发数,这些请求可以乱序执行,因此能隐藏掉底层存储系统的延时,即慢请求在处理的同时,快请求也一直在处理。

如果直接将lookup join算子与它的左输入放到一起,也可能产生倾斜和非预期的数据访问,比如根本没办法聚合相同的key。为此,优化器可以将左输入再次分片,分片函数可以有多种。像Spanner和BigTable这样的key range分片的数据源可以得益于基于key range的分片方法。如果想利用这一点,我们可以把所有destination fragment按静态的key range进行分片。

F1 Query采用了一种动态分片,即各个输入源按自己的数据进行分片(接收端仍然和左输入源一起)。基于观察,一个数据源的数据分布通常与整体数据分布类似。这样分片的数据在worker之间更均匀,也能自适应地将热点key分散到多个接收分片。

F1 Query的算子通常尽可能地在内存中做流计算,而不是checkpoint到磁盘上。配合有激进的缓存机制的数据源,即使是复杂的分布式query也可以在几十或几百ms内完成。client端会重试失败的查询,但对于长时间的查询,分布式执行不够了,需要批处理了。

4. 批量查询执行

F1 Query 也支持长时间运行的大量数据的转换,例如ETL。google里面的ETL基本都是用FliumeJava或者mapReduce写出来的,然而这两个的开发和维护成本太高,并且享受不到SQL优化器的很多优化,比如属性裁剪、条件下推等等。

当F1 Worker 或者F1 Server故障出现故障时,上一章的交互式模型(内存过程模型)不适合,比如出现长查询。所以用批处理的方式就可以很好的应对。另外,批处理方式也能应对客户端故障,通过客户端断开重连进行异步查询。

批量模式和交互式模式共用一个“SQL前端”,也就是共享优化器、执行计划生成。

主要区别是在调度过程。如上图所示。交互式查询是同步执行的,F1 Server监控查询进度,直到完成;批量模式中,F1 Server异步调度查询去执行,中央注册表记录查询进度。

这种模式的挑战:

  • 通信方面:交互式可以同时激活所有分片,进而可以通过RPC通信;批量模式每个分片执行的时间不同。

  • 机器故障问题:因为批量模式是长时间运行的,如果出现短暂的机器故障,批量模式要持久化长查询的中间状态,然后能够继续执行。

  • 多任务情况:需要一个上层框架来跟踪多个批量模式的任务。

4.1 解决通信和短时间宕机问题

解决通信方面:将批处理执行计划中每个分片映射到MapReduce Stage,处理管道中每个stage都将数据输出存储到Colossus文件系统中。解决机器故障:因为mapReduce框架具有容错性,所以出现短时间宕机没有问题。

由图六所示左侧,F1将执行计划中的页分片被抽象映射到右侧的MapReduce操作,MapReduce分为两个阶段处理,右侧最下端为map-Reduce-Reduce,即将一个分片数据 F 处理为按照所需规则的分类数据 E;右侧上面两个操作为map<identity>-reduce,如中间,即将处理后的数据 E 和 D 按照所需的规则分类成数据 B。这样一来,左侧的6个数据处理阶段,变为右侧的3个数据处理阶段。

与交互式通过RPC发送数据不同,批处理模型将数据写入缓存文件,再将其读出到下一个分片。通过执行计划器的公共I/O接口实现。另外交互式模型中执行计划中每个节点都是同时被激活的,通过流水线方式实现并行,但是批量模式在MapReduce时只有所有数据完全流入后方可启动。

4.2 解决追踪所有MapReduce任务问题

F1 Client发送批量查询请求,其中一个F1 Server接收请求,并生成一个执行计划。Query Registry是一个Spanner数据存储,用来跟踪所有批处理模式查询的元数据。Query Distributor 根据负载均衡和数据源的可用性选择数据中心,然后分发查询给数据中心。然后数据中心获取查询,每个数据中心都有一个都有Query Scheduler,他定期从Query Registry中检索新分配的查询,当查询任务准备好并且资源可用时,Query Scheduler将任务发送给Query Executor,然后Query Executor使用MapReduce工作池执行Query任务。好处如下:

1.每个组件都是无状态的,并且都有备用节点,另外Query Registry中维护了执行状态,使得所有组件都可以被替换。

2.MapReduce失败的话也会重试几次。

3.如果数据中心完全宕机 Query Distributor会将查询重新分配给备用数据中心,然后重新执行。

5. 查询优化器

5.1 优化器基础结构

查询优化器执行过程:

AST->逻辑转换器->生成逻辑查询计划->逻辑计划的等价转换(规则)->物理策略生成器->物理查询计划->执行计划生成器->最终执行计划**。

功能:查询优化器主要作用是将SQL抽象语法树(AST)转化为物理计划(包括所有数据源访问路径和执行算法),如图8绿色输入框到绿色输出框。

5.2 逻辑查询计划优化

Logical Converter;Logical Transformations(Rules)

优化器将传来的AST转换为关系代数树;关系代数树应用逻辑重写规则来应用启发式更新改进查询计划。

F1 Query支持的规则有:filter下推、常量折叠、属性裁剪、常量传播、outer join窄化、排序消除、公共子计划消除、materialized view重写。

5.3 物理查询计划构建

Physical Strategies

基于关系代数, 创建一个物理查询计划树,表示实际的执行算法和数据源访问路径。

每生成一个物理计划都表示为一个类,该类追踪多个数据的属性,比如分布情况、顺序、唯一性、相比其他数据源的波动性,优化器根据这些属性决定何时插入exchange算子。

如果并发度超过了集中模式的处理能力,就变成分布式查询。

5.4 执行计划生成器

Execution plan Generator

将物理执行计划转化为一系列可以直接执行的执行片段。即将物理执行计划树中每个节点转化为相应的运算符。

生成器还负责计算每个片段的最终并行度,从叶节向上传播到根节点。

6. 延展性 tiSpark?

F1 Query支持多种存储系统和多种输入类型。

1.自定义数据源(spanner、mesa)

2.用户定义的标量函数(UDF)、聚合函数(UDA)和表值函数(TVF)

UDF Server是由F1 Query单独拥有和部署的RPC服务。与F1 Server与F1 Worker一样,执行在相同的数据中心。

 

UDF Server 公开了一个通用的RPC接口,使F1 Server能够找到UDF Server导出函数的详细信息,然后执行这些函数。

执行器通过RPC远程调用远端的UDF函数。通过Batch/流水线/异步化将远程调用的延迟抹平。

UDF Server也是无状态的,可以很好的横向扩展。

6.1 UDF(User-Defined Functions)

F1 Query 支持的 UDF 可以使用 SQL 或者 Lua 脚本语言编写。标量就是通过一行函数转化为另外一行。如下是通过Lua编写的一个方法----将字符串日期转化为相应的无符号整数Unix时间。

local function string2unixtime(value)
 local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
 return os.time({year=y, month=m, day=d})
end

F1 Query在解析上面这种脚本或者SQL时,会生成一个函数表达式。然后优化器会将表达式移动到投影中,投影操作符将数据缓冲并计算相关的参数。然后F1 Worker将RPC发送到关联的UDF服务器,为了降低延迟F1 Worker会发送多个RPC。

6.2 UDA(User-Defined Aggregate)

F1 Query 还支持用户定义的聚合函数,即用户定义的这个函数方法,能够将多个输入组合成一条输出。一批过来转化为一行,比如Count()。可以分成几部分来做,最后整合为count()。

UDF Server 是无状态的,并允许每个F1 Server并行的将请求发送给多个 UDF Server 进程。这样一来,如图所示每个F1 Worker可以并行的将自己内部聚合后的数据( AGGREGATION(partial) ),并行的发送给 F1 Server。

6.3 TVF(Table-Value Functions)

# 计算过去3天的广告点击活动:
SELECT * FROM EventsFromPastDays(3, TABLE Clicks);

# 表中时间字段 大于 当前时间-3天,即过去三天内的所有数据。
CREATE TABLE FUNCTION EventsFromPastDays(num_days INT64, events ANY TABLE)
AS SELECT * FROM events
WHERE date >= DATE_SUB(CURRENT_DATE(),INTERVAL num_days DAY);

一张表作为输入,转化为另外一张表,例如这样一段输入到F1 Query中,会返回一张过去三天内数据的表。

如图所示,输入进来的表会转化为多个Row batches,然后F1 Server通过RPC调用 UDF Server,UDF Server会根据Where条件处理传过来的每个Row batches。然后F1 Server再将UDF Server输出的已经处理好的Row batches 都接收回来,组成新表,返回给Client。

7. 高级特性

7.1 鲁棒性能

F1 Query很重视性能的稳健性,当出现非预期的数据模式(数据量、选择度等)或者其他意外因素时,性能会优雅的下降,即性能降级,并且在降级的过程平滑化,避免出现陡峭的曲线。

例子:原本以为内存排序的数据,当数据量超出预期,需要转化为磁盘参与的排序算法时,F1 Query会智能的将必要的部分数据放到磁盘上,而不是将整个排序都改掉。

8. 生产性能指标

如下图所示:对于交互模式,14天,每天的QPS曲线。平均每秒是450000个Query,那么每天就是400亿个Query。

如下图(左)所示:显示了集中式查询的延迟百分比。其中红色小球曲线表示有50%的 Query 低于10ms,蓝色小三角曲线表示有90%的 Query 低于50ms,黄色小×曲线表示有99%的 Query 低于300ms。

如下图(右)所示:显示了分布式查询的延迟百分比。

 

0条评论
0 / 1000
tulv
4文章数
0粉丝数
tulv
4 文章 | 0 粉丝

《F1 Query: Declarative Querying at Scale》论文阅读分享

2023-08-02 08:14:03
17
0

1. 介绍

  • 数据碎片化:对于不同的底层数据(例如存储在Spaner,bigTable,各种文件系统)F1 Query可以提供一个统一的视图进行查询。

  • 数据中心架构:对于存储与计算分离的数据中心当中 F1 Query 可以作为一个查询交互引擎。以下几点作为支撑:

    • google 网速快,主要指本地访问远程数据的延迟比较小。

    • 磁盘的 I/O 也没问题,因为他们有自己的文件系统可以保证。

    • 远程的数据管理服务(spaner)可以均衡的访问数据

  • 伸缩性(针对于查询的大小和类型):单点短查询、分布式中性查询和大的分布式查询(在mapReduce框架下)。F1 Query 通过增加用于查询处理的计算并行性来缓解大数据量的高延迟。

  • 扩展性(针对于复杂的自定义查询):F1 Query 可以满足不容易用SQL表达的查询需求,例如用户定义函数(UDF)、用户定义聚合函数(UDA)、表值函数(TVF)。

2. 概述

2.1 F1 Query 架构

在单个数据中心内组件之间的基本架构和通信。

F1 Master负责监控和维护所有F1 Server;F1 Server负责接收客户端的小型查询和处理小型查询;当查询的数据量比较大时,F1 Server会将任务分配给F1 Worker池,利用F1 Worker池中动态的分配执行线程去处理查询请求。

F1 Server和F1 Worker都是无状态的,因此,扩容这两个组件不会引发数据重新分配的代价。

2.2 Query执行过程:

  • 单点模式:当客户端发送一条 SQL 查询语句到F1,F1 Server 首先解析和分析 SQL,然后遍历下层 Data Source 的列表,如果这个本地数据中心没有这条 SQL 查询的数据,那么就找就近并且会返回 SQL 所需要数据的 F1 Server。然后客户端重新发 SQL 给这个 F1 Server 执行。虽然google服务器之间网速快,IO快。但是上面这个过程(选择就近的并且带有SQL所需数据的F1 Server,对延迟会有很大影响)。

F1 Server中的优化器将SQL转化为抽象语法树在转化为关系代数运算符DAG,在物理和逻辑和物理上优化,生成执行计划。再根据客户端判断是进行交互式执行还是批量式执行。

  • 交互式:优化器会选择在单点执行还是分布式执行,单点执行,接收到请求的F1 Server会立即分析、查询返回结果。如果是分布式执行,那么接收到请求的Server只是充当查询协调器。该Server在一个单独的worker中工作,然后一起执行查询。交互执行在中小型查询中有良好的性能和较高的效率。

  • 批量模式:有更高的可靠性,查询计划单独存储,调度逻辑使用MapReduce异步运行查询。可以容忍服务器重启和故障。

  • 数据源:F1 Server和F1Worke可以访问其他数据中心的数据;而且通过计算存储分离,访问时也可以兼容各种数据格式。F1 Query将每种数据源抽象成类似关系表的模式,可以通过SQL查询各种数据源类型。

  • 数据接收器:查询的输出可以指定成各种类型的数据格式返回,可以返回给客户端,也可以保存到其他存储。

  • 查询语言:兼容SQL 2011标准。

3. 交互式执行

F1 Query 交互查询支持集中式和分布式,这根据优化器分析查询语句在确定。

  • 若是集中式:也就是单点查询,F1 Server使用单个执行线程立即执行查询计划。

  • 若是分布式:也就是分布式查询,则当前F1 Server充当查询协调器,安排 F1 Worker一起执行查询。

3.1 集中式单线程执行

 

# 用UserID作为Join key 联合查询Users表和Activity表,在以UserID进行排序
SELECT * FROM Users JOIN Activity USING (UserID) ORDER BY UserID;

F1 Query支持各种链接查找,包括lookup Join、hash join、merge join 和array join。

文中的描述是:Lookup Join 操作符首先从Activity表中分批次读取join key为UserId的行到内存,在从Users表中用索引查找key为UserID的行。然后对左右两个输入进行merge,最后进行排序。

F1 Query对spaner提供了一个集成的 scan/join方法。

除了链接:F1 Query也支持投影、聚合额、排序、unioning和分析窗口功能。这些都支持过滤谓词以及LIMIT 和 OFFSET。

3.2 分布式执行

  • 每个分片包含一个或多个运算符(例如聚合)以及有关如何读取分片输入和重新分配分片输出的信息。

  • 每个分片有不同的并行度。

  • 分片可以将数据发送到多个目的地(分叉)执行计划可以有多个输出。

# 用AdId 作为Join key联合查询Ads表和Clicks表;
# 然后过滤出 开始日期是2018年5月14号以后 并且 使用Chrome浏览器搜索 的数据;
# 然后以地区分组,最后以点击数量降序排序。
SELECT Clicks.Region, COUNT(*) ClickCount
FROM Ads JOIN Clicks USING (AdId)
WHERE Ads.StartDate > ‘2018-05-14AND Clicks.OS = ‘Chrome OS’
GROUP BY Clicks.Region
ORDER BY ClickCount DESC;

当优化器发现分布式执行比较合理时,会生成一个分布式执行计划。查询计划会被拆分成一个个分片。每个分片都在一组F1 Worker节点上,F1 Worker是多线程的,每个分片同时执行。

Scan(Clicks) 被分配给 1000 个 F1 Worker 上并行拉取满足 Clicks.OS=Chrome OS条件的数据,每个worker根据每一行数据的 Hash(AdID) 发送给对应的 HashJoin 分片。

同样,Scan(Ads) 被分配给 200 个 F1 Worker 上并行拉取满足 Ads.StartDate > ‘2018-05-14’ 条件的数据,每个worker根据每一行数据的 Hash(AdID) 发送给对应的 HashJoin 分片。

因为做hash Join的分片(最右侧蓝色分片)有1000个F1 Workers,所以下面两个Scan F1 Worker发送数据给对应的Hash Join分片时也分成1000个并行发送任务。

HashJoin 及 PartialAggregation:根据 Join Key 分成了 1000 个并行任务,各自做 Join 计算,并做一次聚合。

最后,F1 Server把各个分片的聚合结果再汇总起来,返回给客户端。

3.3 分片策略

分布式执行模式中,F1 Query并行执行多个分片,整个执行的数据流可以看做一个有向无环图。

数据从Fragment流出的时候会被Exchange operate 利用一个分区函数计算流出的目的地(partition number),这个目的地就是分片里面的一个worker。

论文中说Exchange操作是通过RPC实现的可以很好的扩展到每个分片和上千个分区。google的Jupiter网络速度很快,每个机器间通信都可以达到10GB/S。

为了提高查询效率,Table Scan 会将底层的数据分成N个分片描述,调度N个worker,每个worker执行一个分片描述。因为分片描述可以让worker获取相应数据的子集,所以每个worker扫描元数据后获取的数据不重叠。

聚合可以被优化,之前提到数据从Fragment流出的时候会被Exchange operate 利用一个分区函数计算流出的目的地(partition number),图5中,当Aggregation(partial)流出到Aggregation(final)之前会被Exchange operate尽量在内存中聚合。 这就减少了数据的传输量,降低了hot grouping keys 的影响。

总结:什么样的算子会归到一个Fragment里?

Buttom-up策略进行汇聚

对数据分布有共同需求的算子,数据是否按照某个字段进行分片(Hash join、Group by Fields、部分的聚合算子,例如Partail Count)

3.4 性能考量

F1 Query中的性能问题主要是由数据倾斜和读取数据源方式不合理导致的。hash join对两边的热点key非常敏感:接收方可能数据量太大溢出到磁盘上,发送方可能消耗太多cpu和网络资源。如果其中一方数据量很少,F1 Query支持broadcast hash join,将它的拷贝发送给所有参与的worker。

最基本的lookup join的实现是一个一个key做lookup,考虑到延时,这么做就太慢了。F1 Query可以缓存若干行之后批量做lookup,这样还可以减少对相同key的重复的lookup。scan算子也可以用类似的方法来提速,即将对相同数据源分片的多次访问合并为一次。如果对数据源分片的请求数量超过了它能服务的并发数,这些请求可以乱序执行,因此能隐藏掉底层存储系统的延时,即慢请求在处理的同时,快请求也一直在处理。

如果直接将lookup join算子与它的左输入放到一起,也可能产生倾斜和非预期的数据访问,比如根本没办法聚合相同的key。为此,优化器可以将左输入再次分片,分片函数可以有多种。像Spanner和BigTable这样的key range分片的数据源可以得益于基于key range的分片方法。如果想利用这一点,我们可以把所有destination fragment按静态的key range进行分片。

F1 Query采用了一种动态分片,即各个输入源按自己的数据进行分片(接收端仍然和左输入源一起)。基于观察,一个数据源的数据分布通常与整体数据分布类似。这样分片的数据在worker之间更均匀,也能自适应地将热点key分散到多个接收分片。

F1 Query的算子通常尽可能地在内存中做流计算,而不是checkpoint到磁盘上。配合有激进的缓存机制的数据源,即使是复杂的分布式query也可以在几十或几百ms内完成。client端会重试失败的查询,但对于长时间的查询,分布式执行不够了,需要批处理了。

4. 批量查询执行

F1 Query 也支持长时间运行的大量数据的转换,例如ETL。google里面的ETL基本都是用FliumeJava或者mapReduce写出来的,然而这两个的开发和维护成本太高,并且享受不到SQL优化器的很多优化,比如属性裁剪、条件下推等等。

当F1 Worker 或者F1 Server故障出现故障时,上一章的交互式模型(内存过程模型)不适合,比如出现长查询。所以用批处理的方式就可以很好的应对。另外,批处理方式也能应对客户端故障,通过客户端断开重连进行异步查询。

批量模式和交互式模式共用一个“SQL前端”,也就是共享优化器、执行计划生成。

主要区别是在调度过程。如上图所示。交互式查询是同步执行的,F1 Server监控查询进度,直到完成;批量模式中,F1 Server异步调度查询去执行,中央注册表记录查询进度。

这种模式的挑战:

  • 通信方面:交互式可以同时激活所有分片,进而可以通过RPC通信;批量模式每个分片执行的时间不同。

  • 机器故障问题:因为批量模式是长时间运行的,如果出现短暂的机器故障,批量模式要持久化长查询的中间状态,然后能够继续执行。

  • 多任务情况:需要一个上层框架来跟踪多个批量模式的任务。

4.1 解决通信和短时间宕机问题

解决通信方面:将批处理执行计划中每个分片映射到MapReduce Stage,处理管道中每个stage都将数据输出存储到Colossus文件系统中。解决机器故障:因为mapReduce框架具有容错性,所以出现短时间宕机没有问题。

由图六所示左侧,F1将执行计划中的页分片被抽象映射到右侧的MapReduce操作,MapReduce分为两个阶段处理,右侧最下端为map-Reduce-Reduce,即将一个分片数据 F 处理为按照所需规则的分类数据 E;右侧上面两个操作为map<identity>-reduce,如中间,即将处理后的数据 E 和 D 按照所需的规则分类成数据 B。这样一来,左侧的6个数据处理阶段,变为右侧的3个数据处理阶段。

与交互式通过RPC发送数据不同,批处理模型将数据写入缓存文件,再将其读出到下一个分片。通过执行计划器的公共I/O接口实现。另外交互式模型中执行计划中每个节点都是同时被激活的,通过流水线方式实现并行,但是批量模式在MapReduce时只有所有数据完全流入后方可启动。

4.2 解决追踪所有MapReduce任务问题

F1 Client发送批量查询请求,其中一个F1 Server接收请求,并生成一个执行计划。Query Registry是一个Spanner数据存储,用来跟踪所有批处理模式查询的元数据。Query Distributor 根据负载均衡和数据源的可用性选择数据中心,然后分发查询给数据中心。然后数据中心获取查询,每个数据中心都有一个都有Query Scheduler,他定期从Query Registry中检索新分配的查询,当查询任务准备好并且资源可用时,Query Scheduler将任务发送给Query Executor,然后Query Executor使用MapReduce工作池执行Query任务。好处如下:

1.每个组件都是无状态的,并且都有备用节点,另外Query Registry中维护了执行状态,使得所有组件都可以被替换。

2.MapReduce失败的话也会重试几次。

3.如果数据中心完全宕机 Query Distributor会将查询重新分配给备用数据中心,然后重新执行。

5. 查询优化器

5.1 优化器基础结构

查询优化器执行过程:

AST->逻辑转换器->生成逻辑查询计划->逻辑计划的等价转换(规则)->物理策略生成器->物理查询计划->执行计划生成器->最终执行计划**。

功能:查询优化器主要作用是将SQL抽象语法树(AST)转化为物理计划(包括所有数据源访问路径和执行算法),如图8绿色输入框到绿色输出框。

5.2 逻辑查询计划优化

Logical Converter;Logical Transformations(Rules)

优化器将传来的AST转换为关系代数树;关系代数树应用逻辑重写规则来应用启发式更新改进查询计划。

F1 Query支持的规则有:filter下推、常量折叠、属性裁剪、常量传播、outer join窄化、排序消除、公共子计划消除、materialized view重写。

5.3 物理查询计划构建

Physical Strategies

基于关系代数, 创建一个物理查询计划树,表示实际的执行算法和数据源访问路径。

每生成一个物理计划都表示为一个类,该类追踪多个数据的属性,比如分布情况、顺序、唯一性、相比其他数据源的波动性,优化器根据这些属性决定何时插入exchange算子。

如果并发度超过了集中模式的处理能力,就变成分布式查询。

5.4 执行计划生成器

Execution plan Generator

将物理执行计划转化为一系列可以直接执行的执行片段。即将物理执行计划树中每个节点转化为相应的运算符。

生成器还负责计算每个片段的最终并行度,从叶节向上传播到根节点。

6. 延展性 tiSpark?

F1 Query支持多种存储系统和多种输入类型。

1.自定义数据源(spanner、mesa)

2.用户定义的标量函数(UDF)、聚合函数(UDA)和表值函数(TVF)

UDF Server是由F1 Query单独拥有和部署的RPC服务。与F1 Server与F1 Worker一样,执行在相同的数据中心。

 

UDF Server 公开了一个通用的RPC接口,使F1 Server能够找到UDF Server导出函数的详细信息,然后执行这些函数。

执行器通过RPC远程调用远端的UDF函数。通过Batch/流水线/异步化将远程调用的延迟抹平。

UDF Server也是无状态的,可以很好的横向扩展。

6.1 UDF(User-Defined Functions)

F1 Query 支持的 UDF 可以使用 SQL 或者 Lua 脚本语言编写。标量就是通过一行函数转化为另外一行。如下是通过Lua编写的一个方法----将字符串日期转化为相应的无符号整数Unix时间。

local function string2unixtime(value)
 local y,m,d = match("(%d+)%-(%d+)%-(%d+)")
 return os.time({year=y, month=m, day=d})
end

F1 Query在解析上面这种脚本或者SQL时,会生成一个函数表达式。然后优化器会将表达式移动到投影中,投影操作符将数据缓冲并计算相关的参数。然后F1 Worker将RPC发送到关联的UDF服务器,为了降低延迟F1 Worker会发送多个RPC。

6.2 UDA(User-Defined Aggregate)

F1 Query 还支持用户定义的聚合函数,即用户定义的这个函数方法,能够将多个输入组合成一条输出。一批过来转化为一行,比如Count()。可以分成几部分来做,最后整合为count()。

UDF Server 是无状态的,并允许每个F1 Server并行的将请求发送给多个 UDF Server 进程。这样一来,如图所示每个F1 Worker可以并行的将自己内部聚合后的数据( AGGREGATION(partial) ),并行的发送给 F1 Server。

6.3 TVF(Table-Value Functions)

# 计算过去3天的广告点击活动:
SELECT * FROM EventsFromPastDays(3, TABLE Clicks);

# 表中时间字段 大于 当前时间-3天,即过去三天内的所有数据。
CREATE TABLE FUNCTION EventsFromPastDays(num_days INT64, events ANY TABLE)
AS SELECT * FROM events
WHERE date >= DATE_SUB(CURRENT_DATE(),INTERVAL num_days DAY);

一张表作为输入,转化为另外一张表,例如这样一段输入到F1 Query中,会返回一张过去三天内数据的表。

如图所示,输入进来的表会转化为多个Row batches,然后F1 Server通过RPC调用 UDF Server,UDF Server会根据Where条件处理传过来的每个Row batches。然后F1 Server再将UDF Server输出的已经处理好的Row batches 都接收回来,组成新表,返回给Client。

7. 高级特性

7.1 鲁棒性能

F1 Query很重视性能的稳健性,当出现非预期的数据模式(数据量、选择度等)或者其他意外因素时,性能会优雅的下降,即性能降级,并且在降级的过程平滑化,避免出现陡峭的曲线。

例子:原本以为内存排序的数据,当数据量超出预期,需要转化为磁盘参与的排序算法时,F1 Query会智能的将必要的部分数据放到磁盘上,而不是将整个排序都改掉。

8. 生产性能指标

如下图所示:对于交互模式,14天,每天的QPS曲线。平均每秒是450000个Query,那么每天就是400亿个Query。

如下图(左)所示:显示了集中式查询的延迟百分比。其中红色小球曲线表示有50%的 Query 低于10ms,蓝色小三角曲线表示有90%的 Query 低于50ms,黄色小×曲线表示有99%的 Query 低于300ms。

如下图(右)所示:显示了分布式查询的延迟百分比。

 

文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0