Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: Kafka入门,这一篇就够了(安装,topic,生产者,消费者)

Kafka的安装

前提,已安装docker和docker-compose。

拉取镜像

docker pull bitnami/zookeeper:latest
docker pull bitnami/kafka:latest

docker-compose.yaml如下

version: '3'
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENER=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

启动命令

docker-compose up -d

截图

之后的相关命令若涉及容器id,请自行更换

文件与配置

目录

docker exec -it a0 ls /opt/bitnami/kafka

查看目录命令

截图

重要目录解释如下:

  • bin: 脚本目录
  • config:配置目录
  • libs:第三方依赖库目录
  • logs:日志

bin

重要的shell脚本加粗了,之后会用

connect-distributed.sh kafka-dump-log.sh kafka-storage.sh

connect-mirror-maker.sh kafka-features.sh kafka-streams-application-reset.sh

connect-standalone.sh kafka-get-offsets.sh kafka-topics.sh

kafka-acls.sh kafka-leader-election.sh kafka-transactions.sh

kafka-broker-api-versions.sh kafka-log-dirs.sh kafka-verifiable-consumer.sh

kafka-cluster.sh kafka-metadata-shell.sh kafka-verifiable-producer.sh

kafka-configs.sh kafka-mirror-maker.sh trogdor.sh

kafka-console-consumer.sh kafka-producer-perf-test.sh windows

kafka-console-producer.sh kafka-reassign-partitions.sh zookeeper-security-migration.sh

kafka-consumer-groups.sh kafka-replica-verification.sh zookeeper-server-start.sh

kafka-consumer-perf-test.sh kafka-run-class.sh zookeeper-server-stop.sh

kafka-delegation-tokens.sh kafka-server-start.sh zookeeper-shell.sh

kafka-delete-records.sh kafka-server-stop.sh

config

connect-console-sink.properties connect-mirror-maker.properties server.properties

connect-console-source.properties connect-standalone.properties tools-log4j.properties

connect-distributed.properties consumer.properties trogdor.conf

connect-file-sink.properties kraft zookeeper.properties

connect-file-source.properties log4j.properties

connect-log4j.properties producer.properties

配置文件

server.properties

  • broker.id: 唯一id值,通过环境变量设置为了1
  • log.dirs: kafka集群日志目录,默认是log.dirs=/bitnami/kafka/data
  • zookeeper.connect:zookeeper地址端口,格式域名/ip:port,这块是zookeeper:2181,在docker的网络中可以解析为另一容器的ip

更多配置可以查看参考中Dockerhub链接的Configuration部分

producer.properties

  • bootstrap.servers:kafka的ip:port,这里是localhost:9092
  • compression.type:压缩类型,默认是none, 一共有四种,none, gzip, snappy, lz4, zstd,推荐排序LZ4 > GZIP > Snappy,详见腾讯云压缩算法对比

consumer.properties

  • group.id:消费者组id,默认为test-consumer-group
  • auto.offset.reset:offset设置,三种latest, earliest, none,看情况设置

命令行简单使用

kafka-topics.sh

对主题topic进行增删改查的工具

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --create:创建主题
  • --delete:删除主题
  • --describe:描述主题
  • --list:查看主题列表
  • --alter:修改主题的 partitions等
  • --topic :主题名
  • --topic-id :主题id
  • --partitions :主题的partition

新增

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic lady_killer9

截图

查看列表

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --list

截图

查看详情

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --describe --topic lady_killer9

截图

修改

命令

以修改主题partiion数量为例

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --alter --topic lady_killer9 --partitions 3

截图

删除

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --delete --topic lady_killer9

截图

kafka-console-producer.sh

标准输入读数据,发送到Kafka的工具

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic  :Kafka主题,必须的
  • --sync:同步发送
  • --compression-codec [String: compression-codec] :压缩方式,‘none’,‘gzip’, ‘snappy’, ‘lz4’, , ‘zstd’,默认gzip.

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-topics.sh --bootstrap-server=127.0.0.1:9092 --create --topic demo
docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-producer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图

kafka-console-consumer.sh

常用选项如下:

  • --bootstrap-server:kafka服务器ip:port,必须的
  • --topic  :Kafka主题,必须的
  • --group :消费者组id
  • --key-deserializer :key反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --value-deserializer :value反序列化,默认是org.apache.kafka.common.serialization.StringDeserializer
  • --offset :消费的offset
  • --partition :消费的分区

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo

截图

命令

docker exec -it a0 /opt/bitnami/kafka/bin/kafka-console-consumer.sh  --bootstrap-server=127.0.0.1:9092 --topic demo --partition 0 --offset 2

截图

上手之后我们再来了解一些概念。

概念

集群

已发布的消息保存在一组服务器中,称为Kafka集群。

代理broker

集群中的每一个服务器都是一个代理。

主题topic

每条发布到kafka集群的消息都有一个主题,这个主题被称为topic。每个topic都由一个或者多个分区构成。

分区partition

topic的partition数量可以在创建时配置,partition数量决定了每个消费者组中并发消费者的最大数量

分区的原则:

  • 生产者指定了partition,则直接使用
  • 未指定partition但指定了key,通过对key的value进行hash出一个partition
  • partition和key都未指定,使用轮询选出一个partition

偏移量offset

任何发布到partition的消息都会被直接追加到partition尾部,每条消息的位置称为offset,offset是一个long型数字,它唯一标记一条消息。消费者可以通过(topic、partition、offset)跟踪记录。

生产者producer

push消息到topc的叫生产者,push后可以获得offset。生产者可以指定partition,但不建议这么做。

消费者组consumer group

包含多个消费者,有一个 group id,可以订阅topic进行消费。消费偏移以消费者组为单位。

消费者consumer

从topic中pull数据,可以指定partition和offset。

FAQ

如何保证一个主题下的数据,一定是有序的(生产与消费的顺序一致)?

Kafka每个partition中的消息在写入是都是有序的,消费时,每个partition只能被每一个group中的消费者消费,因此,topic下只有一个partition时一定有序。

如何设置分区和消费者数?

建议分区数与消费者数一致,防止消费不过来。

参考

dockerhub-bitnami/kafka

腾讯云CKafka 压缩算法对比

python-kafka客户端封装

相关文章
|
28天前
|
消息中间件 负载均衡 Kafka
【Kafka面试演练】那Kafka消费者手动提交、自动提交有什么区别?
嗯嗯Ok。分区的作用主要就是为了提高Kafka处理消息吞吐量。每一个topic会被分为多个分区。假如同一个topic下有n个分区、n个消费者,这样的话每个分区就会发送消息给对应的一个消费者,这样n个消费者负载均衡地处理消息。同时生产者会发送消息给不同分区,每个分区分给不同的brocker处理,让集群平坦压力,这样大大提高了Kafka的吞吐量。面试官思考中…
47 4
|
1月前
|
消息中间件 Java Kafka
关于kafka消费者超时配置
关于kafka消费者超时配置
52 2
|
3月前
|
消息中间件 Kafka 流计算
Flink的分区表订阅功能是通过Kafka的topic分区来实现的
Flink的分区表订阅功能是通过Kafka的topic分区来实现的【1月更文挑战第6天】【1月更文挑战第26篇】
99 1
|
3月前
|
消息中间件 缓存 Kafka
探究Kafka原理-5.Kafka设计原理和生产者原理解析(下)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
38 0
|
3月前
|
消息中间件 存储 负载均衡
探究Kafka原理-5.Kafka设计原理和生产者原理解析(上)
探究Kafka原理-5.Kafka设计原理和生产者原理解析
60 0
|
28天前
|
消息中间件 存储 负载均衡
Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
【2月更文挑战第21天】Kafka【付诸实践 01】生产者发送消息的过程描述及设计+创建生产者并发送消息(同步、异步)+自定义分区器+自定义序列化器+生产者其他属性说明(实例源码粘贴可用)【一篇学会使用Kafka生产者】
106 4
|
27天前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
44 5
|
30天前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
41 1
|
1天前
|
消息中间件 负载均衡 监控
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
Kafka消费者:监听模式VS主动拉取,哪种更适合你?
16 0
|
28天前
|
消息中间件 Kafka Linux
Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
【2月更文挑战第21天】Kafka【付诸实践 03】Offset Explorer Kafka 的终极 UI 工具安装+简单上手+关键特性测试(一篇学会使用 Offset Explorer)
138 2

热门文章

最新文章