💖分布式任务调度有那么难吗?来,10分钟带你实战💖

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。

1.JPG


一、概述


1.1、什么是任务调度


我们可以思考一下下面业务场景的解决方案:


  • 某电商平台需要每天上午10点,下午3点,晚上8点发放一批优惠券
  • 某银行系统需要在信用卡到期还款日的前三天进行短信提醒
  • 某财务系统需要在每天凌晨0:10分结算前一天的财务数据,统计汇总


以上场景就是任务调度所需要解决的问题,任务调度是为了自动完成特定任务,在约定的特定时刻去执行任务的过程


在Spring中也提供了定时任务注解@Scheduled。我们只需要在业务中贴上注解然后在启动类上贴上@EnableScheduling注解即可完成任务调度功能。


@Scheduled(cron = "0/20 * * * * ? ") // 每隔20秒执行一次
 public void doWork(){
  //doSomething   
 }
复制代码


1.2、分布式调度出现


感觉Spring给我们提供的这个注解可以完成任务调度的功能,好像已经完美解决问题了,为什么还需要分布式呢?主要的原因有以下几点:


  1. 机处理极限:原本1分钟内需要处理1万个订单,但是现在需要1分钟内处理10万个订单;原来一个统计需要1小时,现在业务方需要10分钟就统计出来。你也许会说,你也可以多线程、单机多进程处理。的确,多线程并行处理可以提高单位时间的处理效率,但是单机能力毕竟有限(主要是CPU、内存和磁盘),始终会有单机处理不过来的情况。


  1. 高可用:单机版的定式任务调度只能在一台机器上运行,如果程序或者系统出现异常就会导致功能不可用。虽然可以在单机程序实现的足够稳定,但始终有机会遇到非程序引起的故障,而这个对于一个系统的核心功能来说是不可接受的。


  1. 防止重复执行: 在单机模式下,定时任务是没什么问题的。但当我们部署了多台服务,同时又每台服务又有定时任务时,若不进行合理的控制在同一时间,只有一个定时任务启动执行,这时,定时执行的结果就可能存在混乱和错误了。


1.3、Elastic-Job


Elastic-Job是一个分布式调度的解决方案,由当当网开源,它由两个相互独立的子项目Elastic-job-Lite和Elastic-Job-Cloud组成,使用Elastic-Job可以快速实现分布式任务调度。Elastic-Job的github地址。他的功能主要是:


  • 分布式调度协调
    在分布式环境中,任务能够按照指定的调度策略执行,并且能够避免同一任务多实例重复执行。
  • 丰富的调度策略:
    基于成熟的定时任务作业框架Quartz cron表达式执行定时任务。
  • 弹性拓容缩容
    当集群中增加一个实例,它应当能够被选举被执行任务;当集群减少一个实例时,他所执行的任务能被转移到别的示例中执行。
  • 失效转移
    某示例在任务执行失败后,会被转移到其他实例执行。
  • 错过执行任务重触发
    若因某种原因导致作业错过执行,自动记录错误执行的作业,并在下次次作业完成后自动触发。
  • 支持并行调度
    支持任务分片,任务分片是指将一个任务分成多个小任务在多个实例同时执行。
  • 作业分片一致性
    当任务被分片后,保证同一分片在分布式环境中仅一个执行实例。
  • 支持作业生命周期操作
    可以动态对任务进行开启及停止操作。
  • 丰富的作业类型
    支持Simple、DataFlow、Script三种作业类型


3.JPG


1.4、启动zookeeper


  1. 解压包
  2. 到conf目录中把oo_sample.cfg 拷贝一份 , 修改名字为zoo.cfg。
  3. 到bin目录中启动startup.cmd文件(用1命令行启动)。


1.4、启动zookeeper图形化界面


  1. 解压。
  2. 到1build目录中,找到jar包。
  3. 使用命令:java -jar jar包的名字


二、Elastic-Job快速入门


Elastic-Job的1环境要求:

  • JDK 要求1.7以上保本
  • Maven 要求3.0.4及以上版本
  • Zookeeper 要求采取3.4.6以上版本


2.1、环境搭建


安装运行zookeeper

创建一个maven项目并导入依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
复制代码

写任务类

public class XiaoLinJob implements SimpleJob {
  // 写任务类
  @Override
  public void execute(ShardingContext shardingContext) {
    System.out.println("定时任务开始");
  }
}
复制代码

编写配置类

public class JobDemo {
  public static void main(String[] args) {
    new JobScheduler(createRegistryCenter(), createJobConfiguration()).init();
  }
  private static CoordinatorRegistryCenter createRegistryCenter() {
    //配置zk地址,调度任务的组名
    ZookeeperConfiguration zookeeperConfiguration = new ZookeeperConfiguration("127.0.0.1:2181", "elastic-job-demo");
    zookeeperConfiguration.setSessionTimeoutMilliseconds(1000);
    CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
    regCenter.init();
    return regCenter;
  }
  private static LiteJobConfiguration createJobConfiguration() {
    // 定义作业核心配置
    JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("demoSimpleJob","0/1 * * * * ?",1).build();
    // 定义SIMPLE类型配置
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, XiaoLinJob.class.getCanonicalName());
    // 定义Lite作业根配置
    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).build();
    return simpleJobRootConfig;
  }
}
复制代码


2.2、测试


  1. 当只有一台启动的时候,按照corn表达式进行任务调度。
  2. 开启两台机器的1时候,新开的一台会继续执行定时任务,旧的1那一台会停止。


三、SpringBoot集成Elastic-Job


3.1、引入依赖


<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.3.RELEASE</version>
  </parent>
  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>com.dangdang</groupId>
      <artifactId>elastic-job-lite-spring</artifactId>
      <version>2.1.5</version>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
  </dependencies>
复制代码


3.2、编写配置文件


application.yaml


因为配置中心的地址不是固定的,所以我们不能写死在代码中,需要把他写在配置文件中。新建一个配置文件:


elasticjob:
  zookeeper-url: localhost:2181
  group-name: elastic-job-group
复制代码

zookeeper注册中心配置类

// 注册中心的配置类
@Configuration
public class RegistryCenterConfig {
  @Bean(initMethod = "init")
                                                         // 从配置文件中获取注册中心的的url和命名空间
  public CoordinatorRegistryCenter coordinatorRegistryCenter(
                        @Value("${elasticjob.zookeeper-url}") String zookeeperUrl,
                                            @Value("${elasticjob.group-name}") String namespace){
    // zk的配置
    ZookeeperConfiguration zookeeperConfiguration =
        new ZookeeperConfiguration(zookeeperUrl,namespace);
    // 设置超时时间
    zookeeperConfiguration.setMaxSleepTimeMilliseconds(10000000);
    // 创建注册中心
    ZookeeperRegistryCenter zookeeperRegistryCenter = new ZookeeperRegistryCenter(zookeeperConfiguration);
    return zookeeperRegistryCenter;
  }
}
复制代码

任务调度的配置类

@Configuration
public class JobConfig {
  @Autowired
  XiaoLinJob xiaoLinJob;
  @Autowired
  private CoordinatorRegistryCenter registryCenter;
  private static LiteJobConfiguration createJobConfiguration(
                      final Class<? extends SimpleJob> jobClass, // 任务的名字
                                       final String cron, // cron表达式
                                       final int shardingTotalCount, // 分片的数量
                                       final String shardingItemParameters // 分片类信奉的参数
                                                                                                  ){
    JobCoreConfiguration.Builder jobCoreConfigurationBuilder = JobCoreConfiguration.newBuilder(jobClass.getSimpleName(),cron,shardingTotalCount);
    if(!StringUtils.isEmpty(shardingItemParameters)){
      jobCoreConfigurationBuilder.shardingItemParameters(shardingItemParameters);
    }
    SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(jobCoreConfigurationBuilder.build(), XiaoLinJob.class.getCanonicalName());
    LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build();
    return simpleJobRootConfig;
  }
  @Bean(initMethod = "init")
  public SpringJobScheduler initSimpleElasticJob(){
    SpringJobScheduler springJobScheduler = new SpringJobScheduler(xiaoLinJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",1,null));
    return springJobScheduler;
  }
}
复制代码

自定义任务类

@Component
public class XiaoLinJob implements SimpleJob {
  @Override
  public void execute(ShardingContext shardingContext) {
    System.out.println("============");
  }
}
复制代码


3.3、测试


四、小案例


4.1、单机版本


4.1.1、需求描述


数据库中有一些列的数据,需要对这些数据进行备份操作,备份完之后,修改数据的状态,标记已经备份了。


4.1.2、创建数据库


SET FOREIGN_KEY_CHECKS=0;
-- ----------------------------
-- Table structure for t_file_custom
-- ----------------------------
DROP TABLE IF EXISTS `t_file_custom`;
CREATE TABLE `t_file_custom` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  `type` varchar(255) DEFAULT NULL,
  `backedUp` tinyint(4) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=21 DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of t_file_custom
-- ----------------------------
INSERT INTO `t_file_custom` VALUES ('1', '文件1', '内容1', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('2', '文件2', '内容2', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('3', '文件3', '内容3', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('4', '文件4', '内容4', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('5', '文件5', '内容5', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('6', '文件6', '内容6', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('7', '文件6', '内容7', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('8', '文件8', '内容8', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('9', '文件9', '内容9', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('10', '文件10', '内容10', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('11', '文件11', '内容11', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('12', '文件12', '内容12', 'vedio', '1');
INSERT INTO `t_file_custom` VALUES ('13', '文件13', '内容13', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('14', '文件14', '内容14', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('15', '文件15', '内容15', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('16', '文件16', '内容16', 'text', '1');
INSERT INTO `t_file_custom` VALUES ('17', '文件17', '内容17', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('18', '文件18', '内容18', 'image', '1');
INSERT INTO `t_file_custom` VALUES ('19', '文件19', '内容19', 'radio', '1');
INSERT INTO `t_file_custom` VALUES ('20', '文件20', '内容20', 'vedio', '1');
复制代码


4.1.3、Druid&MyBatis


4.1.3.1、添加依赖


<dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.10</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!--mysql驱动-->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
复制代码


4.1.3.2、集成数据库


spring:
  datasource:
    url: jdbc:mysql://localhost:3306/elastic-job-demo?serverTimezone=GMT%2B8
    driverClassName: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    username: root
    password: admin
复制代码


4.1.4、添加实体类


@Data
public class FileCustom {
    //唯一标识
    private Long id;
    //文件名
    private String name;
    //文件类型
    private String type;
    //文件内容
    private String content;
    //是否已备份
    private Boolean backedUp = false;
    public FileCustom(){}
    public FileCustom(Long id, String name, String type, String content){
        this.id = id;
        this.name = name;
        this.type = type;
        this.content = content;
    }
}
复制代码


4.1.5、添加任务类


@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {
  @Autowired
  FileCopyMapper fileCopyMapper;
  @Override
  public void execute(ShardingContext shardingContext) {
    doWork();
  }
  private void doWork() {
    List<FileCustom> fileCustoms = fileCopyMapper.selectAll();
    if (fileCustoms.size() == 0){
      log.info("备份完成");
      return;
    }
    log.info("需要备份的文件个数为:"+fileCustoms.size());
    for (FileCustom fileCustom : fileCustoms) {
      backUpFile(fileCustom);
    }
  }
  private void backUpFile(FileCustom fileCustom) {
    try {
      Thread.sleep(1000);
      log.info("执行备份文件:"+fileCustom);
      fileCopyMapper.backUpFile(fileCustom.getId());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
复制代码


4.1.6、添加Mapper


@Mapper
public interface FileCopyMapper {
  @Select("select * from t_file_custom where backedUp = 0")
  List<FileCustom> selectAll();
  @Update("update t_file_custom set backedUp = 1 where id = #{id}")
  void backUpFile(Long id);
}
复制代码


4.1.7、添加任务调度配置


@Bean(initMethod = "init")
  public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
    SpringJobScheduler springJobScheduler = new SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",1,null));
    return springJobScheduler;
  }
复制代码


4.1.8、存在的问题


为了高可用,我们会对这个项目做集群的操作,可以保证其中一台挂了,另外一台可以继续工作.但是在集群的情况下,调度任务只在一台机器上运行,如果单个任务调度比较耗时,耗资源的情况下,对这台机器的消耗还是比较大的。


但是这个时候,其他机器却是空闲着的,如何合理的利用集群的其他机器且如何让任务执行得更快些呢?这时候Elastic-Job提供了任务调度分片的功能。


4.2、集群版本


4.2.1、分片概念


作业分片是指任务的分布式执行,需要将一个任务拆分为多个独立的任务项,然后由分布式的应用实例分别执行某一个或者几个分布项。


例如在单机版本的备份数据的案例,如果有两台服务器,每台服务器分别跑一个应用实例。为了快速执行作业,那么可以讲任务分成4片,每个应用实例都执行两片。作业遍历数据逻辑应为:实例1查找text和image类型文件执行备份,实例2查找radio和vedio类型文件执行备份。


如果由于服务器拓容应用实例数量增加为4,则作业遍历数据的逻辑应为: 4个实例分别处理text、image、radio、video类型的文件。


通过对任务的合理分片化,从而达到任务并行处理的效果,他的好处是:


  1. 分片项与业务处理解耦:Elastic-Job并不直接提供数据处理的功能,框架只会将分片项分配至各个运行中的作业服务器,开发者需要自行处理分片项与真实数据的对应关系。
  2. 最大限度利用资源:将分片项设置大于服务器的数据,最好是大于服务器倍数的数量,作业将会合理利用分布式资源,动态的分配分片项。例如:3台服务器,分成10片,则分片项结果为服务器A=0、1、2。服务器B=3、4、5。服务器C=6、7、8、9。如果 服务器C奔溃,则分片项分配结果为服务器A=0、1、2、3、4。服务器B=5、6、7、8、9。在不丢失分片项的情况下,最大限度利用现有的资源提高吞吐量。


4.2.2、配置类修改


如果想将单机版本改为集群版本,我们首先需要在任务配置类中增加分片个数以及分片参数。


@Bean(initMethod = "init")
  public SpringJobScheduler initFileCustomElasticJob(FileCustomElasticJob fileCustomElasticJob){
    SpringJobScheduler springJobScheduler = new 
 //第一个参数表示自定义任务类,第二个参数是corn表达式,第三个参数是分片个数,第四个参数是分片的名称,第一个分片作用是查询类型为test的,以此类推 
 SpringJobScheduler(fileCustomElasticJob,registryCenter,createJobConfiguration(XiaoLinJob.class,"0/3 * * * * ?",4,"0=text,1=image,2=radio,3=vedio"));
    return springJobScheduler;
  }
复制代码


4.2.3、 新增作业分片逻辑


@Slf4j
@Component
public class FileCustomElasticJob implements SimpleJob {
  @Autowired
  FileCopyMapper fileCopyMapper;
  @Override
  public void execute(ShardingContext shardingContext) {
    // 获取到指定分片的类型
    doWork(shardingContext.getShardingParameter());
  }
  private void doWork(String fileType) {
    List<FileCustom> fileCustoms = fileCopyMapper.selectByType(fileType);
    if (fileCustoms.size() == 0){
      log.info("备份完成");
      return;
    }
    log.info("需要备份的文件类型是:"+fileType+"文件个数为:"+fileCustoms.size());
    for (FileCustom fileCustom : fileCustoms) {
      backUpFile(fileCustom);
    }
  }
  private void backUpFile(FileCustom fileCustom) {
    try {
      Thread.sleep(2000);
      log.info("执行备份文件:"+fileCustom);
      fileCopyMapper.backUpFile(fileCustom.getId());
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }
}
复制代码


4.2.4、Mapper类修改


@Mapper
public interface FileCopyMapper {
  @Select("select * from t_file_custom where backedUp = 0")
  List<FileCustom> selectAll();
  @Update("update t_file_custom set backedUp = 1 where id = #{id}")
  void backUpFile(Long id);
  @Select("select * from t_file_custom where backedUp = 0 and type = #{fileType}")
  List<FileCustom> selectByType(String fileType);
}
复制代码


4.2.5、测试


4.2.5.1、一台机器


一台机器启动四个线程直接跑完。


4.2.5.2、四台机器


当四台机器启动的时候,每台机器分得一个线程,查询并备份一种类型的数据。


1.JPG

----------------------------------------------------------我是分割线----------------------------------------------------------

8.JPG

----------------------------------------------------------我是分割线----------------------------------------------------------

9.JPG

----------------------------------------------------------我是分割线----------------------------------------------------------

10.JPG

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
8月前
|
存储 分布式计算 大数据
HBase分布式数据库关键技术与实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析了HBase的核心技术,包括数据模型、分布式架构、访问模式和一致性保证,并探讨了其实战应用,如大规模数据存储、实时数据分析及与Hadoop、Spark集成。同时,分享了面试经验,对比了HBase与其他数据库的差异,提出了应对挑战的解决方案,展望了HBase的未来趋势。通过Java API代码示例,帮助读者巩固理解。全面了解和掌握HBase,能为面试和实际工作中的大数据处理提供坚实基础。
478 3
|
10天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
136 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
11天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
169 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
18天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
47 10
|
7月前
|
消息中间件 NoSQL Java
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
Redis系列学习文章分享---第六篇(Redis实战篇--Redis分布式锁+实现思路+误删问题+原子性+lua脚本+Redisson功能介绍+可重入锁+WatchDog机制+multiLock)
253 0
|
3月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
5月前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
92 8
|
5月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)
|
6月前
|
NoSQL Java 调度
在Spring Boot中实现分布式任务调度
在Spring Boot中实现分布式任务调度