Java 实现消息消费|学习笔记

简介: 快速学习 Java 实现消息消费

开发者学堂课程【全面讲解 Spring Cloud Alibaba 技术栈(知识精讲+项目实战)第四阶段Java 实现消息消费】学习笔记,与课程紧密联系,让用户快速学习知识。

课程地址https://developer.aliyun.com/learning/course/686/detail/11907


Java 实现消息消费


内容介绍:

一、Java 实现消息消费

二、案例


一、Java 实现消息消费

 Java 代码进行消费,给予的是订阅机制也可以认为是接听机制。

1、消息接收步骤:

1创建消息消费者,指定消费者所属的组名

2指定 Nameserver 地址

3指定消费者订阅的主题和标签

4设置回调函数,编写处理消息的方法(消费者会一直监听 MQ,一旦发现消息用回调方式进行处理)

5启动消息消费者

//接收消息

public class RocketMQReceiveTest {

public static void main(string[] args) throws

mqclientException {

//1.创建消息消费者,指定消费者所属的组名

DefaultMQPushConsumer consumer

=newDefau1tMQPushCon sumer("myconsumer-group")

//2.指定 Nameserver地址

consumer.setNamesrvAddr("192.168.109.131:9876)

//3.指定消费者订阅的主题和标签

consumer.subscribe("myTopic",""");

//4.设置回调函数,编写处理消息的方法

consumer.registerMessageListener(new

MessageListenerConc urrently{

@Override

public ConsumeConcurrentlystatus

consumeMessaqe(List<Me ssaqeExt> msqs,

ConsumeConcurrentlyContextcontext){

System.out.print1n("Receive New Messages:"+ msgs);

//返回消费状态

return ConsumeConcurrentlyStatusCONSUMESUCCESS;

}

});

//5.启动消息消费者 consumer.start();

system.out.print1n("Consumer Started.");

}

}

2、处理

打开工程新加一个类 New-Java Class

接收消息的测试

package com.itheima.test;

public class RocketMQReceiveMessageTest{

//接收消息

public static void main(string[] args){

//1创建消费者,并且为其指定消费者组名

new DefaultMQPushConsumer consumer = new DefaultMQ

PushConsumer(consumerGroup:“myconsumer-group“);

//2为消费者设置 NameServer 的地址

consumer.setNamesrvAddr("192.168.109.131:9876");

//3指定消费者订阅的主题和标签

consumer.subscribe(topic:"myTopic",subExpression:"*");

//4设置一个回调函数,并在函数中编写接收到消息之后的处理方法

consumer.registerMessageListener(new

MessageListenerConc urrently() {

//处理获取到的消息

@Override

public ConsumeConcurrentlystatus consumeMessage(List<Me ssaqeext>

list,Consumeconcurrentlycontext consumeconcurrentlyContext){

//消费逻辑

system.out.println("Message--->" + list);

//返回消费成功状态

return ConsumeConcurrentlyStatus.CONSUME SUCCESS

});

//5 启动消费者

consumer.start);

system.out.println("启动消费者成功了");

}

}

启动消费者运行:

C:\Java\jdk1.8.0\bin\java.exe ...

启动消费者成功了

Messaqe===>[MessaqeExt [queueId=1,storeSize=182,

queue Offset=0,sysFlaq=0,

bornTimestamp=1577690976503,bornHo st=/

192.168.109.1:65029,storeTimestamp=1577690975351,

storeHost=/192.168.109.131:10911,msgId=C

消费者启动成功消息已经被消费,进程不会退出去一直等待,因为是基于监督机制。

还可以再发送一个消息,消费者现在只消费了一条,再来到发送启动:

c:\Java\jdk1.8.0\bin\java.exe ...

Sendresult [sendstatus=SEND OK,msgId=C0A82B9058E658

644D4698B7F5270000,

offsetMsId=C0A86D8300002A9F00000 00000AFA1A,

message

15:43:12.715 [Nettyclientselector1] INFO

RocketmqRemotin g-closechannel: close the connection to remote address[192.168.10915:43:12.724 [NettyclientSelector 1] INFO

RocketmgRemoting- closechannel: close the connection to remote address[192.168.109

15:43:12.724 [NettyclientSelector 11 INFO

RocketmgRemoting - closeChannel; close the connection to remote address[192.168.109

Process finished with exit code 0

消费者结果:

c:\Java\jdk1.8.0\bin\java.exe..启动消费者成功了

Message===>[MessageExt[queueId=1,storesize=182,

queue Offset=0,

sysFlag=0,bornTimestamp=1577690976503, born Host=/

192.168.109.1:65029,storeTimestamp=1577690975351,

storeHost=/192.168.109.131:10911,msgId=C

Message===> [MessageExt[queueId=3, storesize=182,

queue offset=0, sysflag=0,

bornTimestamp=1577691792680, bornH ost=/

192.168.109.1:49417,storeTimestamp=1577691791527,

storeHost=/192.168.109.131:10911,msqId=C

自动消费掉,关键是了解代码的大体逻辑以及内部的定额,内部的订阅机制是利用的监听,当一旦发现有消息过来,自动内部进行消费。


二、案例

接下来我们模拟一种场景:下单成功之后,向下单用户发送短信。设计图如下:

image.png

把代码应用到案例中去,下单成功以后会发送一个消息放到 MQ 中,通过另一个微服 MQ 中取消息

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
Java 编译器 开发工具
Java基础学习笔记——idea篇
JDK由JRE(包含JVM和核心类库)和开发工具箱(如javac编译器和java运行工具)组成。Java项目结构包括Project、Module、Package和Class。IDEA快捷键包括:生成main方法(main psvm)、复制代码(Ctrl+D)、删除代码(Ctrl+Y/X)、格式化代码(Ctrl+Alt+L)、重命名(Shift+F6)等。
51 0
|
6月前
|
监控 负载均衡 Dubbo
|
3月前
|
存储 Java
Java学习笔记 List集合的定义、集合的遍历、迭代器的使用
Java学习笔记 List集合的定义、集合的遍历、迭代器的使用
|
2月前
|
存储 安全 Java
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(基础篇)
从Java环境的搭建到实际代码的编写,从基本用法的讲解到底层原理的剖析,深度解析Java基础知识。本文是《Java学习路线》专栏的起始文章,旨在提供一套完整的Java学习路线,覆盖Java基础知识、数据库、SSM/SpringBoot等框架、Redis/MQ等中间件、设计模式、架构设计、性能调优、源码解读、核心面试题等全面的知识点,并在未来不断更新和完善,帮助Java从业者在更短的时间内成长为高级开发。
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(基础篇)
|
2月前
|
存储 安全 Java
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(进阶篇)
本文是Java基础的进阶篇,对异常、集合、泛型、Java8新特性、I/O流等知识进行深入浅出的介绍,并附有对应的代码示例,重要的地方带有对性能、底层原理、源码的剖析。适合Java初学者。
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(进阶篇)
|
1月前
|
Java 数据安全/隐私保护
java学习笔记(基础习题)
java学习笔记(基础习题)
31 0
|
1月前
|
Java 程序员 开发工具
java学习笔记
java学习笔记
34 0
|
2月前
|
存储 安全 Java
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(高级篇)
本文是“Java学习路线”中Java基础知识的高级篇,主要对多线程和反射进行了深入浅出的介绍,在多线程部分,详细介绍了线程的概念、生命周期、多线程的线程安全、线程通信、线程同步,并对synchronized和Lock锁;反射部分对反射的特性、功能、优缺点、适用场景等进行了介绍。
Java修仙之路,十万字吐血整理全网最完整Java学习笔记(高级篇)
|
3月前
|
SQL druid Java
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
53 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(下)
|
3月前
|
SQL Java 关系型数据库
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)
107 3
Java数据库部分(MySQL+JDBC)(二、JDBC超详细学习笔记)(上)