kafka入门+代码初步实现--小白必看

本文涉及的产品
可观测可视化 Grafana 版,10个用户账号 1个月
应用实时监控服务-应用监控,每月50GB免费额度
可观测监控 Prometheus 版,每月50GB免费额度
简介: kafka入门+代码初步实现--小白必看

Kafka概述

定义:

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。

 

image.gif 编辑

消息队列

目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。

在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。

image.gif 编辑

kafka安装

一、准备工作

1、JDK安装(version:1.8)

1.1.1、JDK官网下载

官网下载地址(需要oracle账号)

https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

1.1.2、JDK网盘下载

或者网盘下载:jdk-8u381-windows-x64.exe

1.1.3、JDK安装

可以参考博文:【java】windows下安装jdk1.8详细图文操作说明(包会)

1.2、Zookeeper安装

1.2.1、Zookeeper官网下载

官网下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

1.2.2、Zookeeper网盘下载

或者网盘下载:apache-zookeeper-3.6.4-bin.tar.gz

1.2.3、Zookeeper安装

安装方法就不赘述了。

参考博文:Windows下安装Zookeeper(图文记录详细步骤,手把手包安装成功)

1.3、Scala安装(version:2.12)

1.3.1、Scala官网下载

官网下载地址:

https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi

1.3.2、Scala网盘下载

或者网盘下载:scala-2.11.12.msi

1.3.3、Scala安装

安装方法就不赘述了。

可参考博文:Windows下安装Scala(以Scala 2.11.12为例)

二、Kafka安装(version:2.12-3.5.1)

version:2.12-3.5.1,表示Scala版本是2.12,Kafka版本是基于此的3.5.1版本。

2.1、Kafka官网下载

https://kafka.apache.org/downloads

image.gif 编辑

官网下载地址:kafka_2.12-3.5.1.tgz

2.2、Kafka网盘下载

网盘下载地址:kafka_2.12-3.5.1.tgz

2.3、Kafka安装

2.3.1、解压Kafka安装包到安装目录

这里解压到:D:\bigdata\kafka\2.12-3.5.1

2.3.2、Kafka安装目录下新建目录logs

image.gif 编辑

2.3.3、修改Kafka配置文件 server.properties

文件路径:D:\bigdata\kafka\2.12-3.5.1\config\server.properties

2.3.3.1、修改 log.dirs 参数

修改 log.dirs 参数值,修改成上一步新建的logs文件夹。注意文件夹路径中是双左斜杠

log.dirs=D:\\bigdata\\kafka\\2.12-3.5.1\\logs

image.gif 编辑

2.3.3.2、修改 listeners 参数

修改 listeners 参数值。

listeners=PLAINTEXT://localhost:9092

2.4、Kafka启动

由于Kafka依赖于Zookeeper,所以要先启动Zookeeper,再启动Kafka。

2.4.1、先启动Zookeeper服务

管理员权限打开命令窗口,输入命令zkServer,启动Zookeeper服务:

zkServer

显示如下信息,则表示Zookeeper服务正常运行:

image.gif 编辑

2.4.2、再启动Kafka服务

管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。

输入如下命令启动Kafka服务:

.\bin\windows\kafka-server-start.bat .\config\server.properties

显示如下信息,则表示Kafka服务正常运行:

image.gif 编辑

2.4、Kafka相关操作(Kafka新版本命令)

Kafka2.2之后版本中使用–zookeeper hadoop01:2181会出现报错情况,2.2之后的版本使用了–bootstrap-server hadoop01:9092来替换–zookeeper hadoop01:2181

2.4.1、创建topics

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,创建topics:

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

2.4.2、查看topics

查看topics列表:

kafka-topics.bat --bootstrap-server localhost:9092 --list

 

2.4.3、打开一个producer(生产者)

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,

执行以下命令,打开一个producer(生产者):

image.gif 编辑

kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

2.4.4、打开一个consumer(消费者)

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,打开一个consumer(消费者):

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

2.4.5、测试发送和接受消息

以上打开的窗口不要关闭,然后就可以在producer(生产者)控制台窗口输入消息并回车。在消息输入过后,很快consumer(消费者)窗口就会显示出producer(生产者)发送的消息。

2.4.5.1、producer(生产者)发送消息

在producer(生产者)控制台窗口输入消息:

image.gif 编辑

2.4.5.2、consumer(消费者)接收消息

在consumer(消费者)控制台窗口查看消息:

image.gif 编辑

我们发现,producer(生产者)发送的消息被consumer(消费者)接受到了。

这里乱码是字符集的问题。

 

java代码

yaml文件:

dinst:

 kafka:

   bootstrap-servers: localhost:9092

   alarm-topic: test5

   consumer-group: alarm

config

package com.derlte.dinstrest.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
/**
 * 启用Spring的Kafka基础设施,以便使用{@link org.springframework.kafka.annotation.KafkaListener}
 * 注解无需任何手动工厂bean即可处理。
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
}

image.gif

它的作用是在 Spring Boot 启   动时显式开启 Spring for Apache Kafka 的监听基础设施,让容器知道要扫描并代理. @KafkaListener 方法。

有了 @EnableKafka,Spring 会自动注册 KafkaListenerAnnotationBeanPostProcessor 等 Bean,帮你把 KafkaTelemetryConsumer#handleTelemetry 织入真正的 Kafka 消费循环;否则这个注解方法只会当作普通 Bean 方法,不会去监听 topic。                

由于项目对消费端没有特殊需求(比如自定义ConcurrentKafkaListenerContainerFactory),这个配置类本身不需要额外内容,只要保证 Kafka 自动配置被启用即可

消费者模拟

package com.derlte.dinstrest.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
 * 最简单的Kafka消费者,仅记录有效载荷。
 */
@Component
public class KafkaTelemetryConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaTelemetryConsumer.class);
    @KafkaListener(
            topics = "${dinst.kafka.alarm-topic}",
            groupId = "${dinst.kafka.consumer-group}"
    )
    public void handleTelemetry(ConsumerRecord<String, String> record) throws UnsupportedEncodingException {
        log.info("收到 Kafka 消息, topic={}, partition={}, offset={}, payload={}",
                record.topic(), record.partition(), record.offset(), new String(record.value().getBytes(StandardCharsets.ISO_8859_1), "GBK"));
        // TODO 这里可以把消息转交给后续业务逻辑
    }
}

image.gif

topics = "${dinst.kafka.alarm-topic}" 让监听主题由配置文件决定(即application.yml 里 dinst.kafka.alarm-topic,当前值是 test)。

groupId = "${dinst.kafka.consumer-group}" 指定消费组,同样从配置里读取(现在在 application.yml 中加了 dinst.kafka.consumer-group: alarm)。

src/main/resources/application.yml:123-129:在 dinst.kafka 节点下配置了 bootstrap-servers(Kafka 集群地址)、alarm- topic(监听的 topic)、consumer-group。

这些属性既供 KafkaTelemetryConsumer 使用,也可让其他模块(例如未来的 Kafkaproducer)共享。

配合 src/main/java/com/derlte/dinstrest/DinstRestApplication.java:5-11 的 SpringBoot 启动类,整个项目启动后就会自动连接 Kafka 并监听 test 主题。当前逻辑的“作用”就是在收到 Kafka 消息时记录日志,方便你验证消费链路通不通。

kafka乱码解决

kafka-console-producer命令可以通过–property参数设置消息发送的编码格式。具体设置方法如下:

  1. 在命令行中输入以下命令:

kafka-console-producer --broker-list <broker_list> --topic <topic> --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=<encoding>

 

image.gif 编辑

其中,<broker_list>是Kafka集群中的broker地址列表,是要发送消息的主题名,是要使用的编码格式,例如UTF-8、GBK等。

执行命令后,即可使用指定的编码格式发送消息。例如,以下命令可以使用UTF-8编码格式发送消息:

kafka-console-producer --broker-list localhost:9092 --topic test --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=UTF-8

 

注意,如果要发送的消息中包含非ASCII字符,需要使用支持该字符集的编码格式。

消费者参数配置

kafka-console-consumer如何设置接收消息的编码格式

kafka-console-consumer 命令可以通过 --property 参数来设置接收消息的编码格式。具体来说,可以通过以下命令来设置接收消息的编码格式为 UTF-8:

kafka-console-consumer --bootstrap-server <server>:<port> --topic <topic> --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property consumer.encoding=UTF-8

 

其中,–property consumer.encoding=UTF-8 参数用于设置接收消息的编码格式为 UTF-8。如果需要设置其他编码格式,只需要将 UTF-8 替换为对应的编码格式即可。

目录
相关文章
|
2天前
|
云安全 人工智能 自然语言处理
|
9天前
|
数据采集 人工智能 自然语言处理
Meta SAM3开源:让图像分割,听懂你的话
Meta发布并开源SAM 3,首个支持文本或视觉提示的统一图像视频分割模型,可精准分割“红色条纹伞”等开放词汇概念,覆盖400万独特概念,性能达人类水平75%–80%,推动视觉分割新突破。
666 56
Meta SAM3开源:让图像分割,听懂你的话
|
6天前
|
搜索推荐 编译器 Linux
一个可用于企业开发及通用跨平台的Makefile文件
一款适用于企业级开发的通用跨平台Makefile,支持C/C++混合编译、多目标输出(可执行文件、静态/动态库)、Release/Debug版本管理。配置简洁,仅需修改带`MF_CONFIGURE_`前缀的变量,支持脚本化配置与子Makefile管理,具备完善日志、错误提示和跨平台兼容性,附详细文档与示例,便于学习与集成。
321 116
|
6天前
|
人工智能 Java API
Java 正式进入 Agentic AI 时代:Spring AI Alibaba 1.1 发布背后的技术演进
Spring AI Alibaba 1.1 正式发布,提供极简方式构建企业级AI智能体。基于ReactAgent核心,支持多智能体协作、上下文工程与生产级管控,助力开发者快速打造可靠、可扩展的智能应用。
|
21天前
|
域名解析 人工智能
【实操攻略】手把手教学,免费领取.CN域名
即日起至2025年12月31日,购买万小智AI建站或云·企业官网,每单可免费领1个.CN域名首年!跟我了解领取攻略吧~
|
9天前
|
机器学习/深度学习 人工智能 自然语言处理
AgentEvolver:让智能体系统学会「自我进化」
AgentEvolver 是一个自进化智能体系统,通过自我任务生成、经验导航与反思归因三大机制,推动AI从“被动执行”迈向“主动学习”。它显著提升强化学习效率,在更少参数下实现更强性能,助力智能体持续自我迭代。开源地址:https://github.com/modelscope/AgentEvolver
448 32
|
5天前
|
弹性计算 人工智能 Cloud Native
阿里云无门槛和有门槛优惠券解析:学生券,满减券,补贴券等优惠券领取与使用介绍
为了回馈用户与助力更多用户节省上云成本,阿里云会经常推出各种优惠券相关的活动,包括无门槛优惠券和有门槛优惠券。本文将详细介绍阿里云无门槛优惠券的领取与使用方式,同时也会概述几种常见的有门槛优惠券,帮助用户更好地利用这些优惠,降低云服务的成本。
278 133