上篇文章我们讲解了消息队列的主要作用,没有看过的小伙伴可以先看一下,mq消息队列作用。今天我们来从代码的角度来学习MQ队列的入门使用,由于笔者在工作中用到的MQ是RocketMQ,所以就使用RocketMQ来讲解了。
一. 前提条件
我们需要把RocketMQ运行起来,它包含两个组件,NameServer和Broker,把这两个组件运行起来就可以了,可以二进制运行,或者把源码拉下来运行,大家参考一下官方文档就可以运行起来了,这里我讲解一下拉取源码的方式运行。
先将distribution/conf目录,复制到源码的目录
- 运行NameServer
运行成功输出
- 运行Broker
broker.conf添加配置:namesrvAddr=127.0.0.1:9876
运行成功输出
单机版的RocketMQ集群就搭建完成了,在本机运行可以方便我们以后调试。
二.生产者
maven需要依赖rocketmq-client
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.0</version> </dependency>
public class Producer { public static void main(String[] args) throws MQClientException{ DefaultMQProducer producer = new DefaultMQProducer("demoProducerGroup"); //设置nameserver地址 producer.setNamesrvAddr("127.0.0.1:9876"); producer.start(); for (int i = 0; i < 1; i++) { try { //生成mq消息 Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("关注java面试教程 学习更多知识").getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //同步发送mq消息到broker,获取到结果,可以知道是否发送成功 SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
运行结果:发送成功,并返回了消息id
三.消费者
public class Consumer { public static void main(String[] args) throws MQClientException { //设置消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demoConsumerGroup"); //设置nameserver地址 consumer.setNamesrvAddr("127.0.0.1:9876"); //第一次消费从那开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //消费模式 consumer.setMessageModel(MessageModel.CLUSTERING); consumer.subscribe("TopicTest", "*"); //mq消息的回调函数 consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
输出刚刚发送到mq的消息:
四.小结
今天我们学习了怎么运行RocketMQ, 以及使用它来生产消息和消费消息,下篇我们来学习一下RocketMQ的生产者和消费者的核心概念。