发布订阅
-
什么是发布订阅
Redis发布订阅(pub/sub)是一种消息通讯模式:可以指定一个频道(key),发布者(pub)向频道发送消息,订阅者(sub)可以接受该频道的消息。 -
客户端演示
先创建频道名redisWechat
redis 127.0.0.1:6379> SUBSCRIBE redisChat
Reading messages… (press Ctrl-C to quit)
- “subscribe”
- “redisChat”
- (integer) 1
重新开启个 redis 客户端,然后在同一个频道 redisChat 发布两次消息,订阅者就
能接收到消息。
redis 127.0.0.1:6379> PUBLISH redisChat “Redis is a great caching technique”
(integer) 1
redis 127.0.0.1:6379> PUBLISH redisChat “Learn redis by runoob.com”
(integer) 1
- “message”
- “redisChat”
- “Redis is a great caching technique”
- “message”
- “redisChat”
- “Learn redis by runoob.com”
-
Java Api演示
-
引入依赖
-
代码
-
//发布者
public class Publisher extends Thread{
private final JedisPool jedisPool;
public Publisher(JedisPool jedisPool) {
this.jedisPool = jedisPool;
}
@Override
public void run() {
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
Jedis jedis = jedisPool.getResource();//从连接池子中获取连接
while (true){
try {
String input = reader.readLine();
if (!"quit".equals(input)){
jedis.publish("mychannel",input);
}else{
break;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
//订阅者
public class Subscriber extends JedisPubSub {
@Override
//收到消息会调用这个方法
public void onMessage(String channel, String message) {
System.out.println(String.format("receive one message from channel %s,message is %s",channel,message));
}
@Override
//订阅频道时会调用
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
@Override
//取消订阅时会调用
public void onUnsubscribe(String channel, int subscribedChannels) {
System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
channel, subscribedChannels));
}
}
//订阅频道
public class SubThread extends Thread{
private final JedisPool jedisPool;
private final Subscriber subscriber = new Subscriber();
private final String channel = "mychannel";
public SubThread(JedisPool jedisPool) {
super("SubThread");
this.jedisPool = jedisPool;
}
@Override
public void run() {
System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
//通过subscriber的api去订阅,入参是订阅者和频道名
jedis.subscribe(subscriber, channel);
} catch (Exception e) {
e.printStackTrace();
System.out.println(String.format("subsrcibe channel error, %s", e));
} finally {
jedis.close();
}
}
}
//测试类
public class PubSubDemo {
public static void main(String[] args) {
// 连接redis服务端
JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "119.23.69.255", 6379);
SubThread subThread1 = new SubThread(jedisPool); //订阅者1
subThread1.start();
SubThread subThread2 = new SubThread(jedisPool); //订阅者2
subThread2.start();
Publisher publisher = new Publisher(jedisPool); //发布者
publisher.start();
}
}
- 运行结果
- 使用场景
1 即时聊天
2 微博订阅,推送
3 异步处理等…
- Redis发布订阅和rabbitmq的区别
可靠性 :
redis: 如果发布一条消息,没有订阅者的话,这条消息就会丢失,不会存在内存中
rabbitmq: 如果发布一条消息,如果没有消费者消费该队列,那么这条消息就会一直存在队列中
消费者负载均衡
rabbitmq:队列可以被多个消费者同时监控消费,但是每一条消息只能被消费一次,由于rabbitmq的消费确 认机制,因此它能够根据消费者的消费能力而调整它的负载;
redis: 发布订阅模式,一个队列可以被多个消费者同时订阅,当有消息到达时,会将该消息依次发送给每个订 阅者;
队列监控
rabbitmq : 实现了后台监控平台,可以在该平台上看到所有创建的队列的详细情况,良好的后台管理平台可以 方便我们更好的使用;
redis : 没有所谓的监控平台
总结
redis: 轻量级,低延迟,高并发,低可靠性;
rabbitmq:重量级,高可靠,异步,不保证实时;
rabbitmq是一个专门的AMQP协议队列,他的优势就在于提供可靠的队列服务,并且可做到异步,而redis主 要是用于缓存的,redis的发布订阅模块,可用于实现及时性,且可靠性低的功能。