发布订阅
订阅频道,如果该频道中有新的消息,那么订阅该频道的客户端都可以收到消息。主要涉及的命令:PUBLISH、SUBSCRIBE、PSUBSCRIBE。
频道的订阅与退订
订阅频道
操作
订阅某个频道使用SUBSCRIBE命令,如果想订阅多个频道,直接追加即可。
SUBSCRIBE "news.sport" "news.movie"
原理
在redisServer结构中,保存了所有频道的订阅关系:
struct redisServer {
// ...
// 保存所有频道的订阅关系
dict *pubsub_channels;
// ...
};
这个字典的键是某个被订阅的频道,而键的值则是一个链表,链表里面记录了所有订阅这个频道的客户端。举例:如果客户端要订阅一个test频道,如果pubsub_channels字典中有这个频道,就添加在这个频道的链表中。否则就在pubsub_channels字典中添加一个key为test的键,值是一个链表,链表中保存客户端的信息。
伪代码:
def subscribe(*all_input_channels):
# 遍历输入的所有频道
for channel in all_input_channels:
# 如果channel不存在于pubsub_channels字典(没有任何订阅者)
# 那么在字典中添加channel键,并设置它的值为空链表
if channel not in server.pubsub_channels:
server.pubsub_channels[channel] = []
# 将订阅者添加到频道所对应的链表的末尾
server.pubsub_channels[channel].append(client)
退订频道
操作
退订和订阅相反,使用的是UNSUBSCRIBE命令。
UNSUBSCRIBE "news.sport" "news.movie"
原理
退订操作就是将该客户端从pubsub_channels中对应频道中的链表中移除。
伪代码:
def unsubscribe(*all_input_channels):
# 遍历要退订的所有频道
for channel in all_input_channels:
# 在订阅者链表中删除退订的客户端
server.pubsub_channels[channel].remove(client)
# 如果频道已经没有任何订阅者了(订阅者链表为空)
# 那么将频道从字典中删除
if len(server.pubsub_channels[channel]) == 0:
server.pubsub_channels.remove(channel)
模式的订阅与退订
订阅模式是什么呢?其实就是通过正在表达式匹配多个频道。new.[ie]t可以匹配上new.it和new.et两个频道,这两个频道的消息,订阅模式的客户端都可以收到。
操作
命令:PSUBSCRIBE。
PSUBSCRIBE "news.*"
原理
服务器也将所有模式的订阅关系都保存在服务器状态的pubsub_patterns属性里面:
struct redisServer {
// ...
// 保存所有模式订阅关系
list *pubsub_patterns;
// ...
};
pubsub_patterns属性是一个链表,链表中的每个节点都包含着一个pubsub Pattern结构,这个结构的pattern属性记录了被订阅的模式,而client属性则记录了订阅模式的客户端:
typedef struct pubsubPattern {
// 订阅模式的客户端
redisClient *client;
// 被订阅的模式
robj *pattern;
} pubsubPattern;
订阅模式就是添加一个节点,退订模式就是删除对应的节点。
发送消息
了解了底层结构,向频道中发消息可以很容易理解了。
当向一个频道发消息时,直接遍历该频道的链表,将消息发送给链表中的所有客户端。之后再遍历模式的链表,找到与频道匹配的节点,将消息发送给该客户端。
给订阅频道的客户端发送消息,伪代码:
def channel_publish(channel, message):
# 如果channel键不存在于pubsub_channels字典中
# 那么说明channel频道没有任何订阅者
# 程序不做发送动作,直接返回
if channel not in server.pubsub_channels:
return
# 运行到这里,说明channel频道至少有一个订阅者
# 程序遍历channel频道的订阅者链表
# 将消息发送给所有订阅者
for subscriber in server.pubsub_channels[channel]:
send_message(subscriber, message)
给订阅模式的客户端发送消息,伪代码:
def pattern_publish(channel, message):
# 遍历所有模式订阅消息
for pubsubPattern in server.pubsub_patterns:
# 如果频道和模式相匹配
if match(channel, pubsubPattern.pattern):
# 那么将消息发送给订阅该模式的客户端
send_message(pubsubPattern.client, message)
发送消息的伪代码:
def publish(channel, message):
# 将消息发送给channel频道的所有订阅者
channel_publish(channel, message)
# 将消息发送给所有和channel频道相匹配的模式的订阅者
pattern_publish(channel, message)
查看订阅消息
主要涉及几个命令:
PUBSUB CHANNELS[pattern]
查看所有频道的名称,如果指定到了pattern,返回的将是与模式匹配的频道名称:
redis> PUBSUB CHANNELS
1) "news.it"
2) "news.sport"
3) "news.business"
redis> PUBSUB CHANNELS "news.[is]*"
1) "news.it"
2) "news.sport"
返回这些频道的订阅者数量
PUBSUB NUMSUB[channel-1 channel-2...channel-n]
redis> PUBSUB NUMSUB news.it news.sport news.business news.movie
1) "news.it"
2) "3"
3) "news.sport"
4) "2"
5) "news.business"
6) "2"
7) "news.movie"
8) "1"
PUBSUB NUMPAT子命令用于返回服务器当前被订阅模式的数量:
redis> PUBSUB NUMPAT
(integer) 3
总结
- 服务器状态在pubsub_channels字典保存了所有频道的订阅关系:SUBSCRIBE命令负责将客户端和被订阅的频道关联到这个字典里面,而UNSUBSCRIBE命令则负责解除客户端和被退订频道之间的关联。
- 服务器状态在pubsub_patterns链表保存了所有模式的订阅关系:PSUBSCRIBE命令负责将客户端和被订阅的模式记录到这个链表中,而PUNSUBSCRIBE命令则负责移除客户端和被退订模式在链表中的记录。
- PUBLISH命令通过访问pubsub_channels字典来向频道的所有订阅者发送消息,通过访问pubsub_patterns链表来向所有匹配频道的模式的订阅者发送消息。
- PUBSUB命令的三个子命令都是通过读取pubsub_channels字典和pubsub_patterns链表中的信息来实现的。