项目实战16——消息队列的意义

简介: 项目实战16——消息队列的意义

一.什么是消息队列

消息队列,英文简称是MQ,“消息队列”是在消息的传输过程中保存消息的容器。

二.原理

1.ArrayBockingQueue:

ArrayBlockingQueue是一个阻塞式的队列,继承自AbstractBlockingQueue,底层以数组的形式保存数据(实际上可看作一个循环数组)

ArrayBockingQueue使用场景:

1.先进先出队列:头是先进队的元素,尾是后进队的元素

2.有界队列:初始化时指定的容量,就是队列最大的容量,不会出现扩容,容量满,则阻塞进队操作;容量空,则阻塞出队操作

3.队列不支持空元素

阻塞式队列方法的四种形式:

2.Socket

创建一个Socket类的实例,用它来发送和接收字节流,发送时调用getOutputStream方法获取一个java.io.OutputStream对象,接收远程对象发送来的信息可以调用getInputStream方法来返回一个java.io.InputStream对象

3.SeverSocket

ServerSocket与Socket不同,ServerSocket是等待客户端的请求,一旦获得一个连接请求,就创建一个Socket示例来与客户端进行通信。

4.Java IO操作——BufferedReader

可以接收任意长度的数据,并且避免乱码的产生

5.java.io.PrintWriter

输出流、字符打印流

三.实现

1.例子目录结构

2.代码部分

消息处理中心 Broker

package org.example;
import java.util.concurrent.ArrayBlockingQueue;
public class Broker {
    private final static int MAX_SIZE = 3;
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
        } else{
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("-----------------------------");
    }
    public static String consume(){
        String msg = messageQueue.poll();
        if(msg != null){
            System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内没有可供消费的消息!");
        }
        System.out.println("-----------------------------");
        return msg;
    }
}

BrokerSever 用来提供Broker类的对外服务

(BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动)

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
public class BrokerSever implements Runnable{
    public static int SERVICE_PORT = 9999;
    private final Socket socket;
    public BrokerSever(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream()))
        {
            while (true){
                String str = in.readLine();
                if (str == null){
                    continue;
                }
                System.out.println("接收到原始数据: " + str);
                if (str.equals("CONSUME")){
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                }else {
                    Broker.produce(str);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception{
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while(true){
            BrokerSever brokerServer = new BrokerSever(server.accept());
            new Thread(brokerServer).start();
        }
    }
}

ProduceClient 消息生产者

package org.example;
public class ProduceClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        client.produce("hello World.");
    }
}

ConsumeClient 消息消费者

package org.example;
public class ConsumeClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        String message = client.consume();
        System.out.println("获得的消息为: " + message);
    }
}

MyClient 与消息服务器进行通信

package org.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
public class MyClient {
    public static void produce(String message) throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println(message);
            out.flush();
        }
    }
    public static String consume() throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println("CONSUME");
            out.flush();
            String message = in.readLine();
            return message;
        }
    }
}


相关文章
|
2月前
|
消息中间件 监控 Java
开发者如何使用云消息队列 RocketMQ 版
【10月更文挑战第12天】开发者如何使用云消息队列 RocketMQ 版
197 3
|
4月前
|
消息中间件 存储 Java
分布式消息队列基础知识
本文概述了分布式消息队列的基本概念、组成、模式、基础与高级功能,以及它在业务开发中的应用和核心技术,为深入学习RocketMQ等消息队列组件提供基础知识。
分布式消息队列基础知识
|
7月前
|
消息中间件 Kafka 数据库
【后端面经】【消息队列】22 | 消息队列:消息队列可以用来解决什么问题?-02 超时场景+性能问题
【5月更文挑战第7天】 本文介绍了电商中订单超时取消的处理方法,通过使用消息队列实现延时消息。当订单30分钟后未支付,消息队列将触发取消操作,但需注意并发问题,如采用分布式锁或乐观锁避免并发更新订单状态。乐观锁确保只有订单状态为未支付时才允许支付。主流消息队列如RocketMQ支持延迟消息,而Kafka不支持。 使用消息队列的好处在于解耦和提高系统性能、扩展性和可用性。同步调用会导致性能下降,因为必须等待所有调用完成。并发调用虽可提升性能,但仍逊于消息队列,且无法解决扩展性和可用性问题。
123 1
|
消息中间件 存储 监控
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
消息中间件第八讲:消息队列 RocketMQ 版实战、集群及原理
367 0
|
7月前
|
消息中间件 NoSQL Redis
【后端面经】【消息队列】22 | 消息队列:消息队列可以用来解决什么问题?-01
【5月更文挑战第6天】消息队列的核心特性是异步、削峰和解耦,常用于日志处理和消息通讯,实现事件驱动架构。面试中可能涉及问题包括公司是否使用消息队列、应用场景、优缺点以及延时队列、秒杀架构等。秒杀场景下,消息队列将校验和库存扣减(轻量级)与订单创建(重量级)分隔,减轻系统压力,依赖于Redis性能。使用消息队列能解决高并发、复杂流程同步等问题。
105 0
|
7月前
|
消息中间件 存储 Kafka
MQ消息队列学习入门
MQ消息队列学习入门
121 0
|
7月前
|
消息中间件 存储 监控
消息队列进阶-3.消息队列常见问题解决方案
消息队列进阶-3.消息队列常见问题解决方案
173 0
|
消息中间件 中间件 调度
消息队列基础知识
什么是消息队列
109 0
|
消息中间件 监控 容灾
消息队列基础
消息队列基础
86 0
|
消息中间件 自然语言处理 数据可视化
消息队列入门学习
消息队列(Message Queue,简称 MQ)。是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统
消息队列入门学习