Redis同样支持消息的发布/订阅(Pub/Sub)模式,这和中间件activemq有些类似。订阅者(Subscriber)可以订阅自己感兴趣的频道(Channel),发布者(Publisher)可以将消息发往指定的频道(Channel),正式通过这种方式,可以将消息的发送者和接收者解耦。另外,由于可以动态的Subscribe和Unsubscribe,也可以提高系统的灵活性和可扩展性。
关于如何搭建Redis环境,请参考其他文章。这里假设有一个可用的Redis环境(单节点和集群均可)。
在redis-cli中使用Pub/Sub
普通channel的Pub/Sub
先用一个客户端来订阅频道:
上图中先使用redis-cli作为客户端连接了Redis,之后使用了SUBSCRIBE命令,后面的参数表示订阅了china和hongkong两个channel。可以看到"SUBSCRIBE china hongkong"这条命令的输出是6行(可以分为2组,每一组是一个Message)。因为订阅、取消订阅的操作跟发布的消息都是通过消息(Message)的方式发送的,消息的第一个元素就是消息类型,它可以是以下几种类型:
subscribe: means that we successfully subscribed to the channel given as the second element in the reply. The third argument represents the number of channels we are currently subscribed to.
unsubscribe: means that we successfully unsubscribed from the channel given as second element in the reply. The third argument represents the number of channels we are currently subscribed to. When the last argument is zero, we are no longer subscribed to any channel, and the client can issue any kind of Redis command as we are outside the Pub/Sub state.
message: it is a message received as result of a PUBLISH command issued by another client. The second element is the name of the originating channel, and the third argument is the actual message payload.
--from http://redis.io/topics/pubsub
上图的订阅命令将使得发往这两个channel的消息会被这个客户端接收到。需要注意的是,redis-cli客户端在进入subscribe模式以后,将不能再响应其他的任何命令:
A client subscribed to one or more channels should not issue commands, although it can subscribe and unsubscribe to and from other channels.
The commands that are allowed in the context of a subscribed client are SUBSCRIBE, PSUBSCRIBE, UNSUBSCRIBE, PUNSUBSCRIBE, PING and QUIT
--from http://redis.io/topics/pubsub
官网说客户端在subscribe下除了可以使用以上命令外,不能使用其他命令了。但是本人在Subscribe状态下使用上述几个命令,根本没反应。也就是说,使用redis-cli订阅channel后,该客户端将不能响应任何命令。除非按下(ctrl+c),但该操作不是取消订阅,而是退出redis-cli,此时将回到shell命令行下。
关于这个情况,我在官网上没有找到对这种情况的解释,也有不少的人在网上问,找来找去,本人觉得还算合理的解释是:
On this page: http://redis.io/commands/subscribe applies only to those clients.
The redis-cli is among those clients. So, the comment is not an instruction for users of redis-cli.
Instead, redis-cli blocks waiting for messages on the bus (only to be unsubcribed via a ctrl+c).
--from http://*.com/questions/17621371/redis-unsubscribe
就是说,官网中说明的client,并不包含这里使用的redis-cli,于是它可以和其他的client有不同表现。(先不纠结这个问题,稍后再用jedis来测试一下。)
接下来再用一个客户端来发布消息:
可以看到,新的一个客户端使用PUBLISH命令往china频道发布了一条叫"China News"的消息,接下来再看看订阅端:
可以看见,这条消息已经被接收到了。可以看到,收到的消息中第一个参数是类型"message",第二个参数是channel名字"china",第三个参数是消息内容"China News",这和开始说的message类型的结构一致。
通配符的Pub/Sub
Redis还支持通配符的订阅和发布。客户端可以订阅满足一个或多个规则的channel消息,相应的命令是PSUBSCRIBE和PUNSUBSCRIBE。接下来我们再用另一个redis-cli客户端来订阅"chi*"的channel,如图:
和subscribe/unsubscribe的输出类似,可以看到第一部分是消息类型“psubscribe”,第二部分是订阅的规则“chi*”,第三部分则是该客户端目前订阅的所有规则个数。
接下来再发布一条消息到china这个channel中,此时,两个订阅者应该都能收到该消息:
实际测试结果跟预期相同。需要注意的是,订阅者2通过通配符订阅的,收到的消息类型是“pmessage”:
pmessage: it is a message received as result of a PUBLISH command issued by another client, matching a pattern-matching subscription. The second element is the original pattern matched, the third element is the name of the originating channel, and the last element the actual message payload.
--from http://redis.io/topics/pubsub
第二部分是匹配的模式“chi*”,第三部分是实际的channel名字“china”,第四部分是消息内容“China Daily”。
我们再发布一条消息到chinnna中,此时只有订阅者2能接收到消息了:
同样,在使用PSUBSCRIBE进入订阅模式以后,该redis-cli也不能再监听其他任何的命令,要退出该模式,只能使用ctrl+c。
使用Jedis实现Pub/Sub
Jedis是Redis客户端的一种Java实现,在http://redis.io/clients#java中也能找到。
这里使用maven来管理包的依赖,由于使用了Log4j来输出日志,因此会用到log4j的jar包:
1
2
3
4
5
6
7
8
9
10
|
< dependency >
< groupId >redis.clients</ groupId >
< artifactId >jedis</ artifactId >
< version >2.8.0</ version >
</ dependency >
< dependency >
< groupId >log4j</ groupId >
< artifactId >log4j</ artifactId >
< version >1.2.17</ version >
</ dependency >
|
Jedis中的JedisPubSub抽象类提供了订阅和取消的功能。想处理订阅和取消订阅某些channel的相关事件,我们得扩展JedisPubSub类并实现相关的方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package com.demo.redis;
import org.apache.log4j.Logger;
import redis.clients.jedis.JedisPubSub;
public class Subscriber extends JedisPubSub { //注意这里继承了抽象类JedisPubSub
private static final Logger LOGGER = Logger.getLogger(Subscriber. class );
@Override
public void onMessage(String channel, String message) {
LOGGER.info(String.format( "Message. Channel: %s, Msg: %s" , channel, message));
}
@Override
public void onPMessage(String pattern, String channel, String message) {
LOGGER.info(String.format( "PMessage. Pattern: %s, Channel: %s, Msg: %s" ,
pattern, channel, message));
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
LOGGER.info( "onSubscribe" );
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
LOGGER.info( "onUnsubscribe" );
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
LOGGER.info( "onPUnsubscribe" );
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
LOGGER.info( "onPSubscribe" );
}
} |
有了订阅者,我们还需要一个发布者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
|
package com.demo.redis;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
public class Publisher {
private static final Logger LOGGER = Logger.getLogger(Publisher. class );
private final Jedis publisherJedis;
private final String channel;
public Publisher(Jedis publisherJedis, String channel) {
this .publisherJedis = publisherJedis;
this .channel = channel;
}
/**
* 不停的读取输入,然后发布到channel上面,遇到quit则停止发布。
*/
public void startPublish() {
LOGGER.info( "Type your message (quit for terminate)" );
try {
BufferedReader reader = new BufferedReader( new InputStreamReader(System.in));
while ( true ) {
String line = reader.readLine();
if (! "quit" .equals(line)) {
publisherJedis.publish(channel, line);
} else {
break ;
}
}
} catch (IOException e) {
LOGGER.error( "IO failure while reading input" , e);
}
}
} |
为简单起见,这个发布者接收控制台的输入,然后将输入的消息发布到指定的channel上面,如果输入quit,则停止发布消息。
接下来是主函数:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
|
package com.demo.redis;
import org.apache.log4j.Logger;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class Program {
public static final String CHANNEL_NAME = "MyChannel" ;
//我这里的Redis是一个集群,192.168.56.101和192.168.56.102都可以使用
public static final String REDIS_HOST = "192.168.56.101" ;
public static final int REDIS_PORT = 7000 ;
private final static Logger LOGGER = Logger.getLogger(Program. class );
private final static JedisPoolConfig POOL_CONFIG = new JedisPoolConfig();
private final static JedisPool JEDIS_POOL =
new JedisPool(POOL_CONFIG, REDIS_HOST, REDIS_PORT, 0 );
public static void main(String[] args) throws Exception {
final Jedis subscriberJedis = JEDIS_POOL.getResource();
final Jedis publisherJedis = JEDIS_POOL.getResource();
final Subscriber subscriber = new Subscriber();
//订阅线程:接收消息
new Thread( new Runnable() {
public void run() {
try {
LOGGER.info( "Subscribing to \"MyChannel\". This thread will be blocked." );
//使用subscriber订阅CHANNEL_NAME上的消息,这一句之后,线程进入订阅模式,阻塞。
subscriberJedis.subscribe(subscriber, CHANNEL_NAME);
//当unsubscribe()方法被调用时,才执行以下代码
LOGGER.info( "Subscription ended." );
} catch (Exception e) {
LOGGER.error( "Subscribing failed." , e);
}
}
}).start();
//主线程:发布消息到CHANNEL_NAME频道上
new Publisher(publisherJedis, CHANNEL_NAME).startPublish();
publisherJedis.close();
//Unsubscribe
subscriber.unsubscribe();
subscriberJedis.close();
}
} |
主类Program中定义了channel名字、连接redis的地址和端口,并使用JedisPool来获取Jedis实例。由于订阅者(subscriber)在进入订阅状态后会阻塞线程,因此新起一个线程(new Thread())作为订阅线程,并是用主线程来发布消息。待发布者(类中的new Publisher)停止发布消息(控制台中输入quit即可)时,解除订阅者的订阅(subscriber.unsubscribe()方法)。此时订阅线程解除阻塞,打印结束的日志并退出。
运行程序之前,还需要一个简单的log4j配置以观察输出:
1
2
3
4
5
|
log4j.rootLogger=INFO,stdout log4j.appender.stdout=org.apache.log4j.ConsoleAppender log4j.appender.stdout.layout=org.apache.log4j.PatternLayout log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %m%n |
运行Program,以下是执行结果:
从结果看,当订阅者订阅后,订阅线程阻塞,主线程中的Publisher接收输入后,发布消息到MyChannel中,此时订阅该channel的订阅者收到消息并打印。
Jedis源码简要分析
关于使用UNSUBSCRIBE
开始使用redis-cli时,在subscriber进入监听状态后,并不能使用UNSUBSCRIBE和PUNSUBSCRIBE命令,现在在Jedis中,在订阅线程阻塞时,通过在main线程中调用改subscriber的unsubscribe()方法来解除阻塞。查看Jedis源码,其实该方法也就是给redis发送了一个UNSUBSCRIBE命令而已:
因此这里是支持在“客户端”使用UNSUBSCRIBE命令的。
关于订阅者接收消息
在接收消息前,需要订阅channel,订阅完成之后,会执行一个循环,这个循环会一直阻塞,直到该Client没有订阅数为止,如下图:
中间省略的其他行,主要是用于解析收到的Redis响应,这段代码也是根据响应的第一部分确定响应的消息类型,然后挨个解析响应的后续内容,最后根据解析到消息类型,并使用后续解析到的内容作为参数来回调相应的方法,省略的内容如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
final byte [] resp = ( byte []) firstObj;
if (Arrays.equals(SUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get( 2 )).intValue();
final byte [] bchannel = ( byte []) reply.get( 1 );
final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel);
//调用onSubscribe方法,该方法在我们的Subscriber类中实现
onSubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get( 2 )).intValue();
final byte [] bchannel = ( byte []) reply.get( 1 );
final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel);
//调用onUnsubscribe方法,该方法在我们的Subscriber类中实现
onUnsubscribe(strchannel, subscribedChannels);
} else if (Arrays.equals(MESSAGE.raw, resp)) {
final byte [] bchannel = ( byte []) reply.get( 1 );
final byte [] bmesg = ( byte []) reply.get( 2 );
final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null ) ? null : SafeEncoder.encode(bmesg);
//调用onMessage方法,该方法在我们的Subscriber类中实现
onMessage(strchannel, strmesg);
} else if (Arrays.equals(PMESSAGE.raw, resp)) {
final byte [] bpattern = ( byte []) reply.get( 1 );
final byte [] bchannel = ( byte []) reply.get( 2 );
final byte [] bmesg = ( byte []) reply.get( 3 );
final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern);
final String strchannel = (bchannel == null ) ? null : SafeEncoder.encode(bchannel);
final String strmesg = (bmesg == null ) ? null : SafeEncoder.encode(bmesg);
//调用onPMessage方法,该方法在我们的Subscriber类中实现
onPMessage(strpattern, strchannel, strmesg);
} else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get( 2 )).intValue();
final byte [] bpattern = ( byte []) reply.get( 1 );
final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern);
onPSubscribe(strpattern, subscribedChannels);
} else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
subscribedChannels = ((Long) reply.get( 2 )).intValue();
final byte [] bpattern = ( byte []) reply.get( 1 );
final String strpattern = (bpattern == null ) ? null : SafeEncoder.encode(bpattern);
//调用onPUnsubscribe方法,该方法在我们的Subscriber类中实现
onPUnsubscribe(strpattern, subscribedChannels);
} else {
//对于其他Redis没有定义的返回消息类型,则直接报错
throw new JedisException( "Unknown message type: " + firstObj);
} |
以上就是为什么我们需要在Subscriber中实现这几个方法的原因了(这些方法并不是抽象的,可以选择实现使用到的方法)。