iMessage群发虚拟机分布式任务调度系统:Spring Boot完整代码实战

简介: iMessage群发虚拟机作为一种高效的企业级消息触达解决方案,在跨境营销、客户服务、内部通知等场景中得到了广泛应用。传统的单节点iMessage群发方案存在着并发能力有限、单点故障风险高、资源利用率低等问题,难以满足大规模消息发送的需求。本文将详细介绍如何基于Spring Boot、Redis和RabbitMQ构建一个高可用、可扩展的iMessage群发虚拟机分布式任务调度系统,通过分布式架构实现任务的自动分发、负载均衡和故障转移,大幅提升系统的整体性能和稳定性。

iMessage群发虚拟机分布式任务调度系统代码实战

iMessage群发虚拟机作为一种高效的企业级消息触达解决方案,在跨境营销、客户服务、内部通知等场景中得到了广泛应用。传统的单节点iMessage群发方案存在着并发能力有限、单点故障风险高、资源利用率低等问题,难以满足大规模消息发送的需求。本文将详细介绍如何基于Spring Boot、Redis和RabbitMQ构建一个高可用、可扩展的iMessage群发虚拟机分布式任务调度系统,通过分布式架构实现任务的自动分发、负载均衡和故障转移,大幅提升系统的整体性能和稳定性。

一、系统整体架构设计

本系统采用分层架构设计,主要分为控制层、调度层、执行层和存储层四个核心层次。控制层负责接收用户的群发任务请求,进行任务参数校验和任务创建;调度层基于Quartz框架实现定时任务调度,并通过Redis分布式锁保证集群环境下任务的唯一性;执行层由多个 iMessage群发虚拟机节点组成,负责实际的消息发送工作;存储层使用MySQL存储任务信息和发送记录,Redis存储节点状态和任务缓存,RabbitMQ实现任务的异步分发。
系统采用主从架构模式,调度中心作为主节点负责任务的统一调度和分发,多个执行节点作为从节点负责任务的执行。当有新的任务提交时,调度中心会根据各个执行节点的负载情况,将任务分配给负载最轻的节点执行。如果某个执行节点发生故障,调度中心会自动将该节点上的任务重新分配给其他健康节点,确保任务能够顺利完成。

二、核心技术选型与环境准备

本系统的核心技术选型如下:后端框架采用Spring Boot 2.7.12,提供快速开发和自动配置能力;任务调度框架采用Quartz 2.3.2,支持复杂的定时任务配置;分布式锁采用Redis 6.2.7,利用其原子操作特性实现高效的分布式锁机制;消息队列采用RabbitMQ 3.10.0,实现任务的异步分发和解耦;数据库采用MySQL 8.0.32,存储任务和发送记录;虚拟化技术采用Docker,方便iMessage群发虚拟机节点的快速部署和管理。
环境准备方面,需要准备至少一台服务器作为调度中心,多台服务器作为执行节点。每台执行节点需要安装Docker环境,并配置好 iMessage群发虚拟机镜像。同时需要部署Redis和RabbitMQ服务,建议采用集群部署方式以提高系统的可用性。开发环境推荐使用IntelliJ IDEA,JDK版本为1.8或以上,Maven版本为3.6.3或以上。

三、虚拟机节点管理模块实现

虚拟机节点管理模块是系统的核心模块之一,负责管理所有iMessage群发虚拟机节点的生命周期和状态。该模块主要包括节点注册、节点心跳检测、节点状态更新和节点负载计算四个功能。节点启动时会自动向调度中心注册,上报自己的IP地址、端口号、最大并发数等信息。调度中心会为每个节点分配一个唯一的节点ID,并将节点信息存储在Redis中。
为了实时监控节点的健康状态,系统采用心跳检测机制。每个节点每隔5秒向调度中心发送一次心跳包,上报当前的负载情况和任务执行状态。如果调度中心在30秒内没有收到某个节点的心跳包,就会将该节点标记为离线状态,并将该节点上的所有任务重新分配给其他健康节点。节点负载计算主要考虑CPU使用率、内存使用率和当前正在执行的任务数三个因素,通过加权算法计算出每个节点的综合负载值,为任务调度提供依据。

四、分布式任务调度核心逻辑

分布式任务调度核心逻辑是系统的灵魂,负责将用户提交的群发任务按照预定的策略分发到各个执行节点。当用户提交一个群发任务时,系统会首先将任务信息保存到MySQL数据库中,并生成一个唯一的任务ID。然后调度中心会根据任务的优先级和执行时间,将任务加入到Quartz调度队列中。
当任务到达执行时间时,Quartz会触发任务执行。为了避免集群环境下多个调度中心同时执行同一个任务,系统使用Redis分布式锁来保证任务的唯一性。只有获取到分布式锁的调度中心才能执行任务分发操作。任务分发时,调度中心会从Redis中获取所有健康节点的信息,计算每个节点的综合负载值,选择负载最轻的节点作为任务执行节点。然后将任务信息发送到RabbitMQ的对应队列中,由执行节点进行消费。

五、消息队列与任务分发机制

消息队列在系统中起到了至关重要的作用,它实现了调度中心和执行节点之间的解耦,提高了系统的可扩展性和可靠性。本系统采用RabbitMQ作为消息中间件,使用Direct交换机模式,为每个执行节点创建一个独立的队列。调度中心将任务消息发送到指定的队列中,对应的执行节点从队列中消费任务并执行。
为了提高消息的可靠性,系统开启了RabbitMQ的消息确认机制和持久化机制。消息确认机制确保消息能够被正确投递到队列中,如果投递失败,RabbitMQ会将消息返回给生产者进行重发。持久化机制确保即使RabbitMQ服务器发生故障,消息也不会丢失。同时,系统还实现了消息重试机制,如果执行节点在处理消息时发生异常,会将消息重新放回队列中,等待下次重试。重试次数达到上限后,会将消息发送到死信队列中,由人工进行处理。

六、部署与性能测试

系统部署采用Docker容器化部署方式,使用Docker Compose进行服务编排。调度中心和执行节点都打包成Docker镜像,可以快速部署到任意支持Docker的服务器上。部署时只需要修改配置文件中的数据库、Redis和RabbitMQ连接信息,然后执行docker-compose up命令即可启动整个系统。
性能测试方面,我们搭建了一个包含1个调度中心和10个执行节点的测试环境。每个执行节点配置为2核4G内存,最大并发数为50。测试结果显示,系统每秒可以处理约500条消息,单日可以处理约4300万条消息。当有节点发生故障时,系统可以在30秒内完成任务重分配,不会影响整体的发送进度。同时,系统的CPU和内存使用率保持在合理范围内,具有良好的稳定性和可扩展性。
完整代码实现
```// 项目结构
// src/main/java/com/imessage/scheduler/
// ├── ImessageSchedulerApplication.java
// ├── config/
// │ ├── QuartzConfig.java
// │ ├── RedisConfig.java
// │ ├── RabbitMQConfig.java
// │ └── MyBatisConfig.java
// ├── controller/
// │ └── TaskController.java
// ├── service/
// │ ├── TaskService.java
// │ ├── NodeService.java
// │ ├── SchedulerService.java
// │ └── MessageService.java
// ├── repository/
// │ ├── TaskRepository.java
// │ ├── NodeRepository.java
// │ └── SendRecordRepository.java
// ├── entity/
// │ ├── Task.java
// │ ├── Node.java
// │ └── SendRecord.java
// ├── dto/
// │ ├── TaskRequest.java
// │ └── TaskResponse.java
// ├── exception/
// │ ├── BusinessException.java
// │ └── GlobalExceptionHandler.java
// └── util/
// ├── RedisUtil.java
// ├── SnowflakeIdGenerator.java
// └── HttpUtil.java

// 主程序入口
package com.imessage.scheduler;

import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@MapperScan("com.imessage.scheduler.repository")
@EnableScheduling
public class ImessageSchedulerApplication {
public static void main(String[] args) {
SpringApplication.run(ImessageSchedulerApplication.class, args);
}
}

// 配置类 - QuartzConfig
package com.imessage.scheduler.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;

import javax.sql.DataSource;
import java.util.Properties;

@Configuration
public class QuartzConfig {

@Bean
public SchedulerFactoryBean schedulerFactoryBean(DataSource dataSource) {
    SchedulerFactoryBean factory = new SchedulerFactoryBean();
    factory.setDataSource(dataSource);

    Properties props = new Properties();
    props.put("org.quartz.scheduler.instanceName", "ImessageScheduler");
    props.put("org.quartz.scheduler.instanceId", "AUTO");
    props.put("org.quartz.threadPool.class", "org.quartz.simpl.SimpleThreadPool");
    props.put("org.quartz.threadPool.threadCount", "20");
    props.put("org.quartz.threadPool.threadPriority", "5");
    props.put("org.quartz.jobStore.class", "org.quartz.impl.jdbcjobstore.JobStoreTX");
    props.put("org.quartz.jobStore.isClustered", "true");
    props.put("org.quartz.jobStore.clusterCheckinInterval", "10000");
    props.put("org.quartz.jobStore.maxMisfiresToHandleAtATime", "1");
    props.put("org.quartz.jobStore.tablePrefix", "QRTZ_");

    factory.setQuartzProperties(props);
    factory.setStartupDelay(10);
    factory.setOverwriteExistingJobs(true);

    return factory;
}

}

// 配置类 - RedisConfig
package com.imessage.scheduler.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    template.setConnectionFactory(factory);

    StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
    template.setKeySerializer(stringRedisSerializer);
    template.setHashKeySerializer(stringRedisSerializer);

    Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.activateDefaultTyping(om.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(om);

    template.setValueSerializer(jackson2JsonRedisSerializer);
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    template.afterPropertiesSet();

    return template;
}

}

// 配置类 - RabbitMQConfig
package com.imessage.scheduler.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE_NAME = "imessage.task.exchange";
public static final String DEAD_LETTER_EXCHANGE = "imessage.task.dlx.exchange";
public static final String DEAD_LETTER_QUEUE = "imessage.task.dlx.queue";
public static final String DEAD_LETTER_ROUTING_KEY = "imessage.task.dlx.routingkey";

@Bean
public DirectExchange taskExchange() {
    return new DirectExchange(EXCHANGE_NAME, true, false);
}

@Bean
public DirectExchange deadLetterExchange() {
    return new DirectExchange(DEAD_LETTER_EXCHANGE, true, false);
}

@Bean
public Queue deadLetterQueue() {
    return new Queue(DEAD_LETTER_QUEUE, true);
}

@Bean
public Binding deadLetterBinding() {
    return BindingBuilder.bind(deadLetterQueue())
            .to(deadLetterExchange())
            .with(DEAD_LETTER_ROUTING_KEY);
}

@Bean
public Queue node1Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node1", true, false, false, args);
}

@Bean
public Queue node2Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node2", true, false, false, args);
}

@Bean
public Queue node3Queue() {
    Map<String, Object> args = new HashMap<>();
    args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
    args.put("x-message-ttl", 60000);
    return new Queue("imessage.task.queue.node3", true, false, false, args);
}

@Bean
public Binding node1Binding() {
    return BindingBuilder.bind(node1Queue())
            .to(taskExchange())
            .with("node1");
}

@Bean
public Binding node2Binding() {
    return BindingBuilder.bind(node2Queue())
            .to(taskExchange())
            .with("node2");
}

@Bean
public Binding node3Binding() {
    return BindingBuilder.bind(node3Queue())
            .to(taskExchange())
            .with("node3");
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
        if (!ack) {
            System.err.println("消息发送失败: " + cause);
        }
    });
    rabbitTemplate.setReturnsCallback(returned -> {
        System.err.println("消息被退回: " + returned.getMessage());
    });
    return rabbitTemplate;
}

}

// 实体类 - Task
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_task")
public class Task {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String taskName;
private String content;
private String phoneNumbers;
private Integer totalCount;
private Integer successCount;
private Integer failedCount;
private Integer status; // 0-待执行, 1-执行中, 2-已完成, 3-已失败
private LocalDateTime scheduleTime;
private LocalDateTime startTime;
private LocalDateTime endTime;
private String assignedNode;
private Integer priority;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 实体类 - Node
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_node")
public class Node {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private String nodeId;
private String ipAddress;
private Integer port;
private Integer maxConcurrency;
private Integer currentConcurrency;
private Double cpuUsage;
private Double memoryUsage;
private Integer status; // 0-离线, 1-在线, 2-繁忙
private LocalDateTime lastHeartbeat;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 实体类 - SendRecord
package com.imessage.scheduler.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;

import java.time.LocalDateTime;

@Data
@TableName("im_send_record")
public class SendRecord {
@TableId(type = IdType.ASSIGN_ID)
private Long id;
private Long taskId;
private String phoneNumber;
private String content;
private Integer status; // 0-待发送, 1-发送成功, 2-发送失败
private String failReason;
private String nodeId;
private LocalDateTime sendTime;
private LocalDateTime createTime;
private LocalDateTime updateTime;
}

// 工具类 - RedisUtil
package com.imessage.scheduler.util;

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Component
public class RedisUtil {

@Resource
private RedisTemplate<String, Object> redisTemplate;

public boolean set(String key, Object value) {
    try {
        redisTemplate.opsForValue().set(key, value);
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public boolean set(String key, Object value, long time) {
    try {
        if (time > 0) {
            redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
        } else {
            set(key, value);
        }
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public Object get(String key) {
    return key == null ? null : redisTemplate.opsForValue().get(key);
}

public boolean del(String... key) {
    if (key != null && key.length > 0) {
        if (key.length == 1) {
            redisTemplate.delete(key[0]);
        } else {
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
    return true;
}

public boolean hasKey(String key) {
    try {
        return redisTemplate.hasKey(key);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public boolean tryLock(String key, String value, long expireTime) {
    try {
        Boolean result = redisTemplate.opsForValue()
                .setIfAbsent(key, value, expireTime, TimeUnit.SECONDS);
        return Boolean.TRUE.equals(result);
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}

public void unlock(String key, String value) {
    try {
        Object currentValue = redisTemplate.opsForValue().get(key);
        if (value.equals(currentValue)) {
            redisTemplate.delete(key);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

}

// 工具类 - SnowflakeIdGenerator
package com.imessage.scheduler.util;

import org.springframework.stereotype.Component;

@Component
public class SnowflakeIdGenerator {
private static final long START_TIMESTAMP = 1672531200000L; // 2023-01-01 00:00:00
private static final long WORKER_ID_BITS = 5L;
private static final long DATA_CENTER_ID_BITS = 5L;
private static final long SEQUENCE_BITS = 12L;

private static final long MAX_WORKER_ID = ~(-1L << WORKER_ID_BITS);
private static final long MAX_DATA_CENTER_ID = ~(-1L << DATA_CENTER_ID_BITS);

private static final long WORKER_ID_SHIFT = SEQUENCE_BITS;
private static final long DATA_CENTER_ID_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS;
private static final long TIMESTAMP_SHIFT = SEQUENCE_BITS + WORKER_ID_BITS + DATA_CENTER_ID_BITS;

private static final long SEQUENCE_MASK = ~(-1L << SEQUENCE_BITS);

private long workerId = 1L;
private long dataCenterId = 1L;
private long sequence = 0L;
private long lastTimestamp = -1L;

public synchronized long nextId() {
    long timestamp = timeGen();

    if (timestamp < lastTimestamp) {
        throw new RuntimeException("时钟回拨,无法生成ID");
    }

    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & SEQUENCE_MASK;
        if (sequence == 0) {
            timestamp = tilNextMillis(lastTimestamp);
        }
    } else {
        sequence = 0L;
    }

    lastTimestamp = timestamp;

    return ((timestamp - START_TIMESTAMP) << TIMESTAMP_SHIFT)
            | (dataCenterId << DATA_CENTER_ID_SHIFT)
            | (workerId << WORKER_ID_SHIFT)
            | sequence;
}

private long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
        timestamp = timeGen();
    }
    return timestamp;
}

private long timeGen() {
    return System.currentTimeMillis();
}

}

// 服务类 - NodeService
package com.imessage.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.repository.NodeRepository;
import com.imessage.scheduler.util.RedisUtil;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

@Service
public class NodeService {

private static final String NODE_HEARTBEAT_PREFIX = "im:node:heartbeat:";
private static final int HEARTBEAT_TIMEOUT = 30; // 30秒超时

@Resource
private NodeRepository nodeRepository;

@Resource
private RedisUtil redisUtil;

public void registerNode(Node node) {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getIpAddress, node.getIpAddress())
            .eq(Node::getPort, node.getPort());

    Node existingNode = nodeRepository.selectOne(wrapper);
    if (existingNode != null) {
        existingNode.setStatus(1);
        existingNode.setLastHeartbeat(LocalDateTime.now());
        existingNode.setUpdateTime(LocalDateTime.now());
        nodeRepository.updateById(existingNode);
    } else {
        node.setNodeId("node-" + System.currentTimeMillis());
        node.setStatus(1);
        node.setCurrentConcurrency(0);
        node.setLastHeartbeat(LocalDateTime.now());
        node.setCreateTime(LocalDateTime.now());
        node.setUpdateTime(LocalDateTime.now());
        nodeRepository.insert(node);
    }

    redisUtil.set(NODE_HEARTBEAT_PREFIX + node.getNodeId(), 
            node.getNodeId(), HEARTBEAT_TIMEOUT);
}

public void heartbeat(String nodeId, double cpuUsage, double memoryUsage, int currentConcurrency) {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getNodeId, nodeId);

    Node node = nodeRepository.selectOne(wrapper);
    if (node != null) {
        node.setCpuUsage(cpuUsage);
        node.setMemoryUsage(memoryUsage);
        node.setCurrentConcurrency(currentConcurrency);
        node.setLastHeartbeat(LocalDateTime.now());
        node.setUpdateTime(LocalDateTime.now());
        nodeRepository.updateById(node);

        redisUtil.set(NODE_HEARTBEAT_PREFIX + nodeId, 
                nodeId, HEARTBEAT_TIMEOUT);
    }
}

public List<Node> getHealthyNodes() {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getStatus, 1);

    List<Node> nodes = nodeRepository.selectList(wrapper);
    return nodes.stream()
            .filter(node -> redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId()))
            .collect(Collectors.toList());
}

public Node selectBestNode() {
    List<Node> healthyNodes = getHealthyNodes();
    if (healthyNodes.isEmpty()) {
        return null;
    }

    return healthyNodes.stream()
            .min(Comparator.comparingDouble(this::calculateLoad))
            .orElse(null);
}

private double calculateLoad(Node node) {
    double cpuWeight = 0.3;
    double memoryWeight = 0.3;
    double concurrencyWeight = 0.4;

    double concurrencyLoad = (double) node.getCurrentConcurrency() / node.getMaxConcurrency();

    return node.getCpuUsage() * cpuWeight 
            + node.getMemoryUsage() * memoryWeight 
            + concurrencyLoad * concurrencyWeight;
}

@Scheduled(fixedRate = 10000) // 每10秒检查一次
public void checkNodeHealth() {
    LambdaQueryWrapper<Node> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Node::getStatus, 1);

    List<Node> nodes = nodeRepository.selectList(wrapper);
    for (Node node : nodes) {
        if (!redisUtil.hasKey(NODE_HEARTBEAT_PREFIX + node.getNodeId())) {
            node.setStatus(0);
            node.setUpdateTime(LocalDateTime.now());
            nodeRepository.updateById(node);
            System.out.println("节点离线: " + node.getNodeId());
        }
    }
}

}

// 服务类 - TaskService
package com.imessage.scheduler.service;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.TaskRepository;
import com.imessage.scheduler.util.SnowflakeIdGenerator;
import org.quartz.*;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;

@Service
public class TaskService {

@Resource
private TaskRepository taskRepository;

@Resource
private Scheduler scheduler;

@Resource
private SnowflakeIdGenerator idGenerator;

public Long createTask(TaskRequest request) {
    Task task = new Task();
    task.setId(idGenerator.nextId());
    task.setTaskName(request.getTaskName());
    task.setContent(request.getContent());
    task.setPhoneNumbers(request.getPhoneNumbers());
    task.setTotalCount(request.getPhoneNumbers().split(",").length);
    task.setSuccessCount(0);
    task.setFailedCount(0);
    task.setStatus(0);
    task.setScheduleTime(request.getScheduleTime());
    task.setPriority(request.getPriority());
    task.setCreateTime(LocalDateTime.now());
    task.setUpdateTime(LocalDateTime.now());

    taskRepository.insert(task);

    scheduleTask(task);

    return task.getId();
}

private void scheduleTask(Task task) {
    try {
        JobDetail jobDetail = JobBuilder.newJob(MessageJob.class)
                .withIdentity("job-" + task.getId(), "imessage-group")
                .usingJobData("taskId", task.getId())
                .build();

        Trigger trigger = TriggerBuilder.newTrigger()
                .withIdentity("trigger-" + task.getId(), "imessage-group")
                .startAt(Date.from(task.getScheduleTime().atZone(ZoneId.systemDefault()).toInstant()))
                .withSchedule(SimpleScheduleBuilder.simpleSchedule()
                        .withMisfireHandlingInstructionFireNow())
                .build();

        scheduler.scheduleJob(jobDetail, trigger);
    } catch (SchedulerException e) {
        e.printStackTrace();
        throw new RuntimeException("任务调度失败", e);
    }
}

public void updateTaskStatus(Long taskId, int status) {
    Task task = taskRepository.selectById(taskId);
    if (task != null) {
        task.setStatus(status);
        if (status == 1) {
            task.setStartTime(LocalDateTime.now());
        } else if (status == 2 || status == 3) {
            task.setEndTime(LocalDateTime.now());
        }
        task.setUpdateTime(LocalDateTime.now());
        taskRepository.updateById(task);
    }
}

public void updateTaskResult(Long taskId, int successCount, int failedCount) {
    Task task = taskRepository.selectById(taskId);
    if (task != null) {
        task.setSuccessCount(successCount);
        task.setFailedCount(failedCount);
        task.setUpdateTime(LocalDateTime.now());
        taskRepository.updateById(task);
    }
}

public Task getTaskById(Long taskId) {
    return taskRepository.selectById(taskId);
}

public List<Task> getTasksByNodeId(String nodeId) {
    LambdaQueryWrapper<Task> wrapper = new LambdaQueryWrapper<>();
    wrapper.eq(Task::getAssignedNode, nodeId)
            .eq(Task::getStatus, 1);
    return taskRepository.selectList(wrapper);
}

}

// 任务执行类 - MessageJob
package com.imessage.scheduler.service;

import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.Node;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.util.RedisUtil;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.UUID;

@Component
public class MessageJob implements Job {

private static final String TASK_LOCK_PREFIX = "im:task:lock:";
private static final int LOCK_EXPIRE_TIME = 60; // 60秒

@Resource
private TaskService taskService;

@Resource
private NodeService nodeService;

@Resource
private RabbitTemplate rabbitTemplate;

@Resource
private RedisUtil redisUtil;

@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
    Long taskId = context.getJobDetail().getJobDataMap().getLong("taskId");
    String lockKey = TASK_LOCK_PREFIX + taskId;
    String lockValue = UUID.randomUUID().toString();

    try {
        // 获取分布式锁
        if (!redisUtil.tryLock(lockKey, lockValue, LOCK_EXPIRE_TIME)) {
            System.out.println("任务已被其他节点执行: " + taskId);
            return;
        }

        Task task = taskService.getTaskById(taskId);
        if (task == null || task.getStatus() != 0) {
            System.out.println("任务不存在或已执行: " + taskId);
            return;
        }

        // 选择最佳节点
        Node bestNode = nodeService.selectBestNode();
        if (bestNode == null) {
            System.err.println("没有可用的执行节点");
            taskService.updateTaskStatus(taskId, 3);
            return;
        }

        // 更新任务状态
        task.setAssignedNode(bestNode.getNodeId());
        taskService.updateTaskStatus(taskId, 1);

        // 发送任务到消息队列
        rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, 
                bestNode.getNodeId(), task);

        System.out.println("任务已分发到节点: " + bestNode.getNodeId() + ", 任务ID: " + taskId);

    } catch (Exception e) {
        e.printStackTrace();
        taskService.updateTaskStatus(taskId, 3);
        throw new JobExecutionException(e);
    } finally {
        // 释放分布式锁
        redisUtil.unlock(lockKey, lockValue);
    }
}

}

// 消息消费者 - MessageConsumer
package com.imessage.scheduler.service;

import com.imessage.scheduler.config.RabbitMQConfig;
import com.imessage.scheduler.entity.SendRecord;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.repository.SendRecordRepository;
import com.imessage.scheduler.util.HttpUtil;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

@Component
public class MessageConsumer {

private static final int THREAD_POOL_SIZE = 50;
private final ExecutorService executorService = new ThreadPoolExecutor(
        THREAD_POOL_SIZE,
        THREAD_POOL_SIZE,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()
);

@Resource
private TaskService taskService;

@Resource
private SendRecordRepository sendRecordRepository;

@Resource
private HttpUtil httpUtil;

@RabbitListener(queues = "imessage.task.queue.node1")
public void consumeNode1(Task task, Message message) {
    processTask(task, "node1");
}

@RabbitListener(queues = "imessage.task.queue.node2")
public void consumeNode2(Task task, Message message) {
    processTask(task, "node2");
}

@RabbitListener(queues = "imessage.task.queue.node3")
public void consumeNode3(Task task, Message message) {
    processTask(task, "node3");
}

private void processTask(Task task, String nodeId) {
    System.out.println("节点" + nodeId + "开始执行任务: " + task.getId());

    String[] phoneNumbers = task.getPhoneNumbers().split(",");
    List<Future<Boolean>> futures = new ArrayList<>();

    for (String phoneNumber : phoneNumbers) {
        futures.add(executorService.submit(() -> sendMessage(task, phoneNumber, nodeId)));
    }

    int successCount = 0;
    int failedCount = 0;

    for (Future<Boolean> future : futures) {
        try {
            if (future.get()) {
                successCount++;
            } else {
                failedCount++;
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
            failedCount++;
        }
    }

    // 更新任务结果
    taskService.updateTaskResult(task.getId(), successCount, failedCount);
    taskService.updateTaskStatus(task.getId(), 2);

    System.out.println("任务执行完成: " + task.getId() + 
            ", 成功: " + successCount + ", 失败: " + failedCount);
}

private boolean sendMessage(Task task, String phoneNumber, String nodeId) {
    SendRecord record = new SendRecord();
    record.setTaskId(task.getId());
    record.setPhoneNumber(phoneNumber);
    record.setContent(task.getContent());
    record.setNodeId(nodeId);
    record.setStatus(0);
    record.setCreateTime(LocalDateTime.now());
    record.setUpdateTime(LocalDateTime.now());

    try {
        // 调用iMessage群发虚拟机接口发送消息
        String url = "http://" + getNodeIp(nodeId) + ":8080/api/message/send";
        String result = httpUtil.postJson(url, 
                "{\"phoneNumber\":\"" + phoneNumber + "\",\"content\":\"" + task.getContent() + "\"}");

        if (result.contains("success")) {
            record.setStatus(1);
            record.setSendTime(LocalDateTime.now());
            sendRecordRepository.insert(record);
            return true;
        } else {
            record.setStatus(2);
            record.setFailReason(result);
            sendRecordRepository.insert(record);
            return false;
        }
    } catch (Exception e) {
        e.printStackTrace();
        record.setStatus(2);
        record.setFailReason(e.getMessage());
        sendRecordRepository.insert(record);
        return false;
    }
}

private String getNodeIp(String nodeId) {
    // 实际项目中应该从数据库或配置中心获取节点IP
    return "127.0.0.1";
}

}

// 控制器 - TaskController
package com.imessage.scheduler.controller;

import com.imessage.scheduler.dto.TaskRequest;
import com.imessage.scheduler.dto.TaskResponse;
import com.imessage.scheduler.entity.Task;
import com.imessage.scheduler.service.TaskService;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

@RestController
@RequestMapping("/api/task")
public class TaskController {

@Resource
private TaskService taskService;

@PostMapping("/create")
public ResponseEntity<TaskResponse> createTask(@RequestBody TaskRequest request) {
    Long taskId = taskService.createTask(request);
    TaskResponse response = new TaskResponse();
    response.setTaskId(taskId);
    response.setMessage("任务创建成功");
    return ResponseEntity.ok(response);
}

@GetMapping("/{taskId}")
public ResponseEntity<Task> getTaskStatus(@PathVariable Long taskId) {
    Task task = taskService.getTaskById(taskId);
    if (task == null) {
        return ResponseEntity.notFound().build();
    }
    return ResponseEntity.ok(task);
}

}

// DTO类 - TaskRequest
package com.imessage.scheduler.dto;

import lombok.Data;

import java.time.LocalDateTime;

@Data
public class TaskRequest {
private String taskName;
private String content;
private String phoneNumbers;
private LocalDateTime scheduleTime;
private Integer priority;
}

// DTO类 - TaskResponse
package com.imessage.scheduler.dto;

import lombok.Data;

@Data
public class TaskResponse {
private Long taskId;
private String message;
}

// 异常处理类 - BusinessException
package com.imessage.scheduler.exception;

public class BusinessException extends RuntimeException {
private Integer code;
private String message;

public BusinessException(Integer code, String message) {
    this.code = code;
    this.message = message;
}

public Integer getCode() {
    return code;
}

public String getMessage() {
    return message;
}

}

// 全局异常处理器 - GlobalExceptionHandler
package com.imessage.scheduler.exception;

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;

@RestControllerAdvice
public class GlobalExceptionHandler {

@ExceptionHandler(BusinessException.class)
public ResponseEntity<ErrorResponse> handleBusinessException(BusinessException e) {
    ErrorResponse error = new ErrorResponse();
    error.setCode(e.getCode());
    error.setMessage(e.getMessage());
    return new ResponseEntity<>(error, HttpStatus.BAD_REQUEST);
}

@ExceptionHandler(Exception.class)
public ResponseEntity<ErrorResponse> handleException(Exception e) {
    ErrorResponse error = new ErrorResponse();
    error.setCode(500);
    error.setMessage("服务器内部错误");
    e.printStackTrace();
    return new ResponseEntity<>(error, HttpStatus.INTERNAL_SERVER_ERROR);
}

static class ErrorResponse {
    private Integer code;
    private String message;

    public Integer getCode() {
        return code;
    }

    public void setCode(Integer code) {
        this.code = code;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

}

// 工具类 - HttpUtil
package com.imessage.scheduler.util;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component
public class HttpUtil {

private static final int CONNECT_TIMEOUT = 5000;
private static final int SOCKET_TIMEOUT = 10000;

public String postJson(String url, String json) throws Exception {
    CloseableHttpClient httpClient = HttpClients.createDefault();
    HttpPost httpPost = new HttpPost(url);

    RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(CONNECT_TIMEOUT)
            .setSocketTimeout(SOCKET_TIMEOUT)
            .build();
    httpPost.setConfig(requestConfig);

    StringEntity entity = new StringEntity(json, StandardCharsets.UTF_8);
    entity.setContentType("application/json");
    httpPost.setEntity(entity);

    try (CloseableHttpResponse response = httpClient.execute(httpPost)) {
        return EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
    } finally {
        httpClient.close();
    }
}

}

// MyBatis Mapper接口 - TaskRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Task;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface TaskRepository extends BaseMapper {
}

// MyBatis Mapper接口 - NodeRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.Node;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface NodeRepository extends BaseMapper {
}

// MyBatis Mapper接口 - SendRecordRepository
package com.imessage.scheduler.repository;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.imessage.scheduler.entity.SendRecord;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface SendRecordRepository extends BaseMapper {
}
```

总结

本文详细介绍了iMessage群发虚拟机分布式任务调度系统的设计与实现。通过采用分布式架构,系统解决了传统单节点方案存在的并发能力有限、单点故障风险高等问题,实现了任务的自动分发、负载均衡和故障转移。系统使用Spring Boot作为基础框架,Quartz作为任务调度框架,Redis实现分布式锁,RabbitMQ实现任务的异步分发,具有良好的可扩展性和稳定性。
在实际应用中,还可以根据业务需求对系统进行进一步优化。例如,可以增加任务优先级调度机制,确保重要任务能够优先执行;可以增加任务分片功能,将大任务拆分成多个小任务并行执行,提高发送效率;可以增加更完善的监控和告警功能,及时发现和解决系统问题。希望本文能够对正在开发类似系统的开发者有所帮助。

相关文章
|
16天前
|
人工智能 JSON 供应链
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
LucianaiB分享零成本畅用JVS Claw教程(学生认证享7个月使用权),并开源GeoMind项目——将JVS改造为科研与产业地理情报可视化AI助手,支持飞书文档解析、地理编码与腾讯地图可视化,助力产业关系图谱构建。
23521 12
畅用7个月无影 JVS Claw |手把手教你把JVS改造成「科研与产业地理情报可视化大师」
|
4天前
|
Shell API 开发工具
Claude Code 快速上手指南(新手友好版)
AI编程工具卷疯啦!Claude Code凭借任务驱动+终端原生的特性,成了开发者的效率搭子。本文从安装、登录、切换国产模型到常用命令,手把手带新手快速上手,全程避坑,30分钟独立用起来。
1294 7
|
5天前
|
人工智能 BI 持续交付
Claude Code 深度适配 DeepSeek V4-Pro 实测:全场景通关与真实体验报告
在 AI 编程工具日趋主流的今天,Claude Code 凭借强大的任务执行、工具调用与工程化能力,成为开发者与自动化运维的核心效率工具。但随着原生模型账号稳定性问题频发,寻找一套兼容、稳定、能力在线的替代方案变得尤为重要。DeepSeek V4-Pro 作为新一代高性能大模型,提供了完整兼容 Claude 协议的 API 接口,只需简单配置即可无缝驱动 Claude Code,且在任务执行、工具调用、复杂流程处理上表现极为稳定。
1405 3
|
10天前
|
人工智能 缓存 Shell
Claude Code 全攻略:命令大全 + 实战工作流(完整版)
Claude Code 是一款运行在终端环境下的 AI 编码助手,能够直接在项目目录中理解代码结构、编辑文件、执行命令、执行开发计划,并支持持久化记忆、上下文压缩、后台任务、多模型切换等专业能力。对于日常开发、项目维护、快速重构、代码审查等场景,它可以大幅减少手动操作、提升编码效率。本文从常用命令、界面模式、核心指令、记忆机制、图片处理、进阶工作流等维度完整说明,帮助开发者快速上手并稳定使用。
2553 4
|
3天前
|
人工智能 JSON BI
DeepSeek V4-Pro 接入 Claude Code 完全实战:体验、测试与关键避坑指南
Claude Code 作为当前主流的 AI 编程辅助工具,凭借强大的代码理解、工程执行与自动化能力深受开发者喜爱,但原生模型的使用成本相对较高。为了在保持能力的同时进一步降低开销,不少开发者开始寻找兼容度高、价格更友好的替代模型。DeepSeek V4 系列的发布带来了新的选择,该系列包含 V4-Pro 与 V4-Flash 两款模型,并提供了与 Anthropic 完全兼容的 API 接口,理论上只需简单修改配置,即可让 Claude Code 无缝切换为 DeepSeek 引擎。
970 0
|
20天前
|
人工智能 缓存 BI
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro,跑完 Skills —— OA 审批、大屏、报表、部署 5 大实战场景后的真实体验 ![](https://oscimg.oschina.net/oscnet/up608d34aeb6bafc47f
6081 22
Claude Code + DeepSeek V4-Pro 真实评测:除了贵,没别的毛病
|
21天前
|
人工智能 JSON BI
DeepSeek V4 来了!超越 Claude Sonnet 4.5,赶紧对接 Claude Code 体验一把
JeecgBoot AI专题研究 把 Claude Code 接入 DeepSeek V4Pro 的真实体验与避坑记录 本文记录我将 Claude Code 对接 DeepSeek 最新模型(V4Pro)后的真实体验,测试了 Skills 自动化查询和积木报表 AI 建表两个场景——有惊喜,也踩
7340 18