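A Distributed Task Scheduling System for iMessage Bulk-Messaging VMs: Hands-On Code Walkthrough
iMessage bulk-messaging virtual machines (VMs) are used as an enterprise message-delivery solution in scenarios such as cross-border marketing, customer service, and internal notifications. A traditional single-node bulk-messaging setup has limited concurrency, a single point of failure, and poor resource utilization, so it struggles with large sending volumes. This article shows how to build a highly available, scalable distributed task scheduling system for iMessage bulk-messaging VMs on top of Spring Boot, Redis, and RabbitMQ. The distributed architecture provides automatic task distribution, load balancing, and failover, which improves overall throughput and stability.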
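1. Overall System Architecture
The system uses a layered architecture with four core layers: control, scheduling, execution, and storage. The control layer receives bulk-messaging task requests, validates the task parameters, and creates tasks. The scheduling layer uses Quartz for timed scheduling and a Redis distributed lock to ensure that each task is dispatched only once in a clustered deployment. The execution layer consists of multiple iMessage bulk-messaging VM nodes that do the actual sending. In the storage layer, MySQL stores task information and send records, Redis stores node state and cached task data, and RabbitMQ distributes tasks asynchronously.
The system follows a master-worker pattern: the scheduling center acts as the master node and handles all scheduling and dispatch, while multiple execution nodes act as workers and run the tasks. When a new task is submitted, the scheduling center looks at the load of each execution node and assigns the task to the least-loaded one. If an execution node fails, the scheduling center reassigns that node's tasks to other healthy nodes so that they can still complete.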
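The dispatch-and-failover behavior described above can be sketched in plain Java. This is a minimal in-memory illustration, not the article's implementation: the class name `FailoverSketch` and its methods are hypothetical, and "load" is assumed to be simply the number of tasks a node currently holds.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory sketch of least-loaded dispatch with failover (hypothetical helper class).
// Assumption: a node's "load" is just the number of tasks it currently holds.
class FailoverSketch {
    // nodeId -> IDs of the tasks currently assigned to that node (insertion-ordered)
    private final Map<String, List<Long>> assignments = new LinkedHashMap<>();

    void addNode(String nodeId) {
        assignments.putIfAbsent(nodeId, new ArrayList<>());
    }

    // Assigns the task to the node holding the fewest tasks and returns that node's ID.
    // Ties go to the node that was registered first.
    String assign(long taskId) {
        String best = null;
        for (Map.Entry<String, List<Long>> entry : assignments.entrySet()) {
            if (best == null || entry.getValue().size() < assignments.get(best).size()) {
                best = entry.getKey();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no healthy node available");
        }
        assignments.get(best).add(taskId);
        return best;
    }

    // Drops a failed node and hands each of its tasks to the least-loaded survivor.
    void failNode(String nodeId) {
        List<Long> orphaned = assignments.remove(nodeId);
        if (orphaned != null) {
            for (long taskId : orphaned) {
                assign(taskId);
            }
        }
    }

    int taskCount(String nodeId) {
        List<Long> tasks = assignments.get(nodeId);
        return tasks == null ? 0 : tasks.size();
    }

    public static void main(String[] args) {
        FailoverSketch scheduler = new FailoverSketch();
        scheduler.addNode("node1");
        scheduler.addNode("node2");
        for (long taskId = 1; taskId <= 4; taskId++) {
            System.out.println("task " + taskId + " -> " + scheduler.assign(taskId));
        }
        scheduler.failNode("node1");
        System.out.println("node2 now holds " + scheduler.taskCount("node2") + " tasks");
    }
}
```

A real scheduler would also have to decide what happens to tasks that were half-finished when the node failed; the sketch simply re-queues them whole.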
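2. Core Technology Choices and Environment Setup
The core technology choices are as follows. Backend framework: Spring Boot 2.7.12, for rapid development and auto-configuration. Task scheduling: Quartz 2.3.2, which supports complex timed-job configuration. Distributed lock: Redis 6.2.7, whose atomic operations make an efficient lock possible. Message queue: RabbitMQ 3.10.0, for asynchronous, decoupled task distribution. Database: MySQL 8.0.32, for tasks and send records. Virtualization: Docker, for quick deployment and management of iMessage bulk-messaging VM nodes.
For the environment, prepare at least one server for the scheduling center and several servers for execution nodes. Each execution node needs Docker installed and the iMessage bulk-messaging VM image configured. Redis and RabbitMQ must also be deployed, preferably as clusters for better availability. For development, IntelliJ IDEA is recommended, with JDK 1.8 or later and Maven 3.6.3 or later.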
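3. VM Node Management Module
The node management module is one of the core modules. It manages the lifecycle and state of every iMessage bulk-messaging VM node and covers four functions: node registration, heartbeat detection, node status updates, and node load calculation. When a node starts, it registers itself with the scheduling center and reports its IP address, port, maximum concurrency, and similar information. The scheduling center assigns each node a unique node ID and stores the node information; in the code listing, the node record is written to MySQL and a heartbeat key with an expiry is kept in Redis.
To monitor node health in real time, the system uses heartbeats. Every node sends a heartbeat to the scheduling center every 5 seconds, reporting its current load and task execution status. If the scheduling center receives no heartbeat from a node for 30 seconds, it marks the node as offline and reassigns all of that node's tasks to other healthy nodes. The load calculation considers three factors: CPU usage, memory usage, and the number of tasks currently running. A weighted sum of the three gives each node a composite load value, which the scheduler uses when choosing a node.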
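The heartbeat timeout and the weighted load score can be illustrated with a small, framework-free sketch. The class `HeartbeatSketch` is hypothetical; it keeps heartbeat timestamps in a map instead of Redis, takes the current time as a parameter so the logic is deterministic, and assumes CPU and memory usage are reported as fractions between 0 and 1. The weights match those used by `calculateLoad` in the code listing.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of heartbeat-timeout detection and a weighted load score (hypothetical helper class).
class HeartbeatSketch {
    static final long TIMEOUT_MILLIS = 30_000L; // 30 s timeout from the text

    // Same weights as calculateLoad in the listing: CPU 0.3, memory 0.3, running tasks 0.4
    static final double CPU_WEIGHT = 0.3;
    static final double MEMORY_WEIGHT = 0.3;
    static final double TASK_WEIGHT = 0.4;

    // nodeId -> timestamp (ms) of the most recent heartbeat
    private final Map<String, Long> lastSeen = new HashMap<>();

    void heartbeat(String nodeId, long nowMillis) {
        lastSeen.put(nodeId, nowMillis);
    }

    // A node is online if its latest heartbeat is less than TIMEOUT_MILLIS old.
    boolean isOnline(String nodeId, long nowMillis) {
        Long seen = lastSeen.get(nodeId);
        return seen != null && nowMillis - seen < TIMEOUT_MILLIS;
    }

    // Assumption: cpuUsage and memoryUsage are fractions in [0, 1].
    // A node with no configured capacity is treated as fully loaded.
    static double load(double cpuUsage, double memoryUsage, int runningTasks, int maxConcurrency) {
        double taskRatio = maxConcurrency > 0 ? (double) runningTasks / maxConcurrency : 1.0;
        return cpuUsage * CPU_WEIGHT + memoryUsage * MEMORY_WEIGHT + taskRatio * TASK_WEIGHT;
    }

    public static void main(String[] args) {
        HeartbeatSketch monitor = new HeartbeatSketch();
        monitor.heartbeat("node1", 0L);
        System.out.println("online after 10 s: " + monitor.isOnline("node1", 10_000L));
        System.out.println("online after 30 s: " + monitor.isOnline("node1", 30_000L));
        System.out.println("load: " + load(0.5, 0.5, 25, 50));
    }
}
```

With a 5-second heartbeat interval and a 30-second timeout, a node has to miss several consecutive heartbeats before it is treated as offline, which tolerates short network hiccups.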
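4. Core Distributed Scheduling Logic
The scheduling logic is the heart of the system: it distributes submitted bulk-messaging tasks to execution nodes according to a defined strategy. When a user submits a task, the system first saves the task to MySQL and generates a unique task ID. The scheduling center then adds the task to the Quartz scheduler according to its priority and scheduled time.
When the scheduled time arrives, Quartz fires the job. To prevent several scheduling-center instances in a cluster from dispatching the same task, the system uses a Redis distributed lock: only the instance that acquires the lock may dispatch the task. During dispatch, the scheduling center fetches the healthy nodes (those whose heartbeat key still exists in Redis), computes each node's composite load value, and picks the least-loaded node. It then publishes the task to that node's RabbitMQ queue, where the execution node consumes it.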
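The lock semantics that the dispatch step relies on can be shown without a Redis server. The sketch below is an in-memory stand-in, under stated assumptions, for the pattern "set the key only if it is absent, with an expiry, and delete it only if it still holds the caller's token". The class `LockSketch` and its method signatures are hypothetical and are not a Redis client API; time is passed in explicitly to keep the example deterministic.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for a token-based lock with expiry (hypothetical helper class).
class LockSketch {
    private static final class Entry {
        final String token;
        final long expiresAtMillis;

        Entry(String token, long expiresAtMillis) {
            this.token = token;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    // Succeeds only if the key is absent or its previous holder's lease has expired.
    synchronized boolean tryLock(String key, String token, long ttlMillis, long nowMillis) {
        Entry current = store.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false;
        }
        store.put(key, new Entry(token, nowMillis + ttlMillis));
        return true;
    }

    // Releases the lock only if it is still held by the caller's token.
    // Check and delete happen in one synchronized step, so they cannot interleave.
    synchronized boolean unlock(String key, String token, long nowMillis) {
        Entry current = store.get(key);
        if (current == null || current.expiresAtMillis <= nowMillis || !current.token.equals(token)) {
            return false;
        }
        store.remove(key);
        return true;
    }

    public static void main(String[] args) {
        LockSketch locks = new LockSketch();
        String key = "im:task:lock:42";
        System.out.println("scheduler A acquires: " + locks.tryLock(key, "token-A", 60_000L, 0L));
        System.out.println("scheduler B acquires: " + locks.tryLock(key, "token-B", 60_000L, 1_000L));
        System.out.println("scheduler B releases A's lock: " + locks.unlock(key, "token-B", 2_000L));
        System.out.println("scheduler A releases: " + locks.unlock(key, "token-A", 3_000L));
    }
}
```

The token check matters because a lease can expire while its holder is still working; without it, a slow holder could delete a lock that another instance has since acquired.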
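5. Message Queue and Task Distribution
The message queue decouples the scheduling center from the execution nodes, which improves scalability and reliability. The system uses RabbitMQ as the message broker with a direct exchange and one dedicated queue per execution node. The scheduling center publishes each task message to the queue of the target node, and that node consumes tasks from its own queue and runs them.
To make delivery more reliable, the system enables RabbitMQ's message confirmation and persistence. Confirmation tells the producer whether a message reached the broker and was routed to a queue; if delivery fails, the producer is notified and can resend the message. Persistence ensures that messages survive a RabbitMQ server failure. The system also implements retries: if an execution node throws an exception while processing a message, the message is put back on the queue for another attempt. Once the retry limit is reached, the message is sent to a dead-letter queue for manual handling.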
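The retry-then-dead-letter flow can be sketched with an in-memory queue. This is an illustration only: the class `RetrySketch`, the `Envelope` type, and the limit of three attempts are assumptions (the article does not state a retry limit), and no RabbitMQ API is involved. In a real broker the attempt count has to travel with the message, for example in a header.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// In-memory sketch of "retry, then dead-letter" (hypothetical helper class).
class RetrySketch {
    static final int MAX_ATTEMPTS = 3; // assumed limit; the article does not give a number

    // A message plus the number of delivery attempts made so far.
    static final class Envelope {
        final String payload;
        int attempts;

        Envelope(String payload) {
            this.payload = payload;
        }
    }

    // Drains the work queue. The handler returns true on success; a false result or an
    // exception counts as a failure. Returns the payloads that were dead-lettered.
    static List<String> drain(Deque<Envelope> queue, Predicate<String> handler) {
        List<String> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            Envelope message = queue.poll();
            message.attempts++;
            boolean succeeded;
            try {
                succeeded = handler.test(message.payload);
            } catch (RuntimeException e) {
                succeeded = false;
            }
            if (succeeded) {
                continue;
            }
            if (message.attempts >= MAX_ATTEMPTS) {
                deadLetters.add(message.payload); // give up: leave it for manual handling
            } else {
                queue.addLast(message);           // requeue for another attempt
            }
        }
        return deadLetters;
    }

    public static void main(String[] args) {
        Deque<Envelope> queue = new ArrayDeque<>();
        queue.add(new Envelope("task-1"));
        queue.add(new Envelope("task-2"));
        // Pretend that task-2 always fails.
        List<String> dead = drain(queue, payload -> !payload.equals("task-2"));
        System.out.println("dead-lettered: " + dead);
    }
}
```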
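6. Deployment and Performance Testing
The system is deployed in Docker containers, with Docker Compose orchestrating the services. The scheduling center and the execution nodes are both packaged as Docker images, so they can be deployed quickly to any server that supports Docker. To deploy, update the database, Redis, and RabbitMQ connection settings in the configuration file and run docker-compose up to start the whole system.
For performance testing, we set up a test environment with 1 scheduling center and 10 execution nodes. Each execution node had 2 CPU cores and 4 GB of memory, with a maximum concurrency of 50. In the test, the system processed about 500 messages per second, which corresponds to roughly 43 million messages per day at a sustained rate. When a node failed, the system finished reassigning its tasks within 30 seconds without affecting overall sending progress. CPU and memory usage stayed within a reasonable range throughout, which indicates good stability and scalability.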
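The figures above can be cross-checked with simple arithmetic. The sketch below (the class `ThroughputSketch` is hypothetical) converts a per-second rate into a daily volume and applies Little's law, in-flight requests = throughput × average time per request, to estimate how long a single send may take on average if all 10 × 50 sending slots are kept busy. That second number is an inference from the stated configuration, not a measured result.

```java
// Back-of-the-envelope checks on the test figures (hypothetical helper class).
class ThroughputSketch {
    // Daily volume at a sustained per-second rate.
    static long messagesPerDay(long messagesPerSecond) {
        return messagesPerSecond * 24L * 60L * 60L;
    }

    // Little's law rearranged: average time per send = in-flight sends / throughput.
    // Assumes every sending slot on every node is busy all the time.
    static double impliedSecondsPerSend(int nodes, int concurrencyPerNode, double messagesPerSecond) {
        return (nodes * concurrencyPerNode) / messagesPerSecond;
    }

    public static void main(String[] args) {
        System.out.println("messages per day: " + messagesPerDay(500));
        System.out.println("implied seconds per send: " + impliedSecondsPerSend(10, 50, 500.0));
    }
}
```

At 500 messages per second the daily figure is 500 × 86,400 = 43.2 million, which matches the rounded number in the text.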
Complete Code Implementation
```java
// Project structure
// src/main/java/com/imessage/scheduler/
// ├── ImessageSchedulerApplication.java
// ├── config/
// │ ├── QuartzConfig.java
// │ ├── RedisConfig.java
// │ ├── RabbitMQConfig.java
// │ └── MyBatisConfig.java
// ├── controller/
// │ └── TaskController.java
// ├── service/
// │ ├── TaskService.java
// │ ├── NodeService.java
// │ ├── SchedulerService.java
// │ └── MessageService.java
// ├── repository/
// │ ├── TaskRepository.java
// │ ├── NodeRepository.java
// │ └── SendRecordRepository.java
// ├── entity/
// │ ├── Task.java
// │ ├── Node.java
// │ └── SendRecord.java
// ├── dto/
// │ ├── TaskRequest.java
// │ └── TaskResponse.java
// ├── exception/
// │ ├── BusinessException.java
// │ └── GlobalExceptionHandler.java
// └── util/
// ├── RedisUtil.java
// ├── SnowflakeIdGenerator.java
// └── HttpUtil.java
// Main application entry point
package com.imessage.scheduler;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@MapperScan("com.imessage.scheduler.repository")
@EnableScheduling
public class ImessageSchedulerApplication {
public static void main(String[] args) {
SpringApplication.run(ImessageSchedulerApplication.class, args);
}
}
// Configuration class - QuartzConfig
package com.imessage.scheduler.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;
import javax.sql.DataSource;
import java.util.Properties;
@Configuration
public class QuartzConfig {
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
        SchedulerFactoryBean factory = new SchedulerFactoryBean();
        factory.setDataSource(dataSource);
        // Let Spring create job instances so the @Resource fields in MessageJob get injected
        factory.setJobFactory(new SpringBeanJobFactory());
        Properties props = new Properties();
        props.put("org.quartz.scheduler.instanceName", "ImessageScheduler");
        props.put("org.quartz.scheduler.instanceId", "AUTO");
        props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
        props.put("org.quartz.threadPool.threadCount", "20");
        props.put("org.quartz.threadPool.threadPriority", "5");
        // Use Spring's DataSource-backed job store; plain JobStoreTX would additionally
        // require an org.quartz.jobStore.dataSource property
        props.put("org.quartz.jobStore.class", "org.springframework.scheduling.quartz.LocalDataSourceJobStore");
        props.put("org.quartz.jobStore.isClustered", "true");
        props.put("org.quartz.jobStore.clusterCheckinInterval", "10000");
        props.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
        props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");
        factory.setQuartzProperties(props);
        factory.setStartupDelay(10); // seconds
        factory.setOverwriteExistingJobs(true);
        return factory;
    }
}
// Configuration class - RedisConfig
package com.imessage.scheduler.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
template.setValueSerializer(jackson2JsonRedisSerializer);
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
// Configuration class - RabbitMQConfig
package com.imessage.scheduler.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "imessage.task.exchange";
public static final String DEAD_LETTER_EXCHANGE = "imessage.task.dlx.exchange";
public static final String DEAD_LETTER_QUEUE = "imessage.task.dlx.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "imessage.task.dlx.routingkey";
@Bean
public DirectExchange taskExchange() {
return new DirectExchange(EXCHANGE_NAME, true, false);
}
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE, true);
}
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_ROUTING_KEY);
}
@Bean
public Queue node1Queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
args.put("x-message-ttl", 60000);
return new Queue("imessage.task.queue.node1", true, false, false, args);
}
@Bean
public Queue node2Queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
args.put("x-message-ttl", 60000);
return new Queue("imessage.task.queue.node2", true, false, false, args);
}
@Bean
public Queue node3Queue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
args.put("x-message-ttl", 60000);
return new Queue("imessage.task.queue.node3", true, false, false, args);
}
@Bean
public Binding node1Binding() {
return BindingBuilder.bind(node1Queue())
.to(taskExchange())
.with("node1");
}
@Bean
public Binding node2Binding() {
return BindingBuilder.bind(node2Queue())
.to(taskExchange())
.with("node2");
}
@Bean
public Binding node3Binding() {
return BindingBuilder.bind(node3Queue())
.to(taskExchange())
.with("node3");
}
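    // Shared JSON converter; declaring it as a bean also lets Spring Boot apply it
    // to the @RabbitListener containers so that Task payloads can be deserialized
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory,
                                         Jackson2JsonMessageConverter jsonMessageConverter) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jsonMessageConverter);
        // Needed for the returns callback (together with spring.rabbitmq.publisher-returns=true):
        // unroutable messages are handed back instead of being dropped silently
        rabbitTemplate.setMandatory(true);
        // Fires only when publisher confirms are enabled
        // (spring.rabbitmq.publisher-confirm-type=correlated)
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (!ack) {
                System.err.println("Message publish failed: " + cause);
            }
        });
        rabbitTemplate.setReturnsCallback(returned -> {
            System.err.println("Message returned: " + returned.getMessage());
        });
        return rabbitTemplate;
    }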
}
// Entity class - Task
package com.imessage.scheduler.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("im_task")
public class Task {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String taskName;
private String content;
private String phoneNumbers;
private Integer totalCount;
private Integer successCount;
private Integer failedCount;
private Integer status; // 0-pending, 1-running, 2-completed, 3-failed
private LocalDateTime scheduleTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String assignedNode;
private Integer priority;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// Entity class - Node
package com.imessage.scheduler.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("im_node")
public class Node {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String nodeId;
private String ipAddress;
private Integer port;
private Integer maxConcurrency;
private Integer currentConcurrency;
private Double cpuUsage;
private Double memoryUsage;
private Integer status; // 0-offline, 1-online, 2-busy
private LocalDateTime lastHeartbeat;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// Entity class - SendRecord
package com.imessage.scheduler.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
@Data
@TableName("im_send_record")
public class SendRecord {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private Long taskId;
private String phoneNumber;
private String content;
private Integer status; // 0-pending, 1-sent, 2-failed
private String failReason;
private String nodeId;
private LocalDateTime sendTime;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}
// Utility class - RedisUtil
package com.imessage.scheduler.util;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@Component
public class RedisUtil {
    // Deletes the lock only if it still holds the caller's token; running this as a
    // script makes the compare-and-delete atomic
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT = new DefaultRedisScript<>(
            "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end",
            Long.class);
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
    public boolean set(String key, Object value) {
        try {
            redisTemplate.opsForValue().set(key, value);
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    // Stores a value with an expiry in seconds; a non-positive time means no expiry
    public boolean set(String key, Object value, long time) {
        try {
            if (time > 0) {
                redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
                return true;
            }
            return set(key, value);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    public Object get(String key) {
        return key == null ? null : redisTemplate.opsForValue().get(key);
    }
    public boolean del(String... key) {
        if (key != null && key.length > 0) {
            if (key.length == 1) {
                redisTemplate.delete(key[0]);
            } else {
                redisTemplate.delete(Arrays.asList(key));
            }
        }
        return true;
    }
    public boolean hasKey(String key) {
        try {
            // hasKey may return null (for example inside a pipeline), so avoid unboxing
            return Boolean.TRUE.equals(redisTemplate.hasKey(key));
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    // Acquires the lock only if the key is absent; expireTime is in seconds
    public boolean tryLock(String key, String value, long expireTime) {
        try {
            Boolean result = redisTemplate.opsForValue()
                    .setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
            return Boolean.TRUE.equals(result);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
    // Releases the lock only if it is still held with the given value
    public void unlock(String key, String value) {
        try {
            redisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(key), value);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
// Utility class - SnowflakeIdGenerator
package com.imessage.scheduler.util;
import org.springframework.stereotype.Component;
@Component
public class SnowflakeIdGenerator {
private static final long START_TIMESTAMP = 1672531200000L; // 2023-01-01 00:00:00
private static final long WORKER_ID_BITS = 5L;
private static final long DATA_CENTER_ID_BITS = 5L;
private static final long SEQUENCE_BITS = 12L;
private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);
private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;
private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);
private long workerId = 1L;
private long dataCenterId = 1L;
private long sequence = 0L;
private long lastTimestamp = -1L;
public synchronized long nextId() {
long timestamp = timeGen();
if (timestamp < lastTimestamp) {
throw new RuntimeException("Clock moved backwards; refusing to generate an ID");
}
if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & SEQUENCE_MASK;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
} else {
sequence = 0L;
}
lastTimestamp = timestamp;
return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
| (dataCenterId << DATA_CENTER_ID_SHIFT)
| (workerId << WORKER_ID_SHIFT)
| sequence;
}
private long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}
private long timeGen() {
return System.currentTimeMillis();
}
}
// Service class - NodeService
package com.imessage.scheduler.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.repository.NodeRepository;
import com.imessage.scheduler.util.RedisUtil;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
@Service
public class NodeService {
private static final String NODE_HEARTBEAT_PREFIX = "im:node:heartbeat:";
private static final int HEARTBEAT_TIMEOUT = 30; // heartbeat timeout in seconds
@Resource
private NodeRepository nodeRepository;
@Resource
private RedisUtil redisUtil;
    public void registerNode(Node node) {
        LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
        wrapper.eq(Node::getIpAddress, node.getIpAddress())
                .eq(Node::getPort, node.getPort());
        Node existingNode = nodeRepository.selectOne(wrapper);
        LocalDateTime now = LocalDateTime.now();
        String nodeId;
        if (existingNode != null) {
            // Re-registration: reuse the stored node ID and mark the node online again
            existingNode.setStatus(1);
            existingNode.setLastHeartbeat(now);
            existingNode.setUpdateTime(now);
            nodeRepository.updateById(existingNode);
            nodeId = existingNode.getNodeId();
        } else {
            // NOTE: MessageJob uses the node ID as the RabbitMQ routing key, but IDs of the form
            // node-<millis> match none of the bindings (node1..node3) declared in RabbitMQConfig
            node.setNodeId("node-" + System.currentTimeMillis());
            node.setStatus(1);
            node.setCurrentConcurrency(0);
            node.setLastHeartbeat(now);
            node.setCreateTime(now);
            node.setUpdateTime(now);
            nodeRepository.insert(node);
            nodeId = node.getNodeId();
        }
        // The heartbeat key expires after HEARTBEAT_TIMEOUT seconds unless it is refreshed
        redisUtil.set(NODE_HEARTBEAT_PREFIX + nodeId, nodeId, HEARTBEAT_TIMEOUT);
    }
public void heartbeat(String nodeId, double cpuUsage, double memoryUsage, int currentConcurrency) {
LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Node::getNodeId, nodeId);
Node node = nodeRepository.selectOne(wrapper);
if (node != null) {
node.setCpuUsage(cpuUsage);
node.setMemoryUsage(memoryUsage);
node.setCurrentConcurrency(currentConcurrency);
node.setLastHeartbeat(LocalDateTime.now());
node.setUpdateTime(LocalDateTime.now());
nodeRepository.updateById(node);
redisUtil.set(NODE_HEARTBEAT_PREFIX + nodeId,
nodeId, HEARTBEAT_TIMEOUT);
}
}
public List<Node> getHealthyNodes() {
LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Node::getStatus, 1);
List<Node> nodes = nodeRepository.selectList(wrapper);
return nodes.stream()
.filter(node -> redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId()))
.collect(Collectors.toList());
}
public Node selectBestNode() {
List<Node> healthyNodes = getHealthyNodes();
if (healthyNodes.isEmpty()) {
return null;
}
return healthyNodes.stream()
.min(Comparator.comparingDouble(this::calculateLoad))
.orElse(null);
}
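    private double calculateLoad(Node node) {
        double cpuWeight = 0.3;
        double memoryWeight = 0.3;
        double concurrencyWeight = 0.4;
        // A node that has not sent a heartbeat yet has null metrics; treat them as 0
        double cpuUsage = node.getCpuUsage() == null ? 0.0 : node.getCpuUsage();
        double memoryUsage = node.getMemoryUsage() == null ? 0.0 : node.getMemoryUsage();
        int current = node.getCurrentConcurrency() == null ? 0 : node.getCurrentConcurrency();
        Integer max = node.getMaxConcurrency();
        // Without a positive capacity, treat the node as fully loaded
        double concurrencyLoad = (max == null || max <= 0) ? 1.0 : (double) current / max;
        return cpuUsage * cpuWeight
                + memoryUsage * memoryWeight
                + concurrencyLoad * concurrencyWeight;
    }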
@Scheduled(fixedRate = 10000) // check every 10 seconds
public void checkNodeHealth() {
LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Node::getStatus, 1);
List<Node> nodes = nodeRepository.selectList(wrapper);
for (Node node : nodes) {
if (!redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId())) {
node.setStatus(0);
node.setUpdateTime(LocalDateTime.now());
nodeRepository.updateById(node);
System.out.println("Node offline: " + node.getNodeId());
}
}
}
}
// Service class - TaskService
package com.imessage.scheduler.service;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.TaskRepository;
import com.imessage.scheduler.util.SnowflakeIdGenerator;
import org.quartz.*;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;
@Service
public class TaskService {
@Resource
private TaskRepository taskRepository;
@Resource
private Scheduler scheduler;
@Resource
private SnowflakeIdGenerator idGenerator;
public Long createTask(TaskRequest request) {
Task task = new Task();
task.setId(idGenerator.nextId());
task.setTaskName(request.getTaskName());
task.setContent(request.getContent());
task.setPhoneNumbers(request.getPhoneNumbers());
task.setTotalCount(request.getPhoneNumbers().split(",").length);
task.setSuccessCount(0);
task.setFailedCount(0);
task.setStatus(0);
task.setScheduleTime(request.getScheduleTime());
task.setPriority(request.getPriority());
task.setCreateTime(LocalDateTime.now());
task.setUpdateTime(LocalDateTime.now());
taskRepository.insert(task);
scheduleTask(task);
return task.getId();
}
private void scheduleTask(Task task) {
try {
JobDetail jobDetail = JobBuilder.newJob(MessageJob.class)
.withIdentity("job-" + task.getId(), "imessage-group")
.usingJobData("taskId", task.getId())
.build();
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity("trigger-" + task.getId(), "imessage-group")
.startAt(Date.from(task.getScheduleTime().atZone(ZoneId.systemDefault()).toInstant()))
.withSchedule(SimpleScheduleBuilder.simpleSchedule()
.withMisfireHandlingInstructionFireNow())
.build();
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException e) {
e.printStackTrace();
throw new RuntimeException("Failed to schedule task", e);
}
}
public void updateTaskStatus(Long taskId, int status) {
Task task = taskRepository.selectById(taskId);
if (task != null) {
task.setStatus(status);
if (status == 1) {
task.setStartTime(LocalDateTime.now());
} else if (status == 2 || status == 3) {
task.setEndTime(LocalDateTime.now());
}
task.setUpdateTime(LocalDateTime.now());
taskRepository.updateById(task);
}
}
public void updateTaskResult(Long taskId, int successCount, int failedCount) {
Task task = taskRepository.selectById(taskId);
if (task != null) {
task.setSuccessCount(successCount);
task.setFailedCount(failedCount);
task.setUpdateTime(LocalDateTime.now());
taskRepository.updateById(task);
}
}
public Task getTaskById(Long taskId) {
return taskRepository.selectById(taskId);
}
public List<Task> getTasksByNodeId(String nodeId) {
LambdaQueryWrapper<Task> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(Task::getAssignedNode, nodeId)
.eq(Task::getStatus, 1);
return taskRepository.selectList(wrapper);
}
}
// Job class - MessageJob
package com.imessage.scheduler.service;
import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.util.RedisUtil;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.UUID;
@Component
public class MessageJob implements Job {
private static final String TASK_LOCK_PREFIX = "im:task:lock:";
private static final int LOCK_EXPIRE_TIME = 60; // lock expiry in seconds
@Resource
private TaskService taskService;
@Resource
private NodeService nodeService;
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private RedisUtil redisUtil;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
Long taskId = context.getJobDetail().getJobDataMap().getLong("taskId");
String lockKey = TASK_LOCK_PREFIX + taskId;
String lockValue = UUID.randomUUID().toString();
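try {
// Acquire the distributed lock
if (!redisUtil.tryLock(lockKey, lockValue, LOCK_EXPIRE_TIME)) {
System.out.println("Task is already being dispatched by another scheduler: " + taskId);
return;
}
Task task = taskService.getTaskById(taskId);
if (task == null || task.getStatus() != 0) {
System.out.println("Task does not exist or has already run: " + taskId);
return;
}
// Pick the least-loaded node
Node bestNode = nodeService.selectBestNode();
if (bestNode == null) {
System.err.println("No execution node available");
taskService.updateTaskStatus(taskId, 3);
return;
}
// Update the task status
// NOTE: assignedNode is set only on this in-memory object (and therefore on the queued message);
// updateTaskStatus reloads the row by ID, so the assignment is not written to MySQL here
task.setAssignedNode(bestNode.getNodeId());
taskService.updateTaskStatus(taskId, 1);
// Publish the task to the exchange, using the node ID as the routing key
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
bestNode.getNodeId(), task);
System.out.println("Task dispatched to node: " + bestNode.getNodeId() + ", task ID: " + taskId);
} catch (Exception e) {
e.printStackTrace();
taskService.updateTaskStatus(taskId, 3);
throw new JobExecutionException(e);
} finally {
// Release the distributed lock (a no-op if this instance never acquired it)
redisUtil.unlock(lockKey, lockValue);
}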
}
}
// Message consumer - MessageConsumer
package com.imessage.scheduler.service;
import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.SendRecord;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.SendRecordRepository;
import com.imessage.scheduler.util.HttpUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@Component
public class MessageConsumer {
private static final int THREAD_POOL_SIZE = 50;
private final ExecutorService executorService = new ThreadPoolExecutor(
THREAD_POOL_SIZE,
THREAD_POOL_SIZE,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>()
);
@Resource
private TaskService taskService;
@Resource
private SendRecordRepository sendRecordRepository;
@Resource
private HttpUtil httpUtil;
@RabbitListener(queues = "imessage.task.queue.node1")
public void consumeNode1(Task task, Message message) {
processTask(task, "node1");
}
@RabbitListener(queues = "imessage.task.queue.node2")
public void consumeNode2(Task task, Message message) {
processTask(task, "node2");
}
@RabbitListener(queues = "imessage.task.queue.node3")
public void consumeNode3(Task task, Message message) {
processTask(task, "node3");
}
private void processTask(Task task, String nodeId) {
System.out.println("Node " + nodeId + " started task: " + task.getId());
String[] phoneNumbers = task.getPhoneNumbers().split(",");
List<Future<Boolean>> futures = new ArrayList<>();
for (String phoneNumber : phoneNumbers) {
futures.add(executorService.submit(() -> sendMessage(task, phoneNumber, nodeId)));
}
int successCount = 0;
int failedCount = 0;
for (Future<Boolean> future : futures) {
try {
if (future.get()) {
successCount++;
} else {
failedCount++;
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
failedCount++;
}
}
// Record the task result
taskService.updateTaskResult(task.getId(), successCount, failedCount);
taskService.updateTaskStatus(task.getId(), 2);
System.out.println("Task finished: " + task.getId() +
", succeeded: " + successCount + ", failed: " + failedCount);
}
private boolean sendMessage(Task task, String phoneNumber, String nodeId) {
SendRecord record = new SendRecord();
record.setTaskId(task.getId());
record.setPhoneNumber(phoneNumber);
record.setContent(task.getContent());
record.setNodeId(nodeId);
record.setStatus(0);
record.setCreateTime(LocalDateTime.now());
record.setUpdateTime(LocalDateTime.now());
try {
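// Call the iMessage bulk-messaging VM's HTTP endpoint to send the message
String url = "http://" + getNodeIp(nodeId) + ":8080/api/message/send";
// Build the body with a JSON library so that quotes and newlines in the content are escaped
com.alibaba.fastjson.JSONObject payload = new com.alibaba.fastjson.JSONObject();
payload.put("phoneNumber", phoneNumber);
payload.put("content", task.getContent());
String result = httpUtil.postJson(url, payload.toJSONString());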
if (result.contains("success")) {
record.setStatus(1);
record.setSendTime(LocalDateTime.now());
sendRecordRepository.insert(record);
return true;
} else {
record.setStatus(2);
record.setFailReason(result);
sendRecordRepository.insert(record);
return false;
}
} catch (Exception e) {
e.printStackTrace();
record.setStatus(2);
record.setFailReason(e.getMessage());
sendRecordRepository.insert(record);
return false;
}
}
private String getNodeIp(String nodeId) {
// In a real project, look up the node IP from the database or a configuration center
return "127.0.0.1";
}
}
// Controller - TaskController
package com.imessage.scheduler.controller;
import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.dto.TaskResponse;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.service.TaskService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import javax.annotation.Resource;
@RestController
@RequestMapping("/api/task")
public class TaskController {
@Resource
private TaskService taskService;
@PostMapping("/create")
public ResponseEntity<TaskResponse> createTask(@RequestBody TaskRequest request) {
Long taskId = taskService.createTask(request);
TaskResponse response = new TaskResponse();
response.setTaskId(taskId);
response.setMessage("Task created successfully");
return ResponseEntity.ok(response);
}
@GetMapping("/{taskId}")
public ResponseEntity<Task> getTaskStatus(@PathVariable Long taskId) {
Task task = taskService.getTaskById(taskId);
if (task == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(task);
}
}
// DTO class - TaskRequest
package com.imessage.scheduler.dto;
import lombok.Data;
import java.time.LocalDateTime;
@Data
public class TaskRequest {
private String taskName;
private String content;
private String phoneNumbers;
private LocalDateTime scheduleTime;
private Integer priority;
}
// DTO class - TaskResponse
package com.imessage.scheduler.dto;
import lombok.Data;
@Data
public class TaskResponse {
private Long taskId;
private String message;
}
// Exception class - BusinessException
package com.imessage.scheduler.exception;
public class BusinessException extends RuntimeException {
    private final Integer code;
    public BusinessException(Integer code, String message) {
        // Pass the message to RuntimeException so getMessage() and stack traces show it
        super(message);
        this.code = code;
    }
    public Integer getCode() {
        return code;
    }
}
// Global exception handler - GlobalExceptionHandler
package com.imessage.scheduler.exception;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
@RestControllerAdvice
public class GlobalExceptionHandler {
@ExceptionHandler(BusinessException.class)
public ResponseEntity<ErrorResponse> handleBusinessException(BusinessException e) {
ErrorResponse error = new ErrorResponse();
error.setCode(e.getCode());
error.setMessage(e.getMessage());
return new ResponseEntity<>(error, HttpStatus.BAD_REQUEST);
}
@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleException(Exception e) {
ErrorResponse error = new ErrorResponse();
error.setCode(500);
error.setMessage("Internal server error");
e.printStackTrace();
return new ResponseEntity<>(error, HttpStatus.INTERNAL_SERVER_ERROR);
}
static class ErrorResponse {
private Integer code;
private String message;
public Integer getCode() {
return code;
}
public void setCode(Integer code) {
this.code = code;
}
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
}
// Utility class - HttpUtil
package com.imessage.scheduler.util;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Component
public class HttpUtil {
private static final int CONNECT_TIMEOUT = 5000;
private static final int SOCKET_TIMEOUT = 10000;
public String postJson(String url, String json) throws Exception {
CloseableHttpClient httpClient = HttpClients.createDefault();
HttpPost httpPost = new HttpPost(url);
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(SOCKET_TIMEOUT)
.build();
httpPost.setConfig(requestConfig);
StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8);
entity.setContentType("application/json");
httpPost.setEntity(entity);
try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
} finally {
httpClient.close();
}
}
}
// MyBatis mapper interface - TaskRepository
package com.imessage.scheduler.repository;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Task;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface TaskRepository extends BaseMapper<Task> {
}
// MyBatis mapper interface - NodeRepository
package com.imessage.scheduler.repository;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Node;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface NodeRepository extends BaseMapper<Node> {
}
// MyBatis mapper interface - SendRecordRepository
package com.imessage.scheduler.repository;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.SendRecord;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface SendRecordRepository extends BaseMapper<SendRecord> {
}
```
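Summary
This article described the design and implementation of a distributed task scheduling system for iMessage bulk-messaging VMs. The distributed architecture addresses the limited concurrency and single point of failure of the traditional single-node approach and provides automatic task distribution, load balancing, and failover. The system uses Spring Boot as its base framework, Quartz for task scheduling, Redis for distributed locking, and RabbitMQ for asynchronous task distribution, which gives it good scalability and stability.
In practice, the system can be tuned further to fit business needs. For example, priority-based scheduling would let important tasks run first; task sharding would split a large task into smaller ones that run in parallel and finish sooner; and more complete monitoring and alerting would help detect and resolve problems early. We hope this article is useful to developers building similar systems.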