Purpose
fio supports many different I/O engines. To keep measurements accurate, fio's own threading overhead must stay small, which in turn requires a well-chosen threading model.
It is therefore worth understanding how fio submits and completes I/O requests internally.
The asynchronous submission flow
fio.c, main():

    fio_time_init();

    if (nr_clients) {
        set_genesis_time();

        if (fio_start_all_clients())
            goto done_key;
        ret = fio_handle_clients(&fio_client_ops);
    } else
        ret = fio_backend(NULL);

backend.c, thread_main():

    /*
     * Entry point for the thread based jobs. The process based jobs end up
     * here as well, after a little setup.
     */
    static void *thread_main(void *data)
    {
        struct fork_data *fd = data;
        unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
        struct thread_data *td = fd->td;
The key logic lives in fio_backend(), whose main content is as follows:
int fio_backend(struct sk_out *sk_out)
{
    struct thread_data *td;
    int i;

    if (exec_profile) {
        if (load_profile(exec_profile))
            return 1;
        free(exec_profile);
        exec_profile = NULL;
    }
    if (!thread_number)
        return 0;

    if (write_bw_log) {
        struct log_params p = {
            .log_type = IO_LOG_TYPE_BW,
        };

        setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
        setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
        setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
    }

    startup_sem = fio_sem_init(FIO_SEM_LOCKED);
    if (startup_sem == NULL)
        return 1;

    set_genesis_time();
    stat_init();
    helper_thread_create(startup_sem, sk_out);

    cgroup_list = smalloc(sizeof(*cgroup_list));
    if (cgroup_list)
        INIT_FLIST_HEAD(cgroup_list);

    run_threads(sk_out);

    helper_thread_exit();
    ......
The key call above is run_threads(), which creates the main I/O worker threads. The excerpt below shows it creating an I/O submission thread that runs thread_main (a stripped-down sketch of this pattern follows the excerpt):
    td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED);
    td->update_rusage = 0;

    /*
     * Set state to created. Thread will transition
     * to TD_INITIALIZED when it's done setting up.
     */
    td_set_runstate(td, TD_CREATED);
    map[this_jobs++] = td;
    nr_started++;

    fd = calloc(1, sizeof(*fd));
    fd->td = td;
    fd->sk_out = sk_out;

    if (td->o.use_thread) {
        int ret;

        dprint(FD_PROCESS, "will pthread_create\n");
        ret = pthread_create(&td->thread, NULL,
                     thread_main, fd);
        if (ret) {
            log_err("pthread_create: %s\n",
                    strerror(ret));
            free(fd);
            nr_started--;
            break;
        }
Inside thread_main, the core I/O-related function is do_io():
    while (keep_running(td)) {
        uint64_t verify_bytes;

        fio_gettime(&td->start, NULL);
        memcpy(&td->ts_cache, &td->start, sizeof(td->start));

        if (clear_state) {
            clear_io_state(td, 0);

            if (o->unlink_each_loop && unlink_all_files(td))
                break;
        }

        prune_io_piece_log(td);

        if (td->o.verify_only && td_write(td))
            verify_bytes = do_dry_run(td);
        else {
            do_io(td, bytes_done);

            if (!ddir_rw_sum(bytes_done)) {
                fio_mark_td_terminate(td);
                verify_bytes = 0;
            } else {
                verify_bytes = bytes_done[DDIR_WRITE] +
                        bytes_done[DDIR_TRIM];
            }
        }
Walking through do_io() shows how I/O requests are submitted and reaped:
/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * Returns number of bytes written and trimmed.
 */
static void do_io(struct thread_data *td, uint64_t *bytes_done)
{
    unsigned int i;
    int ret = 0;
    uint64_t total_bytes, bytes_issued = 0;

    for (i = 0; i < DDIR_RWDIR_CNT; i++)
        bytes_done[i] = td->bytes_done[i];

    if (in_ramp_time(td))
        td_set_runstate(td, TD_RAMP);
    else
        td_set_runstate(td, TD_RUNNING);

    lat_target_init(td);

    total_bytes = td->o.size;
    /*
     * Allow random overwrite workloads to write up to io_size
     * before starting verification phase as 'size' doesn't apply.
     */
    if (td_write(td) && td_random(td) && td->o.norandommap)
        total_bytes = max(total_bytes, (uint64_t) td->o.io_size);
    ......
The request-submission path below is the part to focus on:
    ret = io_u_submit(td, io_u);

    if (should_check_rate(td))
        td->rate_next_io_time[ddir] = usec_for_io(td, ddir);

    if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time))
        break;

    /*
     * See if we need to complete some commands. Note that
     * we can get BUSY even without IO queued, if the
     * system is resource starved.
     */
reap:
    full = queue_full(td) ||
        (ret == FIO_Q_BUSY && td->cur_depth);
    if (full || io_in_polling(td))
        ret = wait_for_completions(td, &comp_time);
In the code above, io_u_submit() ends up calling the submission hooks (.queue and, for asynchronous engines, .commit) registered by the actual storage engine, while wait_for_completions() ends up calling the getevents hook registered by that backend engine (call chain: wait_for_completions() <-- io_u_queued_complete() loop
<-- ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete_max, tvp);).
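To make the hook registration concrete, here is a minimal ioengine skeleton, loosely modeled on fio's engines/null.c. The signatures follow recent versions of struct ioengine_ops but can differ between fio releases, so treat this as an illustrative sketch built against fio's source tree, not a drop-in engine:

    #include "../fio.h"     /* struct thread_data, struct io_u, ioengine_ops */

    static enum fio_q_status sketch_queue(struct thread_data *td,
                                          struct io_u *io_u)
    {
        /*
         * td_io_queue() (reached via io_u_submit()) lands here. A real
         * asynchronous engine would stash the request and return
         * FIO_Q_QUEUED, completing it later via getevents/event.
         */
        return FIO_Q_COMPLETED;
    }

    static int sketch_getevents(struct thread_data *td, unsigned int min,
                                unsigned int max, const struct timespec *t)
    {
        /* td_io_getevents() (reached via wait_for_completions()) lands
         * here; return how many queued requests have completed. */
        return 0;
    }

    static struct io_u *sketch_event(struct thread_data *td, int event)
    {
        /* Hand back the io_u for completed event number 'event'. */
        return NULL;
    }

    static struct ioengine_ops sketch_ioengine = {
        .name       = "sketch",
        .version    = FIO_IOOPS_VERSION,
        .queue      = sketch_queue,
        .getevents  = sketch_getevents,
        .event      = sketch_event,
    };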
io_queue_event() processes the queueing result and tracks whether I/O events are still outstanding.
The logic that decides when to reap events deserves special attention:
only when all the free slots available for submitting I/O requests are occupied (the queue is full), or when the job is in a polling window, does do_io() call the storage engine's registered getevents hook to harvest completions.
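This submit-until-full / reap pattern can be reproduced outside fio with plain Linux libaio, which is what fio's libaio engine wraps. The standalone sketch below mirrors the control flow of do_io(); the file path, queue depth, and block size are arbitrary choices, and error handling plus buffer cleanup are trimmed for brevity (link with -laio):

    #define _GNU_SOURCE     /* for O_DIRECT */
    #include <fcntl.h>
    #include <libaio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>

    #define QD  32          /* queue depth: the "slots" do_io() keeps filled */
    #define BS  4096        /* block size */

    int main(void)
    {
        io_context_t ctx = 0;
        struct io_event events[QD];
        int fd = open("/tmp/testfile", O_RDWR | O_CREAT | O_DIRECT, 0644);
        int inflight = 0;
        long long off = 0;

        io_setup(QD, &ctx);

        for (int i = 0; i < 1024; i++) {
            struct iocb *cb = malloc(sizeof(*cb));
            void *buf;

            posix_memalign(&buf, 4096, BS);
            memset(buf, 0, BS);
            io_prep_pwrite(cb, fd, buf, BS, off);
            off += BS;

            io_submit(ctx, 1, &cb);     /* ~ engine .queue/.commit */
            inflight++;

            /*
             * Same condition as do_io(): only reap once the queue is
             * full, i.e. all QD slots are in flight.
             */
            if (inflight == QD)
                inflight -= io_getevents(ctx, 1, QD, events, NULL); /* ~ .getevents */
        }

        /* Drain whatever is still in flight. */
        while (inflight > 0)
            inflight -= io_getevents(ctx, 1, QD, events, NULL);

        io_destroy(ctx);
        close(fd);
        return 0;
    }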