大数据编程技术基础实验六:ZooKeeper实验——进程协作

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生网关 MSE Higress,422元/月
简介: 大数据基础实验六,使用ZooKeeper了解并实践进程协作的操作。

一、前言

在实验开始之前,首先祝大家国庆节快乐,希望大家都能愉快的度过国庆假期,而我也就昨天休息了一天跟室友们出去吃了一顿饭,我们的国庆只有三天,真羡慕国庆七天并且还能出去玩的人,太命苦了,大学四年已过去三年一次像样的国庆都没有,只能默默的锻炼和学习安抚我的心灵。

好啦,回归正题,在前面一期实验中,我们刚学会了ZooKeeper的部署与启动,本期我们就来用ZooKeeper实战一下,学习一下ZooKeeper如何实现多个线程间的协作。

二、实验目的与要求

  1. 掌握Java代码如何连接ZooKeeper集群及通过代码读写ZooKeeper集群的目录下的数据,掌握ZooKeeper如何实现多个线程间的协作。
  2. 用Java代码实现两个线程,一个向ZooKeeper中某一目录中写入数据,另一线程监听此目录,若目录下数据有更新则将目录中数据读取并显示出来。

三、实验原理

  1. 通过ZooKeeper实现不同物理机器上的进程间通信。
  2. 场景使用:客户端A需要向客户端B发送一条消息msg1。
  3. 实现方法:客户端A把msg1发送给ZooKeeper集群,然后由客户端B自行去ZooKeeper集群读取msg1。

四、实验步骤

本实验主要完成多线程通过ZooKeeper完成彼此间的协作问题,实验过程包含启动集群、编写代码、客户端提交代码三个步骤。

1、启动ZooKeeper集群

启动ZooKeeper集群。具体步骤可以参考我前面一期博客内容:

大数据技术基础实验五:Zookeeper实验——部署ZooKeeper

2、导入jar包

我们在本地打开我们的开发工具Eclipse,然后创建我们的ZooKeeperTest项目,它是一个java项目。

image-20221002103907923.png

创建完项目之后,我们需要从服务器上ZooKeeper安装包的lib目录下,将如下jar包导入到开发工具,我们需要用到Xftp工具连接到服务器,然后直接进行拖拽到本地项目下的lib包内即可。

首先连接主机:

image-20221002104954878.png

我们需要导入的jar包有:

jline-0.9.94.jar

log4j-1.2.16.jar

netty-3.7.0.Final.jar

slf4j-api-1.6.1.jar

slf4j-log4j12-1.6.1.jar

zookeeper-3.4.6.jar

找到虚拟机内的ZooKeeper安装包的lib目录,它在/usr/cstor/zookeeper/lib,然后将lib下的这些jar包导入到我们本地的项目lib目录内:

image-20221002105233796.png

一共有六个jar包,最后一个zookeeper-3.4.6.jar它在这个目录/usr/cstor/zookeeper/lib的上一级里面:

image-20221002105608121.png

然后我们需要将这六个jar导入到我们的项目中,我们右键每一个jar包然后进行如下选择即可导入成功:

image-20221002110104609.png

3、编写java代码

然后我们创建两个java类:

image-20221002110334193.png

然后分别写入java代码。

向/testZk目录写数据线程代码实现:

publicclassWriteMsgextendsThread {
@Overridepublicvoidrun() {
try {
ZooKeeperzk=newZooKeeper("slave1:2181", 500000, null);
Stringcontent=Long.toString(newDate().getTime());
// 修改节点/testZk下的数据,第三个参数为版本,如果是-1,那会无视被修改的数据版本,直接改掉zk.setData("/testZk", content.getBytes(), -1);
// 关闭sessionzk.close();
      } catch (Exceptione) {
e.printStackTrace();
    }
  }
}

监听/testZk目录若数据改变则读取数据并显示线程代码实现:

publicclassReadMsg {
publicstaticvoidmain(String[] args) throwsException {
finalZooKeeperzk=newZooKeeper("slave1:2181", 500000, null);
//定义watch Watcherwacher=newWatcher() {
publicvoidprocess(WatchedEventevent) {
//监听到数据变化取出数据if(EventType.NodeDataChanged==event.getType()){
byte[] bb;
try {
bb=zk.getData("/testZk", null, null);
System.out.println("/testZk的数据: "+newString(bb));
         } catch (Exceptione) {
e.printStackTrace();
         }
       }
     }
   };
//设置watch zk.exists("/testZk", wacher);
//更新/testZk目录信息,触发wacthwhile(true)
   {
Thread.sleep(2000);
newWriteMsg().start();
//watch一次生效就会删除需重新设置 zk.exists("/testZk", wacher);
   }
  }
}

4、做成jar包

和之前打包类似,但这次我们需要改成可执行的jar文件:

image-20221002144042248.png

image-20221002144108676.png

打包成功之后,我们再次使用Xftp工具将打包好的jar包放到root目录下面,因为我们要在/testZk目录写数据线程,但在上一期实验中我们最后删除了这个目录,所以我们需要重新启动客户端,然后创建/testZk目录,然后退出。

这里启动客户端和创建目录我就不再演示,不会的朋友可以移步到上一期博客。

然后我们执行如下命令即可执行代码:

java -jar /root/ZooKeeperTest.jar

image-20221002144352842.png

这里我们成功打印了日志信息,打印日志信息为ZooKeeper接收线程监控到/testZk目录信息有变化时,读取该目录的内容。

但是这里一直抛出这样的错误:

2022-10-0214:34:08,511 ERROR [org.apache.zookeeper.ClientCnxn] - Error while calling watcher
java.lang.NullPointerException
        at org.apache.zookeeper.ClientCnxn$EventThread.processEvent(ClientCnxn.java:522)
        at org.apache.zookeeper.ClientCnxn$EventThread.run(ClientCnxn.java:498)

根据我上网查询过后,目前还是没有解决这个异常,我也不太懂这个是什么原因,如果有知道的大佬可以告诉我一下,谢谢,我也准备问一下我们的老师,看他是怎么解决的,如果解决了后续我再将解决办法加进来。

五、最后我想说

本期ZooKeeper实验就到这里结束了,马上我们将开始HBase的实验了,还请大家敬请关注,谢谢!

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
2月前
|
安全 开发者 Python
揭秘Python IPC:进程间的秘密对话,让你的系统编程更上一层楼
【9月更文挑战第8天】在系统编程中,进程间通信(IPC)是实现多进程协作的关键技术。IPC机制如管道、队列、共享内存和套接字,使进程能在独立内存空间中共享信息,提升系统并发性和灵活性。Python提供了丰富的IPC工具,如`multiprocessing.Pipe()`和`multiprocessing.Queue()`,简化了进程间通信的实现。本文将从理论到实践,详细介绍各种IPC机制的特点和应用场景,帮助开发者构建高效、可靠的多进程应用。掌握Python IPC,让系统编程更加得心应手。
32 4
|
2月前
|
Linux C语言
C语言 多进程编程(三)信号处理方式和自定义处理函数
本文详细介绍了Linux系统中进程间通信的关键机制——信号。首先解释了信号作为一种异步通知机制的特点及其主要来源,接着列举了常见的信号类型及其定义。文章进一步探讨了信号的处理流程和Linux中处理信号的方式,包括忽略信号、捕捉信号以及执行默认操作。此外,通过具体示例演示了如何创建子进程并通过信号进行控制。最后,讲解了如何通过`signal`函数自定义信号处理函数,并提供了完整的示例代码,展示了父子进程之间通过信号进行通信的过程。
|
2月前
|
Linux C语言
C语言 多进程编程(四)定时器信号和子进程退出信号
本文详细介绍了Linux系统中的定时器信号及其相关函数。首先,文章解释了`SIGALRM`信号的作用及应用场景,包括计时器、超时重试和定时任务等。接着介绍了`alarm()`函数,展示了如何设置定时器以及其局限性。随后探讨了`setitimer()`函数,比较了它与`alarm()`的不同之处,包括定时器类型、精度和支持的定时器数量等方面。最后,文章讲解了子进程退出时如何利用`SIGCHLD`信号,提供了示例代码展示如何处理子进程退出信号,避免僵尸进程问题。
|
2月前
|
消息中间件 Unix Linux
C语言 多进程编程(五)消息队列
本文介绍了Linux系统中多进程通信之消息队列的使用方法。首先通过`ftok()`函数生成消息队列的唯一ID,然后使用`msgget()`创建消息队列,并通过`msgctl()`进行操作,如删除队列。接着,通过`msgsnd()`函数发送消息到消息队列,使用`msgrcv()`函数从队列中接收消息。文章提供了详细的函数原型、参数说明及示例代码,帮助读者理解和应用消息队列进行进程间通信。
|
2月前
|
缓存 Linux C语言
C语言 多进程编程(六)共享内存
本文介绍了Linux系统下的多进程通信机制——共享内存的使用方法。首先详细讲解了如何通过`shmget()`函数创建共享内存,并提供了示例代码。接着介绍了如何利用`shmctl()`函数删除共享内存。随后,文章解释了共享内存映射的概念及其实现方法,包括使用`shmat()`函数进行映射以及使用`shmdt()`函数解除映射,并给出了相应的示例代码。最后,展示了如何在共享内存中读写数据的具体操作流程。
|
2月前
|
消息中间件 Unix Linux
C语言 多进程编程(二)管道
本文详细介绍了Linux下的进程间通信(IPC),重点讨论了管道通信机制。首先,文章概述了进程间通信的基本概念及重要性,并列举了几种常见的IPC方式。接着深入探讨了管道通信,包括无名管道(匿名管道)和有名管道(命名管道)。无名管道主要用于父子进程间的单向通信,有名管道则可用于任意进程间的通信。文中提供了丰富的示例代码,展示了如何使用`pipe()`和`mkfifo()`函数创建管道,并通过实例演示了如何利用管道进行进程间的消息传递。此外,还分析了管道的特点、优缺点以及如何通过`errno`判断管道是否存在,帮助读者更好地理解和应用管道通信技术。
|
2月前
|
Linux C语言
C语言 多进程编程(七)信号量
本文档详细介绍了进程间通信中的信号量机制。首先解释了资源竞争、临界资源和临界区的概念,并重点阐述了信号量如何解决这些问题。信号量作为一种协调共享资源访问的机制,包括互斥和同步两方面。文档还详细描述了无名信号量的初始化、等待、释放及销毁等操作,并提供了相应的 C 语言示例代码。此外,还介绍了如何创建信号量集合、初始化信号量以及信号量的操作方法。最后,通过实际示例展示了信号量在进程互斥和同步中的应用,包括如何使用信号量避免资源竞争,并实现了父子进程间的同步输出。附带的 `sem.h` 和 `sem.c` 文件提供了信号量操作的具体实现。
|
1月前
|
存储 机器学习/深度学习 分布式计算
大数据技术——解锁数据的力量,引领未来趋势
【10月更文挑战第5天】大数据技术——解锁数据的力量,引领未来趋势
|
8天前
|
存储 分布式计算 数据挖掘
数据架构 ODPS 是什么?
数据架构 ODPS 是什么?
62 7
|
8天前
|
存储 分布式计算 大数据
大数据 优化数据读取
【11月更文挑战第4天】
23 2
下一篇
无影云桌面