看到Redis源码中主从复制的源码,对某些逻辑不是很确定。梳理了Redis非阻塞connect的大概实现之后,自己写了一个简单的版本。
主要流程:
- 创建非阻塞socket,socket(...., SOCK_NONBLOCK, ...)
- 检查connect(fd, ...)返回是否为0
- 如果为-1,检查errno是否为EINPROGRESS,如果connect失败且错误不为EINPROGRESS,返回错误。
- 返回fd,并利用IO多路复用阻塞,监听POLLOUT事件。
- POLLOUT触发后,用getsockopt(fd, SOL_SOCKET, SO_ERROR, ...)取出socket上的待处理错误码,检查socket状态
- 成功
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
/* Error sentinel: the non-blocking connect helper in this file returns it on failure. */
#define AF_ERR -1
/* Capacity of the event array handed to epoll_wait() in main(). */
#define MAX_EVENTS 1024
/*
 * Register `fd` on the epoll instance `epfd` for the event mask `evts`.
 *
 * The descriptor itself is stored as the event's user data so that it can be
 * read back from the entries returned by epoll_wait(). A failing epoll_ctl()
 * trips the assertion.
 */
static void epoll_ctl_add(int epfd, int fd, int evts) {
    struct epoll_event interest;

    interest.data.fd = fd;
    interest.events = evts;

    int err = epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &interest);
    assert(!err);
}
/*
 * Report the outcome of a non-blocking connect() on `fd`.
 *
 * Meant to be called once the socket has been reported writable. Writability
 * alone does not mean the connection succeeded, so the pending socket error
 * is fetched with SO_ERROR: a value of 0 means the connection is established.
 *
 * On success prints "connection done." and returns. On failure (getsockopt()
 * itself fails, or the socket carries an error) a diagnostic is written to
 * stderr and the process exits with EXIT_FAILURE. The check is explicit
 * rather than an assert() so that it is still performed in NDEBUG builds.
 */
void connectionEstablished(int fd) {
    int sockerr = 0;
    socklen_t errlen = sizeof(sockerr);

    /* If the query itself fails, treat its errno as the socket error. */
    if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1) {
        sockerr = errno;
    }
    if (sockerr != 0) {
        errno = sockerr; /* let perror() describe the socket error */
        perror("connect");
        exit(EXIT_FAILURE);
    }
    printf("connection done.\n");
}
/*
 * Print one event returned by epoll_wait() and, if the descriptor became
 * writable, check whether the pending connect on it succeeded.
 *
 * e    - the event to handle; the descriptor is read from e->data.fd.
 * epfd - the epoll instance the event came from (currently unused).
 */
void handle_events(struct epoll_event* e, int epfd) {
    (void)epfd;

    const int sock = e->data.fd;
    printf("events %d: ", sock);

    /* Only writability is of interest here. */
    if ((e->events & EPOLLOUT) == 0) {
        return;
    }

    printf("EPOLLOUT ");
    connectionEstablished(sock);
}
/*
 * Start a non-blocking TCP connect to ip:port.
 *
 * The helper is deliberately NOT named connect(): <sys/socket.h> already
 * declares connect(int, const struct sockaddr *, socklen_t), and C has no
 * overloading, so a second definition with another signature does not compile.
 *
 * ip   - dotted-quad IPv4 address, e.g. "127.0.0.1".
 * port - TCP port in the range 0..65535.
 *
 * Returns a non-blocking socket descriptor whose connection is either already
 * established or still in progress (EINPROGRESS); the caller waits for the
 * descriptor to become writable and then checks SO_ERROR. Returns AF_ERR on
 * failure with errno describing the cause (EINVAL for a bad address or port).
 */
static int nonblocking_connect(const char* ip, int port) {
    if (ip == NULL || port < 0 || port > 65535) {
        errno = EINVAL;
        return AF_ERR;
    }

    struct sockaddr_in address;
    bzero(&address, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons((uint16_t)port);
    /* inet_pton() returns 0 for an unparsable string without touching errno;
     * ignoring that would silently leave the address as 0.0.0.0. */
    if (inet_pton(AF_INET, ip, &address.sin_addr) != 1) {
        errno = EINVAL;
        return AF_ERR;
    }

    int s = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
    if (s == -1) {
        return AF_ERR;
    }

    if (connect(s, (struct sockaddr*)&address, sizeof(address)) == -1 &&
        errno != EINPROGRESS) {
        /* Real failure: release the socket but keep connect()'s errno. */
        int saved_errno = errno;
        close(s);
        errno = saved_errno;
        return AF_ERR;
    }

    /* Connected immediately, or the handshake is still in progress. */
    return s;
}

/*
 * Demo driver: start a non-blocking connect to 127.0.0.1:8888, wait with
 * epoll until the socket is reported writable, and hand the resulting events
 * to handle_events().
 */
int main(void) {
    int fd = nonblocking_connect("127.0.0.1", 8888);
    if (fd == AF_ERR) {
        printf("connect failed\n");
        return 1;
    }

    int epfd = epoll_create1(0);
    if (epfd == -1) {
        perror("epoll_create1");
        close(fd);
        return 1;
    }
    epoll_ctl_add(epfd, fd, EPOLLOUT);

    struct epoll_event events[MAX_EVENTS];
    int n;
    do {
        /* Block until the connect completes; retry if a signal interrupts. */
        n = epoll_wait(epfd, events, MAX_EVENTS, -1);
    } while (n == -1 && errno == EINTR);
    if (n == -1) {
        perror("epoll_wait");
        close(epfd);
        close(fd);
        return 1;
    }

    for (int i = 0; i < n; i++) {
        handle_events(&events[i], epfd);
    }

    close(epfd);
    close(fd);
    return 0;
}
使用nc -l 8888当服务端,测试发现确实可以通过监听EPOLLOUT事件得知connect已经完成;不过连接失败时同样会触发EPOLLOUT,所以连接是否成功还要用getsockopt(SO_ERROR)确认。
Redis源码:
// src/replication.c
/*
 * Start connecting to the master.
 *
 * Opens a non-blocking connection to server.masterhost:server.masterport and
 * registers syncWithMaster as the handler for both readable and writable
 * events on the new descriptor. Returns REDIS_OK once the events are
 * registered, REDIS_ERR (after logging a warning) otherwise.
 */
int connectWithMaster(void) {
    /* The replica acts as the client and issues the connect(2) to the master. */
    const int sock = anetTcpNonBlockBindConnect(NULL, server.masterhost,
                                                server.masterport,
                                                REDIS_BIND_ADDR);
    if (sock == -1) {
        redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    /* Watch for readable and writable events; syncWithMaster handles them. */
    const int registered = aeCreateFileEvent(server.el, sock,
                                             AE_READABLE|AE_WRITABLE,
                                             syncWithMaster, NULL);
    if (registered == AE_ERR) {
        close(sock);
        redisLog(REDIS_WARNING,"Can't create readable event for SYNC");
        return REDIS_ERR;
    }

    /* Record the connection attempt in the replication state. */
    server.repl_state = REDIS_REPL_CONNECTING;
    server.repl_transfer_s = sock;
    server.repl_transfer_lastio = server.unixtime;
    return REDIS_OK;
}
/*
 * File-event handler for the replication socket `fd`; connectWithMaster()
 * registers it for both readable and writable events.
 *
 * What it does depends on server.repl_state:
 *   - REDIS_REPL_NONE: close the socket and return.
 *   - REDIS_REPL_CONNECTING: stop watching for writability, send a PING and
 *     move to REDIS_REPL_RECEIVE_PONG.
 *   - REDIS_REPL_RECEIVE_PONG: read the reply (body elided in this excerpt).
 *
 * The el, privdata and mask arguments are not used.
 * NOTE(review): the `error` label targeted by `goto error` is not part of this
 * excerpt; presumably it lives in the elided remainder of the function.
 */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
char tmpfile[256], *err;
int dfd, maxtries = 5;
int sockerr = 0, psync_result;
socklen_t errlen = sizeof(sockerr);
REDIS_NOTUSED(el);
REDIS_NOTUSED(privdata);
REDIS_NOTUSED(mask);
/* If this event fired after the user turned the instance into a master
 * with SLAVEOF NO ONE we must just return ASAP. */
if (server.repl_state == REDIS_REPL_NONE) {
close(fd);
return;
}
/* Check for errors in the socket. */
/* SO_ERROR yields the socket's pending error; if getsockopt() itself fails,
 * its errno is used as the error instead. */
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &sockerr, &errlen) == -1)
sockerr = errno;
if (sockerr) {
aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE);
redisLog(REDIS_WARNING,"Error condition on socket for SYNC: %s",
strerror(sockerr));
goto error;
}
/* If we were connecting, it's time to send a non blocking PING, we want to
 * make sure the master is able to reply before going into the actual
 * replication process where we have long timeouts in the order of
 * seconds (in the meantime the slave would block). */
/* Once connected, the first thing sent to the master is a PING, to verify
 * that both directions work and that the master can process commands.
 * The replica registered for both readable and writable events, and the
 * completion of the non-blocking connect(2) fires the writable one
 * (EPOLLOUT), so this branch is the first to run. */
if (server.repl_state == REDIS_REPL_CONNECTING) {
redisLog(REDIS_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
 * registered and we can wait for the PONG reply. */
/* From this point on the writable event is no longer needed. */
aeDeleteFileEvent(server.el,fd,AE_WRITABLE);
server.repl_state = REDIS_REPL_RECEIVE_PONG;
/* Send the PING, don't check for errors at all, we have the timeout
 * that will take care about this. */
syncWrite(fd,"PING\r\n",6,100);
return;
}
/* Readable-event path. */
/* Receive the PONG command. */
if (server.repl_state == REDIS_REPL_RECEIVE_PONG) {
char buf[1024];
// ... (remainder elided in this excerpt)
}
}
继续看一下Redis非阻塞connect的实现:
// src/anet.c
#define ANET_CONNECT_NONE 0
#define ANET_CONNECT_NONBLOCK 1
static int anetTcpGenericConnect(char *err, char *addr, int port,
char *source_addr, int flags)
{
// ...
for (p = servinfo; p != NULL; p = p->ai_next) {
if (connect(s,p->ai_addr,p->ai_addrlen) == -1) {
/* If the socket is non-blocking, it is ok for connect() to
* return an EINPROGRESS error here. */
if (errno == EINPROGRESS && flags & ANET_CONNECT_NONBLOCK)
goto end;
close(s);
s = ANET_ERR;
continue;
}
goto end;
// ...