开发者社区> 阿里云存储服务> 正文

使用Fluentd读写OSS

简介: 前言Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。安装首先下载并安装td-agent,笔者使用的是td-agent-3.

前言

Fluentd是一个实时开源的数据收集器,基于CRuby实现,td-agent是其商业化版本,由Treasure Data公司维护。本文将介绍如何使Fluentd能够读写OSS。

安装

首先下载并安装td-agent,笔者使用的是td-agent-3.3.0-1.el7.x86_64.rpm,使用rpm命令安装:

[root@apache ~]# rpm -ivh td-agent-3.3.0-1.el7.x86_64.rpm

然后,需要安装Fluentd的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem install fluent-plugin-aliyun-oss

这里请注意,因为我们使用的是td-agent,安装Fluentd plugin时需要使用td-agent的td-agent-gem(/usr/sbin/td-agent-gem)。原因是td-agent有自己的Ruby,你需要将plugin安装到它的Ruby里面,而不是其他的Ruby,否则将会找不到已经安装好的plugin。

具体可以参见Fluentd的官方文档

安装完成后,我们可以查看安装的OSS plugin:

[root@apache ~]# /usr/sbin/td-agent-gem list fluent-plugin-aliyun-oss

*** LOCAL GEMS ***

fluent-plugin-aliyun-oss (0.0.1)

fluent-plugin-aliyun-oss这个plugin包含两部分:

Fluent OSS output plugin
将数据缓存在本地,达到设定的条件后,将缓存的数据(压缩后,如果设置的话)上传到OSS。

Fluent OSS input plugin
首先,OSS的bucket需要配置事件通知,这篇文章介绍了如何设置,得到MNS的Queue与Endpoint。
设置好之后,这个plugin会定时地从MNS拉取消息,从消息中获取上传的Objects,最后从OSS读取这些Objects,再发往下游。

下面将分别介绍如何配置,具体的配置参数说明参见github: https://github.com/aliyun/fluent-plugin-oss

配置(向OSS写数据)

下面是一个例子,将读到的数据,每分钟一个文件写到OSS中。其中endpoint/bucket/access_key_id/access_key_secret是必填项,其他的都是可选项。

<system>
  workers 6
</system>

<match input.*>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  upload_crc_enable false
  path "fluent-oss/logs"
  auto_create_bucket true
  key_format "%{path}/%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}"
  #key_format %{path}/events/ts=%{time_slice}/events_%{index}_%{thread_id}.%{file_extension}
  time_slice_format %Y%m%d-%H
  store_as gzip
  <buffer tag,time>
    @type file
    path /var/log/fluent/oss/${ENV['SERVERENGINE_WORKER_ID']}
    timekey 60 # 1 min partition
    timekey_wait 1s
    # timekey_use_utc true
    flush_thread_count 1
  </buffer>
  <format>
    @type json
  </format>
</match>

我们可以从OSS控制台看到效果:
15_40_01__04_23_2019

配置(从OSS读数据)

下面是一个配置示例,其中endpoint/bucket/access_key_id/access_key_secret和MNS的endpoint/queue是必填项,其他的都是可选项。

<source>
  @type oss
  endpoint <OSS endpoint to connect to>
  bucket <Your Bucket>
  access_key_id <Your Access Key>
  access_key_secret <Your Secret Key>
  store_local false
  store_as gzip
  flush_batch_lines 800
  flush_pause_milliseconds 1

  download_crc_enable false
  <mns>
    endpoint <MNS endpoint to connect to, E.g.,{account-id}.mns.cn-zhangjiakou-internal.aliyuncs.com>
    queue <MNS queue>
    poll_interval_seconds 1
  </mns>
  <parse>
    @type json
  </parse>
</source>

我们可以从log中看一下运行状态

2019-04-23 15:38:14 +0800 [info]: #5 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:14 +0800 [info]: #5 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:14 +0800 [info]: #5 read object fluent-oss/logs/20190423-12/events_10_70226640160100.gz, size 4389548 from OSS
2019-04-23 15:38:15 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages?ReceiptHandle=0BC1EA4E51483D4EAC69736941044AAE-MjY5ODkgMTU1NjAwNTAwODMwNSAzNjAwMDA
2019-04-23 15:38:16 +0800 [info]: #1 start to poll message from MNS queue fluentd-oss
2019-04-23 15:38:16 +0800 [info]: #1 http://1305310278558820.mns.cn-zhangjiakou-internal.aliyuncs.com/queues/fluentd-oss/messages
2019-04-23 15:38:16 +0800 [info]: #1 read object fluent-oss/logs/20190423-09/events_50_69939581261780.gz, size 6750045 from OSS

参考资料

https://github.com/aliyun/fluent-plugin-oss
https://rubygems.org/gems/fluent-plugin-aliyun-oss
https://www.fluentd.org/
https://docs.fluentd.org/v1.0/articles/quickstart

版权声明:本文中所有内容均属于阿里云开发者社区所有,任何媒体、网站或个人未经阿里云开发者社区协议授权不得转载、链接、转贴或以其他方式复制发布/发表。申请授权请邮件developerteam@list.alibaba-inc.com,已获得阿里云开发者社区协议授权的媒体、网站,在转载使用时必须注明"稿件来源:阿里云开发者社区,原文作者姓名",违者本社区将依法追究责任。 如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件至:developer2020@service.aliyun.com 进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
上一篇:Apache Hadoop 2.7如何支持读写OSS 下一篇:如何将Elasticsearch的快照备份至OSS
阿里云存储服务
使用钉钉扫一扫加入圈子
+ 订阅

阿里云存储基于飞天盘古2.0分布式存储系统,产品多种多样,充分满足用户数据存储和迁移上云需求。

官方博客
链接