Kafka的安装
前提,已安装docker和docker-compose。
拉取镜像
docker pull bitnami/zookeeper:latest docker pull bitnami/kafka:latest
docker-compose.yaml如下
version: '3' services: zookeeper: image: 'bitnami/zookeeper:latest' ports: - '2181:2181' environment: - ALLOW_ANONYMOUS_LOGIN=yes kafka: image: 'bitnami/kafka:latest' ports: - '9092:9092' environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - ALLOW_PLAINTEXT_LISTENER=yes depends_on: - zookeeper
启动命令
docker-compose up -d
截图
之后的相关命令若涉及容器id,请自行更换
文件与配置
目录
docker exec -it a0 ls /opt/bitnami/kafka
查看目录命令
截图
重要目录解释如下:
- bin: 脚本目录
- config:配置目录
- libs:第三方依赖库目录
- logs:日志
bin
重要的shell脚本加粗了,之后会用
connect-distributed.sh kafka-dump-log.sh kafka-storage.sh
connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh
connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh
kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh
kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh
kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh
kafka-configs.sh kafka-mirror-maker.sh trogdor.sh
kafka-console-consumer.sh kafka-producer-perf-test.sh windows
kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh
kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh
kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh
kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh
kafka-delete-records.sh kafka-server-stop.sh
config
connect-console-sink.properties connect-mirror-maker.properties server.properties
connect-console-source.properties connect-standalone.properties tools-log4j.properties
connect-distributed.properties consumer.properties trogdor.conf
connect-file-sink.properties kraft zookeeper.properties
connect-file-source.properties log4j.properties
connect-log4j.properties producer.properties
配置文件
server.properties
- broker.id: 唯一id值,通过环境变量设置为了1
- log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
- zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip
更多配置可以查看参考中Dockerhub链接的Configuration部分
producer.properties
- bootstrap.servers:kafka的ip:port,这里是localhost:9092
- compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比
consumer.properties
- group.id:消费者组id,默认为test-consumer-group
- auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置
命令行简单使用
kafka-topics.sh
对主题topic进行增删改查的工具
常用选项如下:
- --bootstrap-server:kafka服务器ip:port,必须的
- --create:创建主题
- --delete:删除主题
- --describe:描述主题
- --list:查看主题列表
- --alter:修改主题的 partitions等
- --topic :主题名
- --topic-id :主题id
- --partitions :主题的partition
新增
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9
截图
查看列表
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list
截图
查看详情
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9
截图
修改
命令
以修改主题partiion数量为例
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3
截图
删除
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9
截图
kafka-console-producer.sh
标准输入读数据,发送到Kafka的工具
常用选项如下:
- --bootstrap-server:kafka服务器ip:port,必须的
- --topic :Kafka主题,必须的
- --sync:同步发送
- --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh --bootstrap-server=127.0.0.1:9092 --topic demo
截图
kafka-console-consumer.sh
常用选项如下:
- --bootstrap-server:kafka服务器ip:port,必须的
- --topic :Kafka主题,必须的
- --group :消费者组id
- --key-deserializer :key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
- --value-deserializer :value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
- --offset :消费的offset
- --partition :消费的分区
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server=127.0.0.1:9092 --topic demo
截图
命令
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2
截图
上手之后我们再来了解一些概念。
概念
集群
已发布的消息保存在一组服务器中,称为Kafka集群。
代理broker
集群中的每一个服务器都是一个代理。
主题topic
每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。
分区partition
topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量。
分区的原则:
- 生产者指定了partition,则直接使用
- 未指定partition但指定了key,通过对key的value进行hash出一个partition
- partition和key都未指定,使用轮询选出一个partition
偏移量offset
任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。
生产者producer
push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。
消费者组consumer group
包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。
消费者consumer
从topic中pull数据,可以指定partition和offset。
FAQ
如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?
Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。
如何设置分区和消费者数?
建议分区数与消费者数一致,防止消费不过来。