kafka的介绍
Kafka是一个分布式的、可分区的、可复制的消息系统。它提供了普通消息系统的功能,但具有自己独特的设计。它可以像消息系统一样读写数据流,并且可以在实时业务的场景中写可靠的流处理应用,并且能安全地存储数据流到分布式、多副本、容错的集群中。所以通俗点理解,可以说Kafka就是一个消息中间件。
kafka的四个核心API
1.ProducerAPI:允许一个应用向一个或多个topic里发布记录流;
2.ConsumerAPI:允许一个应用订阅一个或多个topics,处理topic里的数据流,就相当于消费;
3.StreamAPI:允许应用扮演流处理的作用,从一个或多个topic里消费数据流,然后产生输出流数据到其他一个或多个topic里,对输入流数据有效传输到输出口;
4.ConnectorAPI:允许运行和构建一个可重复利用的生产者和消费者,能将kafka的topic与其他存在的应用和数据库设备相连接,比如链接一个实时数据库,可以捕捉到每张表的变化。
kafka的基本的消息系统用语
- Kafka将消息以topic为单位进行归纳。
- 将向Kafka topic发布消息的程序成为producers.
- 将预订topics并消费消息的程序成为consumer.
- Kafka以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker.
kafka的使用
1.启动zookeeper
2.启动kafka (博主上篇文章是关于kafka的配置启动,我的是Windows操作系统)
3.创建 topic
创建一个叫做“test”的topic,它只有一个分区,一个副本。
注意:windows版本的kafka要进入/bin/windows
文件夹下
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
可以通过list命令查看创建的topic:
kafka-topics.bat --list --zookeeper localhost:2181
4.发送消息
kafka-console-producer.bat --broker-list localhost:9092 --topic test
然后输入消息
按Ctrl+C可以结束消息发送
5.消费消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
其他版本的消费消息写法
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
在windows上使用会出现zookeeper is not a recognized option
这个错误
单个topic,多个broker的负载配置
1.首先创建3份server.properties文件
config/server-1.properties:
broker.id=1
port=9093
log.dir=/tmp/kafka-logs-1
config/server-2.properties:
broker.id=2
port=9094
log.dir=/tmp/kafka-logs-2
注意配置文件中没有port这一项配置
port=9093
刚才已经启动可Zookeeper和一个节点,现在启动另外两个节点:
.\bin\windows\kafka-server-start.bat .\config\server-1.properties
.\bin\windows\kafka-server-start.bat .\config\server-2.properties
创建一个拥有3个副本的topic:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
查看topics的集群节点信息
kafka-topics.bat --describe --zookeeper localhost:2181 --topic my-replicated-topic
下面解释一下这些输出。第一行是对所有分区的一个描述,然后每个分区都会对应一行,因为我们只有一个分区所以下面就只加了一行。
leader:负责处理消息的读和写,leader是从所有节点中随机选择的.
replicas:列出了所有的副本节点,不管节点是否在服务中.
isr:是正在服务中的节点.
向topic发送消息:
kafka-console-producer.bat --broker-list localhost:9092 --topic my-replicated-topic
消费这些消息
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic my-replicated-topic --from-beginning
测试一下容错能力.Broker 2作为leader运行,现在我们关掉它:
我的是 broker 2
然后关闭server-2.properties的cmd
结果变成了0
虽然最初负责续写消息的leader down掉了,但之前的消息还是可以消费的:
kafka-console-consumer.bat --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
看来kafka的容错机制还是很不错的!!