消息发送和接收演示

简介: 接下来我们使用Java代码来演示消息的发送和接收

导入依赖

         <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>

发送消息

消息发送步骤:

​ 创建消息消费者, 指定消费者所属的组名

​ 指定Nameserver地址

​ 指定消费者订阅的主题和标签

​ 设置回调函数,编写处理消息的方法

​ 启动消息消费者

代码示例:

package com.wxit.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

/**
 * @Author wj
 **/
//发送消息
public class RocketMQSendMessageTest {
    public static void main(String[] args) throws Exception {
        //1.创建消息生产者,并且设置生产组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");

        //2. 指定Nameserver地址
        producer.setNamesrvAddr("192.168.91.4:9876");

        //3.启动生产者
        producer.start();

        //4.创建消息对象,指定主题、标签和消息体
        Message message = new Message("myTopic", "myTag", ("Test RocketMQ Message").getBytes());

        //5.发送消息
        SendResult sendResult = producer.send(message, 10000);
        System.out.println(sendResult);

        //6.关闭生产者
        producer.shutdown();
    }
}

接收消息

消息接收步骤:

​ \1. 创建消息消费者, 指定消费者所属的组名

​ \2. 指定Nameserver地址

​ \3. 指定消费者订阅的主题和标签

​ \4. 设置回调函数,编写处理消息的方法

​ \5. 启动消息消费者

代码示例:

package com.wxit.test;


import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * @Author wj
 **/
//接收消息
public class RocketMQReceiveMessageTest {

    //接收消息
    public static void main(String[] args) throws Exception {

        //1 创建消费者,并且为其指定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myproducer-group");

        //2 为消费者设置NameServer的地址
        consumer.setNamesrvAddr("192.168.91.4:9876");

        //3 指定消费者订阅的主题和标签
        consumer.subscribe("myTopic", "*");

        //4 设置一个回调函数,并在函数中编写接收到消息之后的处理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //处理获取到的消息
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                //消费逻辑
                System.out.println("Message===>" + list);

                //返回消费成功状态
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //5 启动消费者
        consumer.start();
        System.out.println("启动消费者成功了");
    }
}

案例

接下来我们模拟一种场景: 下单成功之后,向下单用户发送短信。设计图如下:

订单微服务发送消息

1 在 shop-order 中添加rocketmq的依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 添加配置

#rocketmq
rocketmq:
  name-server: 192.168.91.4:9876 #rocketMQ服务的地址
  producer:
    group: shop-order  # 生产者组

3 编写测试代码

@RestController
@Slf4j
public class OrderController {

    @Autowired
    private RestTemplate restTemplate;

    @Autowired
    private OrderService orderService;

    @Autowired
    private ProductService productService;

    @Autowired
    private DiscoveryClient discoveryClient;

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    //下单 --ribbon自定义负载均衡
    //fegin
    @RequestMapping("/order/prod/{pid}")
    public Order order(@PathVariable("pid") Integer pid){
        log.info("接收到{}号商品的下单请求,接下来调用商品的微服务查询此商品信息",pid);

        /**
         * 调用商品微服务,查询商品信息
         * 问题:
         * 1.代码可读性不好
         * 2.编程风格不统一
         */
        Product product = productService.findByPid(pid);

        if (product.getPid() == -100){
            Order order = new Order();
            order.setOid(-100L);
            order.setPname("下单失败");
            return order;
        }

        log.info("查询到{}号商品的信息",pid);

        //下单,创建订单
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");

        order.setPid(pid);
        order.setPname(product.getPname());
        order.setPprice(product.getPprice());
        order.setNumber(1);

        orderService.createOrder(order);

        log.info("创建订单成功,订单信息为{}",order);

        //向mq中投递一个下单成功的信息
        rocketMQTemplate.convertAndSend("order-topic",order);

        return order;
    }

用户微服务订阅消息

1 修改 shop-user 模块配置,导入依赖

        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>

2 修改主类

package com.wxit;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;

/**
 * @Author wj
 **/
@SpringBootApplication
@EnableDiscoveryClient
public class UserApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserApplication.class);
    }
}

3 修改配置文件

  cloud:    nacos:      discovery:        server-addr: 127.0.0.1:8848#rocrocketmq:  name-server: 192.168.91.4:9876

4 编写消息接收服务

package com.wxit.service;import com.wxit.domain.Order;import lombok.extern.slf4j.Slf4j;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;import org.apache.rocketmq.spring.core.RocketMQListener;import org.springframework.stereotype.Service;/** * @Author wj **/@Slf4j@Service@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")public class SmsService implements RocketMQListener<Order> {    @Override    public void onMessage(Order message) {        log.info("接收到了一个订单信息{},接下来就可以发送短信通知了", message);    }}

5 启动服务,执行下单操作,观看后台输出

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
8月前
|
设计模式
策略模式在数据接收和发送场景的应用
策略模式在数据接收和发送场景的应用
|
8月前
|
消息中间件
【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
【1月更文挑战第27天】【面试问题】如何确保消息正确地发送至 RabbitMQ? 如何确保消息接收方消费了消息?
|
8月前
Socket网络编程练习题二:客户端发送一条数据,接收服务端反馈的消息并打印;服务端接收数据并打印,再给客户端反馈消息
Socket网络编程练习题二:客户端发送一条数据,接收服务端反馈的消息并打印;服务端接收数据并打印,再给客户端反馈消息
|
消息中间件 负载均衡 Java
下单消息的发送和接收案例|学习笔记
快速学习下单消息的发送和接收案例
239 0
下单消息的发送和接收案例|学习笔记
|
消息中间件 物联网 Linux
Msgrcv 接收消息|学习笔记
快速学习 Msgrcv 接收消息
|
消息中间件 RocketMQ 开发者
测试发送消息和接受消息|学习笔记
快速学习测试发送消息和接受消息
146 0
测试发送消息和接受消息|学习笔记
|
消息中间件 Java RocketMQ
发送确认订单失败消息演示|学习笔记
快速学习发送确认订单失败消息
105 0
发送确认订单失败消息演示|学习笔记
|
消息中间件 RocketMQ 开发者
消息发送4发送消息|学习笔记
快速学习消息发送4发送消息
消息发送4发送消息|学习笔记
|
移动开发 网络协议 测试技术
服务器循环接收客户端消息|学习笔记
快速学习服务器循环接收客户端消息
服务器循环接收客户端消息|学习笔记
|
消息中间件 Kafka
kafka模拟客户端发送、接受消息
producer   消息的生成者,即发布消息 consumer   消息的消费者,即订阅消息 broker     Kafka以集群的方式运行,可以由一个或多个服务组成,服务即broker zookeeper  协调转发    一、创建topic .
2186 0