分布式实时消息队列Kafka(二)Kafka分布式集群部署

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式实时消息队列Kafka(二)Kafka分布式集群部署

分布式实时消息队列Kafka(二)

知识点01:课程回顾

  1. 什么是消息队列?
  • 用于两个系统之间或者两个模块之间实现消息传递,基于队列机制实现数据缓存
  1. 消息队列的优点是什么?
  • 实现解耦
  • 通过异步,提高性能
  1. 消息队列的缺点是什么?
  • 架构更加复杂:如果消息队列出现故障,整个系统都会故障
  • 分布式集群
  • 副本机制
  • 数据维护更加复杂:不丢失,不重复
  • 生产安全:幂等性机制
  • 消费安全:Offset
  1. 什么是同步与异步?
  • 同步:立即一致性
  • 异步:最终一致性
  1. 什么是Kafka?
  • Kafka是一个基于订阅发布模式的高性能、高吞吐的实时消息队列系统
  1. Kafka在大数据的应用场景是什么?
  • 用于实时架构中:实现数据的临时存储
  1. Kafka中的Producer、Consumer、Consumer Group 、Broker分别是什么?
  • Producer:生产者,负责写入数据到Kafka
  • Consumer:消费者,负责从Kafka消费读取数据
  • Consumer Group:消费者组
  • Kafka中的数据消费必须以消费者组为单位
  • 一个消费者组可以包含多个消费者,注意多个消费者消费的数据加在一起是一份完整的数据
  • 目的:提高性能
  • 消费者组消费Topic
  • 消费者组中的消费者消费Topic的分区
  • Broker:Kafka一个节点
  • 多个节点,构建Kafka集群
  • 主从架构:类似于Zookeeper
  • HDFS:NameNode、DataNode
  • Hbase:HMaster、HRegionServer
  • Kafka:Kafka
  • 主:Kafka Controler
  • 从:Kafka Broker
  • 启动Kafka时候,会从所有的Broker选举一个Controler,如果Controller故障,会从其他的Broker重新选举一个
  • 选举:使用ZK是实现辅助选举
  1. Kafka中的Topic与Partition是什么?
  • Topic:逻辑上实现数据存储的分类,类似于数据库中的表概念
  • Partition:Topic中用于实现分布式存储的物理单元,一个Topic可以有多个分区
  • 每个分区可以存储在不同的节点,实现分布式存储
  • 副本机制:Kafka中每个分区可以构建多个副本【副本个数 <= 机器的个数】
  • 将一个分区的多个副本分为两种角色
  • leader副本:负责对外提供读写请求
  • follower副本:负责与leader同步数据,如果leader故障,follower要重新选举一个成为leader
  • 选举:不由ZK实现选举,由Kafka Crontroller来决定谁是leader
  1. Kafka中的Segment是什么?
  • Segment:对分区内部的数据进行更细的划分,分区段,文件段
  • 类似于Region中划分store
  • 规则:按照文件产生的时间或者大小
  • 目的:提高写入和查询性能
  • 文件名称可以用于检索数据:用offset命名的
  • 组成:每个Segment由两个文件组成
  • .log:存储的数据
  • .index:对应.log文件的索引信息
  1. Kafka中的Offset是什么?
  • Offset是kafka中存储数据时给每个数据做的标记或者编号
  • 分区级别的编号
  • 从0开始编号
  • 功能:消费者根据offset来进行消费,保证顺序消费,数据安全

知识点02:课程目标

  1. Kafka的集群如何搭建启动?
  • 实现Kafka分布式集群的安装部署【按照笔记一步步搭建】
  1. Kafka的Topic如何创建管理?【掌握】
  • 命令行实现
  • 创建Topic
  • 查看Topic信息
  • 删除、列举Topic
  1. Kafka的Java API如何实现?【掌握类和方法】
  • Java API
  • 开发生产者
  • 开发消费者

知识点03:Kafka集群架构

  • 目标了解Kafka集群架构及角色功能
  • 路径
  • Kafka集群有哪些角色?
  • Kafka每个角色的功能是什么?
  • Zookeeper在架构中的作用是什么?
  • 实施
  • 架构角色
  • Kafka
  • Zookeeper
  • Kafka中的每个角色以及对应的功能
  • 分布式主从架构
  • 主:Kafka Controller
  • 负责管理所有从节点:Topic、分区和副本
  • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现
  • 从:Kafka Broker
  • 对外提供读写请求
  • 其他的Broker监听Controller,如果Controller,会重新从Broker选举一个新的
  • ZK的功能
  • 辅助选举Active的主节点
  • 存储元数据
  • 小结
  • kafka是一个主从架构,整体对外提供分布式读写
  • ZK主要负责选举Controller和实现元数据存储

知识点04:Kafka分布式集群部署

  • 目标实现Kafka分布式集群的搭建部署
  • 路径
  • step1:选择版本
  • step2:下载解压安装
  • step:3:修改配置文件
  • 实施
  • 版本的选型
  • 0.8.x:老的版本,很多的问题
  • 0.10.x +:消息功能上基本没有问题
  • 选择:kafka_2.12-2.4.1.tgz
  • Kafka:2.4.1
  • Scala:2.12,Kafka是由Scala语言开发
  • 下载解压安装
cd /export/software/
rz
  • 解压
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
cd /export/server/kafka_2.12-2.4.1/
mkdir logs
  • bin:一般用于存放客户端操作命令脚本
  • sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中
  • conf/etc/config:配置文件目录
  • lib:jar包的存放目录
  • logs:一般用于存放服务日志
  • 修改配置
  • 切换到配置文件目录
cd /export/server/kafka_2.12-2.4.1/config
  • 修改server.properties
#21行:唯一的 服务端id
broker.id=0
#60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
log.dirs=/export/server/kafka_2.12-2.4.1/logs 
#123行:指定zookeeper的地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
  • #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
    delete.topic.enable=true
    host.name=node1
- 分发
```shell
cd /export/server/
scp -r kafka_2.12-2.4.1 node2:$PWD
scp -r kafka_2.12-2.4.1 node3:$PWD
  • 第二台:server.properties
#21行:唯一的 服务端id
  • broker.id=1
    #最后
    host.name=node2
- 第三台:server.properties
```properties
#21行:唯一的 服务端id
broker.id=2
#最后
host.name=node3
  • 添加环境变量
vim /etc/profile

         
  • #KAFKA_HOME
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:P A T H : PATH:PATH:KAFKA_HOME/bin
```shell
source /etc/profile
  • 小结
  • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可
  • 解压安装
  • 修改配置:server.properties

知识点05:Kafka启动与关闭

  • 目标掌握kafka集群的启动与关闭命令及脚本封装
  • 路径
  • step1:如何启动Kafka集群?
  • step2:如何关闭Kafka集群?
  • step3:如何封装启动及关闭脚本?
  • 实施
  • 启动Zookeeper
/export/server/zookeeper-3.4.6/bin/start-zk-all.sh 
  • 启动Kafka
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
 >>/dev/null 2>&1 &:在后台运行
  • 关闭Kafka
bin/kafka-server-stop.sh 
  • 封装Kafka脚本
  • 启动脚本
vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
for number in {1..3}
do
        host=node${number}
        echo ${host}
        /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
        echo "${host} started"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
  • 关闭脚本
vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
for number in {1..3}
do
  host=node${number}
  echo ${host}
  /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
  echo "${host} stoped"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
  • 小结
  • 启动:kafka-server-start.sh
  • 关闭:kafka-server-stop.sh

知识点06:Topic管理:创建与列举

  • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
  • 路径
  • step1:Topic脚本的使用
  • step2:创建Topic
  • step3:列举Topic
  • 实施
  • Topic管理脚本
  • 查看用法
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum. Not 
                                           supported with the --zookeeper       
                                           option.                              
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to. In case of providing this, a     
                                           direct Zookeeper connection won't be 
                                           required.                            
--command-config <String: command        Property file containing configs to be 
  config property file>                    passed to Admin Client. This is used 
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                           replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                max.compaction.lag.ms                 
                                                max.message.bytes                     
                                                message.downconversion.enable         
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.It is   
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used.                             
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
                                           Not supported with the --bootstrap-  
                                           server option.                       
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist. Not    
                                           supported with the --bootstrap-      
                                           server option.                       
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being         
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic <String: topic>                  The topic to create, alter, describe   
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '\' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test\.topic".                    
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
                                           Not supported with the --zookeeper   
                                           option.                              
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.                 
--zookeeper <String: hosts>              DEPRECATED, The connection string for  
                                           the zookeeper connection in the form 
                                           host:port. Multiple hosts can be     
                                           given to allow fail-over. 
  • 创建Topic
bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • –create:创建
  • –topic :指定操作的Topic的名称
  • –partitions:指定分区个数,默认为1
  • –replication-factor:副本因子,默认为1
  • –bootstrap-server:指定Kafka服务端地址
  • 列举Topic
bin/kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
  • –list:表示列举

  • 小结
  • 创建:create
  • 指定分区个数
  • 指定副本个数
  • 列举:list
  • 必选:–bootstrap-server:服务端地址
  • 端口:9092

知识点07:Topic管理:查看与删除

  • 目标掌握Kafka集群中Topic的管理命令,实现查看Topic信息及删除Topic
  • 路径
  • step1:查看Topic详细信息
  • step2:删除Topic
  • 实施
  • 查看Topic信息
bin/kafka-topics.sh --describe --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
Topic: bigdata02        PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: bigdata02        Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: bigdata02        Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
  • Partition:分区编号
  • Replicas:分区副本所在的Kafka Broker ID
  • 每个分区的副本有两种角色
  • leader副本
  • follower副本
  • Leader:leader 副本所在的Kafka节点
  • Isr:In-Sync-Replicas:正在同步的副本,可用的副本
  • 用于leader故障时,选举新的leader
  • 删除Topic
bin/kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
  • 小结
  • 查看信息:describe
  • 删除:delete

知识点08:生产者及消费者测试

  • 目标:了解命令行如何模拟测试生产者和消费者
  • 路径
  • step1:构建一个生产者往Topic中生产数据
  • 指定Topic
  • 指定Kafka集群地址
  • step2:构建一个消费者从Topic中消费数据
  • 指定Topic
  • 指定Kafka集群地址
  • 实施
  • 命令行提供的脚本

  • Console生产者
bin/kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
  • Console消费者
bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning
  • –from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

  • 小结
  • 只要生产者不断生产,消费就能实时的消费到Topic中的数据

知识点09:可视化工具Kafka Tool

  • 目标了解Windows版可视化工具Kafka Tool的使用
  • 路径
  • step1:安装Kafka Tool
  • step2:启动构建连接
  • step3:查看Kafka集群信息
  • 实施
  • 安装Kafka Tool:不断下一步即可


  • 构建集群连接:连接Kafka集群


  • 查看集群信息

  • 小结
  • 可视化工具,界面或者交互性不是很友好
  • 后面会学习:Kafka Eagle

知识点10:Kafka集群压力测试

  • 目标了解如何实现Kafka集群的吞吐量及压力测试
  • 路径
  • step1:生产压力测试
  • step2:消费压力测试
  • 实施
  • 创建Topic
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • 生产测试
kafka-producer-perf-test.sh --topic bigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
  • –num-records:写入数据的条数
  • –throughput:是否做限制,-1表示不限制
  • –record-size:每条数据的字节大小

  • 消费测试
kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 5000000
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-86jBAgCD-1625805582217)(20210330_分布式实时消息队列Kafka(二).assets/image-20210330105146154.png)]
  • 小结
  • 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

知识点11:Kafka API 的应用

  • 目标了解工作中使用Kafka API的场景
  • 路径
  • step1:工作中使用Kafka的方式
  • step2:Kafka API的分类
  • 实施
  • 命令行使用Kafka
  • 一般只用于topic的管理:创建、删除
  • 大数据架构中使用Kafka
  • Java API:构建生产者和消费者
  • 工作中一般不用自己开发生产者和消费者
  • 生产者:数据采集工具
  • Flume:Kafka sink
  • 配置kafka集群地址
  • Topic的名称
  • 消费者:实时计算程序
  • SparkStream:KafkaUtil
KafkaUtil.createDirectStream
  • 这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可
  • 重点掌握:用到哪些类和方法
  • Kafka的API的分类
  • High Level API:高级API
  • 基于了SimpleAPI做了封装,让用户开发更加方便
  • 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全
  • Simple API:简单API
  • 并不简单,最原始的API
  • 自定义控制所有消费和生产、保证数据安全
  • 小结
  • 大数据工作中一般不自己开发Java API:掌握类和方法即可
  • 只使用Simple API来实现开发

知识点12:生产者API:构建KafkaProducer

  • 目标了解如何通过Java API构建生产者
  • 路径
  • step1:构建集群配置对象
  • 指定服务端集群地址
  • step2:构建Kafka Porducer对象
  • 加载配置
  • 实施
  • 构建集群配置对象
//todo:1-构建连接,Kafka生产者对象
//构建配置对象,指定生产者的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定集群地址
/**
         * acks:表示生产者生产数据时,怎么保证数据不丢失,Kafka接受写入数据以后,可以给生产者返回一个ack,表示收到这条数据,生产者发送下一条
         * 0:生产者不管Kafka有没有返回ack,都直接发送下一条
         *              快、数据容易丢失
         * 1:生产者发送数据给Kafka的某个分区,写入leader副本以后,kafka就返回ack,生产者发送下一条
         *              相对安全机制,有一定的概率,数据会丢失
         * all:生产者发送数据给Kafka的某个分区,写入leader副本并且所有follower同步成功以后,kafka就返回ack,生产者发送下一条
         *              最安全,性能较差
         */
props.put("acks", "all");
//指定Key的序列化的类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//指定Value的序列化的类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  • 构建Kafka Producer加载配置
//构建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  • 小结
  • Properties:构建生产者的配置
  • 集群地址
  • acks
  • 序列化的类
  • KafkaProducer:生产者的对象

知识点13:生产者API:生产数据到Kafka

  • 目标了解如何将数据写入Kafka中
  • 路径
  • step1:构建ProducerRecord对象
  • step2:调用KafkaProducer的send方法将数据写入Kafka
  • 实施
  • 构建Producer对象
//构建一条数据的对象
//ProducerRecord(String topic, V value)
//            ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01","itcast"+i);
//ProducerRecord(String topic, K key, V value)
//        ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",i+"","itcast"+i);
//ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",0,i+"","itcast"+i);
  • 调用send方法
//生产数据的数据
 producer.send(record);
  • 小结
  • ProducerRecord:表示生产每一条数据
  • Topic
  • Key
  • Value
  • 可选:Partition
  • KafkaProducer:send:写入数据到Kafka

知识点14:消费者API:构建KafkaConsumer

  • 目标了解如何通过Java API构建消费者
  • 路径
  • step1:构建集群配置对象
  • step2:构建Kafka Consumer对象
  • 实施
  • 构建集群配置对象
//todo:1-构建连接,消费者对象
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");//服务端地址
        props.setProperty("group.id", "test01");//消费者组的id
        props.setProperty("enable.auto.commit", "true");//是否自动提交offset
        props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
        //指定key和value反序列化的类
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  • 构建Kafka Consumer加载配置
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  • 小结
  • Properties:配置对象
  • KafkaConsumer:消费者对象

知识点15:消费者API:消费Topic数据

  • 目标了解如何从Kafka中消费数据
  • 路径
  • step1:消费者订阅Topic
  • step2:调用poll方法从Kafka中拉取数据,获取返回值
  • step3:从返回值中输出:Topic、Partition、Offset、Key、Value
  • 实施
  • 消费者订阅Topic
//订阅Topic
consumer.subscribe(Arrays.asList("bigdata01"));
  • 拉取数据
//消费数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  • 输出数据
//取出每一条数据
for (ConsumerRecord<String, String> record : records) {
    //获取topic
    String topic = record.topic();
    //获取分区
    int partition = record.partition();
    //获取offset
    long offset = record.offset();
    //获取Key
    String key = record.key();
    //获取Value
    String value = record.value();
    System.out.println(topic+"\t"+partition+"\t"+offset+"\t"+key+"\t"+value);
}
  • 小结
  • KafkaConsumer:subscribe:负责订阅Kafka的Topic
  • KafkaConsumer:poll:负责拉取消费数据
  • ConsumerRecords:消费到的所有数据的集合
  • ConsumerRecord:消费到的每一条数据
  • topic:获取Topic
  • partition:获取分区
  • offset:获取offset
  • key:获取key
  • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
cord.key();
//获取Value
String value = record.value();
System.out.println(topic+“\t”+partition+“\t”+offset+“\t”+key+“\t”+value);
}
```
  • 小结
  • KafkaConsumer:subscribe:负责订阅Kafka的Topic
  • KafkaConsumer:poll:负责拉取消费数据
  • ConsumerRecords:消费到的所有数据的集合
  • ConsumerRecord:消费到的每一条数据
  • topic:获取Topic
  • partition:获取分区
  • offset:获取offset
  • key:获取key
  • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>


目录
相关文章
|
17天前
|
消息中间件 存储 负载均衡
2024消息队列“四大天王”:Rabbit、Rocket、Kafka、Pulsar巅峰对决
本文对比了 RabbitMQ、RocketMQ、Kafka 和 Pulsar 四种消息队列系统,涵盖架构、性能、可用性和适用场景。RabbitMQ 以灵活路由和可靠性著称;RocketMQ 支持高可用和顺序消息;Kafka 专为高吞吐量和低延迟设计;Pulsar 提供多租户支持和高可扩展性。性能方面,吞吐量从高到低依次为
81 1
|
2月前
|
消息中间件 Java Kafka
初识Apache Kafka:搭建你的第一个消息队列系统
【10月更文挑战第24天】在数字化转型的浪潮中,数据成为了企业决策的关键因素之一。而高效的数据处理能力,则成为了企业在竞争中脱颖而出的重要武器。在这个背景下,消息队列作为连接不同系统和服务的桥梁,其重要性日益凸显。Apache Kafka 是一款开源的消息队列系统,以其高吞吐量、可扩展性和持久性等特点受到了广泛欢迎。作为一名技术爱好者,我对 Apache Kafka 产生了浓厚的兴趣,并决定亲手搭建一套属于自己的消息队列系统。
83 2
初识Apache Kafka:搭建你的第一个消息队列系统
|
3月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现
消息队列系统中的确认机制在分布式系统中如何实现
|
3月前
|
消息中间件 存储 监控
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
【10月更文挑战第2天】消息队列系统中的确认机制在分布式系统中如何实现
|
3月前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
3月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?
|
3月前
|
消息中间件
实践部署《云消息队列RabbitMQ实践》测评
《云消息队列RabbitMQ实践》解决方案原理清晰,尤其在异步通信和解耦方面解释详尽。对初学者而言,部分术语如消息持久化、确认机制及集群性能优化可更细致。部署过程文档详实,涵盖主要环节,但插件配置等细节存在环境问题,需查阅社区资料解决。该方案展示了RabbitMQ的高吞吐量、灵活路由和可靠消息传递能力,但在高可用性和消息丢失处理上可提供更深入配置建议。适用于高并发和解耦场景,如订单处理、日志收集,有助于提升系统可扩展性。总体部署体验良好,实用性较强。
59 0
|
4月前
|
存储
cephFS高可用分布式文件系统部署指南
关于如何部署高可用的cephFS分布式文件系统,包括集群的搭建、验证高可用性以及实现两主一从架构的详细指南。
169 9
|
5月前
|
图形学 人工智能 C#
从零起步,到亲手实现:一步步教你用Unity引擎搭建出令人惊叹的3D游戏世界,绝不错过的初学者友好型超详细指南 ——兼探索游戏设计奥秘与实践编程技巧的完美结合之旅
【8月更文挑战第31天】本文介绍如何使用Unity引擎从零开始创建简单的3D游戏世界,涵盖游戏对象创建、物理模拟、用户输入处理及动画效果。Unity是一款强大的跨平台游戏开发工具,支持多种编程语言,具有直观编辑器和丰富文档。文章指导读者创建新项目、添加立方体对象、编写移动脚本,并引入基础动画,帮助初学者快速掌握Unity开发核心概念,迈出游戏制作的第一步。
295 1
|
5月前
|
消息中间件 存储 监控
消息队列系统中的确认机制在分布式系统中如何实现?
消息队列系统中的确认机制在分布式系统中如何实现?

热门文章

最新文章