Springcloud实战之自研分布式id生成器7

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Springcloud实战之自研分布式id生成器7

生成mapper映射类,注意插入加入了乐观锁,注意这个sql

```package com.laoyang.id.dao.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import com.laoyang.id.dao.po.IdGeneratePO;

import java.util.List;

/**

  • @Author idea
  • @Date: Created in 19:47 2023/5/25
  • @Description
    */
    @Mapper
    public interface IdGenerateMapper extends BaseMapper {

    @Update("update t_id_gengrate_config set next_threshold=next_threshold+step," +

         "current_start=current_start+step,version=version+1 where id =#{id} and version=#{version}")
    

    int updateNewIdCountAndVersion(@Param("id")int id,@Param("version")int version);

    @Select("select * from t_id_gengrate_config")
    List selectAll();
    }
    ```

    在service下创建bo类生成有序id和无序id对象

    ```package com.laoyang.id.service.bo;

import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 20:00 2023/5/25
  • @Description 有序id的BO对象
    */
    public class LocalSeqIdBO {

    private int id;
    /**

    • 在内存中记录的当前有序id的值
      */
      private AtomicLong currentNum;

      /**

    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public AtomicLong getCurrentNum() {
      return currentNum;
      }

      public void setCurrentNum(AtomicLong currentNum) {
      this.currentNum = currentNum;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      package com.laoyang.id.service.bo;

import java.util.concurrent.ConcurrentLinkedQueue;

/**

  • @Author idea
  • @Date: Created in 20:32 2023/5/26
  • @Description 无序id的BO对象
    */
    public class LocalUnSeqIdBO {

    private int id;
    /**

    • 提前将无序的id存放在这条队列中
      /
      private ConcurrentLinkedQueue idQueue;
      /*
    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public ConcurrentLinkedQueue getIdQueue() {
      return idQueue;
      }

      public void setIdQueue(ConcurrentLinkedQueue idQueue) {
      this.idQueue = idQueue;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      ```

      生成service类生成有序id与无序id

      ```package com.laoyang.id.service;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    public interface IdGenerateService {

    /**

    • 获取有序id
      *
    • @param id
    • @return
      */
      Long getSeqId(Integer id);

      /**

    • 获取无序id
      *
    • @param id
    • @return
      */
      Long getUnSeqId(Integer id);
      }
      ```

      实现有序id和无序id方法(这里是关键,主要用到了原子类,一些同步类操作等等,线程池)

      ```package com.laoyang.id.service.impl;

import jakarta.annotation.Resource;
import com.laoyang.id.dao.mapper.IdGenerateMapper;
import com.laoyang.id.dao.po.IdGeneratePO;
import com.laoyang.id.service.IdGenerateService;
import com.laoyang.id.service.bo.LocalSeqIdBO;
import com.laoyang.id.service.bo.LocalUnSeqIdBO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    @Service
    public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    @Resource
    private IdGenerateMapper idGenerateMapper;

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateServiceImpl.class);
    private static Map localSeqIdBOMap = new ConcurrentHashMap<>();
    private static Map localUnSeqIdBOMap = new ConcurrentHashMap<>();
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),

         new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 Thread thread = new Thread(r);
                 thread.setName("id-generate-thread-" + ThreadLocalRandom.current().nextInt(1000));
                 return thread;
             }
         });
    

    private static final float UPDATE_RATE = 0.50f;
    private static final int SEQ_ID = 1;
    private static Map semaphoreMap = new ConcurrentHashMap<>();

    @Override
    public Long getUnSeqId(Integer id) {

     if (id == null) {
         LOGGER.error("[getSeqId] id is error,id is {}", id);
         return null;
     }
     LocalUnSeqIdBO localUnSeqIdBO = localUnSeqIdBOMap.get(id);
     if (localUnSeqIdBO == null) {
         LOGGER.error("[getUnSeqId] localUnSeqIdBO is null,id is {}", id);
         return null;
     }
     Long returnId = localUnSeqIdBO.getIdQueue().poll();
     if (returnId == null) {
         LOGGER.error("[getUnSeqId] returnId is null,id is {}", id);
         return null;
     }
     this.refreshLocalUnSeqId(localUnSeqIdBO);
     return returnId;
    

    }

    /*

    • @param id 传的是对应的业务id
    • @return
      */
      @Override
      public Long getSeqId(Integer id) {
      if (id == null) {

       LOGGER.error("[getSeqId] id is error,id is {}", id);
       return null;
      

      }
      LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);
      if (localSeqIdBO == null) {

       LOGGER.error("[getSeqId] localSeqIdBO is null,id is {}", id);
       return null;
      

      }
      this.refreshLocalSeqId(localSeqIdBO);
      long returnId = localSeqIdBO.getCurrentNum().incrementAndGet();
      if (returnId > localSeqIdBO.getNextThreshold()) {

       //同步去刷新 可能是高并发下还未更新本地数据
       LOGGER.error("[getSeqId] id is over limit,id is {}", id);
       return null;
      

      }
      return returnId;
      }

      /**

    • 刷新本地有序id段
      *
    • @param localSeqIdBO
      /
      private void refreshLocalSeqId(LocalSeqIdBO localSeqIdBO) {
      long step = localSeqIdBO.getNextThreshold() - localSeqIdBO.getCurrentStart();
      if (localSeqIdBO.getCurrentNum().get() - localSeqIdBO.getCurrentStart() > step
      UPDATE_RATE) {

       Semaphore semaphore = semaphoreMap.get(localSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           LOGGER.info("开始尝试进行本地id段的同步操作");
           //异步进行同步id段操作
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localSeqIdBO.getId()).release();
                       LOGGER.info("本地有序id段同步完成,id is {}", localSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      /**

    • 刷新本地无序id段
      *
    • @param localUnSeqIdBO
      /
      private void refreshLocalUnSeqId(LocalUnSeqIdBO localUnSeqIdBO) {
      long begin = localUnSeqIdBO.getCurrentStart();
      long end = localUnSeqIdBO.getNextThreshold();
      long remainSize = localUnSeqIdBO.getIdQueue().size();
      //如果使用剩余空间不足25%,则进行刷新
      if ((end - begin)
      0.35 > remainSize) {

       LOGGER.info("本地无序id段同步开始,id is {}", localUnSeqIdBO.getId());
       Semaphore semaphore = semaphoreMap.get(localUnSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localUnSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalUnSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localUnSeqIdBO.getId()).release();
                       LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      //bean初始化的时候会回调到这里
      @Override
      public void afterPropertiesSet() throws Exception {
      List idGeneratePOList = idGenerateMapper.selectAll();
      for (IdGeneratePO idGeneratePO : idGeneratePOList) {

       LOGGER.info("服务刚启动,抢占新的id段");
       tryUpdateMySQLRecord(idGeneratePO);
       semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
      

      }
      }

      /**

    • 更新mysql里面的分布式id的配置信息,占用相应的id段
    • 同步执行,很多的网络IO,性能较慢
      *
    • @param idGeneratePO
      */
      private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
      int updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
      if (updateResult > 0) {

       localIdBOHandler(idGeneratePO);
       return;
      

      }
      //重试进行更新
      for (int i = 0; i < 3; i++) {

       idGeneratePO = idGenerateMapper.selectById(idGeneratePO.getId());
       updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
       if (updateResult > 0) {
           localIdBOHandler(idGeneratePO);
           return;
       }
      

      }
      throw new RuntimeException("表id段占用失败,竞争过于激烈,id is " + idGeneratePO.getId());
      }

      /**

    • 专门处理如何将本地ID对象放入到Map中,并且进行初始化的
      *
    • @param idGeneratePO
      */
      private void localIdBOHandler(IdGeneratePO idGeneratePO) {
      long currentStart = idGeneratePO.getCurrentStart();
      long nextThreshold = idGeneratePO.getNextThreshold();
      long currentNum = currentStart;
      if (idGeneratePO.getIsSeq() == SEQ_ID) {
       LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO();
       AtomicLong atomicLong = new AtomicLong(currentNum);
       localSeqIdBO.setId(idGeneratePO.getId());
       localSeqIdBO.setCurrentNum(atomicLong);
       localSeqIdBO.setCurrentStart(currentStart);
       localSeqIdBO.setNextThreshold(nextThreshold);
       localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
      
      } else {
       LocalUnSeqIdBO localUnSeqIdBO = new LocalUnSeqIdBO();
       localUnSeqIdBO.setCurrentStart(currentStart);
       localUnSeqIdBO.setNextThreshold(nextThreshold);
       localUnSeqIdBO.setId(idGeneratePO.getId());
       long begin = localUnSeqIdBO.getCurrentStart();
       long end = localUnSeqIdBO.getNextThreshold();
       List<Long> idList = new ArrayList<>();
       for (long i = begin; i < end; i++) {
           idList.add(i);
       }
       //将本地id段提前打乱,然后放入到队列中
       Collections.shuffle(idList);
       ConcurrentLinkedQueue<Long> idQueue = new ConcurrentLinkedQueue<>();
       idQueue.addAll(idList);
       localUnSeqIdBO.setIdQueue(idQueue);
       localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
      
      }
      }
      }
      ```

      最后创建启动类

    ```package com.laoyang.id;

import jakarta.annotation.Resource;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import com.laoyang.id.service.IdGenerateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

import java.util.HashSet;

/**

  • @Author idea
  • @Date: Created in 19:45 2023/5/25
  • @Description
    */
    @SpringBootApplication
    public class IdGenerateApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateApplication.class);

    @Resource
    private IdGenerateService idGenerateService;

    public static void main(String[] args) {

     SpringApplication springApplication = new SpringApplication(IdGenerateApplication.class);
     springApplication.setWebApplicationType(WebApplicationType.NONE);
     springApplication.run(args);
    

    }

    @Override
    public void run(String... args) throws Exception {

     HashSet<Long> idSet = new HashSet<>();
     for (int i = 0; i < 1500; i++) {
         Long id = idGenerateService.getSeqId(1);
         System.out.println(id);
         idSet.add(id);
     }
     System.out.println(idSet.size());
    

    }
    }
    ```

    最终会在控制台打印输出!

相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
3月前
|
资源调度 Java 调度
Spring Cloud Alibaba 集成分布式定时任务调度功能
定时任务在企业应用中至关重要,常用于异步数据处理、自动化运维等场景。在单体应用中,利用Java的`java.util.Timer`或Spring的`@Scheduled`即可轻松实现。然而,进入微服务架构后,任务可能因多节点并发执行而重复。Spring Cloud Alibaba为此发布了Scheduling模块,提供轻量级、高可用的分布式定时任务解决方案,支持防重复执行、分片运行等功能,并可通过`spring-cloud-starter-alibaba-schedulerx`快速集成。用户可选择基于阿里云SchedulerX托管服务或采用本地开源方案(如ShedLock)
122 1
|
6天前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
34 3
|
30天前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
1月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
2月前
|
存储 NoSQL Redis
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
Redis持久化、RDB和AOF方案、Redis主从集群、哨兵、分片集群、散列插槽、自动手动故障转移
SpringCloud基础7——Redis分布式缓存,RDB,AOF持久化+主从+哨兵+分片集群
|
2月前
|
SQL NoSQL 数据库
SpringCloud基础6——分布式事务,Seata
分布式事务、ACID原则、CAP定理、Seata、Seata的四种分布式方案:XA、AT、TCC、SAGA模式
SpringCloud基础6——分布式事务,Seata
|
2月前
|
消息中间件 Java 对象存储
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
数据一致性挑战:Spring Cloud与Netflix OSS下的分布式事务管理
50 2
|
3月前
|
消息中间件 SQL 关系型数据库
go-zero微服务实战系列(十、分布式事务如何实现)
go-zero微服务实战系列(十、分布式事务如何实现)
|
3月前
|
Dubbo Java 调度
揭秘!Spring Cloud Alibaba的超级力量——如何轻松驾驭分布式定时任务调度?
【8月更文挑战第20天】在现代微服务架构中,Spring Cloud Alibaba通过集成分布式定时任务调度功能解决了一致性和可靠性挑战。它利用TimerX实现任务的分布式编排与调度,并通过`@SchedulerLock`确保任务不被重复执行。示例代码展示了如何配置定时任务及其分布式锁,以实现每5秒仅由一个节点执行任务,适合构建高可用的微服务系统。
68 0