尽管微服务架构可能并不是所有系统的灵丹妙药,但它无疑具有其优势,尤其是在构建具有许多不同组件的复杂系统时。如果您正在考虑微服务,则考虑不同服务的通信方式。在本文中,我们将研究如何设置Apache Kafka实例,创建用户服务以将数据发布到主题,以及构建通知服务以使用这些主题中的数据。具体来说,我们将构建一个两步验证应用程序,用户可以在该程序中进行注册,接收带有验证码的邮件,并使用该代码完成注册。源代码可以在这里找到。
为什么选择Apache Kafka?
Kafka是LinkedIn于2011年创建的分布式流媒体平台,用于处理高吞吐量,低延迟传输以及实时处理记录流。它的三大功能使其非常适合此用例:
- 发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。
- 以容错方式存储记录流。
- 处理发生的记录流。
设置Apache Kafka
开始本教程之前,需要满足以下条件:
- Mac版Docker或Windows版Docker
- Docker Compose的知识
- Node.js的知识
我们将使用Wurstmeister Kafka Docker映像。请注意,Kafka使用Zookeeper在不同的Kafka节点之间进行协调。
docker-compose.yml
类似docker-compose.yml
用于为Kafka和Zookeeper提取图像。Kafka服务所需的配置选项之一是KAFKA_ZOOKEEPER_CONNECT
,它告诉Kafka在哪里可以找到Zookeeper实例。
1. version: '2.1' 2. services: 3. zookeeper: 4. container_name: zookeeper 5. image: wurstmeister/zookeeper 6. ports: 7. - "2181:2181" 8. kafka: 9. container_name: kafka 10. image: wurstmeister/kafka 11. ports: 12. - "9092" 13. depends_on: 14. - "zookeeper" 15. environment: 16. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
将数据发布到Kafka主题
要将数据发布到Kafka主题,我们将创建一个提供两个端点的用户服务:
/api/register
–将用户详细信息存储在内存中存储节点缓存中,并将用户数据发布到Kafka主题user_account_created
。/api/verify
–验证提供的代码正确,并将用户数据发布到Kafka主题user_account_verified
。
我们使用node-rdkafka
NPM软件包创建一个生产者,该生产者从我们的节点应用程序连接到Kafka:
1. let producerReady; 2. producer = new kafka.Producer({ 3. debug: 'all', 4. 'client.id': 'user-api', 5. 'metadata.broker.list': KAFKA_BROKER_LIST, 6. 'compression.codec': 'gzip', 7. 'retry.backoff.ms': 200, 8. 'message.send.max.retries': 10, 9. 'socket.keepalive.enable': true, 10. 'queue.buffering.max.messages': 100000, 11. 'queue.buffering.max.ms': 1000, 12. 'batch.num.messages': 1000000, 13. dr_cb: true 14. }); 15. producer.connect({}, err => { 16. if (err) { 17. logger.error('connect', err); 18. } 19. }); 20. producerReady = new Promise((resolve, reject) => { 21. producer.on('ready', () => { 22. logger.info('producer ready'); 23. resolve(producer); 24. }); 25. });
我们创建一个新的Promise对象,该对象解析为准备开始发布数据的生产者。这在我们的sendMessage
函数中使用,该函数将数据发布到Kafka主题分区:
1. KafkaService.prototype.sendMessage = function sendMessage( 2. topic, 3. payload, 4. partition = 0 5. ) { 6. return producerReady 7. .then(producer => { 8. const message = Buffer.from(JSON.stringify(payload)); 9. producer.produce(topic, partition, message); 10. }) 11. .catch(error => logger.error('unable to send message', error)); 12. };
使用Kafka主题中的数据
为了使用Kafka主题中的数据,我们将创建一个通知服务,以监听来自我们主题的数据,并根据从中获取数据的主题发送带有验证码或成功消息的电子邮件。
我们创建一个连接到Kafka的使用者,其中KAFKA_BROKER_LIST
是所有Kafka实例的逗号分隔列表。
1. process.stdin.resume(); // keep process alive 2. 3. require('dotenv').config(); 4. 5. const Kafka = require('node-rdkafka'); 6. 7. const logger = require('./logger'); 8. 9. const sendMail = require('./email'); 10. 11. const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST; 12. 13. const consumer = new Kafka.KafkaConsumer({ 14. //'debug': 'all', 15. 'metadata.broker.list': KAFKA_BROKER_LIST, 16. 'group.id': 'notification-service', 17. 'enable.auto.commit': false 18. });
node-rdkafka
返回的使用者对象是可读流的实例。我们等待ready
事件订阅我们的主题user_account_created
和user_account_verified
,并侦听这些主题中的数据:
1. const topics = [ 2. 'user_account_created', 3. 'user_account_verified' 4. ]; 5. 6. //counter to commit offsets every numMessages are received 7. let counter = 0; 8. let numMessages = 5; 9. 10. consumer.on('ready', function(arg) { 11. logger.info('consumer ready.' + JSON.stringify(arg)); 12. 13. consumer.subscribe(topics); 14. //start consuming messages 15. consumer.consume(); 16. }); 17. 18. consumer.on('data', function(metadata) { 19. counter++; 20. 21. //committing offsets every numMessages 22. if (counter % numMessages === 0) { 23. logger.info('calling commit'); 24. consumer.commit(metadata); 25. } 26. 27. // Output the actual message contents 28. const data = JSON.parse(metadata.value.toString()); 29. logger.info('data value', data); 30. 31. if(metadata.topic === 'user_account_created'){ 32. const to = data.email; 33. const subject = 'Verify Account'; 34. const content = `Hello ${data.first_name}, 35. Please use this code ${data.code} to complete your verification`; 36. sendMail(subject, content,to); 37. }else if(metadata.topic === 'user_account_verified') { 38. const to = data.email; 39. const subject = 'Account Verified'; 40. const content = `Hello ${data.first_name}, 41. You have successfully been verified`; 42. sendMail(subject, content,to); 43. } 44. 45. }); 46. 47. consumer.on('disconnected', function(arg) { 48. logger.info('consumer disconnected. ' + JSON.stringify(arg)); 49. }); 50. 51. //logging all errors 52. consumer.on('event.error', function(err) { 53. logger.error('Error from consumer', err, 'code: ', err.code); 54. }); 55. 56. //starting the consumer 57. consumer.connect();
当消息发布到我们正在侦听的任何主题时,将调用data
事件处理程序。在这里,我们解析传入的消息并检查元数据对象,以了解接收到的数据所针对的主题,因此我们可以执行适当的操作。
结论
我们的两因素身份验证应用程序演示了使用Apache Kafka(还有其他系统,例如RabbitMQ , ZeroMQ )的两个微服务之间的通信模式,相对于Feign增加了未来的灵活性。
例如,假设我们将来会添加一个推荐服务,当新用户登录时,该服务需要发送推荐;它仅订阅user_account_verified
主题,因此无需更改用户服务。
系统架构结论(1)数据管道场景,MQ比cache更合适;(2)服务化架构,不应该绕过service读取其后端的cache/db,而应该通过RPC接口访问;(3)MQ通信可扩展