Redis源码笔记--非阻塞connect

阅读Redis主从复制部分的源码时,对其中某些逻辑不是很确定。于是先梳理了Redis非阻塞connect的大致实现,然后自己写了一个简单的版本来验证。

主要流程:

  • 创建非阻塞socket:socket(..., SOCK_STREAM | SOCK_NONBLOCK, ...)(SOCK_NONBLOCK与socket类型按位或,放在第二个参数里)
  • 检查connect(fd, ...)返回是否为0
  • 如果为-1,检查errno是否为EINPROGRESS,如果connect失败且错误不为EINPROGRESS,返回错误。
  • 返回fd,然后利用IO多路复用(例如epoll)等待该fd的可写事件(POLLOUT/EPOLLOUT)。
  • 可写事件触发后,用getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)检查socket上是否有错误
  • SO_ERROR为0,说明连接建立成功
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>

#define AF_ERR -1
#define MAX_EVENTS 1024

/*
 * Register `fd` on the epoll instance `epfd` for the event mask `evts`.
 *
 * The registered event carries `fd` in its data field, so it can be read
 * back from the events returned by epoll_wait(). If epoll_ctl() fails the
 * reason is printed to stderr and the process exits; unlike an assert(),
 * this check stays active when NDEBUG is defined.
 */
static void epoll_ctl_add(int epfd, int fd, int evts) {
    /* Zero the whole struct first: `data` is a union wider than `fd`. */
    struct epoll_event ev = {0};
    ev.events = (uint32_t)evts;
    ev.data.fd = fd;
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        perror("epoll_ctl(EPOLL_CTL_ADD)");
        exit(EXIT_FAILURE);
    }
}

/*
 * Check the outcome of a non-blocking connect() on `fd`.
 *
 * Reads the pending socket error with getsockopt(SO_ERROR). A value of
 * zero means no error is pending and "connection done." is printed.
 * If getsockopt() itself fails, or a socket error is pending, the reason
 * is printed to stderr and the process exits with a failure status. This
 * replaces a bare assert(), which reported no reason and is compiled out
 * when NDEBUG is defined.
 */
void connectionEstablished(int fd) {
    int sockerr = 0;
    socklen_t errlen = sizeof(sockerr);

    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) {
        /* The query itself failed; errno says why. */
        sockerr = errno;
    }
    if (sockerr != 0) {
        /* Route the socket error through errno so perror() can describe it. */
        errno = sockerr;
        perror("connect");
        exit(EXIT_FAILURE);
    }
    printf("connection done.\n");
}

/*
 * Report one epoll event.
 *
 * Prints the descriptor stored in the event. When the event mask contains
 * EPOLLOUT, it also prints "EPOLLOUT " and passes the descriptor to
 * connectionEstablished(). `epfd` is accepted but not used.
 */
void handle_events(struct epoll_event* e, int epfd) {
    const int fd = e->data.fd;
    const int writable = (e->events & EPOLLOUT) != 0;

    (void)epfd;
    printf("events %d: ", fd);
    if (!writable) {
        return;
    }
    printf("EPOLLOUT ");
    connectionEstablished(fd);
}

/*
 * Start a non-blocking TCP connect to `ip`:`port` (`ip` is an IPv4
 * dotted-quad string).
 *
 * Named nonblock_connect() rather than connect(): a function called
 * connect() with this signature conflicts with the connect(2) prototype
 * from <sys/socket.h>, and the call inside it would refer to itself.
 *
 * Returns the socket descriptor when the connection is already established
 * or still in progress (EINPROGRESS). The caller must then wait for the
 * socket to become writable and check SO_ERROR. Returns AF_ERR on failure
 * with errno set.
 */
static int nonblock_connect(const char* ip, int port) {
    struct sockaddr_in address = {0};

    if (port < 0 || port > 65535) {
        errno = EINVAL;
        return AF_ERR;
    }
    address.sin_family = AF_INET;
    address.sin_port = htons((uint16_t)port);
    /* inet_pton() returns 1 only when `ip` is a valid IPv4 string. */
    if (inet_pton(AF_INET, ip, &address.sin_addr) != 1) {
        errno = EINVAL;
        return AF_ERR;
    }

    int s = socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (s == -1) {
        return AF_ERR;
    }
    if (connect(s, (struct sockaddr*)&address, sizeof(address)) == -1 &&
        errno != EINPROGRESS) {
        int saved_errno = errno; /* close() may overwrite errno */
        close(s);
        errno = saved_errno;
        return AF_ERR;
    }
    /* Either connected immediately or the handshake is still in progress. */
    return s;
}


/*
 * Demo driver: start a non-blocking connect to 127.0.0.1:8888, block in
 * epoll_wait() until the socket reports an event, and pass every returned
 * event to handle_events().
 *
 * Returns 0 after the events were handled, 1 if the connect could not be
 * started or an epoll call failed.
 */
int main(void) {
    int fd = nonblock_connect("127.0.0.1", 8888);
    if (fd == AF_ERR) {
        printf("connect failed\n");
        return 1;
    }

    int epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        close(fd);
        return 1;
    }
    epoll_ctl_add(epfd, fd, EPOLLOUT);

    struct epoll_event events[MAX_EVENTS];
    int n;
    do {
        /* A signal may interrupt the wait; that is not a real failure. */
        n = epoll_wait(epfd, events, MAX_EVENTS, -1);
    } while (n == -1 && errno == EINTR);
    if (n == -1) {
        perror("epoll_wait");
        close(epfd);
        close(fd);
        return 1;
    }

    for (int i = 0; i < n; i++) {
        handle_events(&events[i], epfd);
    }
    close(epfd);
    close(fd);
    return 0;
}

使用nc -l 8888当服务端进行测试,发现确实可以通过监听EPOLLOUT事件、再用SO_ERROR检查socket状态,来判断connect是否成功。

Redis源码:

// src/replication.c

/*
 * Begin connecting to the configured master.
 *
 * Obtains a descriptor from anetTcpNonBlockBindConnect() for
 * server.masterhost / server.masterport, then registers syncWithMaster as
 * the handler for both readable and writable events on it. On success the
 * descriptor and the current time are recorded in `server` and the
 * replication state becomes REDIS_REPL_CONNECTING.
 *
 * Returns REDIS_OK on success. Returns REDIS_ERR, after logging a warning,
 * if the descriptor could not be obtained or the event could not be
 * registered; in the latter case the descriptor is closed first.
 */
int connectWithMaster(void) {
    int status = REDIS_ERR;
    int sock;

    /* Acting as a client, start the connect(2) towards the master. */
    sock = anetTcpNonBlockBindConnect(NULL, server.masterhost,
                                      server.masterport, REDIS_BIND_ADDR);

    if (sock == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
                 strerror(errno));
    } else if (aeCreateFileEvent(server.el, sock, AE_READABLE|AE_WRITABLE,
                                 syncWithMaster, NULL) == AE_ERR) {
        /* Watching read + write events failed: give the socket back. */
        close(sock);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
    } else {
        server.repl_state = REDIS_REPL_CONNECTING;
        server.repl_transfer_s = sock;
        server.repl_transfer_lastio = server.unixtime;
        status = REDIS_OK;
    }

    return status;
}



/*
 * File-event handler that connectWithMaster() registers for readable and
 * writable events on the socket to the master.
 *
 * What this excerpt shows, in order:
 *   - if replication has been turned off (REDIS_REPL_NONE), close `fd` and
 *     return;
 *   - read SO_ERROR from the socket and bail out via `goto error` when an
 *     error is pending;
 *   - in state REDIS_REPL_CONNECTING, stop watching writable events, move to
 *     REDIS_REPL_RECEIVE_PONG and send a PING;
 *   - in state REDIS_REPL_RECEIVE_PONG, read the reply (body elided).
 *
 * NOTE: this is an excerpt. The `error` label and the code that uses
 * `tmpfile`, `err`, `dfd`, `maxtries` and `psync_result` are not shown here.
 *
 * `el`, `privdata` and `mask` are unused; `fd` is the socket the event
 * fired on.
 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    char tmpfile[256], *err;
    int dfd, maxtries = 5;
    int sockerr = 0, psync_result;
    socklen_t errlen = sizeof(sockerr);
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(privdata);
    REDIS_NOTUSED(mask);

    /* If this event fired after the user turned the instance into a master
     * with SLAVEOF NO ONE we must just return ASAP. */
    if (server.repl_state == REDIS_REPL_NONE) {
        close(fd);
        return;
    }

    /* Check for errors in the socket. */
    /* If getsockopt() itself fails, its errno is treated as the socket
     * error. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
        sockerr = errno;
    if (sockerr) {
        aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
        redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
            strerror(sockerr));
        goto error;
    }

    /* If we were connecting, it's time to send a non blocking PING, we want to
     * make sure the master is able to reply before going into the actual
     * replication process where we have long timeouts in the order of
     * seconds (in the meantime the slave would block). */
    /* Once the connection is established, the first thing sent to the master
     * is a PING, to make sure both directions work and the master can process
     * commands. The replica registered for both readable and writable events,
     * and a completing non-blocking connect(2) triggers the writable event,
     * so this branch is the first one to run. */
    if (server.repl_state == REDIS_REPL_CONNECTING) {
        redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
        /* Delete the writable event so that the readable event remains
         * registered and we can wait for the PONG reply. */
        /* From this point on the writable event is no longer needed. */
        aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
        server.repl_state = REDIS_REPL_RECEIVE_PONG;
        /* Send the PING, don't check for errors at all, we have the timeout
         * that will take care about this. */
        syncWrite(fd,"PING\r\n",6,100);
        return;
    }

    /* From here on the handler is reacting to the readable event. */
    /* Receive the PONG command. */
    if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
        char buf[1024];
        // ...
    }
}

继续看一下Redis非阻塞connect的底层实现:

// src/anet.c

#define ANET_CONNECT_NONE 0
#define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port,
                                 char *source_addr, int flags)
{
    // ...
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
            /* If the socket is non-blocking, it is ok for connect() to
                 * return an EINPROGRESS error here. */
            if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK)
                goto end;
            close(s);
            s = ANET_ERR;
            continue;
    	}
        goto end;
    // ...

上一篇:屏幕增强——IW31/32/33工单抬头屏幕增强


下一篇:Transformer课程 第32章Transformer模型FNet架构