searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

PostgreSQL计划节点源码剖析

2023-08-09 01:29:52
60
0

计划节点

可优化语句执行的核心是对各种计划节点的处理。PG 中计划节点分为四类:
  • 扫描节点(Scan Node):用于扫描表等对象以从中获取元组
  • 连接节点(Join Node):用于关系代数中连接操作
  • 控制节点(Control Node):用于处理特殊情况的节点
  • 物化节点(Materialization Node):能够缓存执行结果到辅助存储中

 

扫描节点

扫描节点的作用是扫描表,每次获取一条元组作为上层节点的输入。PG 中扫描节点有如下几类:

 

扫描节点的公共数据结构:
// 计划节点类型
typedef struct Scan
{
    Plan        plan;      // 包含了所有计划节点都需要的成员变量和方法,如成本估算、执行和释放计划等
    Index       scanrelid; // 指定要扫描的关系的 relid(relation identifier)
} Scan;

// 节点的执行状态
typedef struct ScanState
{
    PlanState   ps;           /* 计划节点状态结构 */
    Relation    ss_currentRelation;
    struct TableScanDescData *ss_currentScanDesc;
    TupleTableSlot *ss_ScanTupleSlot;
} ScanState;


 

扫描节点的执行函数都统一调用ExecScan函数,但根据节点的类型给ExecScan的参数 AccessMtd 赋予不同的函数指针来实现不同扫描节点的扫描。

 

SeqScan

  顺序扫描使用场景:对无索引字段进行查询、或者判断到查询将返回大多数数据
  主流程:

 

SeqScan 相关数据结构:
// 顺序扫描节点
typedef Scan SeqScan;

/* SeqScanState information */
typedef struct SeqScanState
{
    ScanState   ss;             /* 扫描状态信息 */
    Size        pscan_len;      /* size of parallel heap scan descriptor */
} SeqScanState;
SeqScan 节点初始化:
SeqScanState *
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
{
    SeqScanState *scanstate;    // 创建 SeqScanState 结构体

    // 将 Seqscan 节点链接到 SeqScanState 相应字段
    scanstate = makeNode(SeqScanState);
    scanstate->ss.ps.plan = (Plan *) node;
    scanstate->ss.ps.state = estate;
    scanstate->ss.ps.ExecProcNode = ExecSeqScan;
    
    ...;
    
    return scanstate;
}
SeqScan 的执行函数ExecSeqScan,函数中会调用ExecScan
static TupleTableSlot *
ExecSeqScan(PlanState *pstate)
{
    SeqScanState *node = castNode(SeqScanState, pstate);

    return ExecScan(&node->ss,
                    (ExecScanAccessMtd) SeqNext,  // 将 SeqNext 函数指针作为 ExecScan 函数参数
                    (ExecScanRecheckMtd) SeqRecheck);
}

TupleTableSlot *
ExecScan(ScanState *node,
         ExecScanAccessMtd accessMtd,   /* function returning a tuple */
         ExecScanRecheckMtd recheckMtd)
{
    ...;
    for (;;)
    {
        TupleTableSlot *slot;

        slot = ExecScanFetch(node, accessMtd, recheckMtd);
        ...;
        if (qual == NULL || ExecQual(qual, econtext))
        {
            if (projInfo)    return ExecProject(projInfo);
            else    return slot;
     }
    ...;
}

static inline TupleTableSlot *
ExecScanFetch(ScanState *node,
              ExecScanAccessMtd accessMtd,
              ExecScanRecheckMtd recheckMtd)
{
    ...;
    return (*accessMtd) (node);
}

static TupleTableSlot *
SeqNext(SeqScanState *node)
{
    ...;
    // scandesc 变量用于获取当前扫描描述符,类似于迭代器
    // 如果扫描描述符为空,则说明当前还没有开始扫描操作,需要调用 table_beginscan() 函数创建一个新的扫描描述符
    if (scandesc == NULL)   
    {
        scandesc = table_beginscan(node->ss.ss_currentRelation,
                                   estate->es_snapshot,
                                   0, NULL);
        node->ss.ss_currentScanDesc = scandesc;
    }

    // get the next tuple from the table
    if (table_scan_getnextslot(scandesc, direction, slot))
        return slot;
    return NULL;
}

static inline bool
table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
    slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
    return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}

 

IndexScan

索引扫描使用场景:如果选择条件涉及的属性上建立了索引,判断到查询将会命中非常少的数据

 

扫描索引节点 IndexScan:
typedef struct IndexScan
{
    Scan        scan;
    Oid         indexid;            /* 用于存储索引的 OID */
    List       *indexqual;          /* 用于存储索引扫描的过滤条件 */
    List       *indexqualorig;      /* 存储没有处理的原始扫描条件链表 */
    List       *indexorderby;       /* 索引扫描的排序条件,通常是表达式组成的列表 */
    List       *indexorderbyorig;   /* 索引扫描排序条件的原始形式的列表 */
    List       *indexorderbyops;    /* 用于排序的操作符OID的列表 */
    ScanDirection indexorderdir;    /* 索引扫描的排序方向,可以是向前、向后或不关心 */
} IndexScan;

 

扫描索引的状态结构体 IndexScanState:
  • ScanKeys:存储索引列的信息、操作函数等,描述了一个完整的过滤条件,并用于索引扫描
  • 但如果过滤条件是个复杂的表达式,引入 iss_RuntimeKeys 来处理
typedef struct IndexScanState
{
    ScanState   ss;
    ExprState  *indexqualorig;           // 索引过滤条件
    List       *indexorderbyorig;        
    struct ScanKeyData *iss_ScanKeys;    // 简单的索引过滤条件,如常量
    int         iss_NumScanKeys;         // 指示数组 iss_ScanKeys 长度
    struct ScanKeyData *iss_OrderByKeys; 
    int         iss_NumOrderByKeys;
    IndexRuntimeKeyInfo *iss_RuntimeKeys; // 索引过滤条件不是常量,需要动态计算的
    int         iss_NumRuntimeKeys;       // 指示数组 iss_RuntimeKeys 长度
    bool        iss_RuntimeKeysReady;
    ExprContext *iss_RuntimeContext;
    Relation    iss_RelationDesc;
    struct IndexScanDescData *iss_ScanDesc;  // 扫描描述符
    ...;
} IndexScanState;

 

IndexScan 主要执行函数:
/* 基于索引读取元组,并返回给执行器上层 */
static TupleTableSlot *
ExecIndexScan(PlanState *pstate)
{
    IndexScanState *node = castNode(IndexScanState, pstate);
    ...;
    if (node->iss_NumOrderByKeys > 0)    // 有 ORDER BY子句,需要按指定的顺序对结果进行排序
        return ExecScan(&node->ss,
                        (ExecScanAccessMtd) IndexNextWithReorder,
                        (ExecScanRecheckMtd) IndexRecheck);
    else    // 查询计划中没有 ORDER BY子句,可以直接使用普通的索引扫描方式
        return ExecScan(&node->ss,
                        (ExecScanAccessMtd) IndexNext,
                        (ExecScanRecheckMtd) IndexRecheck);
}

/* 被 ExecSeqScan 调用 */
TupleTableSlot *
ExecScan(ScanState *node,
         ExecScanAccessMtd accessMtd,   /* function returning a tuple */
         ExecScanRecheckMtd recheckMtd)
{
    ExprContext *econtext;
    ExprState  *qual;
    ProjectionInfo *projInfo;

    for (;;)
    {
        TupleTableSlot *slot;
        // 获取下一个元组
        slot = ExecScanFetch(node, accessMtd, recheckMtd);
        ...;
        
        if (qual == NULL || ExecQual(qual, econtext))
        {
            // Found a satisfactory scan tuple.
            if (projInfo)    return ExecProject(projInfo);
            else    return slot;
    }
}

/* 执行 IndexNext 函数 */
static inline TupleTableSlot *
ExecScanFetch(ScanState *node,
              ExecScanAccessMtd accessMtd,
              ExecScanRecheckMtd recheckMtd)
{
    EState    *estate = node->ps.state;
    ...;
    return (*accessMtd) (node);
}

/* 不断进行元组扫描,读取元组,并将元组封装成 TupleTableSlot 传递给上层节点 */
static TupleTableSlot *
IndexNext(IndexScanState *node)
{
    EState     *estate;
    ExprContext *econtext;
    ScanDirection direction;
    IndexScanDesc scandesc;
    TupleTableSlot *slot;
    ...;
    while (index_getnext_slot(scandesc, direction, slot))
    {
        CHECK_FOR_INTERRUPTS();
        /*
         * If the index was lossy, we have to recheck the index quals using
         * the fetched tuple.
         */
        if (scandesc->xs_recheck)
        {
            econtext->ecxt_scantuple = slot;
            if (!ExecQualAndReset(node->indexqualorig, econtext))
            {
                /* Fails recheck, so drop it and loop back for another */
                InstrCountFiltered2(node, 1);
                continue;
            }
        }

        return slot;
    }
    
    /*
     * if we get here it means the index scan failed so we are at the end of
     * the scan..
     */
    node->iss_ReachedEnd = true;
    return ExecClearTuple(slot);
}

/* 获取数据 */
bool
index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
    for (;;)
    {
        if (!scan->xs_heap_continue)
        {
            ItemPointer tid;

            // 从索引中拿到遍历的索引元组 TID,TID是堆表中实际数据的物理位置
            tid = index_getnext_tid(scan, direction);

            /* If we're out of index entries, we're done */
            if (tid == NULL)
                break;
            Assert(ItemPointerEquals(tid, &scan->xs_heaptid));
        }

        Assert(ItemPointerIsValid(&scan->xs_heaptid));
        if (index_fetch_heap(scan, slot))   // 通过索引元组拿到堆表中实际数据的元组
            return true;
    }
    return false;
}

 

连接节点

PG 中有如下的连接类型:
 
PG 中实现了三种连接操作:嵌套循环连接(Nest Loop)、归并连接(Merge Loop)和 Hash 连接(Hash Join)。
连接节点的公共数据结构:
/* 连接操作(Join)的节点 */
typedef struct Join
{
    Plan        plan;           // 包含了所有计划节点都需要的成员变量和方法,如成本估算、执行和释放计划等
    JoinType    jointype;       // 连接的类型(Join Type),例如内连接、左外连接、右外连接等
    bool        inner_unique;   // 为 true 表示每个外部元组最多只能匹配一个内部元组
    List       *joinqual;       // 连接的条件
} Join;

/* 连接计划状态节点的超类(superclass) */
typedef struct JoinState
{
    PlanState   ps;
    JoinType    jointype;    // 连接的类型(Join Type),例如内连接、左外连接、右外连接等
    bool        single_match;// 为 true 表示在找到一个内表匹配后应该跳过到下一个外表元组。这在某些连接算法中可以提高性能
    ExprState  *joinqual;    // 连接的条件
} JoinState;
 

NestLoop

Nestloop Join:嵌套循环连接(Nestloop Join)是两个表做连接时最朴素的方式。通过两层循环,用第一张表做 Outer Table,第二张表做 Inner Table,Outer Table 的每一条记录跟 Inner Table 的记录作比较,符合条件的就输出。适用于返回的结果集不能太大,外表不能太大。
 
嵌套循环连接基本思想:
FOR each tuple s in S DO
    FOR each tuple r in R DO
        IF r and s join to make a tuple t THEN
            output t;
函数调用流程:
 
节点类型:NestLoop
/* 嵌套循环连接的节点 */
typedef struct NestLoop
{
    Join        join;
    List       *nestParams;     // NestLoopParam节点的列表,用于标识执行器参数
} NestLoop;
执行过程状态信息:NestLoopState
/* 用于存储嵌套循环连接(nested loop join)在数据库查询执行过程中的状态信息 */
typedef struct NestLoopState
{
    JoinState   js;             /* its first field is NodeTag */
    // 控制循环,内层循环时为 false;内层循环结束时置为 true
    bool        nl_NeedNewOuter; 
    // 跟踪是否找到了当前外部元组的连接匹配。如果找到了匹配,则设置为true
    bool        nl_MatchedOuter;
    // 存储一个准备好的空元组。为 Left Join 和 Anti Join 可能用到空元组的情况准备
    //当在左外连接中找不到与外部元组(左侧表中的记录)匹配的内部元组(右侧表中的记录)时,会使用这个空元组来表示缺失的值
    TupleTableSlot *nl_NullInnerTupleSlot; 
} NestLoopState;
执行相关函数:
static TupleTableSlot *
ExecNestLoop(PlanState *pstate)
{
    ...;
    outerPlan = outerPlanState(node);
    innerPlan = innerPlanState(node);
    for (;;)
    {    
        // 如果当前没有外表元组可用(即 nl_NeedNewOuter 为真),则需要获取下一个外部元组并重置内部扫描
        if (node->nl_NeedNewOuter)
        {
            outerTupleSlot = ExecProcNode(outerPlan);
            ...;
        }
        // 有一个外表元组(outerTuple),尝试获取内表元组(inner tuple)
        innerTupleSlot = ExecProcNode(innerPlan);
        
        ...;
        
        // 判断内表元组和外表元组是否符合限定条件
        // joinqual 保存着 join 条件信息,比如哪个column=哪个column
        // excontext 里保存和 outer tuple 和 inner tuple
        if (ExecQual(joinqual, econtext))
        {
            ...;
            return ExecProject(node->js.ps.ps_ProjInfo);
        }
 }
 
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
    ExprContext *econtext = projInfo->pi_exprContext;
    ExprState  *state = &projInfo->pi_state;
    TupleTableSlot *slot = state->resultslot;   // 最终投影后的结果存储到此,返回给上层
    bool        isnull;
    
    // 清空结果槽(slot)中的先前内容,将槽的状态重置为初始状态
    ExecClearTuple(slot);

    // 执行器投影算子入口函数, 底层调用了 ExecInterpExpr 来获得 slot
    (void) ExecEvalExprSwitchContext(state, econtext, &isnull);

    // 成功生成了一个结果行,将结果槽标记为包含一个有效的虚拟元组
    slot->tts_flags &= ~TTS_FLAG_EMPTY;
    slot->tts_nvalid = slot->tts_tupleDescriptor->natts;

    return slot;
}
 

MergeJoin

MergeJoin 实现了对排序关系的归并连接算法,其输入都是已经排好序的。
其伪代码:
Join{
    get initial outer and inner tuples   初始化
    do forever {
        while(outer != inner) {
            if(outer < inner)
                advance outer position
            else
                advance inner position
        }
        mark inner position
        do forever {
            while(outer == inner) {
                join tuples
                advance inner position
            }
            advance outer position
            if(outer == mark)
                restore inner position to mark
            else
                break
        }
    }
}
以 Inner Join 为例:
 
函数调用流程:
 
MergeJoin 数据结构:
typedef struct MergeJoin
{
    Join        join;
    bool        skip_mark_restore;  /* 指示是否可以跳过某些 mark */
    List       *mergeclauses;   /* 存储用于计算左右子节点元组是否匹配的表达式链表 */
    /*  
    * 与表达式链表 mergeclauses 对应
    * 表明其中每个操作符的操作符类、排序规则、执行的策略以及空值排序策略
    */
    Oid        *mergeFamilies;  /* per-clause OIDs of btree opfamilies */
    Oid        *mergeCollations;    /* per-clause OIDs of collations */
    int        *mergeStrategies;    /* per-clause ordering (ASC or DESC) */
    bool       *mergeNullsFirst;    /* per-clause nulls ordering */
} MergeJoin;

/* 初始化过程中使用 MergeJoin 初始化 MergeJoinState */
typedef struct MergeJoinState
{
    JoinState   js;             /* its first field is NodeTag */
    int         mj_NumClauses;
    MergeJoinClause mj_Clauses; /* array of length mj_NumClauses */
    int         mj_JoinState;
    bool        mj_SkipMarkRestore;
    bool        mj_ExtraMarks;
    bool        mj_ConstFalseJoin;
    /* true 表示不能忽略没有匹配项的左子节点元组,需要与空元组连接,用于Left Join、ANTI JOIN、FULL JOIN */
    bool        mj_FillOuter;   
    /* true 表示不能忽略没有匹配项的右子节点元组,需要与空元组连接,用于RIGHT Join、FULL JOIN */
    bool        mj_FillInner;   
    bool        mj_MatchedOuter;    /* 当前左子节点元组是否已找到了连接的匹配内部元组 */
    bool        mj_MatchedInner;    /* 当前右子节点元组是否已找到了连接的匹配内部元组 */
    TupleTableSlot *mj_OuterTupleSlot;
    TupleTableSlot *mj_InnerTupleSlot;
    TupleTableSlot *mj_MarkedTupleSlot;
    TupleTableSlot *mj_NullOuterTupleSlot;  /* 为左子节点生成的空元组,在 mj_FillInner 为真时构造*/
    TupleTableSlot *mj_NullInnerTupleSlot;  /* 为右子节点生成的空元组,在 mj_FillOuter 为真时构造*/
    ExprContext *mj_OuterEContext;
    ExprContext *mj_InnerEContext;
} MergeJoinState;
主要执行函数:
ExecMergeJoin 通过两层 switch 判断当前归并连接的状态,计算连接值如发现匹配元组则直接返回,否则继续从外表或内表中获取有序元组,按照连接状态做匹配判断。
/*
 * States of the ExecMergeJoin state machine
 */
#define EXEC_MJ_INITIALIZE_OUTER        1
#define EXEC_MJ_INITIALIZE_INNER        2
#define EXEC_MJ_JOINTUPLES              3
#define EXEC_MJ_NEXTOUTER               4
#define EXEC_MJ_TESTOUTER               5
#define EXEC_MJ_NEXTINNER               6
#define EXEC_MJ_SKIP_TEST               7
#define EXEC_MJ_SKIPOUTER_ADVANCE       8
#define EXEC_MJ_SKIPINNER_ADVANCE       9
#define EXEC_MJ_ENDOUTER                10
#define EXEC_MJ_ENDINNER                11

static TupleTableSlot *
ExecMergeJoin(PlanState *pstate)
{
    ...;
    for (;;)
    {
        // 获取当前连接状态并执行相关操作
        switch (node->mj_JoinState)
        {
            // 初始状态,获取外表记录
            case EXEC_MJ_INITIALIZE_OUTER:
                outerTupleSlot = ExecProcNode(outerPlan);
                node->mj_OuterTupleSlot = outerTupleSlot;
                /* Compute join values and check for unmatchability */
                switch (MJEvalOuterValues(node))
                {
                    // 成功获取外表记录,跳转获取内表记录
                    case MJEVAL_MATCHABLE:
                        /* OK to go get the first inner tuple */
                        node->mj_JoinState = EXEC_MJ_INITIALIZE_INNER;
                        break;
                    // 外表值为 NULL 保持原状态继续扫描外表
                    case MJEVAL_NONMATCHABLE:
                        ...;
                        break;
                    // 外表扫描完,结束 Join
                    case MJEVAL_ENDOFJOIN:
                        ...;
                        return NULL;
                }
                break;
            // 扫描内表
            case EXEC_MJ_INITIALIZE_INNER:
                ...;
            
            // 该状态下将左右表的值进行连接投影,输出结果
            // 下个周期调用 ExecMergeJoin 函数时,直接进入 EXEC_MJ_NEXTINNER 状态
            case EXEC_MJ_JOINTUPLES:
                node->mj_JoinState = EXEC_MJ_NEXTINNER;
                qualResult = (otherqual == NULL ||
                                  ExecQual(otherqual, econtext));

                    if (qualResult)
                    {
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    }
              ...;

HashJoin

HashJoin 能够提高连接效率,且不需要输入都是已经排好序的。
HashJoin 包含两个阶段:
  • build phase,理想情况下对小表构建 hash table,该表通常也称为 inner table
  • probe phase,扫描关联的另一张表的内容,并通过 hash table 探测是否有匹配的行/元组,该表通常称为 outer table
 
如果 Inner table 太大不能放到内存中的话,PG 采用 Hybrid hash join 来构建哈希表。首先对 Inner table 进行分 Batch,如果 Tuple 属于 Batch 0,则加入内存中的 Hashtable 中,否则则写入该 Batch 对应的磁盘文件中。然后对 Outer table 进行分 Batch
  • 如果 Outer table 的 Tuple 属于 Batch 0,则执行 HashJoin 算法
  • 如果不属于 Batch 0,则写入该 Batch 对应的磁盘文件中
Outer table 扫描结束时, Batch 0 也就处理完了。则继续处理 Batch 1:加载 Batch 1 的 Inner table 临时数据到内存中,构建 Hashtable,然后扫描 Batch 1 的 Outer table 执行 HashJoin 算法。完成 Batch 1 后,继续处理 Batch 2,直到完成所有的 Batch。
 
PG 在经典的 Hybrid hash join 之上还做了一些优化,一个重要的优化是对倾斜数据的优化,因为现实中的数据很多是非正态分布的数据。
Skew 优化的核心思想是尽量避免磁盘 IO:在 Batch 0 阶段处理 Outer table 最常见的 (MCV,Most common value) 数据。选择 Outer table 的 MCV 而不是 Inner table 的 MCV 的原因是优化器通常选择小表和正态分布的表做 Inner table,这样 Outer table 会更大,或者更大概率是非正态分布。
准备 Skew hash table,包括三个步骤:
  • 确定 Skew hash table 的大小。PostgreSQL 默认分配 2% 的内存用户构建 Skew hash table,并计算能容纳多少 MCV tuples。
  • 根据 pg_statistic syscache 数据,获得 Outer table 的 MCV 统计信息,对每个 MCV ,计算其 Hash 值,并放到其对应 Skew hash bucket 中
  • 填充 Skew hash table:扫描 Inner table 构建 Main hash table 时,如果当前 Tuple 属于 Skew hash table,则加入到 Skew hashtable 而非 Main hash table
扫描 Outer table 时,如果是 MCV 数据,则使用 Skew hash table 进行处理, 否则按照 Hybrid hash join 算法处理
HashJoin 相关数据结构:
typedef struct HashJoinState
{
    JoinState   js;             /* 基类,its first field is NodeTag */
    ExprState  *hashclauses;    // hash 连接条件
    List       *hj_OuterHashKeys;   /* list of ExprState nodes */
    List       *hj_HashOperators;   /* list of operator OIDs */
    List       *hj_Collations;
    HashJoinTable hj_HashTable;     // Hash 表
    uint32      hj_CurHashValue;    // 当前 hash 值
    int         hj_CurBucketNo;     // 当前 bucket 编号
    int         hj_CurSkewBucketNo; // skew bucket 编号
    HashJoinTuple hj_CurTuple;      // 当前元组
    TupleTableSlot *hj_OuterTupleSlot;
    TupleTableSlot *hj_HashTupleSlot;
    TupleTableSlot *hj_NullOuterTupleSlot;
    TupleTableSlot *hj_NullInnerTupleSlot;
    TupleTableSlot *hj_FirstOuterTupleSlot;
    int         hj_JoinState;       // JoinState 状态,存储状态信息
    bool        hj_MatchedOuter;    // 是否匹配
    bool        hj_OuterNotEmpty;
} HashJoinState;
执行函数:
  ExecHashJoin函数实现了 Hash Join 算法,实际实现的函数是ExecHashJoinImpl
  ExecHashJoinImpl函数把 Hash Join 划分为多个状态保存在 HashJoinState->hj_JoinState 字段中
#define HJ_BUILD_HASHTABLE      1   // 创建 Hash table
#define HJ_NEED_NEW_OUTER       2   // 扫描外表,计算外表连接键的 hash 值,把相匹配元组放在合适的 bucket 中
#define HJ_SCAN_BUCKET          3   // 拿到外表 tuple,在哈希表中查找外表tuple是否存在匹配
#define HJ_FILL_OUTER_TUPLE     4   // 外表元组扫描完成,检查是否使用虚拟外连接元组
#define HJ_FILL_INNER_TUPLES    5   // 已完成一个批处理,但如果做的是右外连接/全连接, 填充虚拟连接元组
#define HJ_NEED_NEW_BATCH       6   // 开启下一批次

static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, false);
}

static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    ...;
    for (;;)
    {
        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:
                ...;
            case HJ_NEED_NEW_OUTER:
                ...;
            ...;
        }
    }
}
  • 第一个状态是 HJ_BUILD_HASHTABLE 节点,即为 Inner table 构造 Hashtable;
  • HJ_NEED_NEW_OUTER 状态去获取 Outer tuple
    • 如果获取不到 Outer Tuple(外表遍历完成)
      • 如果是右连接/全连接,则跳转 HJ_FILL_INNER_TUPLES 节点填充空 tuple
      • 否则跳转 HJ_NEED_NEW_BATCH 处理下一批次
    • 如果获取到了 Outer Tuple
      • 如果属于当前批次,跳转至 HJ_SCAN_BUCKET 进行匹配
      • 否则跳转 HJ_NEED_NEW_OUTER(当前 Tuple 不属于当前 Batch,则写盘继续读取外表)
  • HJ_SCAN_BUCKET 状态,已经拿到了 Outer Tuple,需要在哈希表中查找该外表 Tuple 是否存在匹配
    • 如果不匹配,则跳转 HJ_FILL_OUTER_TUPLE
    • 如果匹配,且不是 ANTI join 返回 Joined Tuple
  • HJ_FILL_OUTER_TUPLE 状态说明 Inner 和 Outer 没有匹配上,需要跳转 HJ_NEED_NEW_OUTER 获取新的外表元组,但在此之前需要判断是否是左连接/全连接
  • HJ_FILL_INNER_TUPLES 状态中如果是右连接/全连接,不断获取 Batch 中的 Inner Tuple,与空元组匹配。直到这个 Batch 用完了,进入 HJ_NEED_NEW_BATCH
 
0条评论
0 / 1000
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
z****n
4文章数
0粉丝数
z****n
4 文章 | 0 粉丝
原创

PostgreSQL计划节点源码剖析

2023-08-09 01:29:52
60
0

计划节点

可优化语句执行的核心是对各种计划节点的处理。PG 中计划节点分为四类:
  • 扫描节点(Scan Node):用于扫描表等对象以从中获取元组
  • 连接节点(Join Node):用于关系代数中连接操作
  • 控制节点(Control Node):用于处理特殊情况的节点
  • 物化节点(Materialization Node):能够缓存执行结果到辅助存储中

 

扫描节点

扫描节点的作用是扫描表,每次获取一条元组作为上层节点的输入。PG 中扫描节点有如下几类:

 

扫描节点的公共数据结构:
// 计划节点类型
typedef struct Scan
{
    Plan        plan;      // 包含了所有计划节点都需要的成员变量和方法,如成本估算、执行和释放计划等
    Index       scanrelid; // 指定要扫描的关系的 relid(relation identifier)
} Scan;

// 节点的执行状态
typedef struct ScanState
{
    PlanState   ps;           /* 计划节点状态结构 */
    Relation    ss_currentRelation;
    struct TableScanDescData *ss_currentScanDesc;
    TupleTableSlot *ss_ScanTupleSlot;
} ScanState;


 

扫描节点的执行函数都统一调用ExecScan函数,但根据节点的类型给ExecScan的参数 AccessMtd 赋予不同的函数指针来实现不同扫描节点的扫描。

 

SeqScan

  顺序扫描使用场景:对无索引字段进行查询、或者判断到查询将返回大多数数据
  主流程:

 

SeqScan 相关数据结构:
// 顺序扫描节点
typedef Scan SeqScan;

/* SeqScanState information */
typedef struct SeqScanState
{
    ScanState   ss;             /* 扫描状态信息 */
    Size        pscan_len;      /* size of parallel heap scan descriptor */
} SeqScanState;
SeqScan 节点初始化:
SeqScanState *
ExecInitSeqScan(SeqScan *node, EState *estate, int eflags)
{
    SeqScanState *scanstate;    // 创建 SeqScanState 结构体

    // 将 Seqscan 节点链接到 SeqScanState 相应字段
    scanstate = makeNode(SeqScanState);
    scanstate->ss.ps.plan = (Plan *) node;
    scanstate->ss.ps.state = estate;
    scanstate->ss.ps.ExecProcNode = ExecSeqScan;
    
    ...;
    
    return scanstate;
}
SeqScan 的执行函数ExecSeqScan,函数中会调用ExecScan
static TupleTableSlot *
ExecSeqScan(PlanState *pstate)
{
    SeqScanState *node = castNode(SeqScanState, pstate);

    return ExecScan(&node->ss,
                    (ExecScanAccessMtd) SeqNext,  // 将 SeqNext 函数指针作为 ExecScan 函数参数
                    (ExecScanRecheckMtd) SeqRecheck);
}

TupleTableSlot *
ExecScan(ScanState *node,
         ExecScanAccessMtd accessMtd,   /* function returning a tuple */
         ExecScanRecheckMtd recheckMtd)
{
    ...;
    for (;;)
    {
        TupleTableSlot *slot;

        slot = ExecScanFetch(node, accessMtd, recheckMtd);
        ...;
        if (qual == NULL || ExecQual(qual, econtext))
        {
            if (projInfo)    return ExecProject(projInfo);
            else    return slot;
     }
    ...;
}

static inline TupleTableSlot *
ExecScanFetch(ScanState *node,
              ExecScanAccessMtd accessMtd,
              ExecScanRecheckMtd recheckMtd)
{
    ...;
    return (*accessMtd) (node);
}

static TupleTableSlot *
SeqNext(SeqScanState *node)
{
    ...;
    // scandesc 变量用于获取当前扫描描述符,类似于迭代器
    // 如果扫描描述符为空,则说明当前还没有开始扫描操作,需要调用 table_beginscan() 函数创建一个新的扫描描述符
    if (scandesc == NULL)   
    {
        scandesc = table_beginscan(node->ss.ss_currentRelation,
                                   estate->es_snapshot,
                                   0, NULL);
        node->ss.ss_currentScanDesc = scandesc;
    }

    // get the next tuple from the table
    if (table_scan_getnextslot(scandesc, direction, slot))
        return slot;
    return NULL;
}

static inline bool
table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
    slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
    return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
}

 

IndexScan

索引扫描使用场景:如果选择条件涉及的属性上建立了索引,判断到查询将会命中非常少的数据

 

扫描索引节点 IndexScan:
typedef struct IndexScan
{
    Scan        scan;
    Oid         indexid;            /* 用于存储索引的 OID */
    List       *indexqual;          /* 用于存储索引扫描的过滤条件 */
    List       *indexqualorig;      /* 存储没有处理的原始扫描条件链表 */
    List       *indexorderby;       /* 索引扫描的排序条件,通常是表达式组成的列表 */
    List       *indexorderbyorig;   /* 索引扫描排序条件的原始形式的列表 */
    List       *indexorderbyops;    /* 用于排序的操作符OID的列表 */
    ScanDirection indexorderdir;    /* 索引扫描的排序方向,可以是向前、向后或不关心 */
} IndexScan;

 

扫描索引的状态结构体 IndexScanState:
  • ScanKeys:存储索引列的信息、操作函数等,描述了一个完整的过滤条件,并用于索引扫描
  • 但如果过滤条件是个复杂的表达式,引入 iss_RuntimeKeys 来处理
typedef struct IndexScanState
{
    ScanState   ss;
    ExprState  *indexqualorig;           // 索引过滤条件
    List       *indexorderbyorig;        
    struct ScanKeyData *iss_ScanKeys;    // 简单的索引过滤条件,如常量
    int         iss_NumScanKeys;         // 指示数组 iss_ScanKeys 长度
    struct ScanKeyData *iss_OrderByKeys; 
    int         iss_NumOrderByKeys;
    IndexRuntimeKeyInfo *iss_RuntimeKeys; // 索引过滤条件不是常量,需要动态计算的
    int         iss_NumRuntimeKeys;       // 指示数组 iss_RuntimeKeys 长度
    bool        iss_RuntimeKeysReady;
    ExprContext *iss_RuntimeContext;
    Relation    iss_RelationDesc;
    struct IndexScanDescData *iss_ScanDesc;  // 扫描描述符
    ...;
} IndexScanState;

 

IndexScan 主要执行函数:
/* 基于索引读取元组,并返回给执行器上层 */
static TupleTableSlot *
ExecIndexScan(PlanState *pstate)
{
    IndexScanState *node = castNode(IndexScanState, pstate);
    ...;
    if (node->iss_NumOrderByKeys > 0)    // 有 ORDER BY子句,需要按指定的顺序对结果进行排序
        return ExecScan(&node->ss,
                        (ExecScanAccessMtd) IndexNextWithReorder,
                        (ExecScanRecheckMtd) IndexRecheck);
    else    // 查询计划中没有 ORDER BY子句,可以直接使用普通的索引扫描方式
        return ExecScan(&node->ss,
                        (ExecScanAccessMtd) IndexNext,
                        (ExecScanRecheckMtd) IndexRecheck);
}

/* 被 ExecSeqScan 调用 */
TupleTableSlot *
ExecScan(ScanState *node,
         ExecScanAccessMtd accessMtd,   /* function returning a tuple */
         ExecScanRecheckMtd recheckMtd)
{
    ExprContext *econtext;
    ExprState  *qual;
    ProjectionInfo *projInfo;

    for (;;)
    {
        TupleTableSlot *slot;
        // 获取下一个元组
        slot = ExecScanFetch(node, accessMtd, recheckMtd);
        ...;
        
        if (qual == NULL || ExecQual(qual, econtext))
        {
            // Found a satisfactory scan tuple.
            if (projInfo)    return ExecProject(projInfo);
            else    return slot;
    }
}

/* 执行 IndexNext 函数 */
static inline TupleTableSlot *
ExecScanFetch(ScanState *node,
              ExecScanAccessMtd accessMtd,
              ExecScanRecheckMtd recheckMtd)
{
    EState    *estate = node->ps.state;
    ...;
    return (*accessMtd) (node);
}

/* 不断进行元组扫描,读取元组,并将元组封装成 TupleTableSlot 传递给上层节点 */
static TupleTableSlot *
IndexNext(IndexScanState *node)
{
    EState     *estate;
    ExprContext *econtext;
    ScanDirection direction;
    IndexScanDesc scandesc;
    TupleTableSlot *slot;
    ...;
    while (index_getnext_slot(scandesc, direction, slot))
    {
        CHECK_FOR_INTERRUPTS();
        /*
         * If the index was lossy, we have to recheck the index quals using
         * the fetched tuple.
         */
        if (scandesc->xs_recheck)
        {
            econtext->ecxt_scantuple = slot;
            if (!ExecQualAndReset(node->indexqualorig, econtext))
            {
                /* Fails recheck, so drop it and loop back for another */
                InstrCountFiltered2(node, 1);
                continue;
            }
        }

        return slot;
    }
    
    /*
     * if we get here it means the index scan failed so we are at the end of
     * the scan..
     */
    node->iss_ReachedEnd = true;
    return ExecClearTuple(slot);
}

/* 获取数据 */
bool
index_getnext_slot(IndexScanDesc scan, ScanDirection direction, TupleTableSlot *slot)
{
    for (;;)
    {
        if (!scan->xs_heap_continue)
        {
            ItemPointer tid;

            // 从索引中拿到遍历的索引元组 TID,TID是堆表中实际数据的物理位置
            tid = index_getnext_tid(scan, direction);

            /* If we're out of index entries, we're done */
            if (tid == NULL)
                break;
            Assert(ItemPointerEquals(tid, &scan->xs_heaptid));
        }

        Assert(ItemPointerIsValid(&scan->xs_heaptid));
        if (index_fetch_heap(scan, slot))   // 通过索引元组拿到堆表中实际数据的元组
            return true;
    }
    return false;
}

 

连接节点

PG 中有如下的连接类型:
 
PG 中实现了三种连接操作:嵌套循环连接(Nest Loop)、归并连接(Merge Loop)和 Hash 连接(Hash Join)。
连接节点的公共数据结构:
/* 连接操作(Join)的节点 */
typedef struct Join
{
    Plan        plan;           // 包含了所有计划节点都需要的成员变量和方法,如成本估算、执行和释放计划等
    JoinType    jointype;       // 连接的类型(Join Type),例如内连接、左外连接、右外连接等
    bool        inner_unique;   // 为 true 表示每个外部元组最多只能匹配一个内部元组
    List       *joinqual;       // 连接的条件
} Join;

/* 连接计划状态节点的超类(superclass) */
typedef struct JoinState
{
    PlanState   ps;
    JoinType    jointype;    // 连接的类型(Join Type),例如内连接、左外连接、右外连接等
    bool        single_match;// 为 true 表示在找到一个内表匹配后应该跳过到下一个外表元组。这在某些连接算法中可以提高性能
    ExprState  *joinqual;    // 连接的条件
} JoinState;
 

NestLoop

Nestloop Join:嵌套循环连接(Nestloop Join)是两个表做连接时最朴素的方式。通过两层循环,用第一张表做 Outer Table,第二张表做 Inner Table,Outer Table 的每一条记录跟 Inner Table 的记录作比较,符合条件的就输出。适用于返回的结果集不能太大,外表不能太大。
 
嵌套循环连接基本思想:
FOR each tuple s in S DO
    FOR each tuple r in R DO
        IF r and s join to make a tuple t THEN
            output t;
函数调用流程:
 
节点类型:NestLoop
/* 嵌套循环连接的节点 */
typedef struct NestLoop
{
    Join        join;
    List       *nestParams;     // NestLoopParam节点的列表,用于标识执行器参数
} NestLoop;
执行过程状态信息:NestLoopState
/* 用于存储嵌套循环连接(nested loop join)在数据库查询执行过程中的状态信息 */
typedef struct NestLoopState
{
    JoinState   js;             /* its first field is NodeTag */
    // 控制循环,内层循环时为 false;内层循环结束时置为 true
    bool        nl_NeedNewOuter; 
    // 跟踪是否找到了当前外部元组的连接匹配。如果找到了匹配,则设置为true
    bool        nl_MatchedOuter;
    // 存储一个准备好的空元组。为 Left Join 和 Anti Join 可能用到空元组的情况准备
    //当在左外连接中找不到与外部元组(左侧表中的记录)匹配的内部元组(右侧表中的记录)时,会使用这个空元组来表示缺失的值
    TupleTableSlot *nl_NullInnerTupleSlot; 
} NestLoopState;
执行相关函数:
static TupleTableSlot *
ExecNestLoop(PlanState *pstate)
{
    ...;
    outerPlan = outerPlanState(node);
    innerPlan = innerPlanState(node);
    for (;;)
    {    
        // 如果当前没有外表元组可用(即 nl_NeedNewOuter 为真),则需要获取下一个外部元组并重置内部扫描
        if (node->nl_NeedNewOuter)
        {
            outerTupleSlot = ExecProcNode(outerPlan);
            ...;
        }
        // 有一个外表元组(outerTuple),尝试获取内表元组(inner tuple)
        innerTupleSlot = ExecProcNode(innerPlan);
        
        ...;
        
        // 判断内表元组和外表元组是否符合限定条件
        // joinqual 保存着 join 条件信息,比如哪个column=哪个column
        // excontext 里保存和 outer tuple 和 inner tuple
        if (ExecQual(joinqual, econtext))
        {
            ...;
            return ExecProject(node->js.ps.ps_ProjInfo);
        }
 }
 
static inline TupleTableSlot *
ExecProject(ProjectionInfo *projInfo)
{
    ExprContext *econtext = projInfo->pi_exprContext;
    ExprState  *state = &projInfo->pi_state;
    TupleTableSlot *slot = state->resultslot;   // 最终投影后的结果存储到此,返回给上层
    bool        isnull;
    
    // 清空结果槽(slot)中的先前内容,将槽的状态重置为初始状态
    ExecClearTuple(slot);

    // 执行器投影算子入口函数, 底层调用了 ExecInterpExpr 来获得 slot
    (void) ExecEvalExprSwitchContext(state, econtext, &isnull);

    // 成功生成了一个结果行,将结果槽标记为包含一个有效的虚拟元组
    slot->tts_flags &= ~TTS_FLAG_EMPTY;
    slot->tts_nvalid = slot->tts_tupleDescriptor->natts;

    return slot;
}
 

MergeJoin

MergeJoin 实现了对排序关系的归并连接算法,其输入都是已经排好序的。
其伪代码:
Join{
    get initial outer and inner tuples   初始化
    do forever {
        while(outer != inner) {
            if(outer < inner)
                advance outer position
            else
                advance inner position
        }
        mark inner position
        do forever {
            while(outer == inner) {
                join tuples
                advance inner position
            }
            advance outer position
            if(outer == mark)
                restore inner position to mark
            else
                break
        }
    }
}
以 Inner Join 为例:
 
函数调用流程:
 
MergeJoin 数据结构:
typedef struct MergeJoin
{
    Join        join;
    bool        skip_mark_restore;  /* 指示是否可以跳过某些 mark */
    List       *mergeclauses;   /* 存储用于计算左右子节点元组是否匹配的表达式链表 */
    /*  
    * 与表达式链表 mergeclauses 对应
    * 表明其中每个操作符的操作符类、排序规则、执行的策略以及空值排序策略
    */
    Oid        *mergeFamilies;  /* per-clause OIDs of btree opfamilies */
    Oid        *mergeCollations;    /* per-clause OIDs of collations */
    int        *mergeStrategies;    /* per-clause ordering (ASC or DESC) */
    bool       *mergeNullsFirst;    /* per-clause nulls ordering */
} MergeJoin;

/* 初始化过程中使用 MergeJoin 初始化 MergeJoinState */
typedef struct MergeJoinState
{
    JoinState   js;             /* its first field is NodeTag */
    int         mj_NumClauses;
    MergeJoinClause mj_Clauses; /* array of length mj_NumClauses */
    int         mj_JoinState;
    bool        mj_SkipMarkRestore;
    bool        mj_ExtraMarks;
    bool        mj_ConstFalseJoin;
    /* true 表示不能忽略没有匹配项的左子节点元组,需要与空元组连接,用于Left Join、ANTI JOIN、FULL JOIN */
    bool        mj_FillOuter;   
    /* true 表示不能忽略没有匹配项的右子节点元组,需要与空元组连接,用于RIGHT Join、FULL JOIN */
    bool        mj_FillInner;   
    bool        mj_MatchedOuter;    /* 当前左子节点元组是否已找到了连接的匹配内部元组 */
    bool        mj_MatchedInner;    /* 当前右子节点元组是否已找到了连接的匹配内部元组 */
    TupleTableSlot *mj_OuterTupleSlot;
    TupleTableSlot *mj_InnerTupleSlot;
    TupleTableSlot *mj_MarkedTupleSlot;
    TupleTableSlot *mj_NullOuterTupleSlot;  /* 为左子节点生成的空元组,在 mj_FillInner 为真时构造*/
    TupleTableSlot *mj_NullInnerTupleSlot;  /* 为右子节点生成的空元组,在 mj_FillOuter 为真时构造*/
    ExprContext *mj_OuterEContext;
    ExprContext *mj_InnerEContext;
} MergeJoinState;
主要执行函数:
ExecMergeJoin 通过两层 switch 判断当前归并连接的状态,计算连接值如发现匹配元组则直接返回,否则继续从外表或内表中获取有序元组,按照连接状态做匹配判断。
/*
 * States of the ExecMergeJoin state machine
 */
#define EXEC_MJ_INITIALIZE_OUTER        1
#define EXEC_MJ_INITIALIZE_INNER        2
#define EXEC_MJ_JOINTUPLES              3
#define EXEC_MJ_NEXTOUTER               4
#define EXEC_MJ_TESTOUTER               5
#define EXEC_MJ_NEXTINNER               6
#define EXEC_MJ_SKIP_TEST               7
#define EXEC_MJ_SKIPOUTER_ADVANCE       8
#define EXEC_MJ_SKIPINNER_ADVANCE       9
#define EXEC_MJ_ENDOUTER                10
#define EXEC_MJ_ENDINNER                11

static TupleTableSlot *
ExecMergeJoin(PlanState *pstate)
{
    ...;
    for (;;)
    {
        // 获取当前连接状态并执行相关操作
        switch (node->mj_JoinState)
        {
            // 初始状态,获取外表记录
            case EXEC_MJ_INITIALIZE_OUTER:
                outerTupleSlot = ExecProcNode(outerPlan);
                node->mj_OuterTupleSlot = outerTupleSlot;
                /* Compute join values and check for unmatchability */
                switch (MJEvalOuterValues(node))
                {
                    // 成功获取外表记录,跳转获取内表记录
                    case MJEVAL_MATCHABLE:
                        /* OK to go get the first inner tuple */
                        node->mj_JoinState = EXEC_MJ_INITIALIZE_INNER;
                        break;
                    // 外表值为 NULL 保持原状态继续扫描外表
                    case MJEVAL_NONMATCHABLE:
                        ...;
                        break;
                    // 外表扫描完,结束 Join
                    case MJEVAL_ENDOFJOIN:
                        ...;
                        return NULL;
                }
                break;
            // 扫描内表
            case EXEC_MJ_INITIALIZE_INNER:
                ...;
            
            // 该状态下将左右表的值进行连接投影,输出结果
            // 下个周期调用 ExecMergeJoin 函数时,直接进入 EXEC_MJ_NEXTINNER 状态
            case EXEC_MJ_JOINTUPLES:
                node->mj_JoinState = EXEC_MJ_NEXTINNER;
                qualResult = (otherqual == NULL ||
                                  ExecQual(otherqual, econtext));

                    if (qualResult)
                    {
                        return ExecProject(node->js.ps.ps_ProjInfo);
                    }
              ...;

HashJoin

HashJoin 能够提高连接效率,且不需要输入都是已经排好序的。
HashJoin 包含两个阶段:
  • build phase,理想情况下对小表构建 hash table,该表通常也称为 inner table
  • probe phase,扫描关联的另一张表的内容,并通过 hash table 探测是否有匹配的行/元组,该表通常称为 outer table
 
如果 Inner table 太大不能放到内存中的话,PG 采用 Hybrid hash join 来构建哈希表。首先对 Inner table 进行分 Batch,如果 Tuple 属于 Batch 0,则加入内存中的 Hashtable 中,否则则写入该 Batch 对应的磁盘文件中。然后对 Outer table 进行分 Batch
  • 如果 Outer table 的 Tuple 属于 Batch 0,则执行 HashJoin 算法
  • 如果不属于 Batch 0,则写入该 Batch 对应的磁盘文件中
Outer table 扫描结束时, Batch 0 也就处理完了。则继续处理 Batch 1:加载 Batch 1 的 Inner table 临时数据到内存中,构建 Hashtable,然后扫描 Batch 1 的 Outer table 执行 HashJoin 算法。完成 Batch 1 后,继续处理 Batch 2,直到完成所有的 Batch。
 
PG 在经典的 Hybrid hash join 之上还做了一些优化,一个重要的优化是对倾斜数据的优化,因为现实中的数据很多是非正态分布的数据。
Skew 优化的核心思想是尽量避免磁盘 IO:在 Batch 0 阶段处理 Outer table 最常见的 (MCV,Most common value) 数据。选择 Outer table 的 MCV 而不是 Inner table 的 MCV 的原因是优化器通常选择小表和正态分布的表做 Inner table,这样 Outer table 会更大,或者更大概率是非正态分布。
准备 Skew hash table,包括三个步骤:
  • 确定 Skew hash table 的大小。PostgreSQL 默认分配 2% 的内存用户构建 Skew hash table,并计算能容纳多少 MCV tuples。
  • 根据 pg_statistic syscache 数据,获得 Outer table 的 MCV 统计信息,对每个 MCV ,计算其 Hash 值,并放到其对应 Skew hash bucket 中
  • 填充 Skew hash table:扫描 Inner table 构建 Main hash table 时,如果当前 Tuple 属于 Skew hash table,则加入到 Skew hashtable 而非 Main hash table
扫描 Outer table 时,如果是 MCV 数据,则使用 Skew hash table 进行处理, 否则按照 Hybrid hash join 算法处理
HashJoin 相关数据结构:
typedef struct HashJoinState
{
    JoinState   js;             /* 基类,its first field is NodeTag */
    ExprState  *hashclauses;    // hash 连接条件
    List       *hj_OuterHashKeys;   /* list of ExprState nodes */
    List       *hj_HashOperators;   /* list of operator OIDs */
    List       *hj_Collations;
    HashJoinTable hj_HashTable;     // Hash 表
    uint32      hj_CurHashValue;    // 当前 hash 值
    int         hj_CurBucketNo;     // 当前 bucket 编号
    int         hj_CurSkewBucketNo; // skew bucket 编号
    HashJoinTuple hj_CurTuple;      // 当前元组
    TupleTableSlot *hj_OuterTupleSlot;
    TupleTableSlot *hj_HashTupleSlot;
    TupleTableSlot *hj_NullOuterTupleSlot;
    TupleTableSlot *hj_NullInnerTupleSlot;
    TupleTableSlot *hj_FirstOuterTupleSlot;
    int         hj_JoinState;       // JoinState 状态,存储状态信息
    bool        hj_MatchedOuter;    // 是否匹配
    bool        hj_OuterNotEmpty;
} HashJoinState;
执行函数:
  ExecHashJoin函数实现了 Hash Join 算法,实际实现的函数是ExecHashJoinImpl
  ExecHashJoinImpl函数把 Hash Join 划分为多个状态保存在 HashJoinState->hj_JoinState 字段中
#define HJ_BUILD_HASHTABLE      1   // 创建 Hash table
#define HJ_NEED_NEW_OUTER       2   // 扫描外表,计算外表连接键的 hash 值,把相匹配元组放在合适的 bucket 中
#define HJ_SCAN_BUCKET          3   // 拿到外表 tuple,在哈希表中查找外表tuple是否存在匹配
#define HJ_FILL_OUTER_TUPLE     4   // 外表元组扫描完成,检查是否使用虚拟外连接元组
#define HJ_FILL_INNER_TUPLES    5   // 已完成一个批处理,但如果做的是右外连接/全连接, 填充虚拟连接元组
#define HJ_NEED_NEW_BATCH       6   // 开启下一批次

static TupleTableSlot *         /* return: a tuple or NULL */
ExecHashJoin(PlanState *pstate)
{
    return ExecHashJoinImpl(pstate, false);
}

static pg_attribute_always_inline TupleTableSlot *
ExecHashJoinImpl(PlanState *pstate, bool parallel)
{
    ...;
    for (;;)
    {
        switch (node->hj_JoinState)
        {
            case HJ_BUILD_HASHTABLE:
                ...;
            case HJ_NEED_NEW_OUTER:
                ...;
            ...;
        }
    }
}
  • 第一个状态是 HJ_BUILD_HASHTABLE 节点,即为 Inner table 构造 Hashtable;
  • HJ_NEED_NEW_OUTER 状态去获取 Outer tuple
    • 如果获取不到 Outer Tuple(外表遍历完成)
      • 如果是右连接/全连接,则跳转 HJ_FILL_INNER_TUPLES 节点填充空 tuple
      • 否则跳转 HJ_NEED_NEW_BATCH 处理下一批次
    • 如果获取到了 Outer Tuple
      • 如果属于当前批次,跳转至 HJ_SCAN_BUCKET 进行匹配
      • 否则跳转 HJ_NEED_NEW_OUTER(当前 Tuple 不属于当前 Batch,则写盘继续读取外表)
  • HJ_SCAN_BUCKET 状态,已经拿到了 Outer Tuple,需要在哈希表中查找该外表 Tuple 是否存在匹配
    • 如果不匹配,则跳转 HJ_FILL_OUTER_TUPLE
    • 如果匹配,且不是 ANTI join 返回 Joined Tuple
  • HJ_FILL_OUTER_TUPLE 状态说明 Inner 和 Outer 没有匹配上,需要跳转 HJ_NEED_NEW_OUTER 获取新的外表元组,但在此之前需要判断是否是左连接/全连接
  • HJ_FILL_INNER_TUPLES 状态中如果是右连接/全连接,不断获取 Batch 中的 Inner Tuple,与空元组匹配。直到这个 Batch 用完了,进入 HJ_NEED_NEW_BATCH
 
文章来自个人专栏
文章 | 订阅
0条评论
0 / 1000
请输入你的评论
0
0