抽象出RedisConnection ,RedisConnectionFactory概念,集成了4个redis客户端
提供的RedisTemplate,是一个高层次操作视图
主要提供操作视图,主要包括两大类目:*Operations,Bound*Operations。当然Bound*Operations是对*Operations的简单封装。
提供了Serializers序列工具,主要应用与key,value,hash方面。
本节主要内容,使用SDR工具,完成消息订阅与分发,事务管理,消息管道化。
1.消息订阅与分发
1.1 主要相关命令
命令参照:http://redis.readthedocs.io/en/2.4/pub_sub.html
1.2 Redis消息监听容器声明和消息监听器注册
为了订阅消息,需要实现MessageListener回调,每次新消息到达时,回调被调用,用户代码通过onMessage方法执行。
DefaultMessageListener.java 一个简单的MessageListener实现
1
2
3
4
5
6
7
8
9
|
public
class
DefaultMessageListener
implements
MessageListener {
protected
final
Logger LOGGER = LoggerFactory.getLogger(DefaultMessageListener.
class
);
@Override
public
void
onMessage(Message message,
byte
[] pattern) {
byte
[] channel = message.getChannel();
byte
[] body = message.getBody();
LOGGER.info(
new
String(channel) +
"-->"
+
new
String(body) +
"-->"
+
new
String(pattern));
}
}
|
考虑到“等待消息“过程是阻塞的,SDR提供了RedisMessageListenerContainer。
RedisMessageListenerContainer充当消息侦听器容器;它用于从Redis通道接收消息,并驱动注入它的MessageListener,RedisMessageListenerContainer负责消息接收和分派到侦听器中的所有线程的处理。
lettuce-context.xml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
<?
xml
version
=
"1.0"
encoding
=
"UTF-8"
?>
<
beans
xmlns
=
"http://www.springframework.org/schema/beans"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:redis
=
"http://www.springframework.org/schema/redis"
xmlns:p
=
"http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
<
bean
id
=
"lettuceConnectionFactory"
class
=
"org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
p:port
=
"6379"
p:hostName
=
"192.168.163.146"
/>
<
bean
id
=
"listener"
class
=
"DefaultMessageListener"
></
bean
>
</
bean
>
<!--注册listener-->
<
redis:listener-container
connection-factory
=
"lettuceConnectionFactory"
>
<
redis:listener
ref
=
"listener"
topic
=
"chatroom"
></
redis:listener
>
</
redis:listener-container
>
</
beans
>
|
当然为了研究方便,当然使用下面的声明方式完全可以代替上面<redis:listener-container>部分,完全等价。
1
2
3
4
5
6
7
8
9
10
11
12
|
<
bean
id
=
"redisContainer"
class
=
"org.springframework.data.redis.listener.RedisMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"lettuceConnectionFactory"
/>
<
property
name
=
"messageListeners"
>
<
map
>
<
entry
key-ref
=
"messageListener"
>
<
bean
class
=
"org.springframework.data.redis.listener.ChannelTopic"
>
<
constructor-arg
value
=
"chatroom"
/>
</
bean
>
</
entry
>
</
map
>
</
property
>
</
bean
>
|
至此,就完成了消息的订阅了。
1.3 MessageListenerAdapter
MessageListenerAdapter类是Spring的异步消息传递支持的最后一个组件:简而言之,它允许您将几乎任何类暴露为MDP。比如上面的DefaultMessageListener,毕竟还是实现了MessageListener接口,并没有完全实现了MDP。可以借助MessageListenerAdapter轻松实现。
简单展示下实现过程
1.3.1 一个完全符合MDP的接口
1
2
3
4
5
6
7
8
|
public
interface
MessageDelegate {
// void handleMessage(String message);
void
handleMessage(Map message);
void
handleMessage(
byte
[] message);
void
handleMessage(Serializable message);
// pass the channel/pattern as well
void
handleMessage(Serializable message, String channel);
}
|
1.3.2 MDP接口实现类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
public
class
DefaultMessageDelegate
implements
MessageDelegate {
protected
final
Logger LOGGER = LoggerFactory.getLogger(DefaultMessageDelegate.
class
);
public
void
handleMessage(String message) {
LOGGER.info(
"--------handleMessage 1-----------"
);
LOGGER.info(message);
}
@Override
public
void
handleMessage(Map message) {
LOGGER.info(
"--------handleMessage 2-----------"
);
}
@Override
public
void
handleMessage(
byte
[] message) {
LOGGER.info(
"--------handleMessage 3-----------"
);
LOGGER.info(
new
String(message));
}
public
void
handleMessage(Serializable message) {
LOGGER.info(
"--------handleMessage 4-----------"
);
LOGGER.info(message.toString());
}
public
void
handleMessage(Serializable message, String channel) {
LOGGER.info(
"--------handleMessage 5-----------"
);
LOGGER.info(message +
"------>"
+ channel);
}
// implementation elided for clarity...
}
|
1.3.3配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
<
beans
xmlns
=
"http://www.springframework.org/schema/beans"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:redis
=
"http://www.springframework.org/schema/redis"
xmlns:p
=
"http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/redis http://www.springframework.org/schema/redis/spring-redis.xsd">
<!-- Jedis ConnectionFactory -->
<
bean
id
=
"lettuceConnectionFactory"
class
=
"org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory"
p:port
=
"6379"
p:hostName
=
"192.168.163.146"
/>
<
bean
id
=
"messageListener"
class
=
"org.springframework.data.redis.listener.adapter.MessageListenerAdapter"
>
<
constructor-arg
>
<
bean
class
=
"DefaultMessageDelegate"
/>
</
constructor-arg
>
</
bean
>
<
bean
id
=
"redisContainer"
class
=
"org.springframework.data.redis.listener.RedisMessageListenerContainer"
>
<
property
name
=
"connectionFactory"
ref
=
"lettuceConnectionFactory"
/>
<
property
name
=
"messageListeners"
>
<
map
>
<
entry
key-ref
=
"messageListener"
>
<
bean
class
=
"org.springframework.data.redis.listener.ChannelTopic"
>
<
constructor-arg
value
=
"chatroom"
/>
</
bean
>
</
entry
>
</
map
>
</
property
>
</
bean
>
</
beans
>
|
1.4 总结
主要涉及4个组件
MessageListener:Redis中发布的消息的侦听器。
MessageListenerAdapter:消息侦听器适配器,通过反射将消息处理委托给目标侦听器方法,并进行灵活的消息类型转换
ChannelTopic:Channel topic
RedisMessageListenerContainer:Redis消息侦听器提供异步行为的容器。 处理侦听,转换和消息分派的低级细节
2. 事务管理
redis事务命令,具体见http://redis.readthedocs.io/en/2.4/transaction.html。
首先看一个测试用例
2.1 一个不能运行的事务代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
@Test
public
void
testTran(){
this
.redisTemplate.multi();
try
{
for
(
int
i =
0
; i <
1
; i++) {
String key =
"key"
+ i;
String value =
"00000"
;
this
.redisTemplate.opsForValue().set(key, value);
}
this
.redisTemplate.exec();
}
catch
(Exception e){
this
.redisTemplate.discard();
}
}
|
执行代码会报异常。具体异常
Caused by: com.lambdaworks.redis.RedisCommandExecutionException: ERR DISCARD without MULTI
at com.lambdaworks.redis.LettuceFutures.await(LettuceFutures.java:76)
at com.lambdaworks.redis.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:59)
at com.google.common.reflect.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:87)
at com.sun.proxy.$Proxy15.discard(Unknown Source)
at org.springframework.data.redis.connection.lettuce.LettuceConnection.discard(LettuceConnection.java:833)
这是因为Redis通过multi,exec和discard命令为事务提供支持。 这些操作在RedisTemplate上可用,但是RedisTemplate不能保证在事务中使用相同的连接执行所有操作。
处理异常
通过设置setEnableTransactionSupport(true)显式为每个正在使用的RedisTemplate启用。 这将强制将正在使用的RedisConnection绑定到触发MULTI的当前线程。
修改后的代码如下
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
public
void
testTran(){
StringRedisSerializer serializer =
new
StringRedisSerializer();
redisTemplate.setKeySerializer(serializer);
redisTemplate.setValueSerializer(serializer);
this
.redisTemplate.setEnableTransactionSupport(
true
);
this
.redisTemplate.multi();
try
{
for
(
int
i =
0
; i <
100
; i++) {
String key =
"key"
+ i;
String value =
"00000"
;
this
.redisTemplate.opsForValue().set(key, value);
}
this
.redisTemplate.exec();
}
catch
(Exception e){
this
.redisTemplate.discard();
}
}
|
运行结果
1) "key95"
2) "key82"
3) "key69"
4) "key90"
5) "key18"
6) "key1"
7) "key4"
8) "key63"
9) "key78"
执行结果,也简介认证了redisTemplate 执行结果无序。
2.2 正常事务代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
@Test
public
void
testTran3(){
StringRedisSerializer serializer =
new
StringRedisSerializer();
redisTemplate.setKeySerializer(serializer);
redisTemplate.setValueSerializer(serializer);
redisTemplate.execute(
new
SessionCallback<List<Object>>() {
@Override
public
List<Object> execute(RedisOperations operations)
throws
DataAccessException {
operations.multi();
try
{
for
(
int
i=
0
;i<
10
;i++){
operations.opsForSet().add(
"key"
+i,
"value"
);
if
(i>
5
){
throw
new
Exception();
}
}
return
operations.exec();
}
catch
(Exception e) {
operations.discard();
}
// This will contain the results of all ops in the transaction
return
Collections.emptyList();
}
});
}
}
|
3. 管道支持
Redis提供对流水线的支持,这涉及向服务器发送多个命令,而不必等待答复,然后在一个步骤中读取答复。当您需要在一行中发送多个命令时,流水线可以提高性能,例如向同一列表中添加许多元素。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
/**
* 在管道连接上执行给定的操作对象,返回结果。
* 注意,回调不能返回非空值,因为它被管道覆盖。 此方法将使用默认序列化器反序列化结果
*
* @param action callback object to execute
* @return list of objects returned by the pipeline
*/
List<Object> executePipelined(RedisCallback<?> action);
/**
*在流水线连接上执行给定的操作对象,使用专用的序列化程序返回结果
*
* @param action callback object to execute
* @param resultSerializer The Serializer to use for individual values or Collections of values. If any returned
* values are hashes, this serializer will be used to deserialize both the key and value
* @return list of objects returned by the pipeline
*/
List<Object> executePipelined(
final
RedisCallback<?> action,
final
RedisSerializer<?> resultSerializer);
/**
*在流水线连接上执行给定的Redis会话。 允许事务流水线化。
* 注意,回调不能返回非空值,因为它被管道覆盖。
* @param session Session callback
* @return list of objects returned by the pipeline
*/
List<Object> executePipelined(
final
SessionCallback<?> session);
/**
* 在管道连接上执行给定的Redis会话,使用专用的序列化程序返回结果。 允许事务流水线化。
* 注意,回调不能返回非空值,因为它被管道覆盖。
*
* @param session Session callback
* @param resultSerializer
* @return list of objects returned by the pipeline
*/
List<Object> executePipelined(
final
SessionCallback<?> session,
final
RedisSerializer<?> resultSerializer);
|