文章目录
介绍
MQ,Message Queue,是一种提供消息队列服务的中间件,也称为消息中间件,是一套提供了消息生产、存储、消费全过程API的软件系统。消息即数据。RocketMQ是⼀款阿⾥巴巴开源的消息中间件,主要用于限流,异步解耦操作,如付款之后短信通知,订单发货通知等等,都是异步进行执行。
rocketmq搭建
查找rockermq镜像
docker search rocketmq
拉取镜像
docker pull rocketmqinc/rocketmq
创建目录
mkdir -p /docker/rocketmq/data/namesrv/logs /docker/rocketmq/data/namesrv/store
构建namesvr容器
#构建namesrv容器
docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876 -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
启动broker
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf
安装控制台
docker pull pangliang/rocketmq-console-ng
控制台
docker run -d --restart=always --name rmqadmin -e "JAVA_OPTS=-Drocketmq.namesrv.addr=192.168.5.130:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" -p 8080:8080 pangliang/rocketmq-console-ng
核心的概念
消息(Message)
消息是指,消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题。
队列(Queue)
存储消息的物理实体。一个Topic中可以包含多个Queue,每个Queue中存放的就是该Topic的消息。一个Topic的Queue也被称为一个Topic中消息的分区(Partition)。
主题(Topic)
Topic表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位
标签(Tag)
为消息设置的标签,用于同一主题下区分不同类型的消息。
Producer
消息生产者,负责一般由业务系统。
Consumer
消息消费者,负责消费消息。一个消息消费者会从Broker服务器中获取到消息,并对消息进行相关业务
处理。
项目构建
目录结构
依赖引入
<!--rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>
消息提供者
yml配置
server:
port: 8022
#应用名字
spring:
application:
name: RocketMqOrderProvider
#rocketmq配置
rocketmq:
name-server: 192.168.5.130:9876
producer:
group: order-producer-group
模板发送消息
@RestController
public class MqOrderController {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@RequestMapping("/sendOrderMessage")
public String sendOrderMessage() {
Order order = new Order();
order.setOrderId(1);
order.setOrderNo(1);
order.setProductId(1);
order.setUserId(1);
order.setOrderNum(5);
order.setOrderAmt(new BigDecimal("100.0"));
order.setOrderStatus("下单");
order.setPayStatus("未支付");
order.setCreateUser("elite");
order.setCreateTime(LocalDateTime.now());
rocketMQTemplate.syncSend("order-topic",order.toString(),6000);
return order.toString();
}
}
消息接收者
yml配置
server:
port: 8023
#应用名字
spring:
application:
name: RocketMqOrderConsumer
rocketmq:
name-server: 192.168.5.130:9876
producer:
group: order-consumer-group
消息监听
@Component
@RocketMQMessageListener(topic = "order-topic",consumerGroup = "my-consumer-group")
@Slf4j
public class ConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String orderInfo) {
System.out.println(orderInfo);
}
}
测试连接不上服务问题
See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6288]ms, Topic: order-topic, BrokersSent: [2e116b166e4d, 2e116b166e4d, 2e116b166e4d]
See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause
org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to 172.17.0.4:10911 failed
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:407) ~[rocketmq-remoting-4.9.3.jar:4.9.3]
问题排查中。。