Redis: pipeline加速Redis请求,实现批处理

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Redis: pipeline加速Redis请求,实现批处理

Redis是基于TCP连接进行通信


Redis是使用客户端 - 服务器模型的TCP服务器,称为请求/响应协议。

这意味着通常一个请求是通过以下步骤完成的:

  1. 客户端向服务器发送查询,并通常以阻塞的方式从套接字读取服务器响应。
  2. 服务器处理命令并将响应发送回客户端。

知道redis是基于TCP连接进行通信的,每一个request/response都需要经历一个RTT(Round-Trip Time 往返时间),如果需要执行很多短小的命令,这些往返时间的开销是很大的,在此情形下,redis提出了管道来提高执行效率。

 

pipeline的思想

  1. 如果client执行一些相互之间无关的命令或者不需要获取命令的返回值,那么redis允许你连续发送多条命令,而不需要等待前面命令执行完毕。
  2. 比如我们执行3INCR命令,如果使用管道,理论上只需要一个RTT+3条命令的执行时间即可,如果不适用管道,那么可能需要额外的两个RTT时间。
  3. 因此,管道相当于批处理脚本,相当于是命令集。

pipeline不是打包的命令越多越好

  1. 通过pipeline方式当有大批量的操作时候。我们可以节省很多原来浪费在网络延迟的时间。
  2. 需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。
  3. 打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。具体多少合适需要根据具体情况测试。

pipeline常用API

图片.png

 

 

package redis.clients.jedis;
import redis.clients.jedis.exceptions.JedisDataException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class Pipeline extends MultiKeyPipelineBase implements Closeable {
  private MultiResponseBuilder currentMulti;
  private class MultiResponseBuilder extends Builder<List<Object>> {
    private List<Response<?>> responses = new ArrayList<Response<?>>();
    @Override
    public List<Object> build(Object data) {
      @SuppressWarnings("unchecked")
      List<Object> list = (List<Object>) data;
      List<Object> values = new ArrayList<Object>();
      if (list.size() != responses.size()) {
        throw new JedisDataException("Expected data size " + responses.size() + " but was "
            + list.size());
      }
      for (int i = 0; i < list.size(); i++) {
        Response<?> response = responses.get(i);
        response.set(list.get(i));
        Object builtResponse;
        try {
          builtResponse = response.get();
        } catch (JedisDataException e) {
          builtResponse = e;
        }
        values.add(builtResponse);
      }
      return values;
    }
    public void setResponseDependency(Response<?> dependency) {
      for (Response<?> response : responses) {
        response.setDependency(dependency);
      }
    }
    public void addResponse(Response<?> response) {
      responses.add(response);
    }
  }
  @Override
  protected <T> Response<T> getResponse(Builder<T> builder) {
    if (currentMulti != null) {
      super.getResponse(BuilderFactory.STRING); // Expected QUEUED
      Response<T> lr = new Response<T>(builder);
      currentMulti.addResponse(lr);
      return lr;
    } else {
      return super.getResponse(builder);
    }
  }
  public void setClient(Client client) {
    this.client = client;
  }
  @Override
  protected Client getClient(byte[] key) {
    return client;
  }
  @Override
  protected Client getClient(String key) {
    return client;
  }
  public void clear() {
    if (isInMulti()) {
      discard();
    }
    sync();
  }
  public boolean isInMulti() {
    return currentMulti != null;
  }
  /**
   * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
   * get return values from pipelined commands, capture the different Response<?> of the
   * commands you execute.
   */
  public void sync() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      for (Object o : unformatted) {
        generateResponse(o);
      }
    }
  }
  /**
   * Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever
   * possible try to avoid using this version and use Pipeline.sync() as it won't go through all the
   * responses and generate the right response type (usually it is a waste of time).
   * @return A list of all the responses in the order you executed them.
   */
  public List<Object> syncAndReturnAll() {
    if (getPipelinedResponseLength() > 0) {
      List<Object> unformatted = client.getAll();
      List<Object> formatted = new ArrayList<Object>();
      for (Object o : unformatted) {
        try {
          formatted.add(generateResponse(o).get());
        } catch (JedisDataException e) {
          formatted.add(e);
        }
      }
      return formatted;
    } else {
      return java.util.Collections.<Object> emptyList();
    }
  }
  public Response<String> discard() {
    if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI");
    client.discard();
    currentMulti = null;
    return getResponse(BuilderFactory.STRING);
  }
  public Response<List<Object>> exec() {
    if (currentMulti == null) throw new JedisDataException("EXEC without MULTI");
    client.exec();
    Response<List<Object>> response = super.getResponse(currentMulti);
    currentMulti.setResponseDependency(response);
    currentMulti = null;
    return response;
  }
  public Response<String> multi() {
    if (currentMulti != null) throw new JedisDataException("MULTI calls can not be nested");
    client.multi();
    Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
    // OK
    currentMulti = new MultiResponseBuilder();
    return response;
  }
  @Override
  public void close() throws IOException {
    clear();
  }
}

 

  • Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。

 

通过Jedis操作pipeline

@Test
    public void testPipeline() {
        Jedis jedis = null;
        Pipeline pipeline = null;
        try {
            // 创建一个jedis的对象。
            jedis = new Jedis("ip", 6379);
            jedis.auth("密码");
            // 获取一个管道对象
            pipeline = jedis.pipelined();
            // 删除已经存在的key
            pipeline.del("pipelinedList");
            // 循环添加
            for (int i = 0; i < 100; i++) {
                pipeline.rpush("pipelinedList",i+"");
            }
            // 执行
            pipeline.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 关闭pipeline
            if(pipeline != null){
                try {
                    pipeline.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            // 关闭jedis。
            if(jedis != null){
                jedis.close();
            }
        }
    }


pipelineVS事务

  • 管道和事务是不同的,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。
  • 无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行
  • 简单来说就是管道中的命令是没有关系的,它们只是像管道一样流水发给server,而不是串行执行,仅此而已;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败。
  • pipeline 只是把多个redis指令一起发出去,redis并没有保证这些指定的执行是原子的;multi相当于一个redis的transaction的,保证整个操作的原子性,避免由于中途出错而导致最后产生的数据不一致


pipelineVS脚本

  • 使用管道可能在效率上比使用script要好,但是有的情况下只能使用script。因为在执行后面的命令时,无法得到前面命令的结果,就像事务一样,所以如果需要在后面命令中使用前面命令的value等结果,则只能使用script或者事务+watch
  • 使用Redis脚本(在Redis版本2.6或更高版本中可用),可以使用执行服务器端所需的大量工作的脚本更高效地处理一些 pipelining 用例。
  • 脚本的一大优势是它能够以最小的延迟读取和写入数据,使得读取,计算,写入等操作非常快速(在这种情况下,流水线操作无法提供帮助,因为客户端先需要读命令的回应,它才可以调用写命令)。
  • 有时,应用程序可能还想在 pipeline 中发送EVALEVALSHA命令。这是完全可能的,Redis通过SCRIPT LOAD命令明确地支持它(它保证可以调用EVALSHA而没有失败的风险)。

 

来源: https://redis.io/topics/pipelining

来源: http://blog.csdn.net/fangjian1204/article/details/50585080



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
缓存 NoSQL 中间件
redis如何通过读写分离来承载读请求高并发
redis如何通过读写分离来承载读请求高并发
99 0
|
27天前
|
NoSQL 网络协议 Java
【赵渝强老师】Redis的管道Pipeline
Redis采用客户端-服务器模型和请求/响应协议,通常一个请求包括客户端发送查询请求并等待服务端响应。为了提高性能,Redis引入了管道PipeLine技术,可以一次性发送多条命令并一次性返回结果,减少客户端与服务器间的通信次数,从而降低往返延迟。示例代码展示了普通命令和管道命令在插入1万条数据时的性能差异,后者执行时间显著缩短。视频讲解提供了更详细的解释。
|
2月前
|
JSON NoSQL Java
springBoot:jwt&redis&文件操作&常见请求错误代码&参数注解 (九)
该文档涵盖JWT(JSON Web Token)的组成、依赖、工具类创建及拦截器配置,并介绍了Redis的依赖配置与文件操作相关功能,包括文件上传、下载、删除及批量删除的方法。同时,文档还列举了常见的HTTP请求错误代码及其含义,并详细解释了@RequestParam与@PathVariable等参数注解的区别与用法。
|
4月前
|
监控 NoSQL Go
Go语言中高效使用Redis的Pipeline
Redis 是构建高性能应用时常用的内存数据库,通过其 Pipeline 和 Watch 机制可批量执行命令并确保数据安全性。Pipeline 类似于超市购物一次性结账,减少网络交互时间,提升效率。Go 语言示例展示了如何使用 Pipeline 和 Pipelined 方法简化代码,并通过 TxPipeline 保证操作原子性。Watch 机制则通过监控键变化实现乐观锁,防止并发问题导致的数据不一致。这些机制简化了开发流程,提高了应用程序的性能和可靠性。
72 0
|
4月前
|
NoSQL Java 应用服务中间件
使用Redis和Nginx分别实现限制接口请求频率
这篇文章介绍了如何使用Redis和Nginx分别实现限制接口请求频率的方法,包括具体的命令使用、代码实现和配置步骤。
76 0
|
5月前
|
NoSQL Redis
Redis 使用 hyperLogLog 实现请求ip去重的浏览量
Redis 使用 hyperLogLog 实现请求ip去重的浏览量
40 0
|
6月前
|
缓存 负载均衡 NoSQL
Redis系列学习文章分享---第十四篇(Redis多级缓存--封装Http请求+向tomcat发送http请求+根据商品id对tomcat集群负载均衡)
Redis系列学习文章分享---第十四篇(Redis多级缓存--封装Http请求+向tomcat发送http请求+根据商品id对tomcat集群负载均衡)
84 1
|
存储 NoSQL Java
Redis性能优化:理解与使用Redis Pipeline
当我们谈论Redis数据处理和存储的优化方法时, Redis Pipeline 无疑是一个不能忽视的重要技术。
831 0
Redis性能优化:理解与使用Redis Pipeline
|
NoSQL Redis Anolis
性能优化特性之:Redis批处理pipeline模式
本文介绍了一种更贴近实际使用的redis验测方法:多pipline模式,并从原理、使用方法进行详细阐述。
|
7月前
|
存储 NoSQL 关系型数据库
Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)
Redis协议与异步方式(redis网络层、pipeline、事务、lua脚本、ACID特性、发布订阅、hiredis实现同步连接与异步连接)
206 0