Windows下libevent C++封装类实现

1. Libevent介绍

[*]libevent是一个异步事件处理软件函式库,以BSD许可证发布。libevent提供了一组应用程序编程接口(API),让程序员可以设定某些事件发生时所执行的回调函数,也就是说,libevent可以用来取代网络服务器所使用的事件循环检查框架。由于可以省去对网络的处理,且拥有不错的效能,有些软件使用libevent作为网络底层的函式库,如:memcached、Tor。


[libevent官网介绍]

libevent API提供了一种机制,用于在文件描述符上发生特定事件或达到超时后执行回调函数。此外,libevent还支持基于信号或常规超时的回调。

libevent旨在替换在事件驱动的网络服务器中的事件循环机制。应用程序只需要调用event_dispatch()接口,然后动态添加或删除事件,而不必更改事件循环。

目前,libevent支持 /dev/poll, kqueue, event ports, POSIX select, Windows select(), poll 和 epoll。 其内在的事件机制是完全独立于外在的事件API ,并且libevent的简单更新可以提供新的功能,而无需重新设计应用程序。因此:

1)Libevent允许便携式应用程序开发,并提供操作系统上可用的最可扩展的事件通知机制。

2)Libevent也可以用于多线程应用程序,通过隔离每个event_base,以便只有一个线程访问它,或通过锁定访问单个共享的event_base。 Libevent可以在Linux,* BSD,Mac OS X,Solaris,Windows等上编译。

3)Libevent还为缓冲网络IO提供了复杂的框架,支持套接字,过滤器,速率限制,SSL,零拷贝文件传输和IOCP。

4)Libevent包括对几个有用的协议的支持,包括DNS,HTTP和最小的RPC框架。


以下关于阻塞、非阻塞,同步、异步,大牛陈硕的经典回复。

在处理 IO 的时候,阻塞和非阻塞都是同步 IO。

只有使用了特殊的 API 才是异步 IO。

Windows下libevent C++封装类实现

2.为什么要使用封装好的网络库?

[陈硕]网络编程是什么?是熟练使用Sockets API吗?说实话,在实际项目里我只用过两次Sockets API,其他时候都是使用封装好的网络库。

使用封装好的网络库如libevent, muduo网络库 目的之一就是想让日常的网络编程从Sockets API的琐碎细节中解脱出来,让程序员专注于业务逻辑,把时间用在刀刃上。 程序员的主要工作是在事件处理函数中实现业务逻辑,而不是和Sockets API较劲。

陈硕认为网络编程也可以分为三个层次:

1). 读过教程和文档

2). 熟悉本系统TCP/IP协议栈的脾气

3). 自己写过一个简单的TCP/IP stack


陈硕认为TCP网络编程有三个例子最值得学习研究,分别是echo、chat、proxy,都是长连接协议。

Echo的作用:熟悉服务端被动接受新连接、收发数据、被动处理连接断开。每个连接是独立服务的,连接之间没有关联。在消息内容方面Echo有一些变种:比如做成一问一答的方式,收到的请求和发送响应的内容不一样,这时候要考虑打包与拆包格式的设计,进一步还可以写简单的HTTP服务。


3.Libevent通信核心

服务端核心步骤简化如下:


步骤1:设置sockfd为nonblocking;

步骤2:使用bufferevent_socket_new创建一个struct bufferevent *bev,关联该sockfd,托管给event_base;

步骤3:使用bufferevent_setcb(bev, read_cb, write_cb, error_cb, (void *)arg)将EV_READ/EV_WRITE对应的函数;

步骤4:使用bufferevent_enable(bev, EV_READ|EV_WRITE|EV_PERSIST)来启动read/write事件;

其中,在read_cb里面从input读取数据,处理完毕后塞到output中,会自动写入到sockfd;

在write_cb里面(对于一个echo server来说,read_cb足够了)

在error_cb里面处理遇到的错误。使用bufferevent_set_timeout(bev, struct timeval *READ, struct timeval *WRITE)来设置读写超时,在error_cb里面也可以处理超时。

可以使用bev中libevent的API提取出event_base,sockfd,input/output等相关数据。

客户端的操作步骤详见代码,提炼即可。


4.C++封装的libevent Echo类

分为:服务端YuLibeventServer类和客户端YuLibeventClient类。


//服务端核心代码如下参考地址:


#include "YuLibeventServer.h"

/*

**@author: laoyang360

**@date: 20161211

**@brief: The server of SimLibeventClient

*/


static int s_iBlockSize = 10;

#define MAX_LINE 1024

YuLibeventServer *YuLibeventServer::pThis = NULL;


YuLibeventServer::YuLibeventServer()

{

pThis = this; //将this指针赋给pThis,使得回调函数能通过pThis指针访问本对象

m_pBase = NULL;

m_pListener = NULL;

m_pEvstop = NULL;


}


YuLibeventServer::~YuLibeventServer()

{


}


/*

**@author: laoyang360

**@date: 20161211

**@param: evutil_socket_t fd

**@brief: 设置非阻塞,禁止Nagle算法。

*/

void YuLibeventServer::set_tcp_no_delay(evutil_socket_t fd)

{

int iOne = 1;

setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char*)&iOne, sizeof iOne);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: evutil_socket_t fd

**@brief: 等待接受客户端连接处理accept、一个新客户端连接上服务器了

*/

void YuLibeventServer::accept_conn_cb(evconnlistener *listener, evutil_socket_t fd,

struct sockaddr *sock, int socklen, void *arg)

{

printf("We got a new connection! Set up a bufferevent for it. accept a client %d\n", fd);


event_base *base = evconnlistener_get_base(listener);


//为这个客户端分配一个bufferevent

bufferevent *bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);


set_tcp_no_delay(fd);


bufferevent_setcb(bev, echo_read_cb, NULL, echo_event_cb, NULL);

bufferevent_enable(bev, EV_READ | EV_WRITE);


}


/*

**@author: laoyang360

**@date: 20161211

**@param: bufferevent *bev, void *arg

**@brief: echo_read_cb回调接口

*/

void YuLibeventServer::echo_read_cb(bufferevent *bev, void *arg)

{


char msg[MAX_LINE + 1] = {0};

int iLen = 0;

evutil_socket_t fd = bufferevent_getfd(bev);

while (iLen = bufferevent_read(bev, msg, sizeof(msg)-1 ), iLen > 0)

{

msg[iLen] = '\0';

printf("fd=%u, read len = %d\t read msg: %s\n", fd, iLen, msg);

int iRst = bufferevent_write(bev, msg, iLen);

if (-1 == iRst)

{

printf("[socket_write_cb]:error occur!\n");

}

}


/*

char reply[] = "[server: i'm server, send 1111]";

printf("writecb: len = %d\n", 1 + strlen(reply));

int iRst = bufferevent_write(bev, reply, 1 + strlen(reply));

if (-1 == iRst)

{

printf("[socket_write_cb]:error occur!\n");

}

*/

/*This callback is invoked when there is data to read on bev */

//struct evbuffer *input = bufferevent_get_input(bev);

//struct evbuffer *output = bufferevent_get_output(bev);

/*把input buffer中的所有数据 拷贝到 output buffer*/

//evbuffer_add_buffer(output, input);


}


/*

**@author: laoyang360

**@date: 20161211

**@param: bufferevent *bev, void *arg

**@brief: socket_write_cb回调接口,暂时未使用

*/

void YuLibeventServer::socket_write_cb(bufferevent *bev, void *arg)

{

/*

char reply[] = "[server: i'm server, send 1111]";

printf("writecb: len = %d\n", 1 + strlen(reply));

int iRst = bufferevent_write(bev, reply, 1 + strlen(reply));

if (-1 == iRst)

{

printf("[socket_write_cb]:error occur!\n");

}

*/

}


/*

**@author: laoyang360

**@date: 20161211

**@param: bufferevent *bev, short events, void *arg

**@brief: echo_event_cb事件处理或异常处理

*/

void YuLibeventServer::echo_event_cb(bufferevent *bev, short events, void *arg)

{

struct evbuffer *output = bufferevent_get_output(bev);

size_t remain = evbuffer_get_length(output);


if (events & BEV_EVENT_TIMEOUT)

{

printf("Timed out\n"); //if bufferevent_set_timeouts() called.

}

else if (events & BEV_EVENT_EOF)

{

printf("connection closed, remain %d\n", remain);

}

else if (events & BEV_EVENT_ERROR)

{

printf("some other error, remain %d\n", remain);

}

//这将自动close套接字和free读写缓冲区

bufferevent_free(bev);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: bufferevent *bev, short events, void *arg

**@brief: signal_cb停止信号处理

*/

void YuLibeventServer::signal_cb(evutil_socket_t sig, short events, void *arg)

{

struct event_base *base = (event_base *)arg;

printf("exception: interrupt, stop now!\n");


event_base_loopexit(base, NULL);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: int port, 传入端口。

**@brief: libevent,socket初始化等

*/

void YuLibeventServer::init(int port)

{

WSADATA wsaData;

DWORD Ret;

if ((Ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0)

{

printf("WSAStartup failed with error %d\n", Ret);

exit(1);

}


m_pBase = event_base_new();

if (NULL == m_pBase)

{

printf("couldn't open event base!\n");

exit(1);

}


m_pEvstop = evsignal_new(m_pBase, SIGINT, signal_cb, m_pBase);

evsignal_add(m_pEvstop, NULL);


struct sockaddr_in sin;

memset(&sin, 0, sizeof(struct sockaddr_in));

sin.sin_family = AF_INET;

sin.sin_port = htons(port);


m_pListener = evconnlistener_new_bind(m_pBase, accept_conn_cb, NULL,

LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE,

-1, (struct sockaddr*)&sin,

sizeof(struct sockaddr_in));


if (NULL == m_pListener)

{

printf("couldn't create listener!\n");

exit(1);

}

}


/*

**@author: laoyang360

**@date: 20161211

**@param: 无

**@brief: 启动,循环执行

*/

void YuLibeventServer::start()

{

event_base_dispatch(m_pBase);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: 无

**@brief: 停止

*/

void YuLibeventServer::stop()

{

if (NULL != m_pListener)

{

evconnlistener_free(m_pListener);

}

if (NULL != m_pEvstop)

{

event_free(m_pEvstop);

}

if (NULL != m_pBase)

{

event_base_free(m_pBase);

}

}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

//客户端核心代码参考地址:


#include "yuLibEventClient.h"


/*

**@author: laoyang360

**@date: 20161211

**@brief: The client of SimLibeventClient

*/


YuLibeventClient *YuLibeventClient::pThis = NULL;

const static char* s_serverIpAddr = "127.0.0.1";

const static int s_iBlockSize = 10;

const static long s_iTimeOut = 10; //超时时间

const static int s_iSessionCnt = 10;

int YuLibeventClient::m_siLtotal_bytes_read = 0;

int YuLibeventClient::m_siLtotal_messages_read = 0;


YuLibeventClient::YuLibeventClient()

{

pThis = this; //将this指针赋给pThis,使得回调函数能通过pThis指针访问本对象

m_pBase = NULL;

m_pListener = NULL;

m_pszMsg = NULL;

m_evtimeout = NULL;

m_bevs = NULL;

}


YuLibeventClient::~YuLibeventClient()

{


}


/*

**@author: laoyang360

**@date: 20161211

**@param: evutil_socket_t fd

**@brief: 设置非阻塞,禁止Nagle算法。

*/

void YuLibeventClient::set_tcp_no_delay(evutil_socket_t fd)

{

int iOne = 1;

setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (const char*)&iOne, sizeof iOne);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: evutil_socket_t fd, short events, void *arg

**@brief: 超时回调函数。

*/

void YuLibeventClient::timeoutcb(evutil_socket_t fd, short events, void *arg)

{

struct event_base *base = (event_base*)arg;

printf("timeout...\n");


event_base_loopexit(base, NULL);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: int fd, short events, void* arg

**@brief: 暂时未使用

*/

void YuLibeventClient::cmd_msg_cb(int fd, short events, void* arg)

{

printf("server_msg_cb ing....\n");

struct bufferevent* bev = (struct bufferevent*)arg;


char msg[1024] = "testlaoyang20161210";

int iLen = 1 + strlen(msg);

/*int iLen = bufferevent_read(bev, msg, sizeof(msg));

if (0 == iLen)

{

printf("recv message empty.\n");

exit(1);

}*/


//把终端的消息发送给服务器端

bufferevent_write(bev, msg, iLen);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: struct bufferevent* bev, void* arg

**@brief: writecb回调函数,暂时未使用

*/

void YuLibeventClient::writecb(struct bufferevent* bev, void* arg)

{

/*

printf("send_server_cb running....\n");


char szSendMsg[1024] = "[writecb: i'am client]";

int iLen = 1 + strlen(szSendMsg);

printf("iLen = %d\n", iLen);

//把终端的消息发送给服务器端

bufferevent_write(bev, szSendMsg, iLen);

*/

}


/*

**@author: laoyang360

**@date: 20161211

**@param: struct bufferevent* bev, void* arg

**@brief: readcb回调函数,接收处理回调接口。

*/

void YuLibeventClient::readcb(struct bufferevent* bev, void* arg)

{

char szRecvMsg[1024] = {0};

int len = bufferevent_read(bev, szRecvMsg, sizeof(szRecvMsg));

szRecvMsg[len] = '\0';

printf("recv from server: cnt = %d, len = %d, msg = %s\n", m_siLtotal_messages_read, len, szRecvMsg);


++m_siLtotal_messages_read;

m_siLtotal_bytes_read += len;


//把终端的消息发送给服务器端

bufferevent_write(bev, szRecvMsg, len);


//以下是chenshuo的使用方法

/*This callback is invoked when there is data to read on bev @by chenshuo below */

//struct evbuffer *input = bufferevent_get_input(bev);

//struct evbuffer *output = bufferevent_get_output(bev);

//++m_siLtotal_messages_read;

//m_siLtotal_bytes_read += evbuffer_get_length(input);

//evbuffer_add_buffer(output, input);


}


/*

**@author: laoyang360

**@date: 20161211

**@param: struct bufferevent *bev, short event, void *arg

**@brief: eventcb回调函数,事件或出错处理回调接口。

*/

void YuLibeventClient::eventcb(struct bufferevent *bev, short event, void *arg)

{


if (event & BEV_EVENT_EOF)

{

printf("connection closed\n");

}

else if (event & BEV_EVENT_ERROR)

{

printf("some other error\n");

}

else if( event & BEV_EVENT_CONNECTED)

{

printf("the client has connected to server\n");

evutil_socket_t fd = bufferevent_getfd(bev);

set_tcp_no_delay(fd);

}

}


/*

**@author: laoyang360

**@date: 20161211

**@param: int iPort, 传入端口。

**@brief: libevent,socket初始化等

*/

void YuLibeventClient::init(int iPort)

{

WSADATA wsaData;

DWORD Ret;

if ((Ret = WSAStartup(MAKEWORD(2, 2), &wsaData)) != 0)

{

printf("WSAStartup failed with error %d\n", Ret);

exit(1);

}


m_timeout.tv_sec = s_iTimeOut; //60s超时

m_timeout.tv_usec = 0;


m_pszMsg = (char*)malloc(1 + s_iBlockSize);

memset(m_pszMsg, 0, s_iBlockSize);

for (int i = 0; i < s_iBlockSize; ++i)

{

m_pszMsg[i] = 't'; /*i%128;*/

}

m_pszMsg[s_iBlockSize] = '\0';

//printf("m_pszMsg = %s\n", m_pszMsg);


m_pBase = event_base_new();

if (!m_pBase)

{

printf("Couldn't open event base!\n");

exit(1);

}


//设定超时

m_evtimeout = evtimer_new(m_pBase, timeoutcb, m_pBase);

evtimer_add(m_evtimeout, &m_timeout);


struct sockaddr_in server_addr;

memset(&server_addr, 0, sizeof(server_addr) );

server_addr.sin_family = AF_INET;

server_addr.sin_port = htons(iPort);

server_addr.sin_addr.s_addr = inet_addr(s_serverIpAddr);


m_bevs = (bufferevent**)malloc(s_iSessionCnt * sizeof(struct bufferevent *));

for (int i=0; i < s_iSessionCnt; ++i)

{

struct bufferevent* bev = bufferevent_socket_new(m_pBase, -1, BEV_OPT_CLOSE_ON_FREE);

bufferevent_setcb(bev, readcb, NULL, eventcb, NULL);

bufferevent_enable(bev, EV_READ | EV_WRITE);


evbuffer_add(bufferevent_get_output(bev), m_pszMsg, s_iBlockSize);


if (bufferevent_socket_connect(bev, (struct sockaddr *)&server_addr,

sizeof(server_addr)) < 0)

{

printf("Error starting connection!\n");

bufferevent_free(bev);

exit(1);

}

m_bevs[i] = bev;

}


}


/*

**@author: laoyang360

**@date: 20161211

**@param: 无

**@brief: 启动,循环执行

*/

void YuLibeventClient::start()

{

event_base_dispatch(m_pBase);

}


/*

**@author: laoyang360

**@date: 20161211

**@param: 无

**@brief: 停止,内存等释放&结果统计

*/

void YuLibeventClient::stop()

{

//evconnlistener_free(m_pListener);

if (NULL != m_pBase)

{

event_base_free(m_pBase);

}


for (int i = 0; i < s_iSessionCnt; ++i)

{

if (NULL != m_bevs[i])

{

bufferevent_free(m_bevs[i]);

}

}


if (NULL != m_bevs)

{

free(m_bevs);

}


if (NULL != m_pszMsg)

{

free(m_pszMsg);

}


printf("%d total bytes read\n", m_siLtotal_bytes_read);

printf("%d total messages read\n", m_siLtotal_messages_read);

printf("%.3f average messages size read\n", (double)m_siLtotal_bytes_read/m_siLtotal_messages_read);

printf("%.3f MiB/s throughtput\n", (double)m_siLtotal_bytes_read/(m_timeout.tv_sec * 1024 * 1024));


}

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

133

134

135

136

137

138

139

140

141

142

143

144

145

146

147

148

149

150

151

152

153

154

155

156

157

158

159

160

161

162

163

164

165

166

167

168

169

170

171

172

173

174

175

176

177

178

179

180

181

182

183

184

185

186

187

188

189

190

191

192

193

194

195

196

197

198

199

200

201

202

203

204

205

206

207

208

209

210

211

212

213

214

215

216

217

218

219

220

221

222

223

224

225

226

227

228

229

230

231

232

233

234

235

236

237

238

239

240

241

242

243

244

245

246

247

248

249

250

251

252

253

254

255

256

257

258

259

260

261

262

263

264

265

266

267

268

269

4.测试效果如下Windows下libevent C++封装类实现

5.源码包下载

http://download.csdn.net/detail/wojiushiwo987/9708418


后记:

项目中用到了libevent,但我自己一直没有总结这块,从去年开始到现在这个想法持续了一年,总算了了心愿。

代码对大牛陈硕的C的测试代码进行了C++的封装、测试、验证。向大牛的钻研精神和毅力学习和致敬!


参考:

http://blog.csdn.net/solstice/article/details/6527585

https://github.com/chenshuo/recipes/tree/master/pingpong/libevent

http://blog.csdn.net/funkri/article/details/9352955

http://blog.csdn.net/laoyang360/article/details/8675922

上一篇:Pi(树莓派/香蕉派/NanoPi/香橙派)Ubuntu换KDE桌面


下一篇:Cannot load JDBC driver class 'oracle.jdbc.OracleDriver'