一、Rocket 安装
1.1 下载
http://rocketmq.apache.org/release_notes/release-notes-4.4.0/
1.2 环境
1.jdk1.8
2.linux
1.3 安装
# 1.上传服务器 rocketmq-all-4.4.0-bin-release.zip # 2.解压 unzip rocketmq-all-4.4.0-bin-release.zip # 3.移动到/usr/local mv rocketmq-all-4.4.0-bin-release /usr/local/rocketmq
1.4 修改配置文件
编辑bin/runbroker.sh 和 bin/runserver.sh文件,修改里面的 # JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g" JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
1.5 测试启动
# 进入bin目录下 # 1.启动nameserver nohup ./bin/mqnamesrv & # 2.启动broker nohup ./mqbroker -n localhost:9876 &
1.6 测试消息发送
export NAMESRV_ADDR=localhost:9876 bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
1.7 测试消息接受
export NAMESRV_ADDR=localhost:9876 bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
1.8 关闭
bin/mqshutdown broker bin/mqshutdown namesrv
二、Rocket 控制台安装
2.1 下载
在 git 上 下 载 下 面 的 工 程 rocketmq-console-1.0.0 https://github.com/apache/rocketmq-externals/releasess
2.2 修改配置
修 改 配 置 文 件
rocketmq-console\src\main\resources\application.properties
server.port=7777 #项目启动后的端口号
rocketmq.config.namesrvAddr=192.168.109.131:9876 #nameserv的地址,注意防火墙要开启9876端口
2.3 打包启动
# 进入控制台项目,将工程打成jar包
mvn clean package -Dmaven.test.skip=true # 启动控制台
java -jar target/rocketmq-console-ng-1.0.0.jar
2.4 测试访问
localhost:7777
三、springboot集成rocketmq
3.1 pom依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
3.2 发送消息
public class RocketMQSendTest { public static void main(String[] args) throws Exception { //1. 创建消息生产者, 指定生产者所属的组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); //2. 指定Nameserver地址 producer.setNamesrvAddr("192.168.109.131:9876"); //3. 启动生产者 producer.start(); //4. 创建消息对象,指定主题、标签和消息体 Message msg = new Message("myTopic", "myTag", ("RocketMQ Message").getBytes()); //5. 发送消息 SendResult sendResult = producer.send(msg,10000); System.out.println(sendResult); //6. 关闭生产者 producer.shutdown(); } }
3.3 接受消息
//接收消息 public class RocketMQReceiveTest { public static void main(String[] args) throws MQClientException { //1. 创建消息消费者, 指定消费者所属的组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer- group"); //2. 指定Nameserver地址 consumer.setNamesrvAddr("192.168.109.131:9876"); //3. 指定消费者订阅的主题和标签 consumer.subscribe("myTopic", "*"); //4. 设置回调函数,编写处理消息的方法 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("Receive New Messages: " + msgs); //返回消费状态 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5. 启动消息消费者 consumer.start(); System.out.println("Consumer Started."); } }