ZeroMQ(java)中组件间数据传输(Pipe的实现)

本文涉及的产品
数据传输服务 DTS,同步至DuckDB 3个月
简介: 在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了。。。。涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧。

在ZeroMQ(java)中,整个IO的处理流程都是分层来进行的,当然处于最下端的肯定是前面介绍过的poller以及StreamEngin了。。。。涉及到上层的话就还有session,以及socket,先用一张图来大概的描述一下整个层次关系吧。。



整个分层的结构大概就是这样吧,其中poller与StreamEngin是怎么交互的,这个就不说饿了吧,然后Session这个怎么与session之间交互呢,这个以后再说吧,其实在streamEngin里面有自己的session引用。。反正这里没啥意思。。主要就在与Session怎么与自己所属的Socket进行交互,当从最底层接收到数据之后,session如何交给上层的socket,让其来处理。。。这里就涉及到了Pipe,也就是session与自己所属的socket之间是通过pipe来进行数据传递的。。。

那么在具体的分析session与socket之前就来看看这个Pipe是怎么工作的吧,先来大概的看看它的类图:



这里可以看到Pipe继承自ZObject类型,那么可以知道Pipe可以发送,接受以及执行命令,同时也就意味着Pipe也需要由自己关联的IO线程才行,或者说有关联的mailbox。。。不过这个也不是强制的,以后再分析Socket的pipe的时候,就会发现它的pipe关联到socket自己的mailbox,但是socket的mailbox没有注册到任何的poller上面去,也就是它并没有在任何IO线程里执行,最后其实是在用户代码的线程中运行的。。。。好了。好像闲话说的比较多了。。用一张图来刻画一下Pipe是怎么运行的吧:



其实通过这张图形就已经将Pipe的运行原理基本描述出来了,pipe的两端都分别关联了两个YPipe(可以将其理解为队列)对象,例如左边将其中一个YPipe当做写端,那么在另外一边就将其看成是读端。。。

这里的YPipe对象可以将其理解为队列,至于说具体的实现,底层确实是队列,只不过是自己实现的,而且实现的还挺繁琐的,就不细说了,不过这里有向吐槽的地方,明明concurrent库中有无锁的队列ConcurrentLinkedList,在并发环境下有很好的性能,干嘛不在这个基础上进行扩展。。。。

这里另外还要看看在ZeroMQ中,也定义的有Pipe类型自己的事件回调,其定义如下:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. public interface IPipeEvents {  
  2.     void read_activated(Pipe pipe);  //有数据可以读取  
  3.     void write_activated(Pipe pipe);  //当前pipe有数据写  
  4.     void hiccuped(Pipe pipe);   //对面的pipe替换掉了读端,也就是当前需要替换写段的时候的回调  
  5.     void terminated(Pipe pipe);  //当前pipe停止的回调  
  6. }  

具体每个方法是干嘛用的注释应该说的很清楚了。。那么接下来来看看Pipe的两端是怎么进行交互的吧,首先看如何发送数据到pipe的另外一端:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //从写端写数据局,发送给pipe的另外一端  
  2. public boolean write (Msg msg_)  {  
  3.     if (!check_write ())  
  4.         return false;  
  5.   
  6.     boolean more = msg_.has_more();  
  7.     outpipe.write (msg_, more);  
  8.   
  9.     if (!more)  
  10.         msgs_written++;   //已经读取的msg的计数  
  11.   
  12.     return true;  
  13. }  

其实这里直接就是在写端,将数据写到队列里面去就好了,那么如何通知对面当前有数据发送过来了呢,来看另外一个方法:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //其实这里主要是给对面的pipe发送activate_read命令,表示它可以读了  
  2. public void flush () {  
  3.     //  The peer does not exist anymore at this point.  
  4.     if (state == State.terminating)  
  5.         return;  
  6.   
  7.     if (outpipe != null && !outpipe.flush ()) {  
  8.         send_activate_read (peer);  //向对面发送可以读取的命令  
  9.     }   
  10. }  

这个,如果看了ZObject就应该很清楚了吧,直接给命令的另外一端发送activate_read类型的命令,那么这个命令最终将会被pipe的另外一端所关联的mailbox收到,从而对面的Pipe将会在其IO线程中执行命令,对于这个命令,进行的操作是process_activate_read方法,那么来看看Pipe中这个方法的的定义吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. //收到命令,表示底层的pipe有数据可以读取了,这里主要是要调用事件回调,通知上层的代码,pipe有数据可以读取了  
  2. protected void process_activate_read () {  
  3.     if (!in_active && (state == State.active || state == State.pending)) {  
  4.         in_active = true;  
  5.         sink.read_activated (this);  //调用事件回调  
  6.     }  
  7. }  

这里其实就是调用当前的pipe的事件回调,来处理当前的pipe对象,其实也就是通知上层的代码,当前pipe有数据可以读了,让其进行处理。。。。

好了,那么到这里整个Pipe的运行原理就算比较的清楚了。。。

不过自己不太明白,在java中这种数据的传递明明很简单就可以实现,干嘛要搞的这么复杂。。。不过这里也有一个好处,就是将每一个对象的方法的执行都封闭在了自己的IO线程内部。。。也算是一种线程封闭原则的实现吧。。。其余的好处,好像没啥好处,而且真的觉得略繁琐。。。。

若转载请注明出处!若有疑问,请回复交流!
相关实践学习
自建数据库迁移到云数据库
本场景将引导您将网站的自建数据库平滑迁移至云数据库RDS。通过使用RDS,您可以获得稳定、可靠和安全的企业级数据库服务,可以更加专注于发展核心业务,无需过多担心数据库的管理和维护。
Sqoop 企业级大数据迁移方案实战
Sqoop是一个用于在Hadoop和关系数据库服务器之间传输数据的工具。它用于从关系数据库(如MySQL,Oracle)导入数据到Hadoop HDFS,并从Hadoop文件系统导出到关系数据库。 本课程主要讲解了Sqoop的设计思想及原理、部署安装及配置、详细具体的使用方法技巧与实操案例、企业级任务管理等。结合日常工作实践,培养解决实际问题的能力。本课程由黑马程序员提供。
目录
相关文章
|
8月前
|
Java 数据安全/隐私保护 索引
(Java)Java里JFrame窗体的基本操作(组件篇-3)
回顾 说过了下拉框和下拉列表,本篇内容将了解滚动面板和各类输入框 什么是组件? 如果不熟悉组件,可以将组件看作是某个Form表单中的表单元素,组件可以完善JFrame窗口的布局,以及一些功能; 本篇内容中所有的组件所用到的类全都来自于javax.swing这个包中,记得引入; 另:在使用组件前,请先将窗体中的内容类给实例化出来进行操作,代码如下: Container c = getContentPane(); JScollPane JScollPane类,说的就是滚动面板。它不同于其他组件,它是一个容器,
171 2
|
8月前
|
Java 索引 容器
(Java)Java里JFrame窗体的基本操作(组件篇-2)
回顾 这算是JFrame窗体基本操作的组件第二篇了,上一篇说过了单选框,复选框,按钮。 在这一篇中,我会说明下拉框和列表框 什么是组件? 自行百度:java中JFrame窗体里的组件是什么? 名字都给你想好了 ( :I ) JComBox下拉框 JComBox类,就是下拉框,实例化出来后,要填写泛型<>,添加是什么数据类型的内容,泛型中就写上该数据类型; JComboBox<?> combox = new JComboBox<>(); 以下实例化下拉框的时候我泛型写的全
210 1
|
8月前
|
Java 容器
(Java)Java里JFrame窗体的基础操作(组件-1)
如果不熟悉组件,可以将组件看作是某个Form表单中的表单元素,当然这只是在不熟悉组件的前提下。如果深入了解了组件,那么它有着非常多的组件,这些组件可以完善JFrame窗口的布局,以及一些功能; 本篇内容中所有的组件所用到的类全都来自于javax.swing这个包中,记得引入
168 1
|
10月前
|
消息中间件 Java Kafka
Java 事件驱动架构设计实战与 Kafka 生态系统组件实操全流程指南
本指南详解Java事件驱动架构与Kafka生态实操,涵盖环境搭建、事件模型定义、生产者与消费者实现、事件测试及高级特性,助你快速构建高可扩展分布式系统。
457 7
|
12月前
|
Java 数据库连接 数据库
Java 组件详细使用方法与封装实战指南
本指南详解Java核心组件使用与封装技巧,涵盖跨平台开发、面向对象编程、多线程、数据库操作等关键内容,并提供工具类、连接池、异常及响应结果的封装方法。结合Spring框架、MyBatis、Spring Boot等主流技术,助你掌握高质量Java组件设计与开发实践。
309 2
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
Java
如何在 Java 中处理“Broken Pipe”异常
在Java中处理“Broken Pipe”异常,通常发生在网络通信中,如Socket编程时。该异常表示写入操作的另一端已关闭连接。解决方法包括:检查网络连接、设置超时、使用try-catch捕获异常并进行重试或关闭资源。
1275 5
|
数据采集 Java Linux
Java“Broken Pipe”解决
Java中遇到“Broken Pipe”错误通常是因为Socket连接被远程主机关闭,而本地程序仍在尝试写入数据。解决方法包括:1. 检查网络连接和防火墙设置;2. 增加超时设置;3. 使用异常处理捕获并重试。
3362 4
|
人工智能 自然语言处理 Java
Spring AI,Spring团队开发的新组件,Java工程师快来一起体验吧
文章介绍了Spring AI,这是Spring团队开发的新组件,旨在为Java开发者提供易于集成的人工智能API,包括机器学习、自然语言处理和图像识别等功能,并通过实际代码示例展示了如何快速集成和使用这些AI技术。
14000 4
Spring AI,Spring团队开发的新组件,Java工程师快来一起体验吧
|
Java 数据安全/隐私保护 容器
java当中组件和窗口的相容问题(里面包含了这些方法的作用)
Java窗口和组件的布局指南,教你如何打造一个既美观又实用的GUI界面。
196 0