Zookeeper场景实践:(5)分布式通知/协调

本文涉及的产品
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
简介: 1.基本介绍 通知/协调机制通常有两种方式。 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。 总的

1.基本介绍

通知/协调机制通常有两种方式。

  • 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。
  • 工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。

总的来说,利用Zookeeper的watcher注册和异步通知功能,通知的发送者创建一个节点,并将通知的数据写入的该节点;通知的接受者则对该节点注册watch,当节点变化时,就算作通知的到来。

场景实践

通过上面的说明,其实实现还是非常容易的。看下关键的几个地方:

  • g_monitor_child:变量等于0标识只监控节点,等于1标识监控所有子节点。
  • show_notify(zh,g_path);:打印接受到的通知
  • show_list(zh,g_path);:打印所有子节点的进度

再来看监控函数:

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
    //监控节点数据变化
    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    //监控子节点个数变化
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    //监控子节点数据变化
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
}


下面是完整的代码:

#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include"zookeeper.h"  
#include"zookeeper_log.h"  

char g_host[512]= "172.17.0.36:2181";  
char g_path[512]= "/Notify";
int g_monitor_child = 0;

//watch function when child list changed
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
void show_notify(zhandle_t *zkhandle,const char *path);
//show all process ip:pid
void show_list(zhandle_t *zkhandle,const char *path);

void print_usage();
void get_option(int argc,const char* argv[]);

/**********unitl*********************/  
void print_usage()
{
    printf("Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -p path\n");
    printf("        -c monitor the child nodes\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("notify -s172.17.0.36:2181 -p /Notify\n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    int            dem = 1;
    const char    optstring[] = "hcp:s:";


    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'c':
            g_monitor_child = 1;
            break;
        case 's':
            strncpy(g_host,optarg,sizeof(g_host));
            break;
        case 'p':
            strncpy(g_path,optarg,sizeof(g_path));
            break;
        default:
            break;
        }
    }
} 
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
  /*
    printf("watcher event\n");  
    printf("type: %d\n", type);  
    printf("state: %d\n", state);  
    printf("path: %s\n", path);  
    printf("watcherCtx: %s\n", (char *)watcherCtx);  
  */

    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
} 
void show_notify(zhandle_t *zkhandle,const char *path)
{
    char notify_buffer[1024]={0};
    int  notify_len = sizeof(notify_buffer);

    int ret = zoo_get(zkhandle,g_path,1,notify_buffer,¬ify_len,NULL);
    if(ret != ZOK){
        fprintf(stderr,"failed to get the data of path %s!\n",g_path);
    }else{
        printf("Notice:%s\n",notify_buffer);
    }
}
void show_list(zhandle_t *zkhandle,const char *path)
{

    struct String_vector children;
    int i = 0;
    int ret = zoo_get_children(zkhandle,path,1,&children);

    if(ret == ZOK){
        char child_path[512] ={0};
        char notify_buffer[1024] = {0};
        int notify_len = sizeof(notify_buffer);

        printf("--------------\n");
        for(i = 0; i < children.count; ++i){
            sprintf(child_path,"%s/%s",g_path,children.data[i]);
            ret = zoo_get(zkhandle,child_path,1,notify_buffer,¬ify_len,NULL);
            if(ret != ZOK){
                fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            }else{
                printf("%s:%s\n",children.data[i],notify_buffer);
            }
        }
    }else{
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }
}

int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    int ret = zoo_exists(zkhandle,g_path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        }else{
            printf("create path %s successfully!\n",g_path);
        }
    }

    if(ret == ZOK && g_monitor_child == 0){
        show_notify(zkhandle,g_path);
    }else if(ret == ZOK && g_monitor_child == 1){
        show_list(zkhandle,g_path);
    }

    getchar();

    zookeeper_close(zkhandle); 

    return 0;
}


程序由3个参数选项

  • -s:指定Zookeeper的服务器的ip:port
  • -p:指定要监控的路径,默认为/Notify
  • -c:使用此项表示监控子节点列表

notify -s172.17.0.36:2181 -p /Notify
当你在客户端修改数据的时候,程序就能收到对应的通知了。


相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
分布式计算 NoSQL Java
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
Hadoop-32 ZooKeeper 分布式锁问题 分布式锁Java实现 附带案例和实现思路代码
45 2
|
1月前
|
分布式计算 Hadoop
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
Hadoop-27 ZooKeeper集群 集群配置启动 3台云服务器 myid集群 zoo.cfg多节点配置 分布式协调框架 Leader Follower Observer
48 1
|
1月前
|
存储 SQL 消息中间件
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
Hadoop-26 ZooKeeper集群 3台云服务器 基础概念简介与环境的配置使用 架构组成 分布式协调框架 Leader Follower Observer
49 0
|
1月前
|
NoSQL Java Redis
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
Redis分布式锁在高并发场景下是重要的技术手段,但其实现过程中常遇到五大深坑:**原子性问题**、**连接耗尽问题**、**锁过期问题**、**锁失效问题**以及**锁分段问题**。这些问题不仅影响系统的稳定性和性能,还可能导致数据不一致。尼恩在实际项目中总结了这些坑,并提供了详细的解决方案,包括使用Lua脚本保证原子性、设置合理的锁过期时间和使用看门狗机制、以及通过锁分段提升性能。这些经验和技巧对面试和实际开发都有很大帮助,值得深入学习和实践。
太惨痛: Redis 分布式锁 5个大坑,又大又深, 如何才能 避开 ?
|
3月前
|
NoSQL Redis
基于Redis的高可用分布式锁——RedLock
这篇文章介绍了基于Redis的高可用分布式锁RedLock的概念、工作流程、获取和释放锁的方法,以及RedLock相比单机锁在高可用性上的优势,同时指出了其在某些特殊场景下的不足,并提到了ZooKeeper作为另一种实现分布式锁的方案。
114 2
基于Redis的高可用分布式锁——RedLock
|
3月前
|
缓存 NoSQL Java
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
这篇文章是关于如何在SpringBoot应用中整合Redis并处理分布式场景下的缓存问题,包括缓存穿透、缓存雪崩和缓存击穿。文章详细讨论了在分布式情况下如何添加分布式锁来解决缓存击穿问题,提供了加锁和解锁的实现过程,并展示了使用JMeter进行压力测试来验证锁机制有效性的方法。
SpringBoot整合Redis、以及缓存穿透、缓存雪崩、缓存击穿的理解分布式情况下如何添加分布式锁 【续篇】
|
14天前
|
NoSQL Redis
Redis分布式锁如何实现 ?
Redis分布式锁通过SETNX指令实现,确保仅在键不存在时设置值。此机制用于控制多个线程对共享资源的访问,避免并发冲突。然而,实际应用中需解决死锁、锁超时、归一化、可重入及阻塞等问题,以确保系统的稳定性和可靠性。解决方案包括设置锁超时、引入Watch Dog机制、使用ThreadLocal绑定加解锁操作、实现计数器支持可重入锁以及采用自旋锁思想处理阻塞请求。
52 16
|
1月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
61 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
1月前
|
NoSQL Redis 数据库
计数器 分布式锁 redis实现
【10月更文挑战第5天】
50 1
|
1月前
|
NoSQL 算法 关系型数据库
Redis分布式锁
【10月更文挑战第1天】分布式锁用于在多进程环境中保护共享资源,防止并发冲突。通常借助外部系统如Redis或Zookeeper实现。通过`SETNX`命令加锁,并设置过期时间防止死锁。为避免误删他人锁,加锁时附带唯一标识,解锁前验证。面对锁提前过期的问题,可使用守护线程自动续期。在Redis集群中,需考虑主从同步延迟导致的锁丢失问题,Redlock算法可提高锁的可靠性。
80 4
下一篇
无影云桌面