【ZooKeeper】④ ZooKeeper 实际应用(服务器的动态感知)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
简介: 使用 ZooKeeper 实现服务器的动态感知,动态获取应用服务(秒杀服务)的上线、宕机情况,并能够让客户端(如 Java 客户端)知道。① 每个集群服务器的秒杀服务都需要连接 ZooKeeper 客户端。当上线秒杀服务的时候,往 ZooKeeper 的指定目录创建临时顺序节点,并写入当前秒杀服务器的信息(ip 地址、端口等)为什么是【临时顺序节点】?临时节点的特点:当会话断开的时候,临时节点会自动被移除(可实现服务器宕机的时候自动移除服务器在 ZooKeeper 注册中心的信息)顺序节点的特点:顺序节点的节点名字可以一样,ZooKeeper 会自动在节点的名字后面加上顺序号②

一、服务器的动态感知

(1) 描述过程

使用 ZooKeeper 实现服务器的动态感知,动态获取应用服务(秒杀服务)的上线、宕机情况,并能够让客户端(如 Java 客户端)知道。

① 每个集群服务器的秒杀服务都需要连接 ZooKeeper 客户端。当上线秒杀服务的时候,往 ZooKeeper 的指定目录创建临时顺序节点,并写入当前秒杀服务器的信息(ip 地址、端口等)

为什么是【临时顺序节点】?

  • 临时节点的特点:当会话断开的时候,临时节点会自动被移除(可实现服务器宕机的时候自动移除服务器在 ZooKeeper 注册中心的信息)
  • 顺序节点的特点:顺序节点的节点名字可以一样,ZooKeeper 会自动在节点的名字后面加上顺序号

② Java 客户端连接 ZooKeeper 服务器。获取 ZooKeeper 指定目录(秒杀服务目录)的子节点列表信息(秒杀服务的地址列表信息),并注册永久的节点改变事件(是秒杀服务上线和宕机的时候可以通知 Java 客户端的必须条件)
在这里插入图片描述

(2) 秒杀服务端代码

在这里插入图片描述

@Configuration
public class ZooKeeperClient {
    private static final String CONNECTING_STRING = "192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
    private static final int SESSION_TIMEOUT = 30000;

    private static final String BASE_PATH = "/server";
    private static final String SUB_PATH = "/seckillServer";

    @Value("${server.host}")
    private String seckillHost;

    @Value("${server.port}")
    private String seckillPort;

    private ZooKeeper zooKeeper;

    /**
     * 连接 ZooKeeper
     */
    @Bean
    public ZooKeeper zooKeeper() throws Exception {
        zooKeeper = new ZooKeeper(CONNECTING_STRING, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("connect zookeeper successfully");
                    String s = writeSeckillServerAddrInfo(seckillHost + ":" + seckillPort);
                    System.out.println(seckillHost + ":" + seckillPort);
                    System.out.println("创建节点: " + s);
                }
            }
        });
        return zooKeeper;
    }

    /**
     * 往 ZooKeeper 的指定目录创建子节点, 在子节点中存储当前秒杀服务器的地址信息
     */
    private String writeSeckillServerAddrInfo(String addrInfo) {
        try {
            String nodePath = zooKeeper.create(BASE_PATH + SUB_PATH,
                    addrInfo.getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            return "success_" + nodePath;
        } catch (Exception e) {
            e.printStackTrace();
            return "error";
        }
    }

}
# 应用名称
spring.application.name=seckill-server
# 应用服务 WEB 访问端口
server.port=8080
server.host=111.111.111.111

(3) 客户端代码

/**
 * Java 客户端连接上 ZooKeeper
 */
@Configuration
public class ZooKeeperClient {
    private static final String CONNECTING_STRING = "192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
    private static final int SESSION_TIMEOUT = 30000;

    private static final String BASE_PATH = "/server";
    private static final String SUB_PATH = "/seckillServer";

    private ZooKeeper zooKeeper;

    /**
     * 连接 ZooKeeper
     */
    @Bean
    public ZooKeeper zooKeeper() throws Exception {
        zooKeeper = new ZooKeeper(CONNECTING_STRING, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                if (event.getState() == Event.KeeperState.SyncConnected) {
                    System.out.println("connect zookeeper successfully");
                    List<String> addrInfo = getChildNodeAddrInfo();
                    if (addrInfo != null) {
                        for (String info : addrInfo) {
                            System.out.println("地址信息: " + info);
                        }
                    }
                }
            }
        });
        return zooKeeper;
    }

    /**
     * 获取子节点列表
     */
    private List<String> getChildNodeList() {
        try {
            Stat stat = zooKeeper.exists(BASE_PATH, false);
            if (stat == null) throw new IllegalArgumentException("路径节点不存在");
            return zooKeeper.getChildren(BASE_PATH, false);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取子节点列表上的地址信息
     */
    private List<String> getChildNodeAddrInfo() {
        try {
            ArrayList<String> childNodeData = new ArrayList<>();
            Stat stat = zooKeeper.exists(BASE_PATH, false);
            if (stat == null) throw new IllegalArgumentException("路径节点不存在");
            List<String> childNodeList = getChildNodeList();
            if (childNodeList == null) return null;

            for (String childPath : childNodeList) {
                byte[] data = getNodeData(BASE_PATH + "/" + childPath);
                if (data != null) {
                    childNodeData.add(new String(data));
                }
            }
            addWatch();
            return childNodeData;
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 获取节点数据
     */
    private byte[] getNodeData(String path) {
        try {
            Stat stat = zooKeeper.exists(path, false);
            if (stat == null) throw new IllegalArgumentException("路径节点不存在");
            return zooKeeper.getData(path, false, stat);
        } catch (Exception e) {
            e.printStackTrace();
            return null;
        }
    }

    /**
     * 注册节点改变事件
     */
    private void addWatch() {
        try {
            zooKeeper.addWatch(BASE_PATH, new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    List<String> childNodeAddrInfo = getChildNodeAddrInfo();
                    if (childNodeAddrInfo != null) {
                        for (String info : childNodeAddrInfo) {
                            System.out.println(info);
                        }
                    }
                }
            }, AddWatchMode.PERSISTENT);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
# 应用名称
spring.application.name=java-client
# 应用服务 WEB 访问端口
server.port=8123

二、分布式锁

(1) 什么是锁?

多个任务同时执行的时候(多线程、多进程),① 若任务是需要对同一资源进行操作,② 对于资源的访问,不能多个任务同时执行(同一时间只能一个任务访问资源)。

跨 JVM 的分布式程序之间进行数据共享需要使用分布式锁。

(2) 锁的基本概念

① 竞争锁:任务通过竞争获取到锁后才能对资源进行操作
公平竞争:按照一定的顺序,先来先获得锁
非公平竞争:没有顺序,谁抢到誰获得锁

② 占有锁:获得锁,有任务正在对资源进行更新操作

③ 任务阻塞:A、B、C 三个进程。A 占有锁的时候,B 和 C 只能等待,此时是阻塞状态

④ 释放锁:对资源的访问结束后,要释放锁

(3) 非分布式环境下的锁的使用

创建订单问题(多线程)

public class OrderService {

    // 多个线程会对 count 的值进行改变
    private int count = 0;

    public synchronized String createOrderId() {
        try {
            TimeUnit.MICROSECONDS.sleep(666);
        } catch (Exception e) {
            e.printStackTrace();
        }
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
        count++;
        return sdf.format(new Date()) + "-" + count;
    }
}
public class OrderController implements Runnable {

    private static OrderService orderService = new OrderService();
    private static Set<String> orderIds = new HashSet<>();

    // 用于结束进程
    private static CountDownLatch countDownLatch = new CountDownLatch(50);

    public static void main(String[] args) throws Exception {
        OrderController orderController = new OrderController();
        for (int i = 0; i < 50; i++) { // 创建 50 个线程
            new Thread(orderController).start();
        }
        // 所有线程执行完之后才执行后面的代码
        countDownLatch.await();
        System.out.println(orderIds.size());
        for (String orderId : orderIds) {
            System.out.println(orderId);
        }
    }

    @Override
    public void run() {
        orderIds.add(orderService.createOrderId());
        // 执行完一个线程后减 1
        countDownLatch.countDown();
    }
}

(4) 分布式锁的实现

① 创建同名节点

a. 代码和画图

@RestController
@RequestMapping("/orders")
public class OrderController {
    @Autowired
    private ZooKeeper zooKeeper;

    // 用于发送网络请求
    private RestTemplate networkReq = new RestTemplate();

    private static final String BASE_PATH = "/locks";
    private static final String SUB_PATH = "/lock";
    private static final String FULL_PATH = BASE_PATH + SUB_PATH;

    @Value("${server.port}")
    private String port;

    @GetMapping("/createOrder")
    public String createOrder() {
        if (tryGetLock()) {
            String orderId = networkReq.getForObject(
                    "http://localhost:8080/orderIds/getId",
                    String.class);
            System.out.println("【" + port + "】orderId = " + orderId);

            releaseLock();
        } else {
            waitLock();
        }
        return port + " createOrder";
    }

    /**
     * 尝试获取锁
     * <p>
     * 往指定目录创建同名节点 (locks)
     * 创建成功获得锁资源
     */
    private boolean tryGetLock() {
        try {
            String path;
            path = zooKeeper.create(FULL_PATH,
                    "同名节点".getBytes(),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.PERSISTENT);
            return !"".equals(path);
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 释放锁资源
     */
    private void releaseLock() {
        try {
            Stat stat = zooKeeper.exists(FULL_PATH, false);
            zooKeeper.delete(FULL_PATH, stat.getVersion());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 等待锁
     * 绑定 /server 节点的子节点改变事件
     */
    private void waitLock() {
        try {
            zooKeeper.getChildren(BASE_PATH, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                    System.out.println("【waitLock】子节点改变事件");
                    createOrder(); // 再次执行业务
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
@Configuration
public class ZooKeeperClient {
    private static final String CONNECTING_STRING =
            "192.168.80.129:2888:3888,192.168.80.129:2888:3888,192.168.80.129:2889:3889";
    private static final int SESSION_TIMEOUT = 30000;

    @Bean
    public ZooKeeper zooKeeper() throws IOException {
        return new ZooKeeper(CONNECTING_STRING,
                SESSION_TIMEOUT,
                event -> {
                    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                        System.out.println("【连接 ZooKeeper】成功");
                    }
                }
        );
    }

}
@RestController
@RequestMapping("/orderIds")
public class OrderIdController {
    private int count = 0;

    @GetMapping("/getId")
    public String getId() {
        String id = null;
        try {
            TimeUnit.MILLISECONDS.sleep(50);
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            count++;
            id = sdf.format(new Date()) + "-" + count;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return id;
    }
}

在这里插入图片描述
在这里插入图片描述
性能差、且不是公平的。

b. 异常处理(错误处理)

在这里插入图片描述

String orderId = networkReq.getForObject(
                "http://localhost:8123/orderIds/getId",
                 String.class);

上面的请求路径中必须要加 http

② 创建临时顺序节点

在这里插入图片描述
在这里插入图片描述

/**
 * 创建临时顺序节点
 */
@RestController
@RequestMapping("/orders2")
public class OrderController2 {
    @Autowired
    private ZooKeeper zooKeeper;

    // 用于发送网络请求
    private RestTemplate networkReq = new RestTemplate();

    private static final String BASE_PATH = "/locks2";
    private static final String SUB_PATH = "/lock";
    private static final String FULL_PATH = BASE_PATH + SUB_PATH;

    @Value("${server.port}")
    private String port;

    @GetMapping("/createOrder")
    public String createOrder() {
        try {
            // 所有要获取锁资源的服务器都在 locks2 节点下创建 lock 临时顺序
            // curPath: /locks2/lock000000
            String curPath = zooKeeper.create(FULL_PATH,
                    null,
                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            // 【lock000001】
            String curPathTail = curPath.substring(curPath.lastIndexOf("/") + 1);
            // 尝试获取锁
            if (tryGetLock(curPathTail)) {
                reqCreateOrderId();
                releaseLock(curPathTail);
            } else {
                waitLock(curPath);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return port + " createOrder";
    }

    /**
     * 发请求执行业务方法
     */
    private void reqCreateOrderId() {
        String orderId = networkReq.getForObject(
                "http://localhost:8080/orderIds/getId",
                String.class);
        System.out.println("【" + port + "】orderId = " + orderId);
    }

    /**
     * 尝试获取锁
     * <p>
     * 获取 locks 节点下的所有子节点
     * 子节点序号最小的获得锁资源
     */
    private boolean tryGetLock(String curPath) {
        try {
            List<String> childNodes = zooKeeper.getChildren(BASE_PATH, false);
            if (childNodes == null) return false;
            // 对子节点进行升序排序
            Collections.sort(childNodes);
            // 判断当前节点是否是所有节点列表中最小的节点
            return curPath.equals(childNodes.get(0));
            // return StringUtils.pathEquals(curPath, childNodes.get(0));
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }

    /**
     * 释放锁资源
     */
    private void releaseLock(String curPath) {
        try {
            Stat stat = zooKeeper.exists(BASE_PATH + "/" + curPath, false);
            if (stat == null) throw new IllegalArgumentException("curPath 路径(节点)不存在");
            zooKeeper.delete(BASE_PATH + "/" + curPath, stat.getVersion());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 等待锁
     * 绑定前一个节点的删除事件
     */
    private void waitLock(String curPath) {
        try {
            // 获取子节点列表
            List<String> children = zooKeeper.getChildren(BASE_PATH, false);
            Collections.sort(children);
            int curPathIdx = children.indexOf(curPath); // 当前节点的索引
            if (curPathIdx > 0) {
                String prePath = children.get(curPathIdx - 1);
                zooKeeper.getData(BASE_PATH + "/" + prePath, event -> {
                    if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                        System.out.println("【waitLock】前一个节点删除事件");
                        reqCreateOrderId();
                        releaseLock(curPath);
                    }
                }, new Stat());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
2月前
|
存储 缓存 前端开发
如何优化 SSR 应用以减少服务器压力
优化SSR应用以减少服务器压力,可采用代码分割、缓存策略、数据预加载、服务端性能优化、使用CDN、SSR与SSG结合、限制并发请求、SSR与CSR平滑切换、优化前端资源及利用框架特性等策略。这些方法能有效提升性能和稳定性,同时保证用户体验。
|
3月前
|
机器学习/深度学习 人工智能 运维
企业内训|LLM大模型在服务器和IT网络运维中的应用-某日企IT运维部门
本课程是为某在华日资企业集团的IT运维部门专门定制开发的企业培训课程,本课程旨在深入探讨大型语言模型(LLM)在服务器及IT网络运维中的应用,结合当前技术趋势与行业需求,帮助学员掌握LLM如何为运维工作赋能。通过系统的理论讲解与实践操作,学员将了解LLM的基本知识、模型架构及其在实际运维场景中的应用,如日志分析、故障诊断、网络安全与性能优化等。
103 2
|
2月前
|
弹性计算 开发工具 git
2分钟在阿里云ECS控制台部署个人应用(图文示例)
作为一名程序员,我在部署托管于Github/Gitee的代码到阿里云ECS服务器时,经常遇到繁琐的手动配置问题。近期,阿里云ECS控制台推出了一键构建部署功能,简化了这一过程,支持Gitee和GitHub仓库,自动处理git、docker等安装配置,无需手动登录服务器执行命令,大大提升了部署效率。本文将详细介绍该功能的使用方法和适用场景。
2分钟在阿里云ECS控制台部署个人应用(图文示例)
|
1月前
|
开发框架 .NET PHP
网站应用项目如何选择阿里云服务器实例规格+内存+CPU+带宽+操作系统等配置
对于使用阿里云服务器的搭建网站的用户来说,面对众多可选的实例规格和配置选项,我们应该如何做出最佳选择,以最大化业务效益并控制成本,成为大家比较关注的问题,如果实例、内存、CPU、带宽等配置选择不合适,可能会影响到自己业务在云服务器上的计算性能及后期运营状况,本文将详细解析企业在搭建网站应用项目时选购阿里云服务器应考虑的一些因素,以供参考。
|
2月前
|
安全 开发工具 Swift
Swift 是苹果公司开发的现代编程语言,具备高效、安全、简洁的特点,支持类型推断、闭包、泛型等特性,广泛应用于苹果各平台及服务器端开发
Swift 是苹果公司开发的现代编程语言,具备高效、安全、简洁的特点,支持类型推断、闭包、泛型等特性,广泛应用于苹果各平台及服务器端开发。基础语法涵盖变量、常量、数据类型、运算符、控制流等,高级特性包括函数、闭包、类、结构体、协议和泛型。
36 2
|
3月前
|
Dubbo 应用服务中间件 Apache
Dubbo 应用切换 ZooKeeper 注册中心实例,流量无损迁移
如果 Dubbo 应用使用 ZooKeeper 作为注册中心,现在需要切换到新的 ZooKeeper 实例,如何做到流量无损?
40 4
|
2月前
|
存储 安全 关系型数据库
Linux系统在服务器领域的应用与优势###
本文深入探讨了Linux操作系统在服务器领域的广泛应用及其显著优势。通过分析其开源性、安全性、稳定性和高效性,揭示了为何Linux成为众多企业和开发者的首选服务器操作系统。文章还列举了Linux在服务器管理、性能优化和社区支持等方面的具体优势,为读者提供了全面而深入的理解。 ###
|
3月前
|
存储 缓存 前端开发
如何优化 SSR 应用以减少服务器压力?
如何优化 SSR 应用以减少服务器压力?
|
7天前
|
机器学习/深度学习 人工智能 PyTorch
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
阿里云GPU云服务器怎么样?阿里云GPU结合了GPU计算力与CPU计算力,主要应用于于深度学习、科学计算、图形可视化、视频处理多种应用场景,本文为您详细介绍阿里云GPU云服务器产品优势、应用场景以及最新活动价格。
阿里云GPU云服务器怎么样?产品优势、应用场景介绍与最新活动价格参考
|
6天前
|
存储 运维 安全
阿里云弹性裸金属服务器是什么?产品规格及适用场景介绍
阿里云服务器ECS包括众多产品,其中弹性裸金属服务器(ECS Bare Metal Server)是一种可弹性伸缩的高性能计算服务,计算性能与传统物理机无差别,具有安全物理隔离的特点。分钟级的交付周期将提供给您实时的业务响应能力,助力您的核心业务飞速成长。本文为大家详细介绍弹性裸金属服务器的特点、优势以及与云服务器的对比等内容。

热门文章

最新文章