消息中间件-RocketMQ入门 消息发送的三种方式

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: 消息中间件-RocketMQ入门 消息发送的三种方式


消息中间件简介

应用场景

假设现在有订单微服务和积分微服务,正常请求流程之后是不是一个订单完成后给对应的用户加上积分,但如果积分微服务坏掉了,正常来说会回滚,但实际中情况中,积分完全可以晚一点加,没有什么影响
1.解决代码耦合的问题


解决问题的方法


这样订单微服务把参数发送给中间件之后就完成了它自己的任务,使微服务不用依赖其它微服务,就算中间件挂了也不需要担心,它虽然默认存储在内存里面,但也会在磁盘里面存一份

2.进行流量的削峰


3.数据分发



解决办法:



常用消息中间件

1.ActiveMQ是Apache出品,比较老的一个开源的消息中间件,以前在中小企业应用广泛.

2.Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统。它可以处理消费者规模的网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决.

3.RabbitMQ是一个基于Erlang语言开发的消息中间件,

RabbitMQ最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

对数据的一致性,稳定性和可靠性要求比较高的场景

4.RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache软件基金会,并于2017年9月25日成为 Apache的顶级项目。作为经历过多次阿里巴巴双十一这种"超级工程"的洗礼并有稳定出色表现的国产中间件,以其高性能、低延时和高可靠等特性近年来已经也被越来越多的国内企业使用。

淘宝内部的交易系统使用了淘宝自主研发的Noify消息中间件,使用MySQL作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kaka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在淘宝交易、订单、充值等场景下还有诸多特性不满足,为此我们重新用Javai语言编写了RocketMQ,定位于非日志的可靠消息传输〈(日志场最也OK),目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理, binlog分发等场景。




RocketMQ核心概念

消息中间件里面集群了多个代理服务器,如何做到负载?

在创造RocketMQ的时候,它本身有一个轻量级的注册中心称为"NameServer命名服务",因为像Nacos和zookeeper这样复杂的注册中心,运行起来对性能肯定也会有一定的影响,倘若有一天该注册中心不开源不维护了,该中间件是不是也会因此遇到很大的麻烦



入门案例-生产者和消费者代码逻辑

第一步:创建两个两个项目,分别为生产者和消费者


创建生产者

第一步:导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
  </dependency>
</dependencies>

第二步:创建生产类模拟生产

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        for(int i=0;i<10;i++){
            Message msg = new Message(topic,("RocketMQ普通消息"+i).getBytes(Charset.defaultCharset()));
            SendResult result = producer.send(msg);
            System.out.println("发送状态"+result.getSendStatus());
        }
        System.out.println("消息发送完毕.");
        //关闭资源
        producer.shutdown();
    }
}

创建消费者
第一步:导入依赖

<dependencies>
  <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
  </dependency>
</dependencies>

第二步:创建消费类模拟接收

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("线程:"+Thread.currentThread()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

消息发送的三种方式

同步发送


应用程序给消息中间件发送消息的时候。需要等待消息中间件将消息存储完毕之后,才响应回去。业务代码才能往下执行

异步发送


应用程序给消息中间件发送消息的时候,消息中间件收到这个消息之后,直接给应用程序响应了.(此时消息并没有完全存储到磁盘),消息中间件继续存储消息。存储完成(成功或者失败),通过回调地址通知有应用程序。消息存储的结果

示例代码

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("RocketMQ异步消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");
        //异步发送
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("消息存储状态:"+sendResult.getSendStatus());
            }
            @Override
            public void onException(Throwable throwable) {
                System.out.println("消息发送出现异常");
            }
        });
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }

运行结果

业务逻辑处理 ----> 执行send方法,不需要等待消息中间件存储消息,可以直接执行业务逻辑代码

与同步发送相比,异步发送时间更短一点,响应更快一点,为了使响应时间更快的可以选择异步发送,但同步发送也有它自己的意义,同步发送更加可靠

一次性消息

应用程序给消息中间件发送消息的时候,不需要知道消息是否在消息中间存储了,只管发就是了.

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("RocketMQ一次性消息").getBytes(Charset.defaultCharset()));
        System.out.println("消息发送前");
        producer.sendOneway(msg);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }
}

运行结果

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
25天前
|
消息中间件 存储 Java
RocketMQ(一):消息中间件缘起,一览整体架构及核心组件
【10月更文挑战第15天】本文介绍了消息中间件的基本概念和特点,重点解析了RocketMQ的整体架构和核心组件。消息中间件如RocketMQ、RabbitMQ、Kafka等,具备异步通信、持久化、削峰填谷、系统解耦等特点,适用于分布式系统。RocketMQ的架构包括NameServer、Broker、Producer、Consumer等组件,通过这些组件实现消息的生产、存储和消费。文章还提供了Spring Boot快速上手RocketMQ的示例代码,帮助读者快速入门。
|
2月前
|
消息中间件 存储 RocketMQ
消息中间件-RocketMQ技术(二)
消息中间件-RocketMQ技术(二)
|
2月前
|
消息中间件 存储 中间件
消息中间件-RocketMQ技术(一)
消息中间件-RocketMQ技术(一)
|
1月前
|
消息中间件 编解码 Docker
【Docker项目实战】Docker部署RabbitMQ消息中间件
【10月更文挑战第8天】Docker部署RabbitMQ消息中间件
84 1
【Docker项目实战】Docker部署RabbitMQ消息中间件
|
18天前
|
消息中间件 存储 Java
吃透 RocketMQ 消息中间件,看这篇就够了!
本文详细介绍 RocketMQ 的五大要点、核心特性及应用场景,涵盖高并发业务场景下的消息中间件关键知识点。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
吃透 RocketMQ 消息中间件,看这篇就够了!
|
1月前
|
消息中间件 Java Kafka
RabbitMQ 入门
RabbitMQ 入门
|
24天前
|
消息中间件 JSON Java
开发者如何使用轻量消息队列MNS
【10月更文挑战第19天】开发者如何使用轻量消息队列MNS
65 5
|
18天前
|
消息中间件 存储 Kafka
MQ 消息队列核心原理,12 条最全面总结!
本文总结了消息队列的12个核心原理,涵盖消息顺序性、ACK机制、持久化及高可用性等内容。关注【mikechen的互联网架构】,10年+BAT架构经验倾囊相授。
|
1月前
|
消息中间件 安全 Java
云消息队列RabbitMQ实践解决方案评测
一文带你详细了解云消息队列RabbitMQ实践的解决方案优与劣
65 7
|
22天前
|
消息中间件
解决方案 | 云消息队列RabbitMQ实践获奖名单公布!
云消息队列RabbitMQ实践获奖名单公布!