redis使用pipeline通道大幅度提升redis的处理速度,节省成本
最近在做项目的时候,遇到大量的读写,最开始都是set,get一条条的循环去取数据,当数据量大的时候,数据处理相当慢慢,就想到批处理数据的方式,最开始set数据的时候,想到的是mset 也算是批量插入数据,这个在数据量几百的话甚至几千的插入量,也是OK的,取数据的时候用mget 这个100的数据量以下,性能还可以保证,再大的话就是严重有问题,数据量越大取出的成本本越高 ,另外一个,在用mset批量插入的时候,遇到一个难道,不过这是对redis不太了解的原因,后面无意中发现,set的时候,可以直接在后面参加参数,利用redisTemplate模板,
redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS); 轻松搞定
下面主要是针对 pipeline 管道方式批量插入,插入的数据量都 是在100000条数据,redis通过tcp来对外提供服务,client通过socket连接发起请求,每个请求在命令发出后会阻塞等待redis服务器进行处理,处理完毕后将结果返回给client。每一个命令都对应了发送、接收两个网络传输,假如一个流程需要0.1秒,那么一秒最多只能处理10个请求,将严重制约redis的性能。
当我们要处理上万以上数据的时候,就不得不另想办法,此时pipeline管道就是解决执行大量命令时、会产生大量数据来回次数而导致延迟的技术。其实原理比较简单,pipeline是把所有的命令一次发过去,避免频繁的发送、接收带来的网络开销,redis在打包接收到一堆命令后,依次执行,然后把结果再打包返回给客户端。
下面直接用代码试下,当然还作了其它的一些改动,比如RedisConnection 和SessionCallback 参数性能上不太一样,相差也不多,重点说一下, redisConnection.openPipeline(); 这个方法在doInRedis内部其实已经有,但是在这里首先打开和关闭,对性能有一定的提高,10万次提升100毫秒左右
//加 redisConnection.openPipeline(); 打开资源 第一次耗时:470 第二次 耗时:468 毫秒
//不加redisConnection.openPipeline();打开和关闭 第一次 耗时:533 第二次 耗时:490,主要是第一次执行操作 其实是可以忽略了,并且这个的参数是 doInRedis重写方法是 RedisConnection 参数,后续对set的的操作必须是以redisConnection 参数开始,应该是新建一个接连去处理后续的所有操作,再到通道关闭,这样麻烦一些,所有的序列化都要自己去处理
public void pipelinedTest(){
Long time = System.currentTimeMillis();
for (int i = 0; i < 10000; i++) {
redisTemplate.opsForValue().increment("pipline", 1);
}
System.out.println("耗时:" + (System.currentTimeMillis() - time));
time = System.currentTimeMillis();
Map maps = new HashMap<>();
maps.put("name", "lmc");
maps.put("姓名", "李四");
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {
redisConnection.openPipeline();
for (int i = 0;i < 100000;i++){
redisConnection.incrBy("a".getBytes(),1L);
}
redisConnection.set("keys".getBytes(), maps.toString().getBytes());
redisConnection.closePipeline();
return null;
}
});
System.out.println("*****************"+redisTemplate.opsForValue().get("keys"));
System.out.println("耗时:" + (System.currentTimeMillis() - time));
}
第二种方式 execute 重写方法,参数RedisOperations 这种方式比较省心,不需要参数来操作, 用redisTemplate模板操作直接省去了很多数据封装的问题,但是性能上稍微差一点
// 这是第一次执行耗时:596 第二次执行耗时:589 毫秒
Long time = System.currentTimeMillis();
Map maps2 = new HashMap<>();
maps2.put("name", "lmc");
maps2.put("姓名", "李四");
time = System.currentTimeMillis();
redisTemplate.executePipelined(new SessionCallback<Object>() {
@Override
public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
for (int i = 0; i < 100000; i++) {
redisTemplate.opsForValue().increment("piplineddd", 1L);
}
redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS);
return null;
}
});
System.out.println("
System.out.println("耗时:" + (System.currentTimeMillis() - time));
直接一次性提交,所以executePipelined源码提供了多种方式供我们选择,底层提交都 一样的,只是在对外提供的方式为了满足更多的需求,
还在慢慢研究当中,如有描述不恰当的,还请各位指教
原文地址https://blog.csdn.net/limingcai168/article/details/81170399