实战干货:基于Redis6.0 部署迷你版本消息队列(中)

简介: 实战干货:基于Redis6.0部署迷你版本消息队列(中)

问题思考


如何保证消息的可靠性传输?


通过subscibe/publish处理的消息没有持久化的特性,一旦出现网络中断,Redis宕机这类异常的时候就会导致消息丢失,而且也没有较好的机制取支持消息重复消费的问题。因此可靠性方面较差。


基于Stream实现消息队列


Redis5.0中发布的Stream类型,也用来实现典型的消息队列。提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。该Stream类型的出现,几乎满足了消息队列具备的全部内容,包括但不限于:


  • 消息ID的序列化生成
  • 消息遍历
  • 消息的阻塞和非阻塞读取
  • 消息的分组消费
  • 未完成消息的处理
  • 消息队列监控


关于Stream的一些基本入门篇章这里不做过多介绍,感兴趣的朋友可以去阅读下这篇文章:


https://xie.infoq.cn/article/cdb47caddc5ff49dc09ea58cd


下边的部分我们直接来进入关于Redis XStream相关的实战环节。


封装消息监听功能


首先是定义一个MQ相关的接口:


public interface RedisStreamListener {
    /**
     * 处理正常消息
     */
    HandlerResult handleMsg(StreamEntry streamEntry);
}


接着是基于这套接口做消息发送的实现:


package org.idea.mq.redis.framework.listener;
import com.alibaba.fastjson.JSON;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.config.StreamListener;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.idea.mq.redis.framework.utils.PayMsg;
import redis.clients.jedis.StreamEntry;
import javax.annotation.Resource;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 10:07 下午 2022/2/9
 */
@StreamListener(streamName = "order-service:order-payed-stream", groupName = "order-service-group", consumerName = "user-service-consumer")
public class OrderPayedListener implements RedisStreamMQListener {
    @Resource
    private IRedisService iRedisService;
    @Override
    public HandlerResult handleMsg(StreamEntry streamEntry) {
        Map<String, String> map = streamEntry.getFields();
        String json = map.get("json");
        PayMsg payMsg = JSON.parseObject(json, PayMsg.class);
        System.out.println("pending payMsg is : " + payMsg);
        return SUCCESS;
    }
}


自定义消息注解


package org.idea.mq.redis.framework.config;
import org.springframework.stereotype.Component;
import java.lang.annotation.*;
/**
 * @Author linhao
 * @Date created in 10:04 下午 2022/2/9
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface StreamListener {
    String streamName() default "";
    String groupName() default "";
    String consumerName() default "";
}


代码中有一个自定义的@StreamListener的注解,该注解的内部包含了一个@Component的注解,可以将使用了该注解的对象注入到Spring容器中。


为了能将这些个初始化类进行自动装配,还需要加入一个配置的对象,代码如下:


package org.idea.mq.redis.framework.config;
import org.idea.mq.redis.framework.bean.HandlerResult;
import org.idea.mq.redis.framework.mq.xstream.RedisStreamMQListener;
import org.idea.mq.redis.framework.redis.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import redis.clients.jedis.StreamEntry;
import redis.clients.jedis.StreamEntryID;
import redis.clients.jedis.StreamPendingEntry;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import static org.idea.mq.redis.framework.config.MQConstants.SUCCESS;
/**
 * @Author linhao
 * @Date created in 3:25 下午 2022/2/7
 */
@Configuration
public class StreamListenerConfiguration implements ApplicationListener<ApplicationReadyEvent> {
    @Resource
    private ApplicationContext applicationContext;
    @Resource
    private IRedisService iRedisService;
    private static Logger logger = LoggerFactory.getLogger(StreamListenerConfiguration.class);
    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        Map<String, RedisStreamMQListener> beanMap = applicationContext.getBeansOfType(RedisStreamMQListener.class);
        beanMap.values().forEach(redisStreamMQListener -> {
            StreamListener StreamListener = redisStreamMQListener.getClass().getAnnotation(StreamListener.class);
            ListenerInitWrapper listenerInitWrapper = new ListenerInitWrapper(StreamListener.streamName(), StreamListener.groupName(), StreamListener.consumerName());
            Thread handleThread = new Thread(new CoreMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            Thread pendingHandleThread = new Thread(new PendingMsgHandlerThread(listenerInitWrapper, redisStreamMQListener, iRedisService));
            handleThread.start();
            pendingHandleThread.start();
            logger.info("{} load successed ", redisStreamMQListener);
        });
    }
    class PendingMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;
        public PendingMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            String startId = "0-0";
            while (true) {
                List<StreamPendingEntry> streamConsumersInfos = iRedisService.xpending(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId), 1);
                //如果该集合非空,则触发监听行为
                if (!CollectionUtils.isEmpty(streamConsumersInfos)) {
                    for (StreamPendingEntry streamConsumersInfo : streamConsumersInfos) {
                        StreamEntryID streamEntryID = streamConsumersInfo.getID();
                        //比当前pending的streamId小1
                        String streamIdStr = streamEntryID.toString();
                        String[] items = streamIdStr.split("-");
                        Long timestamp = Long.valueOf(items[0]) - 1;
                        String beforeId = timestamp + "-" + "0";
                        List<Map.Entry<String, List<StreamEntry>>> result = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(beforeId), 1, listenerInitWrapper.getConsumerName());
                        for (Map.Entry<String, List<StreamEntry>> streamInfo : result) {
                            List<StreamEntry> streamEntries = streamInfo.getValue();
                            for (StreamEntry streamEntry : streamEntries) {
                                try {
                                    //业务处理
                                    HandlerResult handlerResult = redisStreamMQListener.handleMsg(streamEntry);
                                    if (SUCCESS.equals(handlerResult)) {
                                        startId = streamEntryID.toString();
                                        iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), new StreamEntryID(startId));
                                    }
                                } catch (Exception e) {
                                    logger.error("[PendingMsgHandlerThread] e is ", e);
                                }
                            }
                        }
                    }
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    class CoreMsgHandlerThread implements Runnable {
        private ListenerInitWrapper listenerInitWrapper;
        private RedisStreamMQListener redisStreamMQListener;
        private IRedisService iRedisService;
        public CoreMsgHandlerThread(ListenerInitWrapper listenerInitWrapper, RedisStreamMQListener redisStreamMQListener, IRedisService iRedisService) {
            this.redisStreamMQListener = redisStreamMQListener;
            this.listenerInitWrapper = listenerInitWrapper;
            this.iRedisService = iRedisService;
        }
        @Override
        public void run() {
            while (true) {
                List<Map.Entry<String, List<StreamEntry>>> streamConsumersInfos = iRedisService.xreadGroup(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), StreamEntryID.UNRECEIVED_ENTRY, 1, listenerInitWrapper.getConsumerName());
                for (Map.Entry<String, List<StreamEntry>> streamInfo : streamConsumersInfos) {
                    List<StreamEntry> streamEntries = streamInfo.getValue();
                    for (StreamEntry streamEntry : streamEntries) {
                        //业务处理
                        try {
                            HandlerResult result = redisStreamMQListener.handleMsg(streamEntry);
                            if (SUCCESS.equals(result)) {
                                iRedisService.xack(listenerInitWrapper.getStreamName(), listenerInitWrapper.getGroupName(), streamEntry.getID());
                            }
                        } catch (Exception e) {
                            logger.error("[CoreMsgHandlerThread] e is ", e);
                        }
                    }
                }
            }
        }
    }
}



相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
6天前
|
存储 NoSQL Redis
Docker 部署 Redis
在使用 Docker 部署 Redis 时,为实现数据持久化,需正确挂载容器内的数据目录到宿主机。推荐命令如下: ``` docker run -d --name redis -v /mnt/data/redis:/data -p 6379:6379 redis ``` 该命令将宿主机的 `/mnt/data/redis` 目录挂载到容器的 `/data` 目录,确保 Redis 数据持久化。此路径更通用,适合大多数场景。避免使用不匹配的挂载路径,如 `/var/lib/redis` 或 `/mnt/data/redis` 到非默认目录,以防止数据无法正确持久化。
|
2月前
|
NoSQL Java 关系型数据库
Liunx部署java项目Tomcat、Redis、Mysql教程
本文详细介绍了如何在 Linux 服务器上安装和配置 Tomcat、MySQL 和 Redis,并部署 Java 项目。通过这些步骤,您可以搭建一个高效稳定的 Java 应用运行环境。希望本文能为您在实际操作中提供有价值的参考。
185 26
|
3月前
|
NoSQL 安全 测试技术
Redis游戏积分排行榜项目中通义灵码的应用实战
Redis游戏积分排行榜项目中通义灵码的应用实战
103 4
|
3月前
|
监控 NoSQL 网络协议
【Azure Redis】部署在AKS中的应用,连接Redis高频率出现timeout问题
查看Redis状态,没有任何异常,服务没有更新,Service Load, CPU, Memory, Connect等指标均正常。在排除Redis端问题后,转向了AKS中。 开始调查AKS的网络状态。最终发现每次Redis客户端出现超时问题时,几乎都对应了AKS NAT Gateway的更新事件,而Redis服务端没有任何异常。因此,超时问题很可能是由于NAT Gateway更新事件导致TCP连接被重置。
|
3月前
|
消息中间件 运维 UED
消息队列运维实战:攻克消息丢失、重复与积压难题
消息队列(MQ)作为分布式系统中的核心组件,承担着解耦、异步处理和流量削峰等功能。然而,在实际应用中,消息丢失、重复和积压等问题时有发生,严重影响系统的稳定性和数据的一致性。本文将深入探讨这些问题的成因及其解决方案,帮助您在运维过程中有效应对这些挑战。
56 1
|
4月前
|
NoSQL 关系型数据库 MySQL
MySQL与Redis协同作战:优化百万数据查询的实战经验
【10月更文挑战第13天】 在处理大规模数据集时,传统的关系型数据库如MySQL可能会遇到性能瓶颈。为了提升数据处理的效率,我们可以结合使用MySQL和Redis,利用两者的优势来优化数据查询。本文将分享一次实战经验,探讨如何通过MySQL与Redis的协同工作来优化百万级数据统计。
202 5
|
4月前
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
107 1
|
4月前
|
缓存 NoSQL Java
Spring Boot与Redis:整合与实战
【10月更文挑战第15天】本文介绍了如何在Spring Boot项目中整合Redis,通过一个电商商品推荐系统的案例,详细展示了从添加依赖、配置连接信息到创建配置类的具体步骤。实战部分演示了如何利用Redis缓存提高系统响应速度,减少数据库访问压力,从而提升用户体验。
240 2
|
4月前
|
NoSQL Linux Shell
Redis 的安装与部署(图文)
Redis 的安装与部署(图文)
|
5月前
|
NoSQL 网络协议 Unix
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
1)Redis 属于单线程还是多线程?不同版本之间有什么区别?
138 1