一.什么是消息队列
消息队列,英文简称是MQ,“消息队列”是在消息的传输过程中保存消息的容器。
二.原理
1.ArrayBockingQueue:
ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)
ArrayBockingQueue使用场景:
1.先进先出队列:头是先进队的元素,尾是后进队的元素
2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作
3.队列不支持空元素
阻塞式队列方法的四种形式:
2.Socket
创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象
3.SeverSocket
ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。
4.Java IO操作——BufferedReader
可以接收任意长度的数据,并且避免乱码的产生
5.java.io.PrintWriter
输出流、字符打印流
三.实现
1.例子目录结构
2.代码部分
消息处理中心 Broker
package org.example; import java.util.concurrent.ArrayBlockingQueue; public class Broker { private final static int MAX_SIZE = 3; private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE); public static void produce(String msg){ if(messageQueue.offer(msg)){ System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size()); } else{ System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!"); } System.out.println("-----------------------------"); } public static String consume(){ String msg = messageQueue.poll(); if(msg != null){ System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size()); } else { System.out.println("消息处理中心内没有可供消费的消息!"); } System.out.println("-----------------------------"); return msg; } }
BrokerSever 用来提供Broker类的对外服务
(BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动)
package org.example; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; public class BrokerSever implements Runnable{ public static int SERVICE_PORT = 9999; private final Socket socket; public BrokerSever(Socket socket){ this.socket = socket; } @Override public void run() { try( BufferedReader in = new BufferedReader(new InputStreamReader( socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream())) { while (true){ String str = in.readLine(); if (str == null){ continue; } System.out.println("接收到原始数据: " + str); if (str.equals("CONSUME")){ String message = Broker.consume(); out.println(message); out.flush(); }else { Broker.produce(str); } } } catch (Exception e){ e.printStackTrace(); } } public static void main(String[] args) throws Exception{ ServerSocket server = new ServerSocket(SERVICE_PORT); while(true){ BrokerSever brokerServer = new BrokerSever(server.accept()); new Thread(brokerServer).start(); } } }
ProduceClient 消息生产者
package org.example; public class ProduceClient { public static void main(String[] args) throws Exception{ MyClient client = new MyClient(); client.produce("hello World."); } }
ConsumeClient 消息消费者
package org.example; public class ConsumeClient { public static void main(String[] args) throws Exception{ MyClient client = new MyClient(); String message = client.consume(); System.out.println("获得的消息为: " + message); } }
MyClient 与消息服务器进行通信
package org.example; import java.io.BufferedReader; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.InetAddress; import java.net.Socket; public class MyClient { public static void produce(String message) throws Exception{ Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT); try( PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ out.println(message); out.flush(); } } public static String consume() throws Exception{ Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT); try( BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); PrintWriter out = new PrintWriter(socket.getOutputStream()) ){ out.println("CONSUME"); out.flush(); String message = in.readLine(); return message; } } }