Kafka概述
定义:
Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。
发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。
编辑
消息队列
目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。
在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。
编辑
kafka安装
一、准备工作
1、JDK安装(version:1.8)
1.1.1、JDK官网下载
官网下载地址(需要oracle账号)
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html
1.1.2、JDK网盘下载
或者网盘下载:jdk-8u381-windows-x64.exe
1.1.3、JDK安装
可以参考博文:【java】windows下安装jdk1.8详细图文操作说明(包会)
1.2、Zookeeper安装
1.2.1、Zookeeper官网下载
官网下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz
1.2.2、Zookeeper网盘下载
或者网盘下载:apache-zookeeper-3.6.4-bin.tar.gz
1.2.3、Zookeeper安装
安装方法就不赘述了。
参考博文:Windows下安装Zookeeper(图文记录详细步骤,手把手包安装成功)
1.3、Scala安装(version:2.12)
1.3.1、Scala官网下载
官网下载地址:
https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi
1.3.2、Scala网盘下载
或者网盘下载:scala-2.11.12.msi
1.3.3、Scala安装
安装方法就不赘述了。
可参考博文:Windows下安装Scala(以Scala 2.11.12为例)
二、Kafka安装(version:2.12-3.5.1)
version:2.12-3.5.1,表示Scala版本是2.12,Kafka版本是基于此的3.5.1版本。
2.1、Kafka官网下载
https://kafka.apache.org/downloads
编辑
官网下载地址:kafka_2.12-3.5.1.tgz
2.2、Kafka网盘下载
网盘下载地址:kafka_2.12-3.5.1.tgz
2.3、Kafka安装
2.3.1、解压Kafka安装包到安装目录
这里解压到:D:\bigdata\kafka\2.12-3.5.1
2.3.2、Kafka安装目录下新建目录logs
编辑
2.3.3、修改Kafka配置文件 server.properties
文件路径:D:\bigdata\kafka\2.12-3.5.1\config\server.properties
2.3.3.1、修改 log.dirs 参数
修改 log.dirs 参数值,修改成上一步新建的logs文件夹。注意文件夹路径中是双左斜杠
log.dirs=D:\\bigdata\\kafka\\2.12-3.5.1\\logs
编辑
2.3.3.2、修改 listeners 参数
修改 listeners 参数值。
listeners=PLAINTEXT://localhost:9092
2.4、Kafka启动
由于Kafka依赖于Zookeeper,所以要先启动Zookeeper,再启动Kafka。
2.4.1、先启动Zookeeper服务
管理员权限打开命令窗口,输入命令zkServer,启动Zookeeper服务:
zkServer
显示如下信息,则表示Zookeeper服务正常运行:
编辑
2.4.2、再启动Kafka服务
管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。
输入如下命令启动Kafka服务:
.\bin\windows\kafka-server-start.bat .\config\server.properties
显示如下信息,则表示Kafka服务正常运行:
编辑
2.4、Kafka相关操作(Kafka新版本命令)
Kafka2.2之后版本中使用–zookeeper hadoop01:2181会出现报错情况,2.2之后的版本使用了–bootstrap-server hadoop01:9092来替换–zookeeper hadoop01:2181
2.4.1、创建topics
以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,创建topics:
kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
2.4.2、查看topics
查看topics列表:
kafka-topics.bat --bootstrap-server localhost:9092 --list
2.4.3、打开一个producer(生产者)
以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,
执行以下命令,打开一个producer(生产者):
编辑
kafka-console-producer.bat --broker-list localhost:9092 --topic test
2.4.4、打开一个consumer(消费者)
以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,打开一个consumer(消费者):
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
2.4.5、测试发送和接受消息
以上打开的窗口不要关闭,然后就可以在producer(生产者)控制台窗口输入消息并回车。在消息输入过后,很快consumer(消费者)窗口就会显示出producer(生产者)发送的消息。
2.4.5.1、producer(生产者)发送消息
在producer(生产者)控制台窗口输入消息:
编辑
2.4.5.2、consumer(消费者)接收消息
在consumer(消费者)控制台窗口查看消息:
编辑
我们发现,producer(生产者)发送的消息被consumer(消费者)接受到了。
这里乱码是字符集的问题。
java代码
yaml文件:
dinst:
kafka:
bootstrap-servers: localhost:9092
alarm-topic: test5
consumer-group: alarm
config
package com.derlte.dinstrest.config; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; /** * 启用Spring的Kafka基础设施,以便使用{@link org.springframework.kafka.annotation.KafkaListener} * 注解无需任何手动工厂bean即可处理。 */ @Configuration @EnableKafka public class KafkaConsumerConfiguration { }
它的作用是在 Spring Boot 启 动时显式开启 Spring for Apache Kafka 的监听基础设施,让容器知道要扫描并代理. @KafkaListener 方法。
有了 @EnableKafka,Spring 会自动注册 KafkaListenerAnnotationBeanPostProcessor 等 Bean,帮你把 KafkaTelemetryConsumer#handleTelemetry 织入真正的 Kafka 消费循环;否则这个注解方法只会当作普通 Bean 方法,不会去监听 topic。
由于项目对消费端没有特殊需求(比如自定义ConcurrentKafkaListenerContainerFactory),这个配置类本身不需要额外内容,只要保证 Kafka 自动配置被启用即可
消费者模拟
package com.derlte.dinstrest.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.nio.charset.StandardCharsets; /** * 最简单的Kafka消费者,仅记录有效载荷。 */ @Component public class KafkaTelemetryConsumer { private static final Logger log = LoggerFactory.getLogger(KafkaTelemetryConsumer.class); @KafkaListener( topics = "${dinst.kafka.alarm-topic}", groupId = "${dinst.kafka.consumer-group}" ) public void handleTelemetry(ConsumerRecord<String, String> record) throws UnsupportedEncodingException { log.info("收到 Kafka 消息, topic={}, partition={}, offset={}, payload={}", record.topic(), record.partition(), record.offset(), new String(record.value().getBytes(StandardCharsets.ISO_8859_1), "GBK")); // TODO 这里可以把消息转交给后续业务逻辑 } }
topics = "${dinst.kafka.alarm-topic}" 让监听主题由配置文件决定(即application.yml 里 dinst.kafka.alarm-topic,当前值是 test)。
groupId = "${dinst.kafka.consumer-group}" 指定消费组,同样从配置里读取(现在在 application.yml 中加了 dinst.kafka.consumer-group: alarm)。
src/main/resources/application.yml:123-129:在 dinst.kafka 节点下配置了 bootstrap-servers(Kafka 集群地址)、alarm- topic(监听的 topic)、consumer-group。
这些属性既供 KafkaTelemetryConsumer 使用,也可让其他模块(例如未来的 Kafkaproducer)共享。
配合 src/main/java/com/derlte/dinstrest/DinstRestApplication.java:5-11 的 SpringBoot 启动类,整个项目启动后就会自动连接 Kafka 并监听 test 主题。当前逻辑的“作用”就是在收到 Kafka 消息时记录日志,方便你验证消费链路通不通。
kafka乱码解决
kafka-console-producer命令可以通过–property参数设置消息发送的编码格式。具体设置方法如下:
- 在命令行中输入以下命令:
kafka-console-producer --broker-list <broker_list> --topic <topic> --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=<encoding>
编辑
其中,<broker_list>是Kafka集群中的broker地址列表,是要发送消息的主题名,是要使用的编码格式,例如UTF-8、GBK等。
执行命令后,即可使用指定的编码格式发送消息。例如,以下命令可以使用UTF-8编码格式发送消息:
kafka-console-producer --broker-list localhost:9092 --topic test --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=UTF-8
注意,如果要发送的消息中包含非ASCII字符,需要使用支持该字符集的编码格式。
消费者参数配置
kafka-console-consumer如何设置接收消息的编码格式
kafka-console-consumer 命令可以通过 --property 参数来设置接收消息的编码格式。具体来说,可以通过以下命令来设置接收消息的编码格式为 UTF-8:
kafka-console-consumer --bootstrap-server <server>:<port> --topic <topic> --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property consumer.encoding=UTF-8
其中,–property consumer.encoding=UTF-8 参数用于设置接收消息的编码格式为 UTF-8。如果需要设置其他编码格式,只需要将 UTF-8 替换为对应的编码格式即可。