消息发送和接收演示

简介: 接下来我们使用Java代码来演示消息的发送和接收

导入依赖

         <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

发送消息

消息发送步骤:

​ 创建消息消费者, 指定消费者所属的组名

​ 指定Nameserver地址

​ 指定消费者订阅的主题和标签

​ 设置回调函数,编写处理消息的方法

​ 启动消息消费者

代码示例:

package com.wxit.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @Author wj
 **/
//发送消息
public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者,并且设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.91.4:9876");

        //3.启动生产者
        producer.start();

        //4.创建消息对象,指定主题、标签和消息体
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.发送消息
        SendResult sendResult = producer.send(message, 10000);
        System.out.println(sendResult);

        //6.关闭生产者
        producer.shutdown();
    }
}

接收消息

消息接收步骤:

​ \1. 创建消息消费者, 指定消费者所属的组名

​ \2. 指定Nameserver地址

​ \3. 指定消费者订阅的主题和标签

​ \4. 设置回调函数,编写处理消息的方法

​ \5. 启动消息消费者

代码示例:

package com.wxit.test;


import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author wj
 **/
//接收消息
public class RocketMQReceiveMessageTest {

    //接收消息
    public static void main(String[] args) throws Exception {

        //1 创建消费者,并且为其指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");

        //2 为消费者设置NameServer的地址
        consumer.setNamesrvAddr("192.168.91.4:9876");

        //3 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //处理获取到的消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //消费逻辑
                System.out.println("Message===>" + list);

                //返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5 启动消费者
        consumer.start();
        System.out.println("启动消费者成功了");
    }
}

案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 添加配置

#rocketmq
rocketmq:
  name-server: 192.168.91.4:9876 #rocketMQ服务的地址
  producer:
    group: shop-order  # 生产者组

3 编写测试代码

@RestController
@Slf4j
public class OrderController {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private ProductService productService;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //下单 --ribbon自定义负载均衡
    //fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid){
        log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid);

        /**
         * 调用商品微服务,查询商品信息
         * 问题:
         * 1.代码可读性不好
         * 2.编程风格不统一
         */
        Product product = productService.findByPid(pid);

        if (product.getPid() == -100){
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        log.info("查询到{}号商品的信息",pid);

        //下单,创建订单
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");

        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);

        orderService.createOrder(order);

        log.info("创建订单成功,订单信息为{}",order);

        //向mq中投递一个下单成功的信息
        rocketMQTemplate.convertAndSend("order-topic",order);

        return order;
    }

用户微服务订阅消息

1 修改 shop-user 模块配置,导入依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 修改主类

package com.wxit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @Author wj
 **/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class);
    }
}

3 修改配置文件

  cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848#rocrocketmq:  name-server: 192.168.91.4:9876

4 编写消息接收服务

package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> {    @Override    public void onMessage(Order message) {        log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message);    }}

5 启动服务,执行下单操作,观看后台输出

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
前端开发 小程序
扩展uview复选组件库支持自定义图片+自定义内容
扩展uview复选组件库支持自定义图片+自定义内容
577 6
|
存储 缓存 监控
如何提高数据驱动方式的性能和可维护性?
【10月更文挑战第13天】 本文深入探讨了提高数据驱动方式性能与可维护性的关键方法和策略,包括优化数据结构选择、数据缓存策略、合理的数据更新策略、数据压缩与精简、代码结构优化、测试与监控、版本控制与协作管理、文档化与知识共享、持续优化的意识及结合实际案例分析,旨在为数据驱动的高效和可持续发展提供全面指导。
|
数据库
脏读、幻读、不可重复读的定义?
脏读、不可重复读和幻读是数据库事务处理中的三种异常现象。脏读指读取未提交的修改数据;不可重复读指同一事务中多次读取数据不一致;幻读指读取记录范围时,前后读取结果数量不一致。这些现象通常由并发事务操作引起。
552 7
|
传感器 缓存 网络协议
CoAP 协议与 HTTP 协议的区别
CoAP(Constrained Application Protocol)协议是为资源受限的设备设计的轻量级协议,适用于物联网场景。相比HTTP,CoAP具有低功耗、低带宽占用和简单易实现的特点,支持多播通信和无连接的交互模式。
|
安全 数据安全/隐私保护 UED
优化用户体验:前后端分离架构下Python WebSocket实时通信的性能考量
在当今互联网技术的迅猛发展中,前后端分离架构已然成为主流趋势,它不仅提升了开发效率,也优化了用户体验。然而,在这种架构模式下,如何实现高效的实时通信,特别是利用WebSocket协议,成为了提升用户体验的关键。本文将探讨在前后端分离架构中,使用Python进行WebSocket实时通信时的性能考量,以及与传统轮询方式的比较。
326 2
|
传感器 存储 安全
机器通信 | 《5G移动无线通信技术》之八
本节主要介绍了机器通信的内容以及超可靠机器类通信。
机器通信  | 《5G移动无线通信技术》之八
|
传感器 人工智能 供应链
报告发布丨数字化助力乡村振兴:《县域数字生态创新趋势展望》
报告指出,借助数字新基建打造“新时空”,县域数字生态加速构建,并揭示了数字技术加速下沉全面赋能县域经济、新型信息基础设施重构县域生产要素、数字化加持下县域生态价值逐步释放等九大趋势。
报告发布丨数字化助力乡村振兴:《县域数字生态创新趋势展望》
|
Java 关系型数据库 数据库连接
快速配置多数据源(整合MyBatis)
本文内容: 在Springboot+Mybatis项目的基础上,学习多数据源的快速配置 避免网上某些配置数据源文章的深坑
644 0
|
存储 运维 算法
云仿真平台有哪些特点
云仿真按照服务模式为用户提供OTS仿真软件,该服务模式采用云计算、大数据算法和人工智能技术实现,遵循软件即服务(saas)的概念,无需安装就可以运行。 仿真软件统一部署在云(公有云或私有云)服务器上,用户可以通过网络获得平台提供的服务,例如仿真操作、理论学习、仿真评估、教师管理、理论考试等。用户可以在云仿真平台操作环境、化工、食品等环境中操作虚拟仿真软件。
云仿真平台有哪些特点

热门文章

最新文章