基于PelicanDT验证RocketMQ消息收发

简介: 具体介绍RocketMQ-example,是基于PelicanDT实现RocketMQ环境准备,收发消息验证示例前期准备本示例程序是基于阿里云ECS或远程Linux服务器完成,只需购买阿里云机器,或者选定已准备好的远程服务器即可下载RocketMQ-example代码注意事项:如果购...

具体介绍

RocketMQ-example,是基于PelicanDT实现RocketMQ环境准备,收发消息验证示例

前期准备

  1. 本示例程序是基于阿里云ECS或远程Linux服务器完成,只需购买阿里云机器,或者选定已准备好的远程服务器即可
  2. 下载RocketMQ-example代码

注意事项:如果购买的是阿里云ECS,配置:8C16G,且安全组配置访问端口:9876

快速入门

修改配置

  1. 打开rocketmq.properties配置文件,具体路径:RocketMQ-example/src/test/resources/env/func/rocketmq.properties
  2. 填写ip,userName,password

运行示例

本地代码控制远程服务器执行Dubbo验证:

  1. 打开TestRocketMQ.java,具体路径:RocketMQ-example/src/test/java/com/alibaba/pelican/rocketmq/TestRocketMQ.java
  2. 运行单元测试

预期结果

日志输出内容如下


2019-02-28 19:46:46 [INFO] [main] c.a.p.deployment.junit.rule.LogRule - --------- TO NEXT CASE ---------
2019-02-28 19:46:46 [INFO] [main] c.a.p.deployment.junit.rule.LogRule - Run TC[test(com.alibaba.pelican.rocketmq.TestRocketMQ)]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3D090000, offsetMsgId=781B1FC600002A9F0000000000008BC4, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=3], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3D920001, offsetMsgId=781B1FC600002A9F0000000000008C76, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=0], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DB10002, offsetMsgId=781B1FC600002A9F0000000000008D28, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=1], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DD60003, offsetMsgId=781B1FC600002A9F0000000000008DDA, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=2], queueOffset=50]
SendResult [sendStatus=SEND_OK, msgId=1E057C08A74518B4AAC28F4A3DFE0004, offsetMsgId=781B1FC600002A9F0000000000008E8C, messageQueue=MessageQueue [topic=TopicTest, brokerName=iZm5e0pe3xy3tjh9sw1kgpZ, queueId=3], queueOffset=51]
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:10911] result: true
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:9876] result: true
2019-02-28 19:46:47 [INFO] [NettyClientSelector_1] RocketmqRemoting - closeChannel: close the connection to remote address[120.27.31.198:10909] result: true
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=51, sysFlag=0, bornTimestamp=1551354407422, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407473, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008E8C, commitLogOffset=36492, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=52, CONSUME_START_TIME=1551354408017, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DFE0004, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 52], transactionId='null'}]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407177, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407322, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008BC4, commitLogOffset=35780, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=52, CONSUME_START_TIME=1551354408017, UNIQ_KEY=1E057C08A74518B4AAC28F4A3D090000, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407382, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407440, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008DDA, commitLogOffset=36314, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411038, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DD60003, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 51], transactionId='null'}]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407345, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407400, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008D28, commitLogOffset=36136, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411040, UNIQ_KEY=1E057C08A74518B4AAC28F4A3DB10002, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 50], transactionId='null'}]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=50, sysFlag=0, bornTimestamp=1551354407314, bornHost=/42.120.74.97:44264, storeTimestamp=1551354407361, storeHost=/120.27.31.198:10911, msgId=781B1FC600002A9F0000000000008C76, commitLogOffset=35958, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=51, CONSUME_START_TIME=1551354411042, UNIQ_KEY=1E057C08A74518B4AAC28F4A3D920001, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 49], transactionId='null'}]] 
  • SendResult 开头的日志代表发送消息
  • Receive New 开头的日志代表消费消息
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
消息中间件 算法 Java
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
783 1
弥补延时消息的不足,RocketMQ 基于时间轮算法实现了定时消息!
|
7月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
518 1
|
消息中间件 弹性计算 网络安全
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
|
7月前
|
消息中间件 存储 弹性计算
消息队列RocketMQ版:基础消息收发功能体验
【2月更文挑战第1天】假期闲着无聊,随便体验一下。本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
|
7月前
|
物联网 Go 网络性能优化
MQTT协议本身支持多种消息收发模式
MQTT协议本身支持多种消息收发模式【1月更文挑战第24天】【1月更文挑战第120篇】
173 3
|
消息中间件 uml RocketMQ
3 张图带你彻底理解 RocketMQ 事务消息
3 张图带你彻底理解 RocketMQ 事务消息
67793 2
3 张图带你彻底理解 RocketMQ 事务消息
|
消息中间件 NoSQL 关系型数据库
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
实战:如何防止mq消费方消息重复消费 如果因为网络延迟等原因,mq无法及时接收到消费方的应答,导致mq重试。(计算机网络)。在重试过程中造成重复消费的问题
2765 1
实战:如何防止mq消费方消息重复消费、rocketmq理论概述、rocketmq组成、普通消息的发送
|
消息中间件 Java uml
5张图带你理解 RocketMQ 顺序消息实现机制
5张图带你理解 RocketMQ 顺序消息实现机制
708 1
5张图带你理解 RocketMQ 顺序消息实现机制
|
消息中间件 缓存 算法
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
264 0
阿里二面:RocketMQ 消息积压了,增加消费者有用吗?
|
消息中间件 缓存 数据库
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
428 0
4 张图,9 个维度告诉你怎么做能确保 RocketMQ 不丢失消息
下一篇
DataWorks