FIO线程模型

目的

fio 支持多种 I/O 引擎。为了保证测试结果的准确性,fio 自身的线程开销必须足够小,线程模型也必须设计得当。
为此,有必要了解 fio 内部 I/O 请求的提交和完成流程。

异步提交过程

fio.c main()

 51     fio_time_init();
 52
 53     if (nr_clients) {
 54         set_genesis_time();
 55
 56         if (fio_start_all_clients())
 57             goto done_key;
 58         ret = fio_handle_clients(&fio_client_ops);
 59     } else
 60         ret = fio_backend(NULL);

(原文并排显示的另一段代码:thread_main() 的开头)

1508 /*
1509  * Entry point for the thread based jobs. The process based jobs end up
1510  * here as well, after a little setup.
1511  */
1512 static void *thread_main(void *data)
1513 {
1514     struct fork_data *fd = data;
1515     unsigned long long elapsed_us[DDIR_RWDIR_CNT] = { 0, };
1516     struct thread_data *td = fd->td;

重点在fio_backend()函数里面,主要内容如下:

int fio_backend(struct sk_out *sk_out)
{
    struct thread_data *td;
    int i;

    if (exec_profile) {
        if (load_profile(exec_profile))
            return 1;
        free(exec_profile);
        exec_profile = NULL;
    }
    if (!thread_number)
        return 0;

    if (write_bw_log) {
        struct log_params p = {
            .log_type = IO_LOG_TYPE_BW,
        };

        setup_log(&agg_io_log[DDIR_READ], &p, "agg-read_bw.log");
        setup_log(&agg_io_log[DDIR_WRITE], &p, "agg-write_bw.log");
        setup_log(&agg_io_log[DDIR_TRIM], &p, "agg-trim_bw.log");
    }

    startup_sem = fio_sem_init(FIO_SEM_LOCKED);
    if (startup_sem == NULL)
        return 1;

    set_genesis_time();
    stat_init();
    helper_thread_create(startup_sem, sk_out);

    cgroup_list = smalloc(sizeof(*cgroup_list));
    if (cgroup_list)
        INIT_FLIST_HEAD(cgroup_list);

    run_threads(sk_out);

    helper_thread_exit();
......

上面主要的函数是 run_threads(),它会建立主要的IO线程:

建立IO 提交的线程 thread_main

td->rusage_sem = fio_sem_init(FIO_SEM_LOCKED);
            td->update_rusage = 0;

            /*
             * Set state to created. Thread will transition
             * to TD_INITIALIZED when it's done setting up.
             */
            td_set_runstate(td, TD_CREATED);
            map[this_jobs++] = td;
            nr_started++;

            fd = calloc(1, sizeof(*fd));
            fd->td = td;
            fd->sk_out = sk_out;

            if (td->o.use_thread) {
                int ret;

                dprint(FD_PROCESS, "will pthread_create\n");
                ret = pthread_create(&td->thread, NULL,
                            thread_main, fd);
                if (ret) {
                    log_err("pthread_create: %s\n",
                            strerror(ret));
                    free(fd);
                    nr_started--;
                    break;
                }

thread_main里和IO相关的核心函数是:do_io

while (keep_running(td)) {
        uint64_t verify_bytes;

        fio_gettime(&td->start, NULL);
        memcpy(&td->ts_cache, &td->start, sizeof(td->start));

        if (clear_state) {
            clear_io_state(td, 0);

            if (o->unlink_each_loop && unlink_all_files(td))
                break;
        }

        prune_io_piece_log(td);

        if (td->o.verify_only && td_write(td))
            verify_bytes = do_dry_run(td);
        else {
            do_io(td, bytes_done);

            if (!ddir_rw_sum(bytes_done)) {
                fio_mark_td_terminate(td);
                verify_bytes = 0;
            } else {
                verify_bytes = bytes_done[DDIR_WRITE] +
                        bytes_done[DDIR_TRIM];
            }
        }

分析do_io函数可以看到IO 提交和收割的过程:


/*
 * Main IO worker function. It retrieves io_u's to process and queues
 * and reaps them, checking for rate and errors along the way.
 *
 * Returns number of bytes written and trimmed.
 */
static void do_io(struct thread_data *td, uint64_t *bytes_done)
{
    unsigned int i;
    int ret = 0;
    uint64_t total_bytes, bytes_issued = 0;

    for (i = 0; i < DDIR_RWDIR_CNT; i++)
        bytes_done[i] = td->bytes_done[i];

    if (in_ramp_time(td))
        td_set_runstate(td, TD_RAMP);
    else
        td_set_runstate(td, TD_RUNNING);

    lat_target_init(td);

    total_bytes = td->o.size;
    /*
    * Allow random overwrite workloads to write up to io_size
    * before starting verification phase as 'size' doesn't apply.
    */
    if (td_write(td) && td_random(td) && td->o.norandommap)
        total_bytes = max(total_bytes, (uint64_t) td->o.io_size);
    /*

重点关注下面的提交请求的函数:


           ret = io_u_submit(td, io_u);

            if (should_check_rate(td))
                td->rate_next_io_time[ddir] = usec_for_io(td, ddir);

            if (io_queue_event(td, io_u, &ret, ddir, &bytes_issued, 0, &comp_time))
                break;

            /*
             * See if we need to complete some commands. Note that
             * we can get BUSY even without IO queued, if the
             * system is resource starved.
             */
reap:
            full = queue_full(td) ||
                (ret == FIO_Q_BUSY && td->cur_depth);
            if (full || io_in_polling(td))
                ret = wait_for_completions(td, &comp_time);

其中,上面的 io_u_submit() 最终会调用具体存储引擎注册的提交函数(即 ioengine_ops 中的 queue 回调;例如 libaio 引擎最终通过 io_submit() 提交请求);

wait_for_completions() 会调用后端存储引擎注册的 getevents 函数,调用链为:
wait_for_completions() --> 循环调用 io_u_queued_complete() --> ret = td_io_getevents(td, min_evts, td->o.iodepth_batch_complete_max, tvp);
io_queue_event() 判断当前是否还有尚未处理完的 I/O 事件。

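特别要关注的是何时收割 event 的逻辑:
当可用来提交 I/O 请求的空闲槽位都占满了(queue_full() 为真),或者提交返回 FIO_Q_BUSY 且仍有已提交未完成的请求(td->cur_depth 非零),或者当前正在进行 polling 操作(io_in_polling() 为真)时,就调用 wait_for_completions(),进而调用存储引擎注册的 getevents 函数。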

上一篇:linux使用FIO测试磁盘的iops


下一篇:IVS_技术