(三)kafka从入门到精通之使用场景

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。

1、kafka简介

Kafka 是一种流处理平台,主要用于处理大量数据流,如实时事件、日志文件和传感器数据等。Kafka的目的是实现高吞吐量、低延迟和高可用性的数据处理。

Kafka提供了一个高度可扩展的架构,可以轻松地添加和删除节点,并且能够处理数百亿条消息/分区。Kafka的消息可以容错,即使某个节点失败,消息也会在集群中的其他节点上得到处理。

2、Kafka 的使用场景包括:

实时数据处理:

Kafka 非常适合处理实时事件,例如实时交易、实时搜索结果和实时推文等。Kafka 可以将数据快速地发布和订阅,从而实现实时处理。

日志文件处理:

Kafka 可以处理大量的日志文件,例如 Web 服务器日志、数据库日志和操作系统日志等。Kafka可以将日志文件的数据快速地发布和订阅,并且提供了多种聚合和分析日志数据的方法,例如使用 Apache Storm 或 ApacheFlink。

传感器数据处理:

Kafka 可以处理来自传感器的数据,例如温度、湿度和气压等传感器数据。Kafka可以将传感器数据快速地发布和订阅,并且可以将数据发送到分布式处理系统,例如 Apache Hadoop 或 ApacheSpark,进行处理。

流处理:

Kafka 可以作为流处理系统,例如 Apache Nifi 或 Apache Beam 的底层存储系统。Kafka可以将数据流快速地发布和订阅,并且可以支持多种流处理模式,例如按时间排序、字段过滤和路由规则。

分布式消息队列:

Kafka 可以作为分布式消息队列,用于多个应用程序之间的数据传输。Kafka提供了多种消息传输模式,例如点对点模式、多主节点模式和发布/订阅模式。

数据备份:

Kafka 可以作为数据备份系统,用于备份数据到多个节点上。Kafka 可以将数据发布到多个主题中,从而实现数据备份。

3、简单使用

Kafka 的使用非常简单,可以使用多种语言来编写客户端库和消费者。以下是使用 Java 客户端库的示例:

首先,您需要在项目中添加 Kafka 依赖项:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

然后,您需要编写一个生产者,以将消息发布到指定的主题中:

package com.yinfeng.test.demo.kafka;

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaProducerDemo {
   
   
    @SneakyThrows
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // 发送3条消息
        for (int i = 0; i < 3; i++) {
   
   
            ProducerRecord<String, String> record1 = new ProducerRecord<>("test", "key"+i, "hello"+i);
            producer.send(record1, (metadata, exception) -> {
   
   
                System.out.println("消息发送成功 topic="+metadata.topic()+", msg=>" + record1.value());
            });
        }

        // kafka异步发送,延时等待执行完成
        Thread.sleep(5000);

    }
}

执行之后可在控制台看到3条消息已发送
在这里插入图片描述

最后,您需要编写一个消费者,以从指定的主题中接收消息:

package com.yinfeng.test.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * @author admin
 * @date 2023/7/2 19:02
 * @description
 */
public class KafkaConsumerDemo {
   
   
    public static void main(String[] args) {
   
   
        Properties props = new Properties();
        // Kafka 集群地址
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "my_group");
        props.put("auto.offset.reset", "earliest");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        consumer.subscribe(Collections.singleton("test"));

        // 循环拉取消息
        while (true){
   
   
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
   
   
                System.out.println("Received message: " + record.value());
            }
        }

    }
}

在这里插入图片描述

以上示例只是一个简单的例子,Kafka 还提供了更多的功能和配置选项,例如订阅多个主题、设置消息过滤器和消息压缩等。

4、总结

总的来说,Kafka 是一个非常强大的数据处理平台,可以用于实时数据处理、日志文件处理、传感器数据处理和流处理等场景。其使用简单、功能丰富,并且可以扩展到数百亿条消息/分区,适用于各种大规模的数据处理场景。

使用 Kafka 需要一定的学习和配置,但是一旦您熟悉了其使用方法,Kafka 将会成为您的一个得力工具,可以提高您的工作效率并为您的业务增添价值。 如果您对使用 Kafka 有疑问或遇到问题,可以查看 Kafka 的官方文档和社区,或者使用 Kafka 提供的丰富的客户端库和文档,包括 Apache Kafka、Apache Confluent Kafka 和 Confluent Kubernetes 等。

总之,Kafka 是一种高性能、可扩展、易于使用的数据处理平台,可以广泛应用于实时数据处理、日志文件处理、传感器数据处理和流处理等领域。

目录
相关文章
|
1月前
|
消息中间件 Java Kafka
kafka入门demo
kafka入门demo
43 0
|
1月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
|
1月前
|
消息中间件 Java Kafka
Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
【2月更文挑战第19天】Kafka【环境搭建 01】kafka_2.12-2.6.0 单机版安装+参数配置及说明+添加到service服务+开机启动配置+验证+chkconfig配置说明(一篇入门kafka)
88 1
|
1月前
|
消息中间件 存储 Kafka
Kafka【基础入门】
Kafka【基础入门】
38 1
|
1月前
|
消息中间件 存储 分布式计算
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
Apache Kafka-初体验Kafka(01)-入门整体认识kafka
50 0
|
1月前
|
消息中间件 算法 Kafka
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
Kafka入门,这一篇就够了(安装,topic,生产者,消费者)
161 0
|
9月前
|
消息中间件 存储 Kafka
(四)kafka从入门到精通之安装教程
Kafka是一个高性能、低延迟、分布式的分布式数据库,可以在分布式环境中实现数据的实时同步和分发。Zookeeper是一种开源的分布式数据存储系统,它可以在分布式环境中存储和管理数据库中的数据。它的主要作用是实现数据的实时同步和分发,可以用于实现分布式数据库、分布式文件系统、分布式日志系统等。Zookeeper的设计目标是高可用性、高性能、低延迟,它支持多种客户端协议,包括TCP和HTTP,可以方便地与其他分布式系统进行集成。
107 0
|
14天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之通过flink同步kafka数据进到doris,decimal数值类型的在kafka是正常显示数值,但是同步到doris表之后数据就变成了整数,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
14天前
|
消息中间件 存储 Kafka
实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
12天前
|
消息中间件 Java 关系型数据库
实时计算 Flink版操作报错合集之从 PostgreSQL 读取数据并写入 Kafka 时,遇到 "initial slot snapshot too large" 的错误,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
629 0