5.5 Redis 对 epoll 的封装
Redis 的作者和 Nginx 的作者一样,不喜欢引入第三方的库,比如 libevent、libev 来做事件处理,而是自己封装了 epoll,不像 Memcachd 的 I/O 模型还得依赖 libevent。Redis 的 I/O 模型针对不同系统做了不同的实现,比如 Linux 中的实现是对 epoll 的封装,BSD 中的实现是对 kqueue 的封装。针对 Linux 的实现,我们来看其核心的 ae_epoll.c:
aeApiState 封装了 epoll_event:
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;
aeApiCreate 用于调用 epoll_create 创建 epoll:
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
…
state->epfd = epoll_create(1024); // 1024是内核设置的默认值
…
}
aeApiAddEvent 用于向 epoll 中注册事件:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
…
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
aeApiDelEvent 用于从 epoll 中删除事件:
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee;
int mask = eventLoop->events[fd].mask & (~delmask);
ee.events = 0;
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.u64 = 0; /* avoid valgrind warning */
ee.data.fd = fd;
if (mask != AE_NONE) {
epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
} else {
// 注意, Kernel < 2.6.9 EPOLL_CTL_DEL 需要一个非空的事件指针
epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
}
}
aeApiPoll 通过调用 epoll_wait 等待 epoll 事件就绪:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
其中:
aeApiCreate:调用 epoll_create 创建了一个 epoll 池子。
aeApiAddEvent:调用 epoll_ctl 向 epoll 中注册事件。
aeApiPoll:通过调用 epoll_wait 来获取已经响应的事件。
那么这个过程是如何呢?我们来一步一步看 server epoll 初始化过程:
首先在 initServer 函数执行的时候初始化了 epoll:
server.el = aeCreateEventLoop(server.maxclients+REDIS_EVENTLOOP_FDSET_INCR);
接着设置回调函数:
aeSetBeforeSleepProc(server.el,beforeSleep);
再来看主循环中的 aeMain 函数:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
最后循环调用 aeProcessEvents 来进行事件处理(见图5-10)。
图5-10 Redis epoll 主循和事件的关系
可以看到 eventLoop 会对两类事件进行处理,定时器事件和 file 事件。
最后我们来看 aeProcessEvents 函数:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
…
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tvp->tv_sec --;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// AE_DONT_WAIT 标志置位,则设置超时时间为0
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
// 否则会发生阻塞
tvp = NULL; // 一直等待
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; // 返回需要处理的 file/time 事件数量
}
这个函数大致上分为以下几个步骤:
1)aeSearchNearestTimer 查找是否有要优先处理的定时器任务,如果有就先处理。
2)假如没有,则执行 aeApiPoll 来处理 epoll 中的就绪事件,而且是无限等待的哦:
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
tvp = NULL;
}
3)处理定时器任务。
最后我们来看一下 Redis 的整体事件处理流程(见图5-11),由于 Redis 本身是单线程的,没有锁的竞争,为了提高处理的吞吐量,Redis 把工作的流程拆成了很多步,每步都是通过 epoll 的机制来回调,这样尽量不让一个请求 hold 住主线程,让系统的吞吐量得到有效的提升。
图5-11 Redis 的整体事件处理流程图
5.6 Nginx 文件异步 I/O
为了提升对 I/O 事件的及时响应速度,Linux 提供了 aio 机制,该机制实现了真正的异步 I/O 响应处理,不像 libc 的 aio 是异步线程伪装的。
因为 Linux 的 aio 对缓存不支持,所以在 Nginx 中,仅仅对读文件做了 aio 的支持。
aio 的使用可以分为以下几个步骤:
1)io_setup:初始化异步 I/O 上下文,类似于 epoll_create。
2)io_submit:注册异步事件和回调 handler。
ngx_epoll_module 在初始化的时候,会先进行 aio 的初始化:
ngx_epoll_aio_init(ngx_cycle_t *cycle, ngx_epoll_conf_t *epcf)
{
int n;
struct epoll_event ee;
#if (NGX_HAVE_SYS_EVENTFD_H)
ngx_eventfd = eventfd(0, 0);
#else
ngx_eventfd = syscall(SYS_eventfd, 0);
#endif
…
n = 1;
if (ioctl(ngx_eventfd, FIONBIO, &n) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"ioctl(eventfd, FIONBIO) failed");
goto failed;
}
if (io_setup(epcf->aio_requests, &ngx_aio_ctx) == -1) {
ngx_log_error(NGX_LOG_EMERG, cycle->log, ngx_errno,
"io_setup() failed");
goto failed;
}
ngx_eventfd_event.data = &ngx_eventfd_conn;
ngx_eventfd_event.handler = ngx_epoll_eventfd_handler;
ngx_eventfd_event.log = cycle->log;
ngx_eventfd_event.active = 1;
ngx_eventfd_conn.fd = ngx_eventfd;
ngx_eventfd_conn.read = &ngx_eventfd_event;
ngx_eventfd_conn.log = cycle->log;
ee.events = EPOLLIN|EPOLLET;
ee.data.ptr = &ngx_eventfd_conn;
if (epoll_ctl(ep, EPOLL_CTL_ADD, ngx_eventfd, &ee) != -1) {
return;
}
…
}
然后 Nginx 会在读取文件的时候调用 ngx_file_aio_read 函数进行异步读取:
ssize_t
ngx_file_aio_read(ngx_file_t *file, u_char *buf, size_t size, off_t offset,
ngx_pool_t *pool)
{
ngx_err_t err;
struct iocb *piocb[1];
ngx_event_t *ev;
ngx_event_aio_t *aio;
...
aio = file->aio;
ev = &aio->event;
...
if (ev->complete) {
ev->active = 0;
ev->complete = 0;
if (aio->res >= 0) {
ngx_set_errno(0);
return aio->res;
}
...
}
ngx_memzero(&aio->aiocb, sizeof(struct iocb));
aio->aiocb.aio_data = (uint64_t) (uintptr_t) ev;
aio->aiocb.aio_lio_opcode = IOCB_CMD_PREAD;
aio->aiocb.aio_fildes = file->fd;
aio->aiocb.aio_buf = (uint64_t) (uintptr_t) buf;
aio->aiocb.aio_nbytes = size;
aio->aiocb.aio_offset = offset;
aio->aiocb.aio_flags = IOCB_FLAG_RESFD;
aio->aiocb.aio_resfd = ngx_eventfd;
ev->handler = ngx_file_aio_event_handler;
piocb[0] = &aio->aiocb;
if (io_submit(ngx_aio_ctx, 1, piocb) == 1) {
ev->active = 1;
ev->ready = 0;
ev->complete = 0;
return NGX_AGAIN;
}
...
}
5.7 tail 指令为何牛
因为需要采集线上环境的数据,我开发了一个 Java agent 程序来采集相关信息。跑了一段时间后发现一个问题,假如采集数据量压力过大的话,会产生该进程占用 CPU 过高,例如100%以上的情况。
首先来看代理程序的伪代码:
public void run() {
try {
accesslog.seek(accesslog.length());
int i = 0;
while (!Thread.currentThread().isInterrupted()) {
String line = accesslog.readLine();
if (line != null) {
try {
parseLineAndLog(line);
} catch (Exception ex) {
LOGGER.error("parseLineAndLog log error:", ex);
}
try {
if (i++ % 100 == 0) {
Thread.sleep(100);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
LOGGER.error("read ngx access log error:", e);
}
}
我们首先通过 top-H-p${pid}观察该进程中的具体哪个线程占用 CPU 比较高。找到后,通过以下指令把 pid 转换成16进制的数据:
awk '{printf("%x",1234)}'
然后我们通过如下命令产生线程堆栈信息:
jstack ${pid} > stack.log
再通过刚才 awk 的16进制进程号查看,发现 jstack 中该进程一直在做如下操作:
String line = accesslog.readLine();
因为上面代码采用的 accesslog 其实是:
this.accesslog = new RandomAccessFile(path, "r");
而它的 readLine()方法是不阻塞的,轮询必然导致 CPU 占用率的提升。
那如何解决呢?我使用了 tail 指令:
Process p = Runtime.getRuntime().exec("tail -n 1 -F " + path);
br = new BufferedReader(new InputStreamReader(p.getInputStream()));
然后在 while 循环中用 br.readline 来解决问题。
上线后再观察,神奇的事情发生了,CPU 占用率始终控制在5%以下。
那么一条 tail 指令为什么能那么神奇呢?我们根据 Linus 大神的指示:从代码中寻找答案。
首先我们找到 tail 的源码:
http:// git.savannah.gnu.org/cgit/coreutils.git/tree/src/tail.c
经过一步一步分析后,我们发现,最终会调用 tail_forever_inotify 函数。
在2.6内核之后,Linux 提供了 inotify 功能,内核通过监控文件系统的变更来反向通知用户,这样减少了轮询的开销。我们来看其实现:
tail_forever_inotify (int wd, struct File_spec *f, size_t n_files,
double sleep_interval)
{
...
f[i].wd = inotify_add_watch (wd, f[i].name, inotify_wd_mask);
...
if (pid)
{
if (writer_is_dead)
exit (EXIT_SUCCESS);
writer_is_dead = (kill (pid, 0) != 0 && errno != EPERM);
struct timeval delay; // 等待文件变化的时间
if (writer_is_dead)
delay.tv_sec = delay.tv_usec = 0;
else
{
delay.tv_sec = (time_t) sleep_interval;
delay.tv_usec = 1000000 * (sleep_interval - delay.tv_sec);
}
fd_set rfd;
FD_ZERO (&rfd);
FD_SET (wd, &rfd);
int file_change = select (wd + 1, &rfd, NULL, NULL, &delay);
if (file_change == 0)
continue;
else if (file_change == -1)
die (EXIT_FAILURE, errno, _("error monitoring inotify event"));
}
...
len = safe_read (wd, evbuf, evlen);
...
所以以上步骤主要分为三步:
1)注册 inotify 的 watch。
2)用 select 等待 watch 事件发生。
3)用 safe_read 读取准备好的数据。
5.8 零拷贝技术应用分析
在常见的 I/O 场景中,都是先通过 read+write(或 send)的方式来完成的,如图5-12所示,read 调用先从用户态切换到内核态,然后从文件中读取了数据,存储到内核的缓冲区中,然后再把数据从内核态缓冲区拷贝到用户态,同时从内核态切换到用户态。
接着用 send 写入到指定文件也是类似的过程,这里存在4次上下文切换和4次缓冲区的拷贝。
图5-12 一次 read/send 的过程
为了优化这个缓冲区拷贝和上下文切换的次数,Linux 提供了几种方案,下面分别介绍。
5.8.1 mmap
假如仅仅是把数据写入到文件,Linux 提供了 mmap 的方式来共享内存虚拟地址空间,这样只要写共享内存就是写文件,读共享内存就是读文件,减少了缓冲区拷贝的次数。
mmap 的实现最终通过 do_mmap 函数来实现:
unsigned long do_mmap(struct file *file, unsigned long addr,
unsigned long len, unsigned long prot,
unsigned long flags, vm_flags_t vm_flags,
unsigned long pgoff, unsigned long *populate)
{
struct mm_struct *mm = current->mm; // 当前进程的 mm
…
if ((prot & PROT_READ) && (current->personality & READ_IMPLIES_EXEC))
// 是否隐藏了可执行属性
if (!(file && path_noexec(&file->f_path)))
prot |= PROT_EXEC;
if (!(flags & MAP_FIXED)) // MAP_FIXED没有设置
addr = round_hint_to_min(addr); // 判断输入的欲映射的起始地址是否小于最小映射地址,如果小于,将 addr 修改为最小地址
len = PAGE_ALIGN(len); // 检测 len 是否越界
…
if ((pgoff + (len >> PAGE_SHIFT)) < pgoff) // 再次检测是否越界
return -EOVERFLOW;
if (mm->map_count > sysctl_max_map_count) // 超过一个进程中对于 mmap 的最大个数限制
return -ENOMEM;
addr = get_unmapped_area(file, addr, len, pgoff, flags); // 获取没有映射的地址(查询 mm 中空闲的内存地址)
…
// 设置 vm_flags,根据传入的 port 和 flags 以及 mm 自己的 flag 来设置
vm_flags |= calc_vm_prot_bits(prot) | calc_vm_flag_bits(flags) |
mm->def_flags | VM_MAYREAD | VM_MAYWRITE | VM_MAYEXEC;
…
if (file) {
struct inode *inode = file_inode(file);
switch (flags & MAP_TYPE) {
case MAP_SHARED:
if ((prot&PROT_WRITE) && !(file->f_mode&FMODE_WRITE))
// file 应该被打开并允许写入
return -EACCES;
if (IS_APPEND(inode) && (file->f_mode & FMODE_WRITE))
// 不能写入一个只允许写追加的文件
return -EACCES;
if (locks_verify_locked(file)) // 文件被强制锁定
return -EAGAIN;
vm_flags |= VM_SHARED | VM_MAYSHARE // 尝试允许其他进程共享
if (!(file->f_mode & FMODE_WRITE)) // 如果 file 不允许写,取消共享
vm_flags &= ~(VM_MAYWRITE | VM_SHARED);
…
}
}
…
addr = mmap_region(file, addr, len, vm_flags, pgoff); // 建立从文件到虚存区间的映射
…
return addr;
}
以上过程最重要的两步是:
1)get_unmapped_area 查询并获取当前进程虚拟地址空间中空闲的没有映射的地址。
2)mmap_region 建立从文件到虚存区间的映射。
最终映射后的关系如图5-13所示。
图5-13 文件和虚拟地址空间映射后的关系
5.8.2 sendfile
假如需要从一个文件读数据,并且写入到另一个文件,mmap 的方式还是会存在2次系统调用4次上下文切换,所以 Linux 又提供了 sendfile 的调用,1次系统调用搞定(见图5-14)。
图5-14 sendfile 调用过程
下面我们来分析一下 sendfile 的实现,sendfile 调用最终会调用 do_sendfile 函数:
static ssize_t do_sendfile(int out_fd, int in_fd, loff_t *ppos,
size_t count, loff_t max)
{
file_start_write(out.file);
retval = do_splice_direct(in.file, &pos, out.file, &out_pos, count, fl);
file_end_write(out.file);
…
return retval;
}
其中最关键的一行是 do_splice_direct:
long do_splice_direct(struct file *in, loff_t *ppos, struct file *out,
loff_t *opos, size_t len, unsigned int flags)
{
struct splice_desc sd = {
.len = len,
.total_len = len,
.flags = flags,
.pos = *ppos,
.u.file = out,
.opos = opos,
};
long ret;
…
ret = splice_direct_to_actor(in, &sd, direct_splice_actor);
if (ret > 0)
*ppos = sd.pos;
return ret;
}
ssize_t splice_direct_to_actor(struct file *in, struct splice_desc *sd,
splice_direct_actor *actor)
{
struct pipe_inode_info *pipe;
long ret, bytes;
umode_t i_mode;
size_t len;
int i, flags, more;
…
pipe = current->splice_pipe;
if (unlikely(!pipe)) {
pipe = alloc_pipe_info();
…
pipe->readers = 1;
current->splice_pipe = pipe;
}
// 进行拼接
ret = 0;
bytes = 0;
len = sd->total_len;
flags = sd->flags;
// 不要在输出的时候阻塞,我们需要清空 direct pipe
sd->flags &= ~SPLICE_F_NONBLOCK;
more = sd->flags & SPLICE_F_MORE;
while (len) {
size_t read_len;
loff_t pos = sd->pos, prev_pos = pos;
ret = do_splice_to(in, &pos, pipe, len, flags);
…
ret = actor(pipe, sd);
if (unlikely(ret <= 0)) {
sd->pos = prev_pos;
goto out_release;
}
bytes += ret;
len -= ret;
sd->pos = pos;
if (ret < read_len) {
sd->pos = prev_pos + ret;
goto out_release;
}
}
done:
pipe->nrbufs = pipe->curbuf = 0;
file_accessed(in);
return bytes;
…
}
在上述代码中,总结起来就三个步骤:
1)alloc_pipe_info 分配 pipe 对象,pipe 其实就是个缓冲区。
2)do_splice_to 把 in 文件的数据读入到缓冲区。
3)actor 把缓冲区的数据读到 out 文件中。
5.8.3 mmap 和 sendfile 在开源软件中的使用
在 MongoDB 中,使用了操作系统底层提供的内存映射机制,即 mmap,数据文件使用 mmap 映射到内存空间进行管理,内存的管理(哪些数据何时换入/换出)完全交给 OS 管理。
MongoDB 对不同操作系统的 MemoryMappedFile 有不同的实现,我们这里针对 Linux 操作系统的实现来分析 MongoDB 中把文件数据映射到进程地址空间的操作:
void* MemoryMappedFile::map(const char *filename, unsigned long long &length, int options) {
setFilename(filename);
FileAllocator::get()->allocateAsap( filename, length );
len = length;
…
unsigned long long filelen = lseek(fd, 0, SEEK_END);
…
lseek( fd, 0, SEEK_SET );
void * view = mmap(NULL, length, PROT_READ|PROT_WRITE, MAP_SHARED, fd, 0);
…
views.push_back( view );
return view;
}
MongoDB 通知操作系统去映射所有数据文件到内存,操作系统使用 mmap()系统调用来完成。从这一点看,数据文件,包括所有的 docments、collections 及其索引,都会被操作系统通过页(page)的方式交换到内存。如果有足够的内存,所有数据文件最终都会加载到内存中。
当内存发生了改变,比如一个写操作,产生的变化将会异步刷新到磁盘,但写操作仍是很快的,直接操作内存。数据量可以适应内存大小,从而达到一个理想状况——对磁盘的操作达到最小量。但是如果数据量超出内存,页面访问错误(page faults)将会悄悄上来,那么系统就会频繁访问内存,读写操作要慢很多。最糟糕的状况是数据量远大于内存,读写不稳定,性能急剧下降。
Kafka 是 Apache 社区下的消息中间件,在 Kafka 上,有两个原因可能导致低效:1)太多的网络请求;2)过多的字节拷贝。为了提高效率,Kafka 把 message 分成一组一组的,每次请求会把一组 message 发给相应的 consumer。此外,为了减少字节拷贝,采用了 sendfile 系统调用。
Kafka 设计了一种“标准字节消息”,Producer、Broker、Consumer 共享这一种消息格式。Kakfa 的消息日志在 broker 端就是一些目录文件,这些日志文件都是 MessageSet 按照这种“标准字节消息”格式写入磁盘的。
维持这种通用的格式对这些操作的优化尤为重要:持久化 log 块的网络传输。流行的 Unix 操作系统提供了一种非常高效的途径来实现页面缓存和 socket 之间的数据传递。在 Linux 操作系统中,这种方式称作:sendfile 系统调用(Java 提供了访问这个系统调用的方法:FileChannel.transferTo api)。
下面我们来分析在 Kafka 中的零拷贝流程。
首先我们来看 kafka 的服务端 socketServer 逻辑:
override def run() {
startupComplete()
while(isRunning) {
try {
// 配置任意一个新的可以用来排队的连接
configureNewConnections()
// 注册一个新的请求用来写
processNewResponses()
try {
selector.poll(300)
} catch {
case...
}
SocketServer 会 poll 队列,一旦对应的 KafkaChannel 写操作准备好了,就会调用 KafkaChannel 的 write 方法:
// KafkaChannel.scala
public Send write() throws IOException {
if (send != null && send(send))
}
// KafkaChannel.scala
private boolean send(Send send) throws IOException {
send.writeTo(transportLayer);
if (send.completed())
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
return send.completed();
}
其中 write 会调用 send 方法,对应的 Send 对象其实就是上面我们注册的 FetchRes-ponseSend 对象。
这段代码里实际发送数据的代码是 send.writeTo(transportLayer),对应的 writeTo 方法为:
private val sends = new MultiSend(dest, JavaConversions.seqAsJavaList(fetchResponse.
dataGroupedByTopic.toList.map {
case(topic, data) => new TopicDataSend(dest, TopicData(topic,data.map{case
(topicAndPartition, message) => (topicAndPartition.partition, message)}))
}))
override def writeTo(channel: GatheringByteChannel): Long = {
…
written += sends.writeTo(channel)
…
}
这里最后调用了 sends 的 writeTo 方法,而 sends 其实是个 MultiSend。MultiSend 里有两个东西:
topicAndPartition.partition:分区。
message:FetchResponsePartitionData。还记得这个 FetchResponsePartitionData 吗?我们的 MessageSet 就放在了这个对象里。
TopicDataSend 也包含了 sends,该 sends 包含了 PartitionDataSend,而 PartitionDataSend 则包含了 FetchResponsePartitionData。
最后进行 writeTo 的时候,其实是调用了:
// partitionData 就是 FetchResponsePartitionData,messages 就是 FileMessageSet
val bytesSent = partitionData.messages.writeTo(channel, messagesSentSize, messageSize - messagesSentSize)
FileMessageSet 也有个 writeTo 方法,就是我们之前已经提到过的那段代码:
def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Int): Int = {
...
val bytesTransferred = (destChannel match {
case tl: TransportLayer => tl.transferFrom(channel, position, count)
case dc => channel.transferTo(position, count, dc)
}).toInt
bytesTransferred
}
最后通过 tl.transferFrom(channel,position,count)来完成最后的数据发送的。trans-ferFrom 其实是 Kafka 自己封装的一个方法,最终里面调用的也是 transerTo:
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
return fileChannel.transferTo(position, count, socketChannel);
}
5.9 本章小结
我们编写的应用程序或多或少都会涉及 I/O,比如读写数据库、读写网络等,总会遇到很多问题,例如在高并发场景下,如何编写高性能的服务端和客户端程序。对 I/O 的理解是否深入,关系到写出来的应用对性能的影响程度。
从狭义的角度来讲,I/O 就是 in 和 out 两条输入输出的汇编指令。但是从广义角度讲,I/O 可以涉及操作系统整个 I/O 模型的构建,系统从分层的角度,将数据从写文件开始最终转换成数据块并落入磁盘。从更深入的视角看,会涉及 epoll 这样的 I/O 多路复用模型。
因此,只有从操作系统的角度来理解 I/O,才能真正编写出高性能的应用程序。