江博哈哈 2019-10-05 3458浏览量
不同的数据源头被抽象成不同的Reader插件,不同的数据目标被抽象成不同的Writer插件。理论上,FlinkX框架可以支持任意数据源类型的数据同步工作。作为一套生态系统,每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。
在底层实现上,FlinkX依赖Flink,数据同步任务会被翻译成StreamGraph在Flink上执行,工作原理如下图:
进入项目根目录,使用maven打包:
mvn clean package -Dmaven.test.skip
打包结束后,项目根目录下会产生bin目录和plugins目录,其中bin目录包含FlinkX的启动脚本,plugins目录下存放编译好的数据同步插件包
model
描述:执行模式,也就是flink集群的工作模式
job
plugin
flinkconf
yarnconf
bin/flinkx -mode local -job /Users/softfly/company/flink-data-transfer/jobs/task_to_run.json -plugin /Users/softfly/company/flink-data-transfer/plugins -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
bin/flinkx -mode standalone -job /Users/softfly/company/flink-data-transfer/jobs/oracle_to_oracle.json -plugin /Users/softfly/company/flink-data-transfer/plugins -flinkconf /hadoop/flink-1.4.0/conf -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
bin/flinkx -mode yarn -job /Users/softfly/company/flinkx/jobs/mysql_to_mysql.json -plugin /opt/dtstack/flinkplugin/syncplugin -flinkconf /opt/dtstack/myconf/conf -yarnconf /opt/dtstack/myconf/hadoop -confProp "{"flink.checkpoint.interval":60000,"flink.checkpoint.stateBackend":"/flink_checkpoint/"}" -s /flink_checkpoint/0481473685a8e7d22e7bd079d6e5c08c/chk-*
从最高空俯视,一个数据同步的构成很简单,如下:
{
"job": {
"setting": {...},
"content": [...]
}
}
数据同步任务包括一个job元素,而这个元素包括setting和content两部分。
"setting": {
"speed": {...},
"errorLimit": {...},
"dirty": {...}
}
setting包括speed、errorLimit和dirty三部分,分别描述限速、错误控制和脏数据管理的配置信息
"speed": {
"channel": 3,
"bytes": 0
}
"errorLimit": {
"record": 10000,
"percentage": 100
}
"dirty": {
"path": "/tmp",
"hadoopConfig": {
"fs.default.name": "hdfs://ns1",
"dfs.nameservices": "ns1",
"dfs.ha.namenodes.ns1": "nn1,nn2",
"dfs.namenode.rpc-address.ns1.nn1": "node02:9000",
"dfs.namenode.rpc-address.ns1.nn2": "node03:9000",
"dfs.ha.automatic-failover.enabled": "true",
"dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"fs.hdfs.impl.disable.cache": "true"
}
}
"restore": {
"isRestore": false,
"restoreColumnName": "",
"restoreColumnIndex": 0
}
restore配置请参考断点续传
"content": [
{
"reader": {
"name": "...",
"parameter": {
...
}
},
"writer": {
"name": "...",
"parameter": {
...
}
}
}
]
reader和writer包括name和parameter,分别表示插件名称和插件参数
详见flinkx-examples子工程
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
大数据计算实践乐园,近距离学习前沿技术