一文彻底理解Redis序列化协议,你也可以编写Redis客户端(下)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: 最近学习Netty的时候想做一个基于Redis服务协议的编码解码模块,过程中顺便阅读了Redis服务序列化协议RESP,结合自己的理解对文档进行了翻译并且简单实现了RESP基于Java语言的解析。编写本文的使用使用的JDK版本为[8+]。

基于RESP编写高性能解析器



因为JDK原生提供的字节缓冲区java.nio.ByteBuffer存在不能自动扩容、需要切换读写模式等等问题,这里直接引入Netty并且使用Netty提供的ByteBuf进行RESP数据类型解析。编写本文的时候(2019-10-09)Netty的最新版本为4.1.42.Final。引入依赖:


<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-buffer</artifactId>
    <version>4.1.42.Final</version>
</dependency>
复制代码


定义解码器接口:


public interface RespDecoder<V>{
    V decode(ByteBuf buffer);
}
复制代码


定义常量:


public class RespConstants {
    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;
    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';
    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);
    public enum ReplyType {
        SIMPLE_STRING,
        ERROR,
        INTEGER,
        BULK_STRING,
        RESP_ARRAY
    }
}
复制代码


下面的章节中解析模块的实现已经忽略第一个字节的解析,因为第一个字节是决定具体的数据类型。


解析简单字符串


简单字符串类型就是单行字符串,它的解析结果对应的就是Java中的String类型。解码器实现如下:


// 解析单行字符串
public class LineStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}
public enum CodecUtils {
    X;
    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }
    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 计算字节长度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
复制代码


这里抽取出一个类LineStringDecoder用于解析单行字符串,这样在解析错误消息的时候可以做一次继承即可。测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // +OK\r\n
    buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:OK
复制代码


解析错误消息


错误消息的本质也是单行字符串,所以其解码的实现可以和简单字符串的解码实现一致。错误消息数据类型的解码器如下:


public class RespErrorDecoder extends LineStringDecoder {
}
复制代码


测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // -ERR unknown command 'foobar'\r\n
    buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
复制代码


解析整型数字


整型数字类型,本质就是需要从字节序列中还原出带符号的64bit的长整型,因为是带符号的,类型标识位:后的第一个字节需要判断是否负数字符-,因为是从左向右解析,然后每解析出一个新的位,当前的数字值要乘10。其解码器的实现如下:


public class RespIntegerDecoder implements RespDecoder<Long> {
    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 没有行尾,异常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 负数
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置读游标为\r\n之后的第一个字节
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}
复制代码


整型数字类型的解析相对复杂,一定要注意负数判断。测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // :-1000\r\n
    buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    Long value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:-1000
复制代码


解析定长字符串


定长字符串类型解析的关键是先读取类型标识符$后的第一个字节序列分块解析成64bit带符号的整数,用来确定后面需要解析的字符串内容的字节长度,然后再按照该长度读取后面的字节。其解码器实现如下:


public class RespBulkStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 使用RespIntegerDecoder读取长度
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真实字节内容的长度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
复制代码


测试一下:


public static void main(String[] args) throws Exception{
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // $6\r\nthrowable\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:throwable
复制代码


解析RESP数组


RESP数组类型解析的关键:


  • 先读取类型标识符*后的第一个字节序列分块解析成64bit带符号的整数,确定数组中的元素个数。
  • 递归解析每个元素。


参考过不少Redis协议解析框架,不少是用栈或者状态机实现,这里先简单点用递归实现,解码器代码如下:


public class RespArrayDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素个数
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 递归
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}
复制代码


测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    //*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    List value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
复制代码


小结



RESP的内容和其编码解码的过程有相对深刻的认识后,就可以基于Netty编写Redis服务的编码解码模块,作为Netty入门的十分有意义的例子。本文的最后一节只演示了RESP的解码部分,编码模块和更多细节会在另一篇用Netty实现Redis客户端的文章中展示。


参考资料:


链接



希望你能读到这里,然后发现我:


附录



本文涉及的所有代码:


public class RespConstants {
    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;
    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';
    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);
    public enum ReplyType {
        SIMPLE_STRING,
        ERROR,
        INTEGER,
        BULK_STRING,
        RESP_ARRAY
    }
}
public enum CodecUtils {
    X;
    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }
    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 计算字节长度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public interface RespCodec {
    RespCodec X = DefaultRespCodec.X;
    <IN, OUT> OUT decode(ByteBuf buffer);
    <IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
    X;
    static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
    private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
    static {
        DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
        DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
        DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
        DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
        DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
    }
    @SuppressWarnings("unchecked")
    @Override
    public <IN, OUT> OUT decode(ByteBuf buffer) {
        return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
    }
    private ReplyType determineReplyType(ByteBuf buffer) {
        byte firstByte = buffer.readByte();
        ReplyType replyType;
        switch (firstByte) {
            case RespConstants.PLUS_BYTE:
                replyType = ReplyType.SIMPLE_STRING;
                break;
            case RespConstants.MINUS_BYTE:
                replyType = ReplyType.ERROR;
                break;
            case RespConstants.COLON_BYTE:
                replyType = ReplyType.INTEGER;
                break;
            case RespConstants.DOLLAR_BYTE:
                replyType = ReplyType.BULK_STRING;
                break;
            case RespConstants.ASTERISK_BYTE:
                replyType = ReplyType.RESP_ARRAY;
                break;
            default: {
                throw new IllegalArgumentException("first byte:" + firstByte);
            }
        }
        return replyType;
    }
    @Override
    public <IN, OUT> ByteBuf encode(IN in) {
        // TODO
        throw new UnsupportedOperationException("encode");
    }
}
public interface RespDecoder<V> {
    V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        throw new IllegalStateException("decoder");
    }
}
public class LineStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 没有行尾,异常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 负数
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置读游标为\r\n之后的第一个字节
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}
public class RespBulkStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真实字节内容的长度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public class RespArrayDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素个数
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 递归
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2月前
|
NoSQL Redis 数据安全/隐私保护
Redis 最流行的图形化界面下载及使用超详细教程(带安装包)! redis windows客户端下载
文章提供了Redis最流行的图形化界面工具Another Redis Desktop Manager的下载及使用教程,包括如何下载、解压、连接Redis服务器以及使用控制台和查看数据类型详细信息。
178 6
Redis 最流行的图形化界面下载及使用超详细教程(带安装包)! redis windows客户端下载
|
2月前
|
NoSQL Redis 数据库
Redis 图形化界面下载及使用超详细教程(带安装包)! redis windows下客户端下载
文章提供了Redis图形化界面工具的下载及使用教程,包括如何连接本地Redis服务器、操作键值对、查看日志和使用命令行等功能。
174 0
Redis 图形化界面下载及使用超详细教程(带安装包)! redis windows下客户端下载
|
2月前
|
NoSQL 网络协议 算法
Redis 客户端连接
10月更文挑战第21天
38 1
|
3月前
|
JSON NoSQL Java
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
这篇文章介绍了在Java中使用Redis客户端的几种方法,包括Jedis、SpringDataRedis和SpringBoot整合Redis的操作。文章详细解释了Jedis的基本使用步骤,Jedis连接池的创建和使用,以及在SpringBoot项目中如何配置和使用RedisTemplate和StringRedisTemplate。此外,还探讨了RedisTemplate序列化的两种实践方案,包括默认的JDK序列化和自定义的JSON序列化,以及StringRedisTemplate的使用,它要求键和值都必须是String类型。
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
|
2月前
|
存储 消息中间件 NoSQL
Redis 入门 - C#.NET Core客户端库六种选择
Redis 入门 - C#.NET Core客户端库六种选择
69 8
|
2月前
|
JSON 缓存 NoSQL
Redis 在线查看序列化对象技术详解
Redis 在线查看序列化对象技术详解
45 2
|
3月前
|
JSON 缓存 NoSQL
redis序列化数据时,如何包含clsss类型信息?
通过配置 `com.fasterxml.jackson.databind.ObjectMapper` 的 `enableDefaultTyping` 方法,可以使序列化后的 JSON 包含类信息。
58 2
|
1月前
|
JSON 数据格式 索引
Python中序列化/反序列化JSON格式的数据
【11月更文挑战第4天】本文介绍了 Python 中使用 `json` 模块进行序列化和反序列化的操作。序列化是指将 Python 对象(如字典、列表)转换为 JSON 字符串,主要使用 `json.dumps` 方法。示例包括基本的字典和列表序列化,以及自定义类的序列化。反序列化则是将 JSON 字符串转换回 Python 对象,使用 `json.loads` 方法。文中还提供了具体的代码示例,展示了如何处理不同类型的 Python 对象。
|
1月前
|
存储 安全 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第22天】在Java的世界里,对象序列化和反序列化是数据持久化和网络传输的关键技术。本文将带你了解如何在Java中实现对象的序列化与反序列化,并探讨其背后的原理。通过实际代码示例,我们将一步步展示如何将复杂数据结构转换为字节流,以及如何将这些字节流还原为Java对象。文章还将讨论在使用序列化时应注意的安全性问题,以确保你的应用程序既高效又安全。
|
2月前
|
存储 Java
Java编程中的对象序列化与反序列化
【10月更文挑战第9天】在Java的世界里,对象序列化是连接数据持久化与网络通信的桥梁。本文将深入探讨Java对象序列化的机制、实践方法及反序列化过程,通过代码示例揭示其背后的原理。从基础概念到高级应用,我们将一步步揭开序列化技术的神秘面纱,让读者能够掌握这一强大工具,以应对数据存储和传输的挑战。
下一篇
DataWorks