kafka入门+代码初步实现--小白必看

简介: kafka入门+代码初步实现--小白必看

Kafka概述

定义:

Kafka传统定义: Kafka是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域。

发布/订阅:消息的发布者不会将消息直接发布给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。

Kafka最新定义:Kafka是一个开源的分布式事件流平台(Event Streaming Platform),被数千家公司用于高性能的数据管道、流分析、数据集成和关键任务应用。

 

image.gif 编辑

消息队列

目前企业中比较常见的消息队列产品主要有Kafka、ActiveMQ、RabbitMQ、RocketMQ等。

在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。

image.gif 编辑

kafka安装

一、准备工作

1、JDK安装(version:1.8)

1.1.1、JDK官网下载

官网下载地址(需要oracle账号)

https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html

1.1.2、JDK网盘下载

或者网盘下载:jdk-8u381-windows-x64.exe

1.1.3、JDK安装

可以参考博文:【java】windows下安装jdk1.8详细图文操作说明(包会)

1.2、Zookeeper安装

1.2.1、Zookeeper官网下载

官网下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.6.4/apache-zookeeper-3.6.4-bin.tar.gz

1.2.2、Zookeeper网盘下载

或者网盘下载:apache-zookeeper-3.6.4-bin.tar.gz

1.2.3、Zookeeper安装

安装方法就不赘述了。

参考博文:Windows下安装Zookeeper(图文记录详细步骤,手把手包安装成功)

1.3、Scala安装(version:2.12)

1.3.1、Scala官网下载

官网下载地址:

https://downloads.lightbend.com/scala/2.11.12/scala-2.11.12.msi

1.3.2、Scala网盘下载

或者网盘下载:scala-2.11.12.msi

1.3.3、Scala安装

安装方法就不赘述了。

可参考博文:Windows下安装Scala(以Scala 2.11.12为例)

二、Kafka安装(version:2.12-3.5.1)

version:2.12-3.5.1,表示Scala版本是2.12,Kafka版本是基于此的3.5.1版本。

2.1、Kafka官网下载

https://kafka.apache.org/downloads

image.gif 编辑

官网下载地址:kafka_2.12-3.5.1.tgz

2.2、Kafka网盘下载

网盘下载地址:kafka_2.12-3.5.1.tgz

2.3、Kafka安装

2.3.1、解压Kafka安装包到安装目录

这里解压到:D:\bigdata\kafka\2.12-3.5.1

2.3.2、Kafka安装目录下新建目录logs

image.gif 编辑

2.3.3、修改Kafka配置文件 server.properties

文件路径:D:\bigdata\kafka\2.12-3.5.1\config\server.properties

2.3.3.1、修改 log.dirs 参数

修改 log.dirs 参数值,修改成上一步新建的logs文件夹。注意文件夹路径中是双左斜杠

log.dirs=D:\\bigdata\\kafka\\2.12-3.5.1\\logs

image.gif 编辑

2.3.3.2、修改 listeners 参数

修改 listeners 参数值。

listeners=PLAINTEXT://localhost:9092

2.4、Kafka启动

由于Kafka依赖于Zookeeper,所以要先启动Zookeeper,再启动Kafka。

2.4.1、先启动Zookeeper服务

管理员权限打开命令窗口,输入命令zkServer,启动Zookeeper服务:

zkServer

显示如下信息,则表示Zookeeper服务正常运行:

image.gif 编辑

2.4.2、再启动Kafka服务

管理员权限打开命令窗口,进入到Kafka安装目录(D:\bigdata\kafka\2.12-3.5.1)。

输入如下命令启动Kafka服务:

.\bin\windows\kafka-server-start.bat .\config\server.properties

显示如下信息,则表示Kafka服务正常运行:

image.gif 编辑

2.4、Kafka相关操作(Kafka新版本命令)

Kafka2.2之后版本中使用–zookeeper hadoop01:2181会出现报错情况,2.2之后的版本使用了–bootstrap-server hadoop01:9092来替换–zookeeper hadoop01:2181

2.4.1、创建topics

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,创建topics:

kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

2.4.2、查看topics

查看topics列表:

kafka-topics.bat --bootstrap-server localhost:9092 --list

 

2.4.3、打开一个producer(生产者)

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,

执行以下命令,打开一个producer(生产者):

image.gif 编辑

kafka-console-producer.bat --broker-list localhost:9092 --topic test

 

2.4.4、打开一个consumer(消费者)

以管理员权限新开一个命令提示窗口,进入D:\bigdata\kafka\2.12-3.5.1\bin\windows目录,执行以下命令,打开一个consumer(消费者):

kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

2.4.5、测试发送和接受消息

以上打开的窗口不要关闭,然后就可以在producer(生产者)控制台窗口输入消息并回车。在消息输入过后,很快consumer(消费者)窗口就会显示出producer(生产者)发送的消息。

2.4.5.1、producer(生产者)发送消息

在producer(生产者)控制台窗口输入消息:

image.gif 编辑

2.4.5.2、consumer(消费者)接收消息

在consumer(消费者)控制台窗口查看消息:

image.gif 编辑

我们发现,producer(生产者)发送的消息被consumer(消费者)接受到了。

这里乱码是字符集的问题。

 

java代码

yaml文件:

dinst:

 kafka:

   bootstrap-servers: localhost:9092

   alarm-topic: test5

   consumer-group: alarm

config

package com.derlte.dinstrest.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
/**
 * 启用Spring的Kafka基础设施,以便使用{@link org.springframework.kafka.annotation.KafkaListener}
 * 注解无需任何手动工厂bean即可处理。
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfiguration {
}

image.gif

它的作用是在 Spring Boot 启   动时显式开启 Spring for Apache Kafka 的监听基础设施,让容器知道要扫描并代理. @KafkaListener 方法。

有了 @EnableKafka,Spring 会自动注册 KafkaListenerAnnotationBeanPostProcessor 等 Bean,帮你把 KafkaTelemetryConsumer#handleTelemetry 织入真正的 Kafka 消费循环;否则这个注解方法只会当作普通 Bean 方法,不会去监听 topic。                

由于项目对消费端没有特殊需求(比如自定义ConcurrentKafkaListenerContainerFactory),这个配置类本身不需要额外内容,只要保证 Kafka 自动配置被启用即可

消费者模拟

package com.derlte.dinstrest.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
/**
 * 最简单的Kafka消费者,仅记录有效载荷。
 */
@Component
public class KafkaTelemetryConsumer {
    private static final Logger log = LoggerFactory.getLogger(KafkaTelemetryConsumer.class);
    @KafkaListener(
            topics = "${dinst.kafka.alarm-topic}",
            groupId = "${dinst.kafka.consumer-group}"
    )
    public void handleTelemetry(ConsumerRecord<String, String> record) throws UnsupportedEncodingException {
        log.info("收到 Kafka 消息, topic={}, partition={}, offset={}, payload={}",
                record.topic(), record.partition(), record.offset(), new String(record.value().getBytes(StandardCharsets.ISO_8859_1), "GBK"));
        // TODO 这里可以把消息转交给后续业务逻辑
    }
}

image.gif

topics = "${dinst.kafka.alarm-topic}" 让监听主题由配置文件决定(即application.yml 里 dinst.kafka.alarm-topic,当前值是 test)。

groupId = "${dinst.kafka.consumer-group}" 指定消费组,同样从配置里读取(现在在 application.yml 中加了 dinst.kafka.consumer-group: alarm)。

src/main/resources/application.yml:123-129:在 dinst.kafka 节点下配置了 bootstrap-servers(Kafka 集群地址)、alarm- topic(监听的 topic)、consumer-group。

这些属性既供 KafkaTelemetryConsumer 使用,也可让其他模块(例如未来的 Kafkaproducer)共享。

配合 src/main/java/com/derlte/dinstrest/DinstRestApplication.java:5-11 的 SpringBoot 启动类,整个项目启动后就会自动连接 Kafka 并监听 test 主题。当前逻辑的“作用”就是在收到 Kafka 消息时记录日志,方便你验证消费链路通不通。

kafka乱码解决

kafka-console-producer命令可以通过–property参数设置消息发送的编码格式。具体设置方法如下:

  1. 在命令行中输入以下命令:

kafka-console-producer --broker-list <broker_list> --topic <topic> --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=<encoding>

 

image.gif 编辑

其中,<broker_list>是Kafka集群中的broker地址列表,是要发送消息的主题名,是要使用的编码格式,例如UTF-8、GBK等。

执行命令后,即可使用指定的编码格式发送消息。例如,以下命令可以使用UTF-8编码格式发送消息:

kafka-console-producer --broker-list localhost:9092 --topic test --property value.serializer=org.apache.kafka.common.serialization.StringSerializer --property value.encoding=UTF-8

 

注意,如果要发送的消息中包含非ASCII字符,需要使用支持该字符集的编码格式。

消费者参数配置

kafka-console-consumer如何设置接收消息的编码格式

kafka-console-consumer 命令可以通过 --property 参数来设置接收消息的编码格式。具体来说,可以通过以下命令来设置接收消息的编码格式为 UTF-8:

kafka-console-consumer --bootstrap-server <server>:<port> --topic <topic> --from-beginning --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property consumer.encoding=UTF-8

 

其中,–property consumer.encoding=UTF-8 参数用于设置接收消息的编码格式为 UTF-8。如果需要设置其他编码格式,只需要将 UTF-8 替换为对应的编码格式即可。

目录
相关文章
|
数据库连接 数据库
kettle开发篇-数据库查询
kettle开发篇-数据库查询
619 0
|
2月前
|
Kubernetes 应用服务中间件 API
Nginx Ingress 退役,详细版迁移指引来啦
Ingress NGINX 退役引发开发者们的强烈关注,官方已经提供了完备的应对措施,迁移到 Gateway API,以及20+ Ingress 控制器。但实施迁移的时候,企业还会希望了解新的 Ingress 控制器是否兼容 Ingress NGINX 的注解,迁移过程中如何进行灰度切流,遇到流量损失如何快速回滚等,以保障迁移过程平滑,不影响线上业务。因此,本文将提供基于实操的应对方案,以阿里云云原生 API 网关(Higress 企业版)为例,按步骤详细阐述迁移的操作过程。
449 21
|
2月前
|
JavaScript 前端开发 Java
2026版基于springboot的在线招聘管理系统
本文探讨了基于Web的在线招聘平台在当前社会经济环境下的发展背景、意义及研究现状。随着互联网技术进步,在线招聘平台通过大数据、人工智能等技术实现求职者与岗位的精准匹配,提升招聘效率与用户体验。国内外研究分别聚焦于功能优化、数据安全、国际化及新技术应用。系统采用SpringBoot、Java、Vue.js与MySQL等技术实现高效、稳定的招聘服务,推动人力资源管理数字化发展。
|
2月前
|
人工智能 BI 开发工具
适合个人开发者的5款开发工具,开发者必须知道
2025年,个人开发者迎来工具黄金时代。本文精选5款高效开发利器:GitHub Copilot(AI智能编程)、Trae(中文友好)、Cursor(项目级理解)、VS Code(开源全能)和Zoho Creator(低代码平台),覆盖从代码生成到应用搭建,助你提升效率,快速实现创意。
725 2
|
2月前
|
关系型数据库 MySQL 数据库
基于python的电子商城购物系统
本研究基于Flask与Vue.js构建前后端分离的电商管理系统,结合MySQL实现高效数据管理。系统具备商品管理、订单处理、用户交互等功能,提升运营效率与用户体验,具有良好的扩展性与维护性,助力电商企业应对激烈市场竞争,推动智能化发展。
|
2月前
|
人工智能 运维 安全
助力企业构建 AI 原生应用,函数计算FunctionAI 重塑模型服务与 Agent 全栈生态
在 AI 技术应用落地进程中,目前面临着五大核心挑战:开发/学习门槛过高,部署运维阶段复杂,AI 应用安全备受挑战,生态能力方面存在严重的割裂与锁定现象,同时资源成本高昂且利用率低下。这些挑战极大地阻碍了 AI 技术的广泛普及以及应用效率的有效提升。阿里云函数计算(FC)依托 Serverless AI 基础设施与全栈能力的创新突破,推出 Function AI(函数智能),精准攻克上述痛点问题,全面推动 AI 应用在开发至运维的全流程中实现降本增效。
|
3月前
|
数据采集 人工智能 自然语言处理
让跨境电商“懂文化”:AI内容生成在全球民族特色品类中的实践
本文提出并落地了一套基于大模型与民族文化知识库的民族品类智能识别与匹配方案,旨在解决跨境电商平台在服务穆斯林、印度裔等特定民族群体时面临的“供需错配”难题。
649 27
|
3月前
|
人工智能 前端开发 算法
大厂CIO独家分享:AI如何重塑开发者未来十年
在 AI 时代,若你还在紧盯代码量、执着于全栈工程师的招聘,或者仅凭技术贡献率来评判价值,执着于业务提效的比例而忽略产研价值,你很可能已经被所谓的“常识”困住了脚步。
1769 89
大厂CIO独家分享:AI如何重塑开发者未来十年
|
3月前
|
机器学习/深度学习 人工智能 缓存
让AI评测AI:构建智能客服的自动化运营Agent体系
大模型推动客服智能化演进,从规则引擎到RAG,再到AI原生智能体。通过构建“评估-诊断-优化”闭环的运营Agent,实现对话效果自动化评测与持续优化,显著提升服务质量和效率。
1856 86
让AI评测AI:构建智能客服的自动化运营Agent体系
|
2月前
|
JavaScript Java 关系型数据库
2026版基于springboot的大学生社团管理系统
本文探讨高校学生社团管理系统的研发背景与意义,分析当前国内研究现状,提出基于Spring Boot、Vue.js、MySQL及B/S架构的技术方案,旨在提升社团管理的信息化、智能化水平,推动校园文化可持续发展。