基于curator的延迟队列

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

基于curator的延迟队列
这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理

怎么使用

 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.0.1</version>

 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.0.1</version>


public class Processor {

private final static CuratorFramework client;
private final static DistributedDelayQueue<String> queue;

static{
    ZookeeperConfig config = ZookeeperConfig.getConfig();
    // create client
    client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
            new ExponentialBackoffRetry(3000, 2));
    // build queue
    queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
            new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
            .buildDelayQueue();
    // 开启执行计划
    enable();
}

/**
 * 生产数据
 *
 * @param id
 * @param endTime
 * @throws Exception
 */
public void producer(String id, Date endTime) throws Exception {
    queue.put(id, endTime.getTime());
}
private static void enable(){
    try {
        client.start();
        queue.start();
    } catch (Exception e) {
        logger.error("enable queue fail, exception:{}", e);
    }
}

}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer {

@Override
public byte[] serialize(String s) {
     return s.getBytes("utf-8");
}

@Override
public String deserialize(byte[] bytes) {
    return new String(bytes);
}

}

// consumer
AutoSubmitConsumer implements QueueConsumer {

@Override
public void consumeMessage(String id)  {
    logger.info("consumeMessage, :{}", id);
      // service processor.
    logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
}

@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}

}
是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?

猜想
是否持久化
是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
针对第一点,我们关闭zookeeper服务端和客户端后重新启动后之前的节点还存在所以是持久化节点
通过客户端工具连接zookeeper发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的
以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号 

源码求证
// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片段
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
if ( !isProducerOnly )

    {
        service.submit
            (
                new Callable<Object>()
                {
                    @Override
                    public Object call()
                    {
                        runLoop(); // step1
                        return null;
                    }
                }
            );
    }

// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代码片段
while ( state.get() == State.STARTED )

        {
            try
            {
                ChildrenCache.Data      data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
                currentVersion = data.version;
                // 诸如:
                //queue-|2E1D86A3BB6|0000000019
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
        // 拿到所有的子节点
                List<String> children = Lists.newArrayList(data.children); 
                // 根据过期时间排序
            // step6
                sortChildren(children); 
        // 排序后
                //queue-|1712F752AA0|0000000036
                //queue-|1712F76FF60|0000000035
                //queue-|2E1D86A3BB6|0000000019
                if ( children.size() > 0 )
                { //获取到期时间
                    maxWaitMs = getDelay(children.get(0));
                   
                    if ( maxWaitMs > 0 ) continue;
                }
                else  continue;
               // 死循环不断轮询是否有满足条件的节点;
               // 只要有满足条件的节点就将整个排序后的集合往下传递
                processChildren(children, currentVersion); // step2
            }
           
        }

// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2对应的代码片段:
private void processChildren(List children, long currentVersion)

{
    final Semaphore processedLatch = new Semaphore(0);
    final boolean   isUsingLockSafety = (lockPath != null);
    int             min = minItemsBeforeRefresh;
    for ( final String itemNode : children )
    {
        if ( Thread.currentThread().isInterrupted() )
        {
            processedLatch.release(children.size());
            break;
        }

        if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
        {
            processedLatch.release();
            continue;
        }

        if ( min-- <= 0 )
        {
            if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
            {
                processedLatch.release(children.size());
                break;
            }
        }
    // step3
        if ( getDelay(itemNode) > 0 )
        {
            processedLatch.release();
            continue;
        }
        //这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯
        executor.execute
        (
            new Runnable()
            {
                @Override
                public void run()
                {
                    try
                    {
                        //是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中
                        if ( isUsingLockSafety )
                        {
                            
                            processWithLockSafety(itemNode, ProcessType.NORMAL);
                        }
                        else
                        {
                // 看这里 step4
                            processNormally(itemNode, ProcessType.NORMAL);
                        }
                    }finally
                    {
                        processedLatch.release();
                    }
                }
            }
        );
    }

    processedLatch.acquire(children.size());
}

// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 对应step3处的代码片段
protected long getDelay(String itemNode)

        {
            return getDelay(itemNode, System.currentTimeMillis());
        }
        
        private long getDelay(String itemNode, long sortTime)
        {  // 会从key上获取时间戳        
    // step5
            long epoch = getEpoch(itemNode); 
            return epoch - sortTime; // 计算过期时间
        }

// 对应step5处的代码
private static long getEpoch(String itemNode)

{
// itemNode -> queue-|时间戳|序号
    int     index2 = itemNode.lastIndexOf(SEPARATOR);
    int     index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
    if ( (index1 > 0) && (index2 > (index1 + 1)) )
    {
        try
        {
            String  epochStr = itemNode.substring(index1 + 1, index2);
            return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多
        }
    }
    return 0;
}

// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 会根据延时时间排序
// step6处的代码片段
protected void sortChildren(List children)

        {
            final long sortTime = System.currentTimeMillis();
            Collections.sort
            (
                children,
                new Comparator<String>()
                {
                    @Override
                    public int compare(String o1, String o2)
                    {
                        long        diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                        return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
                    }
                }
            );
        }

// 对应step4处的代码片段
private boolean processNormally(String itemNode, ProcessType type) throws Exception

{
    try
    {
        String  itemPath = ZKPaths.makePath(queuePath, itemNode);
        Stat    stat = new Stat();

        byte[]  bytes = null;
        if ( type == ProcessType.NORMAL )
        {
            // 获取key对应的value
            bytes = client.getData().storingStatIn(stat).forPath(itemPath);
        }
        if ( client.getState() == CuratorFrameworkState.STARTED )
        {
           // 移除节点
                        client.delete().withVersion(stat.getVersion()).forPath(itemPath);
        }

        if ( type == ProcessType.NORMAL )
        {
        //step7
            processMessageBytes(itemNode, bytes);
        }

        return true;
    }

    return false;
}

//对应step7处代码,会回调我们的业务代码
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception

{
    ProcessMessageBytesCode     resultCode = ProcessMessageBytesCode.NORMAL;
    MultiItem<T>                items;
    try
    {
      // 根据我们定义的序列化器序列化
        items = ItemSerializer.deserialize(bytes, serializer);
    }

    for(;;)
    {
     // 省略一部分代码
        try
        {
            consumer.consumeMessage(item); // 这里就会回调到我们的业务代码
        }
    }
    return resultCode;
}

总结
org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;
原文地址https://my.oschina.net/u/2486137/blog/3215445

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
消息中间件 存储 Java
RabbitMQ之延迟队列解读
RabbitMQ之延迟队列解读
|
1月前
|
设计模式 NoSQL Go
Redis 实现高效任务队列:异步队列与延迟队列详解
本文介绍了如何使用 Redis 实现异步队列和延迟队列。通过 Go 语言的 `github.com/go-redis/redis` 客户端,详细讲解了 Redis 客户端的初始化、异步队列的实现和测试、以及延迟队列的实现和测试。文章从基础连接开始,逐步构建了完整的队列系统,帮助读者更好地理解和应用这些概念,提升系统的响应速度和性能。
49 6
|
5月前
|
消息中间件 Java Kafka
说说RabbitMQ延迟队列实现原理?
说说RabbitMQ延迟队列实现原理?
85 0
说说RabbitMQ延迟队列实现原理?
|
6月前
|
消息中间件 数据库
03.RabbitMQ延迟队列
03.RabbitMQ延迟队列
55 0
|
7月前
|
NoSQL Java Redis
分布式延时消息的另外一种选择 Redisson (推荐使用)
分布式延时消息的另外一种选择 Redisson (推荐使用)
274 1
|
7月前
|
存储 消息中间件 NoSQL
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
1378 3
Redisson源码(二)延迟队列RDelayedQueue的使用及原理分析
|
7月前
|
消息中间件 存储 NoSQL
rocketmq实现延迟队列思路探讨
本文介绍了两种实现RocketMQ延迟消息的方法。非任意时间延迟可通过在服务器端配置`messageDelayLevel`实现,但需重启服务。任意时间延迟则分为两种策略:一是结合原生逻辑和时间轮,利用RocketMQ的默认延迟等级组合支持任意延迟,但可能丢失1分钟内的数据;二是使用存储介质(如Redis)加时间轮,消息存储和定时发送结合,能处理数据不一致和丢失问题,但涉及更多组件。推荐项目[civism-rocket](https://github.com/civism/civism-rocket)作为参考。
238 1
|
7月前
|
消息中间件 存储 Java
RabbitMQ之延迟队列(手把手教你学习延迟队列)
【1月更文挑战第12天】延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列的。
1725 15
|
消息中间件 Java Docker
RabbitMQ 如何实现延迟队列?
RabbitMQ 如何实现延迟队列?
430 1
|
消息中间件 存储 调度
RabbitMQ的延迟队列
RabbitMQ的延迟队列是一种特殊的队列,可以在消息发送后延迟一段时间后再将消息投递给消费者。
188 0