微服务轮子项目(34) -Kafka

简介: 微服务轮子项目(34) -Kafka

1. Kafka概述

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin2010年贡献给了Apache基金会并成为顶级开源项目。

Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展

上图中一个topic配置了3个partitionPartition1有两个offset:0和1。Partition2有4个offsetPartition3有1个offset。副本的id和副本所在的机器的id恰好相同。

如果一个topic的副本数为3,那么Kafka将在集群中为每个partition创建3个相同的副本。集群中的每个broker存储一个或多个partition。多个producerconsumer可同时生产和消费数据。

1.1 角色介绍

Broker:

  • Kafka 集群包含一个或多个服务器,服务器节点称为broker。
  • broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
  • 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
  • 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

Topic:

  • 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

类似于数据库的表名

Partition:

  • topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。

Producer:

  • 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。

Consumer:

  • 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。

Consumer Group

  • 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

Leader:

  • 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。

Follower:

  • Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

1.2 四个核心API

Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。 该协议是版本控制的,并保持与旧版本的向后兼容性。 我们为Kafka提供Java客户端,但客户端可以使用多种语言。

  • Producer API:允许应用程序将记录流发布到一个或多个Kafka topics。
  • Consumer API:允许应用程序订阅一个或多个topics并处理生成给他们的记录流。
  • Streams API:允许应用程序充当流处理器,从一个或多个topics中消耗输入流,并将输出流生成为一个或多个输出topics,从而将输入流有效地转换为输出流。
  • Connector API:用于构建和运行将Kafka topics和现有应用或数据系统连接的可重用的produers和consumers。 例如,连接到关系数据库的连接器可能会捕获对表的每个更改。

1.3 相关资料

2. 应用场景

2.1 作为消息系统

传统的消息系统有两个模块: 队列 和 发布-订阅。 在队列中,消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。两者均有优缺点。 队列的优点在于它允许你将处理数据的过程分给多个消费者实例,使你可以扩展处理过程。 不好的是,队列不是多订阅者模式的—一旦一个进程读取了数据,数据就会被丢弃。 而发布-订阅系统允许你广播数据到多个进程,但是无法进行扩展处理,因为每条消息都会发送给所有的订阅者。

消费组在Kafka有两层概念。在队列中,消费组允许你将处理过程分发给一系列进程(消费组中的成员)。 在发布订阅中,Kafka允许你将消息广播给多个消费组。

Kafka相比于传统消息队列还具有更严格的顺序保证;

传统队列在服务器上保存有序的记录,如果多个消费者消费队列中的数据, 服务器将按照存储顺序输出记录。 虽然服务器按顺序输出记录,但是记录被异步传递给消费者, 因此记录可能会无序的到达不同的消费者。这意味着在并行消耗的情况下, 记录的顺序是丢失的。因此消息系统通常使用“唯一消费者”的概念,即只让一个进程从队列中消费, 但这就意味着不能够并行地处理数据。

Kafka 设计的更好。topic中的partition是一个并行的概念。 Kafka能够为一个消费者池提供顺序保证和负载平衡,是通过将topic中的partition分配给消费者组中的消费者来实现的, 以便每个分区由消费组中的一个消费者消耗。通过这样,我们能够确保消费者是该分区的唯一读者,并按顺序消费数据。 众多分区保证了多个消费者实例间的负载均衡。但请注意,消费者组中的消费者实例个数不能超过分区的数量。

2.2 作为存储系统

许多消息队列可以发布消息,除了消费消息之外还可以充当中间数据的存储系统。那么Kafka作为一个优秀的存储系统有什么不同呢?

  • 数据写入Kafka后被写到磁盘,并且进行备份以便容错。直到完全备份,Kafka才让生产者认为完成写入,即使写入失败Kafka也会确保继续写入
  • Kafka使用磁盘结构,具有很好的扩展性—50kb和50TB的数据在server上表现一致。
  • 可以存储大量数据,并且可通过客户端控制它读取数据的位置,您可认为Kafka是一种高性能、低延迟、具备日志存储、备份和传播功能的分布式文件系统。

2.3 用做流处理

Kafka 流处理不仅仅用来读写和存储流式数据,它最终的目的是为了能够进行实时的流处理。

在Kafka中,流处理器不断地从输入的topic获取流数据,处理数据后,再不断生产流数据到输出的topic中去。

  • 例如:零售应用程序可能会接收销售和出货的输入流,经过价格调整计算后,再输出一串流式数据。

简单的数据处理可以直接用生产者和消费者的API。对于复杂的数据变换,Kafka提供了Streams API。 Stream API 允许应用做一些复杂的处理,比如将流数据聚合或者join。

  • 这一功能有助于解决以下这种应用程序所面临的问题:处理无序数据,当消费端代码变更后重新处理输入,执行有状态计算等。

Streams API建立在Kafka的核心之上:它使用Producer和Consumer API作为输入,使用Kafka进行有状态的存储, 并在流处理器实例之间使用相同的消费组机制来实现容错。

2.4 批处理

  • 将消息、存储和流处理结合起来,使得Kafka看上去不一般,但这是它作为流平台所备的。
  • 像HDFS这样的分布式文件系统可以存储用于批处理的静态文件。 一个系统如果可以存储和处理历史数据是非常不错的。
  • 传统的企业消息系统允许处理订阅后到达的数据。以这种方式来构建应用程序,并用它来处理即将到达的数据。
  • Kafka结合了上面所说的两种特性。作为一个流应用程序平台或者流数据管道,这两个特性,对于Kafka 来说是至关重要的。
  • 通过组合存储和低延迟订阅,流式应用程序可以以同样的方式处理过去和未来的数据。 一个单一的应用程序可以处理历史记录的数据,并且可以持续不断地处理以后到达的数据,而不是在到达最后一条记录时结束进程。 这是一个广泛的流处理概念,其中包含批处理以及消息驱动应用程序。
  • 同样,作为流数据管道,能够订阅实时事件使得Kafk具有非常低的延迟; 同时Kafka还具有可靠存储数据的特性,可用来存储重要的支付数据, 或者与离线系统进行交互,系统可间歇性地加载数据,也可在停机维护后再次加载数据。流处理功能使得数据可以在到达时转换数据。

3. 安装部署

3.1 下载

下载:

https://kafka.apache.org/downloads

依赖:

  • Zookeeper
  • JDK

下载后解压:

tar -zxf kafka_2.12-2.7.0.tgz -C /opt/

3.2 修改配置文件

修改文件 config/server.properties为以下内容:

broker.id=0     # 唯一ID同一集群下broker.id不能重复
listeners=PLAINTEXT://:9092   # 监听地址
log.dirs=/opt/kafka_2.12-2.7.0/data      # 数据目录
log.retention.hours=168   # kafka数据保留时间单位为hour 默认 168小时即 7天 
log.retention.bytes=1073741824  #(kafka数据量最大值,超出范围自动清理,和 log.retention.hours配合使用,注意其最大值设定不可超磁盘大小)
zookeeper.connect=127.0.0.1:2181  #(zookeeper连接ip及port,多个以逗号分隔)

3.3 运行

启动与停止:

## 启动
bin/kafka-server-start.sh config/server.properties
## 停止
bin/kafka-server-stop.sh
3.3.1 常用命令

1.创建topic:创建一个名为“test”的topic,它有一个分区和一个副本:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.查看topic列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

3.发送消息:Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。运行 producer,然后在控制台输入一些消息以发送到服务器。

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4.消费消息:Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

4. 集群部署

对 Kafka来说,单个实例只是一个大小为一的集群,除了启动更多的实例外没有什么变化。 下面以部署三个节点的集群为例。

4.1 修改配置

承接上面的安装部署已完成单节点后,为另外两个实例创建配置文件

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改文件 config/server-1.properties 的以下配置项

broker.id=1
listeners=PLAINTEXT://:9093
log.dir=/opt/kafka_2.12-2.7.0/data-2

修改文件 config/server-2.properties 的以下配置项

broker.id=2
listeners=PLAINTEXT://:9094
log.dir=/opt/kafka_2.12-2.7.0/data-3

broker.id属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。

4.2 启动

bin/kafka-server-start.sh config/server-1.properties
bin/kafka-server-start.sh config/server-2.properties

4.3 测试

创建一个副本为3的topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test-replicated-topic

查看topic信息

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-replicated-topic

显示如下内容:

Topic:test-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:
Topic: test-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。

  • leader :是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
  • replicas :是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
  • isr: 是一组同步replicas,是replicas列表的子集,它活着并被指到leader。

5. Kafka Connect

Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。

左侧的Sources负责从其他异构系统中读取数据并导入到Kafka中;右侧的Sinks是把Kafka中的数据写入到其他的系统中。

5.1 各种Kafka Connector

如下列表中是常用的开源Connector:

Connectors References
Jdbc Source, Sink
Elastic Search Sink1, Sink2, Sink3
Cassandra Source1, Source 2, Sink1, Sink2
MongoDB Source
HBase Sink
Syslog Source
MQTT (Source) Source
Twitter (Source) Source, Sink
S3 Sink1, Sink2

5.2 测试

使用Kafka ConnectSource(test.txt)转为流数据再写入到Destination(test.sink.txt)中。如下图所示:

1.准备源头数据

echo -e "foo\nbar" > test.txt

2.启动两个连接器

  • 源连接器:用于从输入文件读取行,并将其输入到 Kafka topic。
  • 接收连接器:它从Kafka topic中读取消息,并在输出文件中生成一行。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

上面包含三个配置文件:首先是Kafka Connect的配置文件,包含常用的配置,如Kafka brokers连接方式和数据的序列化格式。 其余的配置文件均指定一个要创建的连接器。这些文件包括连接器的唯一名称,类的实例,以及其他连接器所需的配置。

在启动过程中,你会看到一些日志消息,包括一些连接器正在实例化的指示。 一旦Kafka Connect进程启动,源连接器就开始从test.txt读取行并且 将它们生产到主题connect-test中,同时接收器连接器也开始从主题connect-test中读取消息, 并将它们写入文件test.sink.txt中。

3.查看目标文件

cat test.sink.txt

目录
相关文章
|
5月前
|
消息中间件 Java Kafka
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
本文深入解析了 Kafka 和 RabbitMQ 两大主流消息队列在 Spring 微服务中的应用与对比。内容涵盖消息队列的基本原理、Kafka 与 RabbitMQ 的核心概念、各自优势及典型用例,并结合 Spring 生态的集成方式,帮助开发者根据实际需求选择合适的消息中间件,提升系统解耦、可扩展性与可靠性。
365 1
消息队列比较:Spring 微服务中的 Kafka 与 RabbitMQ
|
11月前
|
Java Maven Android开发
微服务——SpringBoot使用归纳——Spring Boot开发环境搭建和项目启动
本文介绍了Spring Boot开发环境的搭建和项目启动流程。主要内容包括:jdk的配置(IDEA、STS/eclipse设置方法)、Spring Boot工程的构建方式(IDEA快速构建、官方构建工具start.spring.io使用)、maven配置(本地maven路径与阿里云镜像设置)以及编码配置(IDEA和eclipse中的编码设置)。通过这些步骤,帮助开发者顺利完成Spring Boot项目的初始化和运行准备。
982 0
微服务——SpringBoot使用归纳——Spring Boot开发环境搭建和项目启动
|
11月前
|
Java 测试技术 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——少量配置信息的情形
本课主要讲解Spring Boot项目中的属性配置方法。在实际开发中,测试与生产环境的配置往往不同,因此不应将配置信息硬编码在代码中,而应使用配置文件管理,如`application.yml`。例如,在微服务架构下,可通过配置文件设置调用其他服务的地址(如订单服务端口8002),并利用`@Value`注解在代码中读取这些配置值。这种方式使项目更灵活,便于后续修改和维护。
208 0
|
11月前
|
Java 微服务 Spring
微服务——SpringBoot使用归纳——Spring Boot使用slf4j进行日志记录——使用Logger在项目中打印日志
本文介绍了如何在项目中使用Logger打印日志。通过SLF4J和Logback,可设置不同日志级别(如DEBUG、INFO、WARN、ERROR)并支持占位符输出动态信息。示例代码展示了日志在控制器中的应用,说明了日志配置对问题排查的重要性。附课程源码下载链接供实践参考。
1233 0
|
消息中间件 监控 开发工具
微服务(三)-实现自动刷新配置(不重启项目情况下)
微服务(三)-实现自动刷新配置(不重启项目情况下)
|
11月前
|
Java 数据库 微服务
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——指定项目配置文件
在实际项目中,开发环境和生产环境的配置往往不同。为简化配置切换,可通过创建 `application-dev.yml` 和 `application-pro.yml` 分别管理开发与生产环境配置,如设置不同端口(8001/8002)。在 `application.yml` 中使用 `spring.profiles.active` 指定加载的配置文件,实现环境快速切换。本节还介绍了通过配置类读取参数的方法,适用于微服务场景,提升代码可维护性。课程源码可从 [Gitee](https://gitee.com/eson15/springboot_study) 下载。
462 0
|
11月前
|
Java 微服务 Spring
微服务——SpringBoot使用归纳——Spring Boot中的项目属性配置——少量配置信息的情形
在微服务架构中,随着业务复杂度增加,项目可能需要调用多个微服务。为避免使用`@Value`注解逐一引入配置的繁琐,可通过定义配置类(如`MicroServiceUrl`)并结合`@ConfigurationProperties`注解实现批量管理。此方法需在配置文件中设置微服务地址(如订单、用户、购物车服务),并通过`@Component`将配置类纳入Spring容器。最后,在Controller中通过`@Resource`注入配置类即可便捷使用,提升代码可维护性。
225 0
|
消息中间件 人工智能 Kafka
微服务数据问题之MetaQ和Kafka在选择读写技术时考虑因素如何解决
微服务数据问题之MetaQ和Kafka在选择读写技术时考虑因素如何解决
140 0
|
消息中间件 存储 缓存
微服务数据问题之Kafka的默认复制配置如何解决
微服务数据问题之Kafka的默认复制配置如何解决
163 0
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。

热门文章

最新文章