Scheduled Jobs with Custom Clock Processes in Java with Quartz and RabbitMQ


Original article: https://devcenter.heroku.com/articles/scheduled-jobs-custom-clock-processes-java-quartz-rabbitmq

 

Table of Contents

- Prerequisites
- Scheduling jobs with Quartz
- Queuing jobs with RabbitMQ
- Processing jobs
- Running on Heroku
- Further learning

There are numerous ways to schedule background jobs in Java applications. This article will teach you how to set up a Java application that uses the Quartz library along with RabbitMQ to create a scalable and reliable method of scheduling background jobs on Heroku.

Many of the common methods for background processing in Java advocate running background jobs within the same application as the web tier. This approach has scalability and reliability constraints.

A better approach is to move background jobs into separate processes so that the web tier is distinctly separate from the background processing tier. This allows the web tier to be used exclusively for handling web requests. The scheduling of jobs should also be its own tier that puts jobs onto a queue. The worker processing tier can then be scaled independently from the rest of the application.

If you have questions about Java on Heroku, consider discussing them in the Java on Heroku forums.

The source for this article's reference application is available on GitHub.

Prerequisites

To clone the sample project to your local computer run:

$ git clone https://github.com/heroku/devcenter-java-quartz-rabbitmq.git
Cloning into devcenter-java-quartz-rabbitmq...

$ cd devcenter-java-quartz-rabbitmq/

Scheduling jobs with Quartz

A custom clock process will be used to create jobs and add them to a queue. To set up a custom clock process, use the Quartz library. In Maven the dependency is declared with:

See the reference app's pom.xml for the full Maven build definition.

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.1.5</version>
</dependency>

Now a Java application can be used to schedule jobs. Here is an example:

package com.heroku.devcenter;

import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.repeatSecondlyForever;
import static org.quartz.TriggerBuilder.newTrigger;

public class SchedulerMain {

    final static Logger logger = LoggerFactory.getLogger(SchedulerMain.class);

    public static void main(String[] args) throws Exception {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        JobDetail jobDetail = newJob(HelloJob.class).build();

        Trigger trigger = newTrigger()
                .startNow()
                .withSchedule(repeatSecondlyForever(2))
                .build();

        scheduler.scheduleJob(jobDetail, trigger);
    }

    public static class HelloJob implements Job {
        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            logger.info("HelloJob executed");
        }
    }
}

This simple example schedules a HelloJob every two seconds that simply logs a message. Quartz has a very extensive API for creating Trigger schedules.
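For example, a cron-style schedule can be built with CronScheduleBuilder instead of repeatSecondlyForever. The following is a minimal sketch; the class name CronSchedulerExample and the weekday 6:00 AM schedule are illustrative, not part of the reference app:

package com.heroku.devcenter;

import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.Trigger;
import org.quartz.impl.StdSchedulerFactory;

import static org.quartz.CronScheduleBuilder.cronSchedule;
import static org.quartz.JobBuilder.newJob;
import static org.quartz.TriggerBuilder.newTrigger;

public class CronSchedulerExample {

    public static void main(String[] args) throws Exception {
        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        // Reuse the HelloJob defined in SchedulerMain
        JobDetail jobDetail = newJob(SchedulerMain.HelloJob.class).build();

        // Fire at 06:00 every Monday through Friday
        Trigger trigger = newTrigger()
                .withSchedule(cronSchedule("0 0 6 ? * MON-FRI"))
                .build();

        scheduler.scheduleJob(jobDetail, trigger);
    }
}

Quartz cron expressions also support ranges, lists, and increments, so most calendar-based schedules can be expressed directly in the trigger.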

To test this application locally you can run the Maven build and then run the SchedulerMain Java class:

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building devcenter-java-quartz-rabbitmq 1.0-SNAPSHOT
[INFO] ------------------------------------------------------------------------

$ java -cp target/classes:target/dependency/* com.heroku.devcenter.SchedulerMain
...
66 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler 'DefaultQuartzScheduler' initialized from default resource file in Quartz package: 'quartz.properties'
66 [main] INFO org.quartz.impl.StdSchedulerFactory - Quartz scheduler version: 2.1.5
66 [main] INFO org.quartz.core.QuartzScheduler - Scheduler DefaultQuartzScheduler_$_NON_CLUSTERED started.
104 [DefaultQuartzScheduler_Worker-1] INFO com.heroku.devcenter.SchedulerMain - HelloJob executed
2084 [DefaultQuartzScheduler_Worker-2] INFO com.heroku.devcenter.SchedulerMain - HelloJob executed

Press Ctrl-C to exit the app.

If the HelloJob actually did the work itself, we would have a runtime bottleneck because we could not scale the scheduler without duplicate jobs being scheduled. Quartz does have a JDBC job store module that can use a database to prevent jobs from being duplicated, but a simpler approach is to run only one instance of the scheduler and have the scheduled jobs added to a message queue, where they can be processed in parallel by job worker processes.
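For reference, the JDBC job store is enabled purely through Quartz configuration. The following is a minimal sketch of what that could look like when the properties are supplied programmatically; the class name ClusteredSchedulerExample, the data source name workDS, and the Postgres connection details are all hypothetical, and the reference app does not take this approach:

package com.heroku.devcenter;

import java.util.Properties;

import org.quartz.Scheduler;
import org.quartz.impl.StdSchedulerFactory;

public class ClusteredSchedulerExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical clustered JDBC job store configuration; the connection
        // details below are made up and the tables must already exist.
        Properties props = new Properties();
        props.setProperty("org.quartz.scheduler.instanceId", "AUTO");
        props.setProperty("org.quartz.threadPool.threadCount", "4");
        props.setProperty("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
        props.setProperty("org.quartz.jobStore.driverDelegateClass", "org.quartz.impl.jdbcjobstore.PostgreSQLDelegate");
        props.setProperty("org.quartz.jobStore.isClustered", "true");
        props.setProperty("org.quartz.jobStore.dataSource", "workDS");
        props.setProperty("org.quartz.dataSource.workDS.driver", "org.postgresql.Driver");
        props.setProperty("org.quartz.dataSource.workDS.URL", "jdbc:postgresql://localhost:5432/quartz");
        props.setProperty("org.quartz.dataSource.workDS.user", "quartz");
        props.setProperty("org.quartz.dataSource.workDS.password", "quartz");

        // With a clustered job store, multiple scheduler instances can share
        // the same database without scheduling duplicate jobs.
        Scheduler scheduler = new StdSchedulerFactory(props).getScheduler();
        scheduler.start();
    }
}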

Queuing jobs with RabbitMQ

RabbitMQ can be used as the message queue, so the scheduler process just adds jobs to a queue and worker processes grab jobs from the queue and process them. To add the RabbitMQ client library as a dependency in Maven, specify the following in the dependencies block of the pom.xml file:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>2.8.2</version>
</dependency>

If you want to test this locally, install RabbitMQ and set an environment variable that provides the application with the connection information for your RabbitMQ server.

On Windows:

$ set CLOUDAMQP_URL="amqp://guest:guest@localhost:5672/%2f"

On Mac/Linux:

$ export CLOUDAMQP_URL="amqp://guest:guest@localhost:5672/%2f"

The CLOUDAMQP_URL environment variable will be used by the scheduler and worker processes to connect to the shared message queue. This example uses that environment variable because that is how the CloudAMQP Heroku add-on will provide its connection information to the application.

The SchedulerMain class needs to be updated to add a new message onto a queue every time the HelloJob is executed. Here are the new SchedulerMain and HelloJob classes from the SchedulerMain.java file in the sample project:

package com.heroku.devcenter;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

import static org.quartz.JobBuilder.newJob;
import static org.quartz.SimpleScheduleBuilder.repeatSecondlyForever;
import static org.quartz.TriggerBuilder.newTrigger;

public class SchedulerMain {

    final static Logger logger = LoggerFactory.getLogger(SchedulerMain.class);
    final static ConnectionFactory factory = new ConnectionFactory();

    public static void main(String[] args) throws Exception {
        factory.setUri(System.getenv("CLOUDAMQP_URL"));

        Scheduler scheduler = StdSchedulerFactory.getDefaultScheduler();
        scheduler.start();

        JobDetail jobDetail = newJob(HelloJob.class).build();

        Trigger trigger = newTrigger()
                .startNow()
                .withSchedule(repeatSecondlyForever(5))
                .build();

        scheduler.scheduleJob(jobDetail, trigger);
    }

    public static class HelloJob implements Job {

        @Override
        public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();

                String queueName = "work-queue-1";
                Map<String, Object> params = new HashMap<String, Object>();
                params.put("x-ha-policy", "all");
                channel.queueDeclare(queueName, true, false, false, params);

                String msg = "Sent at:" + System.currentTimeMillis();
                byte[] body = msg.getBytes("UTF-8");
                channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, body);
                logger.info("Message Sent: " + msg);

                connection.close();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
}

In this example, every time the HelloJob is executed it adds a message onto a RabbitMQ message queue containing a String with the time it was created. Running the updated SchedulerMain should add a new message to the queue every five seconds.
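If you want to confirm locally that messages are actually accumulating before any worker is running, one option is to ask RabbitMQ for the queue's current message count with a passive declare. This is a minimal sketch; the class name QueueCheck is hypothetical and not part of the reference app:

package com.heroku.devcenter;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class QueueCheck {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(System.getenv("CLOUDAMQP_URL"));
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Passively declare the queue: this fails if the queue does not exist
        // and otherwise reports how many messages are currently waiting in it.
        AMQP.Queue.DeclareOk ok = channel.queueDeclarePassive("work-queue-1");
        System.out.println("Messages waiting in work-queue-1: " + ok.getMessageCount());

        connection.close();
    }
}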

Processing jobs

Next, create a Java application that will pull messages from the queue and handle them. This application also uses a ConnectionFactory configured from the CLOUDAMQP_URL environment variable to get a connection to RabbitMQ. Here is the WorkerMain class from the WorkerMain.java file in the example project:

package com.heroku.devcenter;


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public class WorkerMain {

    final static Logger logger = LoggerFactory.getLogger(WorkerMain.class);

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(System.getenv("CLOUDAMQP_URL"));
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "work-queue-1";
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("x-ha-policy", "all");
        channel.queueDeclare(queueName, true, false, false, params);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            if (delivery != null) {
                String msg = new String(delivery.getBody(), "UTF-8");
                logger.info("Message Received: " + msg);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        }
    }
}

This class simply waits for new messages on the message queue and logs that it received them. You can run this example locally by doing a build and then running the WorkerMain class:

$ mvn package
$ java -cp target/classes:target/dependency/* com.heroku.devcenter.WorkerMain

You can also run multiple instances of this example locally to see how the job processing can be horizontally distributed.
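When several workers share the queue, it can also help to cap the number of unacknowledged messages RabbitMQ delivers to each consumer, so that new messages are spread across workers instead of being pushed to whichever consumer connected first. The sketch below is a hypothetical variant of WorkerMain (the reference app does not set a prefetch count); the only substantive change is the basicQos call:

package com.heroku.devcenter;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

import java.util.HashMap;
import java.util.Map;

public class FairWorkerMain {

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(System.getenv("CLOUDAMQP_URL"));
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        String queueName = "work-queue-1";
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("x-ha-policy", "all");
        channel.queueDeclare(queueName, true, false, false, params);

        // Deliver at most one unacknowledged message to this worker at a time,
        // so that RabbitMQ dispatches new messages to whichever worker is free.
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, false, consumer);

        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String msg = new String(delivery.getBody(), "UTF-8");
            System.out.println("Message Received: " + msg);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}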

Running on Heroku

Now that you have everything working locally you can run this on Heroku. First declare the process model in a new file named Procfile containing:

scheduler: java $JAVA_OPTS -cp target/classes:target/dependency/* com.heroku.devcenter.SchedulerMain
worker: java $JAVA_OPTS -cp target/classes:target/dependency/* com.heroku.devcenter.WorkerMain

This defines two process types that can be executed on Heroku: one named scheduler for the SchedulerMain app and one named worker for the WorkerMain app.

To run on Heroku you will need to push a Git repository to Heroku containing the Maven build descriptor, source code, and Procfile. If you cloned the example project then you already have a Git repository. If you need to create a new git repository containing these files, run:

$ git init
$ git add src pom.xml Procfile
$ git commit -m init

Create a new application on Heroku from within the project’s root directory:

$ heroku create
Creating furious-cloud-2945... done, stack is cedar-14
http://furious-cloud-2945.herokuapp.com/ | git@heroku.com:furious-cloud-2945.git
Git remote heroku added

Then add the CloudAMQP add-on to your application:

$ heroku addons:add cloudamqp
Adding cloudamqp to furious-cloud-2945... done, v2 (free)
cloudamqp documentation available at: https://devcenter.heroku.com/articles/cloudamqp

Now push your Git repository to Heroku:

$ git push heroku master
Counting objects: 165, done.
Delta compression using up to 2 threads.
...
-----> Heroku receiving push
-----> Java app detected
...
-----> Discovering process types
       Procfile declares types -> scheduler, worker
-----> Compiled slug size is 1.4MB
-----> Launching... done, v5
       http://furious-cloud-2945.herokuapp.com deployed to Heroku

This will run the Maven build for your project on Heroku and create a slug file containing the executable assets for your application. To run the application you will need to allocate dynos to run each process type. You should only allocate one dyno to run the scheduler process type to avoid duplicate job scheduling. You can allocate as many dynos as needed to the worker process type since it is event driven and parallelizable through the RabbitMQ message queue.

To allocate one dyno to the scheduler process type run:

$ heroku ps:scale scheduler=1
Scaling scheduler processes... done, now running 1

This should begin adding messages to the queue every five seconds. To allocate two dynos to the worker process type run:

$ heroku ps:scale worker=2
Scaling worker processes... done, now running 2

This will provision two dynos, each of which will run the WorkerMain app and pull messages from the queue for processing. You can verify that this is happening by watching the Heroku logs for your application. To open a feed of your logs run:

$ heroku logs -t
2012-06-26T22:26:47+00:00 app[scheduler.1]: 100223 [DefaultQuartzScheduler_Worker-1] INFO com.heroku.devcenter.SchedulerMain - Message Sent: Sent at:1340749607126
2012-06-26T22:26:47+00:00 app[worker.2]: 104798 [main] INFO com.heroku.devcenter.WorkerMain - Message Received: Sent at:1340749607126
2012-06-26T22:26:52+00:00 app[scheduler.1]: 105252 [DefaultQuartzScheduler_Worker-2] INFO com.heroku.devcenter.SchedulerMain - Message Sent: Sent at:1340749612155
2012-06-26T22:26:52+00:00 app[worker.1]: 109738 [main] INFO com.heroku.devcenter.WorkerMain - Message Received: Sent at:1340749612155

In this example execution the scheduler creates two messages, which are handled by two different worker dynos (worker.1 and worker.2). This shows that the work is being scheduled and distributed correctly.

Further learning

This example application just shows the basics for architecting a scalable and reliable system for scheduling and processing background jobs. To learn more, see the original article on the Heroku Dev Center (linked at the top of this post) and the reference application on GitHub.

 
