linux下使用hiredis异步API实现sub/pub消息订阅和发布的功能

原文链接:https://blog.csdn.net/chenzba/article/details/51224715

 

转自:https://blog.csdn.net/chenzba/article/details/51224715

 

       最近使用redis的c接口——hiredis,使客户端与redis服务器通信,实现消息订阅和发布(PUB/SUB)的功能,我把遇到的一些问题和解决方法列出来供大家学习。

       废话不多说,先贴代码。

 

redis_publisher.h


 
  1. /*************************************************************************

  2. > File Name: redis_publisher.h

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

  6. > Description: 封装hiredis,实现消息发布给redis功能

  7. ************************************************************************/

  8.  
  9. #ifndef REDIS_PUBLISHER_H

  10. #define REDIS_PUBLISHER_H

  11.  
  12. #include <stdlib.h>

  13. #include <hiredis/async.h>

  14. #include <hiredis/adapters/libevent.h>

  15. #include <string>

  16. #include <vector>

  17. #include <unistd.h>

  18. #include <pthread.h>

  19. #include <semaphore.h>

  20. #include <boost/tr1/functional.hpp>

  21.  
  22. class CRedisPublisher

  23. {

  24. public:

  25. CRedisPublisher();

  26. ~CRedisPublisher();

  27.  
  28. bool init();

  29. bool uninit();

  30. bool connect();

  31. bool disconnect();

  32.  
  33. bool publish(const std::string &channel_name,

  34. const std::string &message);

  35.  
  36. private:

  37. // 下面三个回调函数供redis服务调用

  38. // 连接回调

  39. static void connect_callback(const redisAsyncContext *redis_context,

  40. int status);

  41.  
  42. // 断开连接的回调

  43. static void disconnect_callback(const redisAsyncContext *redis_context,

  44. int status);

  45.  
  46. // 执行命令回调

  47. static void command_callback(redisAsyncContext *redis_context,

  48. void *reply, void *privdata);

  49.  
  50. // 事件分发线程函数

  51. static void *event_thread(void *data);

  52. void *event_proc();

  53.  
  54. private:

  55. // libevent事件对象

  56. event_base *_event_base;

  57. // 事件线程ID

  58. pthread_t _event_thread;

  59. // 事件线程的信号量

  60. sem_t _event_sem;

  61. // hiredis异步对象

  62. redisAsyncContext *_redis_context;

  63. };

  64.  
  65. #endif

 

redis_publisher.cpp


 
  1. /*************************************************************************

  2. > File Name: redis_publisher.cpp

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

  6. > Description:

  7. ************************************************************************/

  8.  
  9. #include <stddef.h>

  10. #include <assert.h>

  11. #include <string.h>

  12. #include "redis_publisher.h"

  13.  
  14. CRedisPublisher::CRedisPublisher():_event_base(0), _event_thread(0),

  15. _redis_context(0)

  16. {

  17. }

  18.  
  19. CRedisPublisher::~CRedisPublisher()

  20. {

  21. }

  22.  
  23. bool CRedisPublisher::init()

  24. {

  25. // initialize the event

  26. _event_base = event_base_new(); // 创建libevent对象

  27. if (NULL == _event_base)

  28. {

  29. printf(": Create redis event failed.\n");

  30. return false;

  31. }

  32.  
  33. memset(&_event_sem, 0, sizeof(_event_sem));

  34. int ret = sem_init(&_event_sem, 0, 0);

  35. if (ret != 0)

  36. {

  37. printf(": Init sem failed.\n");

  38. return false;

  39. }

  40.  
  41. return true;

  42. }

  43.  
  44. bool CRedisPublisher::uninit()

  45. {

  46. _event_base = NULL;

  47.  
  48. sem_destroy(&_event_sem);

  49. return true;

  50. }

  51.  
  52. bool CRedisPublisher::connect()

  53. {

  54. // connect redis

  55. _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用默认端口

  56. if (NULL == _redis_context)

  57. {

  58. printf(": Connect redis failed.\n");

  59. return false;

  60. }

  61.  
  62. if (_redis_context->err)

  63. {

  64. printf(": Connect redis error: %d, %s\n",

  65. _redis_context->err, _redis_context->errstr); // 输出错误信息

  66. return false;

  67. }

  68.  
  69. // attach the event

  70. redisLibeventAttach(_redis_context, _event_base); // 将事件绑定到redis context上,使设置给redis的回调跟事件关联

  71.  
  72. // 创建事件处理线程

  73. int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this);

  74. if (ret != 0)

  75. {

  76. printf(": create event thread failed.\n");

  77. disconnect();

  78. return false;

  79. }

  80.  
  81. // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

  82. redisAsyncSetConnectCallback(_redis_context,

  83. &CRedisPublisher::connect_callback);

  84.  
  85. // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

  86. redisAsyncSetDisconnectCallback(_redis_context,

  87. &CRedisPublisher::disconnect_callback);

  88.  
  89. // 启动事件线程

  90. sem_post(&_event_sem);

  91. return true;

  92. }

  93.  
  94. bool CRedisPublisher::disconnect()

  95. {

  96. if (_redis_context)

  97. {

  98. redisAsyncDisconnect(_redis_context);

  99. redisAsyncFree(_redis_context);

  100. _redis_context = NULL;

  101. }

  102.  
  103. return true;

  104. }

  105.  
  106. bool CRedisPublisher::publish(const std::string &channel_name,

  107. const std::string &message)

  108. {

  109. int ret = redisAsyncCommand(_redis_context,

  110. &CRedisPublisher::command_callback, this, "PUBLISH %s %s",

  111. channel_name.c_str(), message.c_str());

  112. if (REDIS_ERR == ret)

  113. {

  114. printf("Publish command failed: %d\n", ret);

  115. return false;

  116. }

  117.  
  118. return true;

  119. }

  120.  
  121. void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context,

  122. int status)

  123. {

  124. if (status != REDIS_OK)

  125. {

  126. printf(": Error: %s\n", redis_context->errstr);

  127. }

  128. else

  129. {

  130. printf(": Redis connected!\n");

  131. }

  132. }

  133.  
  134. void CRedisPublisher::disconnect_callback(

  135. const redisAsyncContext *redis_context, int status)

  136. {

  137. if (status != REDIS_OK)

  138. {

  139. // 这里异常退出,可以尝试重连

  140. printf(": Error: %s\n", redis_context->errstr);

  141. }

  142. }

  143.  
  144. // 消息接收回调函数

  145. void CRedisPublisher::command_callback(redisAsyncContext *redis_context,

  146. void *reply, void *privdata)

  147. {

  148. printf("command callback.\n");

  149. // 这里不执行任何操作

  150. }

  151.  
  152. void *CRedisPublisher::event_thread(void *data)

  153. {

  154. if (NULL == data)

  155. {

  156. printf(": Error!\n");

  157. assert(false);

  158. return NULL;

  159. }

  160.  
  161. CRedisPublisher *self_this = reinterpret_cast<CRedisPublisher *>(data);

  162. return self_this->event_proc();

  163. }

  164.  
  165. void *CRedisPublisher::event_proc()

  166. {

  167. sem_wait(&_event_sem);

  168.  
  169. // 开启事件分发,event_base_dispatch会阻塞

  170. event_base_dispatch(_event_base);

  171.  
  172. return NULL;

  173. }

 

redis_subscriber.h


 
  1. /*************************************************************************

  2. > File Name: redis_subscriber.h

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

  6. > Description: 封装hiredis,实现消息订阅redis功能

  7. ************************************************************************/

  8.  
  9. #ifndef REDIS_SUBSCRIBER_H

  10. #define REDIS_SUBSCRIBER_H

  11.  
  12. #include <stdlib.h>

  13. #include <hiredis/async.h>

  14. #include <hiredis/adapters/libevent.h>

  15. #include <string>

  16. #include <vector>

  17. #include <unistd.h>

  18. #include <pthread.h>

  19. #include <semaphore.h>

  20. #include <boost/tr1/functional.hpp>

  21.  
  22. class CRedisSubscriber

  23. {

  24. public:

  25. typedef std::tr1::function<void (const char *, const char *, int)> \

  26. NotifyMessageFn; // 回调函数对象类型,当接收到消息后调用回调把消息发送出去

  27.  
  28. CRedisSubscriber();

  29. ~CRedisSubscriber();

  30.  
  31. bool init(const NotifyMessageFn &fn); // 传入回调对象

  32. bool uninit();

  33. bool connect();

  34. bool disconnect();

  35.  
  36. // 可以多次调用,订阅多个频道

  37. bool subscribe(const std::string &channel_name);

  38.  
  39. private:

  40. // 下面三个回调函数供redis服务调用

  41. // 连接回调

  42. static void connect_callback(const redisAsyncContext *redis_context,

  43. int status);

  44.  
  45. // 断开连接的回调

  46. static void disconnect_callback(const redisAsyncContext *redis_context,

  47. int status);

  48.  
  49. // 执行命令回调

  50. static void command_callback(redisAsyncContext *redis_context,

  51. void *reply, void *privdata);

  52.  
  53. // 事件分发线程函数

  54. static void *event_thread(void *data);

  55. void *event_proc();

  56.  
  57. private:

  58. // libevent事件对象

  59. event_base *_event_base;

  60. // 事件线程ID

  61. pthread_t _event_thread;

  62. // 事件线程的信号量

  63. sem_t _event_sem;

  64. // hiredis异步对象

  65. redisAsyncContext *_redis_context;

  66.  
  67. // 通知外层的回调函数对象

  68. NotifyMessageFn _notify_message_fn;

  69. };

  70.  
  71. #endif

 

redis_subscriber.cpp:


 
  1. /*************************************************************************

  2. > File Name: redis_subscriber.cpp

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 10:15:09 PM CST

  6. > Description:

  7. ************************************************************************/

  8.  
  9. #include <stddef.h>

  10. #include <assert.h>

  11. #include <string.h>

  12. #include "redis_subscriber.h"

  13.  
  14. CRedisSubscriber::CRedisSubscriber():_event_base(0), _event_thread(0),

  15. _redis_context(0)

  16. {

  17. }

  18.  
  19. CRedisSubscriber::~CRedisSubscriber()

  20. {

  21. }

  22.  
  23. bool CRedisSubscriber::init(const NotifyMessageFn &fn)

  24. {

  25. // initialize the event

  26. _notify_message_fn = fn;

  27. _event_base = event_base_new(); // 创建libevent对象

  28. if (NULL == _event_base)

  29. {

  30. printf(": Create redis event failed.\n");

  31. return false;

  32. }

  33.  
  34. memset(&_event_sem, 0, sizeof(_event_sem));

  35. int ret = sem_init(&_event_sem, 0, 0);

  36. if (ret != 0)

  37. {

  38. printf(": Init sem failed.\n");

  39. return false;

  40. }

  41.  
  42. return true;

  43. }

  44.  
  45. bool CRedisSubscriber::uninit()

  46. {

  47. _event_base = NULL;

  48.  
  49. sem_destroy(&_event_sem);

  50. return true;

  51. }

  52.  
  53. bool CRedisSubscriber::connect()

  54. {

  55. // connect redis

  56. _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用默认端口

  57. if (NULL == _redis_context)

  58. {

  59. printf(": Connect redis failed.\n");

  60. return false;

  61. }

  62.  
  63. if (_redis_context->err)

  64. {

  65. printf(": Connect redis error: %d, %s\n",

  66. _redis_context->err, _redis_context->errstr); // 输出错误信息

  67. return false;

  68. }

  69.  
  70. // attach the event

  71. redisLibeventAttach(_redis_context, _event_base); // 将事件绑定到redis context上,使设置给redis的回调跟事件关联

  72.  
  73. // 创建事件处理线程

  74. int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this);

  75. if (ret != 0)

  76. {

  77. printf(": create event thread failed.\n");

  78. disconnect();

  79. return false;

  80. }

  81.  
  82. // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态

  83. redisAsyncSetConnectCallback(_redis_context,

  84. &CRedisSubscriber::connect_callback);

  85.  
  86. // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连

  87. redisAsyncSetDisconnectCallback(_redis_context,

  88. &CRedisSubscriber::disconnect_callback);

  89.  
  90. // 启动事件线程

  91. sem_post(&_event_sem);

  92. return true;

  93. }

  94.  
  95. bool CRedisSubscriber::disconnect()

  96. {

  97. if (_redis_context)

  98. {

  99. redisAsyncDisconnect(_redis_context);

  100. redisAsyncFree(_redis_context);

  101. _redis_context = NULL;

  102. }

  103.  
  104. return true;

  105. }

  106.  
  107. bool CRedisSubscriber::subscribe(const std::string &channel_name)

  108. {

  109. int ret = redisAsyncCommand(_redis_context,

  110. &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s",

  111. channel_name.c_str());

  112. if (REDIS_ERR == ret)

  113. {

  114. printf("Subscribe command failed: %d\n", ret);

  115. return false;

  116. }

  117.  
  118. printf(": Subscribe success: %s\n", channel_name.c_str());

  119. return true;

  120. }

  121.  
  122. void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context,

  123. int status)

  124. {

  125. if (status != REDIS_OK)

  126. {

  127. printf(": Error: %s\n", redis_context->errstr);

  128. }

  129. else

  130. {

  131. printf(": Redis connected!");

  132. }

  133. }

  134.  
  135. void CRedisSubscriber::disconnect_callback(

  136. const redisAsyncContext *redis_context, int status)

  137. {

  138. if (status != REDIS_OK)

  139. {

  140. // 这里异常退出,可以尝试重连

  141. printf(": Error: %s\n", redis_context->errstr);

  142. }

  143. }

  144.  
  145. // 消息接收回调函数

  146. void CRedisSubscriber::command_callback(redisAsyncContext *redis_context,

  147. void *reply, void *privdata)

  148. {

  149. if (NULL == reply || NULL == privdata) {

  150. return ;

  151. }

  152.  
  153. // 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问

  154. CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(privdata);

  155. redisReply *redis_reply = reinterpret_cast<redisReply *>(reply);

  156.  
  157. // 订阅接收到的消息是一个带三元素的数组

  158. if (redis_reply->type == REDIS_REPLY_ARRAY &&

  159. redis_reply->elements == 3)

  160. {

  161. printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",

  162. redis_reply->element[0]->str, redis_reply->element[0]->len,

  163. redis_reply->element[1]->str, redis_reply->element[1]->len,

  164. redis_reply->element[2]->str, redis_reply->element[2]->len);

  165.  
  166. // 调用函数对象把消息通知给外层

  167. self_this->_notify_message_fn(redis_reply->element[1]->str,

  168. redis_reply->element[2]->str, redis_reply->element[2]->len);

  169. }

  170. }

  171.  
  172. void *CRedisSubscriber::event_thread(void *data)

  173. {

  174. if (NULL == data)

  175. {

  176. printf(": Error!\n");

  177. assert(false);

  178. return NULL;

  179. }

  180.  
  181. CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(data);

  182. return self_this->event_proc();

  183. }

  184.  
  185. void *CRedisSubscriber::event_proc()

  186. {

  187. sem_wait(&_event_sem);

  188.  
  189. // 开启事件分发,event_base_dispatch会阻塞

  190. event_base_dispatch(_event_base);

  191.  
  192. return NULL;

  193. }

 

问题1:hiredis官网没有异步接口的实现例子。

        hiredis提供了几个异步通信的API,一开始根据API名字的理解,我们实现了跟redis服务器建立连接、订阅和发布的功能,可在实际使用的时候,程序并没有像我们预想的那样,除了能够建立连接外,任何事情都没发生。

        网上查了很多资料,原来hiredis的异步实现是通过事件来分发redis发送过来的消息的,hiredis可以使用libae、libev、libuv和libevent中的任何一个实现事件的分发,网上的资料提示使用libae、libev和libuv可能发生其他问题,这里为了方便就选用libevent。hireds官网并没有对libevent做任何介绍,也没用说明使用异步机制需要引入事件的接口,所以一开始走了很多弯路。

        关于libevent的使用这里就不再赘述,详情可以见libevent官网。

libevent官网:http://libevent.org/

libevent api文档:https://www.monkey.org/~provos/libevent/doxygen-2.0.1/include_2event2_2event_8h.html#6e9827de8c3014417b11b48f2fe688ae

 

CRedisPublisher和CRedisSubscriber的初始化过程:

初始化事件处理,并获得事件处理的实例:

_event_base = event_base_new();


在获得redisAsyncContext *之后,调用

redisLibeventAttach(_redis_context, _event_base);

这样就将事件处理和redis关联起来,最后在另一个线程调用

event_base_dispatch(_event_base);

启动事件的分发,这是一个阻塞函数,因此,创建了一个新的线程处理事件分发,值得注意的是,这里用信号灯_event_sem控制线程的启动,意在程序调用


 
  1. redisAsyncSetConnectCallback(_redis_context,

  2. &CRedisSubscriber::connect_callback);

  3. redisAsyncSetDisconnectCallback(_redis_context,

  4. &CRedisSubscriber::disconnect_callback);

之后,能够完全捕捉到这两个回调。

 

问题2 奇特的‘ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context’错误

        有些人会觉得这两个类设计有点冗余,我们发现CRedisPublisher和CRedisSubscriber很多逻辑是一样的,为什么不把他们整合到一起成一个类,既能够发布消息也能够订阅消息。其实一开始我就是这么干的,在使用的时候发现,用同个redisAsynContex *对象进行消息订阅和发布,与redis服务连接会自动断开,disconnect_callback回调会被调用,并且返回奇怪的错误:ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in this context,因此,不能使用同个redisAsyncContext *对象实现发布和订阅。这里为了减少设计的复杂性,就将两个类的逻辑分开了。

        当然,你也可以将相同的逻辑抽象到一个基类里,并实现publish和subscribe接口。

 

问题3 相关依赖的库

        编译之前,需要安装hiredis、libevent和boost库,我是用的是Ubuntu x64系统。

hiredis官网:https://github.com/redis/hiredis

下载源码解压,进入解压目录,执行make && make install命令。

libevent官网:http://libevent.org/下载最新的稳定版

解压后进入解压目录,执行命令

./configure -prefix=/usr

sudo make && make install

boost库:直接执行安装:sudo apt-get install libboost-dev

如果你不是用std::tr1::function的函数对象来给外层通知消息,就不需要boost库。你可以用接口的形式实现回调,把接口传给CRedisSubscribe类,让它在接收到消息后调用接口回调,通知外层。

 

问题4 如何使用

        最后贴出例子代码。

publisher.cpp,实现发布消息:


 
  1. /*************************************************************************

  2. > File Name: publisher.cpp

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 12:13:24 PM CST

  6. ************************************************************************/

  7.  
  8. #include "redis_publisher.h"

  9.  
  10. int main(int argc, char *argv[])

  11. {

  12. CRedisPublisher publisher;

  13.  
  14. bool ret = publisher.init();

  15. if (!ret)

  16. {

  17. printf("Init failed.\n");

  18. return 0;

  19. }

  20.  
  21. ret = publisher.connect();

  22. if (!ret)

  23. {

  24. printf("connect failed.");

  25. return 0;

  26. }

  27.  
  28. while (true)

  29. {

  30. publisher.publish("test-channel", "Test message");

  31. sleep(1);

  32. }

  33.  
  34. publisher.disconnect();

  35. publisher.uninit();

  36. return 0;

  37. }


subscriber.cpp实现订阅消息:


 
  1. /*************************************************************************

  2. > File Name: subscriber.cpp

  3. > Author: chenzengba

  4. > Mail: chenzengba@gmail.com

  5. > Created Time: Sat 23 Apr 2016 12:26:42 PM CST

  6. ************************************************************************/

  7.  
  8. #include "redis_subscriber.h"

  9.  
  10. void recieve_message(const char *channel_name,

  11. const char *message, int len)

  12. {

  13. printf("Recieve message:\n channel name: %s\n message: %s\n",

  14. channel_name, message);

  15. }

  16.  
  17. int main(int argc, char *argv[])

  18. {

  19. CRedisSubscriber subscriber;

  20. CRedisSubscriber::NotifyMessageFn fn =

  21. bind(recieve_message, std::tr1::placeholders::_1,

  22. std::tr1::placeholders::_2, std::tr1::placeholders::_3);

  23.  
  24. bool ret = subscriber.init(fn);

  25. if (!ret)

  26. {

  27. printf("Init failed.\n");

  28. return 0;

  29. }

  30.  
  31. ret = subscriber.connect();

  32. if (!ret)

  33. {

  34. printf("Connect failed.\n");

  35. return 0;

  36. }

  37.  
  38. subscriber.subscribe("test-channel");

  39.  
  40. while (true)

  41. {

  42. sleep(1);

  43. }

  44.  
  45. subscriber.disconnect();

  46. subscriber.uninit();

  47.  
  48. return 0;

  49. }


关于编译的问题:在g++中编译,注意要加上-lhiredis -levent参数,下面是一个简单的Makefile:


 
  1. EXE=server_main client_main

  2. CC=g++

  3. FLAG=-lhiredis -levent

  4. OBJ=redis_publisher.o publisher.o redis_subscriber.o subscriber.o

  5.  
  6. all:$(EXE)

  7.  
  8. $(EXE):$(OBJ)

  9. $(CC) -o publisher redis_publisher.o publisher.o $(FLAG)

  10. $(CC) -o subscriber redis_subscriber.o subscriber.o $(FLAG)

  11.  
  12. redis_publisher.o:redis_publisher.h

  13. redis_subscriber.o:redis_subscriber.h

  14.  
  15. publisher.o:publisher.cpp

  16. $(CC) -c publisher.cpp

  17.  
  18. subscriber.o:subscriber.cpp

  19. $(CC) -c subscriber.cpp

  20. clean:

  21. rm publisher subscriber *.o

 

致谢:

redis异步API使用libevent:http://www.tuicool.com/articles/N73uuu

上一篇:同一个sshkey用在多台电脑上


下一篇:django(一)