PostgreSQL的存储系统二:REDOLOG文件存储结构二

REDOLOG文件里的用户数据和数据文件里的用户数据存储结构相同

几个月前同事给*一家公司培训《pg9 ad admin》时,有个学员提及WAL里记录的内容为Query时的SQL语句(比如insert等),同事告知WAL里记录的tuple信息,而非SQL,该学员坚持里面是SQL或SQL+tuple,并说oracle的redo日志里记录的是SQL(不知到这个从哪里知道的,也许是日志挖掘出来SQL的缘由吧)。便看了一下源码(还是开源的好)。

前面我写过一篇文章《PostgreSQL的存储系统二:REDOLOG文件存储结构》,见地址http://beigang.iteye.com/blog/1565121或http://blog.csdn.net/beiigang/article/details/7680905,其中提到Pg XLOG文件的存储格式大致如下:

<PageHeaderData>

<XLogRecord>

<rmgr-specific data>

<BkpBlock>

<XLogRecData>里面包括<CheckPoint>等

<BkpBlock>

<XLogRecData>

<BkpBlock>

<XLogRecData>

……

用户相关的数据写在XLogRecData结构(定义见下面)的buffer成员里,但具体写成什么样子没有提及,正好这儿再深入讨论一下。

typedefstruct XLogRecData

{

char      *data;         /* 资源管理器包含数据的开始 */

uint32     len;          /* 资源管理器包含数据的长度 */

Buffer     buffer;       /* 有相应数据的buffer,如果有的话 */

bool       buffer_std;   /* buffer是否有标准pd_lower/pd_upper头 */

struct XLogRecData *next;   /* 链里的下一个结构 */

} XLogRecData;

为了说清楚这个问题,跑了个例子如下:

INSERT INTO TABLE1(ID,GNAME) VALUES(18,’GangBei’);

看这个例子涉及的调用流程前,先回顾一下pg服务进程的调用流程,一切就绪后进入无限循环,等候客户端指令,

PostgreSQL的存储系统二:REDOLOG文件存储结构二

Postgres服务进程调用流程图

这个例子的调用流程和《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》系列博文中的流程大致相同,也是调用exec_simple_query方法,和前面《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》中select例子不同的是,本节中insert的例子在portalrun方法里调用了执行器的ExecInsert方法,最终调用了heap_insert方法,在这个方法里完成了记录写入数据文件,并调用了XLogInsert方法,完成了XLOG的WAL日志写入。更具体的方法调用流程参见下面的调用流程图,其他和《PostgreSQL服务过程中的那些事二:Pg服务进程处理简单查询》基本相同的部分略去。

PostgreSQL的存储系统二:REDOLOG文件存储结构二

Insert SQL 语句调用流程

在heap_insert方法里,组装好tuple,调用RelationGetBufferForTuple方法找到shmem里缓存数据文件块的buffer,调用RelationPutHeapTuple方法,把组装好的元组放到合适的buffer中合适的位置;然后组装XLogRecData类型变量rdata,把buffer赋给XLogRecData的成员buffer,接着调用XLogInsert方法,并传入rdata,在XLogInsert方法里,用memcpy方法把rdata写入shmem对应的cache里,最后pg都是通过操作系统接口I/O接口把WAL日志和数据写入对应的文件。

既然XLOG里写的 Insert的wal日志里的用户数据和数据文件中的一样,那我们简单看一下pg中数据文件里的tuple,tuple存放在堆中,一个tuple就是一行表记录,在数据文件的页里存放的结构如下图:

PostgreSQL的存储系统二:REDOLOG文件存储结构二

数据文件页面布局图

PostgreSQL的存储系统二:REDOLOG文件存储结构二

元组结构图

元组头结构和其字段表示意义见下面:

typedefstruct HeapTupleHeaderData

{

union

{

HeapTupleFieldst_heap;

DatumTupleFieldst_datum;

}                    t_choice;

ItemPointerDatat_ctid;        /* current TID of this or newer tuple */

/* Fields below here must match MinimalTupleData! */

uint16            t_infomask2;  /* number of attributes + various flags */

uint16            t_infomask;           /* various flag bits, see below */

uint8              t_hoff;                  /* sizeof header incl. bitmap, padding */

/* ^ - 23 bytes - ^ */

bits8              t_bits[1];        /* bitmap of NULLs -- VARIABLE LENGTH */

/* MORE DATA FOLLOWS AT END OF STRUCT */

} HeapTupleHeaderData;

typedefstruct HeapTupleFields

{

TransactionIdt_xmin;           /* inserting xact ID */

TransactionIdt_xmax;          /* deleting or locking xact ID */

union

{

CommandId   t_cid;             /* inserting or deleting command ID, or both */

TransactionIdt_xvac;    /* old-style VACUUM FULL xact ID */

}                    t_field3;

} HeapTupleFields;

typedefstruct DatumTupleFields

{

int32              datum_len_;          /* varlena header (do not touch directly!) */

int32              datum_typmod;     /* -1, or identifier of a record type */

Oid                datum_typeid;       /* composite type OID, or RECORDOID */

/*

* Note: field ordering is chosen with thought that Oid might someday

* widen to 64 bits.

*/

} DatumTupleFields;

typedefstruct ItemPointerData

{

BlockIdDataip_blkid;

OffsetNumberip_posid;

}

PostgreSQL的元组头结构是MVCC算法的基础。这个以后再说吧。

下面把heap_insert方法和XLogInsert方法贴到了下面,为了突显主题,删掉了其余代码,并把XLOG内容相关变量和方法置为红色,方便串读。

Oid

heap_insert(Relation relation, HeapTuple tup, CommandId cid,

int options, BulkInsertState bistate)

{

TransactionId xid = GetCurrentTransactionId();

HeapTuple     heaptup;

Buffer            buffer;

bool              all_visible_cleared = false;

/*1 组装元组头信息 */

tup->t_data->t_infomask &= ~(HEAP_XACT_MASK);

tup->t_data->t_infomask2 &= ~(HEAP2_XACT_MASK);

tup->t_data->t_infomask |= HEAP_XMAX_INVALID;

HeapTupleHeaderSetXmin(tup->t_data, xid);

HeapTupleHeaderSetCmin(tup->t_data, cid);

HeapTupleHeaderSetXmax(tup->t_data, 0);

tup->t_tableOid = RelationGetRelid(relation);

heaptup = tup;

/*2 Find buffer to insert this tuple into */

buffer = RelationGetBufferForTuple(relation, heaptup->t_len,

InvalidBuffer, options, bistate);

/*3

* We're about to do the actual insert -- check for conflict at the

* relation or buffer level first, to avoid possibly having to roll back

* work we've just done.

*/

CheckForSerializableConflictIn(relation, NULL, buffer);

/*4 NO EREPORT(ERROR) from here till changes are logged */

START_CRIT_SECTION();

RelationPutHeapTuple(relation, buffer, heaptup)

MarkBufferDirty(buffer);

/* XLOG stuff */

if (!(options & HEAP_INSERT_SKIP_WAL) && RelationNeedsWAL(relation))

{

xl_heap_insert xlrec;

xl_heap_header xlhdr;

XLogRecPtr   recptr;

XLogRecData rdata[3];

Page             page = BufferGetPage(buffer);

uint8              info = XLOG_HEAP_INSERT;

xlrec.all_visible_cleared = all_visible_cleared;

xlrec.target.node = relation->rd_node;

xlrec.target.tid = heaptup->t_self;

rdata[0].data = (char *) &xlrec;

rdata[0].len = SizeOfHeapInsert;

rdata[0].buffer = InvalidBuffer;

rdata[0].next = &(rdata[1]);

xlhdr.t_infomask2 = heaptup->t_data->t_infomask2;

xlhdr.t_infomask = heaptup->t_data->t_infomask;

xlhdr.t_hoff = heaptup->t_data->t_hoff;

/*

* note we mark rdata[1] as belonging to buffer; if XLogInsert decides

* to write the whole page to the xlog, we don't need to store

* xl_heap_header in the xlog.

*/

rdata[1].data = (char *) &xlhdr;

rdata[1].len = SizeOfHeapHeader;

rdata[1].buffer = buffer;

rdata[1].buffer_std = true;

rdata[1].next = &(rdata[2]);

/* PG73FORMAT: write bitmap [+ padding] [+ oid] + data */

rdata[2].data = (char *) heaptup->t_data + offsetof(HeapTupleHeaderData, t_bits);

rdata[2].len = heaptup->t_len - offsetof(HeapTupleHeaderData, t_bits);

rdata[2].buffer = buffer;

rdata[2].buffer_std = true;

rdata[2].next = NULL;

recptr = XLogInsert(RM_HEAP_ID, info, rdata);

PageSetLSN(page, recptr);

PageSetTLI(page, ThisTimeLineID);

}

END_CRIT_SECTION();

UnlockReleaseBuffer(buffer);

pgstat_count_heap_insert(relation);

return HeapTupleGetOid(tup);

}

XLogRecPtr

XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)

{

XLogCtlInsert *Insert = &XLogCtl->Insert;

XLogRecord *record;

XLogContRecord *contrecord;

XLogRecPtr   RecPtr;

XLogRecPtr   WriteRqst;

uint32            freespace;

int                 curridx;

XLogRecData *rdt;

Buffer            dtbuf[XLR_MAX_BKP_BLOCKS];

bool              dtbuf_bkp[XLR_MAX_BKP_BLOCKS];

BkpBlock       dtbuf_xlg[XLR_MAX_BKP_BLOCKS];

XLogRecPtr   dtbuf_lsn[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];

XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];

pg_crc32       rdata_crc;

uint32            len,

write_len;

unsigned       i;

TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);

/*

* Here we scan the rdata chain, determine which buffers must be backed

* up, and compute the CRC values for the data.

*/

START_CRIT_SECTION();

/* Now wait to get insert lock */

LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);

/* Compute record's XLOG location */

curridx = Insert->curridx;

INSERT_RECPTR(RecPtr, Insert, curridx);

/*

* Append the data, including backup blocks if any

*/

/* 把rdata中的数据写入XLOG */

while (write_len)

{

while (rdata->data == NULL)

rdata = rdata->next;

if (freespace > 0)

{

if (rdata->len > freespace)

{

memcpy(Insert->currpos, rdata->data, freespace);

rdata->data += freespace;

rdata->len -= freespace;

write_len -= freespace;

}

else

{

memcpy(Insert->currpos, rdata->data, rdata->len);

freespace -= rdata->len;

write_len -= rdata->len;

Insert->currpos += rdata->len;

rdata = rdata->next;

continue;

}

}

/* Use next buffer */

updrqst = AdvanceXLInsertBuffer(false);

curridx = Insert->curridx;

/* Insert cont-record header */

Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;

contrecord = (XLogContRecord *) Insert->currpos;

contrecord->xl_rem_len = write_len;

Insert->currpos += SizeOfXLogContRecord;

freespace = INSERT_FREESPACE(Insert);

}

LWLockRelease(WALInsertLock);

XactLastRecEnd = RecPtr;

END_CRIT_SECTION();

return RecPtr;

}

下面这个图是WAL日志中存放的有关的INSERT、UPDATE、DELETE操作的内容,该图引自《Internals Of PostgreSQL Wal》

PostgreSQL的存储系统二:REDOLOG文件存储结构二

就到这儿吧。

-----------------

转载请著明出处:
blog.csdn.net/beiigang
beigang.iteye.com

上一篇:NSArray使用小结


下一篇:{Django基础九之中间件} 一 前戏 二 中间件介绍 三 自定义中间件 四 中间件的执行流程 五 中间件版登陆认证