ZeroMQ(java)之Router/Dealer模式

简介: 本教程转自:http://blog.csdn.net/kobejayandy/article/details/20163527在开始之前先把guid里面提到的几个ZeroMQ的特性列一下吧:(1)ZeroMQ有自己的I/O线程来异步的处理I/O,而且后台采用了无锁的数据结构(2)在ZeroMQ中,所有的组件都可以动态的加入和移除,而且可以启动组件以任何的顺利,例如我们可以先启动request,再启动response,依然可以工作,而且还会自动的重连接。

 本教程转自:http://blog.csdn.net/kobejayandy/article/details/20163527

在开始之前先把guid里面提到的几个ZeroMQ的特性列一下吧:

(1)ZeroMQ有自己的I/O线程来异步的处理I/O,而且后台采用了无锁的数据结构

(2)在ZeroMQ中,所有的组件都可以动态的加入和移除,而且可以启动组件以任何的顺利,例如我们可以先启动request,再启动response,依然可以工作,而且还会自动的重连接。

(3)如果有需要的话,会自动的将message进行排队,当然这都是有一套的模式的,一般情况下会尽量早的将数据发送到receiver。

(4)当缓冲的message队列满了以后,ZeroMQ有自己的行为,有的组件会阻塞,有的则会将message抛弃。

(5)底层的通信可以采用各种各样的都行,例如TCP,IPC啥的。

(6)它会自动的处理那些比较慢而且阻塞的reader

(7)支持message的路由

(8)ZeroMQ确保全部的数据被receiver接收到,例如发送10K,那么也接受到10K

(9)它发送的数据格式是二进制,所以对发送的内容无要求

(10)ZeroMQ会自动处理网络错误,而且会自动尝试恢复

(11)节能。。。(我擦,居然还有这个)


好了,先来看一下poller这个东西吧,蛮有意思的,类似与epoll或者java里面的selector,

在前面的例子中我们都只是创建一个socket,那如果我们要创建两个socket在同一个线程中该怎么处理呢,那么这个时候就可以用到poller这东西了。。。可以将已经建立好的socket注册到poller上面去,并注册相应的事件。。

这里就用push/pull来举例子吧,就直接来看pull端的代码吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. package poller;  
  2.   
  3. import org.zeromq.ZMQ;  
  4.   
  5. public class Pull {  
  6.     public static void main(String args[]) {  
  7.         ZMQ.Context context = ZMQ.context(1);  
  8.           
  9.         ZMQ.Socket pull1 = context.socket(ZMQ.PULL);  //创建一个pull  
  10.         pull1.connect("tcp://127.0.0.1:5555");    //建立于push的连接  
  11.         ZMQ.Socket pull2 = context.socket(ZMQ.PULL);  
  12.         pull2.connect("tcp://127.0.0.1:5555");  
  13.           
  14.         ZMQ.Poller poller = new ZMQ.Poller(2);  //创建一个大小为2的poller  
  15.         poller.register(pull1, ZMQ.Poller.POLLIN);  //分别将上述的pull注册到poller上,注册的事件是读  
  16.         poller.register(pull2, ZMQ.Poller.POLLIN);  
  17.         int i = 0;  
  18.         while (!Thread.currentThread().isInterrupted()) {  
  19.             poller.poll();  
  20.             if (poller.pollin(0)) {  
  21.                 while (null != pull1.recv(ZMQ.NOBLOCK)) {  //这里采用了非阻塞,确保一次性将队列中的数据读取完  
  22.                     i++;  
  23.                 }  
  24.                   
  25.             }  
  26.             if (poller.pollin(1)) {  
  27.                 while (null != pull2.recv(ZMQ.NOBLOCK)) {  
  28.                     i++;  
  29.                 }  
  30.                   
  31.                   
  32.             }  
  33.             if (i % 10000000 == 0) {  
  34.                 System.out.println(i);  
  35.             }  
  36.         }  
  37.         pull1.close();  
  38.         pull2.close();  
  39.         context.term();  
  40.           
  41.     }  
  42. }  

这里还算简单吧,同时创建了两个pull,都将他们注册到了poller上面去。。。其实这个样子很像是selector或者epoll啥的。。。

好啦,接下来进入正题:

request/response算是一种非常常用的模式了,但是前面的例子中,我们的response端都只能在单线程中运行,因为必须要recv与send配对使用,那么就很大程度上限制了response的伸缩性,如果有大量的request来提交很多request请求的话,那么性能将会受到极大的限制,当然这种情况下我们可以采用如下的方式来解决:



这里让request同时连接到多个response,这里就可以将request请求分散到多个response,这样当有多个request的时候的性能要求。。。但是有一个问题,如果我们又10个request端,他们每一个都不断的提交request请求,这个时候我们的reponse可能就会很忙,那么在这种结构下无法动态的添加response,依然限制了整个系统的伸缩性。。。

那么最终的解决方案就来了:



这里可以看到,在request端与response端之间加了一个中间层,可以将其看成一个路由器,它将request端的请求路由到response端,如果性能不够的话,可以再建立新的response将其连接到中间层就可以了,就方便的解决系统的伸缩性问题了。。。

好了,这里直接就上中间层与response端的代码吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. package multireqrep;  
  2.   
  3. import org.zeromq.ZMQ;  
  4.   
  5. public class Response {  
  6.     public static void main(String args[]) {  
  7.         final ZMQ.Context context = ZMQ.context(1);  
  8.         ZMQ.Socket router = context.socket(ZMQ.ROUTER);  
  9.         ZMQ.Socket dealer = context.socket(ZMQ.DEALER);  
  10.           
  11.         router.bind("ipc://fjs1");  
  12.         dealer.bind("ipc://fjs2");  
  13.           
  14.         for (int i = 0; i < 20; i++) {  
  15.             new Thread(new Runnable(){  
  16.   
  17.                 public void run() {  
  18.                     // TODO Auto-generated method stub  
  19.       
  20.                     ZMQ.Socket response = context.socket(ZMQ.REP);  
  21.                     response.connect("ipc://fjs2");  
  22.                     while (!Thread.currentThread().isInterrupted()) {  
  23.                         response.recv();  
  24.                         response.send("hello".getBytes());  
  25.                         try {  
  26.                             Thread.currentThread().sleep(1);  
  27.                         } catch (InterruptedException e) {  
  28.                             // TODO Auto-generated catch block  
  29.                             e.printStackTrace();  
  30.                         }  
  31.                     }  
  32.                     response.close();  
  33.                 }  
  34.                   
  35.             }).start();  
  36.         }  
  37.         ZMQ.proxy(router, dealer, null);  
  38.         router.close();  
  39.         dealer.close();  
  40.         context.term();  
  41.     }  
  42. }  

好吧,代码还算蛮简单的,直接用了ZeroMQ定义的router和dealer组件,以及内置的proxy方法就好了。。。


嗯,再来赞叹一次,ZeroMQ确实好用。。。

若转载请注明出处!若有疑问,请回复交流!
目录
相关文章
|
3月前
|
存储 Java 开发者
【Java新纪元启航】JDK 22:解锁未命名变量与模式,让代码更简洁,思维更自由!
【9月更文挑战第7天】JDK 22带来的未命名变量与模式匹配的结合,是Java编程语言发展历程中的一个重要里程碑。它不仅简化了代码,提高了开发效率,更重要的是,它激发了我们对Java编程的新思考,让我们有机会以更加自由、更加创造性的方式解决问题。随着Java生态系统的不断演进,我们有理由相信,未来的Java将更加灵活、更加强大,为开发者们提供更加广阔的舞台。让我们携手并进,共同迎接Java新纪元的到来!
70 11
|
3月前
|
设计模式 Java
Java设计模式-工厂方法模式(4)
Java设计模式-工厂方法模式(4)
|
4月前
|
消息中间件 Java
【实战揭秘】如何运用Java发布-订阅模式,打造高效响应式天气预报App?
【8月更文挑战第30天】发布-订阅模式是一种消息通信模型,发送者将消息发布到公共队列,接收者自行订阅并处理。此模式降低了对象间的耦合度,使系统更灵活、可扩展。例如,在天气预报应用中,`WeatherEventPublisher` 类作为发布者收集天气数据并通知订阅者(如 `TemperatureDisplay` 和 `HumidityDisplay`),实现组件间的解耦和动态更新。这种方式适用于事件驱动的应用,提高了系统的扩展性和可维护性。
76 2
|
4月前
|
Java
"揭秘Java IO三大模式:BIO、NIO、AIO背后的秘密!为何AIO成为高并发时代的宠儿,你的选择对了吗?"
【8月更文挑战第19天】在Java的IO编程中,BIO、NIO与AIO代表了三种不同的IO处理机制。BIO采用同步阻塞模型,每个连接需单独线程处理,适用于连接少且稳定的场景。NIO引入了非阻塞性质,利用Channel、Buffer与Selector实现多路复用,提升了效率与吞吐量。AIO则是真正的异步IO,在JDK 7中引入,通过回调或Future机制在IO操作完成后通知应用,适合高并发场景。选择合适的模型对构建高效网络应用至关重要。
92 2
|
4月前
|
设计模式 XML 存储
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
文章详细介绍了工厂方法模式(Factory Method Pattern),这是一种创建型设计模式,用于将对象的创建过程委托给多个工厂子类中的某一个,以实现对象创建的封装和扩展性。文章通过日志记录器的实例,展示了工厂方法模式的结构、角色、时序图、代码实现、优点、缺点以及适用环境,并探讨了如何通过配置文件和Java反射机制实现工厂的动态创建。
【二】设计模式~~~创建型模式~~~工厂方法模式(Java)
|
4月前
|
设计模式 XML Java
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
文章详细介绍了简单工厂模式(Simple Factory Pattern),这是一种创建型设计模式,用于根据输入参数的不同返回不同类的实例,而客户端不需要知道具体类名。文章通过图表类的实例,展示了简单工厂模式的结构、时序图、代码实现、优缺点以及适用环境,并提供了Java代码示例和扩展应用,如通过配置文件读取参数来实现对象的创建。
【一】设计模式~~~创建型模式~~~简单工厂模式(Java)
|
3月前
|
JSON Java UED
uniapp:使用DCloud的uni-push推送消息通知(在线模式)java实现
以上展示了使用Java结合DCloud的uni-push进行在线消息推送的基本步骤和实现方法。实际部署时,可能需要依据实际项目的规模,业务场景及用户基数进行必要的调整和优化,确保消息推送机制在保证用户体验的同时也满足业务需求。
213 0
|
5月前
|
设计模式 监控 Java
Java中的并发编程模式与最佳实践
随着多核处理器的普及,充分利用并发和多线程成为提高软件性能的关键。Java语言通过其丰富的并发API提供了强大的支持,使得开发者能够构建高效、可靠的并发应用程序。本文深入探讨了Java并发编程的核心概念、设计模式以及在实际开发中的最佳实践,旨在帮助读者更好地理解和掌握Java并发编程,从而编写出更加高效、稳定的应用程序。
|
5月前
|
设计模式 Java 开发者
Java中的异常处理与断路器模式
Java中的异常处理与断路器模式
|
6月前
|
消息中间件 存储 负载均衡
Java中的异步消息传递模式
Java中的异步消息传递模式