基于Redis实现分布式消息队列(4)

简介:

基于Redis实现分布式消息队列(4)

1、访问Redis的工具类

public class RedisManager {

    private static Pool<Jedis> pool;

    protected final static Logger logger = Logger.getLogger(RedisManager.class);

    static{
        try {
            init();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void init() throws Exception {

        Properties props = ConfigManager.getProperties("redis");
        logger.debug("初始化Redis连接池。");
        if(props==null){
            throw new RuntimeException("没有找到redis配置文件");
        }
        // 创建jedis池配置实例
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        // 设置池配置项值
        int poolMaxTotal = Integer.valueOf(props.getProperty("redis.pool.maxTotal").trim());
        jedisPoolConfig.setMaxTotal(poolMaxTotal);

        int poolMaxIdle = Integer.valueOf(props.getProperty("redis.pool.maxIdle").trim());
        jedisPoolConfig.setMaxIdle(poolMaxIdle);

        long poolMaxWaitMillis = Long.valueOf(props.getProperty("redis.pool.maxWaitMillis").trim());
        jedisPoolConfig.setMaxWaitMillis(poolMaxWaitMillis);

        logger.debug(String.format("poolMaxTotal: %s , poolMaxIdle : %s , poolMaxWaitMillis : %s ",
                poolMaxTotal,poolMaxIdle,poolMaxWaitMillis));

        // 根据配置实例化jedis池
        String connectMode = props.getProperty("redis.connectMode");
        String hostPortStr = props.getProperty("redis.hostPort");

        logger.debug(String.format("host : %s ",hostPortStr));
        logger.debug(String.format("mode : %s ",connectMode));

        if(StringUtils.isEmpty(hostPortStr)){
            throw new OptimusException("redis配置文件未配置主机-端口集");
        }
        String[] hostPortSet = hostPortStr.split(","); 
        if("single".equals(connectMode)){
            String[] hostPort = hostPortSet[0].split(":");
            pool = new JedisPool(jedisPoolConfig, hostPort[0], Integer.valueOf(hostPort[1].trim()));
        }else if("sentinel".equals(connectMode)){
            Set<String> sentinels = new HashSet<String>();     
            for(String hostPort : hostPortSet){
                sentinels.add(hostPort);
            }
            pool = new JedisSentinelPool("mymaster", sentinels, jedisPoolConfig);
        }
    }

    /**
     * 使用完成后,必须调用 returnResource 还回。
     * @return 获取Jedis对象
     */
    public static Jedis getResource(){
        Jedis jedis = pool.getResource();
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

    /**
     * 获取Jedis对象。
     * 
     * 用完后,需要调用returnResource放回连接池。
     * 
     * @param db 数据库序号
     * @return
     */
    public static Jedis getResource(int db){
        Jedis jedis = pool.getResource();
        jedis.select(db);
        if(logger.isDebugEnabled()){
            logger.debug("获得链接:" + jedis);
        }
        return jedis;
    }

    /**
     * @param jedis
     */
    public static void returnResource(Jedis jedis){
        if(jedis!=null){
            pool.returnResource(jedis);
            if(logger.isDebugEnabled()){
                logger.debug("放回链接:" + jedis);
            }
        }
    }

    /**
     * 需要通过Spring确认这个方法被调用。
     * @throws Exception
     */
    public static void destroy() throws Exception {
        pool.destroy();
    }
}
AI 代码解读

这个类没有通过技术手段强制调用returnResource和destroy,需要想想办法。

2、队列接口

public interface TaskQueue {

    /**
     * 获取队列名
     * @return
     */
    String getName();

    /**
     * 往队列中添加任务
     * @param task
     */
    void pushTask(String task);

    /**
     * 从队列中取出一个任务
     * @return
     */
    String popTask();

}
AI 代码解读

用String类型描述任务,也可以考虑byte[],要求对每个任务描述的数据尽可能短。

3、队列的Redis实现类

/**
 * 任务队列Redis实现。
 * 
 * 采用每次获取Jedis并放回pool的方式。
 * 如果获得Jedis后一直不放手,反复重用,两个操作耗时可以降低1/3。
 * 暂时先忍受这种低性能,不明确Jedis是否线程安全。
 *
 */
public class TaskQueueRedisImpl implements TaskQueue {

    private final static int REDIS_DB_IDX = 9;

    private final static Logger logger = Logger.getLogger(TaskQueueRedisImpl.class);

    private final String name; 

    /**
     * 构造函数。
     * 
     * @param name
     */
    public TaskQueueRedisImpl(String name) {
        this.name = name;
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#getName()
     */
    public String getName() {
        return this.name;
    }
    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#pushTask(String)
     */
    public void pushTask(String task) {
        Jedis jedis = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            jedis.lpush(this.name, task);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
    }

    /* (non-Javadoc)
     * @see com.gwssi.common.mq.TaskQueue#popTask()
     */
    public String popTask() {
        Jedis jedis = null;
        String task = null;
        try{
            jedis = RedisManager.getResource(REDIS_DB_IDX);
            task = jedis.rpop(this.name);
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
        }finally{
            if(jedis!=null){
                RedisManager.returnResource(jedis);
            }
        }
        return task;
    }

}
AI 代码解读

4、获取队列实例的工具类

/**
 * <pre>
 *  // 获得队列
 *  TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
 *  
 *  // 添加任务到队列
 *  String task = "task id";
 *  tq.pushTask(task);
 * 
 *  // 从队列中取出任务执行
 *  String taskToDo = tq.popTask();
 * </pre>
 * @author liuhailong
 */
public class TaskQueueManager {

    protected final static Logger logger = Logger.getLogger(TaskQueueManager.class);

    private static Map<String, TaskQueueRedisImpl> queneMap = new ConcurrentHashMap<String, TaskQueueRedisImpl>();

    /**
     * 短信队列名。
     */
    public static final String SMS_QUEUE = "SMS_QUEUE";

    /**
     * 规则队列名。
     */
    public static final String RULE_QUEUE = "RULE_QUEUE";

    private static void initQueneMap() {
        logger.debug("初始化任务队列...");
        queneMap.put(RULE_QUEUE, new TaskQueueRedisImpl(RULE_QUEUE));
        logger.debug("建立队列:"+RULE_QUEUE);
        queneMap.put(SMS_QUEUE, new TaskQueueRedisImpl(SMS_QUEUE));
        logger.debug("建立队列:"+SMS_QUEUE);
    }

    static {
        initQueneMap();
    }

    public static TaskQueue get(String name){
        return getRedisTaskQueue(name);
    }

    public static TaskQueue getRedisTaskQueue(String name){
        return queneMap.get(name);
    }

}
AI 代码解读

和具体的队列过于紧耦合,但简单好用。
先跑起来再说。

5、向队列中添加任务的代码

TaskQueue tq = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);
tq.pushTask(smsMessageId);
AI 代码解读

6、从队列中取出任务执行的代码

public class SmsSendTask{

    protected final static Logger logger = Logger.getLogger(SmsSendTask.class);

    protected static SmsSendService smsSendService = new SmsSendServiceUnicomImpl();
    /**
     * 入口方法。
     */
    public void execute()  {
        TaskQueue taskQueue = null;
        String task = null;
        try {
            taskQueue = TaskQueueManager.get(TaskQueueManager.SMS_QUEUE);

            // 非线程安全
            Set<Serializable> executedTaskSet = new HashSet<Serializable>();

            task = taskQueue.popTask();
            while(task!=null){
                // 判断是否把所有任务都执行一遍了,避免死循环
                if(executedTaskSet.contains(task)){
                    taskQueue.pushTask(task);
                    break;
                }

                executeSingleTask(taskQueue,task);

                task = taskQueue.popTask();
            }
        }catch(Throwable e){
            logger.error(e.getMessage(),e);
            e.printStackTrace();
        }
    }

    /**
     * 发送单条短信。
     * 
     * 取出任务并执行,如果失败,放回任务列表。
     * 
     * @param taskQueue
     * @param task
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    private void executeSingleTask(TaskQueue taskQueue, String task) {
        try {
            // do the job
            String smsId = task;
            Map<String,String> sms = smsSendService.getSmsList(smsId);

            smsSendService.send(sms);

            smsSendService.updateSmsStatus(task,SmsSendService.STATUS_SENT);

            String opType = "2";
            TaskQueueUtil.taskLog(taskQueue.getName(), opType, task);
        } catch (Throwable e) {
            if(task!=null){
                taskQueue.pushTask(task);
                smsSendService.updateSmsStatus(task,SmsSendService.STATUS_WAIT);
                if(logger.isDebugEnabled()){
                    logger.error(String.format("任务%s执行失败:%s,重新放回队列", task, e.getMessage()));
                }
            }else {
                e.printStackTrace();
            }
        }
    }

}
AI 代码解读

这部分代码是固定模式,而且不这样做存在重大缺陷,会有任务执行失败,被丢弃,这部分代码应该写到队列实现中。
有空再改。

原文地址http://www.bieryun.com/1989.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
打赏
0
2
0
0
101
分享
相关文章
分布式爬虫框架Scrapy-Redis实战指南
本文介绍如何使用Scrapy-Redis构建分布式爬虫系统,采集携程平台上热门城市的酒店价格与评价信息。通过代理IP、Cookie和User-Agent设置规避反爬策略,实现高效数据抓取。结合价格动态趋势分析,助力酒店业优化市场策略、提升服务质量。技术架构涵盖Scrapy-Redis核心调度、代理中间件及数据解析存储,提供完整的技术路线图与代码示例。
419 0
分布式爬虫框架Scrapy-Redis实战指南
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
236 67
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
452 7
|
2月前
|
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
202 3
Redis设计与实现——分布式Redis
Redis Sentinel 和 Cluster 是 Redis 高可用与分布式架构的核心组件。Sentinel 提供主从故障检测与自动切换,通过主观/客观下线判断及 Raft 算法选举领导者完成故障转移,但存在数据一致性和复杂度问题。Cluster 支持数据分片和水平扩展,基于哈希槽分配数据,具备自动故障转移和节点发现机制,适合大规模高并发场景。复制机制包括全量同步和部分同步,通过复制积压缓冲区优化同步效率,但仍面临延迟和资源消耗挑战。两者各有优劣,需根据业务需求选择合适方案。
分布式爬虫去重:Python + Redis实现高效URL去重
分布式爬虫去重:Python + Redis实现高效URL去重
|
2月前
|
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
134 0
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
本文介绍了如何通过Lua脚本在Redis中实现分布式锁的原子性操作,避免并发问题。首先讲解了Lua脚本的基本概念及其在Redis中的使用方法,包括通过`eval`指令执行Lua脚本和通过`script load`指令缓存脚本。接着详细展示了如何用Lua脚本实现加锁、解锁及可重入锁的功能,确保同一线程可以多次获取锁而不发生死锁。最后,通过代码示例演示了如何在实际业务中调用这些Lua脚本,确保锁操作的原子性和安全性。
294 6
【📕分布式锁通关指南 03】通过Lua脚本保证redis操作的原子性
Redis分布式锁如何实现 ?
Redis分布式锁主要依靠一个SETNX指令实现的 , 这条命令的含义就是“SET if Not Exists”,即不存在的时候才会设置值。 只有在key不存在的情况下,将键key的值设置为value。如果key已经存在,则SETNX命令不做任何操作。 这个命令的返回值如下。 ● 命令在设置成功时返回1。 ● 命令在设置失败时返回0。 假设此时有线程A和线程B同时访问临界区代码,假设线程A首先执行了SETNX命令,并返回结果1,继续向下执行。而此时线程B再次执行SETNX命令时,返回的结果为0,则线程B不能继续向下执行。只有当线程A执行DELETE命令将设置的锁状态删除时,线程B才会成功执行S
|
5月前
|
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
本文深入探讨了基于Redis实现分布式锁时遇到的细节问题及解决方案。首先,针对锁续期问题,提出了通过独立服务、获取锁进程自己续期和异步线程三种方式,并详细介绍了如何利用Lua脚本和守护线程实现自动续期。接着,解决了锁阻塞问题,引入了带超时时间的`tryLock`机制,确保在高并发场景下不会无限等待锁。最后,作为知识扩展,讲解了RedLock算法原理及其在实际业务中的局限性。文章强调,在并发量不高的场景中手写分布式锁可行,但推荐使用更成熟的Redisson框架来实现分布式锁,以保证系统的稳定性和可靠性。
227 0
【📕分布式锁通关指南 04】redis分布式锁的细节问题以及RedLock算法原理
AI助理

你好,我是AI助理

可以解答问题、推荐解决方案等