Spring Data Redis 2 之消息订阅

  • 抽象出RedisConnection ,RedisConnectionFactory概念,集成了4个redis客户端

  • 提供的RedisTemplate,是一个高层次操作视图

  • 主要提供操作视图,主要包括两大类目:*Operations,Bound*Operations。当然Bound*Operations是对*Operations的简单封装。

  • 提供了Serializers序列工具,主要应用与key,value,hash方面。

本节主要内容,使用SDR工具,完成消息订阅与分发,事务管理,消息管道化。

1.消息订阅与分发

1.1 主要相关命令

Spring Data Redis 2 之消息订阅

命令参照:http://redis.readthedocs.io/en/2.4/pub_sub.html

1.2 Redis消息监听容器声明和消息监听器注册

为了订阅消息,需要实现MessageListener回调,每次新消息到达时,回调被调用,用户代码通过onMessage方法执行。

DefaultMessageListener.java 一个简单的MessageListener实现

1
2
3
4
5
6
7
8
9
public class DefaultMessageListener  implements MessageListener {
    protected final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageListener.class);
    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] channel = message.getChannel();
        byte[] body = message.getBody();
        LOGGER.info(new String(channel) + "-->" new String(body) + "-->" new String(pattern));
    }
}

考虑到“等待消息“过程是阻塞的,SDR提供了RedisMessageListenerContainer。

RedisMessageListenerContainer充当消息侦听器容器;它用于从Redis通道接收消息,并驱动注入它的MessageListener,RedisMessageListenerContainer负责消息接收和分派到侦听器中的所有线程的处理。

lettuce-context.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:redis="http://www.springframework.org/schema/redis"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
<bean id="lettuceConnectionFactory" class="org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
       p:port="6379" p:hostName="192.168.163.146"/>
 
<bean id="listener" class="DefaultMessageListener"></bean>
</bean>
<!--注册listener-->
<redis:listener-container connection-factory="lettuceConnectionFactory">
       <redis:listener ref="listener" topic="chatroom"></redis:listener>
</redis:listener-container>
</beans>

当然为了研究方便,当然使用下面的声明方式完全可以代替上面<redis:listener-container>部分,完全等价。

1
2
3
4
5
6
7
8
9
10
11
12
<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
    <property name="connectionFactory" ref="lettuceConnectionFactory"/>
    <property name="messageListeners">
        <map>
            <entry key-ref="messageListener">
                <bean class="org.springframework.data.redis.listener.ChannelTopic">
                    <constructor-arg value="chatroom" />
                </bean>
            </entry>
        </map>
    </property>
</bean>

至此,就完成了消息的订阅了。

1.3 MessageListenerAdapter

MessageListenerAdapter类是Spring的异步消息传递支持的最后一个组件:简而言之,它允许您将几乎任何类暴露为MDP。比如上面的DefaultMessageListener,毕竟还是实现了MessageListener接口,并没有完全实现了MDP。可以借助MessageListenerAdapter轻松实现。

简单展示下实现过程

1.3.1 一个完全符合MDP的接口

1
2
3
4
5
6
7
8
public interface MessageDelegate {
//  void handleMessage(String message);
  void handleMessage(Map message);
    void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

1.3.2 MDP接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class DefaultMessageDelegate   implements MessageDelegate {
    protected final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageDelegate.class);
    public void handleMessage(String message) {
        LOGGER.info("--------handleMessage 1-----------");
        LOGGER.info(message);
    }
 
    @Override
    public void handleMessage(Map message) {
        LOGGER.info("--------handleMessage 2-----------");
    }
 
    @Override
    public void handleMessage(byte[] message) {
        LOGGER.info("--------handleMessage 3-----------");
        LOGGER.info(new String(message));
    }
 
    public void handleMessage(Serializable message) {
        LOGGER.info("--------handleMessage 4-----------");
        LOGGER.info(message.toString());
    }
 
    public void handleMessage(Serializable message, String channel) {
        LOGGER.info("--------handleMessage 5-----------");
        LOGGER.info(message + "------>" + channel);
    }
    // implementation elided for clarity...
}

1.3.3配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:redis="http://www.springframework.org/schema/redis"
       xmlns:p="http://www.springframework.org/schema/p"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
 
       <!-- Jedis ConnectionFactory -->
       <bean id="lettuceConnectionFactory" class="org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
              p:port="6379" p:hostName="192.168.163.146"/>
 
       <bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
              <constructor-arg>
                     <bean class="DefaultMessageDelegate"/>
              </constructor-arg>
       </bean>
    <bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
        <property name="connectionFactory" ref="lettuceConnectionFactory"/>
        <property name="messageListeners">
            <map>
                <entry key-ref="messageListener">
                    <bean class="org.springframework.data.redis.listener.ChannelTopic">
                        <constructor-arg value="chatroom" />
                    </bean>
                </entry>
            </map>
        </property>
    </bean>
</beans>

1.4 总结

主要涉及4个组件

MessageListener:Redis中发布的消息的侦听器。

MessageListenerAdapter:消息侦听器适配器,通过反射将消息处理委托给目标侦听器方法,并进行灵活的消息类型转换

ChannelTopic:Channel topic 

RedisMessageListenerContainer:Redis消息侦听器提供异步行为的容器。 处理侦听,转换和消息分派的低级细节

2. 事务管理

redis事务命令,具体见http://redis.readthedocs.io/en/2.4/transaction.html。

首先看一个测试用例

2.1 一个不能运行的事务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public void testTran(){
        this.redisTemplate.multi();
    try {
        for (int i = 0; i < 1; i++) {
            String key = "key" + i;
            String value = "00000";
            this.redisTemplate.opsForValue().set(key, value);
        }
        this.redisTemplate.exec();
    }catch (Exception e){
        this.redisTemplate.discard();
    }
}

执行代码会报异常。具体异常

Caused by: com.lambdaworks.redis.RedisCommandExecutionException: ERR DISCARD without MULTI
    at com.lambdaworks.redis.LettuceFutures.await(LettuceFutures.java:76)
    at com.lambdaworks.redis.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:59)
    at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:87)
    at com.sun.proxy.$Proxy15.discard(Unknown Source)
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.discard(LettuceConnection.java:833)

这是因为Redis通过multi,exec和discard命令为事务提供支持。 这些操作在RedisTemplate上可用,但是RedisTemplate不能保证在事务中使用相同的连接执行所有操作。

处理异常

通过设置setEnableTransactionSupport(true)显式为每个正在使用的RedisTemplate启用。 这将强制将正在使用的RedisConnection绑定到触发MULTI的当前线程。 

修改后的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public void testTran(){
    StringRedisSerializer serializer = new StringRedisSerializer();
    redisTemplate.setKeySerializer(serializer);
    redisTemplate.setValueSerializer(serializer);
        this.redisTemplate.setEnableTransactionSupport(true);
        this.redisTemplate.multi();
    try {
        for (int i = 0; i < 100; i++) {
            String key = "key" + i;
            String value = "00000";
            this.redisTemplate.opsForValue().set(key, value);
        }
        this.redisTemplate.exec();
    }catch (Exception e){
        this.redisTemplate.discard();
    }
}

运行结果

 1) "key95"
  2) "key82"
  3) "key69"
  4) "key90"
  5) "key18"
  6) "key1"
  7) "key4"
  8) "key63"
  9) "key78"

执行结果,也简介认证了redisTemplate 执行结果无序。

2.2 正常事务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  @Test
    public void testTran3(){
        StringRedisSerializer serializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(serializer);
        redisTemplate.setValueSerializer(serializer);
        redisTemplate.execute(new SessionCallback<List<Object>>() {
            @Override
            public  List<Object> execute(RedisOperations operations) throws DataAccessException {
                operations.multi();
                try {
                    for(int i=0;i<10;i++){
                        operations.opsForSet().add("key"+i, "value");
                        if(i>5){
                           throw  new Exception();
                        }
                    }
                    return operations.exec();
                catch (Exception e) {
                    operations.discard();
                }
                // This will contain the results of all ops in the transaction
 
                return Collections.emptyList();
            }
        });
    }
}

3. 管道支持

Redis提供对流水线的支持,这涉及向服务器发送多个命令,而不必等待答复,然后在一个步骤中读取答复。当您需要在一行中发送多个命令时,流水线可以提高性能,例如向同一列表中添加许多元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 *  在管道连接上执行给定的操作对象,返回结果。 
 *  注意,回调不能返回非空值,因为它被管道覆盖。 此方法将使用默认序列化器反序列化结果
 
 * @param action callback object to execute
 * @return list of objects returned by the pipeline
 */
List<Object> executePipelined(RedisCallback<?> action);
 
/**
 *在流水线连接上执行给定的操作对象,使用专用的序列化程序返回结果 
 
 * @param action callback object to execute
 * @param resultSerializer The Serializer to use for individual values or Collections of values. If any returned
 *          values are hashes, this serializer will be used to deserialize both the key and value
 * @return list of objects returned by the pipeline
 */
List<Object> executePipelined(final RedisCallback<?> action, final RedisSerializer<?> resultSerializer);
 
/**
 *在流水线连接上执行给定的Redis会话。 允许事务流水线化。 
 * 注意,回调不能返回非空值,因为它被管道覆盖。
 * @param session Session callback
 * @return list of objects returned by the pipeline
 */
List<Object> executePipelined(final SessionCallback<?> session);
 
/**
 * 在管道连接上执行给定的Redis会话,使用专用的序列化程序返回结果。 允许事务流水线化。 
 * 注意,回调不能返回非空值,因为它被管道覆盖。
 
 * @param session Session callback
 * @param resultSerializer
 * @return list of objects returned by the pipeline
 */
List<Object> executePipelined(final SessionCallback<?> session, final RedisSerializer<?> resultSerializer);


上一篇:【VIP视频网站项目一】搭建视频网站的前台页面(导航栏+轮播图+电影列表+底部友情链接)


下一篇:CSV和集合对象基于Annotation操作封装