第5章 输入输出(下)

 

5.5 Redis 对 epoll 的封装

Redis 的作者和 Nginx 的作者一样,不喜欢引入第三方的库,比如 libevent、libev 来做事件处理,而是自己封装了 epoll,不像 Memcachd 的 I/O 模型还得依赖 libevent。Redis 的 I/O 模型针对不同系统做了不同的实现,比如 Linux 中的实现是对 epoll 的封装,BSD 中的实现是对 kqueue 的封装。针对 Linux 的实现,我们来看其核心的 ae_epoll.c:

aeApiState 封装了 epoll_event:

typedef struct aeApiState {
    int epfd;
    struct epoll_event *events;
} aeApiState;

aeApiCreate 用于调用 epoll_create 创建 epoll:

static int aeApiCreate(aeEventLoop *eventLoop) {
    aeApiState *state = zmalloc(sizeof(aeApiState));
    …
    state->epfd = epoll_create(1024); // 1024是内核设置的默认值
 …
}

aeApiAddEvent 用于向 epoll 中注册事件:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    …
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

aeApiDelEvent 用于从 epoll 中删除事件:

static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee;
    int mask = eventLoop->events[fd].mask & (~delmask);
    ee.events = 0;
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.u64 = 0; /* avoid valgrind warning */
    ee.data.fd = fd;
    if (mask != AE_NONE) {
        epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
    } else {
        // 注意, Kernel < 2.6.9 EPOLL_CTL_DEL 需要一个非空的事件指针
        epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
    }
}

aeApiPoll 通过调用 epoll_wait 等待 epoll 事件就绪:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;
        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

其中:

  • aeApiCreate:调用 epoll_create 创建了一个 epoll 池子。

  • aeApiAddEvent:调用 epoll_ctl 向 epoll 中注册事件。

  • aeApiPoll:通过调用 epoll_wait 来获取已经响应的事件。

那么这个过程是如何呢?我们来一步一步看 server epoll 初始化过程:

首先在 initServer 函数执行的时候初始化了 epoll:

server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);

接着设置回调函数:

aeSetBeforeSleepProc(server.el,beforeSleep);

再来看主循环中的 aeMain 函数:

void aeMain(aeEventLoop *eventLoop) {

  eventLoop->stop = 0;
  while (!eventLoop->stop) {
      if (eventLoop->beforesleep != NULL)
          eventLoop->beforesleep(eventLoop);
      aeProcessEvents(eventLoop, AE_ALL_EVENTS);
  }
}

最后循环调用 aeProcessEvents 来进行事件处理(见图5-10)。

第5章 输入输出(下)

图5-10 Redis epoll 主循和事件的关系

可以看到 eventLoop 会对两类事件进行处理,定时器事件和 file 事件。

最后我们来看 aeProcessEvents 函数:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    …
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            tvp->tv_sec = shortest->when_sec - now_sec;
            if (shortest->when_ms < now_ms) {
                tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
                tvp->tv_sec --;
            } else {
                tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
            }
            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
        } else {
            // AE_DONT_WAIT 标志置位,则设置超时时间为0
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 否则会发生阻塞
                tvp = NULL;         // 一直等待
            }
        }

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            if (fe->mask & mask & AE_READABLE) {
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            if (fe->mask & mask & AE_WRITABLE) {
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;
        }
    }
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed;                 // 返回需要处理的 file/time 事件数量
}

这个函数大致上分为以下几个步骤:

1)aeSearchNearestTimer 查找是否有要优先处理的定时器任务,如果有就先处理。

2)假如没有,则执行 aeApiPoll 来处理 epoll 中的就绪事件,而且是无限等待的哦:

    if (flags & AE_DONT_WAIT) {
        tv.tv_sec = tv.tv_usec = 0;
        tvp = &tv;
    } else {
        tvp = NULL;
    }

3)处理定时器任务。

最后我们来看一下 Redis 的整体事件处理流程(见图5-11),由于 Redis 本身是单线程的,没有锁的竞争,为了提高处理的吞吐量,Redis 把工作的流程拆成了很多步,每步都是通过 epoll 的机制来回调,这样尽量不让一个请求 hold 住主线程,让系统的吞吐量得到有效的提升。

第5章 输入输出(下)

图5-11 Redis 的整体事件处理流程图

5.6 Nginx 文件异步 I/O

为了提升对 I/O 事件的及时响应速度,Linux 提供了 aio 机制,该机制实现了真正的异步 I/O 响应处理,不像 libc 的 aio 是异步线程伪装的。

因为 Linux 的 aio 对缓存不支持,所以在 Nginx 中,仅仅对读文件做了 aio 的支持。

aio 的使用可以分为以下几个步骤:

1)io_setup:初始化异步 I/O 上下文,类似于 epoll_create。

2)io_submit:注册异步事件和回调 handler。

ngx_epoll_module 在初始化的时候,会先进行 aio 的初始化:

ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{
    int                    n;
    struct epoll_event  ee;

#if (NGX_HAVE_SYS_EVENTFD_H)
    ngx_eventfd = eventfd(0, 0);
#else
    ngx_eventfd = syscall(SYS_eventfd, 0);
#endif
…
    n = 1;

    if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                      "ioctl(eventfd, FIONBIO) failed");
        goto failed;
    }

    if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {
        ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
                      "io_setup() failed");
        goto failed;
    }

    ngx_eventfd_event.data = &ngx_eventfd_conn;
    ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;
    ngx_eventfd_event.log = cycle->log;
    ngx_eventfd_event.active = 1;
    ngx_eventfd_conn.fd = ngx_eventfd;
    ngx_eventfd_conn.read = &ngx_eventfd_event;
    ngx_eventfd_conn.log = cycle->log;
    ee.events = EPOLLIN|EPOLLET;
    ee.data.ptr = &ngx_eventfd_conn;

    if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {
        return;
    }
…
}

然后 Nginx 会在读取文件的时候调用 ngx_file_aio_read 函数进行异步读取:

ssize_t
ngx_file_aio_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
    ngx_pool_t *pool)
{
    ngx_err_t         err;
    struct iocb      *piocb[1];
    ngx_event_t      *ev;
    ngx_event_aio_t  *aio;
    ...
    aio = file->aio;
    ev = &aio->event;
    ...
    if (ev->complete) {
        ev->active = 0;
        ev->complete = 0;

        if (aio->res >= 0) {
            ngx_set_errno(0);
            return aio->res;
        }

       ...
    }

    ngx_memzero(&aio->aiocb, sizeof(struct iocb));

    aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;
    aio->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
    aio->aiocb.aio_fildes = file->fd;
    aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;
    aio->aiocb.aio_nbytes = size;
    aio->aiocb.aio_offset = offset;
    aio->aiocb.aio_flags = IOCB_FLAG_RESFD;
    aio->aiocb.aio_resfd = ngx_eventfd;

    ev->handler = ngx_file_aio_event_handler;

    piocb[0] = &aio->aiocb;

    if (io_submit(ngx_aio_ctx, 1, piocb) == 1) {
        ev->active = 1;
        ev->ready = 0;
        ev->complete = 0;

        return NGX_AGAIN;
    }
...
}

5.7 tail 指令为何牛

因为需要采集线上环境的数据,我开发了一个 Java agent 程序来采集相关信息。跑了一段时间后发现一个问题,假如采集数据量压力过大的话,会产生该进程占用 CPU 过高,例如100%以上的情况。

首先来看代理程序的伪代码:

public void run() {
    try {
    accesslog.seek(accesslog.length());
        int i = 0;
        while (!Thread.currentThread().isInterrupted()) {
            String line = accesslog.readLine();
            if (line != null) {
                try {
                    parseLineAndLog(line);
                } catch (Exception ex) {

                    LOGGER.error("parseLineAndLog log error:", ex);
                }
                try {
                    if (i++ % 100 == 0) {
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    } catch (IOException e) {
        LOGGER.error("read ngx access log error:", e);
    }
}

我们首先通过 top-H-p${pid}观察该进程中的具体哪个线程占用 CPU 比较高。找到后,通过以下指令把 pid 转换成16进制的数据:

awk '{printf("%x",1234)}'

然后我们通过如下命令产生线程堆栈信息:

jstack ${pid} > stack.log

再通过刚才 awk 的16进制进程号查看,发现 jstack 中该进程一直在做如下操作:

String line = accesslog.readLine();

因为上面代码采用的 accesslog 其实是:

this.accesslog = new RandomAccessFile(path, "r");

而它的 readLine()方法是不阻塞的,轮询必然导致 CPU 占用率的提升。

那如何解决呢?我使用了 tail 指令:

 Process p = Runtime.getRuntime().exec("tail -n 1 -F " + path);
 br = new BufferedReader(new InputStreamReader(p.getInputStream()));

然后在 while 循环中用 br.readline 来解决问题。

上线后再观察,神奇的事情发生了,CPU 占用率始终控制在5%以下。

那么一条 tail 指令为什么能那么神奇呢?我们根据 Linus 大神的指示:从代码中寻找答案。

首先我们找到 tail 的源码:

http:// git.savannah.gnu.org/cgit/coreutils.git/tree/src/tail.c

经过一步一步分析后,我们发现,最终会调用 tail_forever_inotify 函数。

在2.6内核之后,Linux 提供了 inotify 功能,内核通过监控文件系统的变更来反向通知用户,这样减少了轮询的开销。我们来看其实现:

tail_forever_inotify (int wd, struct File_spec *f, size_t n_files,
                        double sleep_interval)
{
...
f[i].wd = inotify_add_watch (wd, f[i].name, inotify_wd_mask);
...
    if (pid)
        {
            if (writer_is_dead)
                exit (EXIT_SUCCESS);

            writer_is_dead = (kill (pid, 0) != 0 && errno != EPERM);

            struct timeval delay; // 等待文件变化的时间
            if (writer_is_dead)
                delay.tv_sec = delay.tv_usec = 0;
            else
                {
                    delay.tv_sec = (time_t) sleep_interval;
                    delay.tv_usec = 1000000 * (sleep_interval - delay.tv_sec);
                }

                fd_set rfd;
                FD_ZERO (&rfd);
                FD_SET (wd, &rfd);

                int file_change = select (wd + 1, &rfd, NULL, NULL, &delay);

                if (file_change == 0)
                    continue;
                else if (file_change == -1)
                    die (EXIT_FAILURE, errno, _("error monitoring inotify event"));
            }
...
 len = safe_read (wd, evbuf, evlen);
...

所以以上步骤主要分为三步:

1)注册 inotify 的 watch。

2)用 select 等待 watch 事件发生。

3)用 safe_read 读取准备好的数据。

5.8 零拷贝技术应用分析

在常见的 I/O 场景中,都是先通过 read+write(或 send)的方式来完成的,如图5-12所示,read 调用先从用户态切换到内核态,然后从文件中读取了数据,存储到内核的缓冲区中,然后再把数据从内核态缓冲区拷贝到用户态,同时从内核态切换到用户态。

接着用 send 写入到指定文件也是类似的过程,这里存在4次上下文切换和4次缓冲区的拷贝。

第5章 输入输出(下)

图5-12 一次 read/send 的过程

为了优化这个缓冲区拷贝和上下文切换的次数,Linux 提供了几种方案,下面分别介绍。

5.8.1 mmap

假如仅仅是把数据写入到文件,Linux 提供了 mmap 的方式来共享内存虚拟地址空间,这样只要写共享内存就是写文件,读共享内存就是读文件,减少了缓冲区拷贝的次数。

mmap 的实现最终通过 do_mmap 函数来实现:

unsigned long do_mmap(struct file *file, unsigned long addr,
            unsigned long len, unsigned long prot,
            unsigned long flags, vm_flags_t vm_flags,
            unsigned long pgoff, unsigned long *populate)
{
    struct mm_struct *mm = current->mm;                        // 当前进程的 mm
    …
    if ((prot & PROT_READ) && (current->personality & READ_IMPLIES_EXEC))
                                                                // 是否隐藏了可执行属性
        if (!(file && path_noexec(&file->f_path)))
            prot |= PROT_EXEC;
    if (!(flags & MAP_FIXED))        // MAP_FIXED没有设置
        addr = round_hint_to_min(addr);        // 判断输入的欲映射的起始地址是否小于最小映射地址,如果小于,将 addr 修改为最小地址
    len = PAGE_ALIGN(len);        // 检测 len 是否越界
    …
    if ((pgoff + (len >> PAGE_SHIFT)) < pgoff)                // 再次检测是否越界
        return -EOVERFLOW;
    if (mm->map_count > sysctl_max_map_count)        // 超过一个进程中对于 mmap 的最大个数限制
        return -ENOMEM;
    addr = get_unmapped_area(file, addr, len, pgoff, flags);        // 获取没有映射的地址(查询 mm 中空闲的内存地址)
    …
    // 设置 vm_flags,根据传入的 port 和 flags 以及 mm 自己的 flag 来设置
    vm_flags |= calc_vm_prot_bits(prot) | calc_vm_flag_bits(flags) |
            mm->def_flags | VM_MAYREAD | VM_MAYWRITE | VM_MAYEXEC;
    …
    if (file) {
        struct inode *inode = file_inode(file);
        switch (flags & MAP_TYPE) {
        case MAP_SHARED:
            if ((prot&PROT_WRITE) && !(file->f_mode&FMODE_WRITE))
                                                                // file 应该被打开并允许写入
                return -EACCES;
            if (IS_APPEND(inode) && (file->f_mode & FMODE_WRITE))
                                                                // 不能写入一个只允许写追加的文件
                return -EACCES;
            if (locks_verify_locked(file))                // 文件被强制锁定
                return -EAGAIN;
            vm_flags |= VM_SHARED | VM_MAYSHARE        // 尝试允许其他进程共享
            if (!(file->f_mode & FMODE_WRITE))                // 如果 file 不允许写,取消共享
                vm_flags &= ~(VM_MAYWRITE | VM_SHARED);
        …
        }
    }

    …
    addr = mmap_region(file, addr, len, vm_flags, pgoff); // 建立从文件到虚存区间的映射
    …
    return addr;
}

以上过程最重要的两步是:

1)get_unmapped_area 查询并获取当前进程虚拟地址空间中空闲的没有映射的地址。

2)mmap_region 建立从文件到虚存区间的映射。

最终映射后的关系如图5-13所示。

第5章 输入输出(下)

图5-13 文件和虚拟地址空间映射后的关系

5.8.2 sendfile

假如需要从一个文件读数据,并且写入到另一个文件,mmap 的方式还是会存在2次系统调用4次上下文切换,所以 Linux 又提供了 sendfile 的调用,1次系统调用搞定(见图5-14)。

第5章 输入输出(下)

图5-14 sendfile 调用过程

下面我们来分析一下 sendfile 的实现,sendfile 调用最终会调用 do_sendfile 函数:

static ssize_t do_sendfile(int out_fd, int in_fd, loff_t *ppos,
                    size_t count, loff_t max)
{
    file_start_write(out.file);
    retval = do_splice_direct(in.file, &pos, out.file, &out_pos, count, fl);
    file_end_write(out.file);
…
    return retval;
}

其中最关键的一行是 do_splice_direct:

long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
                loff_t *opos, size_t len, unsigned int flags)
{
    struct splice_desc sd = {
            .len        = len,
        .total_len      = len,
        .flags          = flags,
        .pos            = *ppos,
        .u.file         = out,
        .opos           = opos,
    };
    long ret;

    …
    ret = splice_direct_to_actor(in, &sd, direct_splice_actor);
    if (ret > 0)
        *ppos = sd.pos;
    return ret;
}

ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
                 splice_direct_actor *actor)
{
    struct pipe_inode_info *pipe;
    long ret, bytes;
    umode_t i_mode;
    size_t len;
    int i, flags, more;
    …
    pipe = current->splice_pipe;
    if (unlikely(!pipe)) {
        pipe = alloc_pipe_info();
        …
        pipe->readers = 1;
        current->splice_pipe = pipe;
    }
    // 进行拼接
    ret = 0;
    bytes = 0;
    len = sd->total_len;
    flags = sd->flags;
     // 不要在输出的时候阻塞,我们需要清空 direct pipe
    sd->flags &= ~SPLICE_F_NONBLOCK;
    more = sd->flags & SPLICE_F_MORE;

    while (len) {
        size_t read_len;
        loff_t pos = sd->pos, prev_pos = pos;

        ret = do_splice_to(in, &pos, pipe, len, flags);
        …
        ret = actor(pipe, sd);
        if (unlikely(ret <= 0)) {
            sd->pos = prev_pos;
            goto out_release;
        }

        bytes += ret;
        len -= ret;
        sd->pos = pos;

        if (ret < read_len) {
            sd->pos = prev_pos + ret;
            goto out_release;
        }
    }

done:
    pipe->nrbufs = pipe->curbuf = 0;
    file_accessed(in);
    return bytes;
…
}

在上述代码中,总结起来就三个步骤:

1)alloc_pipe_info 分配 pipe 对象,pipe 其实就是个缓冲区。

2)do_splice_to 把 in 文件的数据读入到缓冲区。

3)actor 把缓冲区的数据读到 out 文件中。

5.8.3 mmap 和 sendfile 在开源软件中的使用

在 MongoDB 中,使用了操作系统底层提供的内存映射机制,即 mmap,数据文件使用 mmap 映射到内存空间进行管理,内存的管理(哪些数据何时换入/换出)完全交给 OS 管理。

MongoDB 对不同操作系统的 MemoryMappedFile 有不同的实现,我们这里针对 Linux 操作系统的实现来分析 MongoDB 中把文件数据映射到进程地址空间的操作:

void* MemoryMappedFile::map(const char *filename, unsigned long long &length, int options) {
    setFilename(filename);
    FileAllocator::get()->allocateAsap( filename, length );
    len = length;
    …
    unsigned long long filelen = lseek(fd, 0, SEEK_END);
    …
    lseek( fd, 0, SEEK_SET );
    void * view = mmap(NULL, length, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
    …
    views.push_back( view );
    return view;
}

MongoDB 通知操作系统去映射所有数据文件到内存,操作系统使用 mmap()系统调用来完成。从这一点看,数据文件,包括所有的 docments、collections 及其索引,都会被操作系统通过页(page)的方式交换到内存。如果有足够的内存,所有数据文件最终都会加载到内存中。

当内存发生了改变,比如一个写操作,产生的变化将会异步刷新到磁盘,但写操作仍是很快的,直接操作内存。数据量可以适应内存大小,从而达到一个理想状况——对磁盘的操作达到最小量。但是如果数据量超出内存,页面访问错误(page faults)将会悄悄上来,那么系统就会频繁访问内存,读写操作要慢很多。最糟糕的状况是数据量远大于内存,读写不稳定,性能急剧下降。

Kafka 是 Apache 社区下的消息中间件,在 Kafka 上,有两个原因可能导致低效:1)太多的网络请求;2)过多的字节拷贝。为了提高效率,Kafka 把 message 分成一组一组的,每次请求会把一组 message 发给相应的 consumer。此外,为了减少字节拷贝,采用了 sendfile 系统调用。

Kafka 设计了一种“标准字节消息”,Producer、Broker、Consumer 共享这一种消息格式。Kakfa 的消息日志在 broker 端就是一些目录文件,这些日志文件都是 MessageSet 按照这种“标准字节消息”格式写入磁盘的。

维持这种通用的格式对这些操作的优化尤为重要:持久化 log 块的网络传输。流行的 Unix 操作系统提供了一种非常高效的途径来实现页面缓存和 socket 之间的数据传递。在 Linux 操作系统中,这种方式称作:sendfile 系统调用(Java 提供了访问这个系统调用的方法:FileChannel.transferTo api)。

下面我们来分析在 Kafka 中的零拷贝流程。

首先我们来看 kafka 的服务端 socketServer 逻辑:

override def run() {
    startupComplete()
    while(isRunning) {
        try {
            // 配置任意一个新的可以用来排队的连接
            configureNewConnections()
            // 注册一个新的请求用来写
            processNewResponses()

            try {
              selector.poll(300)
            } catch {
              case...
            }

SocketServer 会 poll 队列,一旦对应的 KafkaChannel 写操作准备好了,就会调用 KafkaChannel 的 write 方法:

// KafkaChannel.scala
public Send write() throws IOException {
    if (send != null && send(send))
}
// KafkaChannel.scala
private boolean send(Send send) throws IOException {
    send.writeTo(transportLayer);
    if (send.completed())
        transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
    return send.completed();
}

其中 write 会调用 send 方法,对应的 Send 对象其实就是上面我们注册的 FetchRes-ponseSend 对象。

这段代码里实际发送数据的代码是 send.writeTo(transportLayer),对应的 writeTo 方法为:

private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.
    dataGroupedByTopic.toList.map {
    case(topic, data) => new TopicDataSend(dest, TopicData(topic,data.map{case
        (topicAndPartition, message) => (topicAndPartition.partition, message)}))
    }))
override def writeTo(channel: GatheringByteChannel): Long = {
    …
        written += sends.writeTo(channel)
    …
}

这里最后调用了 sends 的 writeTo 方法,而 sends 其实是个 MultiSend。MultiSend 里有两个东西:

  • topicAndPartition.partition:分区。

  • message:FetchResponsePartitionData。还记得这个 FetchResponsePartitionData 吗?我们的 MessageSet 就放在了这个对象里。

TopicDataSend 也包含了 sends,该 sends 包含了 PartitionDataSend,而 PartitionDataSend 则包含了 FetchResponsePartitionData。

最后进行 writeTo 的时候,其实是调用了:

// partitionData 就是 FetchResponsePartitionData,messages 就是 FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)

FileMessageSet 也有个 writeTo 方法,就是我们之前已经提到过的那段代码:

def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
    ...
    val bytesTransferred = (destChannel match {
        case tl: TransportLayer => tl.transferFrom(channel, position, count)
        case dc => channel.transferTo(position, count, dc)
    }).toInt
    bytesTransferred
}

最后通过 tl.transferFrom(channel,position,count)来完成最后的数据发送的。trans-ferFrom 其实是 Kafka 自己封装的一个方法,最终里面调用的也是 transerTo:

public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

5.9 本章小结

我们编写的应用程序或多或少都会涉及 I/O,比如读写数据库、读写网络等,总会遇到很多问题,例如在高并发场景下,如何编写高性能的服务端和客户端程序。对 I/O 的理解是否深入,关系到写出来的应用对性能的影响程度。

从狭义的角度来讲,I/O 就是 in 和 out 两条输入输出的汇编指令。但是从广义角度讲,I/O 可以涉及操作系统整个 I/O 模型的构建,系统从分层的角度,将数据从写文件开始最终转换成数据块并落入磁盘。从更深入的视角看,会涉及 epoll 这样的 I/O 多路复用模型。

因此,只有从操作系统的角度来理解 I/O,才能真正编写出高性能的应用程序。

上一篇:JavaNIO,AIO


下一篇:数组 – io_submit等待所有oracle dbwriter I / O.