前言
Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。
安装
首先下载并安装td-agent,笔者使用的是td-agent-3.3.0-1.el7.x86_64.rpm,使用rpm命令安装:
[root@apache ~]# rpm -ivh td-agent-3.3.0-1.el7.x86_64.rpm
然后,需要安装Fluentd的OSS plugin:
[root@apache ~]# /usr/sbin/td-agent-gem install fluent-plugin-aliyun-oss
这里请注意,因为我们使用的是td-agent,安装Fluentd plugin时需要使用td-agent的td-agent-gem(/usr/sbin/td-agent-gem)。原因是td-agent有自己的Ruby,你需要将plugin安装到它的Ruby里面,而不是其他的Ruby,否则将会找不到已经安装好的plugin。
具体可以参见Fluentd的官方文档
安装完成后,我们可以查看安装的OSS plugin:
[root@apache ~]# /usr/sbin/td-agent-gem list fluent-plugin-aliyun-oss
*** LOCAL GEMS ***
fluent-plugin-aliyun-oss (0.0.1)
fluent-plugin-aliyun-oss这个plugin包含两部分:
Fluent OSS output plugin
将数据缓存在本地,达到设定的条件后,将缓存的数据(压缩后,如果设置的话)上传到OSS。
Fluent OSS input plugin
首先,OSS的bucket需要配置事件通知,这篇文章介绍了如何设置,得到MNS的Queue与Endpoint。
设置好之后,这个plugin会定时地从MNS拉取消息,从消息中获取上传的Objects,最后从OSS读取这些Objects,再发往下游。
下面将分别介绍如何配置,具体的配置参数说明参见github: https://github.com/aliyun/fluent-plugin-oss
配置(向OSS写数据)
下面是一个例子,将读到的数据,每分钟一个文件写到OSS中。其中endpoint/bucket/access_key_id/access_key_secret是必填项,其他的都是可选项。
<system>
workers 6
</system>
<match input.*>
@type oss
endpoint <OSS endpoint to connect to>
bucket <Your Bucket>
access_key_id <Your Access Key>
access_key_secret <Your Secret Key>
upload_crc_enable false
path "fluent-oss/logs"
auto_create_bucket true
key_format "%{path}/%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}"
#key_format %{path}/events/ts=%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}
time_slice_format %Y%m%d-%H
store_as gzip
<buffer tag,time>
@type file
path /var/log/fluent/oss/${ENV['SERVERENGINE_WORKER_ID']}
timekey 60 # 1 min partition
timekey_wait 1s
# timekey_use_utc true
flush_thread_count 1
</buffer>
<format>
@type json
</format>
</match>
我们可以从OSS控制台看到效果:
配置(从OSS读数据)
下面是一个配置示例,其中endpoint/bucket/access_key_id/access_key_secret和MNS的endpoint/queue是必填项,其他的都是可选项。
<source>
@type oss
endpoint <OSS endpoint to connect to>
bucket <Your Bucket>
access_key_id <Your Access Key>
access_key_secret <Your Secret Key>
store_local false
store_as gzip
flush_batch_lines 800
flush_pause_milliseconds 1
download_crc_enable false
<mns>
endpoint <MNS endpoint to connect to, E.g.,{account-id}.mns.cn-zhangjiakou-internal.aliyuncs.com>
queue <MNS queue>
poll_interval_seconds 1
</mns>
<parse>
@type json
</parse>
</source>
我们可以从log中看一下运行状态
2019-04-23 15:38:14 +0800 [info]: #5 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:14 +0800 [info]: #5 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:14 +0800 [info]: #5 read object fluent-oss/logs/20190423-12/events_10_70226640160100.gz, size 4389548 from OSS
2019-04-23 15:38:15 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages?ReceiptHandle=0BC1EA4E51483D4EAC69736941044AAE-MjY5ODkgMTU1NjAwNTAwODMwNSAzNjAwMDA
2019-04-23 15:38:16 +0800 [info]: #1 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:16 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:16 +0800 [info]: #1 read object fluent-oss/logs/20190423-09/events_50_69939581261780.gz, size 6750045 from OSS
参考资料
https://github.com/aliyun/fluent-plugin-oss
https://rubygems.org/gems/fluent-plugin-aliyun-oss
https://www.fluentd.org/
https://docs.fluentd.org/v1.0/articles/quickstart