Redis应用场景很多,现在介绍一下它的几大特性之一 发布订阅(pub/sub)
特性介绍:
什么是redis的发布订阅(pub/sub)? Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。
同样,Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合, Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。
上面的都是概念,不知道没关系,其实我也看不懂。
简单来讲,这里面还有个channel的概念,这里就是频道的意思,比如你订阅了银行的频道,当你的资金发生变动时,银行就会通过它的频道给你发送信息,在这里,你是属于被动接收的,而不是向银行索要信息,这个例子中,你就是sub(订阅者),而银行就是pub(发布者)。
代码:
先引入依赖
- <dependency>
- <groupId>redis.clients</groupId>
- <artifactId>jedis</artifactId>
- <version>2.9.0</version>
- </dependency>
新建一个发布者
- public class Publisher extends Thread{
- private final JedisPool jedisPool;
- public Publisher(JedisPool jedisPool) {
- this.jedisPool = jedisPool;
- }
- @Override
- public void run() {
- BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
- Jedis jedis = jedisPool.getResource(); //连接池中取出一个连接
- while (true) {
- String line = null;
- try {
- line = reader.readLine();
- if (!"quit".equals(line)) {
- jedis.publish("mychannel", line); //从 mychannel 的频道上推送消息
- } else {
- break;
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
新建一个订阅者
- public class Subscriber extends JedisPubSub {
- public Subscriber(){}
- @Override
- public void onMessage(String channel, String message) { //收到消息会调用
- System.out.println(String.format("receive redis published message, channel %s, message %s", channel, message));
- }
- @Override
- public void onSubscribe(String channel, int subscribedChannels) { //订阅了频道会调用
- System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
- channel, subscribedChannels));
- }
- @Override
- public void onUnsubscribe(String channel, int subscribedChannels) { //取消订阅 会调用
- System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
- channel, subscribedChannels));
- }
- }
订阅频道
- public class SubThread extends Thread {
- private final JedisPool jedisPool;
- private final Subscriber subscriber = new Subscriber();
- private final String channel = "mychannel";
- public SubThread(JedisPool jedisPool) {
- super("SubThread");
- this.jedisPool = jedisPool;
- }
- @Override
- public void run() {
- System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
- Jedis jedis = null;
- try {
- jedis = jedisPool.getResource(); //取出一个连接
- jedis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
- } catch (Exception e) {
- System.out.println(String.format("subsrcibe channel error, %s", e));
- } finally {
- if (jedis != null) {
- jedis.close();
- }
- }
- }
- }
测试下
- public class PubSubDemo {
- public static void main( String[] args )
- {
- // 连接redis服务端
- JedisPool jedisPool = new JedisPool(new JedisPoolConfig(), "172.31.12.18", 7379);
- System.out.println(String.format("redis pool is starting, redis ip %s, redis port %d", "127.0.0.1", 7379));
- SubThread subThread = new SubThread(jedisPool); //订阅者
- subThread.start();
- Publisher publisher = new Publisher(jedisPool); //发布者
- publisher.start();
- }
- }
打印信息