项目实战16——消息队列的意义

简介: 项目实战16——消息队列的意义

一.什么是消息队列

消息队列,英文简称是MQ,“消息队列”是在消息的传输过程中保存消息的容器。

二.原理

1.ArrayBockingQueue:

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)

ArrayBockingQueue使用场景:

1.先进先出队列:头是先进队的元素,尾是后进队的元素

2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作

3.队列不支持空元素

阻塞式队列方法的四种形式:

2.Socket

创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象

3.SeverSocket

ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。

4.Java IO操作——BufferedReader

可以接收任意长度的数据,并且避免乱码的产生

5.java.io.PrintWriter

输出流、字符打印流

三.实现

1.例子目录结构

2.代码部分

消息处理中心 Broker

package org.example;
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
    private final static int MAX_SIZE = 3;
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
        } else{
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("-----------------------------");
    }
    public static String consume(){
        String msg = messageQueue.poll();
        if(msg != null){
            System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内没有可供消费的消息!");
        }
        System.out.println("-----------------------------");
        return msg;
    }
}

BrokerSever 用来提供Broker类的对外服务

(BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动)

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerSever implements Runnable{
    public static int SERVICE_PORT = 9999;
    private final Socket socket;
    public BrokerSever(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream()))
        {
            while (true){
                String str = in.readLine();
                if (str == null){
                    continue;
                }
                System.out.println("接收到原始数据: " + str);
                if (str.equals("CONSUME")){
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                }else {
                    Broker.produce(str);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception{
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while(true){
            BrokerSever brokerServer = new BrokerSever(server.accept());
            new Thread(brokerServer).start();
        }
    }
}

ProduceClient 消息生产者

package org.example;
public class ProduceClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        client.produce("hello World.");
    }
}

ConsumeClient 消息消费者

package org.example;
public class ConsumeClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        String message = client.consume();
        System.out.println("获得的消息为: " + message);
    }
}

MyClient 与消息服务器进行通信

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MyClient {
    public static void produce(String message) throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println(message);
            out.flush();
        }
    }
    public static String consume() throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println("CONSUME");
            out.flush();
            String message = in.readLine();
            return message;
        }
    }
}


相关文章
|
消息中间件 存储 cobar
用了8年MQ!聊聊消息队列的技术选型,哪个最香! 下
用了8年MQ!聊聊消息队列的技术选型,哪个最香! 下
|
6天前
|
消息中间件 NoSQL Redis
【后端面经】【消息队列】22 | 消息队列:消息队列可以用来解决什么问题?-01
【5月更文挑战第6天】消息队列的核心特性是异步、削峰和解耦,常用于日志处理和消息通讯,实现事件驱动架构。面试中可能涉及问题包括公司是否使用消息队列、应用场景、优缺点以及延时队列、秒杀架构等。秒杀场景下,消息队列将校验和库存扣减(轻量级)与订单创建(重量级)分隔,减轻系统压力,依赖于Redis性能。使用消息队列能解决高并发、复杂流程同步等问题。
27 0
|
6天前
|
消息中间件 存储 缓存
消息队列学习之rocketmq
【4月更文挑战第1天】消息队列学习之rocketmq
18 0
|
7月前
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
116 0
|
6天前
|
消息中间件 存储 Kafka
MQ消息队列学习入门
MQ消息队列学习入门
82 0
|
6天前
|
消息中间件 存储 监控
消息队列进阶-3.消息队列常见问题解决方案
消息队列进阶-3.消息队列常见问题解决方案
75 0
|
9月前
|
消息中间件 中间件 调度
消息队列基础知识
什么是消息队列
81 0
|
消息中间件 监控 API
基于个人使用消息队列MQ产品的心得体会
随着互联网技术的不断发展,消息队列MQ(Message Queue)产品已经成为了现代软件架构中非常重要的一部分。这种技术可以通过异步处理来提高系统性能和可靠性,并且它还能够实现不同应用程序之间的解耦。在最近的几个月里,我尝试了使用一些流行的MQ产品进行开发,从而得出了一些有关其优点和缺点的心得体会。
218 2
|
消息中间件 存储 缓存
用了8年MQ!聊聊消息队列的技术选型,哪个最香! 上
用了8年MQ!聊聊消息队列的技术选型,哪个最香! 上
|
消息中间件 NoSQL Java
消息队列:第一章:消息队列简介
消息队列:第一章:消息队列简介
102 0
消息队列:第一章:消息队列简介