pmq再学习三

简介: 前面我们已经了解了在做好基础数据的准备工作后,启动测试的时候,会做一个注册消费组的工作,完成后,我们就可以执行生产者发消息操作了。发消息的操作是:发送消息完成后,由于其采用的拉模式,我们可以看到消息在经过发送,存储到数据库之后,会做一个通知拉取数据操作,然后执行拉取。拉取完成后,进行响应。此时会进行消费操作,而这个过程的处理关键是handleData操作,从代码中,我们可以看到其是执行的线程操作是一个batchExcute批量执行操作,可以看到其里面有一个重要方法:threadExcute方法,从而进一步看到我们想看到的方法doMessageReceived

前面我们已经了解了在做好基础数据的准备工作后,启动测试的时候,会做一个注册消费组的工作,完成后,我们就可以执行生产者发消息操作了。发消息的操作是:

@GetMapping("/test1")
publicvoidtest1(@RequestParamStringtopicName, @RequestParamintcount) {
if (Util.isEmpty(topicName)) return;
Executors.newSingleThreadExecutor().submit(newRunnable() {
@Overridepublicvoidrun() {
for (inti=1; i<count; i++) {
try {
MqClient.publish(topicName, "", newProducerDataDto(String.valueOf(i)));
       } catch (MqNotInitExceptione) {
// TODO Auto-generated catch blocke.printStackTrace();
         } catch (ContentExceed65535Exceptione) {
// TODO Auto-generated catch blocke.printStackTrace();
       }
Util.sleep(10);
     }
   }
 });
}

发送消息完成后,由于其采用的拉模式,我们可以看到消息在经过发送,存储到数据库之后,会做一个通知拉取数据操作,然后执行拉取。拉取完成后,进行响应。此时会进行消费操作,而这个过程的处理关键是handleData操作,从代码中,我们可以看到其是执行的线程操作是一个batchExcute批量执行操作,可以看到其里面有一个重要方法:threadExcute方法,从而进一步看到我们想看到的方法doMessageReceived,这个方法会调用我们自定义的方法实现消费。

// 执行消费操作

@OverridepublicList<Long>onMessageReceived(List<MessageDto>messages) {
try {
// 执行消息消费TransactioncatTransaction=null;
System.out.println("开始接收生产者发送过来的消息");
for (MessageDtomessageDto : messages) {
// 执行消息消费System.out.println("当前接收到的消息是messageDto消费组:"+messageDto.getConsumerGroupName() +" "+"当前接收的消费主题:"+messageDto.getTopicName()
\+" "+"当前接收到的消息:"+messageDto.getBody());
   }
System.out.println("接收成功");
 } catch (Exceptione) {
// TODO Auto-generated catch blocke.printStackTrace();
 }
returnnull;
}

而从消息的结果可以看到其打印出来的消费消息。


目录
相关文章
|
6月前
|
算法 Ubuntu C语言
学习C++的意义
学习C++的意义
|
6月前
|
分布式计算 架构师 前端开发
IT学习视频
一、架构师: N学教育 N学教育P7架构师|价值14999元课程由一线大厂资深架构师(阿里 P8+)进行主讲。业内首次将分布式架构全部技术点串联,并结合大厂真实案例实践讲解,将后端架构技术全面系统的展现,帮助同学们从全局视角掌握分布式架构设计方法,成为一名合格的架构师。他们用名师+好课来改变世界,打破业界严重缺乏顶层架构思维且仅靠疯狂堆叠知识点常见做法,初心不变,让每个人持续提升职业能力!让每个程序人获得抵御寒冬的能力!
40 0
|
8月前
|
架构师 算法 程序员
|
9月前
|
NoSQL Java jenkins
【学习总结】总结
【学习总结】总结
|
9月前
|
NoSQL Java jenkins
|
程序员 编译器 C++
C++学习——前进(三)
C++学习——前进(三)
69 0
C++学习——前进(三)
|
机器学习/深度学习 并行计算 Java
今后的学习计划
今后的学习计划
78 0
|
XML 监控 Dubbo
pmq再学习二
首先启动的过程中,会去获取消费组中的配置信息,拿到消费组中的配置信息后,执行注册消费组操作,而执行注册消费组操作中,会首先注册消费者,然后执行消费组操作,然后执行启动消费者轮询服务,执行mq检查服务启动,mq提交服务启动。完成后,执行监控服务配置操作。 这里面最为重要的是启动长轮询服务操作。因为长轮询服务涉及到执行重平衡操作和执行更新元数据操作。更新元数据操作涉及到更新队列元数据操作,此时不可避免的涉及到对偏移量的更新操作。
100 2
pmq再学习二
|
网络协议 Linux 网络安全
MTPuTTy学习
MTPuTTy学习
|
弹性计算 运维 安全
我的学习之路
运用云服务器创建云笔记