问题描述
在做DML操作时,是先修改buffer pool数据,还是先记录redo log呢?
答案:先修改buffer pool数据,然后再记录redo log。
代码学习
在page0cur.cc文件中有page_cur_insert_rec_low函数,该函数用于在页中插入新记录,函数内有如下注释:
/* 1. Get the size of the physical record in the page */
/* 2. Try to find suitable space from page memory management */
/* 3. Create the record */
/* 4. Insert the record in the linked list of records */
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
/* 6. Update the last insertion info in page header */
/* 7. It remains to update the owner record. */
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two. */
/* 9. Write log record of the insert */
if (UNIV_LIKELY(mtr != NULL)) {
page_cur_insert_rec_write_log(insert_rec, rec_size,
current_rec, index, mtr);
}
可以看到,第1~8步完成对buffer pool中数据页的修改,直到第9步才会调用函数page_cur_insert_rec_write_log来记录redo log(且仅在mtr不为NULL时)。
page_cur_insert_rec_low函数
/***********************************************************//**
Inserts a record next to page cursor on an uncompressed page.
The record is copied into space taken either from the head of the page's
free list (PAGE_FREE) or, when that head record is too small or the list
is empty, from the page heap. The page is then updated in this order:
record list links, PAGE_N_RECS, n_owned/heap_no of the new record, the
last-insert direction info, and the owner record's n_owned (splitting the
directory slot if needed). Only after all of these page changes is the
log record written, and only when mtr is not NULL.
Returns pointer to inserted record if succeed, i.e., enough
space available, NULL otherwise. The cursor stays at the same position.
NOTE(review): the NULL passed as second argument to the page_* helpers
below is presumably the compressed-page descriptor; confirm.
@return pointer to record if succeed, NULL otherwise */
rec_t*
page_cur_insert_rec_low(
/*====================*/
rec_t* current_rec,/*!< in: pointer to current record after
which the new record is inserted */
dict_index_t* index, /*!< in: record descriptor */
const rec_t* rec, /*!< in: pointer to a physical record */
ulint* offsets,/*!< in/out: rec_get_offsets(rec, index);
on return it has been revalidated for the
inserted copy by rec_offs_make_valid() */
mtr_t* mtr) /*!< in: mini-transaction handle, or NULL
(NULL means no log record is written) */
{
byte* insert_buf;
ulint rec_size;
page_t* page; /*!< the relevant page */
rec_t* last_insert; /*!< cursor position at previous
insert */
rec_t* free_rec; /*!< a free record that was reused,
or NULL */
rec_t* insert_rec; /*!< inserted record */
ulint heap_no; /*!< heap number of the inserted
record */
ut_ad(rec_offs_validate(rec, index, offsets));
/* Derive the page from the cursor record pointer. */
page = page_align(current_rec);
/* Debug-only sanity checks: the row format of the table and of the page
agree, the page is an index page, and the index id stored in the page
header matches (unless recovery is on or the insert-buffer case applies). */
ut_ad(dict_table_is_comp(index->table)
== (ibool) !!page_is_comp(page));
ut_ad(fil_page_index_page_check(page));
ut_ad(mach_read_from_8(page + PAGE_HEADER + PAGE_INDEX_ID) == index->id
|| recv_recovery_is_on()
|| (mtr ? mtr->is_inside_ibuf() : dict_index_is_ibuf(index)));
/* The new record goes after current_rec, so current_rec must not be
the supremum record. */
ut_ad(!page_rec_is_supremum(current_rec));
/* 1. Get the size of the physical record in the page */
rec_size = rec_offs_size(offsets);
#ifdef UNIV_DEBUG_VALGRIND
{
const void* rec_start
= rec - rec_offs_extra_size(offsets);
/* Size of the header without the fixed-size part
(REC_N_NEW_EXTRA_BYTES or REC_N_OLD_EXTRA_BYTES). */
ulint extra_size
= rec_offs_extra_size(offsets)
- (rec_offs_comp(offsets)
? REC_N_NEW_EXTRA_BYTES
: REC_N_OLD_EXTRA_BYTES);
/* All data bytes of the record must be valid. */
UNIV_MEM_ASSERT_RW(rec, rec_offs_data_size(offsets));
/* The variable-length header must be valid. */
UNIV_MEM_ASSERT_RW(rec_start, extra_size);
}
#endif /* UNIV_DEBUG_VALGRIND */
/* 2. Try to find suitable space from page memory management */
free_rec = page_header_get_ptr(page, PAGE_FREE);
if (UNIV_LIKELY_NULL(free_rec)) {
/* Try to allocate from the head of the free list. */
ulint foffsets_[REC_OFFS_NORMAL_SIZE];
ulint* foffsets = foffsets_;
mem_heap_t* heap = NULL;
rec_offs_init(foffsets_);
foffsets = rec_get_offsets(
free_rec, index, foffsets, ULINT_UNDEFINED, &heap);
/* Only the head of the free list is examined. If it is smaller
than the new record, release the temporary offsets heap and fall
back to heap allocation; the goto jumps into the else branch. */
if (rec_offs_size(foffsets) < rec_size) {
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
goto use_heap;
}
/* The reusable buffer begins at the start of the free record's
header, i.e. its origin minus its extra size. */
insert_buf = free_rec - rec_offs_extra_size(foffsets);
/* Reuse the heap number of the free record and hand the record
following it in the free list to page_mem_alloc_free() (presumably
making that record the new list head; confirm). The two branches
differ only in the row-format-specific accessors. */
if (page_is_comp(page)) {
heap_no = rec_get_heap_no_new(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, TRUE),
rec_size);
} else {
heap_no = rec_get_heap_no_old(free_rec);
page_mem_alloc_free(page, NULL,
rec_get_next_ptr(free_rec, FALSE),
rec_size);
}
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
} else {
use_heap:
/* No free record was reused. free_rec is not read again in this
function after this assignment. */
free_rec = NULL;
/* Allocate from the page heap; heap_no receives the heap number
of the new slot. */
insert_buf = page_mem_alloc_heap(page, NULL,
rec_size, &heap_no);
/* Not enough space: the page has not been modified by this
function at this point, and nothing is logged. */
if (UNIV_UNLIKELY(insert_buf == NULL)) {
return(NULL);
}
}
/* 3. Create the record */
insert_rec = rec_copy(insert_buf, rec, offsets);
/* From here on, offsets describes insert_rec instead of rec. */
rec_offs_make_valid(insert_rec, index, offsets);
/* 4. Insert the record in the linked list of records */
ut_ad(current_rec != insert_rec);
{
/* next record after current before the insertion */
rec_t* next_rec = page_rec_get_next(current_rec);
#ifdef UNIV_DEBUG
if (page_is_comp(page)) {
ut_ad(rec_get_status(current_rec)
<= REC_STATUS_INFIMUM);
ut_ad(rec_get_status(insert_rec) < REC_STATUS_INFIMUM);
ut_ad(rec_get_status(next_rec) != REC_STATUS_INFIMUM);
}
#endif
/* Link the new record to its successor first, then make
current_rec point to the new record. */
page_rec_set_next(insert_rec, next_rec);
page_rec_set_next(current_rec, insert_rec);
}
/* One more user record on the page. */
page_header_set_field(page, NULL, PAGE_N_RECS,
1 + page_get_n_recs(page));
/* 5. Set the n_owned field in the inserted record to zero,
and set the heap_no field */
if (page_is_comp(page)) {
rec_set_n_owned_new(insert_rec, NULL, 0);
rec_set_heap_no_new(insert_rec, heap_no);
} else {
rec_set_n_owned_old(insert_rec, 0);
rec_set_heap_no_old(insert_rec, heap_no);
}
/* Memory-checker assertion over the whole inserted record
(header and data). */
UNIV_MEM_ASSERT_RW(rec_get_start(insert_rec, offsets),
rec_offs_size(offsets));
/* 6. Update the last insertion info in page header */
last_insert = page_header_get_ptr(page, PAGE_LAST_INSERT);
/* On a compact page, a previous insert (if any) must have the same
node-pointer flag as the new record. */
ut_ad(!last_insert || !page_is_comp(page)
|| rec_get_node_ptr_flag(last_insert)
== rec_get_node_ptr_flag(insert_rec));
/* The direction bookkeeping is skipped for spatial indexes. */
if (!dict_index_is_spatial(index)) {
if (UNIV_UNLIKELY(last_insert == NULL)) {
/* No previous insert recorded: no direction, counter reset. */
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
} else if ((last_insert == current_rec)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_LEFT)) {
/* The new record directly follows the previously inserted one
and the page was not trending left: count one more insert in
the PAGE_RIGHT direction. */
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_RIGHT);
page_header_set_field(page, NULL, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else if ((page_rec_get_next(insert_rec) == last_insert)
&& (page_header_get_field(page, PAGE_DIRECTION)
!= PAGE_RIGHT)) {
/* The new record directly precedes the previously inserted one
and the page was not trending right: count one more insert in
the PAGE_LEFT direction. */
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_LEFT);
page_header_set_field(page, NULL, PAGE_N_DIRECTION,
page_header_get_field(
page, PAGE_N_DIRECTION) + 1);
} else {
/* Neither pattern continues: reset direction and counter. */
page_header_set_field(page, NULL, PAGE_DIRECTION,
PAGE_NO_DIRECTION);
page_header_set_field(page, NULL, PAGE_N_DIRECTION, 0);
}
}
/* PAGE_LAST_INSERT is updated for every index type, spatial included. */
page_header_set_ptr(page, NULL, PAGE_LAST_INSERT, insert_rec);
/* 7. It remains to update the owner record. */
{
rec_t* owner_rec = page_rec_find_owner_rec(insert_rec);
/* n_owned keeps the owner's count from BEFORE the increment. */
ulint n_owned;
if (page_is_comp(page)) {
n_owned = rec_get_n_owned_new(owner_rec);
rec_set_n_owned_new(owner_rec, NULL, n_owned + 1);
} else {
n_owned = rec_get_n_owned_old(owner_rec);
rec_set_n_owned_old(owner_rec, n_owned + 1);
}
/* 8. Now we have incremented the n_owned field of the owner
record. If the number exceeds PAGE_DIR_SLOT_MAX_N_OWNED,
we have to split the corresponding directory slot in two.
Because n_owned is the pre-increment value, equality with the
maximum means the stored count is now one above it. */
if (UNIV_UNLIKELY(n_owned == PAGE_DIR_SLOT_MAX_N_OWNED)) {
page_dir_split_slot(
page, NULL,
page_dir_find_owner_slot(owner_rec));
}
}
/* 9. Write log record of the insert. This happens only after every
page modification above has been made, and is skipped entirely when
the caller passed mtr == NULL. */
if (UNIV_LIKELY(mtr != NULL)) {
page_cur_insert_rec_write_log(insert_rec, rec_size,
current_rec, index, mtr);
}
return(insert_rec);
}
page_cur_insert_rec_write_log函数
/***********************************************************//**
Writes the log record of a record insert on a page. */
/* Layout of what this function appends to the mini-transaction log, as
written by the code below:
  - unless the log mode is MTR_LOG_SHORT_INSERTS: an initial log record
    (MLOG_COMP_REC_INSERT or MLOG_REC_INSERT) and the 2-byte page offset
    of cursor_rec;
  - a compressed integer 2 * (end segment length) + flag, where flag is 1
    when the extra info below follows and 0 otherwise;
  - only when flag is 1: one byte of info/status bits, the compressed
    record origin offset (extra_size) and the compressed mismatch index;
  - the end segment itself: the bytes of the inserted record starting at
    the first byte that differs from cursor_rec.
The function returns without writing anything when the log buffer cannot
be opened (mlog_open or mlog_open_and_write_index returns NULL). */
static
void
page_cur_insert_rec_write_log(
/*==========================*/
rec_t* insert_rec, /*!< in: inserted physical record */
ulint rec_size, /*!< in: insert_rec size */
rec_t* cursor_rec, /*!< in: record the
cursor is pointing to */
dict_index_t* index, /*!< in: record descriptor */
mtr_t* mtr) /*!< in: mini-transaction handle */
{
ulint cur_rec_size;
ulint extra_size;
ulint cur_extra_size;
const byte* ins_ptr;
const byte* log_end;
ulint i;
/* Avoid REDO logging to save on costly IO because
temporary tables are not recovered during crash recovery.
NOTE(review): as written, this branch returns only when mlog_open()
yields NULL; otherwise it closes the log at the same position and control
falls through to the regular logging path below. Presumably the
mini-transaction's log mode is what suppresses redo for temporary
tables; confirm. The final assignment to log_ptr is a dead store (the
variable goes out of scope right after it). */
if (dict_table_is_temporary(index->table)) {
byte* log_ptr = mlog_open(mtr, 0);
if (log_ptr == NULL) {
return;
}
mlog_close(mtr, log_ptr);
log_ptr = NULL;
}
ut_a(rec_size < UNIV_PAGE_SIZE);
ut_ad(mtr->is_named_space(index->space));
/* Both records must be on the same page and in the table's row format. */
ut_ad(page_align(insert_rec) == page_align(cursor_rec));
ut_ad(!page_rec_is_comp(insert_rec)
== !dict_table_is_comp(index->table));
{
/* Compute header ("extra") size and total size of both records.
The offsets arrays live on the stack; heap is used by
rec_get_offsets() only if it needs more room, and is freed below. */
mem_heap_t* heap = NULL;
ulint cur_offs_[REC_OFFS_NORMAL_SIZE];
ulint ins_offs_[REC_OFFS_NORMAL_SIZE];
ulint* cur_offs;
ulint* ins_offs;
rec_offs_init(cur_offs_);
rec_offs_init(ins_offs_);
cur_offs = rec_get_offsets(cursor_rec, index, cur_offs_,
ULINT_UNDEFINED, &heap);
ins_offs = rec_get_offsets(insert_rec, index, ins_offs_,
ULINT_UNDEFINED, &heap);
extra_size = rec_offs_extra_size(ins_offs);
cur_extra_size = rec_offs_extra_size(cur_offs);
ut_ad(rec_size == rec_offs_size(ins_offs));
cur_rec_size = rec_offs_size(cur_offs);
if (UNIV_LIKELY_NULL(heap)) {
mem_heap_free(heap);
}
}
/* ins_ptr starts at the first header byte of the inserted record;
i counts how many leading bytes need not be logged. */
ins_ptr = insert_rec - extra_size;
i = 0;
/* A common prefix is searched for only when both headers have the same
size; otherwise i stays 0 and the whole record is logged. */
if (cur_extra_size == extra_size) {
ulint min_rec_size = ut_min(cur_rec_size, rec_size);
const byte* cur_ptr = cursor_rec - cur_extra_size;
/* Find out the first byte in insert_rec which differs from
cursor_rec; skip the bytes in the record info.
A mismatch inside the last page_rec_get_base_extra_size(insert_rec)
bytes of the header does not end the scan: both pointers jump to
the record origin and i is set to extra_size. Any other mismatch
stops the scan. The do-while body runs at least once. */
do {
if (*ins_ptr == *cur_ptr) {
i++;
ins_ptr++;
cur_ptr++;
} else if ((i < extra_size)
&& (i >= extra_size
- page_rec_get_base_extra_size
(insert_rec))) {
i = extra_size;
ins_ptr = insert_rec;
cur_ptr = cursor_rec;
} else {
break;
}
} while (i < min_rec_size);
}
byte* log_ptr;
/* Reserve log buffer space. The size expression mirrors the writes that
may follow: 2 bytes for the cursor offset, one compressed integer (5),
1 byte of info bits and two more compressed integers (5 + 5), plus
MLOG_BUF_MARGIN. NOTE(review): 5 is presumably the maximum length
produced by mach_write_compressed(), and the extra 11 in the old-format
branch presumably covers the initial log record; confirm. */
if (mtr_get_log_mode(mtr) != MTR_LOG_SHORT_INSERTS) {
if (page_rec_is_comp(insert_rec)) {
log_ptr = mlog_open_and_write_index(
mtr, insert_rec, index, MLOG_COMP_REC_INSERT,
2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
if (UNIV_UNLIKELY(!log_ptr)) {
/* Logging in mtr is switched off
during crash recovery: in that case
mlog_open returns NULL */
return;
}
} else {
log_ptr = mlog_open(mtr, 11
+ 2 + 5 + 1 + 5 + 5
+ MLOG_BUF_MARGIN);
if (UNIV_UNLIKELY(!log_ptr)) {
/* Logging in mtr is switched off
during crash recovery: in that case
mlog_open returns NULL */
return;
}
log_ptr = mlog_write_initial_log_record_fast(
insert_rec, MLOG_REC_INSERT, log_ptr, mtr);
}
/* log_end marks the end of the space reserved after the initial
record; it is used below to decide whether the record tail fits. */
log_end = &log_ptr[2 + 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
/* Write the cursor rec offset as a 2-byte ulint */
mach_write_to_2(log_ptr, page_offset(cursor_rec));
log_ptr += 2;
} else {
/* Short-insert mode: no initial record and no cursor offset, so the
2 bytes for the offset are not reserved either. */
log_ptr = mlog_open(mtr, 5 + 1 + 5 + 5 + MLOG_BUF_MARGIN);
if (!log_ptr) {
/* Logging in mtr is switched off during crash
recovery: in that case mlog_open returns NULL */
return;
}
log_end = &log_ptr[5 + 1 + 5 + 5 + MLOG_BUF_MARGIN];
}
/* If the info/status bits of the two records differ, the extra info
must be logged; the goto jumps into the body of the if statement
below. The two branches differ only in the row-format argument. */
if (page_rec_is_comp(insert_rec)) {
if (UNIV_UNLIKELY
(rec_get_info_and_status_bits(insert_rec, TRUE)
!= rec_get_info_and_status_bits(cursor_rec, TRUE))) {
goto need_extra_info;
}
} else {
if (UNIV_UNLIKELY
(rec_get_info_and_status_bits(insert_rec, FALSE)
!= rec_get_info_and_status_bits(cursor_rec, FALSE))) {
goto need_extra_info;
}
}
/* Extra info is also needed when header size or total size differ. */
if (extra_size != cur_extra_size || rec_size != cur_rec_size) {
need_extra_info:
/* Write the record end segment length
and the extra info storage flag (lowest bit set to 1) */
log_ptr += mach_write_compressed(log_ptr,
2 * (rec_size - i) + 1);
/* Write the info bits */
mach_write_to_1(log_ptr,
rec_get_info_and_status_bits(
insert_rec,
page_rec_is_comp(insert_rec)));
log_ptr++;
/* Write the record origin offset */
log_ptr += mach_write_compressed(log_ptr, extra_size);
/* Write the mismatch index */
log_ptr += mach_write_compressed(log_ptr, i);
ut_a(i < UNIV_PAGE_SIZE);
ut_a(extra_size < UNIV_PAGE_SIZE);
} else {
/* Write the record end segment length
and the extra info storage flag (lowest bit left at 0) */
log_ptr += mach_write_compressed(log_ptr, 2 * (rec_size - i));
}
/* Write to the log the inserted index record end segment which
differs from the cursor record. From here on rec_size is the length of
that end segment, and ins_ptr points at its first byte. */
rec_size -= i;
if (log_ptr + rec_size <= log_end) {
/* The segment fits into the space reserved above: copy it in place
and close the log after it. */
memcpy(log_ptr, ins_ptr, rec_size);
mlog_close(mtr, log_ptr + rec_size);
} else {
/* The segment does not fit: close the log at the current position
and append the segment with mlog_catenate_string(). */
mlog_close(mtr, log_ptr);
ut_a(rec_size < UNIV_PAGE_SIZE);
mlog_catenate_string(mtr, ins_ptr, rec_size);
}
}
#else /* !UNIV_HOTBACKUP */
/* No-op replacement for page_cur_insert_rec_write_log(): in this branch
of the surrounding conditional the call expands to ((void) 0), so nothing
is logged and the macro arguments are not evaluated.
NOTE(review): the opening #if/#ifndef is outside this excerpt; judging by
the comments on #else and #endif this is presumably the UNIV_HOTBACKUP
build; confirm. */
# define page_cur_insert_rec_write_log(ins_rec,size,cur,index,mtr) ((void) 0)
#endif /* !UNIV_HOTBACKUP */