• 抽象出RedisConnection ,RedisConnectionFactory概念,集成了4个redis客户端

  • 提供的RedisTemplate,是一个高层次操作视图

  • 主要提供操作视图,主要包括两大类目:*Operations,Bound*Operations。当然Bound*Operations是对*Operations的简单封装。

  • 提供了Serializers序列工具,主要应用与key,value,hash方面。

本节主要内容,使用SDR工具,完成消息订阅与分发,事务管理,消息管道化。

1.消息订阅与分发

1.1 主要相关命令

uqENje.jpg%21web

命令参照:http://redis.readthedocs.io/en/2.4/pub_sub.html

1.2 Redis消息监听容器声明和消息监听器注册

为了订阅消息,需要实现MessageListener回调,每次新消息到达时,回调被调用,用户代码通过onMessage方法执行。

DefaultMessageListener.java 一个简单的MessageListener实现

1
2
3
4
5
6
7
8
9
public  class  DefaultMessageListener   implements  MessageListener {
     protected  final  Logger LOGGER = LoggerFactory.getLogger(DefaultMessageListener. class );
     @Override
     public  void  onMessage(Message message,  byte [] pattern) {
         byte [] channel = message.getChannel();
         byte [] body = message.getBody();
         LOGGER.info( new  String(channel) +  "-->"  new  String(body) +  "-->"  new  String(pattern));
     }
}

考虑到“等待消息“过程是阻塞的,SDR提供了RedisMessageListenerContainer。

RedisMessageListenerContainer充当消息侦听器容器;它用于从Redis通道接收消息,并驱动注入它的MessageListener,RedisMessageListenerContainer负责消息接收和分派到侦听器中的所有线程的处理。

lettuce-context.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<? xml  version = "1.0"  encoding = "UTF-8" ?>
< beans  xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:redis = "http://www.springframework.org/schema/redis"
        xmlns:p = "http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
< bean  id = "lettuceConnectionFactory"  class = "org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
        p:port = "6379"  p:hostName = "192.168.163.146" />
 
< bean  id = "listener"  class = "DefaultMessageListener" ></ bean >
</ bean >
<!--注册listener-->
< redis:listener-container  connection-factory = "lettuceConnectionFactory" >
        < redis:listener  ref = "listener"  topic = "chatroom" ></ redis:listener >
</ redis:listener-container >
</ beans >

当然为了研究方便,当然使用下面的声明方式完全可以代替上面<redis:listener-container>部分,完全等价。

1
2
3
4
5
6
7
8
9
10
11
12
< bean  id = "redisContainer"  class = "org.springframework.data.redis.listener.RedisMessageListenerContainer" >
     < property  name = "connectionFactory"  ref = "lettuceConnectionFactory" />
     < property  name = "messageListeners" >
         < map >
             < entry  key-ref = "messageListener" >
                 < bean  class = "org.springframework.data.redis.listener.ChannelTopic" >
                     < constructor-arg  value = "chatroom"  />
                 </ bean >
             </ entry >
         </ map >
     </ property >
</ bean >

至此,就完成了消息的订阅了。

1.3 MessageListenerAdapter

MessageListenerAdapter类是Spring的异步消息传递支持的最后一个组件:简而言之,它允许您将几乎任何类暴露为MDP。比如上面的DefaultMessageListener,毕竟还是实现了MessageListener接口,并没有完全实现了MDP。可以借助MessageListenerAdapter轻松实现。

简单展示下实现过程

1.3.1 一个完全符合MDP的接口

1
2
3
4
5
6
7
8
public  interface  MessageDelegate {
//  void handleMessage(String message);
   void  handleMessage(Map message);
     void  handleMessage( byte [] message);
   void  handleMessage(Serializable message);
   // pass the channel/pattern as well
   void  handleMessage(Serializable message, String channel);
  }

1.3.2 MDP接口实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public  class  DefaultMessageDelegate    implements  MessageDelegate {
     protected  final  Logger LOGGER = LoggerFactory.getLogger(DefaultMessageDelegate. class );
     public  void  handleMessage(String message) {
         LOGGER.info( "--------handleMessage 1-----------" );
         LOGGER.info(message);
     }
 
     @Override
     public  void  handleMessage(Map message) {
         LOGGER.info( "--------handleMessage 2-----------" );
     }
 
     @Override
     public  void  handleMessage( byte [] message) {
         LOGGER.info( "--------handleMessage 3-----------" );
         LOGGER.info( new  String(message));
     }
 
     public  void  handleMessage(Serializable message) {
         LOGGER.info( "--------handleMessage 4-----------" );
         LOGGER.info(message.toString());
     }
 
     public  void  handleMessage(Serializable message, String channel) {
         LOGGER.info( "--------handleMessage 5-----------" );
         LOGGER.info(message +  "------>"  + channel);
     }
     // implementation elided for clarity...
}

1.3.3配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
< beans  xmlns = "http://www.springframework.org/schema/beans"
        xmlns:xsi = "http://www.w3.org/2001/XMLSchema-instance"
        xmlns:redis = "http://www.springframework.org/schema/redis"
        xmlns:p = "http://www.springframework.org/schema/p"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
 
        <!-- Jedis ConnectionFactory -->
        < bean  id = "lettuceConnectionFactory"  class = "org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
               p:port = "6379"  p:hostName = "192.168.163.146" />
 
        < bean  id = "messageListener"  class = "org.springframework.data.redis.listener.adapter.MessageListenerAdapter" >
               < constructor-arg >
                      < bean  class = "DefaultMessageDelegate" />
               </ constructor-arg >
        </ bean >
     < bean  id = "redisContainer"  class = "org.springframework.data.redis.listener.RedisMessageListenerContainer" >
         < property  name = "connectionFactory"  ref = "lettuceConnectionFactory" />
         < property  name = "messageListeners" >
             < map >
                 < entry  key-ref = "messageListener" >
                     < bean  class = "org.springframework.data.redis.listener.ChannelTopic" >
                         < constructor-arg  value = "chatroom"  />
                     </ bean >
                 </ entry >
             </ map >
         </ property >
     </ bean >
</ beans >

1.4 总结

主要涉及4个组件

MessageListener:Redis中发布的消息的侦听器。

MessageListenerAdapter:消息侦听器适配器,通过反射将消息处理委托给目标侦听器方法,并进行灵活的消息类型转换

ChannelTopic:Channel topic 

RedisMessageListenerContainer:Redis消息侦听器提供异步行为的容器。 处理侦听,转换和消息分派的低级细节

2. 事务管理

redis事务命令,具体见http://redis.readthedocs.io/en/2.4/transaction.html。

首先看一个测试用例

2.1 一个不能运行的事务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Test
public  void  testTran(){
         this .redisTemplate.multi();
     try  {
         for  ( int  i =  0 ; i <  1 ; i++) {
             String key =  "key"  + i;
             String value =  "00000" ;
             this .redisTemplate.opsForValue().set(key, value);
         }
         this .redisTemplate.exec();
     } catch  (Exception e){
         this .redisTemplate.discard();
     }
}

执行代码会报异常。具体异常

Caused by: com.lambdaworks.redis.RedisCommandExecutionException: ERR DISCARD without MULTI
    at com.lambdaworks.redis.LettuceFutures.await(LettuceFutures.java:76)
    at com.lambdaworks.redis.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:59)
    at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:87)
    at com.sun.proxy.$Proxy15.discard(Unknown Source)
    at org.springframework.data.redis.connection.lettuce.LettuceConnection.discard(LettuceConnection.java:833)

这是因为Redis通过multi,exec和discard命令为事务提供支持。 这些操作在RedisTemplate上可用,但是RedisTemplate不能保证在事务中使用相同的连接执行所有操作。

处理异常

通过设置setEnableTransactionSupport(true)显式为每个正在使用的RedisTemplate启用。 这将强制将正在使用的RedisConnection绑定到触发MULTI的当前线程。 

修改后的代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public  void  testTran(){
     StringRedisSerializer serializer =  new  StringRedisSerializer();
     redisTemplate.setKeySerializer(serializer);
     redisTemplate.setValueSerializer(serializer);
         this .redisTemplate.setEnableTransactionSupport( true );
         this .redisTemplate.multi();
     try  {
         for  ( int  i =  0 ; i <  100 ; i++) {
             String key =  "key"  + i;
             String value =  "00000" ;
             this .redisTemplate.opsForValue().set(key, value);
         }
         this .redisTemplate.exec();
     } catch  (Exception e){
         this .redisTemplate.discard();
     }
}

运行结果

 1) "key95"
  2) "key82"
  3) "key69"
  4) "key90"
  5) "key18"
  6) "key1"
  7) "key4"
  8) "key63"
  9) "key78"

执行结果,也简介认证了redisTemplate 执行结果无序。

2.2 正常事务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
   @Test
     public  void  testTran3(){
         StringRedisSerializer serializer =  new  StringRedisSerializer();
         redisTemplate.setKeySerializer(serializer);
         redisTemplate.setValueSerializer(serializer);
         redisTemplate.execute( new  SessionCallback<List<Object>>() {
             @Override
             public   List<Object> execute(RedisOperations operations)  throws  DataAccessException {
                 operations.multi();
                 try  {
                     for ( int  i= 0 ;i< 10 ;i++){
                         operations.opsForSet().add( "key" +i,  "value" );
                         if (i> 5 ){
                            throw   new  Exception();
                         }
                     }
                     return  operations.exec();
                 catch  (Exception e) {
                     operations.discard();
                 }
                 // This will contain the results of all ops in the transaction
 
                 return  Collections.emptyList();
             }
         });
     }
}

3. 管道支持

Redis提供对流水线的支持,这涉及向服务器发送多个命令,而不必等待答复,然后在一个步骤中读取答复。当您需要在一行中发送多个命令时,流水线可以提高性能,例如向同一列表中添加许多元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
  *  在管道连接上执行给定的操作对象,返回结果。 
  *  注意,回调不能返回非空值,因为它被管道覆盖。 此方法将使用默认序列化器反序列化结果
 
  * @param action callback object to execute
  * @return list of objects returned by the pipeline
  */
List<Object> executePipelined(RedisCallback<?> action);
 
/**
  *在流水线连接上执行给定的操作对象,使用专用的序列化程序返回结果 
 
  * @param action callback object to execute
  * @param resultSerializer The Serializer to use for individual values or Collections of values. If any returned
  *          values are hashes, this serializer will be used to deserialize both the key and value
  * @return list of objects returned by the pipeline
  */
List<Object> executePipelined( final  RedisCallback<?> action,  final  RedisSerializer<?> resultSerializer);
 
/**
  *在流水线连接上执行给定的Redis会话。 允许事务流水线化。 
  * 注意,回调不能返回非空值,因为它被管道覆盖。
  * @param session Session callback
  * @return list of objects returned by the pipeline
  */
List<Object> executePipelined( final  SessionCallback<?> session);
 
/**
  * 在管道连接上执行给定的Redis会话,使用专用的序列化程序返回结果。 允许事务流水线化。 
  * 注意,回调不能返回非空值,因为它被管道覆盖。
 
  * @param session Session callback
  * @param resultSerializer
  * @return list of objects returned by the pipeline
  */
List<Object> executePipelined( final  SessionCallback<?> session,  final  RedisSerializer<?> resultSerializer);