1. Xlog 数据格式
一条 WAL 日志分为四个分布,结构如下:
1.1 块私有区域数据
当添加一个数据页到
wal
日志里时,会生成一个registered_buffer
实例,里面包含了自身的私有数据链表。typedef struct{
bool in_use; /* is this slot in use? */
uint8 flags; /* REGBUF_* flags */
RelFileNode rnode; /* 指定所属表的存储目录 */
ForkNumber forkno; /* 哪种文件类型 */
BlockNumber block; /* 块编号 */
Page page; /* 对应的原始数据页 */
uint32 rdata_len; /* 私有数据链表的长度总和 */
XLogRecData *rdata_head; /* 私有数据链表头部节点,这个节点只用来标志头部,它的 next 是第一个有效数据 */
XLogRecData *rdata_tail; /* 私有数据链表尾部节点 */
XLogRecData bkp_rdatas[2]; /* 存储着压缩后或忽略空闲数据的数据,如果有空闲位置且没有压缩,那么数据会被分成两个部分,存储在两个数组元素里 */
char compressed_page[PGLZ_MAX_BLCKSZ]; /* 如果开启了压缩,那么存储着压缩后的数据 */
} registered_buffer;
1.2 XLog 数据
Xlog 数据同样是链表的形式组织起来,由全局变量管理
static XLogRecData *mainrdata_head; /* 链表头 */
static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; /* 链表尾 */
static uint32 mainrdata_len; /* 链表所有节点的数据总和 */
2. XLogBeginInsert
通过设置
begininsert_called
标志防止递归调用日志生成函数;通过 XLogInsertAllowed
函数和一些 Assert
做代码检查工作。3. XLogRegisterData(char *data, int len)
预先已经定义了一个数组
static XLogRecData *rdatas
,这个结构表示链表节点,每次添加块私有数据或者 Xlog 数据,都会生成一个节点,添加到对应的链表中。如果是块私有数据,那么会添加到块的私有链表里。如果是共享数据,那么会添加到全局的共享链表里。XLogRegisterData
插入的是 Xlog 数据。统一管理的数据结构为:typedef struct XLogRecData{
struct XLogRecData *next; /* 指向下个节点,如果为NULL表示到达链表结尾 */
char *data; /* 数据的起始地址 */
uint32 len; /* 数据的长度 */
} XLogRecData;
自定义数据结构为
xl_**_insert
,取一个 XLogRecData *rdata
(在rdatas
的下一个空闲位置),设置如下参数。rdata->data = data;
rdata->len = len;
其中
rdata
类型为 XLogRecData *
,data
类型为 xl_**_insert *
。需要注意的是,每个 WAL 记录的 rdata
在 rdatas
中的位置不一定是连续的,如下图:
然后设置
mainrdata_last
,其中 XLog 数据由链表组成,mainrdata_head
是链表头,mainrdata_last
是链表尾,mainrdata_len
是链表所有节点的数据总和。由上文可知,这部分是 XLog 数据保存过程mainrdata_last->next = rdata;
mainrdata_last = rdata;
mainrdata_len += len;
4. XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
将涉及到的 buff 注册到 wal 记录,比如 insert 语句的目标 buff、update 语句的目标 buff 和源 buff。同样的,这个 buffer 也需要我们自己实现。实现过程为,找到 register_buffer 数组中的第一个空位,将这个位置填充。最终实现的效果如下:

5. XLogRegisterBufData(uint8 block_id, char *data, int len)
将元组内容注册到 wal 记录。比如 insert 语句的元组数据、update 语句的新元组数据。由 2.1 可知,这部分是注册块头部和块私有数据,注册的数据结构为 xl_*_header 和 实际数据(其中第一次注册 xl_*_header,随后注册实际数据,例如在 heap_insert 中,第一次注册 xl_heap_header,随后注册 heaptup->t_data)

通过 block_id 从 registered_buffers 中拿到对应的 regbuf,在 rdatas 数组中取下一个空闲位置,然后
rdata->data = data;
rdata->len = len;
regbuf->rdata_tail->next = rdata;
regbuf->rdata_tail = rdata;
regbuf->rdata_len += len;
其中 rdata 类型是统一管理类型 XLogRecData ,data 类型为 xl_*_header *,最终的效果如下

6. XLogInsert(RmgrId rmid, uint8 info)
插入一个具有指定的 RMID 和 info 字节的 XLog 记录,该记录的主体是先前通过 XLogRegister* 调用注册的数据和缓冲区引用。首先判断调用方必须设置 rmgr 位: XLR_SPECIAL_REL_UPDATE & XLR_CHECK_CONSISTENCY,其余在这里保留使用。随后判断如果在 bootstrap 模式下,除了 XLog 资源外,不需要实际记录内容,返回一个伪记录指针即可
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
XLogResetInsertion();
EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
否则执行下面的循环:首先获取全页写信息,然后调用 XLogRecordAssemble 拼装上面已经注册的信息,再调用 XLogInsertRecord 插入记录。执行完循环后,调用 XLogResetInsertion() 清空所有注册信息。
do
{
XLogRecPtr RedoRecPtr;
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
/*
* Get values needed to decide whether to do full-page writes. Since
* we don't yet have an insertion lock, these could change under us,
* but XLogInsertRecord will recheck them once it has a lock.
*/
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
&fpw_lsn);
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags);
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();
可以看到,函数的逻辑主要由 XLogRecordAssemble 和 XLogInsertRecord 实现。
7. XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, XLogRecPtr *fpw_lsn)
XLogRecordAssemble 函数从已注册的数据和缓冲区中组装 XLog record 到 XLogRecData 链表中,组装完成后可以使用 XLogInsertRecord() 函数插入到 WAL buffer 中。在上文图中可以看到,每个 WAL 日志包含了一个 XLog 头部,定义如下:
typedef struct XLogRecord{
uint32 xl_tot_len; /* 整个xlog的长度,包含头部 */
TransactionId xl_xid; /* xact id */
XLogRecPtr xl_prev; /* 指向前一个 xlog 位置的指针 */
uint8 xl_info; /* 标记位 */
RmgrId xl_rmid; /* 表示用于哪种用途 */
pg_crc32c xl_crc; /* xlog的crc32c校检码,不包含头部 */
} XLogRecord;
rmid(Resource Manager ID) 的定义
0 XLOG
1 Transaction
2 Storage
3 CLOG
4 Database
5 Tablespace
6 MultiXact
7 RelMap
8 Standby
9 Heap2
10 Heap
11 Btree
12 Hash
13 Gin
14 Gist
15 Sequence
16 SPGist
8. XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn, uint8 flags)
将日志写入 WAL buffer,主要步骤有两部分:
8.1 预留空间
组装完成后日志记录的长度已经确定,因此可以先计算这个长度,并在 WAL Buffer 里预留空间,空间预留的过程通过 XLogCtl->Insert->insertpos_lck 锁保护。也就是说,每个需要写入 WAL 日志记录的进程在预留空间时都是互斥的。预留空间时会调用 ReserveXLogInsertLocation,返回预留的 StartPos(起始位置)和 EndPos(结束位置)。
8.2 数据复制
一旦空间预留完成,数据复制的过程是可以并发的,通过 WALInsertLocks 锁来控制并发复制的过程。PG 声明了NUM_XLOGINSERT_LOCKS(目前是8)个 WALInsertLocks,每个 WALInsertLocks 由轻量级+日志写入位置组成。不同进程的不同事务在刷入日志时会随机(参照自己的 MyProc->pgprocno )获取一个 WALInsertLocks。数据辅助调用 CopyXLogRecordToWAL 函数,将 XLog 数据复制到 WAL Buffer。最后返回 XLog 的 EndPos,标志日志写入的结束位置。
下面简述一下该函数的流程:
- 首先是一些检查:
/* WAL日志段切换期间会拿排他锁,此时其他进程不能预留空间 */ if (isLogSwitch) WALInsertLockAcquireExclusive(); else WALInsertLockAcquire(); /* 进程当前copy的RedoRecPtr有没有过期 如果过期了(只会发生在恰好做完checkpoint操作),需要回到调用函数重新计算 因此这种场景下会比其他场景慢 */ if (RedoRecPtr != Insert->RedoRecPtr) { Assert(RedoRecPtr < Insert->RedoRecPtr); RedoRecPtr = Insert->RedoRecPtr; }
- 随后检查需不需要全页写
/* 检查是否启用了 fullPageWrites 或者 forcePageWrites */ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); if (fpw_lsn != InvalidXLogRecPtr && fpw_lsn <= RedoRecPtr && doPageWrites) { /* * 如果配置了全页写,但我们没做全页写,需要回炉重做,直接报错返回 */ WALInsertLockRelease(); END_CRIT_SECTION(); return InvalidXLogRecPtr; }
-
在 WAL buffer 中预留空间
/* * 如果是日志切换记录,恰好需要做日志切换,则可能 StartPos 和 EndPos相同 * 也就是说不需要记这个 WAL日志记录 */ if (isLogSwitch) inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev); else { ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, &rechdr->xl_prev); inserted = true; }
在 WAL buffer 中预留空间
/* 预留空间之后,开始做数据复制。inserted 为true,表示非日志切换记录 */ if (inserted) { /* * 目前 xl_prev 已经填充了,对记录头做 cdc 校验 */ rdata_crc = rechdr->xl_crc; COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(rdata_crc); rechdr->xl_crc = rdata_crc; /* * 将日志记录复制到WAL Buffer */ CopyXLogRecordToWAL(rechdr->xl_tot_len, isLogSwitch, rdata, StartPos, EndPos); /* * 除非是一些被标记为不重要的数据,否则都需要更新当前槽位的 lastImportantAt 值 * 如果 holdingAllLocks 为真,则更新第一个值 */ if ((flags & XLOG_MARK_UNIMPORTANT) == 0) { int lockno = holdingAllLocks ? 0 : MyLockNo; WALInsertLocks[lockno].l.lastImportantAt = StartPos; } } else // inserted 为false,表示日志切换记录 { /* * 这是一条日志切换记录,但当前插入位置正好在段的开始位置,没东西可以复制。 */ } /* 操作完成,释放锁 */ WALInsertLockRelease(); MarkCurrentTransactionIdLoggedIfAny(); END_CRIT_SECTION(); … /* * 更新全局变量 */ ProcLastRecPtr = StartPos; XactLastRecEnd = EndPos; … return EndPos;