用Fluent实现MySQL到ODPS数据集成

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: Fluentd是一个日志收集系统,可以定制输入或输出到常用系统(如MongoDB, RDS, Mysql, HDFS)或直接用于Log收集。 DHS(Datahub)向开发人员提供Fluentd插件,可以通过Fluentd将其它系统数据利用DHS导入到ODPS中。 以下操作均在root 用户下进

安装ruby

首先通过 /etc/issue 命令查看当前使用centos是哪个版本:

[hadoop@hadoop03 ~]$   cat /etc/issue

025f6311f124dd610e452335c4feecf6f30cda17

由于centos版本是6.6,安装ruby时就要选择在centos 6.X环境,具体安装步骤参考如下所示即可!

yum install gcc-c++ patch readline readline-devel zlib zlib-devel libyaml-devel libffi-devel openssl-devel make bzip2 autoconf automake libtool bison iconv-devel wget tar

cd ~/
wget https://ruby.taobao.org/mirrors/ruby/ruby-2.2.3.tar.gz

tar xvf ruby-2.2.3.tar.gz
cd ruby-2.2.3
./configure

makemake install

查看验证

[root@hadoop02  ~]#    ruby -v
  ruby 2.2.3p173 (2015-08-18 revision 51636) [x86_64-linux]

012a6969261bcea10489f2c9212f84f21e3fc68f

安装fluent-plugin-sql插件(输入源)

[root@hadoop03 ~]#   gem install fluent-plugin-sql
64a08fad70766b661e4505903ec8e023b1b45128

准备MySQL表及数据

在test数据库创建一张表,建表语句如下:
use test; 进入test数据库里操作
create table test_fluent
(
  id int unsigned not null  auto_increment,
  sex varchar(1),
  name varchar(225),
  primary key(id)
)engine=innodb default charset=utf8 auto_increment=1;
其中id是主键,自增

插入数据,插入语句如下命令:
insert into test_fluent(sex,name) values('f','dongdong');
insert into test_fluent(sex,name) values('m','heihei');
insert into test_fluent(sex,name) values('m','qingsong');
insert into test_fluent(sex,name) values('f','jiafu');
insert into test_fluent(sex,name) values('m','angrybaby');
insert into test_fluent(sex,name) values('f','jack');
insert into test_fluent(sex,name) values('f','helloword');
insert into test_fluent(sex,name) values('m','sunlongfei');
insert into test_fluent(sex,name) values('m','donglang');
insert into test_fluent(sex,name) values('f','deguang');
insert into test_fluent(sex,name) values('m','yuanijng');
insert into test_fluent(sex,name) values('f','yangqun');

备注:
由于我操作的MySQL数据库位于172.16.1.156机器上,用户名是dong,密码是123456
而我安装的fluent位于172.16.1.158机器上,不在一台机器上,如果要从158机器远程访问156机器上MySQL会受限,禁止访问。
为此,需要在156机器上执行以下命令给指定IP授权。
授权158机器上的dong用户可以远程访问156上MySQL,dong用户登录密码是123456
grant ALL PRIVILEGES ON *.* to dong@"172.16.1.158" identified by "123456" WITH GRANT OPTION; 
使刚授予权限立即生效
flush privileges;

准备ODPS测试表

创建ODPS 表为 demo_access_log,其建表语句为:
drop table if exists demo_access_log;
create table demo_access_log(
sex string, 
name string) 
into 5 shards hublifecycle 7;
编辑fluent.conf配置文件

编辑fluent.conf配置文件

配置mysql输入源、ODPS输出源:

state_file  /var/run/fluentd/sql_state  配置项 (path to a file to store last rows该文件默认不存在,需要提前创建好!)

state_file stores last selected rows to a file (named state_file) to not forget last row when Fluentd restarts.

[root@hadoop03 ~]#  vi /etc/fluent/fluent.conf    --编辑fluent.conf配置文件

 
        
<source>
  @type sql
  host 172.16.1.156
  port 3306
  database test
  adapter mysql
  username dong
  password 123456
  select_interval 10s
  select_limit 10
  state_file /var/run/fluentd/sql_state
  <table>
    table test_fluent
     tag in.sql
    update_column id
  </table>
</source>

<match  in.**>
  type aliyun_odps
  aliyun_access_id UQV2yoSSWNgquhhe
  aliyun_access_key bG8xSLwhmKYRmtBoE3HbhOBYXvknG6
  aliyun_odps_endpoint  http://service.odps.aliyun.com/api
  aliyun_odps_hub_endpoint  http://dh.odps.aliyun.com
  buffer_chunk_limit 2m
  buffer_queue_limit 128
  flush_interval 5s
  project dtstack_dev
  <table  in.sql>
    table demo_access_log
    fields sex,name
    shard_number 5
  </table>
</match>

启动fluent

fluentd启动时会自动加载/etc/fluent/fluent.conf中读取fluent.conf配置文件
fluentd  --启动命令
大概 5 分钟后,实时导入数据会被同步到离线表,可以使用 select count(*) from demo_access_log这样sql 语句进行验证。

如果安装Fluentd 用的是Ruby Gem,可以创建一个配置文件运行下面命令。发出一个终止信号将会重新安装配置文件。(如果修改了配置文件—fluent.conf 文件,ctrl c 终止进程,然后在配置文件下重新启动)

ctrl c 

fluentd -c fluent.conf

如果有类似如下输出,就可以说明数据实时写入Datahub服务已经成功。
30b7a6d16985950389ab9e0989c8815653bb3927

运行过程遇到异常及排查

(1) 异常描述:fluent.conf文件没有配置正确
0f23223c004226c6e901b2f4e83e9693c8405e0f
异常产生原因:输入端没有配置 tag,输出端table上也没有制定对应tag。输入tag,在输出match时要能
匹配上,在输出table 要能对应上才行。
解决方法:在mysql输入源上添加上tag标签,即 tag in.sql

(2)在fluent.conf配置正确基础上运行fluentd启动命令,又报以下异常:
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord- 4.2.6/lib/active_record/connection_adapters/connection_specification.rb:177:in `rescue in spec': Specified 'mysql' for database adapter, but the gem is not loaded. Add `gem 'mysql'` to your Gemfile (and ensure its version is at the minimum required by ActiveRecord). (Gem::LoadError)
这个问题是mysql插件需要用到mysql adapter适配器,需要安装mysql adapter适配器,执行以下命令:
[root@hadoop03 fluent]#   yum install mysql-devel
[root@hadoop03 fluent]#   gem install mysql
Building native extensions.  This could take a while...
Successfully installed mysql-2.9.1
Parsing documentation for mysql-2.9.1
Installing ri documentation for mysql-2.9.1
Done installing documentation for mysql after 2 seconds
1 gem installed

连接数据库适配器路径:
mysql2_adapter.rb、mysql_adapter.rb、 postgresql_adapter.rb
/usr/local/lib/ruby/gems/2.2.0/gems/activerecord-4.2.6/lib/active_record/connection_adapters目录下

gem安装插件时遇到异常及排查

ERROR:  While executing gem ... (Gem::RemoteFetcher::FetchError)
     Errno::ECONNRESET: Connection reset by peer - SSL_connect (https://api.rubygems.org/quick/Marshal.4.8/cool.io-1.4.4.gemspec.rz)
87e96aeca2f6d3af42fc917d1aa087dbbb056add
异常产生原因:
由于gem源引起,需要加上淘宝源后要把原来那个rubygems那个删掉 
解决方法:
# 删除默认的官方源 
gem sources -r https://rubygems.org/
# 添加淘宝源 
gem sources -a https://ruby.taobao.org/
具体解决截图展示如下:
a8c4b9469fe22a92ee1de6a1fb3269c44b251ba3

                                         第二种异常
[hadoop@hadoop03 ~]$   gem install fluent-plugin-aliyun-odps
ERROR:  While executing gem ... (Gem::FilePermissionError)
    You don't have write permissions for the /usr/local/lib/ruby/gems/2.2.0 directory.
异常产生原因:
没有写入执行权限
解决方法:
切换到root用户下进行gem install操作
具体解决截图展示如下:
14a82b299bb6e2b6bb66aff2fccb5b47565b848a
再次查看已安装插件:
66f3e1ee0d15b7230288cd71ae86cca55f46099c

相关文章
|
22天前
|
存储 关系型数据库 MySQL
Mysql大数据批量插入方法
Mysql大数据批量插入方法
28 0
|
22天前
|
分布式计算 DataWorks 关系型数据库
DataWorks操作报错合集之DataWorks集成实例绑定到同一个vpc下面,也添加了RDS的IP白名单报错:数据源配置有误,请检查,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
35 0
|
22天前
|
分布式计算 DataWorks 数据库
DataWorks操作报错合集之DataWorks使用数据集成整库全增量同步oceanbase数据到odps的时候,遇到报错,该怎么处理
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
29 0
|
22天前
|
SQL 分布式计算 DataWorks
DataWorks操作报错合集之在DataWorks数据集成中,但是预览ODPS源数据库为空,是什么导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
32 0
|
22天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之在DataWorks中,查看ODPS表的OSS对象如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
34 1
|
22天前
|
分布式计算 DataWorks MaxCompute
DataWorks产品使用合集之在DataWorks中,将数据集成功能将AnalyticDB for MySQL中的数据实时同步到MaxCompute中如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
39 0
|
6天前
|
存储 NoSQL 关系型数据库
mysql 数据库 基本介绍
mysql 数据库 基本介绍
|
1天前
|
SQL 关系型数据库 MySQL
mysql 数据库导出导入到本地文件
mysql 数据库导出导入到本地文件
|
1天前
|
Prometheus 监控 关系型数据库
数据库同步革命:MySQL GTID模式下主从配置的全面解析
数据库同步革命:MySQL GTID模式下主从配置的全面解析
10 0
|
1天前
|
SQL 监控 关系型数据库
MySQL慢查询日志配置指南:发现性能瓶颈,提升数据库效率
MySQL慢查询日志配置指南:发现性能瓶颈,提升数据库效率
8 0

热门文章

最新文章