用Fluent实现MySQL到ODPS数据集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 RDS MySQL,高可用系列 2核4GB
简介: Fluentd是一个日志收集系统,可以定制输入或输出到常用系统(如MongoDB, RDS, Mysql, HDFS)或直接用于Log收集。 DHS(Datahub)向开发人员提供Fluentd插件,可以通过Fluentd将其它系统数据利用DHS导入到ODPS中。 以下操作均在root 用户下进

安装ruby

首先通过 /etc/issue 命令查看当前使用centos是哪个版本:

[hadoop@hadoop03 ~]$   cat /etc/issue

025f6311f124dd610e452335c4feecf6f30cda17

由于centos版本是6.6,安装ruby时就要选择在centos 6.X环境,具体安装步骤参考如下所示即可!

yum install gcc-c++ patch readline readline-devel zlib zlib-devel libyaml-devel libffi-devel openssl-devel make bzip2 autoconf automake libtool bison iconv-devel wget tar

cd ~/
wget https://ruby.taobao.org/mirrors/ruby/ruby-2.2.3.tar.gz

tar xvf ruby-2.2.3.tar.gz
cd ruby-2.2.3
./configure

makemake install

查看验证

[root@hadoop02  ~]#    ruby -v
  ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-linux]

012a6969261bcea10489f2c9212f84f21e3fc68f

安装fluent-plugin-sql插件(输入源)

[root@hadoop03 ~]#   gem install fluent-plugin-sql
64a08fad70766b661e4505903ec8e023b1b45128

准备MySQL表及数据

在test数据库创建一张表,建表语句如下:
use test; 进入test数据库里操作
create table test_fluent
(
  id int unsigned not null  auto_increment,
  sex varchar(1),
  name varchar(225),
  primary key(id)
)engine=innodb default charset=utf8 auto_increment=1;
其中id是主键,自增

插入数据,插入语句如下命令:
insert into test_fluent(sex,name) values('f','dongdong');
insert into test_fluent(sex,name) values('m','heihei');
insert into test_fluent(sex,name) values('m','qingsong');
insert into test_fluent(sex,name) values('f','jiafu');
insert into test_fluent(sex,name) values('m','angrybaby');
insert into test_fluent(sex,name) values('f','jack');
insert into test_fluent(sex,name) values('f','helloword');
insert into test_fluent(sex,name) values('m','sunlongfei');
insert into test_fluent(sex,name) values('m','donglang');
insert into test_fluent(sex,name) values('f','deguang');
insert into test_fluent(sex,name) values('m','yuanijng');
insert into test_fluent(sex,name) values('f','yangqun');

备注:
由于我操作的MySQL数据库位于172.16.1.156机器上,用户名是dong,密码是123456
而我安装的fluent位于172.16.1.158机器上,不在一台机器上,如果要从158机器远程访问156机器上MySQL会受限,禁止访问。
为此,需要在156机器上执行以下命令给指定IP授权。
授权158机器上的dong用户可以远程访问156上MySQL,dong用户登录密码是123456
grant ALL PRIVILEGES ON *.* to dong@"172.16.1.158" identified by "123456" WITH GRANT OPTION; 
使刚授予权限立即生效
flush privileges;

准备ODPS测试表

创建ODPS 表为 demo_access_log,其建表语句为:
drop table if exists demo_access_log;
create table demo_access_log(
sex string, 
name string) 
into 5 shards hublifecycle 7;
编辑fluent.conf配置文件

编辑fluent.conf配置文件

配置mysql输入源、ODPS输出源:

state_file  /var/run/fluentd/sql_state  配置项 (path to a file to store last rows该文件默认不存在,需要提前创建好!)

state_file stores last selected rows to a file (named state_file) to not forget last row when Fluentd restarts.

[root@hadoop03 ~]#  vi /etc/fluent/fluent.conf    --编辑fluent.conf配置文件

 
        
<source>
  @type sql
  host 172.16.1.156
  port 3306
  database test
  adapter mysql
  username dong
  password 123456
  select_interval 10s
  select_limit 10
  state_file /var/run/fluentd/sql_state
  <table>
    table test_fluent
     tag in.sql
    update_column id
  </table>
</source>

<match  in.**>
  type aliyun_odps
  aliyun_access_id UQV2yoSSWNgquhhe
  aliyun_access_key bG8xSLwhmKYRmtBoE3HbhOBYXvknG6
  aliyun_odps_endpoint  http://service.odps.aliyun.com/api
  aliyun_odps_hub_endpoint  http://dh.odps.aliyun.com
  buffer_chunk_limit 2m
  buffer_queue_limit 128
  flush_interval 5s
  project dtstack_dev
  <table  in.sql>
    table demo_access_log
    fields sex,name
    shard_number 5
  </table>
</match>

启动fluent

fluentd启动时会自动加载/etc/fluent/fluent.conf中读取fluent.conf配置文件
fluentd  --启动命令
大概 5 分钟后,实时导入数据会被同步到离线表,可以使用 select count(*) from demo_access_log这样sql 语句进行验证。

如果安装Fluentd 用的是Ruby Gem,可以创建一个配置文件运行下面命令。发出一个终止信号将会重新安装配置文件。(如果修改了配置文件—fluent.conf 文件,ctrl c 终止进程,然后在配置文件下重新启动)

ctrl c 

fluentd -c fluent.conf

如果有类似如下输出,就可以说明数据实时写入Datahub服务已经成功。
30b7a6d16985950389ab9e0989c8815653bb3927

运行过程遇到异常及排查

(1) 异常描述:fluent.conf文件没有配置正确
0f23223c004226c6e901b2f4e83e9693c8405e0f
异常产生原因:输入端没有配置 tag,输出端table上也没有制定对应tag。输入tag,在输出match时要能
匹配上,在输出table 要能对应上才行。
解决方法:在mysql输入源上添加上tag标签,即 tag in.sql

(2)在fluent.conf配置正确基础上运行fluentd启动命令,又报以下异常:
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord- 4.2.6/lib/active_record/connection_adapters/connection_specification.rb:177:in `rescue in spec': Specified 'mysql' for database adapter, but the gem is not loaded. Add `gem 'mysql'` to your Gemfile (and ensure its version is at the minimum required by ActiveRecord). (Gem::LoadError)
这个问题是mysql插件需要用到mysql adapter适配器,需要安装mysql adapter适配器,执行以下命令:
[root@hadoop03 fluent]#   yum install mysql-devel
[root@hadoop03 fluent]#   gem install mysql
Building native extensions.  This could take a while...
Successfully installed mysql-2.9.1
Parsing documentation for mysql-2.9.1
Installing ri documentation for mysql-2.9.1
Done installing documentation for mysql after 2 seconds
1 gem installed

连接数据库适配器路径:
mysql2_adapter.rb、mysql_adapter.rb、 postgresql_adapter.rb
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord-4.2.6/lib/active_record/connection_adapters目录下

gem安装插件时遇到异常及排查

ERROR:  While executing gem ... (Gem::RemoteFetcher::FetchError)
     Errno::ECONNRESET: Connection reset by peer - SSL_connect (https://api.rubygems.org/quick/Marshal.4.8/cool.io-1.4.4.gemspec.rz)
87e96aeca2f6d3af42fc917d1aa087dbbb056add
异常产生原因:
由于gem源引起,需要加上淘宝源后要把原来那个rubygems那个删掉 
解决方法:
# 删除默认的官方源 
gem sources -r https://rubygems.org/
# 添加淘宝源 
gem sources -a https://ruby.taobao.org/
具体解决截图展示如下:
a8c4b9469fe22a92ee1de6a1fb3269c44b251ba3

                                         第二种异常
[hadoop@hadoop03 ~]$   gem install fluent-plugin-aliyun-odps
ERROR:  While executing gem ... (Gem::FilePermissionError)
    You don't have write permissions for the /usr/local/lib/ruby/gems/2.2.0 directory.
异常产生原因:
没有写入执行权限
解决方法:
切换到root用户下进行gem install操作
具体解决截图展示如下:
14a82b299bb6e2b6bb66aff2fccb5b47565b848a
再次查看已安装插件:
66f3e1ee0d15b7230288cd71ae86cca55f46099c

相关文章
|
11天前
|
分布式计算 DataWorks 关系型数据库
MaxCompute 生态系统中的数据集成工具
【8月更文第31天】在大数据时代,数据集成对于构建高效的数据处理流水线至关重要。阿里云的 MaxCompute 是一个用于处理大规模数据集的服务平台,它提供了强大的计算能力和丰富的生态系统工具来帮助用户管理和处理数据。本文将详细介绍如何使用 DataWorks 这样的工具将 MaxCompute 整合到整个数据处理流程中,以便更有效地管理数据生命周期。
33 0
|
9天前
|
监控 关系型数据库 MySQL
zabbix agent集成percona监控MySQL的插件实战案例
这篇文章是关于如何使用Percona监控插件集成Zabbix agent来监控MySQL的实战案例。
23 2
zabbix agent集成percona监控MySQL的插件实战案例
|
15天前
|
关系型数据库 MySQL 大数据
教你使用Python玩转MySQL数据库,大数据导入不再是难题!
教你使用Python玩转MySQL数据库,大数据导入不再是难题!
|
16天前
|
存储 JSON 关系型数据库
MySQL与JSON的邂逅:开启大数据分析新纪元
MySQL与JSON的邂逅:开启大数据分析新纪元
|
23天前
|
分布式计算 DataWorks 关系型数据库
DataWorks产品使用合集之ODPS数据怎么Merge到MySQL数据库
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
1月前
|
安全 关系型数据库 MySQL
揭秘MySQL海量数据迁移终极秘籍:从逻辑备份到物理复制,解锁大数据迁移的高效与安全之道
【8月更文挑战第2天】MySQL数据量很大的数据库迁移最优方案
136 17
|
28天前
|
NoSQL 关系型数据库 MySQL
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
SpringBoot 集成 SpringSecurity + MySQL + JWT 附源码,废话不多直接盘
69 2
|
28天前
|
Java 关系型数据库 MySQL
SpringBoot 集成 Quartz + MySQL
SpringBoot 集成 Quartz + MySQL
55 1
|
29天前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
36 1
|
29天前
|
数据采集 关系型数据库 MySQL
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
30 1

热门文章

最新文章