案例16-消息队列的作用和意义

简介: 案例16-消息队列的作用和意义

一:背景介绍

二:消息队列

概念:

1、MQ全程为Message Queue,消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息来通信,而无需专用连接来链接它们。

2、消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

目的:

消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削峰等问题。

解耦:

![譬如签到送积分,签到和送积分是两个操作。签到产生了很重要的数据,它可以把消息发送到MQ,然后积分系统需要该数据,从MQ中直接获取即可。这样签到系统就做到了和积分系统解耦,不必担心积分系统挂了怎么办,是不是需要重试等,而这些都可以在积分系统内部自己实现,再者,如果以后另外一套系统也需要该签到数据,直接从MQ中获取即可,实际上与签单系统已无关系。

异步

当做到解耦后,实现异步就是自然而然的事情,如果签到只需要1ms,而送积分,或者其他操作需要500ms,那不可能等所有操作完成之后再去返回数据给用户,这样就做到了异步。串联变并联。

同步变成异步

流量削峰

削峰是指当并发访问高峰期,通过MQ达到限流的目的,从而减少对数据库MySQL的压力,这里也用到了池化思想。

三:实现过程

解耦和异步

消息处理中心 Broker

public class Broker {
    private final static int MAX_SIZE = 3;
    private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);
    public static void produce(String msg){
        if(messageQueue.offer(msg)){
            System.out.println("已成功向消息处理中心发送消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());
        } else{
            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");
        }
        System.out.println("-----------------------------");
    }
    public static String consume(){
        String msg = messageQueue.poll();
        if(msg != null){
            System.out.println("已经消费的消息:" + msg + ",当前暂存消息的数量是:" + messageQueue.size());
        } else {
            System.out.println("消息处理中心内没有可供消费的消息!");
        }
        System.out.println("-----------------------------");
        return msg;
    }
}

BrokerSever用来提供Broker类得对外服务,BrokerSever类实现Runnable接口,实现run方法。用new Thread(Runnable target).start()方法来启动

public class BrokerSever implements Runnable{
    public static int SERVICE_PORT = 9999;
    private final Socket socket;
    public BrokerSever(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(
                        socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream()))
        {
            while (true){
                String str = in.readLine();
                if (str == null){
                    continue;
                }
                System.out.println("接收到原始数据: " + str);
                if (str.equals("CONSUME")){
                    String message = Broker.consume();
                    out.println(message);
                    out.flush();
                }else {
                    Broker.produce(str);
                }
            }
        } catch (Exception e){
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception{
        ServerSocket server = new ServerSocket(SERVICE_PORT);
        while(true){
            BrokerSever brokerServer = new BrokerSever(server.accept());
            new Thread(brokerServer).start();
        }
    }
}

消息生产者

public class ProduceClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        client.produce("hello World.");
    }
}

消息消费者

public class ConsumeClient {
    public static void main(String[] args) throws Exception{
        MyClient client = new MyClient();
        String message = client.consume();
        System.out.println("获得的消息为: " + message);
    }
}

MyClient与消息服务器进行通信

public class MyClient {
    public static void produce(String message) throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println(message);
            out.flush();
        }
    }
    public static String consume() throws Exception{
        Socket socket = new Socket(InetAddress.getLocalHost(),BrokerSever.SERVICE_PORT);
        try(
                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                PrintWriter out = new PrintWriter(socket.getOutputStream())
        ){
            out.println("CONSUME");
            out.flush();
            String message = in.readLine();
            return message;
        }
    }
}

流量削峰

定义一个消息生产者

@Test
public void test() throws Exception {
    for (int i = 0; i < 1000 ; i++) {
        rabbitTemplate.convertAndSend("test-queue ",  "消息发送);
    }
    Thread.sleep(1000 * 1000);
}

使用@RabbitListener注解定义一个消息消费者

@Component
@RabbitListener(queuesToDeclare = @Queue(name = "test-queue"))
public class Consumer {
    private int count = 0;
    @RabbitHandler
    public void receive(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            Thread.sleep(1000);
            System.out.println("=====消息处理===>");
            channel.basicAck(deliveryTag, true);
            System.out.println("current count is:" + ++count);
        } catch (Exception e) {
        }
    }
}

四:总结

使用队列有得有失,凡事都有成本。具体上边的开门小例子如何体现得失的我们还需要研究研究。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
7月前
|
消息中间件 Java Spring
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
66 0
|
8月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
294 0
|
消息中间件 运维 监控
消息队列和应用工具产品体系-完美日记电商业务案例
消息队列和应用工具产品体系-完美日记电商业务案例
消息队列和应用工具产品体系-完美日记电商业务案例
|
消息中间件 中间件 关系型数据库
【项目实战典型案例】16.消息队列作用和意义
【项目实战典型案例】16.消息队列作用和意义
|
消息中间件 存储 中间件
案例16-消息队列的作用和意义
消息队列的作用和意义
151 0
|
消息中间件 存储 前端开发
28个案例问题分析---16---消息队列的作用和意义--RabbitMq
28个案例问题分析---16---消息队列的作用和意义--RabbitMq
135 0
|
6月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
2月前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
4月前
|
消息中间件
手撸MQ消息队列——循环数组
队列是一种常用的数据结构,类似于栈,但采用先进先出(FIFO)的原则。生活中常见的排队场景就是队列的应用实例。在数据结构中,队列通常用数组实现,包括入队(队尾插入元素)和出队(队头移除元素)两种基本操作。本文介绍了如何用数组实现队列,包括定义数组长度、维护队头和队尾下标(front 和 tail),并通过取模运算解决下标越界问题。此外,还讨论了队列的空与满状态判断,以及并发和等待机制的实现。通过示例代码展示了队列的基本操作及优化方法,确保多线程环境下的正确性和高效性。
58 0
手撸MQ消息队列——循环数组