Kafka 从安装到应用

简介: Kafka 从安装到应用

博主介绍: ✌博主从事应用安全和大数据领域,有8年研发经验,5年面试官经验,Java技术专家✌

Java知识图谱点击链接:体系化学习Java(Java面试专题)

💕💕 感兴趣的同学可以收藏关注下不然下次找不到哟💕💕

1687774773319.jpg

1、什么是 Kafka

2、Kafka 安装教程

安装前linux上需要安装jdk,这个步骤不做赘述了。并且还要安装一个 Zookeeper。

Kafka的下载地址在官网上可以找到:https://kafka.apache.org/downloads
在该网页上可以找到各个版本的Kafka安装包下载链接,包括源码和二进制包。建议选择最新版本的二进制包进行下载。

linux 在线下载使用wget命令下载Kafka安装包:

wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz

下载日志如下:


[root@ecs-32f7 software]# wget https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
--2023-06-26 16:59:00--  https://downloads.apache.org/kafka/3.5.0/kafka_2.12-3.5.0.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 107000763 (102M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.5.0.tgz’

100%[===================================================================================================================================>] 107,000,763 1.90MB/s   in 4m 26s

2023-06-26 17:03:31 (393 KB/s) - ‘kafka_2.12-3.5.0.tgz’ saved [107000763/107000763]

[root@ecs-32f7 software]#

接下安装包:

tar -zxvf /root/software/kafka_2.12-3.5.0.tgz

修改配置文件

vim /root/software/kafka_2.12-3.5.0/config/server.properties
broker.id=0
listeners=PLAINTEXT://localhost:9092
log.dirs=/root/software/kafka_2.12-3.5.0/kafka-logs

启动

cd /root/software/kafka_2.12-3.5.0
bin/kafka-server-start.sh config/server.properties &

3、Kafka 常用的几个命令介绍

创建 topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

使用Kafka的命令行工具 kafka-console-producer.sh 可以向Kafka发送消息。执行以下命令启动消息生产者:

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

使用Kafka的命令行工具 kafka-console-consumer.sh 可以接收Kafka中的消息。执行以下命令启动消息消费者:

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

查看消费者组 lag 的信息

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group_name>

看某个消费者的详细信息,可以执行以下命令:

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group <group_name> --members --verbose

4、Kafka 的应用场景

Kafka是一个高性能、可扩展、分布式的消息队列系统,常用于以下场景:

  1. 数据收集与处理:Kafka可以作为数据收集和处理的中间件,用于收集和传输大量的数据,同时支持数据流处理和批处理等多种模式,非常适合大数据场景下的数据处理和分析。
  2. 消息系统:Kafka可以作为消息系统,用于支持实时的消息传递和处理,比如实时日志处理、实时监控和告警等场景。
  3. 数据存储:Kafka提供了高可靠性的数据存储机制,可以用于存储各种类型的数据,比如日志、事件、消息等,同时支持数据的持久化和复制,非常适合高可靠性和高可用性的数据存储场景。
  4. 流处理:Kafka提供了流处理API,可以用于实时处理数据流,支持流与流之间的连接和数据转换,非常适合实时数据处理和分析场景。
  5. 消息队列:Kafka本身就是一个消息队列系统,可以用于支持各种类型的消息队列应用,比如任务队列、通知队列、消息推送等场景。

总之,Kafka具有很广泛的应用场景,尤其是在大数据、实时计算和分布式系统等领域有着广泛的应用。

5、Kafka 在 java 上的基础应用代码

生产者代码如下:

package com.pany.camp.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 *
 * @description:  Kafka 生产者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:06
 */
public class KafkaProducerExample {
   
   
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 10; i++) {
   
   
            String message = "Hello, Kafka! This is message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
            producer.send(record);
        }
        producer.close();
    }
}

消费者代码如下:

package com.pany.camp.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 *
 * @description: Kafka 消费者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:04
 */
public class KafkaConsumerExample {
   
   
    private static final String TOPIC_NAME = "test-topic";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";
    private static final String GROUP_ID = "test-group";

    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));
        while (true) {
   
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            records.forEach(record -> {
   
   
                System.out.printf("Received message: key=%s, value=%s, partition=%d, offset=%d\n",
                        record.key(), record.value(), record.partition(), record.offset());
            });
        }
    }
}

以上需要引入依赖


<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

6、Kafka 在 SpringBoot 上的应用代码

生产者

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 *
 * @description:  生产者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:10
 */
@Component
public class KafkaProducer {
   
   

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
   
   
        kafkaTemplate.send(topic, message);
    }
}

消费者

package com.pany.camp.kafka;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 *
 * @description:  消费者
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:10 
 */
@Component
public class KafkaConsumer {
   
   
    @KafkaListener(topics = "test_topic")
    public void receiveMessage(String message) {
   
   
        System.out.println("Received message: " + message);
    }
}

配置文件 properties 配置

spring.kafka.bootstrap-servers=localhost:9092

下面是一个发送消息的例子

package com.pany.camp.kafka;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 *
 * @description:  发送消息
 * @copyright: @Copyright (c) 2022 
 * @company: Aiocloud
 * @author: pany
 * @version: 1.0.0 
 * @createTime: 2023-06-26 18:12
 */
@RestController
public class KafkaController {
   
   
    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
   
   
        kafkaProducer.sendMessage("test_topic", message);
        return "Message sent: " + message;
    }
}

1686494501743.jpg

💕💕 本文由激流丶创作,原创不易,感谢支持!
💕💕喜欢的话记得点赞收藏啊!

目录
相关文章
|
11天前
|
消息中间件 监控 数据可视化
Linux安装Kafka图形化界面
Linux安装Kafka图形化界面
24 4
|
2月前
|
消息中间件 分布式计算 监控
Python面试:消息队列(RabbitMQ、Kafka)基础知识与应用
【4月更文挑战第18天】本文探讨了Python面试中RabbitMQ与Kafka的常见问题和易错点,包括两者的基础概念、特性对比、Python客户端使用、消息队列应用场景及消息可靠性保证。重点讲解了消息丢失与重复的避免策略,并提供了实战代码示例,帮助读者提升在分布式系统中使用消息队列的能力。
79 2
|
2月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
91 1
|
11天前
|
消息中间件 存储 Ubuntu
Linux安装kafka3.5.1
Linux安装kafka3.5.1
18 2
|
6天前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
7天前
|
消息中间件 运维 Serverless
Serverless 应用引擎产品使用合集之如何触发kafka
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
9天前
|
消息中间件 存储 Kafka
Kafka 2.13-3.7.0 在 Windows 上的安装与配置指南
Kafka 2.13-3.7.0 在 Windows 上的安装与配置指南
14 0
|
9天前
|
消息中间件 Java Kafka
Spring Boot与Kafka的集成应用
Spring Boot与Kafka的集成应用
|
12天前
|
消息中间件 存储 大数据
深度分析:Apache Kafka及其在大数据处理中的应用
Apache Kafka是高吞吐、低延迟的分布式流处理平台,常用于实时数据流、日志收集和事件驱动架构。与RabbitMQ(吞吐量有限)、Pulsar(多租户支持但生态系统小)和Amazon Kinesis(托管服务,成本高)对比,Kafka在高吞吐和持久化上有优势。适用场景包括实时处理、数据集成、日志收集和消息传递。选型需考虑吞吐延迟、持久化、协议支持等因素,使用时注意资源配置、数据管理、监控及安全性。
|
2月前
|
消息中间件 存储 传感器
Kafka消息队列原理及应用详解
【5月更文挑战第6天】Apache Kafka是高性能的分布式消息队列,常用于实时数据管道和流应用。它提供高性能、持久化、分布式和可伸缩的消息处理,支持解耦、异步通信和流量控制。Kafka的核心概念包括Broker、Topic、Partition、Producer、Consumer和Consumer Group。其特点是高吞吐、低延迟、数据持久化、分布式架构和容错性。常见应用包括实时数据流处理、日志收集、消息传递和系统间数据交换。