Zookeeper场景实践:(5)分布式通知/协调

简介: 1.基本介绍 通知/协调机制通常有两种方式。 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。 总的

1.基本介绍

通知/协调机制通常有两种方式。

  • 系统调度模式:操作人员发送通知实际是通过控制台改变某个节点的状态,然后Zookeeper将这些变化发送给注册了这个节点的Watcher的所有客户端。
  • 工作汇报模式:这个情况是每个工作进程都在某个目录下创建一个临时节点,并携带工作的进度数据。这样汇总的进程可以监控目录子节点的变化获得工作进度的实时的全局情况。

总的来说,利用Zookeeper的watcher注册和异步通知功能,通知的发送者创建一个节点,并将通知的数据写入的该节点;通知的接受者则对该节点注册watch,当节点变化时,就算作通知的到来。

场景实践

通过上面的说明,其实实现还是非常容易的。看下关键的几个地方:

  • g_monitor_child:变量等于0标识只监控节点,等于1标识监控所有子节点。
  • show_notify(zh,g_path);:打印接受到的通知
  • show_list(zh,g_path);:打印所有子节点的进度

再来看监控函数:

void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
    //监控节点数据变化
    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    //监控子节点个数变化
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    //监控子节点数据变化
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
}


下面是完整的代码:

#include<stdio.h>  
#include<string.h>  
#include<unistd.h>
#include"zookeeper.h"  
#include"zookeeper_log.h"  

char g_host[512]= "172.17.0.36:2181";  
char g_path[512]= "/Notify";
int g_monitor_child = 0;

//watch function when child list changed
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx);
void show_notify(zhandle_t *zkhandle,const char *path);
//show all process ip:pid
void show_list(zhandle_t *zkhandle,const char *path);

void print_usage();
void get_option(int argc,const char* argv[]);

/**********unitl*********************/  
void print_usage()
{
    printf("Usage : [notify] [-h] [-c] [-p path][-s ip:port] \n");
    printf("        -h Show help\n");
    printf("        -p path\n");
    printf("        -c monitor the child nodes\n");
    printf("        -s zookeeper server ip:port\n");
    printf("For example:\n");
    printf("notify -s172.17.0.36:2181 -p /Notify\n");
}

void get_option(int argc,const char* argv[])
{
    extern char    *optarg;
    int            optch;
    int            dem = 1;
    const char    optstring[] = "hcp:s:";


    while((optch = getopt(argc , (char * const *)argv , optstring)) != -1 )
    {
        switch( optch )
        {
        case 'h':
            print_usage();
            exit(-1);
        case '?':
            print_usage();
            printf("unknown parameter: %c\n", optopt);
            exit(-1);
        case ':':
            print_usage();
            printf("need parameter: %c\n", optopt);
            exit(-1);
        case 'c':
            g_monitor_child = 1;
            break;
        case 's':
            strncpy(g_host,optarg,sizeof(g_host));
            break;
        case 'p':
            strncpy(g_path,optarg,sizeof(g_path));
            break;
        default:
            break;
        }
    }
} 
void zktest_watcher_g(zhandle_t* zh, int type, int state, const char* path, void* watcherCtx)  
{  
  /*
    printf("watcher event\n");  
    printf("type: %d\n", type);  
    printf("state: %d\n", state);  
    printf("path: %s\n", path);  
    printf("watcherCtx: %s\n", (char *)watcherCtx);  
  */

    if(type == ZOO_CHANGED_EVENT &&
       state == ZOO_CONNECTED_STATE &&
       g_monitor_child == 0){

        show_notify(zh,g_path);
    }else if(type == ZOO_CHILD_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }else if(type == ZOO_CHANGED_EVENT &&
            state == ZOO_CONNECTED_STATE &&
            g_monitor_child == 1){

        show_list(zh,g_path);
    }
} 
void show_notify(zhandle_t *zkhandle,const char *path)
{
    char notify_buffer[1024]={0};
    int  notify_len = sizeof(notify_buffer);

    int ret = zoo_get(zkhandle,g_path,1,notify_buffer,¬ify_len,NULL);
    if(ret != ZOK){
        fprintf(stderr,"failed to get the data of path %s!\n",g_path);
    }else{
        printf("Notice:%s\n",notify_buffer);
    }
}
void show_list(zhandle_t *zkhandle,const char *path)
{

    struct String_vector children;
    int i = 0;
    int ret = zoo_get_children(zkhandle,path,1,&children);

    if(ret == ZOK){
        char child_path[512] ={0};
        char notify_buffer[1024] = {0};
        int notify_len = sizeof(notify_buffer);

        printf("--------------\n");
        for(i = 0; i < children.count; ++i){
            sprintf(child_path,"%s/%s",g_path,children.data[i]);
            ret = zoo_get(zkhandle,child_path,1,notify_buffer,¬ify_len,NULL);
            if(ret != ZOK){
                fprintf(stderr,"failed to get the data of path %s!\n",child_path);
            }else{
                printf("%s:%s\n",children.data[i],notify_buffer);
            }
        }
    }else{
        fprintf(stderr,"failed to get the children of path %s!\n",path);
    }

    for(i = 0; i < children.count; ++i){
        free(children.data[i]);
        children.data[i] = NULL;
    }
}

int main(int argc, const char *argv[])  
{  
    int timeout = 30000;  
    char path_buffer[512];  
    int bufferlen=sizeof(path_buffer);  

    zoo_set_debug_level(ZOO_LOG_LEVEL_WARN); //设置日志级别,避免出现一些其他信息  

    get_option(argc,argv);

    zhandle_t* zkhandle = zookeeper_init(g_host,zktest_watcher_g, timeout, 0, (char *)"Notify Test", 0);  

    if (zkhandle ==NULL)  
    {  
        fprintf(stderr, "Error when connecting to zookeeper servers...\n");  
        exit(EXIT_FAILURE);  
    }  

    int ret = zoo_exists(zkhandle,g_path,0,NULL); 
    if(ret != ZOK){
        ret = zoo_create(zkhandle,g_path,"1.0",strlen("1.0"),  
                          &ZOO_OPEN_ACL_UNSAFE,0,  
                          path_buffer,bufferlen);  
        if(ret != ZOK){
            fprintf(stderr,"failed to create the path %s!\n",g_path);
        }else{
            printf("create path %s successfully!\n",g_path);
        }
    }

    if(ret == ZOK && g_monitor_child == 0){
        show_notify(zkhandle,g_path);
    }else if(ret == ZOK && g_monitor_child == 1){
        show_list(zkhandle,g_path);
    }

    getchar();

    zookeeper_close(zkhandle); 

    return 0;
}


程序由3个参数选项

  • -s:指定Zookeeper的服务器的ip:port
  • -p:指定要监控的路径,默认为/Notify
  • -c:使用此项表示监控子节点列表

notify -s172.17.0.36:2181 -p /Notify
当你在客户端修改数据的时候,程序就能收到对应的通知了。


相关文章
|
2月前
|
消息中间件 分布式计算 资源调度
《聊聊分布式》ZooKeeper与ZAB协议:分布式协调的核心引擎
ZooKeeper是一个开源的分布式协调服务,基于ZAB协议实现数据一致性,提供分布式锁、配置管理、领导者选举等核心功能,具有高可用、强一致和简单易用的特点,广泛应用于Kafka、Hadoop等大型分布式系统中。
|
3月前
|
数据采集 消息中间件 NoSQL
分布式爬虫的全局请求间隔协调与IP轮换策略
分布式爬虫的全局请求间隔协调与IP轮换策略
|
7月前
|
NoSQL 算法 安全
redis分布式锁在高并发场景下的方案设计与性能提升
本文探讨了Redis分布式锁在主从架构下失效的问题及其解决方案。首先通过CAP理论分析,Redis遵循AP原则,导致锁可能失效。针对此问题,提出两种解决方案:Zookeeper分布式锁(追求CP一致性)和Redlock算法(基于多个Redis实例提升可靠性)。文章还讨论了可能遇到的“坑”,如加从节点引发超卖问题、建议Redis节点数为奇数以及持久化策略对锁的影响。最后,从性能优化角度出发,介绍了减少锁粒度和分段锁的策略,并结合实际场景(如下单重复提交、支付与取消订单冲突)展示了分布式锁的应用方法。
574 3
|
7月前
|
存储 NoSQL Java
从扣减库存场景来讲讲redis分布式锁中的那些“坑”
本文从一个简单的库存扣减场景出发,深入分析了高并发下的超卖问题,并逐步优化解决方案。首先通过本地锁解决单机并发问题,但集群环境下失效;接着引入Redis分布式锁,利用SETNX命令实现加锁,但仍存在死锁、锁过期等隐患。文章详细探讨了通过设置唯一标识、续命机制等方法完善锁的可靠性,并最终引出Redisson工具,其内置的锁续命和原子性操作极大简化了分布式锁的实现。最后,作者剖析了Redisson源码,揭示其实现原理,并预告后续关于主从架构下分布式锁的应用与性能优化内容。
378 0
|
12月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
NoSQL Java Redis
秒杀抢购场景下实战JVM级别锁与分布式锁
在电商系统中,秒杀抢购活动是一种常见的营销手段。它通过设定极低的价格和有限的商品数量,吸引大量用户在特定时间点抢购,从而迅速增加销量、提升品牌曝光度和用户活跃度。然而,这种活动也对系统的性能和稳定性提出了极高的要求。特别是在秒杀开始的瞬间,系统需要处理海量的并发请求,同时确保数据的准确性和一致性。 为了解决这些问题,系统开发者们引入了锁机制。锁机制是一种用于控制对共享资源的并发访问的技术,它能够确保在同一时间只有一个进程或线程能够操作某个资源,从而避免数据不一致或冲突。在秒杀抢购场景下,锁机制显得尤为重要,它能够保证商品库存的扣减操作是原子性的,避免出现超卖或数据不一致的情况。
334 10
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
466 8
|
调度 数据库
什么场景下要使用分布式锁
分布式锁用于确保多节点环境下的资源互斥访问、避免重复操作、控制并发流量、防止竞态条件及任务调度协调,常见于防止超卖等问题。
374 4
|
4月前
|
存储 负载均衡 NoSQL
【赵渝强老师】Redis Cluster分布式集群
Redis Cluster是Redis的分布式存储解决方案,通过哈希槽(slot)实现数据分片,支持水平扩展,具备高可用性和负载均衡能力,适用于大规模数据场景。
371 2
|
4月前
|
存储 缓存 NoSQL
【📕分布式锁通关指南 12】源码剖析redisson如何利用Redis数据结构实现Semaphore和CountDownLatch
本文解析 Redisson 如何通过 Redis 实现分布式信号量(RSemaphore)与倒数闩(RCountDownLatch),利用 Lua 脚本与原子操作保障分布式环境下的同步控制,帮助开发者更好地理解其原理与应用。
324 6

热门文章

最新文章