Zookeeper场景实践:(5)分布式通知/协调

本文涉及的产品
MSE Nacos/ZooKeeper 企业版试用,1600元额度,限量50份
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
简介: 1.基本介绍 通知/协调机制通常有两种方式。 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。 总的

1.基本介绍

通知/协调机制通常有两种方式。

  • 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。
  • 工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。

总的来说,利用Zookeeper的watcher注册和异步通知功能,通知的发送者创建一个节点,并将通知的数据写入的该节点;通知的接受者则对该节点注册watch,当节点变化时,就算作通知的到来。

场景实践

通过上面的说明,其实实现还是非常容易的。看下关键的几个地方:

  • g_monitor_child:变量等于0标识只监控节点,等于1标识监控所有子节点。
  • show_notify(zh,g_path);:打印接受到的通知
  • show_list(zh,g_path);:打印所有子节点的进度

再来看监控函数:

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
    //监控节点数据变化
    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    //监控子节点个数变化
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    //监控子节点数据变化
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
}


下面是完整的代码:

#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include"zookeeper.h"  
#include"zookeeper_log.h"  

char g_host[512]= "172.17.0.36:2181";  
char g_path[512]= "/Notify";
int g_monitor_child = 0;

//watch function when child list changed
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
void show_notify(zhandle_t *zkhandle,const char *path);
//show all process ip:pid
void show_list(zhandle_t *zkhandle,const char *path);

void print_usage();
void get_option(int argc,const char* argv[]);

/**********unitl*********************/  
void print_usage()
{
    printf("Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -p path\n");
    printf("        -c monitor the child nodes\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("notify -s172.17.0.36:2181 -p /Notify\n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    int            dem = 1;
    const char    optstring[] = "hcp:s:";


    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'c':
            g_monitor_child = 1;
            break;
        case 's':
            strncpy(g_host,optarg,sizeof(g_host));
            break;
        case 'p':
            strncpy(g_path,optarg,sizeof(g_path));
            break;
        default:
            break;
        }
    }
} 
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
  /*
    printf("watcher event\n");  
    printf("type: %d\n", type);  
    printf("state: %d\n", state);  
    printf("path: %s\n", path);  
    printf("watcherCtx: %s\n", (char *)watcherCtx);  
  */

    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
} 
void show_notify(zhandle_t *zkhandle,const char *path)
{
    char notify_buffer[1024]={0};
    int  notify_len = sizeof(notify_buffer);

    int ret = zoo_get(zkhandle,g_path,1,notify_buffer,¬ify_len,NULL);
    if(ret != ZOK){
        fprintf(stderr,"failed to get the data of path %s!\n",g_path);
    }else{
        printf("Notice:%s\n",notify_buffer);
    }
}
void show_list(zhandle_t *zkhandle,const char *path)
{

    struct String_vector children;
    int i = 0;
    int ret = zoo_get_children(zkhandle,path,1,&children);

    if(ret == ZOK){
        char child_path[512] ={0};
        char notify_buffer[1024] = {0};
        int notify_len = sizeof(notify_buffer);

        printf("--------------\n");
        for(i = 0; i < children.count; ++i){
            sprintf(child_path,"%s/%s",g_path,children.data[i]);
            ret = zoo_get(zkhandle,child_path,1,notify_buffer,¬ify_len,NULL);
            if(ret != ZOK){
                fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            }else{
                printf("%s:%s\n",children.data[i],notify_buffer);
            }
        }
    }else{
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }
}

int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    int ret = zoo_exists(zkhandle,g_path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        }else{
            printf("create path %s successfully!\n",g_path);
        }
    }

    if(ret == ZOK && g_monitor_child == 0){
        show_notify(zkhandle,g_path);
    }else if(ret == ZOK && g_monitor_child == 1){
        show_list(zkhandle,g_path);
    }

    getchar();

    zookeeper_close(zkhandle); 

    return 0;
}


程序由3个参数选项

  • -s:指定Zookeeper的服务器的ip:port
  • -p:指定要监控的路径,默认为/Notify
  • -c:使用此项表示监控子节点列表

notify -s172.17.0.36:2181 -p /Notify
当你在客户端修改数据的时候,程序就能收到对应的通知了。


相关文章
|
5月前
|
人工智能 安全 应用服务中间件
阿里巴巴 MCP 分布式落地实践:快速转换 HSF 到 MCP server
本文分享了阿里巴巴内部将大规模HSF服务快速转换为MCP Server的实践经验,通过Higress网关实现MCP协议卸载,无需修改代码即可接入MCP生态。文章分析了MCP生态面临的挑战,如协议快速迭代和SDK不稳定性,并详细介绍了操作步骤及组件功能。强调MCP虽非终极解决方案,但作为AI业务工程化的起点具有重要意义。最后总结指出,MCP只是AI原生应用发展的第一步,未来还有更多可能性值得探索。
1044 48
|
17天前
|
消息中间件 缓存 监控
中间件架构设计与实践:构建高性能分布式系统的核心基石
摘要 本文系统探讨了中间件技术及其在分布式系统中的核心价值。作者首先定义了中间件作为连接系统组件的&quot;神经网络&quot;,强调其在数据传输、系统稳定性和扩展性中的关键作用。随后详细分类了中间件体系,包括通信中间件(如RabbitMQ/Kafka)、数据中间件(如Redis/MyCAT)等类型。文章重点剖析了消息中间件的实现机制,通过Spring Boot代码示例展示了消息生产者的完整实现,涵盖消息ID生成、持久化、批量发送及重试机制等关键技术点。最后,作者指出中间件架构设计对系统性能的决定性影响,
|
5月前
|
监控 Linux 应用服务中间件
Linux多节点多硬盘部署MinIO:分布式MinIO集群部署指南搭建高可用架构实践
通过以上步骤,已成功基于已有的 MinIO 服务,扩展为一个 MinIO 集群。该集群具有高可用性和容错性,适合生产环境使用。如果有任何问题,请检查日志或参考MinIO 官方文档。作者联系方式vx:2743642415。
1416 57
|
5月前
|
安全 JavaScript 前端开发
HarmonyOS NEXT~HarmonyOS 语言仓颉:下一代分布式开发语言的技术解析与应用实践
HarmonyOS语言仓颉是华为专为HarmonyOS生态系统设计的新型编程语言,旨在解决分布式环境下的开发挑战。它以“编码创造”为理念,具备分布式原生、高性能与高效率、安全可靠三大核心特性。仓颉语言通过内置分布式能力简化跨设备开发,提供统一的编程模型和开发体验。文章从语言基础、关键特性、开发实践及未来展望四个方面剖析其技术优势,助力开发者掌握这一新兴工具,构建全场景分布式应用。
495 35
|
6月前
|
存储 负载均衡 测试技术
ACK Gateway with Inference Extension:优化多机分布式大模型推理服务实践
本文介绍了如何利用阿里云容器服务ACK推出的ACK Gateway with Inference Extension组件,在Kubernetes环境中为多机分布式部署的LLM推理服务提供智能路由和负载均衡能力。文章以部署和优化QwQ-32B模型为例,详细展示了从环境准备到性能测试的完整实践过程。
|
7月前
|
并行计算 PyTorch 算法框架/工具
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
本文探讨了如何通过技术手段混合使用AMD与NVIDIA GPU集群以支持PyTorch分布式训练。面对CUDA与ROCm框架互操作性不足的问题,文章提出利用UCC和UCX等统一通信框架实现高效数据传输,并在异构Kubernetes集群中部署任务。通过解决轻度与强度异构环境下的挑战,如计算能力不平衡、内存容量差异及通信性能优化,文章展示了如何无需重构代码即可充分利用异构硬件资源。尽管存在RDMA验证不足、通信性能次优等局限性,但该方案为最大化GPU资源利用率、降低供应商锁定提供了可行路径。源代码已公开,供读者参考实践。
492 3
融合AMD与NVIDIA GPU集群的MLOps:异构计算环境中的分布式训练架构实践
|
7月前
|
人工智能 运维 监控
领先AI企业经验谈:探究AI分布式推理网络架构实践
当前,AI行业正处于快速发展的关键时期。继DeepSeek大放异彩之后,又一款备受瞩目的AI智能体产品Manus横空出世。Manus具备独立思考、规划和执行复杂任务的能力,其多智能体架构能够自主调用工具。在GAIA基准测试中,Manus的性能超越了OpenAI同层次的大模型,展现出卓越的技术实力。
|
9月前
|
数据采集 人工智能 分布式计算
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
阿里云推出的MaxFrame是链接大数据与AI的分布式Python计算框架,提供类似Pandas的操作接口和分布式处理能力。本文从部署、功能验证到实际场景全面评测MaxFrame,涵盖分布式Pandas操作、大语言模型数据预处理及企业级应用。结果显示,MaxFrame在处理大规模数据时性能显著提升,代码兼容性强,适合从数据清洗到训练数据生成的全链路场景...
384 5
MaxFrame:链接大数据与AI的高效分布式计算框架深度评测与实践!
|
10月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
9月前
|
存储 运维 安全
盘古分布式存储系统的稳定性实践
本文介绍了阿里云飞天盘古分布式存储系统的稳定性实践。盘古作为阿里云的核心组件,支撑了阿里巴巴集团的众多业务,确保数据高可靠性、系统高可用性和安全生产运维是其关键目标。文章详细探讨了数据不丢不错、系统高可用性的实现方法,以及通过故障演练、自动化发布和健康检查等手段保障生产安全。总结指出,稳定性是一项系统工程,需要持续迭代演进,盘古经过十年以上的线上锤炼,积累了丰富的实践经验。
643 7

热门文章

最新文章