(三)kafka从入门到精通之使用场景

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。

1、kafka简介

Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。

Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。

2、Kafka 的使用场景包括:

实时数据处理:

Kafka 非常适合处理实时事件,例如实时交易、实时搜索结果和实时推文等。Kafka 可以将数据快速地发布和订阅,从而实现实时处理。

日志文件处理:

Kafka 可以处理大量的日志文件,例如 Web 服务器日志、数据库日志和操作系统日志等。Kafka可以将日志文件的数据快速地发布和订阅,并且提供了多种聚合和分析日志数据的方法,例如使用 Apache Storm 或 ApacheFlink。

传感器数据处理:

Kafka 可以处理来自传感器的数据,例如温度、湿度和气压等传感器数据。Kafka可以将传感器数据快速地发布和订阅,并且可以将数据发送到分布式处理系统,例如 Apache Hadoop 或 ApacheSpark,进行处理。

流处理:

Kafka 可以作为流处理系统,例如 Apache Nifi 或 Apache Beam 的底层存储系统。Kafka可以将数据流快速地发布和订阅,并且可以支持多种流处理模式,例如按时间排序、字段过滤和路由规则。

分布式消息队列:

Kafka 可以作为分布式消息队列,用于多个应用程序之间的数据传输。Kafka提供了多种消息传输模式,例如点对点模式、多主节点模式和发布/订阅模式。

数据备份:

Kafka 可以作为数据备份系统,用于备份数据到多个节点上。Kafka 可以将数据发布到多个主题中,从而实现数据备份。

3、简单使用

Kafka 的使用非常简单,可以使用多种语言来编写客户端库和消费者。以下是使用 Java 客户端库的示例:

首先,您需要在项目中添加 Kafka 依赖项:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

然后,您需要编写一个生产者,以将消息发布到指定的主题中:

package com.yinfeng.test.demo.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaProducerDemo {
   
   
    @SneakyThrows
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送3条消息
        for (int i = 0; i < 3; i++) {
   
   
            ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);
            producer.send(record1, (metadata, exception) -> {
   
   
                System.out.println("消息发送成功 topic="+metadata.topic()+", msg=>" + record1.value());
            });
        }

        // kafka异步发送,延时等待执行完成
        Thread.sleep(5000);

    }
}

执行之后可在控制台看到3条消息已发送
在这里插入图片描述

最后,您需要编写一个消费者,以从指定的主题中接收消息:

package com.yinfeng.test.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaConsumerDemo {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(Collections.singleton("test"));

        // 循环拉取消息
        while (true){
   
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
   
   
                System.out.println("Received message: " + record.value());
            }
        }

    }
}

在这里插入图片描述

以上示例只是一个简单的例子,Kafka 还提供了更多的功能和配置选项,例如订阅多个主题、设置消息过滤器和消息压缩等。

4、总结

总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。其使用简单、功能丰富,并且可以扩展到数百亿条消息/分区,适用于各种大规模的数据处理场景。

使用 Kafka 需要一定的学习和配置,但是一旦您熟悉了其使用方法,Kafka 将会成为您的一个得力工具,可以提高您的工作效率并为您的业务增添价值。 如果您对使用 Kafka 有疑问或遇到问题,可以查看 Kafka 的官方文档和社区,或者使用 Kafka 提供的丰富的客户端库和文档,包括 Apache Kafka、Apache Confluent Kafka 和 Confluent Kubernetes 等。

总之,Kafka 是一种高性能、可扩展、易于使用的数据处理平台,可以广泛应用于实时数据处理、日志文件处理、传感器数据处理和流处理等领域。

目录
相关文章
|
12月前
|
测试技术 Swift
Swift代码审查的关键点及最佳实践
Swift代码审查的关键点及最佳实践
309 59
|
10月前
|
数据挖掘 UED
WebSocket在实时体育比分网站中的应用
WebSocket 在实时体育比分网站中用于实时比分更新、动态赛事信息推送、交互式功能(如即时聊天和投票)、赛程提醒与推送通知、比分预测与数据分析,以及多平台支持。通过持久连接,服务器可即时推送比分变化、球员动态、比赛状态等信息,减少延迟并提升用户体验。同时,WebSocket 支持双向通信,使用户能实时互动,确保跨平台的实时数据同步。
166 10
|
8月前
|
人工智能 开发者
AI Forward: Alibaba Cloud Developer Summit 2025 开放注册中
AI Forward: Alibaba Cloud Developer Summit 2025 开放注册中
|
网络安全 Docker 容器
【Bug修复】秒杀服务器异常,轻松恢复网站访问--从防火墙到Docker服务的全面解析
【Bug修复】秒杀服务器异常,轻松恢复网站访问--从防火墙到Docker服务的全面解析
490 0
|
网络协议 应用服务中间件 nginx
FFmpeg错误笔记(一):nginx-rtmp-module推流出现 Server error: Already publishing
这篇文章讨论了在使用nginx-rtmp-module进行RTMP推流时遇到的“Server error: Already publishing”错误,分析了错误原因,并提供了详细的解决办法,包括修改nginx配置文件和终止异常的TCP连接。
609 0
FFmpeg错误笔记(一):nginx-rtmp-module推流出现 Server error: Already publishing
|
Java 关系型数据库 MySQL
【Java用法】windows10系统下修改jar中的文件并重新打包成jar文件然后运行
【Java用法】windows10系统下修改jar中的文件并重新打包成jar文件然后运行
837 0
|
Android开发
【苹果安卓通用】xlsx 和 vCard 文件转换器,txt转vCard文件格式,CSV转 vCard格式,如何批量号码导入手机通讯录,一篇文章说全
本文介绍了如何快速将批量号码导入手机通讯录,适用于企业客户管理、营销团队、活动组织、团队协作和新员工入职等场景。步骤包括:1) 下载软件,提供腾讯云盘和百度网盘链接;2) 打开软件,复制粘贴号码并进行加载预览和制作文件;3) 将制作好的文件通过QQ或微信发送至手机,然后按苹果、安卓或鸿蒙系统的指示导入。整个过程简便快捷,可在1分钟内完成。
758 6
|
Java 数据库连接 数据库
save() 和 saveOrUpdate() 方法有什么区别?
【8月更文挑战第21天】
519 0
|
机器学习/深度学习 编解码 人工智能
Transformer 和扩散模型的生成式 AI 实用指南(预览版)
Transformer 和扩散模型的生成式 AI 实用指南(预览版)
832 1
Transformer 和扩散模型的生成式 AI 实用指南(预览版)
|
存储 NoSQL 数据库
Harbor 共享后端高可用-简单版
主机配置包括3台服务器,运行Harbor v2.10.0和Docker 24.0.5,其中10.0.90.68额外运行Postgres+Redis。基础安装配置中详细描述了Docker的安装步骤,包括添加仓库、安装、配置国内镜像源和启动Docker。安装postgres+redis服务使用docker-compose.yml文件,通过`docker-compose up -d`命令启动。最后,安装Harbor涉及修改harbor.yml配置文件,设置主机名、数据库和Redis连接信息,然后运行`install.sh`脚本。