消息队列
MQ有什么用?
消息队列有很多使用场景,比较常见的有3个:解耦、异步、削峰。
- 解耦:传统的软件开发模式,各个模块之间相互调用,数据共享,每个模块都要时刻关注其他模块的是否更改或者是否挂掉等等,使用消息队列,可以避免模块之间直接调用,将所需共享的数据放在消息队列中,对于新增业务模块,只要对该类消息感兴趣,即可订阅该类消息,对原有系统和业务没有任何影响,降低了系统各个模块的耦合度,提高了系统的可扩展性。
- 异步:消息队列提供了异步处理机制,在很多时候应用不想也不需要立即处理消息,允许应用把一些消息放入消息中间件中,并不立即处理它,在之后需要的时候再慢慢处理。
- 削峰:在访问量骤增的场景下,需要保证应用系统的平稳性,但是这样突发流量并不常见,如果以这类峰值的标准而投放资源的话,那无疑是巨大的浪费。使用消息队列能够使关键组件支撑突发访问压力,不会因为突发的超负荷请求而完全崩溃。消息队列的容量可以配置的很大,如果采用磁盘存储消息,则几乎等于“无限”容量,这样一来,高峰期的消息可以被积压起来,在随后的时间内进行平滑的处理完成,而不至于让系统短时间内无法承载而导致崩溃。在电商网站的秒杀抢购这种突发性流量很强的业务场景中,消息队列的强大缓冲能力可以很好的起到削峰作用。
说一说生产者与消费者模式
所谓生产者-消费者问题,实际上主要是包含了两类线程。一种是生产者线程用于生产数据,另一种是消费者线程用于消费数据,为了解耦生产者和消费者的关系,通常会采用共享的数据区域,就像是一个仓库。生产者生产数据之后直接放置在共享数据区中,并不需要关心消费者的行为。而消费者只需要从共享数据区中去获取数据,就不再需要关心生产者的行为。但是,这个共享数据区域中应该具备这样的线程间并发协作的功能:
- 如果共享数据区已满的话,阻塞生产者继续生产数据放置入内;
- 如果共享数据区为空的话,阻塞消费者继续消费数据。
在Java语言中,实现生产者消费者问题时,可以采用三种方式:
- 使用 Object 的 wait/notify 的消息通知机制;
- 使用 Lock 的 Condition 的 await/signal 的消息通知机制;
- 使用 BlockingQueue 实现。
消息队列如何保证消息不丢?
Kafka丢失消息分为如下几种情况:
- 生产者丢消息:
生产者没有设置相应的策略,发送过程中丢失数据。 - Kafka自己丢消息:
比较常见的一个场景,就是Kafka的某个broker宕机了,然后重新选举partition的leader时。如果此时follower还没来得及同步数据,leader就挂了,然后某个follower成为了leader,它就少了一部分数据。 - 消费端丢消息:
消费者消费到了这个数据,然后消费之自动提交了offset,让Kafka知道你已经消费了这个消息,当你准备处理这个消息时,自己挂掉了,那么这条消息就丢了。
针对上述三种情况,Kafka可以采用如下方式避免消息丢失:
- 生产者丢消息:
关闭自动提交offset,在自己处理完毕之后手动提交offset,这样就不会丢失数据。 - Kafka自己丢消息:一般要求设置4个参数来保证消息不丢失:
- 给topic设置 replication.factor 参数,这个值必须大于1,表示要求每个partition必须至少有2个副本。
- 在kafka服务端设置 min.isync.replicas 参数,这个值必须大于1,表示 要求一个leader至少感知到有至少一个follower在跟自己保持联系正常同步数据,这样才能保证leader挂了之后还有一个follower。
- 在生产者端设置 acks=all ,表示 要求每条每条数据,必须是写入所有replica副本之后,才能认为是写入成功了。
- 在生产者端设置 retries=MAX (很大的一个值),表示这个是一旦写入失败,就无限重试。
- 消费端丢消息:
如果按照上面设置了ack=all,则一定不会丢失数据,要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才认为本次写成功了。如果没满足这个条件,生产者会自动不断的重试,重试无限次。
MQ处理消息失败了怎么办?
一般生产环境中,都会在使用MQ的时候设计两个队列:一个是核心业务队列,一个是死信队列。核心业务队列,就是比如专门用来让订单系统发送订单消息的,然后另外一个死信队列就是用来处理异常情况的。
生产者与消费者的声明调用
1. @RunWith(SpringRunner.class) 2. @SpringBootTest 3. @ContextConfiguration(classes = CommunityApplication.class) 4. public class KafkaTests { 5. 6. @Autowired 7. private KafkaProducer kafkaProducer; 8. 9. @Test 10. public void testKafka() { 11. kafkaProducer.sendMessage("test", "你好"); 12. kafkaProducer.sendMessage("test", "在吗"); 13. 14. try { 15. Thread.sleep(1000 * 10); 16. } catch (InterruptedException e) { 17. e.printStackTrace(); 18. } 19. } 20. 21. } 22. 23. @Component 24. class KafkaProducer { 25. 26. @Autowired 27. private KafkaTemplate kafkaTemplate; 28. public void sendMessage(String topic, String content) { 29. kafkaTemplate.send(topic, content); 30. } 31. 32. } 33. 34. @Component 35. class KafkaConsumer { 36. @KafkaListener(topics = {"test"}) 37. public void handleMessage(ConsumerRecord record) { 38. System.out.println(record.value()); 39. } 40. 41. }
搜索引擎
说说ElasticSearch put的全过程
put过程主要分为三个阶段:
- 协调阶段:
Client 客户端选择一个 node 发送 put 请求,此时当前节点就是协调节点(coordinating node)。协调节点根据 document 的 id 进行路由,将请求转发给对应的 node。这个 node 上的是 primary shard 。 - 主要阶段:对应的 primary shard 处理请求,写入数据 ,然后将数据同步到 replica shard。
- primary shard 会验证传入的数据结构;
- 本地执行相关操作;
- 将操作转发给 replica shard。
- 当数据写入 primary shard 和 replica shard 成功后,路由节点返回响应给 Client。
- 副本阶段:
每个 replica shard 在转发后,会进行本地操作。