利用Redis实现集群或开发环境下SnowFlake自动配置机器号

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
简介:

利用Redis实现集群或开发环境下SnowFlake自动配置机器号
前言:
SnowFlake 雪花ID 算法是推特公司推出的著名分布式ID生成算法。利用预先分配好的机器ID,工作区ID,机器时间可以生成全局唯一的随时间趋势递增的Long类型ID.长度在17-19位。随着时间的增长而递增,在MySQL数据库中,InnoDB存储引擎可以更快的插入递增的主键。而不像UUID那样因为写入是乱序的,InnoDB不得不频繁的做页分裂操作,耗时且容易产生碎片。

对于SnowFlake 的原理介绍,可以参考该文章:理解分布式id生成算法SnowFlake

理解了雪花的基本原理之后,我们试想:在分布式集群或者开发环境下,不同服务之间/相同服务的不同机器之间应该如何产生差异呢?有以下几种方案:

通过在 yml 文件中配置不同的参数,启动 spring 容器时通过读取该参数来实现不同服务与不同机器的workerId不同。但是这里不方便新增机器/新同事的自动化配置
向第三方应用如zookeeper、Redis中注册ID,以获得唯一的ID。
对于开发环境,可以取机器的IP后三位。因为大家在一个办公室的话IP后三位肯定是0-255之前不重复。但是这样机器ID需要8个Bit,留给数据中心的位数就只有4个了。
本方案结合了以上方案的优点,按照业务的实际情况对雪花中的数据中心和机器ID所占的位数进行调整:数据中心占4Bit,范围从0-15。机器ID占6Bit,范围从0-63
。对不同的服务在yml中配置服务名称,以服务编号作为数据中心ID。如果按照开发+测试+生产环境区分的话,可以部署5个不同的服务。application.yml 中配置如下的参数

分布式雪花ID不同机器ID自动化配置

snowFlake:
dataCenter: 1 # 数据中心的id
appName: test # 业务类型名称
而机器ID采用以下的策略实现:

获取当前机器的IP地址 localIp,模32,获得0-31的整数 machineId
向Redis中注册,使用 appName + dataCenter + machineId 作为key ,以本机IP localIp 作为 value。
注册成功后,设置键过期时间 24 h,并开启一个计时器,在 23h 后更新注册的 key
如果注册失败,可能有以下两个原因:
上次服务异常中断,没有来得及删除key。这里的解决方案是通过key获取value,如果value和localIp一致,则仍然视为注册成功
IP和别人的IP模32的结果一样,导致机器ID冲突。这是就遍历 0-31 获取其中为注册的数字作为本机的机器号
如果不幸Redis连接失败,系统将从32-63之间随机获取ID,并使用 log.error() 打印醒目的提示消息这里建议IDEA + Grep Console 实现不同级别的日志不同前景色显示,方便及时获取错误信息
当服务停止前,向Redis发送请求,删除该Key的占用。
具体的代码如下:
自动配置机器ID,并在容器启动时放入SnowFlake实例对象
package cn.keats.util;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;

@Configuration
@Slf4j
public class MachineIdConfig {

@Resource
private JedisPool jedisPool;

@Value("${snowFlake.dataCenter}")
private Integer dataCenterId;

@Value("${snowFlake.appName}")
private String APP_NAME;

/**
 * 机器id
 */
public static Integer machineId;
/**
 * 本地ip地址
 */
private static String localIp;

/**
 * 获取ip地址
 *
 * @return
 * @throws UnknownHostException
 */
private String getIPAddress() throws UnknownHostException {
    InetAddress address = InetAddress.getLocalHost();
    return address.getHostAddress();
}

/**
 * hash机器IP初始化一个机器ID
 */
@Bean
public SnowFlake initMachineId() throws Exception {
    localIp = getIPAddress(); // 192.168.0.233

    Long ip_ = Long.parseLong(localIp.replaceAll("\\.", ""));// 1921680233
    //
    machineId = ip_.hashCode() % 32;// 0-31
    // 创建一个机器ID
    createMachineId();

    log.info("初始化 machine_id :{}", machineId);
    return new SnowFlake(machineId, dataCenterId);
}

/**
 * 容器销毁前清除注册记录
 */
@PreDestroy
public void destroyMachineId() {
    try (Jedis jedis = jedisPool.getResource()) {
        jedis.del(APP_NAME + dataCenterId + machineId);
    }
}
/**
 * 主方法:首先获取机器 IP 并 % 32 得到 0-31
 * 使用 业务名 + 组名 + IP 作为 Redis 的 key,机器IP作为 value,存储到Redis中
 *
 * @return
 */
public Integer createMachineId() {
    try {
        // 向redis注册,并设置超时时间
        log.info("注册一个机器ID到Redis " + machineId + " IP:" + localIp);
        Boolean flag = registerMachine(machineId, localIp);
        // 注册成功
        if (flag) {
            // 启动一个线程更新超时时间
            updateExpTimeThread();
            // 返回机器Id
            log.info("Redis中端口没有冲突 " + machineId + " IP:" + localIp);
            return machineId;
        }
        // 注册失败,可能原因 Hash%32 的结果冲突
        if (!checkIfCanRegister()) {
            // 如果 0-31 已经用完,使用 32-64之间随机的ID
            getRandomMachineId();
            createMachineId();
        } else {
            // 如果存在剩余的ID
            log.warn("Redis中端口冲突了,使用 0-31 之间未占用的Id " + machineId + " IP:" + localIp);
            createMachineId();
        }
    } catch (Exception e) {
        // 获取 32 - 63 之间的随机Id
        // 返回机器Id
        log.error("Redis连接异常,不能正确注册雪花机器号 " + machineId + " IP:" + localIp, e);
        log.warn("使用临时方案,获取 32 - 63 之间的随机数作为机器号,请及时检查Redis连接");
        getRandomMachineId();
        return machineId;
    }
    return machineId;
}

/**
 * 检查是否被注册满了
 *
 * @return
 */
private Boolean checkIfCanRegister() {
    // 判断0~31这个区间段的机器IP是否被占满
    try (Jedis jedis = jedisPool.getResource()) {
        Boolean flag = true;
        for (int i = 0; i < 32; i++) {
            flag = jedis.exists(APP_NAME + dataCenterId + i);
            // 如果不存在。设置机器Id为这个不存在的数字
            if (!flag) {
                machineId = i;
                break;
            }
        }
        return !flag;
    }
}

/**
 * 1.更新超時時間
 * 注意,更新前检查是否存在机器ip占用情况
 */
private void updateExpTimeThread() {
    // 开启一个线程执行定时任务:
    // 每23小时更新一次超时时间
    new Timer(localIp).schedule(new TimerTask() {
        @Override
        public void run() {
            // 检查缓存中的ip与本机ip是否一致, 一致则更新时间,不一致则重新获取一个机器id
            Boolean b = checkIsLocalIp(String.valueOf(machineId));
            if (b) {
                log.info("IP一致,更新超时时间 ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                try (Jedis jedis = jedisPool.getResource()) {
                    jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24 );
                }
            } else {
                // IP冲突
                log.info("重新生成机器ID ip:{},machineId:{}, time:{}", localIp, machineId, new Date());
                // 重新生成机器ID,并且更改雪花中的机器ID
                getRandomMachineId();
                // 重新生成并注册机器id
                createMachineId();
                // 更改雪花中的机器ID
                SnowFlake.setWorkerId(machineId);
                // 结束当前任务
                log.info("Timer->thread->name:{}", Thread.currentThread().getName());
                this.cancel();
            }
        }
    }, 10 * 1000, 1000 * 60 * 60 * 23);
}

/**
 * 获取32-63随机数
 */
public void getRandomMachineId() {
    machineId = (int) (Math.random() * 31) + 31;
}
/**
 * 检查Redis中对应Key的Value是否是本机IP
 *
 * @param mechineId
 * @return
 */
private Boolean checkIsLocalIp(String mechineId) {
    try (Jedis jedis = jedisPool.getResource()) {
        String ip = jedis.get(APP_NAME + dataCenterId + mechineId);
        log.info("checkIsLocalIp->ip:{}", ip);
        return localIp.equals(ip);
    }
}

/**
 * 1.注册机器
 * 2.设置超时时间
 *
 * @param machineId 取值为0~31
 * @return
 */
private Boolean registerMachine(Integer machineId, String localIp) throws Exception {
    // try with resources 写法,出异常会释放括号内的资源 Java7特性
    try (Jedis jedis = jedisPool.getResource()) {
        // key 业务号 + 数据中心ID + 机器ID value 机器IP
        Long result = jedis.setnx(APP_NAME + dataCenterId + machineId, localIp);
        if(result == 1){
            // 过期时间 1 天
            jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
            return true;
        } else {
            // 如果Key存在,判断Value和当前IP是否一致,一致则返回True
            String value = jedis.get(APP_NAME + dataCenterId + machineId);
            if(localIp.equals(value)){
                // IP一致,注册机器ID成功
                jedis.expire(APP_NAME + dataCenterId + machineId, 60 * 60 * 24);
                return true;
            }
            return false;
        }
    }
}

}
雪花ID:
import org.springframework.context.annotation.Configuration;

/**

  • 功能:分布式ID生成工具类
    *

*/
@Configuration
public class SnowFlake {

/**
 * 开始时间截 (2019-09-08) 服务一旦运行过之后不能修改。会导致ID生成重复
 */
private final long twepoch = 1567872000000L;

/**
 * 机器Id所占的位数 0 - 64
 */
private final long workerIdBits = 6L;

/**
 * 工作组Id所占的位数 0 - 16
 */
private final long dataCenterIdBits = 4L;

/**
 * 支持的最大机器id,结果是63 (这个移位算法可以很快的计算出几位二进制数所能表示的最大十进制数)
 */
private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

/**
 * 支持的最大数据标识id,结果是15
 */
private final long maxDatacenterId = -1L ^ (-1L << dataCenterIdBits);

/**
 * 序列在id中占的位数
 */
private final long sequenceBits = 12L;

/**
 * 机器ID向左移12位
 */
private final long workerIdShift = sequenceBits;

/**
 * 数据标识id向左移17位(12+5)
 */
private final long datacenterIdShift = sequenceBits + workerIdBits;

/**
 * 时间截向左移22位(5+5+12)
 */
private final long timestampLeftShift = sequenceBits + workerIdBits + dataCenterIdBits;

/**
 * 生成序列的掩码,这里为4095 (0b111111111111=0xfff=4095)
 */
private final long sequenceMask = -1L ^ (-1L << sequenceBits);

/**
 * 工作机器ID(0~63)
 */
private static long workerId;

/**
 * 数据中心ID(0~16)
 */
private long datacenterId;

/**
 * 毫秒内序列(0~4095)
 */
private long sequence = 0L;

/**
 * 上次生成ID的时间截
 */
private long lastTimestamp = -1L;

//==============================Constructors=====================================

/**
 * 构造函数
 *
 * @param workerId     工作ID (0~63)
 * @param datacenterId 数据中心ID (0~15)
 */
public SnowFlake(long workerId, long datacenterId) {
    if (workerId > maxWorkerId || workerId < 0) {
        throw new IllegalArgumentException(String.format("机器ID必须小于 %d 且大于 0", maxWorkerId));
    }
    if (datacenterId > maxDatacenterId || datacenterId < 0) {
        throw new IllegalArgumentException(String.format("工作组ID必须小于 %d 且大于 0", maxDatacenterId));
    }
    this.workerId = workerId;
    this.datacenterId = datacenterId;
}

/**
 * 构造函数
 *
 */
public SnowFlake() {
    this.workerId = 0;
    this.datacenterId = 0;
}

/**
 * 获得下一个ID (该方法是线程安全的)
 *
 * @return SnowFlakeId
 */
public synchronized long nextId() {
    long timestamp = timeGen();

    //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
    if (timestamp < lastTimestamp) {
        throw new RuntimeException(
                String.format("Clock moved backwards.  Refusing to generate id for %d milliseconds", lastTimestamp - timestamp));
    }

    // 如果是同一时间生成的,则进行毫秒内序列
    if (lastTimestamp == timestamp) {
        sequence = (sequence + 1) & sequenceMask;
        // 毫秒内序列溢出
        if (sequence == 0) {
            // 阻塞到下一个毫秒,获得新的时间戳
            timestamp = tilNextMillis(lastTimestamp);
        }
    }
    //时间戳改变,毫秒内序列重置
    else {
        sequence = 0L;
    }

    // 上次生成ID的时间截
    lastTimestamp = timestamp;

    // 移位并通过或运算拼到一起组成64位的ID
    return ((timestamp - twepoch) << timestampLeftShift) //
            | (datacenterId << datacenterIdShift) //
            | (workerId << workerIdShift) //
            | sequence;
}

/**
 * 阻塞到下一个毫秒,直到获得新的时间戳
 *
 * @param lastTimestamp 上次生成ID的时间截
 * @return 当前时间戳
 */
protected long tilNextMillis(long lastTimestamp) {
    long timestamp = timeGen();
    while (timestamp <= lastTimestamp) {
        timestamp = timeGen();
    }
    return timestamp;
}

/**
 * 返回以毫秒为单位的当前时间
 *
 * @return 当前时间(毫秒)
 */
protected long timeGen() {
    return System.currentTimeMillis();
}

public long getWorkerId() {
    return workerId;
}

public static void setWorkerId(long workerId) {
    SnowFlake.workerId = workerId;
}

public long getDatacenterId() {
    return datacenterId;
}

public void setDatacenterId(long datacenterId) {
    this.datacenterId = datacenterId;
}

}
Redis 配置
public class RedisConfig {

@Value("${spring.redis.host}")
private String host;

@Value("${spring.redis.port:6379}")
private Integer port;

@Value("${spring.redis.password:-1}")
private String password;

@Bean
public JedisPool jedisPool() {
    // 1.设置连接池的配置对象
    JedisPoolConfig config = new JedisPoolConfig();
    // 设置池中最大连接数
    config.setMaxTotal(50);
    // 设置空闲时池中保有的最大连接数
    config.setMaxIdle(10);
    config.setMaxWaitMillis(3000L);
    config.setTestOnBorrow(true);
    log.info(password);
    // 2.设置连接池对象
    if("-1".equals(password)){
        log.info("Redis不通过密码连接");
        return new JedisPool(config, host, port,0);
    } else {
        log.info("Redis通过密码连接" + password);
        return new JedisPool(config, host, port,0, password);
    }
}

}
使用方法
项目中引入 Redis 、 Jedis 依赖
复制上面两个类到项目until包下
application.yml 配置服务名称,机器序号,Redis账号,密码
配置Jedis,使得项目启动时池中有Redis连接对象
启动项目
在需要生成ID的类中注入

@Autowired
private SnowFlake snowFlake;
// 生产ID
snowFlake.nextId(); 方法生产ID

原文地址https://www.cnblogs.com/keatsCoder/p/12129279.html

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
29天前
|
存储 NoSQL Redis
redis主从集群与分片集群的区别
主从集群通过主节点处理写操作并向从节点广播读操作,从节点处理读操作并复制主节点数据,优点在于提高读取性能、数据冗余及故障转移。分片集群则将数据分散存储于多节点,根据规则路由请求,优势在于横向扩展能力强,提升读写性能与存储容量,增强系统可用性和容错性。主从适用于简单场景,分片适合大规模高性能需求。
41 5
|
5月前
|
监控 NoSQL Redis
看完这篇就能弄懂Redis的集群的原理了
看完这篇就能弄懂Redis的集群的原理了
180 0
|
2月前
|
存储 SQL 关系型数据库
2024Mysql And Redis基础与进阶操作系列(1)作者——LJS[含MySQL的下载、安装、配置详解步骤及报错对应解决方法]
Mysql And Redis基础与进阶操作系列(1)之[MySQL的下载、安装、配置详解步骤及报错对应解决方法]
|
3月前
|
存储 NoSQL Redis
Redis 配置
10月更文挑战第14天
42 1
|
3月前
|
存储 缓存 NoSQL
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
大数据-46 Redis 持久化 RDB AOF 配置参数 混合模式 具体原理 触发方式 优点与缺点
74 1
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
205 0
|
3月前
|
NoSQL Ubuntu Linux
redis的基本安装配置启动使用
redis的基本安装配置启动使用
45 0
|
3月前
|
缓存 NoSQL 数据处理
原生php实现redis缓存配置和使用方法
通过上述步骤,你可以在PHP项目中配置并使用Redis作为高性能的缓存解决方案。合理利用Redis的各种数据结构和特性,可以有效提升应用的响应速度和数据处理效率。记得在实际应用中根据具体需求选择合适的缓存策略,如设置合理的过期时间,以避免内存过度消耗。
69 0
|
5月前
|
NoSQL Redis 容器
【Azure Cache for Redis】Redis的导出页面无法配置Storage SAS时通过az cli来完成
【Azure Cache for Redis】Redis的导出页面无法配置Storage SAS时通过az cli来完成