SymmetricDS 2.2.5 undeploy时symmetricScheduler job线程杀不掉分析

简介: SymmetricDS的同步机制是定时周期性同步,我们项目根据业务需要,只需要客户在UI激活一次replication时候才开始同步,所以这里我们的设计是每次激活时,用symmetric自己的deploy/undeploy机制让其在启动时候自动跑replication任务。

SymmetricDS的同步机制是定时周期性同步,我们项目根据业务需要,只需要客户在UI激活一次replication时候才开始同步,所以这里我们的设计是每次激活时,用symmetric自己的deploy/undeploy机制让其在启动时候自动跑replication任务。

 

最近遇到一个问题, 在使用中,发现当数据同步时间间隔设在1小时以内时,运行一天以上会发现大量的SymmetricSchedule线程积压在JVM中没有释放,怀疑是Symmetric 2.2.5中在undeploy时释放线程有问题。

为了解决这个问题,先看下SymmetricDS中其需要的定时任务都定义在哪里。

 

在Symmetric engine中,整个symmetricDS的入口类在AbstractSymmetricEngine,在这个类的启动方法如下:

 

    public synchronized boolean start() {

        setup();
        
        if (isConfigured()) {
                  ...
                    getTriggerRouterService().syncTriggers();
                    heartbeat(false);
                    jobManager.startJobs();
                   ...
                    started = true;
                } finally {
                    starting = false;
                }
                
                return true;
            } else {
                return false;
            }
        } else {
           ...
        }
    }
 

 

可以看到其内置的job是由jobManager启动的,再看其实现类JobManager代码片段:

 

  private List<IJob> jobs;  

  public synchronized void startJobs() {
        for (IJob job : jobs) {
            if (job.isAutoStartConfigured()) {
                job.start();
            } else {
                log.info("JobNoAutoStart", job.getName());
            }
        }
    }
 可以看到是直接调用集合jobs中的job实例,该实例在SymmetricDS 2 中是由spring注入的,其spring xml配置如下:

 

 

    <bean id="jobManager" class="org.jumpmind.symmetric.job.JobManager">
        <property name="jobs">
            <list>
                <ref bean="job.routing" />
                <ref bean="job.push" />
                <ref bean="job.pull" />
                <ref bean="job.purge.outgoing" />
                <ref bean="job.purge.incoming" />
                <ref bean="job.purge.datagaps" />
                <ref bean="job.stat.flush" />
                <ref bean="job.synctriggers" />
                <ref bean="job.heartbeat" />
                <ref bean="job.watchdog" />
            </list>
        </property>
        <property name="taskScheduler" ref="symmetricScheduler" />
    </bean>
    <bean id="job.routing" parent="job.abstract" class="org.jumpmind.symmetric.job.RouterJob">
        <property name="routingService" ref="routingService" />
        <property name="autoStartParameterName" value="start.route.job" />
    </bean>
 以上就是其内置的job,支持周期性运行和定时运行,以便进行数据库同步。这些job就是在undeploy时候没有杀掉的线程,由于我们关注的是undeploy时这些job的卸载情况,回到AbstractSymmetricEngine:

 

 

private ThreadPoolTaskScheduler taskScheduler;    
public synchronized void stop() {
        ...
        jobManager.stopJobs();
        getRouterService().stop();
        started = false;
        starting = false;
    }
    public synchronized void destroy () {
        stopJobs();
    }
 回到JobManager:

 

 

    public synchronized void stopJobs() {
        for (IJob job : jobs) {
            job.stop();
        }      
    }
 

 

jobManager直接遍历每个job并调用其stop方法。

所有job都有个超类AbstractJob,这个类的stop方法如下:

 

    public boolean stop() {
        boolean success = false;
        if (this.scheduledJob != null) {
            success = this.scheduledJob.cancel(true);
            this.scheduledJob = null;
            if (success) {
                log.info("JobCancelled", jobName);
                started = false;                
            } else {
                log.warn("JobFailedToCancel", jobName);
            }
        }
        return success;
    }
 这里只是调用了线程的cancel方法,在我们实际使用中,只是cancel并没有终结掉这些线程,这是我们JVM中的thread dump:

 

 

"symmetricScheduler-14" prio=6 tid=0x49d7f800 nid=0x13b4 waiting on condition [0x4e2ef000]
java.lang.Thread.State: TIMED_WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x1d6873b8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:198)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2025)
at java.util.concurrent.DelayQueue.take(DelayQueue.java:164)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:609)
at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:602)
at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:947)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
at java.lang.Thread.run(Thread.java:662)

 

找到这里,我们知道这是其释放线程的一个bug,所以我直接去找了一下symmetricDS的bug track里有没有解决,发现这是2.2.*中的一个bug,在2.4.0以后的版本中已经fix,其JIRA地址在:

Tomcat shutdown hangs when SymmetricDS is deployed as a war 写道
http://jira.codehaus.org/browse/SYMMETRICDS-466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel

 

这个fix主要是在JobManager中加入了调用spring的线程池的shutdow方法:

    public synchronized void destroy () {
        stopJobs();
        if (taskScheduler != null) {
            taskScheduler.shutdown();
        }
    }

 

至此,在项目中使用symmetricDS新的代码,undeploy后线程都被杀掉,重新deploy后没有多余的job线程运行。

 

我们的启动方式:

没有在配置文件中配SymmetricDS需要的.cron项,直接让其按照线程的scheduleWithFixedDelay方式运行。该任务模式的配置在symmetric.properties文件中,在symmetric源码(AbstractJob)中可以清晰的看到如何获取该配置文件的值:

this.cronExpression = parameterService.getString(jobName + ".cron", null);

 之后在该类的启动线程方法中会有判断:

    public void start() {
        if (this.scheduledJob == null) {
            log.info("JobStarting", jobName);
            if (!StringUtils.isBlank(cronExpression)) {
                this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression));
                started = true;
            } else {

                int startDelay = randomTimeSlot.getRandomValueSeededByExternalId();
                if (this.timeBetweenRunsInMs > 0) {
                    this.scheduledJob = taskScheduler.scheduleWithFixedDelay(this,
                            new Date(System.currentTimeMillis() + startDelay),
                            this.timeBetweenRunsInMs);
                    started = true;
                } else {
                    log.error("JobFailedToSchedule", jobName);
                }
            }
        }
    }

 

 

下面是正常后的JVM 线程截图:

 

 
目录
相关文章
|
1月前
|
设计模式 消息中间件 安全
【JUC】(3)常见的设计模式概念分析与多把锁使用场景!!理解线程状态转换条件!带你深入JUC!!文章全程笔记干货!!
JUC专栏第三篇,带你继续深入JUC! 本篇文章涵盖内容:保护性暂停、生产者与消费者、Park&unPark、线程转换条件、多把锁情况分析、可重入锁、顺序控制 笔记共享!!文章全程干货!
176 1
|
2月前
|
数据采集 存储 弹性计算
高并发Java爬虫的瓶颈分析与动态线程优化方案
高并发Java爬虫的瓶颈分析与动态线程优化方案
|
存储 NoSQL Redis
Redis 新版本引入多线程的利弊分析
【10月更文挑战第16天】Redis 新版本引入多线程是一个具有挑战性和机遇的改变。虽然多线程带来了一些潜在的问题和挑战,但也为 Redis 提供了进一步提升性能和扩展能力的可能性。在实际应用中,我们需要根据具体的需求和场景,综合评估多线程的利弊,谨慎地选择和使用 Redis 的新版本。同时,Redis 开发者也需要不断努力,优化和完善多线程机制,以提供更加稳定、高效和可靠的 Redis 服务。
286 1
|
10月前
|
并行计算 安全 Java
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
在Python开发中,GIL(全局解释器锁)一直备受关注。本文基于CPython解释器,探讨GIL的技术本质及其对程序性能的影响。GIL确保同一时刻只有一个线程执行代码,以保护内存管理的安全性,但也限制了多线程并行计算的效率。文章分析了GIL的必要性、局限性,并介绍了多进程、异步编程等替代方案。尽管Python 3.13计划移除GIL,但该特性至少要到2028年才会默认禁用,因此理解GIL仍至关重要。
761 16
Python GIL(全局解释器锁)机制对多线程性能影响的深度分析
线程CPU异常定位分析
【10月更文挑战第3天】 开发过程中会出现一些CPU异常升高的问题,想要定位到具体的位置就需要一系列的分析,记录一些分析手段。
287 0
|
11月前
|
调度 开发者
核心概念解析:进程与线程的对比分析
在操作系统和计算机编程领域,进程和线程是两个基本而核心的概念。它们是程序执行和资源管理的基础,但它们之间存在显著的差异。本文将深入探讨进程与线程的区别,并分析它们在现代软件开发中的应用和重要性。
439 4
|
安全 Java API
Java线程池原理与锁机制分析
综上所述,Java线程池和锁机制是并发编程中极其重要的两个部分。线程池主要用于管理线程的生命周期和执行并发任务,而锁机制则用于保障线程安全和防止数据的并发错误。它们深入地结合在一起,成为Java高效并发编程实践中的关键要素。
182 0
|
存储 监控 Java
|
安全 Java 开发者
Swing 的线程安全分析
【8月更文挑战第22天】
277 4
|
Java 数据库连接 数据库
当线程中发生异常时的情况分析
【8月更文挑战第22天】
259 4

热门文章

最新文章

下一篇
oss云网关配置