无论是哪款数据库产品,在使用过程中都会经常用到join操作,join操作是指利用两张或多张表中相同的属性筛选出存在连接匹配关系的记录。下面为分布式融合数据库HTAP的join操作实现过程,以及代码分析。
一、实现过程
- 从Outer 表中取一批数据,设为 B;
- 通过 Join Key 以及 B 中的数据构造 Inner 表取值范围,只读取对应取值范围的数据,设为 S;
- 对 B 中的每一行数据,与 S 中的每一条数据执行 Join 操作并输出结果;
- 重复步骤 1,2,3,直至遍历完 Outer 表中的所有数据。
二、源代码分析
如图1所示
- 启动 Outer Worker 及 Inner Workers:这部分工作由 startWorkers 函数完成。该函数会 启动一个 Outer Worker 和 多个 Inner Worker 。Inner Woker 的数量可以通过 tidb_index_lookup_concurrency这个系统变量进行设置,默认为 4。
- 读取 Outer 表数据:这部分工作由 buildTask 函数完成。此处主要注意两点:
第一点,对于每次读取的 batch 大小,如果将其设置为固定值,则可能会出现如下问题:
(1)若设置的 batch 值较大,但 Outer 表数据量较小时。各个 Inner Worker 所需处理的任务量可能会不均匀,出现数据倾斜的情况,导致并发整体性能相对单线程提升有限。
(2)若设置的 batch 值较小,但 Outer 表数据量较大时。Inner Worker 处理任务时间短,需要频繁从管道中取任务,CPU 不能被持续高效利用,由此带来大量的线程切换开销。此外, 当 batch 值较小时,同一批 inner 表数据能会被反复读取多次,带来更大的网络开销,对整体性能产生极大影响。因此,我们通过指数递增的方式动态控制 batch 的大小(由函数increaseBatchSize 完成),以避免上述问题,batch size 的最大值由 session 变量 tidb_index_join_batch_size控制,默认是 25000。读取到的 batch 存储在 outerResult 中。
- Outer Worker 将 task 发送给 Inner Worker 和 Main Thread:Inner Worker 需要根据 Outer 表每个 batch 的数据,构建 Inner 表的数据扫描范围并读取数据,因此 Outer Worker 需要将 task 发送给 Inner Worker 。
如前文所述,ILJ 多线程并发执行,且 Join 结果的顺序与 Outer 表的数据顺序一致。 为了实现这一点,Outer Worker 通过管道将 task 发送给 Main Thread ,Main Thread 从管道中按序读取 task 并执行 Join 操作,这样便可以实现在多线程并发执行的情况下的保序需求。
- Inner Worker 读取 inner 表数据:这部分工作由 handleTask 这个函数完成。handleTask 有如下几个步骤:
(1)constructDatumLookupKeys 函数计算 Outer 表对应的 Join Keys 的值,我们可以根据 Join Keys 的值从 Inner 表中仅查询所需要的数据即可,而不用对 Inner 表中的所有数据进行遍历。为了避免对同一个 batch 中相同的 Join Keys 重复查询 Inner 表中的数据, sortAndDedupDatumLookUpKeys 会在查询前对前面计算出的 Join Keys 的值进行去重。
(2)fetchInnerResult 函数利用去重后的 Join Keys 构造对 Inner 表进行查询的执行器,并读取数据存储于 innerResult中。
(3)buildLookUpMap 函数对读取的 Inner 数据按照对应的 Join Keys 构建哈希表,存储于 lookupMap中。
上述步骤完成后,Inner Worker 向 task.doneCh中发送数据,以唤醒 Main Thread 进行接下来的工作。
- Main Thread 执行 Join 操作:这部分工作由 prepareJoinResult 函数完成。prepareJoinResult 有如下几个步骤:
(1)getFinishedTask 从 resultCh 中读取 task,并等待 doneCh 发送来的数据,若该 task 没有完成,则阻塞住;
(2)接下来的步骤与 Hash Join类似,lookUpMatchedInners 取一行 OuterRow 对应的 Join Key,从 lookupMap 中 probe 对应的 Inner 表的数据;
(3)主线程对该 OuterRow,与取出的对应的 InnerRows 执行 Join 操作,写满存储结果的 chk 后返回。
图1 index Lookup join 执行步骤:
图1 index Lookup join 执行步骤
如图2所示
(1)主线程启动一个OuterWorker和若干个InnerWorkers。
(2)OuterWorker发送task到innerCh和resultCh。主线程从resultCh按顺序依次取出Outer表的每一个task。等待
(3)InnerWorker流水线并行处理innerCh中的每个task。每处理完一个task,则发送到doneCh。
(4)主线程从doneCh读取到task,则解除等待。
(5)innerWorke处理后得到的inner batch表与task中的outer batch表完成join操作。
(6)直到2中的task取完。
图2 Index Lookup Join 代码逻辑图
如图3举例说明:查询语句一种可能的执行流程如图4所示,其中由上往下剪头表示时间线。
图3 join样例
图4 执行流程
(1)Outer Worker 负责读取 Outer表的每一个batch,构造task,并将task 发送到 Inner Worker 和 主线程。
(2)主线程接收 Outer Worker 发送过来的 task,并等待Inner worker中的 task 执行完成通知主线程,最后主线程 完成 Outer 表和inner表的Join
(3)Inner Worker 负责对每个Outer表的每个batch,抽取Inner表对应的Joinkey数据行。
参考连接:https://github.com/pingcap/tidb