Spring-data-redis + Lettuce 如何使用 Pipeline

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介: Spring-data-redis + Lettuce 如何使用 Pipeline

关于 spring-data-redis 和 lettuce,笔者写过不少文章:

最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?

首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:

  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

execute(RedisCallback),流程是:


image.png


对于 executePipelined(RedisCallback),如果使用正确的话,会使用 asyncDedicatedConn 私有连接执行。那么怎么算使用正确呢?

需要使用回调的连接进行 Redis 调用,不能直接使用redisTemplate调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});

这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?


Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands


Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。

如果原来的命令是这么发送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

那么使用 PIPELINE 之后,命令就是类似于这么发送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。

Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。


配置 Spring-data-redis + Lettuce 使用 Pipeline


Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:

我们可以这样配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            //在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
                //多谢评论区的[孤胆枪手](https://juejin.cn/user/2084329775180605) 的指正,我之前忘了放这个配置了
                lettuceConnectionFactory.setShareNativeConnection(false);
            }
            return bean;
        }
    };
}

注意这里将 shareNativeConnection 设置为 false。本来基于 Lettuce 的 RedisTemplate 中大部分请求都可以通过共享连接使用同一个连接,关闭的话每次都获取的是独占连接。这种情况下我们要注意使用连接池(防止每次创建新连接),同时连接池需要大于可能的并发线程个数防止阻塞等待连接。

为啥要关闭这个共享链接呢,参考源码:

RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    //Redis事务才是 true
    if (this.isQueueing()) {
        return this.getAsyncDedicatedConnection();
    } else {
        //如果有共享连接则返回共享连接,否则返回独占连接,只有独占连接 PipeliningFlushPolicy 才会生效,PipeliningFlushPolicy 不会修改共享连接
        return (RedisClusterAsyncCommands)(this.asyncSharedConn != null && this.asyncSharedConn instanceof StatefulRedisConnection ? ((StatefulRedisConnection)this.asyncSharedConn).async() : this.getAsyncDedicatedConnection());
    }
}

由于我们要使用 PipeliningFlushPolicy,所以需要这里返回独占连接,也就不能打开共享连接。

我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:

public interface PipeliningFlushPolicy {
    //其实就是默认的每个命令都直接发到 Redis Server
    static PipeliningFlushPolicy flushEachCommand() {
    return FlushEachCommand.INSTANCE;
  }
  //在连接关闭的时候,将命令一起发到 Redis
  static PipeliningFlushPolicy flushOnClose() {
    return FlushOnClose.INSTANCE;
  }
  //手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redis
  static PipeliningFlushPolicy buffered(int bufferSize) {
    return () -> new BufferedFlushing(bufferSize);
  }
}

这三个类也都实现了 PipeliningFlushState 接口:

public interface PipeliningFlushState {
    //对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法
    void onOpen(StatefulConnection<?, ?> connection);
    //对于 executePipelined 中的每个命令都会调用这个方法
    void onCommand(StatefulConnection<?, ?> connection);
    //在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法
    void onClose(StatefulConnection<?, ?> connection);
}

默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
  INSTANCE;
  @Override
  public PipeliningFlushState newPipeline() {
    return INSTANCE;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {}
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {}
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {}
}

对于 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
  INSTANCE;
  @Override
  public PipeliningFlushState newPipeline() {
    return INSTANCE;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {
      //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
    connection.setAutoFlushCommands(false);
  }
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {
        //收到命令时什么都不做
  }
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {
      //在 pipeline 关闭的时候发送所有命令
    connection.flushCommands();
    //恢复默认配置,这样连接如果退回连接池不会影响后续使用
    connection.setAutoFlushCommands(true);
  }
}

对于 buffered:

private static class BufferedFlushing implements PipeliningFlushState {
  private final AtomicLong commands = new AtomicLong();
  private final int flushAfter;
  public BufferedFlushing(int flushAfter) {
    this.flushAfter = flushAfter;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {
      //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
    connection.setAutoFlushCommands(false);
  }
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {
      //如果命令达到指定个数,就发到 Redis
    if (commands.incrementAndGet() % flushAfter == 0) {
      connection.flushCommands();
    }
  }
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {
      //在 pipeline 关闭的时候发送所有命令
    connection.flushCommands();
    //恢复默认配置,这样连接如果退回连接池不会影响后续使用
    connection.setAutoFlushCommands(true);
  }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
0
0
0
33
分享
相关文章
【赵渝强老师】Redis的管道Pipeline
Redis采用客户端-服务器模型和请求/响应协议,通常一个请求包括客户端发送查询请求并等待服务端响应。为了提高性能,Redis引入了管道PipeLine技术,可以一次性发送多条命令并一次性返回结果,减少客户端与服务器间的通信次数,从而降低往返延迟。示例代码展示了普通命令和管道命令在插入1万条数据时的性能差异,后者执行时间显著缩短。视频讲解提供了更详细的解释。
187 1
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
133 1
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
108 0
|
7月前
|
Go语言中高效使用Redis的Pipeline
Redis 是构建高性能应用时常用的内存数据库,通过其 Pipeline 和 Watch 机制可批量执行命令并确保数据安全性。Pipeline 类似于超市购物一次性结账,减少网络交互时间,提升效率。Go 语言示例展示了如何使用 Pipeline 和 Pipelined 方法简化代码,并通过 TxPipeline 保证操作原子性。Watch 机制则通过监控键变化实现乐观锁,防止并发问题导致的数据不一致。这些机制简化了开发流程,提高了应用程序的性能和可靠性。
97 0
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
Redis客户端Lettuce深度分析介绍(上)
Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis。本文通过分析Lettuce的特性和内部实现(基于6.0版本),及其与Jedis的对照比较,对这两种客户端,以及Redis服务端进行深度探讨。
101833 8
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等