RocketMQ是阿里巴巴研发的消息中间件,由于RocketMQ4.0 5.0在协议等方面的变化,本文主要针对Java客户端接入,列举在4.0、5.0使用普通java接入、SpringBoot接入等方式。
在接入之前,首先我们来了解RocketMQ5.0和4.0在代码接入上的差异性。
版本 |
协议 |
支持接入方式 |
4.0 |
remoting |
tcp、http |
5.0 |
remoting、GRPC |
tcp |
- RocketMQ4.0支持remoting协议,支持通过tcp和http的方式接入,推荐通过tcp方式接入;
- RocketMQ5.0支持remoting、GRPC协议,与4.0最大的区别就是支持了GRPC,以更好的与开源社区对齐,但仅支持通过tcp接入;
普通接入
普通接入即通过new对象的方式实现,这种情况下需要注意不应每次都new对象,容易耗尽tcp连接资源,引发broken pipe等报错。
RocketMQ4.0
RocketMQ5.0
普通接入请参考:链接
Springboot接入
RocketMQ4.0
官方接入
spring集成可参考:链接
通过rocketmq-spring-boot-starter接入
即通过以下jar接入,推荐使用2.2.1以后的版本
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>${version}</version></dependency>
rocketmq-spring-boot-starter依赖的是rocketmq-client包,所以他是一个支持remoting协议的包(rocketmq-client支持remoting协议,rocketmq-client-java支持remoting和grpc协议)
接入方式如下:
- 修改application.prpoerties,如下所示:
#NAMESRV_ADDR, 通过"实例管理--获取接入点信息--TCP协议接入点"获取。#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,接入点应该是类似这样的格式 http://MQ_INST_XXX:xxx,注意!!!一定要有http协议头#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,接入点应该是类似这样的格式 rmq-cn-xxx.xx:xxx,注意!!!一定不要自己添加http协议头rocketmq.name-server=#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,请设置访问的阿里云账号的AccessKeyId。#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。rocketmq.consumer.access-key=#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,请设置访问的阿里云账号的AccessKeySecret。#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。rocketmq.consumer.secret-key=# 注意,4.0实例需要在控制台实例详情中查看是否有命名空间,无命名空间则不需要填写此项rocketmq.consumer.instance-name=rmq-cn-pe33gx4so0grocketmq.producer.group=group2#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,请设置访问的阿里云账号的AccessKeyId。#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例用户名,不要设置阿里云账号的AccessKeyId。rocketmq.producer.access-key=#如果使用当前Demo访问阿里云RocketMQ 4.0系列实例,请设置访问的阿里云账号的AccessKeySecret。#如果使用当前Demo访问阿里云RocketMQ 5.0系列实例,请设置实例详情页获取的实例密码,不要设置阿里云账号的AccessKeySecret。rocketmq.producer.secret-key=# 默认发送超时时间,可修改rocketmq.producer.send-message-timeout=30000
- producer代码示例:
publicclassDemoController { privateRocketMQTemplaterocketMQTemplate; "/testDelay") (publicvoidtest() { Message<String>msg=MessageBuilder.withPayload("Hello,RocketMQ").build(); // 发送延迟消息,注意topic要先创建好SendResultsendResult=rocketMQTemplate.syncSendDeliverTimeMills("topic1", msg, System.currentTimeMillis() +100000); System.out.println(JSON.toJSONString(sendResult)); } "/testSend") (publicvoidtestSend() { Message<String>msg=MessageBuilder.withPayload("Hello,RocketMQ").build(); // 发送普通消息,注意topic要先创建好 SendResultsendResult=rocketMQTemplate.send("simple-topic", msg); System.out.println(JSON.toJSONString(sendResult)); } }
- consumer代码示例:
// 需要注意注解中的消费方式consumeMode(并发、顺序)、消费类型messageModel(集群、广播),需要与控制台设置的保持一致,同时注意topic和group需提前创建好topic="simple-topic", selectorExpression="*", consumerGroup="simple-group", (messageModel=MessageModel.CLUSTERING, consumeMode=ConsumeMode.CONCURRENTLY) publicclassSimpleConsumerimplementsRocketMQListener<MessageExt> { publicvoidonMessage(MessageExtmessage) { log.info("simple msg:{}, body:{}", message.getMsgId(), message.getBody()); } }
RocketMQ5.0
官方接入
RocketMQ5.0阿里云官方client并未提供spring bean方式接入的示例,在普通接入中5.0的代码示例提供了在生产者和消费者initMethod阶段的单例实现demo,但并未提供destroyMethod阶段的demo,如果需要用官方范例接入的话需要自行在spring生命周期中实现destory,否则可能会在应用发布、重启等过程中导致异常报错。
通过rocketmq-spring-boot-starter接入
与RocketMQ4.0的接入方式一致,但需要注意的是,此种接入方式使用的是remoting协议(非5.0新增的grpc协议),只是5.0兼容了remoting协议。