Springcloud实战之自研分布式id生成器7

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: Springcloud实战之自研分布式id生成器7

生成mapper映射类,注意插入加入了乐观锁,注意这个sql

```package com.laoyang.id.dao.mapper;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.Update;
import com.laoyang.id.dao.po.IdGeneratePO;

import java.util.List;

/**

  • @Author idea
  • @Date: Created in 19:47 2023/5/25
  • @Description
    */
    @Mapper
    public interface IdGenerateMapper extends BaseMapper {

    @Update("update t_id_gengrate_config set next_threshold=next_threshold+step," +

         "current_start=current_start+step,version=version+1 where id =#{id} and version=#{version}")
    

    int updateNewIdCountAndVersion(@Param("id")int id,@Param("version")int version);

    @Select("select * from t_id_gengrate_config")
    List selectAll();
    }
    ```

    在service下创建bo类生成有序id和无序id对象

    ```package com.laoyang.id.service.bo;

import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 20:00 2023/5/25
  • @Description 有序id的BO对象
    */
    public class LocalSeqIdBO {

    private int id;
    /**

    • 在内存中记录的当前有序id的值
      */
      private AtomicLong currentNum;

      /**

    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public AtomicLong getCurrentNum() {
      return currentNum;
      }

      public void setCurrentNum(AtomicLong currentNum) {
      this.currentNum = currentNum;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      package com.laoyang.id.service.bo;

import java.util.concurrent.ConcurrentLinkedQueue;

/**

  • @Author idea
  • @Date: Created in 20:32 2023/5/26
  • @Description 无序id的BO对象
    */
    public class LocalUnSeqIdBO {

    private int id;
    /**

    • 提前将无序的id存放在这条队列中
      /
      private ConcurrentLinkedQueue idQueue;
      /*
    • 当前id段的开始值
      /
      private Long currentStart;
      /*
    • 当前id段的结束值
      */
      private Long nextThreshold;

      public int getId() {
      return id;
      }

      public void setId(int id) {
      this.id = id;
      }

      public ConcurrentLinkedQueue getIdQueue() {
      return idQueue;
      }

      public void setIdQueue(ConcurrentLinkedQueue idQueue) {
      this.idQueue = idQueue;
      }

      public Long getCurrentStart() {
      return currentStart;
      }

      public void setCurrentStart(Long currentStart) {
      this.currentStart = currentStart;
      }

      public Long getNextThreshold() {
      return nextThreshold;
      }

      public void setNextThreshold(Long nextThreshold) {
      this.nextThreshold = nextThreshold;
      }
      }
      ```

      生成service类生成有序id与无序id

      ```package com.laoyang.id.service;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    public interface IdGenerateService {

    /**

    • 获取有序id
      *
    • @param id
    • @return
      */
      Long getSeqId(Integer id);

      /**

    • 获取无序id
      *
    • @param id
    • @return
      */
      Long getUnSeqId(Integer id);
      }
      ```

      实现有序id和无序id方法(这里是关键,主要用到了原子类,一些同步类操作等等,线程池)

      ```package com.laoyang.id.service.impl;

import jakarta.annotation.Resource;
import com.laoyang.id.dao.mapper.IdGenerateMapper;
import com.laoyang.id.dao.po.IdGeneratePO;
import com.laoyang.id.service.IdGenerateService;
import com.laoyang.id.service.bo.LocalSeqIdBO;
import com.laoyang.id.service.bo.LocalUnSeqIdBO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

/**

  • @Author idea
  • @Date: Created in 19:58 2023/5/25
  • @Description
    */
    @Service
    public class IdGenerateServiceImpl implements IdGenerateService, InitializingBean {

    @Resource
    private IdGenerateMapper idGenerateMapper;

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateServiceImpl.class);
    private static Map localSeqIdBOMap = new ConcurrentHashMap<>();
    private static Map localUnSeqIdBOMap = new ConcurrentHashMap<>();
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(8, 16, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000),

         new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
                 Thread thread = new Thread(r);
                 thread.setName("id-generate-thread-" + ThreadLocalRandom.current().nextInt(1000));
                 return thread;
             }
         });
    

    private static final float UPDATE_RATE = 0.50f;
    private static final int SEQ_ID = 1;
    private static Map semaphoreMap = new ConcurrentHashMap<>();

    @Override
    public Long getUnSeqId(Integer id) {

     if (id == null) {
         LOGGER.error("[getSeqId] id is error,id is {}", id);
         return null;
     }
     LocalUnSeqIdBO localUnSeqIdBO = localUnSeqIdBOMap.get(id);
     if (localUnSeqIdBO == null) {
         LOGGER.error("[getUnSeqId] localUnSeqIdBO is null,id is {}", id);
         return null;
     }
     Long returnId = localUnSeqIdBO.getIdQueue().poll();
     if (returnId == null) {
         LOGGER.error("[getUnSeqId] returnId is null,id is {}", id);
         return null;
     }
     this.refreshLocalUnSeqId(localUnSeqIdBO);
     return returnId;
    

    }

    /*

    • @param id 传的是对应的业务id
    • @return
      */
      @Override
      public Long getSeqId(Integer id) {
      if (id == null) {

       LOGGER.error("[getSeqId] id is error,id is {}", id);
       return null;
      

      }
      LocalSeqIdBO localSeqIdBO = localSeqIdBOMap.get(id);
      if (localSeqIdBO == null) {

       LOGGER.error("[getSeqId] localSeqIdBO is null,id is {}", id);
       return null;
      

      }
      this.refreshLocalSeqId(localSeqIdBO);
      long returnId = localSeqIdBO.getCurrentNum().incrementAndGet();
      if (returnId > localSeqIdBO.getNextThreshold()) {

       //同步去刷新 可能是高并发下还未更新本地数据
       LOGGER.error("[getSeqId] id is over limit,id is {}", id);
       return null;
      

      }
      return returnId;
      }

      /**

    • 刷新本地有序id段
      *
    • @param localSeqIdBO
      /
      private void refreshLocalSeqId(LocalSeqIdBO localSeqIdBO) {
      long step = localSeqIdBO.getNextThreshold() - localSeqIdBO.getCurrentStart();
      if (localSeqIdBO.getCurrentNum().get() - localSeqIdBO.getCurrentStart() > step
      UPDATE_RATE) {

       Semaphore semaphore = semaphoreMap.get(localSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           LOGGER.info("开始尝试进行本地id段的同步操作");
           //异步进行同步id段操作
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localSeqIdBO.getId()).release();
                       LOGGER.info("本地有序id段同步完成,id is {}", localSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      /**

    • 刷新本地无序id段
      *
    • @param localUnSeqIdBO
      /
      private void refreshLocalUnSeqId(LocalUnSeqIdBO localUnSeqIdBO) {
      long begin = localUnSeqIdBO.getCurrentStart();
      long end = localUnSeqIdBO.getNextThreshold();
      long remainSize = localUnSeqIdBO.getIdQueue().size();
      //如果使用剩余空间不足25%,则进行刷新
      if ((end - begin)
      0.35 > remainSize) {

       LOGGER.info("本地无序id段同步开始,id is {}", localUnSeqIdBO.getId());
       Semaphore semaphore = semaphoreMap.get(localUnSeqIdBO.getId());
       if (semaphore == null) {
           LOGGER.error("semaphore is null,id is {}", localUnSeqIdBO.getId());
           return;
       }
       boolean acquireStatus = semaphore.tryAcquire();
       if (acquireStatus) {
           threadPoolExecutor.execute(new Runnable() {
               @Override
               public void run() {
                   try {
                       IdGeneratePO idGeneratePO = idGenerateMapper.selectById(localUnSeqIdBO.getId());
                       tryUpdateMySQLRecord(idGeneratePO);
                   } catch (Exception e) {
                       LOGGER.error("[refreshLocalUnSeqId] error is ", e);
                   } finally {
                       semaphoreMap.get(localUnSeqIdBO.getId()).release();
                       LOGGER.info("本地无序id段同步完成,id is {}", localUnSeqIdBO.getId());
                   }
               }
           });
       }
      

      }
      }

      //bean初始化的时候会回调到这里
      @Override
      public void afterPropertiesSet() throws Exception {
      List idGeneratePOList = idGenerateMapper.selectAll();
      for (IdGeneratePO idGeneratePO : idGeneratePOList) {

       LOGGER.info("服务刚启动,抢占新的id段");
       tryUpdateMySQLRecord(idGeneratePO);
       semaphoreMap.put(idGeneratePO.getId(), new Semaphore(1));
      

      }
      }

      /**

    • 更新mysql里面的分布式id的配置信息,占用相应的id段
    • 同步执行,很多的网络IO,性能较慢
      *
    • @param idGeneratePO
      */
      private void tryUpdateMySQLRecord(IdGeneratePO idGeneratePO) {
      int updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
      if (updateResult > 0) {

       localIdBOHandler(idGeneratePO);
       return;
      

      }
      //重试进行更新
      for (int i = 0; i < 3; i++) {

       idGeneratePO = idGenerateMapper.selectById(idGeneratePO.getId());
       updateResult = idGenerateMapper.updateNewIdCountAndVersion(idGeneratePO.getId(), idGeneratePO.getVersion());
       if (updateResult > 0) {
           localIdBOHandler(idGeneratePO);
           return;
       }
      

      }
      throw new RuntimeException("表id段占用失败,竞争过于激烈,id is " + idGeneratePO.getId());
      }

      /**

    • 专门处理如何将本地ID对象放入到Map中,并且进行初始化的
      *
    • @param idGeneratePO
      */
      private void localIdBOHandler(IdGeneratePO idGeneratePO) {
      long currentStart = idGeneratePO.getCurrentStart();
      long nextThreshold = idGeneratePO.getNextThreshold();
      long currentNum = currentStart;
      if (idGeneratePO.getIsSeq() == SEQ_ID) {
       LocalSeqIdBO localSeqIdBO = new LocalSeqIdBO();
       AtomicLong atomicLong = new AtomicLong(currentNum);
       localSeqIdBO.setId(idGeneratePO.getId());
       localSeqIdBO.setCurrentNum(atomicLong);
       localSeqIdBO.setCurrentStart(currentStart);
       localSeqIdBO.setNextThreshold(nextThreshold);
       localSeqIdBOMap.put(localSeqIdBO.getId(), localSeqIdBO);
      
      } else {
       LocalUnSeqIdBO localUnSeqIdBO = new LocalUnSeqIdBO();
       localUnSeqIdBO.setCurrentStart(currentStart);
       localUnSeqIdBO.setNextThreshold(nextThreshold);
       localUnSeqIdBO.setId(idGeneratePO.getId());
       long begin = localUnSeqIdBO.getCurrentStart();
       long end = localUnSeqIdBO.getNextThreshold();
       List<Long> idList = new ArrayList<>();
       for (long i = begin; i < end; i++) {
           idList.add(i);
       }
       //将本地id段提前打乱,然后放入到队列中
       Collections.shuffle(idList);
       ConcurrentLinkedQueue<Long> idQueue = new ConcurrentLinkedQueue<>();
       idQueue.addAll(idList);
       localUnSeqIdBO.setIdQueue(idQueue);
       localUnSeqIdBOMap.put(localUnSeqIdBO.getId(), localUnSeqIdBO);
      
      }
      }
      }
      ```

      最后创建启动类

    ```package com.laoyang.id;

import jakarta.annotation.Resource;
import org.apache.dubbo.config.spring.context.annotation.EnableDubbo;
import com.laoyang.id.service.IdGenerateService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

import java.util.HashSet;

/**

  • @Author idea
  • @Date: Created in 19:45 2023/5/25
  • @Description
    */
    @SpringBootApplication
    public class IdGenerateApplication implements CommandLineRunner {

    private static final Logger LOGGER = LoggerFactory.getLogger(IdGenerateApplication.class);

    @Resource
    private IdGenerateService idGenerateService;

    public static void main(String[] args) {

     SpringApplication springApplication = new SpringApplication(IdGenerateApplication.class);
     springApplication.setWebApplicationType(WebApplicationType.NONE);
     springApplication.run(args);
    

    }

    @Override
    public void run(String... args) throws Exception {

     HashSet<Long> idSet = new HashSet<>();
     for (int i = 0; i < 1500; i++) {
         Long id = idGenerateService.getSeqId(1);
         System.out.println(id);
         idSet.add(id);
     }
     System.out.println(idSet.size());
    

    }
    }
    ```

    最终会在控制台打印输出!

相关实践学习
如何快速连接云数据库RDS MySQL
本场景介绍如何通过阿里云数据管理服务DMS快速连接云数据库RDS MySQL,然后进行数据表的CRUD操作。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助 &nbsp; &nbsp; 相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
18天前
|
数据管理 API 调度
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
HarmonyOS Next 是华为新一代操作系统,专注于分布式技术的深度应用与生态融合。本文通过技术特点、应用场景及实战案例,全面解析其核心技术架构与开发流程。重点介绍分布式软总线2.0、数据管理、任务调度等升级特性,并提供基于 ArkTS 的原生开发支持。通过开发跨设备协同音乐播放应用,展示分布式能力的实际应用,涵盖项目配置、主界面设计、分布式服务实现及部署调试步骤。此外,深入分析分布式数据同步原理、任务调度优化及常见问题解决方案,帮助开发者掌握 HarmonyOS Next 的核心技术和实战技巧。
163 76
鸿蒙HarmonyOS应用开发 | 探索 HarmonyOS Next-从开发到实战掌握 HarmonyOS Next 的分布式能力
|
11天前
|
存储 JavaScript 开发工具
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
本次的.HarmonyOS Next ,ArkTS语言,HarmonyOS的元服务和DevEco Studio 开发工具,为开发者提供了构建现代化、轻量化、高性能应用的便捷方式。这些技术和工具将帮助开发者更好地适应未来的智能设备和服务提供方式。
36 8
基于HarmonyOS 5.0(NEXT)与SpringCloud架构的跨平台应用开发与服务集成研究【实战】
|
19天前
|
物联网 调度 vr&ar
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
鸿蒙技术分享:HarmonyOS Next 深度解析 随着万物互联时代的到来,华为发布的 HarmonyOS Next 在技术架构和生态体验上实现了重大升级。本文从技术架构、生态优势和开发实践三方面深入探讨其特点,并通过跨设备笔记应用实战案例,展示其强大的分布式能力和多设备协作功能。核心亮点包括新一代微内核架构、统一开发语言 ArkTS 和多模态交互支持。开发者可借助 DevEco Studio 4.0 快速上手,体验高效、灵活的开发过程。 239个字符
197 13
鸿蒙HarmonyOS应用开发 |鸿蒙技术分享HarmonyOS Next 深度解析:分布式能力与跨设备协作实战
|
16天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
7天前
|
Java 关系型数据库 数据库
微服务SpringCloud分布式事务之Seata
SpringCloud+SpringCloudAlibaba的Seata实现分布式事务,步骤超详细,附带视频教程
25 1
|
26天前
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
53 10
|
2月前
|
JSON Java 测试技术
SpringCloud2023实战之接口服务测试工具SpringBootTest
SpringBootTest同时集成了JUnit Jupiter、AssertJ、Hamcrest测试辅助库,使得更容易编写但愿测试代码。
72 3
|
3月前
|
人工智能 文字识别 Java
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
尼恩,一位拥有20年架构经验的老架构师,通过其深厚的架构功力,成功指导了一位9年经验的网易工程师转型为大模型架构师,薪资逆涨50%,年薪近80W。尼恩的指导不仅帮助这位工程师在一年内成为大模型架构师,还让他管理起了10人团队,产品成功应用于多家大中型企业。尼恩因此决定编写《LLM大模型学习圣经》系列,帮助更多人掌握大模型架构,实现职业跃迁。该系列包括《从0到1吃透Transformer技术底座》、《从0到1精通RAG架构》等,旨在系统化、体系化地讲解大模型技术,助力读者实现“offer直提”。此外,尼恩还分享了多个技术圣经,如《NIO圣经》、《Docker圣经》等,帮助读者深入理解核心技术。
SpringCloud+Python 混合微服务,如何打造AI分布式业务应用的技术底层?
|
3月前
|
NoSQL Java Redis
开发实战:使用Redisson实现分布式延时消息,订单30分钟关闭的另外一种实现!
本文详细介绍了 Redisson 延迟队列(DelayedQueue)的实现原理,包括基本使用、内部数据结构、基本流程、发送和获取延时消息以及初始化延时队列等内容。文章通过代码示例和流程图,逐步解析了延迟消息的发送、接收及处理机制,帮助读者深入了解 Redisson 延迟队列的工作原理。
|
3月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
下一篇
开通oss服务