Storm-源码分析- timer (backtype.storm.timer)

简介:

mk-timer

timer是基于PriorityQueue实现的(和PriorityBlockingQueue区别, 在于没有阻塞机制, 不是线程安全的), 优先级队列是堆数据结构的典型应用 
默认情况下, 按照自然顺序(其实就是默认comparator的定义), 最小的元素排在堆头 
当然也可以自己重新实现comparator接口, 比如timer就用reify重新实现了comparator接口

整个过程其实比较简单, 开个timer-thread, 不断check PriorityQueue里面时间最小的timer是否已经可以触发 
如果可以, 就poll出来, 调用callback, 并sleep, 都很好理解

唯一需要说的是, 这里使用Semaphore, 
信号量和lock相似, 都是用于互斥 
不同在于, 信号量模拟资源管理, 所以不同于lock的排他, 信号量可以接收多个aquire(取决于配置) 
另外一个比较大的区别, lock是解铃还须系铃人, 谁锁谁解, 而信号量无所谓, 任何线程都可以调用release, 或acquire 
这里使用信号量, 是用于在cancel-timer时, 等待timer-thread结束

(defn cancel-timer [timer]
  (check-active! timer)
  (locking (:lock timer)
    (reset! (:active timer) false)
    (.interrupt (:timer-thread timer)))
  (.acquire (:cancel-notifier timer)))
因为cancel的过程就是将active置false, 然后就是调用acquire等待信号量cancel-notifier被释放 
而timer-thread在线程结束前, 会release这个信号量

 

(defnk mk-timer [:kill-fn (fn [& _] )]
  (let [queue (PriorityQueue. 10
                              (reify Comparator
                                (compare [this o1 o2]
                                  (- (first o1) (first o2))
                                  )
                                (equals [this obj]
                                  true
                                  )))
        active (atom true) ;;标志位
        lock (Object.)     ;;创建lock对象, 由于PriorityQueue非线程安全, 所以使用locking来保证同时只有一个线程访问queue
        notifier (Semaphore. 0) ;;创建信号量, 初始为0
        timer-thread (Thread.
                      (fn []
                        (while @active
                          (try
                            ;;peek读但不从queue中取出, 先读出time看看, 符合条件再取出 
                            (let [[time-secs _ _ :as elem] (locking lock (.peek queue))]
                              (if (and elem (>= (current-time-secs) time-secs)) 
                                ;;无法保证恰好, 只要当前时间>=time-secs, 就可以执行, 可想而知对于afn必须不能耗时, 否则会影响其他timer
                                ;; imperative to not run the function inside the timer lock
                                ;; otherwise, it's possible to deadlock if function deals with other locks
                                ;; (like the submit lock)
                                (let [afn (locking lock (second (.poll queue)))]  ;;poll从queue中取出
                                  (afn))    ;;真正执行timer中的callback
                                (Time/sleep 1000)
                                ))
                            (catch Throwable t
                              ;; because the interrupted exception can be wrapped in a runtimeexception
                              (when-not (exception-cause? InterruptedException t)
                                (kill-fn t)
                                (reset! active false)
                                (throw t))
                              )))
                        (.release notifier)))]
    (.setDaemon timer-thread true)
    (.setPriority timer-thread Thread/MAX_PRIORITY)
    (.start timer-thread)
    {:timer-thread timer-thread
     :queue queue
     :active active
     :lock lock
     :cancel-notifier notifier}))

 

schedule

schedule其实就是往PriorityQueue里面插入timer

对于循环schdule, 就是在timer的callback里面, 再次schedule

(defnk schedule [timer delay-secs afn :check-active true]
  (when check-active (check-active! timer))
  (let [id (uuid)
        ^PriorityQueue queue (:queue timer)]
    (locking (:lock timer)
      (.add queue [(+ (current-time-secs) delay-secs) afn id])
      )))

(defn schedule-recurring [timer delay-secs recur-secs afn]
  (schedule timer
            delay-secs
            (fn this []
              (afn)
              (schedule timer recur-secs this :check-active false)) ; this avoids a race condition with cancel-timer
            ))

 

使用例子

Supervisor中的使用例子, 定期的调用hb函数更新supervisor的hb 
在mk-timer时, 传入的kill-fn callback, 会在timer-thread发生exception的时候被调用

:timer (mk-timer :kill-fn (fn [t]
                            (log-error t "Error when processing event")
                            (halt-process! 20 "Error when processing an event")
                            ))

(schedule-recurring (:timer supervisor)
                        0
                        (conf SUPERVISOR-HEARTBEAT-FREQUENCY-SECS)
                        heartbeat-fn)

本文章摘自博客园,原文发布日期:2013-07-02
目录
相关文章
|
7月前
|
测试技术 API 持续交付
如何免费解决 Postman 集合限制
这里有几种方法可以解决 Postman 集合运行器 (Postman Collection Runner) 的限制。然而,使用 Apifox 创建你的集合没有任何限制,而且是免费的。
如何免费解决 Postman 集合限制
|
Web App开发 Ubuntu
解决Ubuntu14.04安装Chrome浏览器打不开的问题
解决Ubuntu14.04安装Chrome浏览器打不开的问题
927 0
|
机器学习/深度学习 算法 搜索推荐
外卖平台推荐算法的优化与实践
外卖平台推荐算法的优化与实践
|
机器学习/深度学习 人工智能 网络架构
Transformer原理解析——一种Open AI和DeepMind都在用的神经网络架构
Transformer模型是一种日益流行的神经网络结构。它最近被OpenAI用于他们的语言模型中。与此同时,近期也被DeepMind用于它们的程序“星际争霸”中击败了一名顶级职业星际玩家。 Transformer模型的开发是为了解决序列转换及神经机器翻译问题。
9086 0
|
Web App开发 数据采集 开发者
如何解决ChromeDriver 126找不到chromedriver.exe问题
当使用Selenium与ChromeDriver 126时,遇到`chromedriver.exe`找不到的错误,可能是因为版本不匹配、文件路径错误或系统设置不当。解决方法包括:匹配Chrome浏览器版本下载ChromeDriver,确保文件在正确路径且有执行权限,以及调整系统设置允许执行。示例代码展示了如何设置代理IP、user-agent和cookie来运行Selenium爬虫。通过这些步骤,可以确保爬虫程序顺利运行。
1021 2
如何解决ChromeDriver 126找不到chromedriver.exe问题
|
SQL 索引
SQL查看表字段信息如:字段名、字段类型、字段精度、字段大小、索引、主键等
表名、字段名、字段类型、字段精度、字段大小 字段名、是否为主键、字段类型、字段大小、索引名
1608 0
SQL查看表字段信息如:字段名、字段类型、字段精度、字段大小、索引、主键等
详尽分享马卡龙色色卡及其十六进制代码
详尽分享马卡龙色色卡及其十六进制代码
573 0
|
机器学习/深度学习 PyTorch 算法框架/工具
base model初始化large model,造成的参数矩阵对不上权重不匹配问题+修改预训练权重形状和上采样
base model初始化large model,造成的参数矩阵对不上权重不匹配问题+修改预训练权重形状和上采样
589 0
|
资源调度 机器人
微信接入ChatGPT,使用Node+ChatGPT+Wechaty做一个微信机器人
微信接入ChatGPT,使用Node+ChatGPT+Wechaty做一个微信机器人
4191 4
微信接入ChatGPT,使用Node+ChatGPT+Wechaty做一个微信机器人
一元函数微分学中导数--高阶导数--极值--凹凸性--泰勒展开式
一元函数微分学中导数--高阶导数--极值--凹凸性--泰勒展开式
下一篇
开通oss服务