binlog作为mysql中最重要的日志之一,能实现异常恢复以及主从复制。
我们主要讨论的是主从复制中的binlog,这里将以mysql5.7.13的源码为主要依据来分析binlog。
在主从复制中,binlog一般使用row模式,在主服务器上是binlog,在从服务器上是relaylog,在sql\binlog.h中通过is_relay_log这个变量来区分这两种情况。
在整个mysql中只有一个MYSQL_BIN_LOG实例,那就是在mysqld.h中定义的mysql_bin_log
1.binlog写定位的机制
以下是所有binlog的列表,通过show binary logs可以看到。
这个列表事实上是从mysqlbin-log.index的文件中拉出来的,我们称这个文件为binlog index 文件。
这些binlog相关文件一般在数据库路径下面。
这里我们要讲到binlog写的定位机制,其实通过binlog文件名和binlog文件偏移来定位binlog的写位置。
可以查看代码
sql\binlog.h:
/*
  Excerpt of the MYSQL_BIN_LOG class declaration: only the members that
  identify and give I/O access to the binary log file and its index file
  are shown. The rest of the class is omitted from this excerpt.
*/
class MYSQL_BIN_LOG: public TC_LOG
{
  enum enum_log_state { LOG_OPENED, LOG_CLOSED, LOG_TO_BE_OPENED };

  /* LOCK_log is inited by init_pthread_objects() */
  mysql_mutex_t LOCK_log;
  char *name;
  /* File name of the binary log file */
  char log_file_name[FN_REFLEN];
  /* Sized NAME_LEN + 1 so a maximum-length name still has room for '\0' */
  char db[NAME_LEN + 1];
  bool write_error, inited;
  /* Cached I/O handle for the binary log file */
  IO_CACHE log_file;
  /* Cached I/O handle for the binlog index file */
  IO_CACHE index_file;
  /* File name of the binlog index file */
  char index_file_name[FN_REFLEN];
  /* ... remaining members omitted in this excerpt ... */
index_file,log_file分别是binlog index文件以及binlog文件的读写io
log_file_name,index_file_name分别是binlog文件以及binlog index文件的文件名
include\my_sys.h:
typedef struct st_io_cache /* Used when caching files */
{
/* Offset in file corresponding to the first byte of uchar* buffer. */
my_off_t pos_in_file;
/*
The offset of end of file for READ_CACHE and WRITE_CACHE.
For SEQ_READ_APPEND it is the maximum of the actual end of file and
the position represented by read_end.
*/
my_off_t end_of_file;
/* Points to current read position in the buffer */
uchar *read_pos;
/* the non-inclusive boundary in the buffer for the currently valid read */
uchar *read_end;
uchar *buffer; /* The read buffer */
/* Used in ASYNC_IO */
uchar *request_pos;
/* Only used in WRITE caches and in SEQ_READ_APPEND to buffer writes */
uchar *write_buffer;
/*
Only used in SEQ_READ_APPEND, and points to the current read position
in the write buffer. Note that reads in SEQ_READ_APPEND caches can
happen from both read buffer (uchar* buffer) and write buffer
(uchar* write_buffer).
*/
uchar *append_read_pos;
/* Points to current write position in the write buffer */
uchar *write_pos;
/* The non-inclusive boundary of the valid write area */
uchar *write_end;
/*
Current_pos and current_end are convenience variables used by
my_b_tell() and other routines that need to know the current offset.
current_pos points to &write_pos, and current_end to &write_end in a
WRITE_CACHE, and &read_pos and &read_end respectively otherwise
*/
uchar **current_pos, **current_end;
/*
The lock is for the append buffer used in a SEQ_READ_APPEND cache;
a mutex is needed when copying from the append buffer to the read buffer.
*/
mysql_mutex_t append_buffer_lock;
/*
The following is used when several threads are reading the
same file in parallel. They are synchronized on disk
accesses reading the cached part of the file asynchronously.
It should be set to NULL to disable the feature. Only
READ_CACHE mode is supported.
*/
IO_CACHE_SHARE *share;
/*
A caller will use the my_b_read() macro to read from the cache.
If the data is already in cache, it will be simply copied with
memcpy() and internal variables will be accordingly updated with
no functions invoked. However, if the data is not fully in the cache,
my_b_read() will call read_function to fetch the data. read_function
must never be invoked directly.
*/
int (*read_function)(struct st_io_cache *,uchar *,size_t);
/*
Same idea as in the case of read_function, except my_b_write() needs to
be replaced with my_b_append() for a SEQ_READ_APPEND cache
*/
int (*write_function)(struct st_io_cache *,const uchar *,size_t);
/*
Specifies the type of the cache. Depending on the type of the cache
certain operations might not be available and yield unpredictable
results. Details to be documented later
*/
enum cache_type type;
/*
Callbacks when the actual read I/O happens. These were added and
are currently used for binary logging of LOAD DATA INFILE - when a
block is read from the file, we create a block create/append event, and
when IO_CACHE is closed, we create an end event. These functions could,
of course be used for other things
*/
IO_CACHE_CALLBACK pre_read;
IO_CACHE_CALLBACK post_read;
IO_CACHE_CALLBACK pre_close;
/*
Counts the number of times, when we were forced to use disk. We use it to
increase the binlog_cache_disk_use and binlog_stmt_cache_disk_use status
variables.
*/
ulong disk_writes;
void* arg; /* for use by pre/post_read */
char *file_name; /* if used with 'open_cached_file' */
char *dir,*prefix;
File file; /* file descriptor */
PSI_file_key file_key; /* instrumented file key */
/*
seek_not_done is set by my_b_seek() to inform the upcoming read/write
operation that a seek needs to be performed prior to the actual I/O.
error is 0 if the cache operation was successful, -1 if there was a
"hard" error, and the actual number of I/O-ed bytes if the read/write was
partial.
*/
int seek_not_done,error;
/* buffer_length is memory size allocated for buffer or write_buffer */
size_t buffer_length;
/* read_length is the same as buffer_length except when we use async io */
size_t read_length;
myf myflags; /* Flags used to my_read/my_write */
/*
alloced_buffer is 1 if the buffer was allocated by init_io_cache() and
0 if it was supplied by the user.
Currently READ_NET is the only one that will use a buffer allocated
somewhere else
*/
my_bool alloced_buffer;
} IO_CACHE;
看英文注释很容易明白pos_in_file就是在文件中的偏移量
再查看sql\binlog.h,这是定位binlog写位置的结构体
/**
  Describes one binary log entry: its file name, where that name sits in
  the index file, and a byte position inside the log file itself.
  The constructor zeroes every member, including the whole name buffer.
*/
typedef struct st_log_info
{
  char log_file_name[FN_REFLEN];
  /*
    index_file_start_offset: offset in the index file where this entry starts.
    index_file_offset:       offset in the index file just past this entry,
                             i.e. where the next lookup continues from.
  */
  my_off_t index_file_offset, index_file_start_offset;
  my_off_t pos;
  bool fatal; // if the purge happens to give us a negative offset
  int entry_index; // used in purge_logs(), calculated in find_log_pos().
  st_log_info()
    : index_file_offset(0), index_file_start_offset(0),
      pos(0), fatal(0), entry_index(0)
  {
    memset(log_file_name, 0, FN_REFLEN);
  }
} LOG_INFO;
log_file_name,pos是binlog文件的文件名以及binlog文件的偏移量,一般pos不使用。
index_file_offset, index_file_start_offset是该binlog在index文件中所处的偏移量以及index文件下一个查找开始的偏移量。
这样我们来看sql\binlog.cc代码find_log_pos以及find_next_log
/**
  Find the position in the log-index-file for the given log name.

  @param[out] linfo       The found log file name will be stored here, along
                          with the byte offset of the next log file name in
                          the index file.
  @param log_name         Filename to find in the index file, or NULL if we
                          want to read the first entry.
  @param need_lock_index  If true, this function acquires LOCK_index;
                          otherwise the lock should already be held by the
                          caller.

  @note
    On systems without the truncate function the file will end with one or
    more empty lines.  These will be ignored when reading the file.

  @retval
    0			ok
  @retval
    LOG_INFO_EOF	End of log-index-file found
  @retval
    LOG_INFO_IO		Got IO error while reading file
*/

int MYSQL_BIN_LOG::find_log_pos(LOG_INFO *linfo, const char *log_name,
                                bool need_lock_index)
{
  int error= 0;
  char *full_fname= linfo->log_file_name;
  char full_log_name[FN_REFLEN], fname[FN_REFLEN];
  size_t log_name_len= 0, fname_len= 0;
  DBUG_ENTER("find_log_pos");
  full_log_name[0]= full_fname[0]= 0;

  /*
    Mutex needed because we need to make sure the file pointer does not
    move from under our feet
  */
  if (need_lock_index)
    mysql_mutex_lock(&LOCK_index);
  else
    mysql_mutex_assert_owner(&LOCK_index);

  // extend relative paths for log_name to be searched
  if (log_name)
  {
    if(normalize_binlog_name(full_log_name, log_name, is_relay_log))
    {
      error= LOG_INFO_EOF;
      goto end;
    }
  }

  log_name_len= log_name ? strlen(full_log_name) : 0;
  DBUG_PRINT("enter", ("log_name: %s, full_log_name: %s",
                       log_name ? log_name : "NULL", full_log_name));

  /* As the file is flushed, we can't get an error here */
  my_b_seek(&index_file, (my_off_t) 0);

  for (;;)
  {
    size_t length;
    /* Remember where this entry starts before reading it */
    my_off_t offset= my_b_tell(&index_file);

    DBUG_EXECUTE_IF("simulate_find_log_pos_error",
                    error= LOG_INFO_EOF; break;);
    /* If we get 0 or 1 characters, this is the end of the file */
    if ((length= my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
    {
      /* Did not find the given entry; Return not found or error */
      error= !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
      break;
    }

    // extend relative paths and match against full path
    if (normalize_binlog_name(full_fname, fname, is_relay_log))
    {
      error= LOG_INFO_EOF;
      break;
    }
    fname_len= strlen(full_fname);

    // if the log entry matches, null string matching anything
    if (!log_name ||
        (log_name_len == fname_len &&
         !memcmp(full_fname, full_log_name, log_name_len)))
    {
      DBUG_PRINT("info", ("Found log file entry"));
      linfo->index_file_start_offset= offset;
      linfo->index_file_offset = my_b_tell(&index_file);
      break;
    }
    linfo->entry_index++;
  }

end:
  if (need_lock_index)
    mysql_mutex_unlock(&LOCK_index);
  DBUG_RETURN(error);
}

/**
Find the next log file name in the log-index-file, continuing from the
  offset stored in linfo.

  @param[out] linfo       The filename will be stored here, along with the
                          byte offset of the next filename in the index file.
  @param need_lock_index  If true, LOCK_index will be acquired;
                          otherwise it should already be held by the caller.

  @note
    - Before calling this function, one has to call find_log_pos()
      to set up 'linfo'
    - Mutex needed because we need to make sure the file pointer does not move
      from under our feet

  @retval 0             ok
  @retval LOG_INFO_EOF  End of log-index-file found
  @retval LOG_INFO_IO   Got IO error while reading file
*/
int MYSQL_BIN_LOG::find_next_log(LOG_INFO* linfo, bool need_lock_index)
{
  int error= 0;
  size_t length;
  char fname[FN_REFLEN];
  char *full_fname= linfo->log_file_name;

  if (need_lock_index)
    mysql_mutex_lock(&LOCK_index);
  else
    mysql_mutex_assert_owner(&LOCK_index);

  /* As the file is flushed, we can't get an error here */
  my_b_seek(&index_file, linfo->index_file_offset);

  /* The entry about to be read starts where the previous one ended */
  linfo->index_file_start_offset= linfo->index_file_offset;
  /* 0 or 1 characters read means end of file (or an I/O error) */
  if ((length=my_b_gets(&index_file, fname, FN_REFLEN)) <= 1)
  {
    error = !index_file.error ? LOG_INFO_EOF : LOG_INFO_IO;
    goto err;
  }

  if (fname[0] != 0)
  {
    if(normalize_binlog_name(full_fname, fname, is_relay_log))
    {
      error= LOG_INFO_EOF;
      goto err;
    }
    length= strlen(full_fname);
  }

  linfo->index_file_offset= my_b_tell(&index_file);

err:
  if (need_lock_index)
    mysql_mutex_unlock(&LOCK_index);
  return error;
}
可以看到find_log_pos是通过binlog的文件名在index文件中查找其偏移量的,具体是将两个文件名转化为绝对路径来比较的。
而find_next_log通过读取下一行的文件名来定位下一个文件。这两个函数被用于重启时恢复数据库等。
最后通过raw_get_current_log可以找到当前binlog的定位,这个函数被用于show master status
/**
  Copy the current binary log file name and the current position in the
  log file into linfo.

  @param[out] linfo  Receives log_file_name and pos.

  @return always 0
*/
int MYSQL_BIN_LOG::raw_get_current_log(LOG_INFO* linfo)
{
  /* Leave one byte of the destination buffer for the terminating '\0' */
  strmake(linfo->log_file_name, log_file_name, sizeof(linfo->log_file_name)-1);
  linfo->pos = my_b_safe_tell(&log_file);
  return 0;
}
2.binlog的文件格式以及更新时机
binlog的文件格式是由binlog_event的格式写入的,可以用show binlog events in 'log file name'或者show binlog events等SQL命令查看。
写binlog文件首先写一个格式信息就是format_desc和previous_gtids两个binlog_event。
写binlog的时机(常见的)
(1)非事务性的语句,如一些ddl,仅仅记录sql语句。
1)关于数据库的
在sql\sql_db.cc中的
删除数据库、创建数据库、修改数据库名、升级数据库
2)视图
创建视图、修改视图、删除视图
3)表
创建表,丢弃或导入表,简单地重命名或开关索引,修改表。
4)其他
在sql_base.cc中close_temporary_tables 关闭临时表==>MYSQL_BIN_LOG::write_event
在sp_head.cc中==>MYSQL_BIN_LOG::write_event
5)灾难错误
MYSQL_BIN_LOG::write_incident
(2)事务性的语句,如一些dml,先让事务开始,有一个开始标识,再提交修改表的信息,然后记录修改前后的内容,最后提交事务。
即以这样的形式记录
1)insert, update, delete
2)删除表 回滚
还可以使用程序mysqlbinlog查看:
../bin/mysqlbinlog --base64-output=DECODE-ROWS -v mysqlbin-log.000001
3)在rpl_injector.h中 use_table(tablemap) write_row(insert) delete_row update_row commit rollback
4)在transaction.h中
/* Transaction-control entry points declared in transaction.h. */
bool trans_check_state(THD *thd);
void trans_reset_one_shot_chistics(THD *thd);
void trans_track_end_trx(THD *thd);

/* Whole-transaction begin / commit / rollback. flags defaults to 0. */
bool trans_begin(THD *thd, uint flags= 0);
bool trans_commit(THD *thd);
bool trans_commit_implicit(THD *thd);
bool trans_rollback(THD *thd);
bool trans_rollback_implicit(THD *thd);

/* Statement-level commit / rollback. */
bool trans_commit_stmt(THD *thd);
bool trans_rollback_stmt(THD *thd);
bool trans_commit_attachable(THD *thd);

/* Savepoint handling. */
bool trans_savepoint(THD *thd, LEX_STRING name);
bool trans_rollback_to_savepoint(THD *thd, LEX_STRING name);
bool trans_release_savepoint(THD *thd, LEX_STRING name);
(3)binlog异常恢复
这个是指mysql在重启时的异常恢复,代码入口在mysqld.cc的4137行 open函数,具体实现在binlog.h的683行
/* Forwards opt_name to open_binlog() and returns its result unchanged. */
int open(const char *opt_name) { return open_binlog(opt_name); }
open_binlog是通过找到binlog的最后一个binlog文件,具体请看代码
/*
  Walk the index file entry by entry. Each iteration copies the current
  entry's name into log_name, so when find_next_log() finally returns
  non-zero, log_name holds the last entry that was read.
*/
do
{
  /* Leave one byte of log_name for the terminating '\0' */
  strmake(log_name, log_info.log_file_name, sizeof(log_name)-1);
} while (!(error= find_next_log(&log_info, true/*need_lock_index=true*/)));
接着打开相关文件,开始读取。
/* A negative file handle means the binlog file could not be opened */
if ((file= open_binlog_file(&log, log_name, &errmsg)) < 0)
{
  sql_print_error("%s", errmsg);
  goto err;
}

my_stat(log_name, &s, MYF(0));
binlog_size= s.st_size;

/*
  If the binary log was not properly closed it means that the server
  may have crashed. In that case, we need to call MYSQL_BIN_LOG::recover
  to:

    a) collect logged XIDs;
    b) complete the 2PC of the pending XIDs;
    c) collect the last valid position.

  Therefore, we do need to iterate over the binary log, even if
  total_ha_2pc == 1, to find the last valid group of events written.
  Later we will take this value and truncate the log if need be.
*/
if ((ev= Log_event::read_log_event(&log, 0, &fdle,
                                   opt_master_verify_checksum)) &&
    ev->get_type_code() == binary_log::FORMAT_DESCRIPTION_EVENT &&
    (ev->common_header->flags & LOG_EVENT_BINLOG_IN_USE_F ||
     DBUG_EVALUATE_IF("eval_force_bin_log_recovery", true, false)))
{
  /*
    The first event is a format description event still flagged
    "in use": run recovery, starting from the position just after it.
  */
  sql_print_information("Recovering after a crash using %s", opt_name);
  valid_pos= my_b_tell(&log);
  error= recover(&log, (Format_description_log_event *)ev, &valid_pos);
}
else
  error=0;
在recover中实现了恢复,把其中的没有commit的命令执行一遍来实现的。
/**
  MYSQLD server recovers from last crashed binlog.

  Scans every event of the crashed binlog, collects the XID of each
  XID_EVENT into a hash, tracks the last position that ends a complete
  group of events, and finally hands the collected XIDs to ha_recover().

  @param log        IO_CACHE of the crashed binlog.
  @param fdle       Format_description_log_event of the crashed binlog.
  @param valid_pos  The position of the last valid transaction or
                    event(non-transaction) of the crashed binlog.

  @retval
    0                  ok
  @retval
    1                  error
*/
int MYSQL_BIN_LOG::recover(IO_CACHE *log, Format_description_log_event *fdle,
                            my_off_t *valid_pos)
{
  Log_event  *ev;
  HASH xids;
  MEM_ROOT mem_root;
  /*
    The flag is used for handling the case that a transaction
    is partially written to the binlog.
  */
  bool in_transaction= FALSE;

  if (! fdle->is_valid() ||
      my_hash_init(&xids, &my_charset_bin, TC_LOG_PAGE_SIZE/3, 0,
                   sizeof(my_xid), 0, 0, MYF(0),
                   key_memory_binlog_recover_exec))
    goto err1;

  init_alloc_root(key_memory_binlog_recover_exec,
                  &mem_root, TC_LOG_PAGE_SIZE, TC_LOG_PAGE_SIZE);

  while ((ev= Log_event::read_log_event(log, 0, fdle, TRUE))
         && ev->is_valid())
  {
    if (ev->get_type_code() == binary_log::QUERY_EVENT &&
        !strcmp(((Query_log_event*)ev)->query, "BEGIN"))
      in_transaction= TRUE;

    if (ev->get_type_code() == binary_log::QUERY_EVENT &&
        !strcmp(((Query_log_event*)ev)->query, "COMMIT"))
    {
      DBUG_ASSERT(in_transaction == TRUE);
      in_transaction= FALSE;
    }
    else if (ev->get_type_code() == binary_log::XID_EVENT)
    {
      DBUG_ASSERT(in_transaction == TRUE);
      in_transaction= FALSE;
      Xid_log_event *xev=(Xid_log_event *)ev;
      /* Copy the xid into mem_root so it outlives the event object */
      uchar *x= (uchar *) memdup_root(&mem_root, (uchar*) &xev->xid,
                                      sizeof(xev->xid));
      if (!x || my_hash_insert(&xids, x))
        goto err2;
    }

    /*
      Recorded valid position for the crashed binlog file
      which did not contain incorrect events. The following
      positions increase the variable valid_pos:

      1 -
        ...
        <---> HERE IS VALID <--->
        GTID
        BEGIN
        ...
        COMMIT
        ...

      2 -
        ...
        <---> HERE IS VALID <--->
        GTID
        DDL/UTILITY
        ...

      In other words, the following positions do not increase
      the variable valid_pos:

      1 -
        GTID
        <---> HERE IS VALID <--->
        ...

      2 -
        GTID
        BEGIN
        <---> HERE IS VALID <--->
        ...
    */
    if (!log->error && !in_transaction &&
        !is_gtid_event(ev))
      *valid_pos= my_b_tell(log);

    delete ev;
  }

  /*
    Call ha_recover if and only if there is a registered engine that
    does 2PC, otherwise in DBUG builds calling ha_recover directly
    will result in an assert. (Production builds would be safe since
    ha_recover returns right away if total_ha_2pc <= opt_log_bin.)
   */
  if (total_ha_2pc > 1 && ha_recover(&xids))
    goto err2;

  free_root(&mem_root, MYF(0));
  my_hash_free(&xids);
  return 0;

err2:
  free_root(&mem_root, MYF(0));
  my_hash_free(&xids);
err1:
  sql_print_error("Crash recovery failed. Either correct the problem "
                  "(if it's, for example, out of memory error) and restart, "
                  "or delete (or rename) binary log and start mysqld with "
                  "--tc-heuristic-recover={commit|rollback}");
  return 1;
}
再通过调用ha_recover对每一条记录进行执行的操作,ha_recover使用哈希表存储数据进行恢复操作的
/**
  Ask every storage engine plugin for its prepared transactions and
  commit or roll them back (see xarecover_handlerton()).

  @param commit_list  Hash of XIDs that should be committed, or NULL.
                      When it is NULL and tc_heuristic_recover is
                      TC_HEURISTIC_NOT_USED, the call starts out as a dry run.

  @retval 0  ok, or nothing to do
  @retval 1  error (unsafe rollback strategy, out of memory, or prepared
             transactions found during a dry run)
*/
int ha_recover(HASH *commit_list)
{
  struct xarecover_st info;
  DBUG_ENTER("ha_recover");
  info.found_foreign_xids= info.found_my_xids= 0;
  info.commit_list= commit_list;
  info.dry_run= (info.commit_list == 0 &&
                 tc_heuristic_recover == TC_HEURISTIC_NOT_USED);
  info.list= NULL;

  /* commit_list and tc_heuristic_recover cannot be set both */
  DBUG_ASSERT(info.commit_list == 0 ||
              tc_heuristic_recover == TC_HEURISTIC_NOT_USED);
  /* if either is set, total_ha_2pc must be set too */
  DBUG_ASSERT(info.dry_run || total_ha_2pc>(ulong)opt_bin_log);

  if (total_ha_2pc <= (ulong)opt_bin_log)
    DBUG_RETURN(0);

  if (info.commit_list)
    sql_print_information("Starting crash recovery...");

  if (total_ha_2pc > (ulong)opt_bin_log + 1)
  {
    if (tc_heuristic_recover == TC_HEURISTIC_RECOVER_ROLLBACK)
    {
      sql_print_error("--tc-heuristic-recover rollback strategy is not safe "
                      "on systems with more than one 2-phase-commit-capable "
                      "storage engine. Aborting crash recovery.");
      DBUG_RETURN(1);
    }
  }
  else
  {
    /*
      If there is only one 2pc capable storage engine it is always safe
      to rollback. This setting will be ignored if we are in automatic
      recovery mode.
    */
    tc_heuristic_recover= TC_HEURISTIC_RECOVER_ROLLBACK; // forcing ROLLBACK
    info.dry_run= false;
  }

  /* Try progressively smaller XID buffers until an allocation succeeds */
  for (info.len= MAX_XID_LIST_SIZE ;
       info.list == 0 && info.len > MIN_XID_LIST_SIZE; info.len/= 2)
  {
    info.list= (XID *)my_malloc(key_memory_XID,
                                info.len * sizeof(XID), MYF(0));
  }
  if (!info.list)
  {
    sql_print_error(ER(ER_OUTOFMEMORY),
                    static_cast<int>(info.len * sizeof(XID)));
    DBUG_RETURN(1);
  }

  plugin_foreach(NULL, xarecover_handlerton,
                 MYSQL_STORAGE_ENGINE_PLUGIN, &info);

  my_free(info.list);
  if (info.found_foreign_xids)
    sql_print_warning("Found %d prepared XA transactions",
                      info.found_foreign_xids);
  if (info.dry_run && info.found_my_xids)
  {
    sql_print_error("Found %d prepared transactions! It means that mysqld was "
                    "not shut down properly last time and critical recovery "
                    "information (last binlog or %s file) was manually deleted"
                    " after a crash. You have to start mysqld with "
                    "--tc-heuristic-recover switch to commit or rollback "
                    "pending transactions.",
                    info.found_my_xids, opt_tc_log_file);
    DBUG_RETURN(1);
  }
  if (info.commit_list)
    sql_print_information("Crash recovery finished.");
  DBUG_RETURN(0);
}
其中plugin_foreach(NULL, xarecover_handlerton,MYSQL_STORAGE_ENGINE_PLUGIN, &info);每一条进行如下xarecover_handlerton操作
/**
  plugin_foreach() callback used by ha_recover(): resolves the prepared
  transactions reported by one storage engine.

  For every XID the engine's recover() hook returns:
    - an XID with no my_xid (generated by an external TM) is inserted into
      the transaction cache and counted as foreign;
    - in dry-run mode the XID is only counted;
    - otherwise it is committed if it is found in info->commit_list (or,
      when there is no commit list, if tc_heuristic_recover is
      TC_HEURISTIC_RECOVER_COMMIT), and rolled back if not.

  @param unused  Not used.
  @param plugin  Storage engine plugin to process.
  @param arg     Pointer to the struct xarecover_st shared across plugins.

  @return always false
*/
static my_bool xarecover_handlerton(THD *unused, plugin_ref plugin,
                                    void *arg)
{
  handlerton *hton= plugin_data<handlerton*>(plugin);
  struct xarecover_st *info= (struct xarecover_st *) arg;
  int got;

  if (hton->state == SHOW_OPTION_YES && hton->recover)
  {
    /* Fetch prepared XIDs in batches of at most info->len */
    while ((got= hton->recover(hton, info->list, info->len)) > 0)
    {
      sql_print_information("Found %d prepared transaction(s) in %s",
                            got, ha_resolve_storage_engine_name(hton));
      for (int i= 0; i < got; i++)
      {
        my_xid x= info->list[i].get_my_xid();
        if (!x) // not "mine" - that is generated by external TM
        {
#ifndef DBUG_OFF
          char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str
          XID *xid= info->list + i;
          sql_print_information("ignore xid %s", xid->xid_to_str(buf));
#endif
          transaction_cache_insert_recovery(info->list + i);
          info->found_foreign_xids++;
          continue;
        }
        if (info->dry_run)
        {
          info->found_my_xids++;
          continue;
        }
        // recovery mode
        if (info->commit_list ?
            my_hash_search(info->commit_list, (uchar *)&x, sizeof(x)) != 0 :
            tc_heuristic_recover == TC_HEURISTIC_RECOVER_COMMIT)
        {
#ifndef DBUG_OFF
          char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str
          XID *xid= info->list + i;
          sql_print_information("commit xid %s", xid->xid_to_str(buf));
#endif
          hton->commit_by_xid(hton, info->list + i);
        }
        else
        {
#ifndef DBUG_OFF
          char buf[XIDDATASIZE * 4 + 6]; // see xid_to_str
          XID *xid= info->list + i;
          sql_print_information("rollback xid %s", xid->xid_to_str(buf));
#endif
          hton->rollback_by_xid(hton, info->list + i);
        }
      }
      /* A batch that did not fill the list was the last one */
      if (got < info->len)
        break;
    }
  }
  return false;
}
hton->recover(hton, info->list, info->len)是个函数指针
赋值如下:
innobase_hton->recover = innobase_xa_recover;
具体实现如下:
/** Handlerton recover hook for InnoDB: hands the request straight to
trx_recover_for_mysql() and returns whatever it returns.
@param[in]	hton		InnoDB handlerton (not used by this wrapper)
@param[in,out]	xid_list	prepared transactions
@param[in]	len		number of slots in xid_list
@return value returned by trx_recover_for_mysql() */
static
int
innobase_xa_recover(
	handlerton*	hton,
	XID*		xid_list,
	uint		len)
{
	const int	n_prepared = trx_recover_for_mysql(xid_list, len);

	return(n_prepared);
}

int
trx_recover_for_mysql(
/*==================*/
	XID*	xid_list,	/*!< in/out: prepared transactions */
	ulint	len)		/*!< in: number of slots in xid_list */
{
	/* Copies the XID of every transaction in trx_sys->rw_trx_list that
	is in the prepared state into xid_list, stopping once len slots
	have been filled. Returns the number of XIDs stored. */
	const trx_t*	trx;
	ulint		count = 0;

	ut_ad(xid_list);
	ut_ad(len);

	/* We should set those transactions which are in the prepared state
	to the xid_list */

	trx_sys_mutex_enter();

	for (trx = UT_LIST_GET_FIRST(trx_sys->rw_trx_list);
	     trx != NULL;
	     trx = UT_LIST_GET_NEXT(trx_list, trx)) {

		assert_trx_in_rw_list(trx);

		/* The state of a read-write transaction cannot change
		from or to NOT_STARTED while we are holding the
		trx_sys->mutex. It may change to PREPARED, but not if
		trx->is_recovered. It may also change to COMMITTED. */
		if (trx_state_eq(trx, TRX_STATE_PREPARED)) {
			xid_list[count] = *trx->xid;

			/* Announce recovery once, on the first hit */
			if (count == 0) {
				ib::info() << "Starting recovery for"
					" XA transactions...";
			}

			ib::info() << "Transaction "
				<< trx_get_id_for_print(trx)
				<< " in prepared state after recovery";

			ib::info() << "Transaction contains changes to "
				<< trx->undo_no << " rows";

			count++;

			/* xid_list is full */
			if (count == len) {
				break;
			}
		}
	}

	trx_sys_mutex_exit();

	if (count > 0){
		ib::info() << count << " transactions in prepared state"
			" after recovery";
	}

	return(int (count));
}