Redis是使用客户端 - 服务器模型的TCP服务器,称为请求/响应协议。
知道redis是基于TCP连接进行通信的,每一个request/response都需要经历一个RTT(Round-Trip Time 往返时间),如果需要执行很多短小的命令,这些往返时间的开销是很大的,在此情形下,redis提出了管道来提高执行效率。
需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。
package redis.clients.jedis; import redis.clients.jedis.exceptions.JedisDataException; import; import; import java.util.ArrayList; import java.util.List; public class Pipeline extends MultiKeyPipelineBase implements Closeable { private MultiResponseBuilder currentMulti; private class MultiResponseBuilder extends Builder<List<Object>> { private List<Response<?>> responses = new ArrayList<Response<?>>(); @Override public List<Object> build(Object data) { @SuppressWarnings("unchecked") List<Object> list = (List<Object>) data; List<Object> values = new ArrayList<Object>(); if (list.size() != responses.size()) { throw new JedisDataException("Expected data size " + responses.size() + " but was " + list.size()); } for (int i = 0; i < list.size(); i++) { Response<?> response = responses.get(i); response.set(list.get(i)); Object builtResponse; try { builtResponse = response.get(); } catch (JedisDataException e) { builtResponse = e; } values.add(builtResponse); } return values; } public void setResponseDependency(Response<?> dependency) { for (Response<?> response : responses) { response.setDependency(dependency); } } public void addResponse(Response<?> response) { responses.add(response); } } @Override protected <T> Response<T> getResponse(Builder<T> builder) { if (currentMulti != null) { super.getResponse(BuilderFactory.STRING); // Expected QUEUED Response<T> lr = new Response<T>(builder); currentMulti.addResponse(lr); return lr; } else { return super.getResponse(builder); } } public void setClient(Client client) { this.client = client; } @Override protected Client getClient(byte[] key) { return client; } @Override protected Client getClient(String key) { return client; } public void clear() { if (isInMulti()) { discard(); } sync(); } public boolean isInMulti() { return currentMulti != null; } /** * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to * get return values from pipelined commands, capture the different Response<?> of the * commands you execute. */ public void sync() { if (getPipelinedResponseLength() > 0) { List<Object> unformatted = client.getAll(); for (Object o : unformatted) { generateResponse(o); } } } /** * Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever * possible try to avoid using this version and use Pipeline.sync() as it won't go through all the * responses and generate the right response type (usually it is a waste of time). * @return A list of all the responses in the order you executed them. */ public List<Object> syncAndReturnAll() { if (getPipelinedResponseLength() > 0) { List<Object> unformatted = client.getAll(); List<Object> formatted = new ArrayList<Object>(); for (Object o : unformatted) { try { formatted.add(generateResponse(o).get()); } catch (JedisDataException e) { formatted.add(e); } } return formatted; } else { return java.util.Collections.<Object> emptyList(); } } public Response<String> discard() { if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI"); client.discard(); currentMulti = null; return getResponse(BuilderFactory.STRING); } public Response<List<Object>> exec() { if (currentMulti == null) throw new JedisDataException("EXEC without MULTI"); client.exec(); Response<List<Object>> response = super.getResponse(currentMulti); currentMulti.setResponseDependency(response); currentMulti = null; return response; } public Response<String> multi() { if (currentMulti != null) throw new JedisDataException("MULTI calls can not be nested"); client.multi(); Response<String> response = getResponse(BuilderFactory.STRING); // Expecting // OK currentMulti = new MultiResponseBuilder(); return response; } @Override public void close() throws IOException { clear(); } }
- Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。
- pipeline中不同数据结构的命令和不使用pipeline是一样,具体参考:Redis不同数据类型命令使用及应用场景_琦彦-CSDN博客_redis不同数据类型的应用场景
@Test public void testPipeline() { Jedis jedis = null; Pipeline pipeline = null; try { // 创建一个jedis的对象。 jedis = new Jedis("ip", 6379); jedis.auth("密码"); // 获取一个管道对象 pipeline = jedis.pipelined(); // 删除已经存在的key pipeline.del("pipelinedList"); // 循环添加 for (int i = 0; i < 100; i++) { pipeline.rpush("pipelinedList",i+""); } // 执行 pipeline.sync(); } catch (Exception e) { e.printStackTrace(); } finally { // 关闭pipeline if(pipeline != null){ try { pipeline.close(); } catch (IOException e) { e.printStackTrace(); } } // 关闭jedis。 if(jedis != null){ jedis.close(); } } }
- 管道和事务是不同的,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。
- 无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行。
- 简单来说就是管道中的命令是没有关系的,它们只是像管道一样流水发给server,而不是串行执行,仅此而已;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败。
- pipeline 只是把多个redis指令一起发出去,redis并没有保证这些指定的执行是原子的;multi相当于一个redis的transaction的,保证整个操作的原子性,避免由于中途出错而导致最后产生的数据不一致
- 使用管道可能在效率上比使用script要好,但是有的情况下只能使用script。因为在执行后面的命令时,无法得到前面命令的结果,就像事务一样,所以如果需要在后面命令中使用前面命令的value等结果,则只能使用script或者事务+watch。
- 使用Redis脚本(在Redis版本2.6或更高版本中可用),可以使用执行服务器端所需的大量工作的脚本更高效地处理一些 pipelining 用例。
- 脚本的一大优势是它能够以最小的延迟读取和写入数据,使得读取,计算,写入等操作非常快速(在这种情况下,流水线操作无法提供帮助,因为客户端先需要读命令的回应,它才可以调用写命令)。
- 有时,应用程序可能还想在 pipeline 中发送EVAL或EVALSHA命令。这是完全可能的,Redis通过SCRIPT LOAD命令明确地支持它(它保证可以调用EVALSHA而没有失败的风险)。
来源: Using pipelining to speedup Redis queries – Redis
来源: redis中的事务、lua脚本和管道的使用场景_fangjian1204的专栏-CSDN博客_if