基于curator的延迟队列
这里不介绍关于curator的用法及优劣,旨在探究curator对于延迟队列的使用原理
怎么使用
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.1</version>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.1</version>
public class Processor {
private final static CuratorFramework client;
private final static DistributedDelayQueue<String> queue;
static{
ZookeeperConfig config = ZookeeperConfig.getConfig();
// create client
client = CuratorFrameworkFactory.newClient(config.getRegistryAddress(),
new ExponentialBackoffRetry(3000, 2));
// build queue
queue = QueueBuilder.builder(client, new AutoSubmitConsumer(),
new AutoSubmitQueueSerializer(), DelayQueueEnum.AUTO_SUBMIT.getPath())
.buildDelayQueue();
// 开启执行计划
enable();
}
/**
* 生产数据
*
* @param id
* @param endTime
* @throws Exception
*/
public void producer(String id, Date endTime) throws Exception {
queue.put(id, endTime.getTime());
}
private static void enable(){
try {
client.start();
queue.start();
} catch (Exception e) {
logger.error("enable queue fail, exception:{}", e);
}
}
}
// Serializer
class AutoSubmitQueueSerializer implements QueueSerializer {
@Override
public byte[] serialize(String s) {
return s.getBytes("utf-8");
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
}
// consumer
AutoSubmitConsumer implements QueueConsumer {
@Override
public void consumeMessage(String id) {
logger.info("consumeMessage, :{}", id);
// service processor.
logger.info("consumeMessage# auto submit end, result:{}, id:{}", result, id);
}
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
}
}
是临时节点还是持久化节点,如果基于内存的话客户端或者服务端挂了以后就会存在数据丢失的问题? 是否会重新排序,zk是按照请求的时间先后顺序写入的,那么curator是怎么监听到期时间的呢?
猜想
是否持久化
是否会在每次请求的时候拿到服务端所有的节点数据进行排序后存入到服务端
验证
针对第一点,我们关闭zookeeper服务端和客户端后重新启动后之前的节点还存在所以是持久化节点
通过客户端工具连接zookeeper发现并不会每次请求的时候都会重新排序,也就是说可能在client端进行处理的
以下是在客户端工具上截取的一部分信息,key是由三部分组成的,第一部分固定的queue- , 第二部分暂不确定,第三部分是节点的序号
源码求证
// org.apache.curator.framework.recipes.queue.DistributedQueue#start
// 部分片段
client.create().creatingParentContainersIfNeeded().forPath(queuePath);
if ( !isProducerOnly )
{
service.submit
(
new Callable<Object>()
{
@Override
public Object call()
{
runLoop(); // step1
return null;
}
}
);
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#runLoop
// step1中的代码片段
while ( state.get() == State.STARTED )
{
try
{
ChildrenCache.Data data = (maxWaitMs > 0) ? childrenCache.blockingNextGetData(currentVersion, maxWaitMs, TimeUnit.MILLISECONDS) : childrenCache.blockingNextGetData(currentVersion);
currentVersion = data.version;
// 诸如:
//queue-|2E1D86A3BB6|0000000019
//queue-|1712F752AA0|0000000036
//queue-|1712F76FF60|0000000035
// 拿到所有的子节点
List<String> children = Lists.newArrayList(data.children);
// 根据过期时间排序
// step6
sortChildren(children);
// 排序后
//queue-|1712F752AA0|0000000036
//queue-|1712F76FF60|0000000035
//queue-|2E1D86A3BB6|0000000019
if ( children.size() > 0 )
{ //获取到期时间
maxWaitMs = getDelay(children.get(0));
if ( maxWaitMs > 0 ) continue;
}
else continue;
// 死循环不断轮询是否有满足条件的节点;
// 只要有满足条件的节点就将整个排序后的集合往下传递
processChildren(children, currentVersion); // step2
}
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#processChildren
// step2对应的代码片段:
private void processChildren(List children, long currentVersion)
{
final Semaphore processedLatch = new Semaphore(0);
final boolean isUsingLockSafety = (lockPath != null);
int min = minItemsBeforeRefresh;
for ( final String itemNode : children )
{
if ( Thread.currentThread().isInterrupted() )
{
processedLatch.release(children.size());
break;
}
if ( !itemNode.startsWith(QUEUE_ITEM_NAME) )
{
processedLatch.release();
continue;
}
if ( min-- <= 0 )
{
if ( refreshOnWatch && (currentVersion != childrenCache.getData().version) )
{
processedLatch.release(children.size());
break;
}
}
// step3
if ( getDelay(itemNode) > 0 )
{
processedLatch.release();
continue;
}
//这里使用了线程池,为了保证每一个节点都执行完毕后才返回方法所以使用了信号灯
executor.execute
(
new Runnable()
{
@Override
public void run()
{
try
{
//是否采用了分布式锁,因为我们初始化的时候并未使用所以没有用到这里的安全锁,实际上是进入到了else中
if ( isUsingLockSafety )
{
processWithLockSafety(itemNode, ProcessType.NORMAL);
}
else
{
// 看这里 step4
processNormally(itemNode, ProcessType.NORMAL);
}
}finally
{
processedLatch.release();
}
}
}
);
}
processedLatch.acquire(children.size());
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#getDelay(java.lang.String)
// 对应step3处的代码片段
protected long getDelay(String itemNode)
{
return getDelay(itemNode, System.currentTimeMillis());
}
private long getDelay(String itemNode, long sortTime)
{ // 会从key上获取时间戳
// step5
long epoch = getEpoch(itemNode);
return epoch - sortTime; // 计算过期时间
}
// 对应step5处的代码
private static long getEpoch(String itemNode)
{
// itemNode -> queue-|时间戳|序号
int index2 = itemNode.lastIndexOf(SEPARATOR);
int index1 = (index2 > 0) ? itemNode.lastIndexOf(SEPARATOR, index2 - 1) : -1;
if ( (index1 > 0) && (index2 > (index1 + 1)) )
{
try
{
String epochStr = itemNode.substring(index1 + 1, index2);
return Long.parseLong(epochStr, 16); // 从这里可以知道queue-|这里是16进制的时间戳了|序号| 可能是出于key长度的考量吧(更节省内存),用10进制的时间戳会长很多
}
}
return 0;
}
// org.apache.curator.framework.recipes.queue.DistributedQueue#sortChildren
// 会根据延时时间排序
// step6处的代码片段
protected void sortChildren(List children)
{
final long sortTime = System.currentTimeMillis();
Collections.sort
(
children,
new Comparator<String>()
{
@Override
public int compare(String o1, String o2)
{
long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
return (diff < 0) ? -1 : ((diff > 0) ? 1 : 0);
}
}
);
}
// 对应step4处的代码片段
private boolean processNormally(String itemNode, ProcessType type) throws Exception
{
try
{
String itemPath = ZKPaths.makePath(queuePath, itemNode);
Stat stat = new Stat();
byte[] bytes = null;
if ( type == ProcessType.NORMAL )
{
// 获取key对应的value
bytes = client.getData().storingStatIn(stat).forPath(itemPath);
}
if ( client.getState() == CuratorFrameworkState.STARTED )
{
// 移除节点
client.delete().withVersion(stat.getVersion()).forPath(itemPath);
}
if ( type == ProcessType.NORMAL )
{
//step7
processMessageBytes(itemNode, bytes);
}
return true;
}
return false;
}
//对应step7处代码,会回调我们的业务代码
private ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception
{
ProcessMessageBytesCode resultCode = ProcessMessageBytesCode.NORMAL;
MultiItem<T> items;
try
{
// 根据我们定义的序列化器序列化
items = ItemSerializer.deserialize(bytes, serializer);
}
for(;;)
{
// 省略一部分代码
try
{
consumer.consumeMessage(item); // 这里就会回调到我们的业务代码
}
}
return resultCode;
}
总结
org.apache.curator.framework.recipes.queue.DistributedQueue#internalCreateNode这个方法也证实了确实是持久化且有序的节点;
如果过期时间太长而数据生产的过于频繁的话,那么势必会造成数据的积压对于性能和内存都是很大的考验;
而且是客户端不断的循环获取所有的节点、排序、再处理,由此我们也证明了前面猜想是排序后在服务端重新添加所有节点每次监听第一个节点变化的想法看来是错误的;
原文地址https://my.oschina.net/u/2486137/blog/3215445