Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
云原生网关 MSE Higress,422元/月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。

Logstash工作原理

由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:

  • 1 生产者的负载与消费者的负载解耦
  • 2 消费者按照自己的能力fetch数据
  • 3 消费者可以自定义消费的数量

另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。

Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。

由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。

接下来,按照下面的步骤就可以实现logstash与kafka的对接了。

启动kafka

启动zookeeper:

$zookeeper/bin/zkServer.sh start

启动kafka:

$kafka/bin/kafka-server-start.sh $kafka/config/server.properties &

创建主题

创建主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1

查看主题:

$kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe

测试环境

执行生产者脚本:

$kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello

执行消费者脚本,查看是否写入:

$kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello

输入测试

input{
    stdin{}
}
output{
    kafka{
        topic_id => "hello"
        bootstrap_servers => "192.168.0.4:9092" # kafka的地址
        batch_size => 5
    }
    stdout{
        codec => rubydebug
    }
}

读取测试

logstash配置文件:

input{
    kafka {
        codec => "plain"
        group_id => "logstash1"
        auto_offset_reset => "smallest"
        reset_beginning => true
        topic_id => "hello"
        #white_list => ["hello"]
        #black_list => nil
        zk_connect => "192.168.0.5:2181" # zookeeper的地址
   }

}
output{
    stdout{
        codec => rubydebug
    }
}
本文转自博客园xingoo的博客,原文链接:Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署,如需转载请自行联系原博主。
相关文章
|
2月前
|
消息中间件 Java 大数据
Kafka ISR机制详解!
本文详细解析了Kafka的ISR(In-Sync Replicas)机制,阐述其工作原理及如何确保消息的高可靠性和高可用性。ISR动态维护与Leader同步的副本集,通过不同ACK确认机制(如acks=0、acks=1、acks=all),平衡可靠性和性能。此外,ISR机制支持故障转移,当Leader失效时,可从ISR中选取新的Leader。文章还包括实例分析,展示了ISR在不同场景下的变化,并讨论了其优缺点,帮助读者更好地理解和应用ISR机制。
74 0
Kafka ISR机制详解!
|
2月前
|
消息中间件 Java Kafka
ELFK对接zookeeper&kafka
ELFK对接zookeeper&kafka
|
2月前
|
消息中间件 Java Kafka
Kafka ACK机制详解!
本文深入剖析了Kafka的ACK机制,涵盖其原理、源码分析及应用场景,并探讨了acks=0、acks=1和acks=all三种级别的优缺点。文中还介绍了ISR(同步副本)的工作原理及其维护机制,帮助读者理解如何在性能与可靠性之间找到最佳平衡。适合希望深入了解Kafka消息传递机制的开发者阅读。
215 0
|
3月前
|
数据采集 消息中间件 存储
实时数据处理的终极武器:Databricks与Confluent联手打造数据采集与分析的全新篇章!
【9月更文挑战第3天】本文介绍如何结合Databricks与Confluent实现高效实时数据处理。Databricks基于Apache Spark提供简便的大数据处理方式,Confluent则以Kafka为核心,助力实时数据传输。文章详细阐述了利用Kafka进行数据采集,通过Delta Lake存储并导入数据,最终在Databricks上完成数据分析的全流程,展示了一套完整的实时数据处理方案。
73 3
|
4月前
|
消息中间件 负载均衡 Java
揭秘Kafka背后的秘密!Kafka 架构设计大曝光:深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据处理及流传输设计的高效率消息系统。其核心特性包括高吞吐量、低延迟及出色的可扩展性。Kafka采用分布式日志模型,支持数据分区与副本,确保数据可靠性和持久性。系统由Producer(消息生产者)、Consumer(消息消费者)及Broker(消息服务器)组成。Kafka支持消费者组,实现数据并行处理,提升整体性能。通过内置的故障恢复机制,即使部分节点失效,系统仍能保持稳定运行。提供的Java示例代码展示了如何使用Kafka进行消息的生产和消费,并演示了故障转移处理过程。
55 3
|
4月前
|
消息中间件 Java Kafka
如何在Kafka分布式环境中保证消息的顺序消费?深入剖析Kafka机制,带你一探究竟!
【8月更文挑战第24天】Apache Kafka是一款专为实时数据管道和流处理设计的分布式平台,以其高效的消息发布与订阅功能著称。在分布式环境中确保消息按序消费颇具挑战。本文首先介绍了Kafka通过Topic分区实现消息排序的基本机制,随后详细阐述了几种保证消息顺序性的策略,包括使用单分区Topic、消费者组搭配单分区消费、幂等性生产者以及事务支持等技术手段。最后,通过一个Java示例演示了如何利用Kafka消费者确保消息按序消费的具体实现过程。
161 3
|
5月前
|
存储 消息中间件 运维
百行代码实现 Kafka 运行在 S3 之上
AutoMQ 当前已经支持完全构建于像 S3 这样的对象存储之上。你可以参考快速上手 即刻开始体验。AutoMQ 在已有的流存储引擎之上仅仅通过对顶层 WAL 的抽象进行拓展实现少量代码即可做到一些友商引以为傲的的特性,即将流系统完全构建于像 S3 对象存储之上。值得一提的是,我们也已经将这部分源码完全公开,开发者可以利用 S3Stream 流存储引擎轻松在自己的环境中拥有一个完全部署在对象存储之上的 Kafka 服务,具备极低的存储成本和运维复杂度。
54 3
百行代码实现 Kafka 运行在 S3 之上
|
4月前
|
消息中间件 监控 Kafka
Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
【8月更文挑战第13天】Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
231 3
|
4月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
62 1
|
4月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
129 4