Spring-data-redis + Lettuce 如何使用 Pipeline

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Spring-data-redis + Lettuce 如何使用 Pipeline

关于 spring-data-redis 和 lettuce,笔者写过不少文章:

最近,私信还有留言中,网友提到 spring-data-redis 和 lettuce 一起使用,pipeline 通过抓包一看,并没有生效,这个如何配置才能生效呢?

首先,在上面的文章中,我们分析过 Spring-data-redis + Lettuce 的基本原理,在这种环境下 RedisTemplate 使用的连接内部包括:

  • asyncSharedConn:可以为空,如果开启了连接共享,则不为空,默认是开启的;所有 LettuceConnection 共享的 Redis 连接,对于每个 LettuceConnection 实际上都是同一个连接;用于执行简单命令,因为 Netty 客户端与 Redis 的单处理线程特性,共享同一个连接也是很快的。如果没开启连接共享,则这个字段为空,使用 asyncDedicatedConn 执行命令。
  • asyncDedicatedConn:私有连接,如果需要保持会话,执行事务,以及 Pipeline 命令,固定连接,则必须使用这个 asyncDedicatedConn 执行 Redis 命令。

execute(RedisCallback),流程是:


image.png


对于 executePipelined(RedisCallback),如果使用正确的话,会使用 asyncDedicatedConn 私有连接执行。那么怎么算使用正确呢?

需要使用回调的连接进行 Redis 调用,不能直接使用redisTemplate调用,否则 pipeline 不生效

Pipeline 生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        connection.get("test".getBytes());
        connection.get("test2".getBytes());
        return null;
    }
});

Pipeline 不生效

List<Object> objects = redisTemplate.executePipelined(new RedisCallback<Object>() {
    @Override
    public Object doInRedis(RedisConnection connection) throws DataAccessException {
        redisTemplate.opsForValue().get("test");
        redisTemplate.opsForValue().get("test2");
        return null;
    }
});

这样我们就能使用保证 API 层正确使用 pipeline 了,但是默认配置的情况下, 底层还是没有执行 Pipeline,这是怎么回事呢?


Redis Pipeline 类比 Lettuce 中的 AutoFlushCommands


Redis Pipeline 是 Redis 中的 批量操作,它能将一组 Redis 命令进行组装,通过一次传输给 Redis 并返回结果集,大大减少了如果命令时一条条单独传输需要的 RTT 时间(包括 Redis 客户端,Redis 服务端切换系统调用发送接收数据的时间,以及网络传输时间)。

如果原来的命令是这么发送的:

Client -> Server: INCR X\r\n
Server -> Client: 1
Client -> Server: INCR X\r\n
Server -> Client: 2
Client -> Server: INCR X\r\n
Server -> Client: 3
Client -> Server: INCR X\r\n
Server -> Client: 4

那么使用 PIPELINE 之后,命令就是类似于这么发送的

Client -> Server: INCR X\r\nINCR X\r\nINCR X\r\nINCR X\r\n
Server -> Client: 1\r\n2\r\n3\r\n4

我们可以看出,其实它的原理,就是客户端先将所有命令拼接在一起然后本地缓存起来,之后统一发到服务端,服务端执行所有命令之后,统一响应。

Lettuce 的连接有一个 AutoFlushCommands 配置,就是指在这个连接上执行的命令,如果发送到服务端。默认是 false,即收到一个命令就发到服务端一个。如果配置为 false,则将所有命令缓存起来,手动调用 flushCommands 的时候,将缓存的命令一起发到服务端,这样其实就是实现了 Pipeline。


配置 Spring-data-redis + Lettuce 使用 Pipeline


Spring-data-redis 从 2.3.0 版本开始,对于 Lettuce 也兼容了 Pipeline 配置,参考:

我们可以这样配置:

@Bean
public BeanPostProcessor lettuceConnectionFactoryBeanProcessor() {
    return new BeanPostProcessor() {
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
            //在 LettuceConnectionFactory 这个 Bean 初始化之后,设置 PipeliningFlushPolicy 为 flushOnClose
            if (bean instanceof LettuceConnectionFactory) {
                LettuceConnectionFactory lettuceConnectionFactory = (LettuceConnectionFactory) bean;
                lettuceConnectionFactory.setPipeliningFlushPolicy(LettuceConnection.PipeliningFlushPolicy.flushOnClose());
                //多谢评论区的[孤胆枪手](https://juejin.cn/user/2084329775180605) 的指正,我之前忘了放这个配置了
                lettuceConnectionFactory.setShareNativeConnection(false);
            }
            return bean;
        }
    };
}

注意这里将 shareNativeConnection 设置为 false。本来基于 Lettuce 的 RedisTemplate 中大部分请求都可以通过共享连接使用同一个连接,关闭的话每次都获取的是独占连接。这种情况下我们要注意使用连接池(防止每次创建新连接),同时连接池需要大于可能的并发线程个数防止阻塞等待连接。

为啥要关闭这个共享链接呢,参考源码:

RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
    //Redis事务才是 true
    if (this.isQueueing()) {
        return this.getAsyncDedicatedConnection();
    } else {
        //如果有共享连接则返回共享连接,否则返回独占连接,只有独占连接 PipeliningFlushPolicy 才会生效,PipeliningFlushPolicy 不会修改共享连接
        return (RedisClusterAsyncCommands)(this.asyncSharedConn != null && this.asyncSharedConn instanceof StatefulRedisConnection ? ((StatefulRedisConnection)this.asyncSharedConn).async() : this.getAsyncDedicatedConnection());
    }
}

由于我们要使用 PipeliningFlushPolicy,所以需要这里返回独占连接,也就不能打开共享连接。

我们来看下这个 PipeliningFlushPolicy 的源码就知道这个 flushOnClose 的意义:

public interface PipeliningFlushPolicy {
    //其实就是默认的每个命令都直接发到 Redis Server
    static PipeliningFlushPolicy flushEachCommand() {
    return FlushEachCommand.INSTANCE;
  }
  //在连接关闭的时候,将命令一起发到 Redis
  static PipeliningFlushPolicy flushOnClose() {
    return FlushOnClose.INSTANCE;
  }
  //手动设置在多少条命令之后,统一发到 Redis,但是同样的,连接关闭的时候也会发到 Redis
  static PipeliningFlushPolicy buffered(int bufferSize) {
    return () -> new BufferedFlushing(bufferSize);
  }
}

这三个类也都实现了 PipeliningFlushState 接口:

public interface PipeliningFlushState {
    //对于 executePipelined,刚开始就会调用 connection.openPipeline(); 开启 pipeline,里面会调用这个方法
    void onOpen(StatefulConnection<?, ?> connection);
    //对于 executePipelined 中的每个命令都会调用这个方法
    void onCommand(StatefulConnection<?, ?> connection);
    //在 executePipelined 的最后会调用 connection.closePipeline(),里面会调用这个方法
    void onClose(StatefulConnection<?, ?> connection);
}

默认的每个命令都直接发到 Redis Server 的实现是:其实就是方法里什么都不做。

private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
  INSTANCE;
  @Override
  public PipeliningFlushState newPipeline() {
    return INSTANCE;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {}
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {}
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {}
}

对于 flushOnClose:

private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
  INSTANCE;
  @Override
  public PipeliningFlushState newPipeline() {
    return INSTANCE;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {
      //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
    connection.setAutoFlushCommands(false);
  }
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {
        //收到命令时什么都不做
  }
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {
      //在 pipeline 关闭的时候发送所有命令
    connection.flushCommands();
    //恢复默认配置,这样连接如果退回连接池不会影响后续使用
    connection.setAutoFlushCommands(true);
  }
}

对于 buffered:

private static class BufferedFlushing implements PipeliningFlushState {
  private final AtomicLong commands = new AtomicLong();
  private final int flushAfter;
  public BufferedFlushing(int flushAfter) {
    this.flushAfter = flushAfter;
  }
  @Override
  public void onOpen(StatefulConnection<?, ?> connection) {
      //首先配置连接的 AutoFlushCommands 为 false,这样命令就不会立刻发到 Redis
    connection.setAutoFlushCommands(false);
  }
  @Override
  public void onCommand(StatefulConnection<?, ?> connection) {
      //如果命令达到指定个数,就发到 Redis
    if (commands.incrementAndGet() % flushAfter == 0) {
      connection.flushCommands();
    }
  }
  @Override
  public void onClose(StatefulConnection<?, ?> connection) {
      //在 pipeline 关闭的时候发送所有命令
    connection.flushCommands();
    //恢复默认配置,这样连接如果退回连接池不会影响后续使用
    connection.setAutoFlushCommands(true);
  }
}
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
NoSQL Java Redis
redis集群拓扑结构自动更新:使用Lettuce连接Cluster集群实例时异常处理(一)
redis集群拓扑结构自动更新:使用Lettuce连接Cluster集群实例时异常处理
795 0
|
3月前
|
缓存 NoSQL 网络协议
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
【Azure Redis 缓存】Lettuce 连接到Azure Redis服务,出现15分钟Timeout问题
|
3月前
|
NoSQL 网络协议 Redis
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
【Azure Redis】AKS中使用Lettuce连接Redis Cache出现 timed out 问题的解决思路
|
3月前
|
NoSQL 网络协议 Linux
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
【Azure Redis】Lettuce客户端遇见连接Azure Redis长达15分钟的超时
|
3月前
|
缓存 NoSQL Java
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
【Azure Redis 缓存】定位Java Spring Boot 使用 Jedis 或 Lettuce 无法连接到 Redis的网络连通性步骤
|
3月前
|
NoSQL Java 调度
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
Lettuce的特性和内部实现问题之Redis的管道模式提升性能的问题如何解决
|
3月前
|
NoSQL 网络协议 安全
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
Lettuce的特性和内部实现问题之Lettuce天然地使用管道模式与Redis交互的问题如何解决
|
6月前
|
NoSQL Java 数据库连接
springboot整合Redis中连接池jedis与lettuce的对比和实现
springboot整合Redis中连接池jedis与lettuce的对比和实现
917 0
|
4月前
|
安全 NoSQL Java
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
网络安全-----Redis12的Java客户端----客户端对比12,Jedis介绍,使用简单安全性不足,lettuce(官方默认)是基于Netty,支持同步,异步和响应式,并且线程是安全的,支持R
|
6月前
|
NoSQL 网络协议 Java
Redis客户端Lettuce深度分析介绍(上)
Spring Boot自2.0版本开始默认使用Lettuce作为Redis的客户端(注1)。Lettuce客户端基于Netty的NIO框架实现,对于大多数的Redis操作,只需要维持单一的连接即可高效支持业务端的并发请求 —— 这点与Jedis的连接池模式有很大不同。同时,Lettuce支持的特性更加全面,且其性能表现并不逊于,甚至优于Jedis。本文通过分析Lettuce的特性和内部实现(基于6.0版本),及其与Jedis的对照比较,对这两种客户端,以及Redis服务端进行深度探讨。
101579 8