基于Redis6.0 部署迷你版本消息队列实战(下)

简介: 基于Redis6.0 部署迷你版本消息队列实战(下)

基于Stream实现消息队列



Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:


  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控


关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:

xie.infoq.cn/article/cdb…


下边的部分我们直接来进入关于Redis XStream相关的实战环节。


封装消息监听功能


首先是定义一个MQ相关的接口:


public interface RedisStreamListener {
    /**
     * 处理正常消息
     */
    HandlerResult handleMsg(StreamEntry streamEntry);
}
复制代码


接着是基于这套接口做消息发送的实现:


package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 10:07 下午 2022/2/9
 */
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
    @Resource
    private IRedisService iRedisService;
    @Override
    public HandlerResult handleMsg(StreamEntry streamEntry) {
        Map<String, String> map = streamEntry.getFields();
        String json = map.get("json");
        PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
        System.out.println("pending payMsg is : " + payMsg);
        return SUCCESS;
    }
}
复制代码


自定义消息注解


package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
 * @Author linhao
 * @Date created in 10:04 下午 2022/2/9
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
    String streamName() default "";
    String groupName() default "";
    String consumerName() default "";
}
复制代码


代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中。


为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:


package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 3:25 下午 2022/2/7
 */
@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private IRedisService iRedisService;
    private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
        beanMap.values().forEach(redisStreamMQListener -> {
            StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
            ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
            Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            handleThread.start();
            pendingHandleThread.start();
            logger.info("{} load successed ", redisStreamMQListener);
        });
    }
    class PendingMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;
        public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            String startId = "0-0";
            while (true) {
                List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
                //如果该集合非空,则触发监听行为
                if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
                    for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
                        StreamEntryID streamEntryID = streamConsumersInfo.getID();
                        //比当前pending的streamId小1
                        String streamIdStr = streamEntryID.toString();
                        String[] items = streamIdStr.split("-");
                        Long timestamp = Long.valueOf(items[0]) - 1;
                        String beforeId = timestamp + "-" + "0";
                        List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
                        for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
                            List<StreamEntry> streamEntries = streamInfo.getValue();
                            for (StreamEntry streamEntry : streamEntries) {
                                try {
                                    //业务处理
                                    HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
                                    if (SUCCESS.equals(handlerResult)) {
                                        startId = streamEntryID.toString();
                                        iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
                                    }
                                } catch (Exception e) {
                                    logger.error("[PendingMsgHandlerThread] e is ", e);
                                }
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    class CoreMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;
        public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            while (true) {
                List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
                for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
                    List<StreamEntry> streamEntries = streamInfo.getValue();
                    for (StreamEntry streamEntry : streamEntries) {
                        //业务处理
                        try {
                            HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
                            if (SUCCESS.equals(result)) {
                                iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
                            }
                        } catch (Exception e) {
                            logger.error("[CoreMsgHandlerThread] e is ", e);
                        }
                    }
                }
            }
        }
    }
}
复制代码


其原理是在Spring容器启动好了之后,监听Spring容器内部发出的


ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。


封装相关的消息发布功能


消息的发送部分比较简单,直接通过redis往stream里面写入数据即可


package org.idea.mq.redis.framework.producer;
/**
 * @Author linhao
 * @Date created in 12:23 下午 2022/2/10
 */
public interface IStreamProducer {
    /**
     * 指定streamName发布消息
     * @param streamName
     * @param json
     */
    void sendMsg(String streamName, String json);
}
复制代码


消息的传输格式采用json字符串的方式写入到redis内部的stream当中。


package org.idea.mq.redis.framework.producer;
import org.idea.mq.redis.framework.redis.IRedisService;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;
/**
 * @Author linhao
 * @Date created in 12:19 下午 2022/2/10
 */
public class StreamProducer implements IStreamProducer{
    @Resource
    private IRedisService iRedisService;
    @Override
    public void sendMsg(String streamName,String json){
        Map<String,String> map = new HashMap<>();
        map.put("json",json);
        iRedisService.xAdd(streamName,map);
    }
}
复制代码


注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:


@Override
public boolean xAdd(String streamName, Map<String, String> stringMap) {
    try (Jedis jedis = iRedisFactory.getConnection()) {
        jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap);
        return true;
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
复制代码


为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:


网络异常,图片无法展示
|


组件的测试



点对点发送测试


建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group。


最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。


网络异常,图片无法展示
|


广播发送测试


使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。


网络异常,图片无法展示
|


具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。


问题思考



为何同一个StreamName需要采用双线程消费?


一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。


是否支持延迟重试


目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5…分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。


本文完整代码案例地址

gitee.com/IdeaHome_ad…

目录
相关文章
|
5月前
|
存储 NoSQL 前端开发
Redis专题-实战篇一-基于Session和Redis实现登录业务
本项目基于SpringBoot实现黑马点评系统,涵盖Session与Redis两种登录方案。通过验证码登录、用户信息存储、拦截器校验等流程,解决集群环境下Session不共享问题,采用Redis替代Session实现数据共享与自动续期,提升系统可扩展性与安全性。
335 3
Redis专题-实战篇一-基于Session和Redis实现登录业务
|
4月前
|
存储 NoSQL Redis
手把手教你用 Docker 部署 Redis
Redis是高性能内存数据库,支持多种数据结构,适用于缓存、消息队列等场景。本文介绍如何通过Docker快速拉取轩辕镜像并部署Redis,涵盖快速启动、持久化存储及docker-compose配置,助力开发者高效搭建稳定服务。
1297 7
|
5月前
|
存储 缓存 NoSQL
Redis专题-实战篇二-商户查询缓存
本文介绍了缓存的基本概念、应用场景及实现方式,涵盖Redis缓存设计、缓存更新策略、缓存穿透问题及其解决方案。重点讲解了缓存空对象与布隆过滤器的使用,并通过代码示例演示了商铺查询的缓存优化实践。
269 1
Redis专题-实战篇二-商户查询缓存
|
11月前
|
数据采集 存储 数据可视化
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
1153 0
分布式爬虫框架Scrapy-Redis实战指南
|
6月前
|
NoSQL 安全 Linux
如何在phpStudy环境中升级Redis版本
以上流程详尽覆盖从准备工作至实际操作再至事后检查各个阶段, 遵循此方案可大幅度减少因技术操作失误导致业务影响风险发生概率, 同时也为未来进一步扩展提供坚实基础支撑点 。
296 15
|
8月前
|
缓存 监控 NoSQL
Redis 实操要点:Java 最新技术栈的实战解析
本文介绍了基于Spring Boot 3、Redis 7和Lettuce客户端的Redis高级应用实践。内容包括:1)现代Java项目集成Redis的配置方法;2)使用Redisson实现分布式可重入锁与公平锁;3)缓存模式解决方案,包括布隆过滤器防穿透和随机过期时间防雪崩;4)Redis数据结构的高级应用,如HyperLogLog统计UV和GeoHash处理地理位置。文章提供了详细的代码示例,涵盖Redis在分布式系统中的核心应用场景,特别适合需要处理高并发、分布式锁等问题的开发场景。
523 41
|
8月前
|
机器学习/深度学习 存储 NoSQL
基于 Flink + Redis 的实时特征工程实战:电商场景动态分桶计数实现
本文介绍了基于 Flink 与 Redis 构建的电商场景下实时特征工程解决方案,重点实现动态分桶计数等复杂特征计算。通过流处理引擎 Flink 实时加工用户行为数据,结合 Redis 高性能存储,满足推荐系统毫秒级特征更新需求。技术架构涵盖状态管理、窗口计算、Redis 数据模型设计及特征服务集成,有效提升模型预测效果与系统吞吐能力。
887 10
|
11月前
|
缓存 NoSQL Java
基于SpringBoot的Redis开发实战教程
Redis在Spring Boot中的应用非常广泛,其高性能和灵活性使其成为构建高效分布式系统的理想选择。通过深入理解本文的内容,您可以更好地利用Redis的特性,为应用程序提供高效的缓存和消息处理能力。
1056 79
|
8月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2267 7
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
320 4