在微服务中使用Apache Kafka进行异步通信

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 在微服务中使用Apache Kafka进行异步通信

尽管微服务架构可能并不是所有系统的灵丹妙药,但它无疑具有其优势,尤其是在构建具有许多不同组件的复杂系统时。如果您正在考虑微服务,则考虑不同服务的通信方式。在本文中,我们将研究如何设置Apache Kafka实例,创建用户服务以将数据发布到主题,以及构建通知服务以使用这些主题中的数据。具体来说,我们将构建一个两步验证应用程序,用户可以在该程序中进行注册,接收带有验证码的邮件,并使用该代码完成注册。源代码可以在这里找到。

为什么选择Apache Kafka?

Kafka是LinkedIn于2011年创建的分布式流媒体平台,用于处理高吞吐量,低延迟传输以及实时处理记录流。它的三大功能使其非常适合此用例:

  • 发布和订阅记录流。在这方面,它类似于消息队列或企业消息传递系统。
  • 以容错方式存储记录流。
  • 处理发生的记录流。

设置Apache Kafka

开始本教程之前,需要满足以下条件:

  • Mac版Docker或Windows版Docker
  • Docker Compose的知识
  • Node.js的知识

我们将使用Wurstmeister Kafka Docker映像。请注意,Kafka使用Zookeeper在不同的Kafka节点之间进行协调。

docker-compose.yml类似docker-compose.yml用于为Kafka和Zookeeper提取图像。Kafka服务所需的配置选项之一是KAFKA_ZOOKEEPER_CONNECT ,它告诉Kafka在哪里可以找到Zookeeper实例。

1. version: '2.1'
2. services:
3. zookeeper:
4. container_name: zookeeper
5. image: wurstmeister/zookeeper
6. ports:
7. - "2181:2181"
8. kafka:
9. container_name: kafka
10. image: wurstmeister/kafka
11. ports:
12. - "9092"
13. depends_on:
14. - "zookeeper"
15. environment:
16. KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

将数据发布到Kafka主题

要将数据发布到Kafka主题,我们将创建一个提供两个端点的用户服务:

  • /api/register –将用户详细信息存储在内存中存储节点缓存中,并将用户数据发布到Kafka主题user_account_created
  • /api/verify –验证提供的代码正确,并将用户数据发布到Kafka主题user_account_verified

我们使用node-rdkafka NPM软件包创建一个生产者,该生产者从我们的节点应用程序连接到Kafka:

1. let producerReady;
2. producer = new kafka.Producer({
3. debug: 'all',
4. 'client.id': 'user-api',
5. 'metadata.broker.list': KAFKA_BROKER_LIST,
6. 'compression.codec': 'gzip',
7. 'retry.backoff.ms': 200,
8. 'message.send.max.retries': 10,
9. 'socket.keepalive.enable': true,
10. 'queue.buffering.max.messages': 100000,
11. 'queue.buffering.max.ms': 1000,
12. 'batch.num.messages': 1000000,
13. dr_cb: true
14. });
15. producer.connect({}, err => {
16. if (err) {
17. logger.error('connect', err);
18. }
19. });
20. producerReady = new Promise((resolve, reject) => {
21. producer.on('ready', () => {
22. logger.info('producer ready');
23. resolve(producer);
24. });
25. });

我们创建一个新的Promise对象,该对象解析为准备开始发布数据的生产者。这在我们的sendMessage函数中使用,该函数将数据发布到Kafka主题分区:

1. KafkaService.prototype.sendMessage = function sendMessage(
2.     topic,
3.     payload,
4.     partition = 0
5. ) {
6. return producerReady
7. .then(producer => {
8. const message = Buffer.from(JSON.stringify(payload));
9. producer.produce(topic, partition, message);
10. })
11. .catch(error => logger.error('unable to send message', error));
12. };

使用Kafka主题中的数据

为了使用Kafka主题中的数据,我们将创建一个通知服务,以监听来自我们主题的数据,并根据从中获取数据的主题发送带有验证码或成功消息的电子邮件。

我们创建一个连接到Kafka的使用者,其中KAFKA_BROKER_LIST是所有Kafka实例的逗号分隔列表。

1. process.stdin.resume(); // keep process alive
2. 
3. require('dotenv').config();
4. 
5. const Kafka = require('node-rdkafka');
6. 
7. const logger = require('./logger');
8. 
9. const sendMail = require('./email');
10. 
11. const KAFKA_BROKER_LIST = process.env.KAFKA_BROKER_LIST;
12. 
13. const consumer = new Kafka.KafkaConsumer({
14. //'debug': 'all',
15. 'metadata.broker.list': KAFKA_BROKER_LIST,
16. 'group.id': 'notification-service',
17. 'enable.auto.commit': false
18. });

node-rdkafka返回的使用者对象是可读流的实例。我们等待ready事件订阅我们的主题user_account_createduser_account_verified ,并侦听这些主题中的数据:

1. const topics = [
2. 'user_account_created',
3. 'user_account_verified'
4. ];
5. 
6. //counter to commit offsets every numMessages are received
7. let counter = 0;
8. let numMessages = 5;
9. 
10. consumer.on('ready', function(arg) {
11. logger.info('consumer ready.' + JSON.stringify(arg));
12. 
13. consumer.subscribe(topics);
14. //start consuming messages
15. consumer.consume();
16. });
17. 
18. consumer.on('data', function(metadata) {
19. counter++;
20. 
21. //committing offsets every numMessages
22. if (counter % numMessages === 0) {
23. logger.info('calling commit');
24. consumer.commit(metadata);
25. }
26. 
27. // Output the actual message contents
28. const data = JSON.parse(metadata.value.toString());
29. logger.info('data value', data);
30. 
31. if(metadata.topic === 'user_account_created'){
32. const to = data.email;
33. const subject = 'Verify Account';
34. const content = `Hello ${data.first_name},
35.       Please use this code ${data.code} to complete your verification`;
36. sendMail(subject, content,to);
37. }else if(metadata.topic === 'user_account_verified') {
38. const to = data.email;
39. const subject = 'Account Verified';
40. const content = `Hello ${data.first_name},
41.       You have successfully been verified`;
42. sendMail(subject, content,to);
43. }
44. 
45. });
46. 
47. consumer.on('disconnected', function(arg) {
48. logger.info('consumer disconnected. ' + JSON.stringify(arg));
49. });
50. 
51. //logging all errors
52. consumer.on('event.error', function(err) {
53. logger.error('Error from consumer', err, 'code: ', err.code);
54. });
55. 
56. //starting the consumer
57. consumer.connect();

当消息发布到我们正在侦听的任何主题时,将调用data事件处理程序。在这里,我们解析传入的消息并检查元数据对象,以了解接收到的数据所针对的主题,因此我们可以执行适当的操作。

结论

我们的两因素身份验证应用程序演示了使用Apache Kafka(还有其他系统,例如RabbitMQ , ZeroMQ )的两个微服务之间的通信模式,相对于Feign增加了未来的灵活性。

例如,假设我们将来会添加一个推荐服务,当新用户登录时,该服务需要发送推荐;它仅订阅user_account_verified主题,因此无需更改用户服务。

系统架构结论(1)数据管道场景,MQ比cache更合适;(2)服务化架构,不应该绕过service读取其后端的cache/db,而应该通过RPC接口访问;(3)MQ通信可扩展

相关文章
|
16天前
|
消息中间件 安全 Kafka
Apache Kafka安全加固指南:保护你的消息传递系统
【10月更文挑战第24天】在现代企业环境中,数据的安全性和隐私保护至关重要。Apache Kafka作为一款广泛使用的分布式流处理平台,其安全性直接影响着业务的稳定性和用户数据的安全。作为一名资深的Kafka使用者,我深知加强Kafka安全性的重要性。本文将从个人角度出发,分享我在实践中积累的经验,帮助读者了解如何有效地保护Kafka消息传递系统的安全性。
44 7
|
16天前
|
消息中间件 数据挖掘 Kafka
Apache Kafka流处理实战:构建实时数据分析应用
【10月更文挑战第24天】在当今这个数据爆炸的时代,能够快速准确地处理实时数据变得尤为重要。无论是金融交易监控、网络行为分析还是物联网设备的数据收集,实时数据处理技术都是不可或缺的一部分。Apache Kafka作为一款高性能的消息队列系统,不仅支持传统的消息传递模式,还提供了强大的流处理能力,能够帮助开发者构建高效、可扩展的实时数据分析应用。
64 5
|
16天前
|
消息中间件 存储 监控
构建高可用性Apache Kafka集群:从理论到实践
【10月更文挑战第24天】随着大数据时代的到来,数据传输与处理的需求日益增长。Apache Kafka作为一个高性能的消息队列服务,因其出色的吞吐量、可扩展性和容错能力而受到广泛欢迎。然而,在构建大规模生产环境下的Kafka集群时,保证其高可用性是至关重要的。本文将从个人实践经验出发,详细介绍如何构建一个高可用性的Kafka集群,包括集群规划、节点配置以及故障恢复机制等方面。
48 4
|
16天前
|
消息中间件 监控 大数据
优化Apache Kafka性能:最佳实践与调优策略
【10月更文挑战第24天】作为一名已经对Apache Kafka有所了解并有实际使用经验的开发者,我深知在大数据处理和实时数据流传输中,Kafka的重要性不言而喻。然而,在面对日益增长的数据量和业务需求时,如何保证系统的高性能和稳定性成为了摆在我们面前的一个挑战。本文将从我的个人视角出发,分享一些关于如何通过合理的配置和调优来提高Kafka性能的经验和建议。
47 4
|
16天前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
36 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
16天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
48 2
|
1月前
|
消息中间件 监控 Kafka
Apache Kafka 成为实时数据流处理的关键组件
【10月更文挑战第8天】随着大数据技术的发展,Apache Kafka 成为实时数据流处理的关键组件。Kafka Manager 提供了一个简洁易用的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件修改、启动服务、创建和管理 Topic 等操作,帮助你快速上手。
45 3
|
1月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
38 3
|
14天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
15天前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
38 0

推荐镜像

更多