iLogtail使用入门-iLogtail本地配置模式部署(For Kafka Flusher)

本文涉及的产品
对象存储 OSS,20GB 3个月
日志服务 SLS,月写入数据量 50GB 1个月
对象存储 OSS,内容安全 1000次 1年
简介: iLogtail使用入门-iLogtail本地配置模式部署(For Kafka Flusher)

阿里已经正式开源了可观测数据采集器iLogtail。作为阿里内部可观测数据采集的基础设施,iLogtail承载了阿里巴巴集团、蚂蚁的日志、监控、Trace、事件等多种可观测数据的采集工作。

iLogtail作为阿里云SLS的采集Agent,一般情况下都是配合SLS进行使用,通常采集配置都是通过SLS控制台或API进行的。那是否可以在不依赖于SLS的情况下使用iLogtail呢?

本文将会详细介绍如何在不依赖于SLS控制台的情况下,进行iLogtail本地配置模式部署,并将json格式的日志文件采集到非SLS(例如Kafka等)。

场景

采集/root/bin/input_data/json.log(单行日志json格式),并将采集到的日志写入本地部署的kafka中。

前提条件

kafka本地安装完成,并创建名为logtail-flusher-kafka的topic。部署详见链接

安装ilogtail

下载最新的ilogtail版本,并解压。

# 解压tar包

$ tar zxvf logtail-linux64.tar.gz


# 查看目录结构

$ ll logtail-linux64

drwxr-xr-x   3 500 500  4096 bin

drwxr-xr-x 184 500 500 12288 conf

-rw-r--r--   1 500 500   597 README

drwxr-xr-x   2 500 500  4096 resources


# 进入bin目录

$ cd logtail-linux64/bin

$ ll

-rwxr-xr-x 1 500 500 10052072 ilogtail_1.0.28 # ilogtail可执行文件

-rwxr-xr-x 1 500 500     4191 ilogtaild  

-rwxr-xr-x 1 500 500     5976 libPluginAdapter.so

-rw-r--r-- 1 500 500 89560656 libPluginBase.so

-rwxr-xr-x 1 500 500  2333024 LogtailInsight

采集配置

配置格式

针对json格式的日志文件采集到本地kafa的配置格式:

{

   "metrics": {

       "{config_name1}": {

           "enable": true,

           "category": "file",

           "log_type": "json_log",

           "log_path": "/root/bin/input_data",

           "file_pattern": "json.log",

           "plugin": {

               "processors": [

               {

                   "detail": {

                       "SplitSep": "",

                       "SplitKey": "content"

                   },

                   "type": "processor_split_log_string"

               },

               {

                   "detail": {

                       "ExpandConnector": "",

                       "ExpandDepth": 1,

                       "SourceKey": "content",

                       "KeepSource": false

                   },

                   "type": "processor_json"

               }],

               "flushers":[

               {

                   "type": "flusher_kafka",

                   "detail": {

                       "Brokers":["localhost:9092"],

                       "Topic": "logtail-flusher-kafka"

                   }

               }]

           },

           "version": 1

    },

    "{config_name2}": {

        ...

    }

}

}

详细格式说明:

  • 文件最外层的key为metrics,内部为各个具体的采集配置。
  • 采集配置的key为配置名,改名称需保证在本文件中唯一。建议命名:"##1.0##采集配置名称"。
  • 采集配置value内部为具体采集参数配置,其中关键参数以及含义如下:
参数名 类型 描述
enable bool 该配置是否生效,为false时该配置不生效。
category string 文件采集场景取值为"file"。
log_type string log类型。json采集场景下取值json_log。
log_path string 采集路径。
file_pattern string 采集文件。
plugin object 具体采集配置,为json object,具体配置参考下面说明
version int 该配置版本号,建议每次修改配置后加1
  • plugin 字段为json object,为具体输入源以及处理方式配置:
配置项 类型 描述
processors object array 处理方式配置,具体请参考链接。 processor_json:将原始日志按照json格式展开。
flushers object array flusher_stdout:采集到标准输出,一般用于调试场景; flusher_kafka:采集到kafka。

完整配置样例

  • 进入bin目录,创建及sys_conf_dir文件夹及ilogtail_config.json文件。

# 1. 创建sys_conf_dir

$ mkdir sys_conf_dir


# 2. 创建ilogtail_config.json并完成配置。

##### logtail_sys_conf_dir取值为:$pwd/sys_conf_dir/

##### config_server_address固定取值,保持不变。

$ pwd

/root/bin/logtail-linux64/bin

$ cat ilogtail_config.json

{

    "logtail_sys_conf_dir": "/root/bin/logtail-linux64/bin/sys_conf_dir/",  


    "config_server_address":"http://logtail.cn-zhangjiakou.log.aliyuncs.com"

}


# 3. 此时的目录结构

$ ll

-rwxr-xr-x 1  500  500 ilogtail_1.0.28

-rw-r--r-- 1 root root ilogtail_config.json

-rwxr-xr-x 1  500  500 ilogtaild

-rwxr-xr-x 1  500  500 libPluginAdapter.so

-rw-r--r-- 1  500  500 libPluginBase.so

-rwxr-xr-x 1  500  500 LogtailInsight

drwxr-xr-x 2 root root sys_conf_dir

  • sys_conf_dir下创建采集配置文件user_local_config.json

说明:json_log场景下,user_local_config.json仅需修改采集路径相关参数log_pathfile_pattern即可,其他参数保持不变。

$ cat sys_conf_dir/user_local_config.json

{

   "metrics":

   {

       "##1.0##kafka_output_test":

       {

           "category": "file",

           "log_type": "json_log",

           "log_path": "/root/bin/input_data",

           "file_pattern": "json.log",

           "create_time": 1631018645,

           "defaultEndpoint": "",

           "delay_alarm_bytes": 0,

           "delay_skip_bytes": 0,

           "discard_none_utf8": false,

           "discard_unmatch": false,

           "docker_exclude_env":

           {},

           "docker_exclude_label":

           {},

           "docker_file": false,

           "docker_include_env":

           {},

           "docker_include_label":

           {},

           "enable": true,

           "enable_tag": false,

           "file_encoding": "utf8",

           "filter_keys":

           [],

           "filter_regs":

           [],

           "group_topic": "",

           "plugin":

           {

               "processors":

               [

                   {

                       "detail": {

                           "SplitSep": "",

                           "SplitKey": "content"

                       },

                       "type": "processor_split_log_string"

                   },

                   {

                       "detail":

                       {

                           "ExpandConnector": "",

                           "ExpandDepth": 1,

                           "SourceKey": "content",

                           "KeepSource": false

                       },

                       "type": "processor_json"

                   }

               ],

               "flushers":

               [

                   {

                       "type": "flusher_kafka",

                       "detail":

                       {

                           "Brokers":

                           [

                               "localhost:9092"

                           ],

                           "Topic": "logtail-flusher-kafka"

                       }

                   }

               ]

           },

           "local_storage": true,

           "log_tz": "",

           "max_depth": 10,

           "max_send_rate": -1,

           "merge_type": "topic",

           "preserve": true,

           "preserve_depth": 1,

           "priority": 0,

           "raw_log": false,

           "aliuid": "",

           "region": "",

           "project_name": "",

           "send_rate_expire": 0,

           "sensitive_keys":

           [],

           "shard_hash_key":

           [],

           "tail_existed": false,

           "time_key": "",

           "timeformat": "",

           "topic_format": "none",

           "tz_adjust": false,

           "version": 1,

           "advanced":

           {

               "force_multiconfig": false,

               "tail_size_kb": 1024

           }            

       }

   }

}

启动ilogtail

########## 终端模式运行 ##########

$ ./ilogtail_1.0.28 --ilogtail_daemon_flag=false


########## 也可以选择daemon模式运行 ##########

$ ./ilogtail_1.0.28

$ ps -ef|grep logtail

root       48453       1   ./ilogtail_1.0.28

root       48454   48453   ./ilogtail_1.0.28


采集场景模拟

/root/bin/input_data/json.log中构造json格式的数据,代码如下:

$ echo'{"seq": "1", "action": "kkkk", "extend1": "", "extend2": "", "type": "1"}'>> json.log

$ echo'{"seq": "2", "action": "kkkk", "extend1": "", "extend2": "", "type": "1"}'>> json.log

消费topic为logtail-flusher-kafka中的数据。

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic logtail-flusher-kafka

{"Time":1640862641,"Contents":[{"Key":"__tag__:__path__","Value":"/root/bin/input_data/json.log"},{"Key":"seq","Value":"1"},{"Key":"action","Value":"kkkk"},{"Key":"extend1","Value":""},{"Key":"extend2","Value":""},{"Key":"type","Value":"1"}]}

{"Time":1640862646,"Contents":[{"Key":"__tag__:__path__","Value":"/root/bin/input_data/json.log"},{"Key":"seq","Value":"2"},{"Key":"action","Value":"kkkk"},{"Key":"extend1","Value":""},{"Key":"extend2","Value":""},{"Key":"type","Value":"1"}]}


本地调试

为了快速方便验证配置是否正确,可以将采集到的日志打印到标准输出完成快速的功能验证。

替换本地采集配置plugin-flushersflusher_stdout,并以终端模式运行$ ./ilogtail_1.0.28 --ilogtail_daemon_flag=false,即可将采集到的日志打印到标准输出快速进行本地调试。

{

   "type": "flusher_stdout",

   "detail":

   {

       "OnlyStdout": true

   }

}

目录
相关文章
|
22天前
|
消息中间件 存储 监控
Kraft模式下Kafka脚本的使用
【9月更文挑战第9天】在Kraft模式下,使用Kafka脚本涉及以下几个关键步骤:启动Zookeeper和Kafka服务、创建主题、发送与消费消息、查看主题列表及描述主题详情。通过指定配置文件与相关参数,如`--replication-factor`和`--partitions`,可以灵活管理主题。此外,确保根据实际需求调整配置文件中的参数,并监控日志以维持最佳性能与及时问题处理。
|
2月前
|
消息中间件 负载均衡 Kafka
【Kafka消费秘籍】深入了解消费者组与独立模式,掌握消息消费的两种超能力!
【8月更文挑战第24天】Apache Kafka是一款高性能的分布式消息系统,支持灵活多样的消费模型以适应不同的应用场景。消息按主题组织,每个主题可划分为多个分区,确保消息顺序性。本文深入探讨了Kafka中的两大核心消费模式:消费者组(Consumer Group)和独立消费者(Standalone Consumer)。消费者组允许多个消费者协同工作,实现负载均衡及故障恢复,是最常用的消费模式。独立消费者模式则适用于需要高度定制化处理逻辑的场景,如消息重放等。通过对比这两种模式的特点和提供的示例代码,开发者可以根据具体需求选择最合适的消费策略,从而更好地利用Kafka构建高效的数据流应用程序。
38 3
|
2月前
|
消息中间件 Java 大数据
"深入理解Kafka单线程Consumer:核心参数配置、Java实现与实战指南"
【8月更文挑战第10天】在大数据领域,Apache Kafka以高吞吐和可扩展性成为主流数据流处理平台。Kafka的单线程Consumer因其实现简单且易于管理而在多种场景中受到欢迎。本文解析单线程Consumer的工作机制,强调其在错误处理和状态管理方面的优势,并通过详细参数说明及示例代码展示如何有效地使用KafkaConsumer类。了解这些内容将帮助开发者优化实时数据处理系统的性能与可靠性。
69 7
|
2月前
|
消息中间件 监控 Java
【一键解锁!】Kafka Manager 部署与测试终极指南 —— 从菜鸟到高手的必经之路!
【8月更文挑战第9天】随着大数据技术的发展,Apache Kafka 成为核心组件,用于处理实时数据流。Kafka Manager 提供了简洁的 Web 界面来管理和监控 Kafka 集群。本文介绍部署步骤及示例代码,助您快速上手。首先确认已安装 Java 和 Kafka。
355 4
|
2月前
|
消息中间件 域名解析 网络协议
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
【Azure 应用服务】部署Kafka Trigger Function到Azure Function服务中,解决自定义域名解析难题
|
3月前
|
消息中间件 Kafka
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
面试题Kafka问题之RabbitMQ的路由配置工作如何解决
53 1
|
2月前
|
消息中间件 Kafka Apache
部署安装kafka集群
部署安装kafka集群
|
3月前
|
消息中间件 存储 缓存
微服务数据问题之Kafka的默认复制配置如何解决
微服务数据问题之Kafka的默认复制配置如何解决
|
3月前
|
消息中间件 存储 资源调度
实时计算 Flink版产品使用问题之在消费Kafka的Avro消息,如何配置FlinkKafka消费者的相关参数
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
85 9

热门文章

最新文章

下一篇
无影云桌面