Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
简介: Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

Kafka的安装

前提,已安装docker和docker-compose。

拉取镜像

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest

docker-compose.yaml如下

version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

启动命令

docker-compose up -d

截图

之后的相关命令若涉及容器id,请自行更换

文件与配置

目录

docker exec -it a0 ls /opt/bitnami/kafka

查看目录命令

截图

重要目录解释如下:

  • bin: 脚本目录
  • config:配置目录
  • libs:第三方依赖库目录
  • logs:日志

bin

重要的shell脚本加粗了,之后会用

connect-distributed.sh kafka-dump-log.sh kafka-storage.sh

connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh

connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh

kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh

kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh

kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh

kafka-configs.sh kafka-mirror-maker.sh trogdor.sh

kafka-console-consumer.sh kafka-producer-perf-test.sh windows

kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh

kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh

kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh

kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh

kafka-delete-records.sh kafka-server-stop.sh

config

connect-console-sink.properties connect-mirror-maker.properties server.properties

connect-console-source.properties connect-standalone.properties tools-log4j.properties

connect-distributed.properties consumer.properties trogdor.conf

connect-file-sink.properties kraft zookeeper.properties

connect-file-source.properties log4j.properties

connect-log4j.properties producer.properties

配置文件

server.properties

  • broker.id: 唯一id值,通过环境变量设置为了1
  • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
  • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

更多配置可以查看参考中Dockerhub链接的Configuration部分

producer.properties

  • bootstrap.servers:kafka的ip:port,这里是localhost:9092
  • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

consumer.properties

  • group.id:消费者组id,默认为test-consumer-group
  • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

命令行简单使用

kafka-topics.sh

对主题topic进行增删改查的工具

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --create:创建主题
  • --delete:删除主题
  • --describe:描述主题
  • --list:查看主题列表
  • --alter:修改主题的 partitions等
  • --topic :主题名
  • --topic-id :主题id
  • --partitions :主题的partition

新增

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9

截图

查看列表

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

截图

查看详情

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9

截图

修改

命令

以修改主题partiion数量为例

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3

截图

删除

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9

截图

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic  :Kafka主题,必须的
  • --sync:同步发送
  • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图

kafka-console-consumer.sh

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic  :Kafka主题,必须的
  • --group :消费者组id
  • --key-deserializer :key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --value-deserializer :value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --offset :消费的offset
  • --partition :消费的分区

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

截图

上手之后我们再来了解一些概念。

概念

集群

已发布的消息保存在一组服务器中,称为Kafka集群。

代理broker

集群中的每一个服务器都是一个代理。

主题topic

每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

分区partition

topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

分区的原则:

  • 生产者指定了partition,则直接使用
  • 未指定partition但指定了key,通过对key的value进行hash出一个partition
  • partition和key都未指定,使用轮询选出一个partition

偏移量offset

任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

生产者producer

push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

消费者组consumer group

包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

消费者consumer

从topic中pull数据,可以指定partition和offset。

FAQ

如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

如何设置分区和消费者数?

建议分区数与消费者数一致,防止消费不过来。

参考

dockerhub-bitnami/kafka

腾讯云CKafka 压缩算法对比

python-kafka客户端封装

相关文章
|
3月前
|
消息中间件 Ubuntu Java
在Ubuntu 18.04上安装Apache Kafka的方法
在Ubuntu 18.04上安装Apache Kafka的方法
180 0
|
18天前
|
消息中间件 存储 负载均衡
Apache Kafka核心概念解析:生产者、消费者与Broker
【10月更文挑战第24天】在数字化转型的大潮中,数据的实时处理能力成为了企业竞争力的重要组成部分。Apache Kafka 作为一款高性能的消息队列系统,在这一领域占据了重要地位。通过使用 Kafka,企业可以构建出高效的数据管道,实现数据的快速传输和处理。今天,我将从个人的角度出发,深入解析 Kafka 的三大核心组件——生产者、消费者与 Broker,希望能够帮助大家建立起对 Kafka 内部机制的基本理解。
49 2
|
3月前
|
消息中间件 负载均衡 大数据
揭秘Kafka背后的秘密!再均衡如何上演一场消费者组的‘权力游戏’,让消息处理秒变高能剧情?
【8月更文挑战第24天】Kafka是一款在大数据处理领域备受推崇的产品,以其出色的性能和可扩展性著称。本文通过一个具体案例介绍其核心机制之一——再均衡(Rebalancing)。案例中,“user_activity”主题下10个分区被3个消费者均衡消费。当新消费者加入或原有消费者离开时,Kafka将自动触发再均衡过程,确保所有消费者能有效处理分配给它们的分区。
136 62
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
61 4
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
130 58
|
16天前
|
消息中间件 Ubuntu Java
Ubuntu系统上安装Apache Kafka
Ubuntu系统上安装Apache Kafka
|
1月前
|
消息中间件 SQL 分布式计算
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
大数据-76 Kafka 高级特性 稳定性-消费重复 生产者、Broker、消费者 导致的重复消费问题
32 1
|
2月前
|
消息中间件 Kafka
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
消费kafka不需要设置 压缩协议吗 假如生产者压缩协议是lz4
|
2月前
|
消息中间件 Kafka Apache
kafka: invalid configuration (That topic/partition is already being consumed)
kafka: invalid configuration (That topic/partition is already being consumed)
|
2月前
|
消息中间件 Java Linux
linux 之centos7安装kafka;;;;;待补充,未完成
linux 之centos7安装kafka;;;;;待补充,未完成