1. Kafka概述
Kafka
是最初由Linkedin
公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper
协调的分布式日志系统(也可以当做MQ
系统),常见可以用于web/nginx
日志、访问日志,消息服务等等,Linkedin
于2010
年贡献给了Apache
基金会并成为顶级开源项目。
Kafka
主要设计目标如下:
- 以时间复杂度为
O(1)
的方式提供消息持久化能力,即使对TB
级以上数据也能保证常数时间的访问性能。 - 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒
100K
条消息的传输。 - 支持
Kafka Server
间的消息分区,及分布式消费,同时保证每个partition
内的消息顺序传输。 - 同时支持离线数据处理和实时数据处理。
Scale out
:支持在线水平扩展
上图中一个topic
配置了3个partition
。Partition1
有两个offset
:0和1。Partition2
有4个offset
。Partition3
有1个offset
。副本的id
和副本所在的机器的id
恰好相同。
如果一个topic
的副本数为3,那么Kafka
将在集群中为每个partition
创建3个相同的副本。集群中的每个broker
存储一个或多个partition
。多个producer
和consumer
可同时生产和消费数据。
1.1 角色介绍
Broker:
- Kafka 集群包含一个或多个服务器,服务器节点称为broker。
- broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。
- 如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。
- 如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
Topic:
- 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
类似于数据库的表名
Partition:
- topic中的数据分割为一个或多个partition。每个topic至少有一个partition。每个partition中的数据使用多个segment文件存储。partition中的数据是有序的,不同partition间的数据丢失了数据的顺序。如果topic有多个partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将partition数目设为1。
Producer:
- 生产者即数据的发布者,该角色将消息发布到Kafka的topic中。broker接收到生产者发送的消息后,broker将该消息追加到当前用于追加数据的segment文件中。生产者发送的消息,存储到一个partition中,生产者也可以指定数据存储的partition。
Consumer:
- 消费者可以从broker中读取数据。消费者可以消费多个topic中的数据。
Consumer Group
- 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
Leader:
- 每个partition有多个副本,其中有且仅有一个作为Leader,Leader是当前负责数据的读写的partition。
Follower:
- Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
1.2 四个核心API
在Kafka中,客户端和服务器之间的通信是通过一种简单的,高性能的,语言不可知的TCP协议完成的。 该协议是版本控制的,并保持与旧版本的向后兼容性。 我们为Kafka提供Java客户端,但客户端可以使用多种语言。
- Producer API:允许应用程序将记录流发布到一个或多个Kafka topics。
- Consumer API:允许应用程序订阅一个或多个topics并处理生成给他们的记录流。
- Streams API:允许应用程序充当流处理器,从一个或多个topics中消耗输入流,并将输出流生成为一个或多个输出topics,从而将输入流有效地转换为输出流。
- Connector API:用于构建和运行将Kafka topics和现有应用或数据系统连接的可重用的produers和consumers。 例如,连接到关系数据库的连接器可能会捕获对表的每个更改。
1.3 相关资料
2. 应用场景
2.1 作为消息系统
传统的消息系统有两个模块: 队列 和 发布-订阅。 在队列中,消费者池从server读取数据,每条记录被池子中的一个消费者消费; 在发布订阅中,记录被广播到所有的消费者。两者均有优缺点。 队列的优点在于它允许你将处理数据的过程分给多个消费者实例,使你可以扩展处理过程。 不好的是,队列不是多订阅者模式的—一旦一个进程读取了数据,数据就会被丢弃。 而发布-订阅系统允许你广播数据到多个进程,但是无法进行扩展处理,因为每条消息都会发送给所有的订阅者。
消费组在Kafka有两层概念。在队列中,消费组允许你将处理过程分发给一系列进程(消费组中的成员)。 在发布订阅中,Kafka允许你将消息广播给多个消费组。
Kafka相比于传统消息队列还具有更严格的顺序保证;
传统队列在服务器上保存有序的记录,如果多个消费者消费队列中的数据, 服务器将按照存储顺序输出记录。 虽然服务器按顺序输出记录,但是记录被异步传递给消费者, 因此记录可能会无序的到达不同的消费者。这意味着在并行消耗的情况下, 记录的顺序是丢失的。因此消息系统通常使用“唯一消费者”的概念,即只让一个进程从队列中消费, 但这就意味着不能够并行地处理数据。
Kafka 设计的更好。topic中的partition是一个并行的概念。 Kafka能够为一个消费者池提供顺序保证和负载平衡,是通过将topic中的partition分配给消费者组中的消费者来实现的, 以便每个分区由消费组中的一个消费者消耗。通过这样,我们能够确保消费者是该分区的唯一读者,并按顺序消费数据。 众多分区保证了多个消费者实例间的负载均衡。但请注意,消费者组中的消费者实例个数不能超过分区的数量。
2.2 作为存储系统
许多消息队列可以发布消息,除了消费消息之外还可以充当中间数据的存储系统。那么Kafka作为一个优秀的存储系统有什么不同呢?
- 数据写入Kafka后被写到磁盘,并且进行备份以便容错。直到完全备份,Kafka才让生产者认为完成写入,即使写入失败Kafka也会确保继续写入
- Kafka使用磁盘结构,具有很好的扩展性—50kb和50TB的数据在server上表现一致。
- 可以存储大量数据,并且可通过客户端控制它读取数据的位置,您可认为Kafka是一种高性能、低延迟、具备日志存储、备份和传播功能的分布式文件系统。
2.3 用做流处理
Kafka 流处理不仅仅用来读写和存储流式数据,它最终的目的是为了能够进行实时的流处理。
在Kafka中,流处理器不断地从输入的topic获取流数据,处理数据后,再不断生产流数据到输出的topic中去。
- 例如:零售应用程序可能会接收销售和出货的输入流,经过价格调整计算后,再输出一串流式数据。
简单的数据处理可以直接用生产者和消费者的API。对于复杂的数据变换,Kafka提供了Streams API。 Stream API 允许应用做一些复杂的处理,比如将流数据聚合或者join。
- 这一功能有助于解决以下这种应用程序所面临的问题:处理无序数据,当消费端代码变更后重新处理输入,执行有状态计算等。
Streams API建立在Kafka的核心之上:它使用Producer和Consumer API作为输入,使用Kafka进行有状态的存储, 并在流处理器实例之间使用相同的消费组机制来实现容错。
2.4 批处理
- 将消息、存储和流处理结合起来,使得Kafka看上去不一般,但这是它作为流平台所备的。
- 像HDFS这样的分布式文件系统可以存储用于批处理的静态文件。 一个系统如果可以存储和处理历史数据是非常不错的。
- 传统的企业消息系统允许处理订阅后到达的数据。以这种方式来构建应用程序,并用它来处理即将到达的数据。
- Kafka结合了上面所说的两种特性。作为一个流应用程序平台或者流数据管道,这两个特性,对于Kafka 来说是至关重要的。
- 通过组合存储和低延迟订阅,流式应用程序可以以同样的方式处理过去和未来的数据。 一个单一的应用程序可以处理历史记录的数据,并且可以持续不断地处理以后到达的数据,而不是在到达最后一条记录时结束进程。 这是一个广泛的流处理概念,其中包含批处理以及消息驱动应用程序。
- 同样,作为流数据管道,能够订阅实时事件使得Kafk具有非常低的延迟; 同时Kafka还具有可靠存储数据的特性,可用来存储重要的支付数据, 或者与离线系统进行交互,系统可间歇性地加载数据,也可在停机维护后再次加载数据。流处理功能使得数据可以在到达时转换数据。
3. 安装部署
3.1 下载
下载:
https://kafka.apache.org/downloads
依赖:
- Zookeeper
- JDK
下载后解压:
tar -zxf kafka_2.12-2.7.0.tgz -C /opt/
3.2 修改配置文件
修改文件 config/server.properties
为以下内容:
broker.id=0 # 唯一ID同一集群下broker.id不能重复 listeners=PLAINTEXT://:9092 # 监听地址 log.dirs=/opt/kafka_2.12-2.7.0/data # 数据目录 log.retention.hours=168 # kafka数据保留时间单位为hour 默认 168小时即 7天 log.retention.bytes=1073741824 #(kafka数据量最大值,超出范围自动清理,和 log.retention.hours配合使用,注意其最大值设定不可超磁盘大小) zookeeper.connect=127.0.0.1:2181 #(zookeeper连接ip及port,多个以逗号分隔)
3.3 运行
启动与停止:
## 启动 bin/kafka-server-start.sh config/server.properties ## 停止 bin/kafka-server-stop.sh
3.3.1 常用命令
1.创建topic:创建一个名为“test”的topic,它有一个分区和一个副本:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.查看topic列表
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.发送消息:Kafka自带一个命令行客户端,它从文件或标准输入中获取输入,并将其作为message(消息)发送到Kafka集群。默认情况下,每行将作为单独的message发送。运行 producer,然后在控制台输入一些消息以发送到服务器。
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4.消费消息:Kafka 还有一个命令行consumer(消费者),将消息转储到标准输出。
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
4. 集群部署
对 Kafka来说,单个实例只是一个大小为一的集群,除了启动更多的实例外没有什么变化。 下面以部署三个节点的集群为例。
4.1 修改配置
承接上面的安装部署
已完成单节点后,为另外两个实例创建配置文件
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties
修改文件 config/server-1.properties
的以下配置项
broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/opt/kafka_2.12-2.7.0/data-2
修改文件 config/server-2.properties
的以下配置项
broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/opt/kafka_2.12-2.7.0/data-3
broker.id
属性是集群中每个节点的名称,这一名称是唯一且永久的。我们必须重写端口和日志目录,因为我们在同一台机器上运行这些,我们不希望所有的代理尝试在同一个端口注册,或者覆盖彼此的数据。
4.2 启动
bin/kafka-server-start.sh config/server-1.properties bin/kafka-server-start.sh config/server-2.properties
4.3 测试
创建一个副本为3的topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic test-replicated-topic
查看topic信息
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test-replicated-topic
显示如下内容:
Topic:test-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: test-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
第一行给出了所有分区的摘要,下面的每行都给出了一个分区的信息。因为我们只有一个分区,所以只有一行。
- leader :是负责给定分区所有读写操作的节点。每个节点都是随机选择的部分分区的领导者。
- replicas :是复制分区日志的节点列表,不管这些节点是leader还是仅仅活着。
- isr: 是一组同步replicas,是replicas列表的子集,它活着并被指到leader。
5. Kafka Connect
Kafka是一个使用越来越广的消息系统,尤其是在大数据开发中(实时数据处理和分析)。为何集成其他系统和解耦应用,经常使用Producer来发送消息到Broker,并使用Consumer来消费Broker中的消息。Kafka Connect是到0.9版本才提供的并极大的简化了其他系统与Kafka的集成。Kafka Connect运用用户快速定义并实现各种Connector(File,Jdbc,Hdfs等),这些功能让大批量数据导入/导出Kafka很方便。
左侧的Sources负责从其他异构系统中读取数据并导入到Kafka中;右侧的Sinks是把Kafka中的数据写入到其他的系统中。
5.1 各种Kafka Connector
如下列表中是常用的开源Connector:
Connectors | References |
Jdbc | Source, Sink |
Elastic Search | Sink1, Sink2, Sink3 |
Cassandra | Source1, Source 2, Sink1, Sink2 |
MongoDB | Source |
HBase | Sink |
Syslog | Source |
MQTT (Source) | Source |
Twitter (Source) | Source, Sink |
S3 | Sink1, Sink2 |
5.2 测试
使用Kafka Connect
把Source(test.txt)
转为流数据再写入到Destination(test.sink.txt)
中。如下图所示:
1.准备源头数据
echo -e "foo\nbar" > test.txt
2.启动两个连接器
- 源连接器:用于从输入文件读取行,并将其输入到 Kafka topic。
- 接收连接器:它从Kafka topic中读取消息,并在输出文件中生成一行。
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
上面包含三个配置文件:首先是Kafka Connect的配置文件,包含常用的配置,如Kafka brokers连接方式和数据的序列化格式。 其余的配置文件均指定一个要创建的连接器。这些文件包括连接器的唯一名称,类的实例,以及其他连接器所需的配置。
在启动过程中,你会看到一些日志消息,包括一些连接器正在实例化的指示。 一旦Kafka Connect进程启动,源连接器就开始从test.txt读取行并且 将它们生产到主题connect-test中,同时接收器连接器也开始从主题connect-test中读取消息, 并将它们写入文件test.sink.txt中。
3.查看目标文件
cat test.sink.txt