分布式实时消息队列Kafka(二)Kafka分布式集群部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 分布式实时消息队列Kafka(二)Kafka分布式集群部署

分布式实时消息队列Kafka(二)

知识点01:课程回顾

  1. 什么是消息队列?
  • 用于两个系统之间或者两个模块之间实现消息传递,基于队列机制实现数据缓存
  1. 消息队列的优点是什么?
  • 实现解耦
  • 通过异步,提高性能
  1. 消息队列的缺点是什么?
  • 架构更加复杂:如果消息队列出现故障,整个系统都会故障
  • 分布式集群
  • 副本机制
  • 数据维护更加复杂:不丢失,不重复
  • 生产安全:幂等性机制
  • 消费安全:Offset
  1. 什么是同步与异步?
  • 同步:立即一致性
  • 异步:最终一致性
  1. 什么是Kafka?
  • Kafka是一个基于订阅发布模式的高性能、高吞吐的实时消息队列系统
  1. Kafka在大数据的应用场景是什么?
  • 用于实时架构中:实现数据的临时存储
  1. Kafka中的Producer、Consumer、Consumer Group 、Broker分别是什么?
  • Producer:生产者,负责写入数据到Kafka
  • Consumer:消费者,负责从Kafka消费读取数据
  • Consumer Group:消费者组
  • Kafka中的数据消费必须以消费者组为单位
  • 一个消费者组可以包含多个消费者,注意多个消费者消费的数据加在一起是一份完整的数据
  • 目的:提高性能
  • 消费者组消费Topic
  • 消费者组中的消费者消费Topic的分区
  • Broker:Kafka一个节点
  • 多个节点,构建Kafka集群
  • 主从架构:类似于Zookeeper
  • HDFS:NameNode、DataNode
  • Hbase:HMaster、HRegionServer
  • Kafka:Kafka
  • 主:Kafka Controler
  • 从:Kafka Broker
  • 启动Kafka时候,会从所有的Broker选举一个Controler,如果Controller故障,会从其他的Broker重新选举一个
  • 选举:使用ZK是实现辅助选举
  1. Kafka中的Topic与Partition是什么?
  • Topic:逻辑上实现数据存储的分类,类似于数据库中的表概念
  • Partition:Topic中用于实现分布式存储的物理单元,一个Topic可以有多个分区
  • 每个分区可以存储在不同的节点,实现分布式存储
  • 副本机制:Kafka中每个分区可以构建多个副本【副本个数 <= 机器的个数】
  • 将一个分区的多个副本分为两种角色
  • leader副本:负责对外提供读写请求
  • follower副本:负责与leader同步数据,如果leader故障,follower要重新选举一个成为leader
  • 选举:不由ZK实现选举,由Kafka Crontroller来决定谁是leader
  1. Kafka中的Segment是什么?
  • Segment:对分区内部的数据进行更细的划分,分区段,文件段
  • 类似于Region中划分store
  • 规则:按照文件产生的时间或者大小
  • 目的:提高写入和查询性能
  • 文件名称可以用于检索数据:用offset命名的
  • 组成:每个Segment由两个文件组成
  • .log:存储的数据
  • .index:对应.log文件的索引信息
  1. Kafka中的Offset是什么?
  • Offset是kafka中存储数据时给每个数据做的标记或者编号
  • 分区级别的编号
  • 从0开始编号
  • 功能:消费者根据offset来进行消费,保证顺序消费,数据安全

知识点02:课程目标

  1. Kafka的集群如何搭建启动?
  • 实现Kafka分布式集群的安装部署【按照笔记一步步搭建】
  1. Kafka的Topic如何创建管理?【掌握】
  • 命令行实现
  • 创建Topic
  • 查看Topic信息
  • 删除、列举Topic
  1. Kafka的Java API如何实现?【掌握类和方法】
  • Java API
  • 开发生产者
  • 开发消费者

知识点03:Kafka集群架构

  • 目标了解Kafka集群架构及角色功能
  • 路径
  • Kafka集群有哪些角色?
  • Kafka每个角色的功能是什么?
  • Zookeeper在架构中的作用是什么?
  • 实施
  • 架构角色
  • Kafka
  • Zookeeper
  • Kafka中的每个角色以及对应的功能
  • 分布式主从架构
  • 主:Kafka Controller
  • 负责管理所有从节点:Topic、分区和副本
  • 每次启动集群,会从所有Broker中选举一个Controller,由ZK实现
  • 从:Kafka Broker
  • 对外提供读写请求
  • 其他的Broker监听Controller,如果Controller,会重新从Broker选举一个新的
  • ZK的功能
  • 辅助选举Active的主节点
  • 存储元数据
  • 小结
  • kafka是一个主从架构,整体对外提供分布式读写
  • ZK主要负责选举Controller和实现元数据存储

知识点04:Kafka分布式集群部署

  • 目标实现Kafka分布式集群的搭建部署
  • 路径
  • step1:选择版本
  • step2:下载解压安装
  • step:3:修改配置文件
  • 实施
  • 版本的选型
  • 0.8.x:老的版本,很多的问题
  • 0.10.x +:消息功能上基本没有问题
  • 选择:kafka_2.12-2.4.1.tgz
  • Kafka:2.4.1
  • Scala:2.12,Kafka是由Scala语言开发
  • 下载解压安装
cd /export/software/
rz
  • 解压
tar -zxvf kafka_2.12-2.4.1.tgz -C /export/server/
cd /export/server/kafka_2.12-2.4.1/
mkdir logs
  • bin:一般用于存放客户端操作命令脚本
  • sbin:一般用于存放集群的启动和关闭的命令脚本,如果没有这个命令,脚本放在bin目录中
  • conf/etc/config:配置文件目录
  • lib:jar包的存放目录
  • logs:一般用于存放服务日志
  • 修改配置
  • 切换到配置文件目录
cd /export/server/kafka_2.12-2.4.1/config
  • 修改server.properties
#21行:唯一的 服务端id
broker.id=0
#60行:指定kafka的日志及数据【segment【.log,.index】】存储的位置
log.dirs=/export/server/kafka_2.12-2.4.1/logs 
#123行:指定zookeeper的地址
zookeeper.connect=node1:2181,node2:2181,node3:2181
  • #在最后添加两个配置,允许删除topic,当前kafkaServer的主机名
    delete.topic.enable=true
    host.name=node1
- 分发
```shell
cd /export/server/
scp -r kafka_2.12-2.4.1 node2:$PWD
scp -r kafka_2.12-2.4.1 node3:$PWD
  • 第二台:server.properties
#21行:唯一的 服务端id
  • broker.id=1
    #最后
    host.name=node2
- 第三台:server.properties
```properties
#21行:唯一的 服务端id
broker.id=2
#最后
host.name=node3
  • 添加环境变量
vim /etc/profile

         
  • #KAFKA_HOME
    export KAFKA_HOME=/export/server/kafka_2.12-2.4.1
    export PATH=:P A T H : PATH:PATH:KAFKA_HOME/bin
```shell
source /etc/profile
  • 小结
  • 按照笔记一步步来,不做过多要求,只要配置含义,实现安装即可
  • 解压安装
  • 修改配置:server.properties

知识点05:Kafka启动与关闭

  • 目标掌握kafka集群的启动与关闭命令及脚本封装
  • 路径
  • step1:如何启动Kafka集群?
  • step2:如何关闭Kafka集群?
  • step3:如何封装启动及关闭脚本?
  • 实施
  • 启动Zookeeper
/export/server/zookeeper-3.4.6/bin/start-zk-all.sh 
  • 启动Kafka
bin/kafka-server-start.sh config/server.properties >>/dev/null 2>&1 &
 >>/dev/null 2>&1 &:在后台运行
  • 关闭Kafka
bin/kafka-server-stop.sh 
  • 封装Kafka脚本
  • 启动脚本
vim /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
for number in {1..3}
do
        host=node${number}
        echo ${host}
        /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;export JMX_PORT=9988;${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties >>/dev/null 2>&1 &"
        echo "${host} started"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/start-kafka.sh
  • 关闭脚本
vim /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
#!/bin/bash
KAFKA_HOME=/export/server/kafka_2.12-2.4.1
for number in {1..3}
do
  host=node${number}
  echo ${host}
  /usr/bin/ssh ${host} "cd ${KAFKA_HOME};source /etc/profile;${KAFKA_HOME}/bin/kafka-server-stop.sh"
  echo "${host} stoped"
done
chmod u+x /export/server/kafka_2.12-2.4.1/bin/stop-kafka.sh
  • 小结
  • 启动:kafka-server-start.sh
  • 关闭:kafka-server-stop.sh

知识点06:Topic管理:创建与列举

  • 目标掌握Kafka集群中Topic的管理命令,实现创建Topic及列举Topic
  • 路径
  • step1:Topic脚本的使用
  • step2:创建Topic
  • step3:列举Topic
  • 实施
  • Topic管理脚本
  • 查看用法
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum. Not 
                                           supported with the --zookeeper       
                                           option.                              
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to. In case of providing this, a     
                                           direct Zookeeper connection won't be 
                                           required.                            
--command-config <String: command        Property file containing configs to be 
  config property file>                    passed to Admin Client. This is used 
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                                cleanup.policy                        
                                                compression.type                      
                                                delete.retention.ms                   
                                                file.delete.delay.ms                  
                                                flush.messages                        
                                                flush.ms                              
                                                follower.replication.throttled.       
                                           replicas                             
                                                index.interval.bytes                  
                                                leader.replication.throttled.replicas 
                                                max.compaction.lag.ms                 
                                                max.message.bytes                     
                                                message.downconversion.enable         
                                                message.format.version                
                                                message.timestamp.difference.max.ms   
                                                message.timestamp.type                
                                                min.cleanable.dirty.ratio             
                                                min.compaction.lag.ms                 
                                                min.insync.replicas                   
                                                preallocate                           
                                                retention.bytes                       
                                                retention.ms                          
                                                segment.bytes                         
                                                segment.index.bytes                   
                                                segment.jitter.ms                     
                                                segment.ms                            
                                                unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.It is   
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used.                             
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
                                           Not supported with the --bootstrap-  
                                           server option.                       
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist. Not    
                                           supported with the --bootstrap-      
                                           server option.                       
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being         
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic <String: topic>                  The topic to create, alter, describe   
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '\' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test\.topic".                    
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
                                           Not supported with the --zookeeper   
                                           option.                              
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.                 
--zookeeper <String: hosts>              DEPRECATED, The connection string for  
                                           the zookeeper connection in the form 
                                           host:port. Multiple hosts can be     
                                           given to allow fail-over. 
  • 创建Topic
bin/kafka-topics.sh --create --topic bigdata01 --partitions 3 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • –create:创建
  • –topic :指定操作的Topic的名称
  • –partitions:指定分区个数,默认为1
  • –replication-factor:副本因子,默认为1
  • –bootstrap-server:指定Kafka服务端地址
  • 列举Topic
bin/kafka-topics.sh --list -bootstrap-server node1:9092,node2:9092,node3:9092
  • –list:表示列举

  • 小结
  • 创建:create
  • 指定分区个数
  • 指定副本个数
  • 列举:list
  • 必选:–bootstrap-server:服务端地址
  • 端口:9092

知识点07:Topic管理:查看与删除

  • 目标掌握Kafka集群中Topic的管理命令,实现查看Topic信息及删除Topic
  • 路径
  • step1:查看Topic详细信息
  • step2:删除Topic
  • 实施
  • 查看Topic信息
bin/kafka-topics.sh --describe --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
Topic: bigdata02        PartitionCount: 2       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: bigdata02        Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 0,1,2
        Topic: bigdata02        Partition: 1    Leader: 2       Replicas: 2,0,1 Isr: 2,0,1
  • Partition:分区编号
  • Replicas:分区副本所在的Kafka Broker ID
  • 每个分区的副本有两种角色
  • leader副本
  • follower副本
  • Leader:leader 副本所在的Kafka节点
  • Isr:In-Sync-Replicas:正在同步的副本,可用的副本
  • 用于leader故障时,选举新的leader
  • 删除Topic
bin/kafka-topics.sh --delete --topic bigdata02  --bootstrap-server node1:9092,node2:9092,node3:9092
  • 小结
  • 查看信息:describe
  • 删除:delete

知识点08:生产者及消费者测试

  • 目标:了解命令行如何模拟测试生产者和消费者
  • 路径
  • step1:构建一个生产者往Topic中生产数据
  • 指定Topic
  • 指定Kafka集群地址
  • step2:构建一个消费者从Topic中消费数据
  • 指定Topic
  • 指定Kafka集群地址
  • 实施
  • 命令行提供的脚本

  • Console生产者
bin/kafka-console-producer.sh --topic bigdata01 --broker-list node1:9092,node2:9092,node3:9092
  • Console消费者
bin/kafka-console-consumer.sh --topic bigdata01 --bootstrap-server node1:9092,node2:9092,node3:9092  --from-beginning
  • –from-beginning:从每个分区的最初开始消费,默认从最新的offset进行消费

  • 小结
  • 只要生产者不断生产,消费就能实时的消费到Topic中的数据

知识点09:可视化工具Kafka Tool

  • 目标了解Windows版可视化工具Kafka Tool的使用
  • 路径
  • step1:安装Kafka Tool
  • step2:启动构建连接
  • step3:查看Kafka集群信息
  • 实施
  • 安装Kafka Tool:不断下一步即可


  • 构建集群连接:连接Kafka集群


  • 查看集群信息

  • 小结
  • 可视化工具,界面或者交互性不是很友好
  • 后面会学习:Kafka Eagle

知识点10:Kafka集群压力测试

  • 目标了解如何实现Kafka集群的吞吐量及压力测试
  • 路径
  • step1:生产压力测试
  • step2:消费压力测试
  • 实施
  • 创建Topic
bin/kafka-topics.sh --create --topic bigdata --partitions 2 --replication-factor 2 --bootstrap-server node1:9092,node2:9092,node3:9092
  • 生产测试
kafka-producer-perf-test.sh --topic bigdata --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
  • –num-records:写入数据的条数
  • –throughput:是否做限制,-1表示不限制
  • –record-size:每条数据的字节大小

  • 消费测试
kafka-consumer-perf-test.sh --topic bigdata --broker-list node1:9092,node2:9092,node3:9092  --fetch-size 1048576 --messages 5000000
  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-86jBAgCD-1625805582217)(20210330_分布式实时消息队列Kafka(二).assets/image-20210330105146154.png)]
  • 小结
  • 工作中一般根据实际的需求来调整参数,测试kafka集群的最高性能,判断是否能满足需求

知识点11:Kafka API 的应用

  • 目标了解工作中使用Kafka API的场景
  • 路径
  • step1:工作中使用Kafka的方式
  • step2:Kafka API的分类
  • 实施
  • 命令行使用Kafka
  • 一般只用于topic的管理:创建、删除
  • 大数据架构中使用Kafka
  • Java API:构建生产者和消费者
  • 工作中一般不用自己开发生产者和消费者
  • 生产者:数据采集工具
  • Flume:Kafka sink
  • 配置kafka集群地址
  • Topic的名称
  • 消费者:实时计算程序
  • SparkStream:KafkaUtil
KafkaUtil.createDirectStream
  • 这些软件的API已经将Kafka生产者和消费者的API封装了,只要调用即可
  • 重点掌握:用到哪些类和方法
  • Kafka的API的分类
  • High Level API:高级API
  • 基于了SimpleAPI做了封装,让用户开发更加方便
  • 但是由于封装了底层的API,有很多的东西不能控制,无法控制数据安全
  • Simple API:简单API
  • 并不简单,最原始的API
  • 自定义控制所有消费和生产、保证数据安全
  • 小结
  • 大数据工作中一般不自己开发Java API:掌握类和方法即可
  • 只使用Simple API来实现开发

知识点12:生产者API:构建KafkaProducer

  • 目标了解如何通过Java API构建生产者
  • 路径
  • step1:构建集群配置对象
  • 指定服务端集群地址
  • step2:构建Kafka Porducer对象
  • 加载配置
  • 实施
  • 构建集群配置对象
//todo:1-构建连接,Kafka生产者对象
//构建配置对象,指定生产者的配置
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");//指定集群地址
/**
         * acks:表示生产者生产数据时,怎么保证数据不丢失,Kafka接受写入数据以后,可以给生产者返回一个ack,表示收到这条数据,生产者发送下一条
         * 0:生产者不管Kafka有没有返回ack,都直接发送下一条
         *              快、数据容易丢失
         * 1:生产者发送数据给Kafka的某个分区,写入leader副本以后,kafka就返回ack,生产者发送下一条
         *              相对安全机制,有一定的概率,数据会丢失
         * all:生产者发送数据给Kafka的某个分区,写入leader副本并且所有follower同步成功以后,kafka就返回ack,生产者发送下一条
         *              最安全,性能较差
         */
props.put("acks", "all");
//指定Key的序列化的类
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//指定Value的序列化的类
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  • 构建Kafka Producer加载配置
//构建生产者对象
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
  • 小结
  • Properties:构建生产者的配置
  • 集群地址
  • acks
  • 序列化的类
  • KafkaProducer:生产者的对象

知识点13:生产者API:生产数据到Kafka

  • 目标了解如何将数据写入Kafka中
  • 路径
  • step1:构建ProducerRecord对象
  • step2:调用KafkaProducer的send方法将数据写入Kafka
  • 实施
  • 构建Producer对象
//构建一条数据的对象
//ProducerRecord(String topic, V value)
//            ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01","itcast"+i);
//ProducerRecord(String topic, K key, V value)
//        ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",i+"","itcast"+i);
//ProducerRecord(String topic, Integer partition, K key, V value)
ProducerRecord<String, String> record = new ProducerRecord<String, String>("bigdata01",0,i+"","itcast"+i);
  • 调用send方法
//生产数据的数据
 producer.send(record);
  • 小结
  • ProducerRecord:表示生产每一条数据
  • Topic
  • Key
  • Value
  • 可选:Partition
  • KafkaProducer:send:写入数据到Kafka

知识点14:消费者API:构建KafkaConsumer

  • 目标了解如何通过Java API构建消费者
  • 路径
  • step1:构建集群配置对象
  • step2:构建Kafka Consumer对象
  • 实施
  • 构建集群配置对象
//todo:1-构建连接,消费者对象
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092");//服务端地址
        props.setProperty("group.id", "test01");//消费者组的id
        props.setProperty("enable.auto.commit", "true");//是否自动提交offset
        props.setProperty("auto.commit.interval.ms", "1000");//提交的间隔时间
        //指定key和value反序列化的类
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  • 构建Kafka Consumer加载配置
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  • 小结
  • Properties:配置对象
  • KafkaConsumer:消费者对象

知识点15:消费者API:消费Topic数据

  • 目标了解如何从Kafka中消费数据
  • 路径
  • step1:消费者订阅Topic
  • step2:调用poll方法从Kafka中拉取数据,获取返回值
  • step3:从返回值中输出:Topic、Partition、Offset、Key、Value
  • 实施
  • 消费者订阅Topic
//订阅Topic
consumer.subscribe(Arrays.asList("bigdata01"));
  • 拉取数据
//消费数据
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
  • 输出数据
//取出每一条数据
for (ConsumerRecord<String, String> record : records) {
    //获取topic
    String topic = record.topic();
    //获取分区
    int partition = record.partition();
    //获取offset
    long offset = record.offset();
    //获取Key
    String key = record.key();
    //获取Value
    String value = record.value();
    System.out.println(topic+"\t"+partition+"\t"+offset+"\t"+key+"\t"+value);
}
  • 小结
  • KafkaConsumer:subscribe:负责订阅Kafka的Topic
  • KafkaConsumer:poll:负责拉取消费数据
  • ConsumerRecords:消费到的所有数据的集合
  • ConsumerRecord:消费到的每一条数据
  • topic:获取Topic
  • partition:获取分区
  • offset:获取offset
  • key:获取key
  • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>
cord.key();
//获取Value
String value = record.value();
System.out.println(topic+“\t”+partition+“\t”+offset+“\t”+key+“\t”+value);
}
```
  • 小结
  • KafkaConsumer:subscribe:负责订阅Kafka的Topic
  • KafkaConsumer:poll:负责拉取消费数据
  • ConsumerRecords:消费到的所有数据的集合
  • ConsumerRecord:消费到的每一条数据
  • topic:获取Topic
  • partition:获取分区
  • offset:获取offset
  • key:获取key
  • value:获取value

附录一:Maven依赖

<repositories>
    <repository>
        <id>aliyun</id>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <!-- Kafka的依赖 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.1</version>
    </dependency>
</dependencies>


目录
相关文章
|
1月前
|
Java 测试技术 Linux
jmeter-分布式部署之负载机的设置
jmeter-分布式部署之负载机的设置
46 1
|
1月前
|
消息中间件 存储 大数据
Apache Kafka: 强大消息队列系统的介绍与使用
Apache Kafka: 强大消息队列系统的介绍与使用
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
1月前
|
消息中间件 存储 监控
消息队列:分布式系统中的重要组件
消息队列:分布式系统中的重要组件
|
1月前
|
消息中间件 存储 缓存
Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
【2月更文挑战第20天】Kafka【基础知识 01】消息队列介绍+Kafka架构及核心概念(图片来源于网络)
89 2
|
14天前
|
Docker 容器 关系型数据库
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
本期课程将于4月11日19:00开始直播,内容包括源码编译基础知识和实践操作,课程目标是使学员掌握源码编译部署技能,为未来发展奠定基础,期待大家在课程中取得丰富的学习成果!
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
|
6天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
19天前
|
消息中间件 存储 负载均衡
消息队列学习之kafka
【4月更文挑战第2天】消息队列学习之kafka,一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台。
16 2
|
2月前
|
消息中间件 存储 监控
美团面试:Kafka如何处理百万级消息队列?
美团面试:Kafka如何处理百万级消息队列?
133 1
|
3月前
|
消息中间件 Kafka
消息队列 MQ:构建高效、可扩展的分布式系统
消息队列 MQ:构建高效、可扩展的分布式系统