开发者学堂课程【RocketMQ知识精讲与项目实战(第一阶段):顺序消息消费者】学习笔记,与课程紧密联系,让用户快速学习知识
课程地址:https://developer.aliyun.com/learning/course/702/detail/12383
顺序消息消费者
内容介绍
一、 编写代码
二、 测试
三、 结果
四、 总结
一、编写代码
现在编写一个 consumer 去消费
public class Consumer
public static void main(String[] args) throws MQClientExceptio(
//基本代码拿过来
//1.创建消费者 Consumer,制定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( consumerGroup:"group1");
//2.指定Nameserver地址consumer.setNamesrvAddr("192.168.25.135:9876192.168.25.138:9876");
//3.订阅主题 Topic 和 Tag
consumer.subscribe( topic: "orderTopic",subExpression:"*");
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly()
@Override
public ConsumeOrderlyStatus consumeNessage(List msgs, ConsumeOrderlyContext context) (
for (MessageExt msg : msgs)
System.out.println
(”线程名称:T+Thread.currentThread().getName()+":"+new String(msg.getBody()));
//扩展线程名称
return Consumeorderlystatus.sucess;
二、测试
写完后就可以测试了,先发送。
//发送消息
for (int i=e;icordersteps.size();i++) f
String body= ordersteps.get(i)+"";
Message message= new Message( topic: "OrderTopic",tags: "Order",keys: "i"+i,body.getBytes());
参数一:消息对象
参数二:消息队列的选择器
参数三:选择队列的业务标识(订单 ID)SendResultsendResult = producer.send(message, new NessageQueueselector()(
@param mqs:队列集合
@param msg:消息对象
@param arg:业务标识的参数
@return
稍等一下,等待发送结果。
会看到打印结果:system.out.printin(发送结果:“+sendResult);
三、结果
关键是要看消费的结果。
//5.启动消费者
consumer.start();
System.out.print1n
("消费者启动");
稍等一下消费者正在启动。
当结果出来后,查看同一个订单是否按着顺序消费,必须保证顺序性。此时整个流程就弄完了。
四、总结
对于消息发送者来说,基本流程和之前一样,无非是再发送消息时传了一个消息队列选择器,目的是为了再发送消息时选择一个固定对点,需要根据 id 去选择,返回
当前对点。消费者流程也是同之前没区别,只不过在注册消息监听器时,
用了一个 MessageListenerOrdenly ,用它的原因时因为我们是在针对同一个队列消息,采用单线程,一个队列只用一个线程去消费,这样就能保证消费的顺序性,
其他都没有区别,正常消费即可。