基于Redis的消息订阅/发布

在工业生产设计中,我们往往需要实现一个基于消息订阅的模式,用来对非定时的的消息进行监听订阅。

这种设计模式在 总线设计模式中得到体现。微软以前的WCF中实现了服务总线 ServiceBus的设计模式。然并卵。WCF已经好像是上个世纪的产物................

基于事件订阅的模式,比如 EventBus类的组件产品。但是往往设计比较复杂。

如果依赖于 Redis做事件消息推送。那就大大简化了这种设计模式,而且性能也比较客观。

Redis在 2.0之后的版本中 实现了 事件推送的  pub/sub命令

PSUBSCRIBE订阅一个或多个符合给定模式的频道
PUBLISH将信息message 发送到指定的频道channel
PUBSUB是一个查看订阅与发布系统状态的内省命令
PUBSUB CHANNELS pattern 列出当前的活跃频道
PUBSUB NUMSUB channel-1 channel-N 返回给定频道的订阅者数量
PUBSUB NUMPAT 返回订阅模式的数量
PUNSUBSCRIBE 指示客户端退订所有给定模式
SUBSCRIBE 订阅给定的一个或多个频道的信息
UNSUBSCRIBE 指示客户端退订给定的频道

P 开头的 (pattern)支持通配符模式。

简单例子(来自:http://www.yiibai.com/redis/redis_pub_sub.html)

以下举例说明如何发布用户的概念工作。在下面的例子给出一个客户端订阅一个通道名为redisChat

redis 127.0.0.1:6379> SUBSCRIBE redisChat

Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "redisChat"
3) (integer) 1

现在,两个客户端都发布在同一个通道名redisChat消息及以上的订阅客户端接收消息。

redis 127.0.0.1:6379> PUBLISH redisChat "Redis is a great caching technique"

(integer) 1

redis 127.0.0.1:6379> PUBLISH redisChat "Learn redis by tutorials point"

(integer) 1

1) "message"
2) "redisChat"
3) "Redis is a great caching technique"
1) "message"
2) "redisChat"
3) "Learn redis by tutorials point"

上面的代码简单的演示了,订阅信道,向指定的信道发布消息。然后消息推送到订阅者。

在 redis-cli客户端中 推送消息的时候,返回成功发送到订阅者的数目。

如:(integer) 1

原理:

RedisServer包含两个重要的结构:
1. channels:实际上就是一个key-value的Map结构,key为订阅地频道,value为Client的List
2. patterns:存放模式+client地址的列表

基于Redis的消息订阅/发布

 

流程:从pubsub_channels中找出跟publish中channel相符的clients-list,然后再去pubsub_patterns中找出每一个相符的pattern和client。向这些客户端发送publish的消息。

订阅信道

基于Redis的消息订阅/发布

消息推送

基于Redis的消息订阅/发布

在C#中的实现

基于

ServiceStack.Redis

             Task.Factory.StartNew(() =>
{
var client = new RedisClient("192.168.1.100", , "你的密码");
try
{
var isOpen = client.Ping();
if (isOpen == false)
{
return;
}
var sub1 = client.CreateSubscription(); //接受消息的委托
sub1.OnMessage = (chanel, message) =>
{
Console.WriteLine("chanel is :{0}", chanel);
Console.WriteLine("message is :{0}", message);
};
sub1.SubscribeToChannels(new string[] { "testchat" });//注意:订阅信道的时候 会开启阻塞模式,所以,需要将监听放到单独的线程里
}
catch (Exception)
{ throw;
} });

注意:在程序终止或者类的实例被销毁的时候,请将订阅者实例注销掉,否则,在redis中一直存在这个订阅者。

1 使用idispose 显示释放

2 使用析构函数 CLR回收的时候 释放

sub1.Dispose();

例如:

  public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
} protected virtual void Dispose(bool disposing)
{
if (disposing)
{
if (redisClient != null)
redisClient.Dispose();
if (subscription != null)
subscription.Dispose();
if (log != null)
log.Dispose();
}
} ~RedisEx()
{
Dispose(false);
}

官方推荐这种写法

 var clientsManager = new PooledRedisClientManager(new string[] { "密码@192.168.1.200:6379" });
var redisPubSub = new RedisPubSubServer(clientsManager, new string[] { "testchat" })
{
OnMessage = (channel, msg) => {
Console.WriteLine("方式2订阅演示.............");
Console.WriteLine("Received '{0}' from '{1}'", msg, channel);
}
}.Start();

To use RedisPubSubServer, initialize it with the channels you want to subscribe to and assign handlers for each of the events you want to handle.

At a minimum you'll want to handle OnMessage:

Calling Start() after it's initialized will get it to start listening and processing any messages published to the subscribed channels.

官方文档:https://github.com/ServiceStack/ServiceStack.Redis

附加文档

 
 
 

PSUBSCRIBE(订阅一个或多个符合给定模式的频道)

PSUBSCRIBE pattern [pattern …] 
订阅一个或多个符合给定模式的频道。 
每个模式以* 作为匹配符,比如it* 匹配所有以it 开头的频道( it.news 、it.blog 、it.tweets 等等), 
news.* 匹配所有以news. 开头的频道( news.it 、news.global.today 等等),诸如此类。 
可用版本: >= 2.0.0 
时间复杂度: O(N),N 是订阅的模式的数量。 
返回值: 接收到的信息(请参见下面的代码说明)。

redis> psubscribe news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe" # 返回值的类型:显示订阅成功
2) "news.*" # 订阅的模式
3) (integer) 1 # 目前已订阅的模式的数量 1) "pmessage" # 返回值的类型:信息
2) "news.*" # 信息匹配的模式
3) "news.it" # 信息本身的目标频道
4) "Google buy Motorola" # 信息的内容

PUBLISH(将信息message 发送到指定的频道channel )

PUBLISH channel message 
将信息message 发送到指定的频道channel 。 
可用版本: >= 2.0.0 
时间复杂度: O(N+M),其中N 是频道channel 的订阅者数量,而M 则是使用模式订阅(subscribed 
patterns) 的客户端的数量。 
返回值: 接收到信息message 的订阅者数量。

# 对没有订阅者的频道发送信息
redis> publish bad_channel "can any body hear me?"
(integer) 0
# 向有一个订阅者的频道发送信息
redis> publish msg "good morning"
(integer) 1
# 向有多个订阅者的频道发送信息
redis> publish chat_room "hello~ everyone"
(integer) 3

PUBSUB(是一个查看订阅与发布系统状态的内省命令)

PUBSUB [argument [argument …]] 
PUBSUB 是一个查看订阅与发布系统状态的内省命令,它由数个不同格式的子命令组成,以下将分别对这 
些子命令进行介绍。 
可用版本: >= 2.8.0

PUBSUB CHANNELS [pattern] (列出当前的活跃频道)

列出当前的活跃频道。 
活跃频道指的是那些至少有一个订阅者的频道,订阅模式的客户端不计算在内。 
pattern 参数是可选的: 
• 如果不给出pattern 参数,那么列出订阅与发布系统中的所有活跃频道。 
• 如果给出pattern 参数,那么只列出和给定模式pattern 相匹配的那些活跃频道。 
复杂度: O(N) ,N 为活跃频道的数量(对于长度较短的频道和模式来说,将进行模式匹配的复杂度视为常 
数)。 
返回值: 一个由活跃频道组成的列表。

# client-1 订阅news.it 和news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅news.it 和news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# 首先, client-3 打印所有活跃频道
# 注意,即使一个频道有多个订阅者,它也只输出一次,比如news.it
client-3> PUBSUB CHANNELS
1) "news.sport"
2) "news.internet"
3) "news.it"
# 接下来, client-3 打印那些与模式news.i* 相匹配的活跃频道
# 因为news.sport 不匹配news.i* ,所以它没有被打印
redis> PUBSUB CHANNELS news.i*
1) "news.internet"
2) "news.it"

PUBSUB NUMSUB [channel-1 … channel-N] (返回给定频道的订阅者数量)

返回给定频道的订阅者数量,订阅模式的客户端不计算在内。 
复杂度: O(N) ,N 为给定频道的数量。 
返回值: 一个多条批量回复( Multi-bulk reply),回复中包含给定的频道,以及频道的订阅者数量。格式为:频道 channel-1 ,channel-1 的订阅者数量,频道 channel-2 ,channel-2 的订阅者数量,诸如此类。 
回复中频道的排列顺序和执行命令时给定频道的排列顺序一致。不给定任何频道而直接调用这个命令也是可以的,在这种情况下,命令只返回一个空列表。

# client-1 订阅 news.it 和 news.sport 两个频道
client-1> SUBSCRIBE news.it news.sport
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.sport"
3) (integer) 2
# client-2 订阅 news.it 和 news.internet 两个频道
client-2> SUBSCRIBE news.it news.internet
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "news.it"
3) (integer) 1
1) "subscribe"
2) "news.internet"
3) (integer) 2
# client-3 打印各个频道的订阅者数量
client-3> PUBSUB NUMSUB news.it news.internet news.sport news.music
1) "news.it" # 频道
2) "2" # 订阅该频道的客户端数量
3) "news.internet"
4) "1"
5) "news.sport"
6) "1"
7) "news.music" # 没有任何订阅者
8) "0"

PUBSUB NUMPAT (返回订阅模式的数量)

注意,这个命令返回的不是订阅模式的客户端的数量,而是客户端订阅的所有模式的数量总和。 
复杂度: O(1) 。 
返回值: 一个整数回复( Integer reply)。

# client-1 订阅 news.* 和 discount.* 两个模式
client-1> PSUBSCRIBE news.* discount.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
1) "psubscribe"
2) "discount.*"
3) (integer) 2
# client-2 订阅 tweet.* 一个模式
client-2> PSUBSCRIBE tweet.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "tweet.*"
3) (integer) 1
# client-3 返回当前订阅模式的数量为 3
client-3> PUBSUB NUMPAT
(integer) 3
# 注意,当有多个客户端订阅相同的模式时,相同的订阅也被计算在 PUBSUB NUMPAT 之内
# 比如说,再新建一个客户端 client-4 ,让它也订阅 news.* 频道
client-4> PSUBSCRIBE news.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "news.*"
3) (integer) 1
# 这时再计算被订阅模式的数量,就会得到数量为 4
client-3> PUBSUB NUMPAT
(integer) 4
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

PUNSUBSCRIBE (指示客户端退订所有给定模式)

PUNSUBSCRIBE [pattern [pattern …]] 
指示客户端退订所有给定模式。 
如果没有模式被指定,也即是,一个无参数的 PUNSUBSCRIBE 调用被执行,那么客户端使用PSUBSCRIBE 
命令订阅的所有模式都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。 
可用版本: >= 2.0.0 
时间复杂度: O(N+M) ,其中 N 是客户端已订阅的模式的数量,M 则是系统中所有客户端订阅的模式的数量。 
返回值: 这个命令在不同的客户端中有不同的表现。

SUBSCRIBE (订阅给定的一个或多个频道的信息)

订阅给定的一个或多个频道的信息。 
可用版本: >= 2.0.0 
时间复杂度: O(N),其中 N 是订阅的频道的数量。 
返回值: 接收到的信息 (请参见下面的代码说明)。

# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe"
2) "chat_room"
3) (integer) 2
1) "message" # 返回值的类型:信息
2) "msg" # 来源 (从那个频道发送过来)
3) "hello moto" # 信息内容
1) "message"
2) "chat_room"
3) "testing...haha"

UNSUBSCRIBE (指示客户端退订给定的频道)

UNSUBSCRIBE [channel [channel …]] 
指示客户端退订给定的频道。 
如果没有频道被指定,也即是,一个无参数的 UNSUBSCRIBE 调用被执行,那么客户端使用SUBSCRIBE 命令订阅的所有频道都会被退订。在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。 
可用版本: >= 2.0.0 
时间复杂度: O(N) ,N 是客户端已订阅的频道的数量。 
返回值: 这个命令在不同的客户端中有不同的表现。

 
 

参考文档:

http://blog.csdn.net/u011506468/article/details/47337839

http://my.oschina.net/itblog/blog/601284

http://www.oschina.net/code/snippet_584165_52231

http://redis.io/topics/pubsub

上一篇:Linux的IO性能监控


下一篇:warnings.filterwarnings(“ignore“)啥作用