同步调用优点:
时效性强,打电话、直播,很快可以得到结果
同步调用的问题:
耦合度高
性能和吞吐能力差
有额外的资源消耗
有级联失败的问题
异步:
对高并发有要求的功能使用异步
优点:
耦合度低
性能和吞吐能力高
流量削峰(对秒杀来说很重要)
故障隔离
缺点:
异步时效性没有同步好
依赖于Broker可靠性、安全性等要求高·1—————》Broker—>mq消息队列
架构明显变复杂了
场景:
双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口
订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下但成功。
库存系统:订阅订单的消息,获取下单信息,进行库操作。就算库存系统出现故障,消息列队也能保证消息的可靠投递,不会导致消息丢失
流量削峰
场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列
作用:
1,可以控制活动人数,超过此阈值的订单直接丢弃
2,可以缓解短时间的高流量压垮应用(应用程序安自己的最大处理能力获取订单)
消息队列mq
消息-----》以队列的形式存储
先进先出
异步解耦
场景:注册邮件和短信通知
在秒杀团队—》高并发 流量削峰
总的来说,不管是rocketmq还是rabbitmq,他们应用的场景是
1、流量削峰
2、日志处理
3、应用解耦
4、异步处理
RocketMQ发送消息和消费消息测试类
====================二期实现
导入依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.8.0</version> </dependency>
测试代码
package com.example.springbootproject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.common.RemotingHelper; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; import java.util.List; @SpringBootTest @RunWith(SpringRunner.class) @Slf4j public class mqtest { /** * 生产者 * @throws Exception */ @Test public void test() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("GROUP_DEMO"); producer.setNamesrvAddr("127.0.0.1:9876"); producer.setSendLatencyFaultEnable(true); producer.start(); for (int i = 0; i < 10; i++) { Message msg = new Message("TOPIC_TEST", "TAGE_TEST", ("ROCKETMQ000" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); //发送核心方法 //同步 SendResult send = producer.send(msg); //异步 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("===" + sendResult); } @Override public void onException(Throwable throwable) { System.out.println("e:" + throwable); } }); //单向 producer.sendOneway(msg); System.out.println("send:%s%n" + send); } //producer.shutdown(); } /** * 消费者 * @throws Exception */ @Test public void consume() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_consumer"); consumer.setNamesrvAddr("127.0.0.1:9876"); //订阅Topic,去消费生产者产生的消息。 consumer.subscribe("TOPIC_TEST", "*");//tag tagA|tagB|tagC consumer.setMessageModel(MessageModel.CLUSTERING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) { try { for (MessageExt item : list) { String topic = item.getTopic(); String tags = item.getTags(); String msgBody = new String(item.getBody(), "utf-8"); System.out.println("收到消息:topic:" + topic + ",tags:" + tags + ",msg:" + msgBody); } } catch (Exception e) { log.error("e:", e); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动消费者 consumer.start(); System.out.println("consumer start"); } }
====================一期实现
public class RockMQSendMessageTest { //发送消息 public static void main(String[] args) throws Exception { //1创建消息生产者,并且设置生成组名 DefaultMQProducer producer = new DefaultMQProducer("myproducer-group"); //2为生产者设置NameServer的地址 mq ip:port port可以在本地跑起来mq producer.setNamesrvAddr("......."); //3 启动生产者 producer.start(); //4 构建消息对象,主要设置消息的主题 标签 内容 topic tags 内容 Message message = new Message("myTopic", "myTag", ("Test jfdlfd").getBytes()); //5发送消息 SendResult result = producer.send(message, 1000); System.out.println(result); //6 关闭生产者 producer.shutdown(); } }
public class RocketMQReceviceMessageTest { //接收消息 public static void main(String[] args) throws Exception { //1创建消费者,并且为其制定消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myconsumer-group"); //2为消费者设置NameServer的地址ip:port可以在本地跑起来mq consumer.setNamesrvAddr("......."); //3制定消费者订阅的主题和标签 consumer.subscribe("myTopic", "*"); //4设置一个回调函数,并在函数编写接收到消息之后的处理方法 consumer.registerMessageListener(new MessageListenerConcurrently() { //j处理获取到的消息 @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { //消费逻辑 System.out.println("Message====>" + list); //消息成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //5启动消费者 consumer.start(); System.out.println("启动消费成功"); } }