Springcloud实战之自研分布式id生成器7

简介: Springcloud实战之自研分布式id生成器7

生成mapper映射类,注意插入加入了乐观锁,注意这个sql

```package com.laoyang.id.dao.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import com.laoyang.id.dao.po.IdGeneratePO;

import java.util.List;

/**

  • @Author idea
  • @Date: Created in 19:47 2023/5/25
  • @Description
    */
    @Mapper
    public interface IdGenerateMapper extends BaseMapper {

    @Update("update t_id_gengrate_config set next_threshold=next_threshold+step," +

         "current_start=current_start+step,version=version+1 where id =#{id} and version=#{version}")
    

    int updateNewIdCountAndVersion(@Param("id")int id,@Param("version")int version);

    @Select("select * from t_id_gengrate_config")
    List selectAll();
    }
    ```

    在service下创建bo类生成有序id和无序id对象

    ```package com.laoyang.id.service.bo;

import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 20:00 2023/5/25
  • @Description 有序id的BO对象
    */
    public class LocalSeqIdBO {

    private int id;
    /**

    • 在内存中记录的当前有序id的值
      */
      private AtomicLong currentNum;

      /**

    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public AtomicLong getCurrentNum() {
      return currentNum;
      }

      public void setCurrentNum(AtomicLong currentNum) {
      this.currentNum = currentNum;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      package com.laoyang.id.service.bo;

import java.util.concurrent.ConcurrentLinkedQueue;

/**

  • @Author idea
  • @Date: Created in 20:32 2023/5/26
  • @Description 无序id的BO对象
    */
    public class LocalUnSeqIdBO {

    private int id;
    /**

    • 提前将无序的id存放在这条队列中
      /
      private ConcurrentLinkedQueue idQueue;
      /*
    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public ConcurrentLinkedQueue getIdQueue() {
      return idQueue;
      }

      public void setIdQueue(ConcurrentLinkedQueue idQueue) {
      this.idQueue = idQueue;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      ```

      生成service类生成有序id与无序id

      ```package com.laoyang.id.service;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    public interface IdGenerateService {

    /**

    • 获取有序id
      *
    • @param id
    • @return
      */
      Long getSeqId(Integer id);

      /**

    • 获取无序id
      *
    • @param id
    • @return
      */
      Long getUnSeqId(Integer id);
      }
      ```

      实现有序id和无序id方法(这里是关键,主要用到了原子类,一些同步类操作等等,线程池)

      ```package com.laoyang.id.service.impl;

import jakarta.annotation.Resource;
import com.laoyang.id.dao.mapper.IdGenerateMapper;
import com.laoyang.id.dao.po.IdGeneratePO;
import com.laoyang.id.service.IdGenerateService;
import com.laoyang.id.service.bo.LocalSeqIdBO;
import com.laoyang.id.service.bo.LocalUnSeqIdBO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    @Service
    public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    @Resource
    private IdGenerateMapper idGenerateMapper;

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateServiceImpl.class);
    private static Map localSeqIdBOMap = new ConcurrentHashMap<>();
    private static Map localUnSeqIdBOMap = new ConcurrentHashMap<>();
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),

         new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 Thread thread = new Thread(r);
                 thread.setName("id-generate-thread-" + ThreadLocalRandom.current().nextInt(1000));
                 return thread;
             }
         });
    

    private static final float UPDATE_RATE = 0.50f;
    private static final int SEQ_ID = 1;
    private static Map semaphoreMap = new ConcurrentHashMap<>();

    @Override
    public Long getUnSeqId(Integer id) {

     if (id == null) {
         LOGGER.error("[getSeqId] id is error,id is {}", id);
         return null;
     }
     LocalUnSeqIdBO localUnSeqIdBO = localUnSeqIdBOMap.get(id);
     if (localUnSeqIdBO == null) {
         LOGGER.error("[getUnSeqId] localUnSeqIdBO is null,id is {}", id);
         return null;
     }
     Long returnId = localUnSeqIdBO.getIdQueue().poll();
     if (returnId == null) {
         LOGGER.error("[getUnSeqId] returnId is null,id is {}", id);
         return null;
     }
     this.refreshLocalUnSeqId(localUnSeqIdBO);
     return returnId;
    

    }

    /*

    • @param id 传的是对应的业务id
    • @return
      */
      @Override
      public Long getSeqId(Integer id) {
      if (id == null) {

       LOGGER.error("[getSeqId] id is error,id is {}", id);
       return null;
      

      }
      LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);
      if (localSeqIdBO == null) {

       LOGGER.error("[getSeqId] localSeqIdBO is null,id is {}", id);
       return null;
      

      }
      this.refreshLocalSeqId(localSeqIdBO);
      long returnId = localSeqIdBO.getCurrentNum().incrementAndGet();
      if (returnId > localSeqIdBO.getNextThreshold()) {

       //同步去刷新 可能是高并发下还未更新本地数据
       LOGGER.error("[getSeqId] id is over limit,id is {}", id);
       return null;
      

      }
      return returnId;
      }

      /**

    • 刷新本地有序id段
      *
    • @param localSeqIdBO
      /
      private void refreshLocalSeqId(LocalSeqIdBO localSeqIdBO) {
      long step = localSeqIdBO.getNextThreshold() - localSeqIdBO.getCurrentStart();
      if (localSeqIdBO.getCurrentNum().get() - localSeqIdBO.getCurrentStart() > step
      UPDATE_RATE) {

       Semaphore semaphore = semaphoreMap.get(localSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           LOGGER.info("开始尝试进行本地id段的同步操作");
           //异步进行同步id段操作
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localSeqIdBO.getId()).release();
                       LOGGER.info("本地有序id段同步完成,id is {}", localSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      /**

    • 刷新本地无序id段
      *
    • @param localUnSeqIdBO
      /
      private void refreshLocalUnSeqId(LocalUnSeqIdBO localUnSeqIdBO) {
      long begin = localUnSeqIdBO.getCurrentStart();
      long end = localUnSeqIdBO.getNextThreshold();
      long remainSize = localUnSeqIdBO.getIdQueue().size();
      //如果使用剩余空间不足25%,则进行刷新
      if ((end - begin)
      0.35 > remainSize) {

       LOGGER.info("本地无序id段同步开始,id is {}", localUnSeqIdBO.getId());
       Semaphore semaphore = semaphoreMap.get(localUnSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localUnSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalUnSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localUnSeqIdBO.getId()).release();
                       LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      //bean初始化的时候会回调到这里
      @Override
      public void afterPropertiesSet() throws Exception {
      List idGeneratePOList = idGenerateMapper.selectAll();
      for (IdGeneratePO idGeneratePO : idGeneratePOList) {

       LOGGER.info("服务刚启动,抢占新的id段");
       tryUpdateMySQLRecord(idGeneratePO);
       semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
      

      }
      }

      /**

    • 更新mysql里面的分布式id的配置信息,占用相应的id段
    • 同步执行,很多的网络IO,性能较慢
      *
    • @param idGeneratePO
      */
      private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
      int updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
      if (updateResult > 0) {

       localIdBOHandler(idGeneratePO);
       return;
      

      }
      //重试进行更新
      for (int i = 0; i < 3; i++) {

       idGeneratePO = idGenerateMapper.selectById(idGeneratePO.getId());
       updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
       if (updateResult > 0) {
           localIdBOHandler(idGeneratePO);
           return;
       }
      

      }
      throw new RuntimeException("表id段占用失败,竞争过于激烈,id is " + idGeneratePO.getId());
      }

      /**

    • 专门处理如何将本地ID对象放入到Map中,并且进行初始化的
      *
    • @param idGeneratePO
      */
      private void localIdBOHandler(IdGeneratePO idGeneratePO) {
      long currentStart = idGeneratePO.getCurrentStart();
      long nextThreshold = idGeneratePO.getNextThreshold();
      long currentNum = currentStart;
      if (idGeneratePO.getIsSeq() == SEQ_ID) {
       LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO();
       AtomicLong atomicLong = new AtomicLong(currentNum);
       localSeqIdBO.setId(idGeneratePO.getId());
       localSeqIdBO.setCurrentNum(atomicLong);
       localSeqIdBO.setCurrentStart(currentStart);
       localSeqIdBO.setNextThreshold(nextThreshold);
       localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
      
      } else {
       LocalUnSeqIdBO localUnSeqIdBO = new LocalUnSeqIdBO();
       localUnSeqIdBO.setCurrentStart(currentStart);
       localUnSeqIdBO.setNextThreshold(nextThreshold);
       localUnSeqIdBO.setId(idGeneratePO.getId());
       long begin = localUnSeqIdBO.getCurrentStart();
       long end = localUnSeqIdBO.getNextThreshold();
       List<Long> idList = new ArrayList<>();
       for (long i = begin; i < end; i++) {
           idList.add(i);
       }
       //将本地id段提前打乱,然后放入到队列中
       Collections.shuffle(idList);
       ConcurrentLinkedQueue<Long> idQueue = new ConcurrentLinkedQueue<>();
       idQueue.addAll(idList);
       localUnSeqIdBO.setIdQueue(idQueue);
       localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
      
      }
      }
      }
      ```

      最后创建启动类

    ```package com.laoyang.id;

import jakarta.annotation.Resource;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import com.laoyang.id.service.IdGenerateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

import java.util.HashSet;

/**

  • @Author idea
  • @Date: Created in 19:45 2023/5/25
  • @Description
    */
    @SpringBootApplication
    public class IdGenerateApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateApplication.class);

    @Resource
    private IdGenerateService idGenerateService;

    public static void main(String[] args) {

     SpringApplication springApplication = new SpringApplication(IdGenerateApplication.class);
     springApplication.setWebApplicationType(WebApplicationType.NONE);
     springApplication.run(args);
    

    }

    @Override
    public void run(String... args) throws Exception {

     HashSet<Long> idSet = new HashSet<>();
     for (int i = 0; i < 1500; i++) {
         Long id = idGenerateService.getSeqId(1);
         System.out.println(id);
         idSet.add(id);
     }
     System.out.println(idSet.size());
    

    }
    }
    ```

    最终会在控制台打印输出!

相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。 &nbsp; 相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情:&nbsp;https://www.aliyun.com/product/rds/mysql&nbsp;
相关文章
|
9月前
|
人工智能 Kubernetes 数据可视化
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
本文回顾了一次关键词监测任务在容器集群中失效的全过程,分析了中转IP复用、调度节奏和异常处理等隐性风险,并提出通过解耦架构、动态IP分发和行为模拟优化采集策略,最终实现稳定高效的数据抓取与分析。
161 2
Kubernetes下的分布式采集系统设计与实战:趋势监测失效引发的架构进化
|
9月前
|
数据采集 缓存 NoSQL
分布式新闻数据采集系统的同步效率优化实战
本文介绍了一个针对高频新闻站点的分布式爬虫系统优化方案。通过引入异步任务机制、本地缓存池、Redis pipeline 批量写入及身份池策略,系统采集效率提升近两倍,数据同步延迟显著降低,实现了分钟级热点追踪能力,为实时舆情监控与分析提供了高效、稳定的数据支持。
366 1
分布式新闻数据采集系统的同步效率优化实战
|
10月前
|
缓存 NoSQL 算法
高并发秒杀系统实战(Redis+Lua分布式锁防超卖与库存扣减优化)
秒杀系统面临瞬时高并发、资源竞争和数据一致性挑战。传统方案如数据库锁或应用层锁存在性能瓶颈或分布式问题,而基于Redis的分布式锁与Lua脚本原子操作成为高效解决方案。通过Redis的`SETNX`实现分布式锁,结合Lua脚本完成库存扣减,确保操作原子性并大幅提升性能(QPS从120提升至8,200)。此外,分段库存策略、多级限流及服务降级机制进一步优化系统稳定性。最佳实践包括分层防控、黄金扣减法则与容灾设计,强调根据业务特性灵活组合技术手段以应对高并发场景。
2707 7
|
11月前
|
监控 Java 调度
SpringBoot中@Scheduled和Quartz的区别是什么?分布式定时任务框架选型实战
本文对比分析了SpringBoot中的`@Scheduled`与Quartz定时任务框架。`@Scheduled`轻量易用,适合单机简单场景,但存在多实例重复执行、无持久化等缺陷;Quartz功能强大,支持分布式调度、任务持久化、动态调整和失败重试,适用于复杂企业级需求。文章通过特性对比、代码示例及常见问题解答,帮助开发者理解两者差异,合理选择方案。记住口诀:单机简单用注解,多节点上Quartz;若是任务要可靠,持久化配置不能少。
962 4
|
SpringCloudAlibaba API 开发者
新版-SpringCloud+SpringCloud Alibaba
新版-SpringCloud+SpringCloud Alibaba
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
510 1
|
负载均衡 Dubbo Java
Spring Cloud Alibaba与Spring Cloud区别和联系?
Spring Cloud Alibaba与Spring Cloud区别和联系?
|
人工智能 SpringCloudAlibaba 自然语言处理
SpringCloud Alibaba AI整合DeepSeek落地AI项目实战
在现代软件开发领域,微服务架构因其灵活性、可扩展性和模块化特性而受到广泛欢迎。微服务架构通过将大型应用程序拆分为多个小型、独立的服务,每个服务运行在其独立的进程中,服务与服务间通过轻量级通信机制(通常是HTTP API)进行通信。这种架构模式有助于提升系统的可维护性、可扩展性和开发效率。
4914 2
|
SpringCloudAlibaba 负载均衡 Dubbo
【SpringCloud Alibaba系列】Dubbo高级特性篇
本章我们介绍Dubbo的常用高级特性,包括序列化、地址缓存、超时与重试机制、多版本、负载均衡。集群容错、服务降级等。
1992 7
【SpringCloud Alibaba系列】Dubbo高级特性篇