Redis: pipeline加速Redis请求,实现批处理

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Redis: pipeline加速Redis请求,实现批处理

 Redis是基于TCP连接进行通信

Redis是使用客户端 - 服务器模型的TCP服务器,称为请求/响应协议。

这意味着通常一个请求是通过以下步骤完成的:

    1. 客户端向服务器发送查询,并通常以阻塞的方式从套接字读取服务器响应。
    2. 服务器处理命令并将响应发送回客户端。

    知道redis是基于TCP连接进行通信的,每一个request/response都需要经历一个RTT(Round-Trip Time 往返时间),如果需要执行很多短小的命令,这些往返时间的开销是很大的,在此情形下,redis提出了管道来提高执行效率。

    pipeline的思想

      1. 如果client执行一些相互之间无关的命令或者不需要获取命令的返回值,那么redis允许你连续发送多条命令,而不需要等待前面命令执行完毕。
      2. 比如我们执行3INCR命令,如果使用管道,理论上只需要一个RTT+3条命令的执行时间即可,如果不适用管道,那么可能需要额外的两个RTT时间。
      3. 因此,管道相当于批处理脚本,相当于是命令集。

      pipeline不是打包的命令越多越好

        1. 通过pipeline方式当有大批量的操作时候。我们可以节省很多原来浪费在网络延迟的时间。
        2. 需要注意到是用 pipeline方式打包命令发送,redis必须在处理完所有命令前先缓存起所有命令的处理结果。
        3. 打包的命令越多,缓存消耗内存也越多。所以并不是打包的命令越多越好。具体多少合适需要根据具体情况测试。

        pipeline常用API

        20181022150840717.png

        image.gif

        package redis.clients.jedis;
        import redis.clients.jedis.exceptions.JedisDataException;
        import java.io.Closeable;
        import java.io.IOException;
        import java.util.ArrayList;
        import java.util.List;
        public class Pipeline extends MultiKeyPipelineBase implements Closeable {
          private MultiResponseBuilder currentMulti;
          private class MultiResponseBuilder extends Builder<List<Object>> {
            private List<Response<?>> responses = new ArrayList<Response<?>>();
            @Override
            public List<Object> build(Object data) {
              @SuppressWarnings("unchecked")
              List<Object> list = (List<Object>) data;
              List<Object> values = new ArrayList<Object>();
              if (list.size() != responses.size()) {
                throw new JedisDataException("Expected data size " + responses.size() + " but was "
                    + list.size());
              }
              for (int i = 0; i < list.size(); i++) {
                Response<?> response = responses.get(i);
                response.set(list.get(i));
                Object builtResponse;
                try {
                  builtResponse = response.get();
                } catch (JedisDataException e) {
                  builtResponse = e;
                }
                values.add(builtResponse);
              }
              return values;
            }
            public void setResponseDependency(Response<?> dependency) {
              for (Response<?> response : responses) {
                response.setDependency(dependency);
              }
            }
            public void addResponse(Response<?> response) {
              responses.add(response);
            }
          }
          @Override
          protected <T> Response<T> getResponse(Builder<T> builder) {
            if (currentMulti != null) {
              super.getResponse(BuilderFactory.STRING); // Expected QUEUED
              Response<T> lr = new Response<T>(builder);
              currentMulti.addResponse(lr);
              return lr;
            } else {
              return super.getResponse(builder);
            }
          }
          public void setClient(Client client) {
            this.client = client;
          }
          @Override
          protected Client getClient(byte[] key) {
            return client;
          }
          @Override
          protected Client getClient(String key) {
            return client;
          }
          public void clear() {
            if (isInMulti()) {
              discard();
            }
            sync();
          }
          public boolean isInMulti() {
            return currentMulti != null;
          }
          /**
           * Synchronize pipeline by reading all responses. This operation close the pipeline. In order to
           * get return values from pipelined commands, capture the different Response<?> of the
           * commands you execute.
           */
          public void sync() {
            if (getPipelinedResponseLength() > 0) {
              List<Object> unformatted = client.getAll();
              for (Object o : unformatted) {
                generateResponse(o);
              }
            }
          }
          /**
           * Synchronize pipeline by reading all responses. This operation close the pipeline. Whenever
           * possible try to avoid using this version and use Pipeline.sync() as it won't go through all the
           * responses and generate the right response type (usually it is a waste of time).
           * @return A list of all the responses in the order you executed them.
           */
          public List<Object> syncAndReturnAll() {
            if (getPipelinedResponseLength() > 0) {
              List<Object> unformatted = client.getAll();
              List<Object> formatted = new ArrayList<Object>();
              for (Object o : unformatted) {
                try {
                  formatted.add(generateResponse(o).get());
                } catch (JedisDataException e) {
                  formatted.add(e);
                }
              }
              return formatted;
            } else {
              return java.util.Collections.<Object> emptyList();
            }
          }
          public Response<String> discard() {
            if (currentMulti == null) throw new JedisDataException("DISCARD without MULTI");
            client.discard();
            currentMulti = null;
            return getResponse(BuilderFactory.STRING);
          }
          public Response<List<Object>> exec() {
            if (currentMulti == null) throw new JedisDataException("EXEC without MULTI");
            client.exec();
            Response<List<Object>> response = super.getResponse(currentMulti);
            currentMulti.setResponseDependency(response);
            currentMulti = null;
            return response;
          }
          public Response<String> multi() {
            if (currentMulti != null) throw new JedisDataException("MULTI calls can not be nested");
            client.multi();
            Response<String> response = getResponse(BuilderFactory.STRING); // Expecting
            // OK
            currentMulti = new MultiResponseBuilder();
            return response;
          }
          @Override
          public void close() throws IOException {
            clear();
          }
        }

        image.gif

          • Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。

          通过Jedis操作pipeline

            @Test
                public void testPipeline() {
                    Jedis jedis = null;
                    Pipeline pipeline = null;
                    try {
                        // 创建一个jedis的对象。
                        jedis = new Jedis("ip", 6379);
                        jedis.auth("密码");
                        // 获取一个管道对象
                        pipeline = jedis.pipelined();
                        // 删除已经存在的key
                        pipeline.del("pipelinedList");
                        // 循环添加
                        for (int i = 0; i < 100; i++) {
                            pipeline.rpush("pipelinedList",i+"");
                        }
                        // 执行
                        pipeline.sync();
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        // 关闭pipeline
                        if(pipeline != null){
                            try {
                                pipeline.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        // 关闭jedis。
                        if(jedis != null){
                            jedis.close();
                        }
                    }
                }

            image.gif

            pipelineVS事务

              • 管道和事务是不同的,pipeline只是表达“交互”中操作的传递的方向性,pipeline也可以在事务中运行,也可以不在。
              • 无论如何,pipeline中发送的每个command都会被server立即执行,如果执行失败,将会在此后的相应中得到信息;也就是pipeline并不是表达“所有command都一起成功”的语义,管道中前面命令失败,后面命令不会有影响,继续执行
              • 简单来说就是管道中的命令是没有关系的,它们只是像管道一样流水发给server,而不是串行执行,仅此而已;但是如果pipeline的操作被封装在事务中,那么将有事务来确保操作的成功与失败。
                • pipeline 只是把多个redis指令一起发出去,redis并没有保证这些指定的执行是原子的;multi相当于一个redis的transaction的,保证整个操作的原子性,避免由于中途出错而导致最后产生的数据不一致

                pipelineVS脚本

                  • 使用管道可能在效率上比使用script要好,但是有的情况下只能使用script。因为在执行后面的命令时,无法得到前面命令的结果,就像事务一样,所以如果需要在后面命令中使用前面命令的value等结果,则只能使用script或者事务+watch
                  • 使用Redis脚本(在Redis版本2.6或更高版本中可用),可以使用执行服务器端所需的大量工作的脚本更高效地处理一些 pipelining 用例。
                  • 脚本的一大优势是它能够以最小的延迟读取和写入数据,使得读取,计算,写入等操作非常快速(在这种情况下,流水线操作无法提供帮助,因为客户端先需要读命令的回应,它才可以调用写命令)。
                  • 有时,应用程序可能还想在 pipeline 中发送EVALEVALSHA命令。这是完全可能的,Redis通过SCRIPT LOAD命令明确地支持它(它保证可以调用EVALSHA而没有失败的风险)。

                  来源: Using pipelining to speedup Redis queries – Redis

                  来源: redis中的事务、lua脚本和管道的使用场景_fangjian1204的专栏-CSDN博客_if redis.call

                  相关实践学习
                  基于Redis实现在线游戏积分排行榜
                  本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
                  云数据库 Redis 版使用教程
                  云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
                  目录
                  相关文章
                  |
                  缓存 NoSQL 网络协议
                  Redis: pipeline加速Redis请求,实现批处理
                  Redis: pipeline加速Redis请求,实现批处理
                  511 0
                  Redis: pipeline加速Redis请求,实现批处理
                  |
                  3月前
                  |
                  存储 缓存 NoSQL
                  数据的存储--Redis缓存存储(一)
                  数据的存储--Redis缓存存储(一)
                  117 1
                  |
                  14天前
                  |
                  存储 缓存 NoSQL
                  解决Redis缓存数据类型丢失问题
                  解决Redis缓存数据类型丢失问题
                  157 85
                  |
                  3月前
                  |
                  存储 缓存 NoSQL
                  数据的存储--Redis缓存存储(二)
                  数据的存储--Redis缓存存储(二)
                  53 2
                  数据的存储--Redis缓存存储(二)
                  |
                  3月前
                  |
                  消息中间件 缓存 NoSQL
                  Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。
                  【10月更文挑战第4天】Redis 是一个高性能的键值对存储系统,常用于缓存、消息队列和会话管理等场景。随着数据增长,有时需要将 Redis 数据导出以进行分析、备份或迁移。本文详细介绍几种导出方法:1)使用 Redis 命令与重定向;2)利用 Redis 的 RDB 和 AOF 持久化功能;3)借助第三方工具如 `redis-dump`。每种方法均附有示例代码,帮助你轻松完成数据导出任务。无论数据量大小,总有一款适合你。
                  85 6
                  |
                  11天前
                  |
                  缓存 监控 NoSQL
                  Redis经典问题:缓存穿透
                  本文详细探讨了分布式系统和缓存应用中的经典问题——缓存穿透。缓存穿透是指用户请求的数据在缓存和数据库中都不存在,导致大量请求直接落到数据库上,可能引发数据库崩溃或性能下降。文章介绍了几种有效的解决方案,包括接口层增加校验、缓存空值、使用布隆过滤器、优化数据库查询以及加强监控报警机制。通过这些方法,可以有效缓解缓存穿透对系统的影响,提升系统的稳定性和性能。
                  |
                  2月前
                  |
                  缓存 NoSQL 关系型数据库
                  大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
                  本文详解缓存雪崩、缓存穿透、缓存并发及缓存预热等问题,提供高可用解决方案,帮助你在大厂面试和实际工作中应对这些常见并发场景。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
                  大厂面试高频:如何解决Redis缓存雪崩、缓存穿透、缓存并发等5大难题
                  |
                  2月前
                  |
                  存储 缓存 NoSQL
                  【赵渝强老师】基于Redis的旁路缓存架构
                  本文介绍了引入缓存后的系统架构,通过缓存可以提升访问性能、降低网络拥堵、减轻服务负载和增强可扩展性。文中提供了相关图片和视频讲解,并讨论了数据库读写分离、分库分表等方法来减轻数据库压力。同时,文章也指出了缓存可能带来的复杂度增加、成本提高和数据一致性问题。
                  【赵渝强老师】基于Redis的旁路缓存架构
                  |
                  2月前
                  |
                  缓存 NoSQL Redis
                  Redis 缓存使用的实践
                  《Redis缓存最佳实践指南》涵盖缓存更新策略、缓存击穿防护、大key处理和性能优化。包括Cache Aside Pattern、Write Through、分布式锁、大key拆分和批量操作等技术,帮助你在项目中高效使用Redis缓存。
                  329 22
                  |
                  2月前
                  |
                  缓存 NoSQL PHP
                  Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出
                  本文深入探讨了Redis作为PHP缓存解决方案的优势、实现方式及注意事项。Redis凭借其高性能、丰富的数据结构、数据持久化和分布式支持等特点,在提升应用响应速度和处理能力方面表现突出。文章还介绍了Redis在页面缓存、数据缓存和会话缓存等应用场景中的使用,并强调了缓存数据一致性、过期时间设置、容量控制和安全问题的重要性。
                  45 5