ZeroMQ(java)之负载均衡

本文涉及的产品
传统型负载均衡 CLB,每月750个小时 15LCU
EMR Serverless StarRocks,5000CU*H 48000GB*H
应用型负载均衡 ALB,每月750个小时 15LCU
简介: 我们在实际的应用中最常遇到的场景如下:A向B发送请求,B向A返回结果。。。。但是这种场景就会很容易变成这个样子:很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外一个问题:B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题。

我们在实际的应用中最常遇到的场景如下:


A向B发送请求,B向A返回结果。。。。

但是这种场景就会很容易变成这个样子:


很多A向B发送请求,所以B要不断的处理这些请求,所以就会很容易想到对B进行扩展,由多个B来处理这些请求,那么这里就出现了另外一个问题:

B对请求处理的速度可能不同,那么B之间他们的负载也是不同的,那么应该如何对请求进行分发就成了一个比较重要的问题。。。也就变成了负载均衡的问题了。。。

其实最好的负载均衡解决方案也很简单:

绝大多数的任务都是独立的,这里中间层可以将A发送过来的请求先缓存起来,然后B的行为就是主动的找中间层获取请求处理,然后返回,再获取。。。。也就是中间层只是做一个请求的缓存。。。由B自己来掌控合适来处理请求,也就是当B已经处理完了任务之后,自己去主动获取。。。而不是由中间层自己去主动分发。。。。

嗯,那么在ZeroMQ中应该如何实现这种模式呢,恩其实还挺简单的,如下图:



这里由两个Router来作为中间层,具体的数据流程如下:

(1)中间层启动,Worker连接Backend,向其发送Request请求(ready),这个时候中间层就能够知道哪一个worker现在是空闲的,将其保存起来(放到worker队列),可以处理请求

worker的执行流程就是send(发送ready)--->recv(获取请求),

(2)Client端向Fronted发送请求,中间层将请求缓存到一个任务队列

(3)中间层从任务队里里面取出一个任务,将其发送给worker队列中的一个worker,并将其从woker队列中移除

(4)worker处理完以后,发送执行结果,也就是send,中间层收到woker的数据 之后,将其发送给相应的client,然后在讲这个worker放到worker队列中,表示当前这个worker可用。。。。


好了,前面就基本上介绍了整个结构用ZeroMQ应该是怎么实现的,那么接下来就直接来上代码吧:

[java]  view plain copy 在CODE上查看代码片 派生到我的代码片
  1. package balance;  
  2.   
  3. import java.util.LinkedList;  
  4.   
  5. import org.zeromq.ZFrame;  
  6. import org.zeromq.ZMQ;  
  7. import org.zeromq.ZMsg;  
  8.   
  9. public class Balance {  
  10.       
  11.     public static class Client {  
  12.         public void start() {  
  13.             new Thread(new Runnable(){  
  14.   
  15.                 public void run() {  
  16.                     // TODO Auto-generated method stub  
  17.                     ZMQ.Context context = ZMQ.context(1);  
  18.                     ZMQ.Socket socket = context.socket(ZMQ.REQ);  
  19.                       
  20.                     socket.connect("ipc://front");  //连接router,想起发送请求  
  21.                       
  22.                     for (int i = 0; i < 1000; i++) {  
  23.                         socket.send("hello".getBytes(), 0);  //发送hello请求  
  24.                         String bb = new String(socket.recv());  //获取返回的数据  
  25.                         System.out.println(bb);   
  26.                     }  
  27.                     socket.close();  
  28.                     context.term();  
  29.                 }  
  30.                   
  31.             }).start();  
  32.         }  
  33.     }  
  34.       
  35.     public static class Worker {  
  36.         public void start() {  
  37.             new Thread(new Runnable(){  
  38.   
  39.                 public void run() {  
  40.                     // TODO Auto-generated method stub  
  41.                     ZMQ.Context context = ZMQ.context(1);  
  42.                     ZMQ.Socket socket = context.socket(ZMQ.REQ);  
  43.                       
  44.                     socket.connect("ipc://back");  //连接,用于获取要处理的请求,并发送回去处理结果  
  45.                       
  46.                     socket.send("ready".getBytes());  //发送ready,表示当前可用  
  47.                        
  48.                     while (!Thread.currentThread().isInterrupted()) {  
  49.                         ZMsg msg = ZMsg.recvMsg(socket);  //获取需要处理的请求,其实这里msg最外面的标志frame是router对分配给client的标志frame  
  50.                         ZFrame request = msg.removeLast();   //最后一个frame其实保存的就是实际的请求数据,这里将其移除,待会用新的frame代替  
  51.                         ZFrame frame = new ZFrame("hello fjs".getBytes());    
  52.                         msg.addLast(frame);  //将刚刚创建的frame放到msg的最后,worker将会收到  
  53.                         msg.send(socket);  //将数据发送回去  
  54.                           
  55.                     }  
  56.                     socket.close();  
  57.                     context.term();  
  58.                 }  
  59.                   
  60.             }).start();  
  61.         }  
  62.     }  
  63.       
  64.     public static class Middle {  
  65.         private LinkedList<ZFrame> workers;  
  66.         private LinkedList<ZMsg> requests;  
  67.         private ZMQ.Context context;  
  68.         private ZMQ.Poller poller;  
  69.           
  70.         public Middle() {  
  71.             this.workers = new LinkedList<ZFrame>();  
  72.             this.requests = new LinkedList<ZMsg>();  
  73.             this.context = ZMQ.context(1);  
  74.             this.poller = new ZMQ.Poller(2);  
  75.         }  
  76.           
  77.         public void start() {  
  78.             ZMQ.Socket fronted = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于接收client发送过来的请求,以及向client发送处理结果  
  79.             ZMQ.Socket backend = this.context.socket(ZMQ.ROUTER);  //创建一个router,用于向后面的worker发送数据,然后接收处理的结果  
  80.               
  81.             fronted.bind("ipc://front");  //监听,等待client的连接  
  82.             backend.bind("ipc://back");  //监听,等待worker连接  
  83.               
  84.             //创建pollItem  
  85.             ZMQ.PollItem fitem = new ZMQ.PollItem(fronted, ZMQ.Poller.POLLIN);    
  86.             ZMQ.PollItem bitem = new ZMQ.PollItem(backend, ZMQ.Poller.POLLIN);  
  87.               
  88.             this.poller.register(fitem);  //注册pollItem  
  89.             this.poller.register(bitem);  
  90.               
  91.               
  92.             while (!Thread.currentThread().isInterrupted()) {  
  93.                 this.poller.poll();  
  94.                 if (fitem.isReadable()) {  //表示前面有请求发过来了  
  95.                     ZMsg msg = ZMsg.recvMsg(fitem.getSocket());  //获取client发送过来的请求,这里router会在实际请求上面套一个连接的标志frame  
  96.                     this.requests.addLast(msg);   //将其挂到请求队列  
  97.                 }  
  98.                 if (bitem.isReadable()) {  //这里表示worker发送数据过来了  
  99.                     ZMsg msg = ZMsg.recvMsg(bitem.getSocket());  //获取msg,这里也会在实际发送的数据前面包装一个连接的标志frame  
  100.                     //这里需要注意,这里返回的是最外面的那个frame,另外它还会将后面的接着的空的标志frame都去掉  
  101.                     ZFrame workerID = msg.unwrap();  //把外面那层包装取下来,也就是router对连接的标志frame  
  102.                     this.workers.addLast(workerID);  //将当前的worker的标志frame放到worker队列里面,表示这个worker可以用了  
  103.                     ZFrame readyOrAddress = msg.getFirst(); //这里获取标志frame后面的数据,如果worker刚刚启动,那么应该是发送过来的ready,  
  104.                       
  105.                       
  106.                     if (new String(readyOrAddress.getData()).equals("ready")) {  //表示是worker刚刚启动,发过来的ready  
  107.                         msg.destroy();  
  108.                     } else {  
  109.                         msg.send(fronted);  //表示是worker处理完的返回结果,那么返回给客户端  
  110.                     }  
  111.                 }  
  112.                   
  113.                 while (this.workers.size() > 0 && this.requests.size() > 0) {  
  114.                     ZMsg request = this.requests.removeFirst();  
  115.                     ZFrame worker = this.workers.removeFirst();  
  116.                       
  117.                     request.wrap(worker);  //在request前面包装一层,把可以用的worker的标志frame包装上,这样router就会发给相应的worker的连接  
  118.                     request.send(backend);  //将这个包装过的消息发送出去  
  119.                 }  
  120.                   
  121.             }  
  122.             fronted.close();  
  123.             backend.close();  
  124.             this.context.term();  
  125.         }  
  126.     }  
  127.       
  128.       
  129.     public static void main(String args[]) {  
  130.         Worker worker = new Worker();  
  131.         worker.start();  
  132.         Client client = new Client();  
  133.         client.start();  
  134.         Middle middle = new Middle();  
  135.         middle.start();  
  136.           
  137.     }  
  138. }  

其实根据前面已经提出来的实现原理来编写代码还是比较顺利的,中途也没有遇到什么问题。。。不过要理解这部分要比较了解ZeroMQ的数据格式才行

若转载请注明出处!若有疑问,请回复交流!
相关实践学习
SLB负载均衡实践
本场景通过使用阿里云负载均衡 SLB 以及对负载均衡 SLB 后端服务器 ECS 的权重进行修改,快速解决服务器响应速度慢的问题
负载均衡入门与产品使用指南
负载均衡(Server Load Balancer)是对多台云服务器进行流量分发的负载均衡服务,可以通过流量分发扩展应用系统对外的服务能力,通过消除单点故障提升应用系统的可用性。 本课程主要介绍负载均衡的相关技术以及阿里云负载均衡产品的使用方法。
目录
相关文章
|
11月前
|
负载均衡 Java
Java加权负载均衡策略
Java加权负载均衡策略
|
2月前
|
负载均衡 NoSQL 算法
一天五道Java面试题----第十天(简述Redis事务实现--------->负载均衡算法、类型)
这篇文章是关于Java面试中Redis相关问题的笔记,包括Redis事务实现、集群方案、主从复制原理、CAP和BASE理论以及负载均衡算法和类型。
一天五道Java面试题----第十天(简述Redis事务实现--------->负载均衡算法、类型)
|
4月前
|
负载均衡 Java 测试技术
性能测试与负载均衡:保证Java应用的稳定性
性能测试与负载均衡:保证Java应用的稳定性
|
4月前
|
缓存 负载均衡 Java
Java一分钟之-Spring Cloud Netflix Ribbon:客户端负载均衡
【6月更文挑战第9天】Spring Cloud Netflix Ribbon是客户端负载均衡器,用于服务间的智能路由。本文介绍了Ribbon的基本概念、快速入门步骤,包括添加依赖、配置服务调用和使用RestTemplate。此外,还讨论了常见问题,如服务实例选择不均、超时和重试设置不当、服务列表更新不及时,并提供了相应的解决策略。最后,展示了如何自定义负载均衡策略。理解并正确使用Ribbon能提升微服务架构的稳定性和效率。
162 3
|
3月前
|
负载均衡 监控 安全
解析Java中的服务治理与负载均衡
解析Java中的服务治理与负载均衡
|
3月前
|
负载均衡 算法 Java
实现高可用和可扩展的负载均衡系统的Java方法
实现高可用和可扩展的负载均衡系统的Java方法
|
5月前
|
负载均衡 Java 应用服务中间件
|
运维 负载均衡 网络协议
JAVA面试——负载均衡(二)
JAVA面试——负载均衡
295 0
JAVA面试——负载均衡(二)
|
负载均衡 前端开发 Java
JAVA面试——负载均衡(三)
JAVA面试——负载均衡
101 0
JAVA面试——负载均衡(三)
|
域名解析 缓存 负载均衡
JAVA面试——负载均衡(一)
JAVA面试——负载均衡
245 0
JAVA面试——负载均衡(一)