Netty:模拟Redis的客户端
因为redis是部署在服务器上的 我们只需要模拟客户端发送请求即可所以只需要编写客户端的代码就可以了
前置知识
编写前我们需要知道 redis的请求规范
Redis 的通信 是需要遵循 RESP 协议的
例子:
set hello 123 *3\r\n $3 \r\n set \r\n hello \r\n 123
建立 channel 通道后 发送命令给服务端 此时是写数据【输出】 在出战handler里增加逻辑 当接受响应后 此时数据需要【输入】 存入栈 handler 中 在入栈handler 里增加逻辑
Netty中 Redis 命令对应的类型
- 单行
- 错误
- 整形数字
- 批量回复
- 多个批量回复
客户端
我们只需要编写客户端。请求搭建redis的服务器接受和传输数据即可就可以了
客户端还是常见的写法
public class RedisClient { private static final String HOST = "xxxxxxxxx"; private static final int PORT = 6379; public static void main(String[] args) { NioEventLoopGroup bossGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(bossGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { // 调用netty 提供的支持netty 协议的编解码器 // RedisBulkStringAggregator 和 RedisarrayAggregator 是协议中两种特殊格式的聚合器 ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast(new RedisDecoder()); pipeline.addLast(new RedisBulkStringAggregator()); pipeline.addLast(new RedisArrayAggregator()); pipeline.addLast(new RedisEncoder()); // 我们自己的处理器 pipeline.addLast(new RedisClientHandler()); } }); try { ChannelFuture syncFuture = bootstrap.connect(HOST, PORT).sync(); System.out.println("enter redis commands"); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); for (; ; ) { String input = bufferedReader.readLine(); //退出任务 if ("quit".equals(input)) { syncFuture.channel().close().sync(); break; } syncFuture.channel().writeAndFlush(input).sync(); } syncFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); } } }
处理器
因为要读取和写入 所以我们需要集成混合处理器类 : ChannelDuplexHandler
我们需要判断五种请求类型 所以先封装一个类别输出的方法
RedisMessage Netty提供的redis的信息对象,以下五种类型都来自于它
private void printRedisResponse(RedisMessage msg) { if (msg instanceof SimpleStringRedisMessage) { SimpleStringRedisMessage tmpMsg = (SimpleStringRedisMessage) msg; System.out.println(tmpMsg.content()); } else if (msg instanceof ErrorRedisMessage) { ErrorRedisMessage tmpMsg = (ErrorRedisMessage) msg; System.out.println(tmpMsg.content()); } else if (msg instanceof IntegerRedisMessage) { IntegerRedisMessage tmpMsg = (IntegerRedisMessage) msg; System.out.println(tmpMsg.value()); } else if (msg instanceof FullBulkStringRedisMessage) { FullBulkStringRedisMessage tmpMsg = (FullBulkStringRedisMessage) msg; if (tmpMsg.isNull()) { return; } System.out.println(tmpMsg.content().toString(CharsetUtil.UTF_8)); } else if (msg instanceof ArrayRedisMessage) { ArrayRedisMessage tmpMsg = (ArrayRedisMessage) msg; for (RedisMessage child : tmpMsg.children()) { printRedisResponse(child); } } // 如果都是不是应该抛出异常 }
写命令
// 写 // 出栈handler 的常用方法 //需要处理redis 命令 @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { // 键盘传入的字符串 如 : keys * String commandStr = (String) msg; // 通过空格来分离出命令 String[] commandArr = commandStr.split("\\s+"); // 接受redis 命令的列表 RedisMessag保存命令的内容 List<RedisMessage> redisMessageList = new ArrayList<>(commandArr.length); for (String cmdStr : commandArr) { FullBulkStringRedisMessage message = new FullBulkStringRedisMessage( ByteBufUtil.writeUtf8(ctx.alloc(), cmdStr)); redisMessageList.add(message); } //将分离的命令 合并成一个完整的命令 RedisMessage request = new ArrayRedisMessage(redisMessageList); //写入通道 ctx.write(request, promise); }
读取命令
// 读取 @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 此时接受的时候这个值 就是我们存储redismessage 的对象 RedisMessage redisMessage = (RedisMessage) msg; // 返回的结果是不同类型 分开进行处理 printRedisResponse(redisMessage); // 内存管理使用了 引用计数的方式 所以我们需要释放资源 ReferenceCountUtil.release(redisMessage); }
演示效果