XLOG 日志的创建
在系统安装时,需要调用BootStrapXLog来初始化XLOG相关的段。
BootStrapXLOG
在数据库安装时调用该函数初始化XLOG相关的数据
- 生成数据库唯一标识符,即pg_control文件中的“Database system identifier” ,根据当前的时间的秒数、微妙数以及当前的进程号通过一定转换生成。
- 申请两个XLOG的buffer页
- 初始化checkpoint
- 初始化全局共享变量
- 初始化CLOG、MultiXact、CommitTS等
- 初始化XLOG普通页头即XLogPageHeaderData
- 初始化XLOG第一个页的头即XLogLongPageHeaderData
- 插入初始化的checkpoint记录
- 初始化XLOG段文件
- 将相关checkpoint记录更新到pg_control全局变量中
- 创建CLOG、CommitTS、SUBTRANS、MultiXact
- 将pg_control文件读入内存
XLOG 日志的初始化
XLOGShmemInit
初始化XLOGCtl这个结构体,这个结构体是XLOG读写的重要管理结构。
- 初始化XLOG Ctl
- 初始化Control file
- 初始化XLogCtl的xlblocks和WALInsertLocks
- 初始化XLogCtl的其他成员
XLOG日志的启动
StartupXLOG
数据库启动时,根据数据库的状态启动XLOG或者进入故障回放模式进行XLOG的回放,从而使数据库恢复到正常情况,由于startupXLOG包含了正常启动和故障恢复的流程,这里只讨论启动的流程,故障恢复的流程后面再讲。 #故障恢复
XLOG日志的插入
XLOG日志插入相关的结构体
- XLogCtlData: XLog 日志在共享内存中的结构体,XLOG的写入和读取都由该结构体控制。
- XLogCtlInsert: XLog 写入相关的控制结构体
- Checkpoint: checkpoint相关的结构体
- XLogWrtRqst: 要写入的XLOG位置的结构体
- XLOGWrtResult: 已经写入完成的XLOG的位置的结构体。
- WALInsertLock: XLOG写入时的控制锁。
XLogCtlData
XLog日志数据在共享内存中存放时的控制结构体
typedef struct XLogCtlData
{
XLogCtlInsert Insert;//插入相关结构体
XLogwrtRqst LogwrtRqst; //需要写入的位置
XLogRecPtr RedoRecPtr; /* REDO的位置 */
FullTransactionId ckptFullXid; /* checkpoint时的最新位置 */
XLogRecPtr asyncXactLSN; /* 异步提交或终止的LSN */
XLogRecPtr replicationSlotMinLSN; /* 每个槽需要的最老的LSN */
XLogSegNo lastRemovedSegNo; /* 最新的删除或回收的段号*/
XLogRecPtr unloggedLSN; //unlogged表的LSN位置
slock_t ulsn_lck; //unlogged表写入时的锁
pg_time_t lastSegSwitchTime; //最后一个段切换时的时间
XLogRecPtr lastSegSwitchLSN; //最后一个段切换的LSN
XLogwrtResult LogwrtResult; //已经写入完成的位置
XLogRecPtr InitializedUpTo; //最新初始化的缓存页面
char *pages; /* WAL缓冲中还未写XLOG的页 */
XLogRecPtr *xlblocks; /* 首个字节位置 */
int XLogCacheBlck; /* 分配的最高的XLOG缓冲页索引*/
TimeLineID ThisTimeLineID;//当前的时间线
TimeLineID PrevTimeLineID;//前一个时间线
RecoveryState SharedRecoveryState; //恢复状态
bool SharedHotStandbyActive; //是否是热备模式
bool SharedPromoteIsTriggered;//是否有备节点已经被激活
bool WalWriterSleeping; //是否处于低功耗模式
Latch recoveryWakeupLatch;//用于WAL在重放过程中唤醒进程
XLogRecPtr lastCheckPointRecPtr; //最新的checkpoint起始位置
XLogRecPtr lastCheckPointEndPtr;//最新的checkpoint结束位置
CheckPoint lastCheckPoint;//最新的checkpoint
XLogRecPtr lastReplayedEndRecPtr; //上一次的重放结束的位置
TimeLineID lastReplayedTLI;//上一次重放的timeline
XLogRecPtr replayEndRecPtr;//重放结束的位置
TimeLineID replayEndTLI;//重放结束时的timeline
TimestampTz recoveryLastXTime; //恢复的时间
TimestampTz currentChunkStartTime;//WAL 开始重放的时间
RecoveryPauseState recoveryPauseState;//重放终止的状态
ConditionVariable recoveryNotPausedCV;//条件变量,可用于同步并通知恢复过程不再处于暂停状态。
XLogRecPtr lastFpwDisableRecPtr;//最后一次FPW的位置
slock_t info_lck; /* 共享变量的保护锁 */
} XLogCtlData;
XLogCtlInsert
进行WAL日志插入时需要的控制信息
/*
WAL插入时用的结构体
*/
typedef struct XLogCtlInsert
{
slock_t insertpos_lck; /* 插入锁,保护CurrBytePos和PrevBytePos */
uint64 CurrBytePos; //当前已预留的WAL空间的末尾位置
uint64 PrevBytePos;//先前插入记录的位置
char pad[PG_CACHE_LINE_SIZE]; //填充字节,确保缓冲行对齐
XLogRecPtr RedoRecPtr; /* 当前插入的REDO位置*/
bool forcePageWrites; /* PITR操作时是否强制全页写 */
bool fullPageWrites; //是否全页写
ExclusiveBackupState exclusiveBackupState; //表示独占备份的状态(
int nonExclusiveBackups;//是一个计数器,表示当前正在进行的非独占流式基础备份的数量。
XLogRecPtr lastBackupStart;//是在线备份的最晚检查点重做位置起点
WALInsertLockPadded *WALInsertLocks; //WAL插入需要的锁
} XLogCtlInsert;
XLogwrtRqst
需要写入的XLOG的位置数据
typedef struct XLogwrtRqst
{
XLogRecPtr Write; /* last byte + 1 to write out */
XLogRecPtr Flush; /* last byte + 1 to flush */
} XLogwrtRqst;
XLogwrtResult
已经写入的XLOG的位置数据
typedef struct XLogwrtResult
{
XLogRecPtr Write; /* last byte + 1 written out */
XLogRecPtr Flush; /* last byte + 1 flushed */
} XLogwrtResult;
XLOG日志的注册
注册一个register_buffer,将数据存入到rdata数组中,然后链接到注册的register_buffer中。以插入表数据为例(heap_insert),主要注册流程如下:
- 将元组插入信息(xl_heap_insert)存储mainrdata中
- 申请一个register_buffer并初始化,例如表的relfilenode、forknum等信息
- 元组插入相关的XLOG数据(元组数据)存入到register_buffer的rdata数据链中
- 调用XLogAssemble函数封装XLOG Record
- 调用XLogInsertRecord函数写入到WAL Buffer中
- 调用XLOGFlush函数刷入磁盘
InitXLogInsert
初始化Insert XLOG所需要的共享内存变量,比如注册要用到的register_buffers,内存上下文、rdatas、hdr_scrach等。
- register_buffers: XLOG日志注册时使用,需要先将头数据注册到register buffers,数量是5个
- xloginsert_cxt: XLOG插入相关的内存上下文,所有插入过程的内存的管理都由该上下文确定,
- rdatas: main data存放位置,20个
- hdr_scrach: 封装时使用,会将register buffer和rdatas的数据都挪到这里进行封装,最终封装成一个XLOG Record日志
XLogBeginInsert
准备开始构建一个WAL Record记录,必须在注册前调用,主要是判断注册相关的register_buffer和rdata等全局变量是否重置。将begininsert_called设置为true,可以防止递归调用出现问题。
XLogRegisterData
开始注册XLOG数据,将数据添加到rdatas数组中,一般保存插入相关数据信息,在进行REDO时会调用XLogRecGetData()函数获取相关信息
- 从rdata数组中取一个槽位
- 初始化该槽位,存入要写入的数据的地址。
- mainrdata_last和mainrdata_len更新,形成一个数据链表。
XLogRegisterBuffer
注册buffer,获取一个register_buffer并初始化
- 根据block id选取一个register_buffer
- 初始化buffer对应的rnode,forkno,block,page
- 初始化其他元素
XLogRegisterBufData
注册register_buffer数据,写入元组头相关信息,以及数据。
- 选取一个rdata槽位,将数据存入
- 将rdata槽位地址存入register_buffer中的rdata_tail的链表中
XLOG日志的封装
这个阶段主要是将已注册的数据和缓冲区内容组织成一个适用于插入到WAL(预写日志)的记录格式,并生成XLogRecData链表。在组装过程中,会填充记录头的各个字段,但不包括xl_prev字段,并计算CRC校验值(不含记录头)。如果有些缓冲区没有进行全页备份,还会设置fpw_lsn为这些缓冲区中最小的LSN,以此表明这个记录的有效性依赖于RedoRecPtr和doPageWrites值的实时更新。最终,这个组装好的记录会通过XLogInsertRecord()函数插入到WAL中。
每个register_buffer对应一个block data
XLogInsert
插入已经指定了RMID和info信息的XLOG日志
- 初步获取后面判断是否需要进行FPW需要用到的一些参数
- 组装XLogRecord数据
- 写入到WAL buffer中
- 重置所有相关全局变量
do
{
XLogRecPtr RedoRecPtr;
bool doPageWrites;
XLogRecPtr fpw_lsn;
XLogRecData *rdt;
int num_fpi = 0;
GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); //初步获取判断是否需要进行FPW的一些参数,可能会变化,后面还会获取
rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, &fpw_lsn, &num_fpi);//组装XlogRecord记录
EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi);//插入到WAL缓冲区
} while (EndPos == InvalidXLogRecPtr);
XLogResetInsertion();//重置使用过的共享变量,如register_buffer
XlogRecordAssemble
将已注册的数据和缓冲区内容组织成一个适用于插入到WAL(预写日志)的记录格式,并生成XLogRecData链表
组装XLogRecord的头信息,这里通过一个scratch游标实现,逐个将各个阶段的头信息加载到scratch中;将需要写入的数据块链接到hdr_rdt的数据链中,链表头就是scratch,最终得到以hdr_rdt为头的完整的数据链表
函数主要流程:
-
初始化游标scratch
-
遍历所有注册的register_buffer,每一个register_buffer对应的就是一个XLogRecordBlock,一般如元组的插入更新只会使用一个block,但是像索引的操作一般都会使用大于2个block。
- 判断是否需要进行FPW
判断该页是否需要备份:- 判断标志位, 若包含REGBUF_FORCE_IMAGE 则需要备份,若包含REGBUF_NO_IMAGE则不需要备份
- 传入参数doPageWrites是否为true,若为true就需要强制进行备份
- 判断LSN: 如果页中的REDO位置小于当前的REDO位置,那么就需要备份,否则不需要
if (regbuf->flags & REGBUF_FORCE_IMAGE) needs_backup = true; else if (regbuf->flags & REGBUF_NO_IMAGE) needs_backup = false; else if (!doPageWrites) needs_backup = false; else { XLogRecPtr page_lsn = PageGetLSN(regbuf->page); needs_backup = (page_lsn <= RedoRecPtr); if (!needs_backup) { if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) *fpw_lsn = page_lsn; } }
- 如果需要进行FPW,先计算空洞,空洞的计算即页内的upper-lower即为空洞的长度,lower为空洞的起始位置。
if (regbuf->flags & REGBUF_STANDARD) { /* Assume we can omit data between pd_lower and pd_upper */ uint16 lower = ((PageHeader) page)->pd_lower; uint16 upper = ((PageHeader) page)->pd_upper; if (lower >= SizeOfPageHeaderData && upper > lower && upper <= BLCKSZ) { bimg.hole_offset = lower; cbimg.hole_length = upper - lower; } else { /* No "hole" to remove */ bimg.hole_offset = 0; cbimg.hole_length = 0; } } else { /* Not a standard page header, don't try to eliminate "hole" */ bimg.hole_offset = 0; cbimg.hole_length = 0; }
- 如果压缩开关打开,需要进行页面压缩,调用XLogCompressBackupBlock函数压缩页面。
- 添加压缩的头文件,FPW的头文件
- 添加Block的数据和头信息
- 判断是否需要进行FPW
-
添加数据到hdr_rdt链上
-
初始化XLogRecord头信息
XLOGRecord的头信息填充
所有的XLOG相关的头信息,会用一个游标scratch串起来,最终组合成一个完整的头信息。
char *scratch = hdr_scratch;
rechdr = (XLogRecord *) scratch; //以一个scrath游标封装所有的头信息
scratch += SizeOfXLogRecord;//先跳过XLOGRecord的头信息,空间预留,后面会初始化
/* 封装Block的头数据 */
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
if (include_image)
{
//封装FPW的头数据
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
if (cbimg.hole_length != 0 && is_compressed) //
{
//封装压缩的头数据
memcpy(scratch, &cbimg, SizeOfXLogRecordBlockCompressHeader);
scratch += SizeOfXLogRecordBlockCompressHeader;
}
}
if (!samerel)
{
//如果跟上一个记录不是同一张表,记录表的relfilenode信息
memcpy(scratch, ®buf->rnode, sizeof(RelFileNode));
scratch += sizeof(RelFileNode);
}
memcpy(scratch, ®buf->block, sizeof(BlockNumber)); //添加Blocknumber信息
scratch += sizeof(BlockNumber);
/* 复制源ID打开时会记录复制源ID到XLOG中,一般在配置了流复制时使用 */
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
replorigin_session_origin != InvalidRepOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
scratch += sizeof(replorigin_session_origin);
}
//配置逻辑解码时添加事务ID信息
*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
memcpy(scratch, &xid, sizeof(TransactionId));
scratch += sizeof(TransactionId);
//添加数据的头信息,大于255时为DATA_LONG,小于255时为DATA_SHORT
if (mainrdata_len > 255)
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG;
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT;
*(scratch++) = (uint8) mainrdata_len;
}
//初始化XLOG Record的头信息,开头已经分配过空间
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;
数据链封装
所有注册好的数据会以链表的形式串联起来,链表头为hdr_rdt,存储的是头信息数据,后面为压缩或FPW备份的页数据,register_buffer中注册的数据,maindata中的数据。
//初始化链表头,存储数据位XLOG Record的头信息
hdr_rdt.next = NULL;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch; //数据链的第一个数据节点为头信息scratch
//第二个数据节点,当存在FPW或者压缩的时候保存。
/* 暂用register_buffer中的临时空间bkp_rdatas[0] */
rdt_datas_last->next = ®buf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next;
if (is_compressed) //如果进行了压缩,保存压缩数据
{
rdt_datas_last->data = regbuf->compressed_page;
rdt_datas_last->len = compressed_len;
}
else //如果没有存在压缩,保存页的数据
{
bimg.length = BLCKSZ - cbimg.hole_length;
if (cbimg.hole_length == 0)//没有空洞就直接保存整个页
{
rdt_datas_last->data = page;
rdt_datas_last->len = BLCKSZ;
}
else
{
/* 如果有空洞就需要跳过空洞,这里就需要使用两个缓冲页存放空洞两边的数据*/
rdt_datas_last->data = page; //bkp_rdatas[0]保存空洞左边的数据
rdt_datas_last->len = bimg.hole_offset;
rdt_datas_last->next = ®buf->bkp_rdatas[1];
rdt_datas_last = rdt_datas_last->next;
rdt_datas_last->data =
page + (bimg.hole_offset + cbimg.hole_length);//bkp_rdatas[1]保存空洞右边的数据
rdt_datas_last->len =
BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
}
}
//第三个数据节点(可能包含好几个数据节点),register_buffer中注册的数据链,即rdata链。
/*如果保存的有数据,那么就将register_buffer中的rdata数据挂到当前的数据链中* /
if (needs_data)
{
rdt_datas_last->next = regbuf->rdata_head;
rdt_datas_last = regbuf->rdata_tail;
}
//第四个数据节点,maindata中存储的数据。
/* 下面是存储的主要数据 */
if (mainrdata_len > 0)
{
rdt_datas_last->next = mainrdata_head; //将main数据挂到数据链上
rdt_datas_last = mainrdata_last;
total_len += mainrdata_len;
}
rdt_datas_last->next = NULL;
XlogResetInsertion
重置已使用的register_buffer、mainrdata等全局变量。
for (i = 0; i < max_registered_block_id; i++)
registered_buffers[i].in_use = false;
num_rdatas = 0;
max_registered_block_id = 0;
mainrdata_len = 0;
mainrdata_last = (XLogRecData *) &mainrdata_head;
curinsert_flags = 0;
begininsert_called = false;
XLOG日志的写入
XLogInsertRecord
将封装好的XLogRecord数据插入WAL Buffer然后刷入磁盘。这里主要分为两个过程:
- 预留空间: 由于封装好的日志已经知道了其长度,可以先去WAL缓冲区中预留对应长度的空间,预留过程由XLogCtl->Insert->insertpos_lck锁保护,进程之间互斥。
- 数据复制:将数据拷贝到预留的空间中,这里可以并发执行,由WALInsertLocks锁进行并发控制,当前是8个锁。每个写入进程都需要申请一把锁。
函数流程: - 申请一把插入WALInsertLocks锁,如果涉及WAL段文件切换,申请的排它锁会阻塞其他进程。
if (isLogSwitch) //判断是否会切换WAL段文件
WALInsertLockAcquireExclusive();
else
WALInsertLockAcquire();//获取一把插入锁
- 更新全局的RedoRecPtr,并且判断是否需要FPW
if (RedoRecPtr != Insert->RedoRecPtr)
{
RedoRecPtr = Insert->RedoRecPtr; //获取当前插入的redo位置
}
doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites);//是否需要FPW
- 调用ReserveXLogInsertLocation函数从WAL Buffer中预申请一片空间,或者需要切换段文件时调用ReserveXLogSwitch函数预留剩余所有空间
if (isLogSwitch)//如果是日志切换的XLOG,则段的剩余空间都预留,保证下一次一定会切换日志
inserted = ReserveXLogSwitch(&StartPos, &EndPos, &rechdr->xl_prev);
else
{
ReserveXLogInsertLocation(rechdr->xl_tot_len, &StartPos, &EndPos, &rechdr->xl_prev);
inserted = true;
}
- 调用函数CopyXLogRecordToWAL函数将XLOG数据拷贝预留的WAL缓冲区中
- 释放插入锁
- 标记当前事务ID已经被记录XLOG
- 跨页的话,更新XLogCtl->LogwrtRqst.Write和LogwrtResult
- 如果是XLOG段文件切换,则调用XLogFlush函数刷入磁盘
ReserveXLogInsertLocation
预留WAL缓冲区空间,这里是串行的,所以需要抢insertpos_lck锁,会是一个性能瓶颈。
SpinLockAcquire(&Insert->insertpos_lck);//申请自旋锁,保护
startbytepos = Insert->CurrBytePos;//当前要预留的WAL空间起始位置
endbytepos = startbytepos + size; //XLog Record的长度
prevbytepos = Insert->PrevBytePos;//先前插入的XLOG的位置
Insert->CurrBytePos = endbytepos;//更新WAL BUffer插入的起始位置,占住位置
Insert->PrevBytePos = startbytepos; //更新先前的位置
SpinLockRelease(&Insert->insertpos_lck);
*StartPos = XLogBytePosToRecPtr(startbytepos);//转换为要写入的XLOG文件的起始位置
*EndPos = XLogBytePosToEndRecPtr(endbytepos);//转换为要写入的XLOG文件的起始位置
*PrevPtr = XLogBytePosToRecPtr(prevbytepos);//转换为要写入的XLOG文件的起上一个记录的位置
CopyXLogRecordToWAL
将XLogRecord数据拷贝到预留的WAL缓冲区中
- 根据WAL的LSN位置转换成WAL Buffer中地址
CurrPos = StartPos;
currpos = GetXLogBuffer(CurrPos);//获取要写入的位置在WAL BUffer中的地址
freespace = INSERT_FREESPACE(CurrPos);//页内剩余空间
- 遍历每个XLOGRecord中rdata数据链中的每个rdata,然后拷贝到对应的Buffer为止
written = 0;
while (rdata != NULL)//遍历每个rdata数据链,将其拷贝到对应的WAL BUffer中
{
char *rdata_data = rdata->data;
int rdata_len = rdata->len;
//如果rdata要写入的长度大于每个页中的剩余空间,则循环遍历逐页去写
.....
memcpy(currpos, rdata_data, rdata_len); //拷贝到buffer中
currpos += rdata_len;
CurrPos += rdata_len;
freespace -= rdata_len;
written += rdata_len;
rdata = rdata->next;
}
- 如果rdata要写入的长度大于每个页中的剩余空间,则循环遍历逐页去写
while (rdata_len > freespace)//如果页内剩余空间不足以写完一个rdata,循环遍历写
{
memcpy(currpos, rdata_data, freespace);//先拷贝业内剩余的空间的数据
rdata_data += freespace;
rdata_len -= freespace;
written += freespace;
CurrPos += freespace;
currpos = GetXLogBuffer(CurrPos);//跳到下一页为止
pagehdr = (XLogPageHeader) currpos;
pagehdr->xlp_rem_len = write_len - written;
pagehdr->xlp_info |= XLP_FIRST_IS_CONTRECORD;
/* 跳过页头 */
if (XLogSegmentOffset(CurrPos, wal_segment_size) == 0)
{
CurrPos += SizeOfXLogLongPHD;
currpos += SizeOfXLogLongPHD;
}
else
{
CurrPos += SizeOfXLogShortPHD;
currpos += SizeOfXLogShortPHD;
}
freespace = INSERT_FREESPACE(CurrPos);
}
- 如果日志切换的XLOG日志,还需要将剩余段内空间置零。
XLogFlush
将数据刷入磁盘,根据传入的Record位置刷盘,会将WAL Buffer中小于指定位置的数据写入磁盘。
- 如果Record值小于LogwrtResult.Flush,说明数据已经被刷入磁盘,直接返回即可
if (record <= LogwrtResult.Flush)
return;
- 等待所有小于WriteRqstPtr的插入操作结束,只有这样才能将所有小于Record的数据全刷入磁盘,更新要写入的位置为XLogCtl->LogwrtRqst.Write,然后 等待所有写入完成
//如果小于全局要刷的,则更新为全局的
SpinLockAcquire(&XLogCtl->info_lck); //抢锁更新XLogCtl
if (WriteRqstPtr < XLogCtl->LogwrtRqst.Write)
WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
LogwrtResult = XLogCtl->LogwrtResult;
SpinLockRelease(&XLogCtl->info_lck);
//等待所有插入操作完成,返回最终的位置
insertpos = WaitXLogInsertionsToFinish(WriteRqstPtr)
- 抢占WALWriteLock锁
if (!LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))//等待写入锁
{
continue;
}
- 写数据库前再检查一下是否所有数据已经刷入磁盘
LogwrtResult = XLogCtl->LogwrtResult;
if (record <= LogwrtResult.Flush)//再检查一下是否已经刷入过了
{
LWLockRelease(WALWriteLock);
break;
}
- 将数据写入磁盘
WriteRqst.Write = insertpos;
WriteRqst.Flush = insertpos;
XLogWrite(WriteRqst, false);
- 释放WALWriteLock锁
XLogWrite
XLogWrite
函数是PostgreSQL数据库中用于处理WAL(预写式日志)记录的内部核心函数。以下是对该函数详细和具体的功能描述:
-
参数解释:
-
XLogwrtRqst WriteRqst
:表示要求写入的日志位置,包括需要写入的日志结束点。 -
bool flexible
:指示是否可以灵活地选择写入点。如果为true
,则可以在合适的边界处停止写入,例如缓存或日志文件边界,以避免不必要的多次写操作。
-
-
前置条件:
- 在调用此函数前,必须先获取WALWriteLock锁,并通过
WaitXLogInsertionsToFinish(WriteRqst)
确保待写入的数据已经准备就绪。
- 在调用此函数前,必须先获取WALWriteLock锁,并通过
-
功能实现:
- 函数首先检查当前是否在临界区,确保并发安全。
- 更新本地
LogwrtResult
变量到最新的日志写入状态。 - 使用循环遍历日志缓冲区中的各个页面,从下一个未写入或部分写入的页面开始。
- 检查并保证不超越插入过程,防止非法的日志写入请求。
- 根据日志段切换逻辑,决定是否打开新的日志文件或者继续使用已有的日志文件。
- 将连续的页面数据分组进行一次I/O写操作,提高效率。
- 调用
pg_pwrite
将一组日志页写入磁盘,并在器和统计信息。 - 如果每次写入后更新相关计数一页写入,则立即执行fsync完成了一个日志文件段的最后相关的归档、检查点和wal操作,确保数据持久化,并触发 sender唤醒等逻辑。
- 如果执行相应的刷盘操作并更新
4 最后,在共享内存中更新日志写入结果的状态信息,确保LogwrtResult
始终小于等于LogwrtRqst
。
XLOG日志的读取
ReadRecord
ReadRecord函数一般是在xlogStartup函数中调用,会启动一个reader流,然后回调该函数读取每个xlog记录数据。该函数又调用XLogReadRecord读取一个XLOG Record记录。调用链为:ReadRecord->XLogReadRecord->ReadPageInternal,等分析 #故障恢复 时再分析。
XLOG日志的删除
RemoveOldXlogFiles函数实现,流程如下
- 根据入参计算出要删除的段文件名称,要回收的段号等信息
- 遍历pg_wal目录下的每个文件,不是WAL日志文件格式就跳过
- 通过文件名的逻辑ID+段ID判断是否小于要删除的段文件的ID,如果小于就删除文件,否则跳过
- 更新XLOGCtl->lastRemovedSegNo
- 调用RemoveXlogFile函数删除或回收重用该段文件
XLOG日志的checkpoint
CreateCheckPoint实现,参见checkpoint章节
#Checkpoint
XLOG日志的redo
xlog_redo函数实现,会根据XLOG的类型尽显对应的redo操作。等分析 #故障恢复 时再分析。
【参考】
- 《PostgreSQL数据库内核分析》
- 《Postgresql技术内幕-事务处理深度探索》
- 《PostgreSQL指南:内幕探索》
- pg14源码