【原创】modb 功能设计之“支持多消费者单生产者”

【功能的具体化】
      首先,因为经我改造后的 rabbitmq-c 客户端程序是基于 libevent 的,所以天然可以做到单线程中同时处理多 TCP 连接。理论上,可以对外宣称”该客户端程序支持任意数量生产者和消费者的组合“,而不用担心多线程切换的开销。唯一可能拖慢整个系统的地方,是在 Consumer 针对 RabbitMQ 服务器应答进行回调处理的过程中,即回调处理函数中不能,或者说不应该,进行耗时的操作。 

a. 处理应答的回调函数中可能需要实现的处理逻辑有哪些? 
  1. 获取本地应用发来的 rabbitmq msg,再按目的地(routing_key)进行纯转发动作。对应模型为【1P + 1C + 内部转发 Queue】,运行在单独一个线程中。其中 P 代表 Producer,C 代表 Consumer,均是 RabbitMQ 里的概念。
  2. 获取外部应用发来的 rabbitmq msg,提取消息中含有的 sql 语句并执行,执行成功后再发送 rabbitmq msg 通知本地应用数据库更新成功。对应模型为【1P + nC + 内部 sql 处理 Queue】,运行于两个线程中,一个用于 rabbitmq 相关消息处理和转发,另一用于 sql 处理。
  3. 需要支持 json 或/和 XML 解析。
b. 采用单线模型还是多线程模型(每个线程中都有独自的 event_base)? 
      对于功能 1,单线程足矣;对于功能 2,目前看来至少需要2个线程,即 1P + nC 使用一个线程,sql 处理使用单独一个线程。 

c. 1P + 1C 和 1P + nC 是否可以或者应该在一个线程中处理? 
      按照前文的说明,一个线程中可以处理任意多 TCP 连接(当前改造后的 rabbitmq-c 的基础前提是:一个 TCP 连接上仅使用一个 channel,所以每个 Producer 和 Consumer 都需要独占一个 TCP 连接),所以关键问题是是否应该这么做。对于 Consumer 来说,因为可读事件是会被即时检测的,所以对于两种业务模型的差别就在于回调函数的实现:前者仅需关注转发目的地,后者需要额外的线程来执行 sql 语句。
      一种可能的执行流程如下:Consumer 在回调函数处理中将待执行 sql 语句提取出来后放入内部 sql 处理 Queue 中,待 sql 处理线程从中提取并完成 sql 执行后,将结果 push 进另外一个内部 sql_result Queue 中,而 Producer 会通过定时检测的方式获取 sql_result Queue 中的结果,并发送到用于通知本地业务的 exchange 上。
      上述流程没有详细说明的细节:Consumer 从 RabbitMQ 服务器消费消息时采用的是”自动应答“模式还是”手动应答“模式,这将会对消息的可靠性产生影响,间接影响到数据的一致性。因为在”自动应答“模式下,消息一旦被消费就会被 RabbitMQ 服务器删除掉,而一旦在 sql 执行过程中出现异常,则会导致数据不一致,需要自行实现某种机制保证数据的一致性。若在”手动应答“模式下,则可能的一种实现方式为,针对从 RabbitMQ 消费消息的应答,需要等待 sql 执行完成后才能进行。一则会拖慢整个事件驱动的轮转,二则当 sql 执行失败后,即使使用 REJECT 信令告之 RabbitMQ 服务器无法正确处理,该消息仍然会被删除掉,依然会有数据不一致的问题存在。如果使用了 带有 requeue 属性的 REJECT 信令呢?结果还是一样的,因为只有唯一一个 Consumer 去消费该消息,当再次重新消费该消息时,之前出现的 sql 异常可能还会出现。看来只有华山一条路了,自己实现某种纠错机制,保证数据的一致性。(关于纠错的问题,后续博客再说) 

【实现中遇到的挫折】

a. rabbitmq 线程与 sql 执行线程之间的跨线程通信手段
      如何在 sql 线程执行完成后,告之 Producer 去发送通知消息给其他业务。手段可能有很多种,但是考虑到 rabbitmq 线程是基于 libevent 运转的,这就需要一点小技巧了,后续有单独博客进行说明。

b. 消息传递 Queue 的使用
 
      在最初的设计中,【1P + 1C 
+ 内部转发 Queue】模型里使用的内部转发 Queue 是基于 Producer 和 Consumer 所使用的 conn 连接的,即每一个 conn 上都有这样一个 Queue 的存在。当开始调试【1P + 1C】功能时,发现当 Consumer 拿到 msg 后只能将其存放在自身 conn 上的 Queue 中,而 Producer 无法直接获得这些 msg (当然可以借助外部处理来 pop 和 push ,但似乎不是个好方案),所以决定还是采用全局 msgQ(转发 msg 用)的方式来处理。
 
      在改为全局 Queue 后又发现一个新问题,即 Consumer 对 msg 的接收具有即时性,但 Producer 对 msg 的发送却无法做到。原因在于,可以针对 Consumer 对应 conn 的 sockfd 监听可读和超时事件,但针对 Producer 对应 conn 的 sockfd 却不能去监听可写事件。这就导致了一个问题,即 Producer 只能按照定时器指定的时间发送 msg 。换句话说,即使设置的定时时间再短,也达不到即时发送的效果。那么如何解决这个问题呢?留个思考给大家把~~

(在只使用可读、可写、超时事件的情况下应该是无解的,估计可以利用信号事件解决)


最后附上一张【1P + 1C】的结构图: 
【原创】modb 功能设计之“支持多消费者单生产者”
上一篇:GRE Over IPsec ***思科路由器配置案例(EVE-NG模拟器环境下)


下一篇:时间序列学习(2):白噪声、随机游走