Configuration files
```
doris
├── doris.conf
├── flink.conf
└── tables
mysql
├── flink.conf
├── mysql.conf
└── tables
flink
└── flink_conf
```
Note: the entries in mysql's `tables` and doris's `tables` correspond to each other line by line.
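To illustrate the one-to-one mapping, the two `tables` files could look like this (the database and table names below are hypothetical examples, not the tool's required content); line N of `mysql/tables` is synced into the table on line N of `doris/tables`:

```shell
# Create example conf files to show the line-by-line 1:1 mapping.
mkdir -p conf/mysql conf/doris

# Each line on the MySQL side is <database>.<table> ...
cat > conf/mysql/tables <<'EOF'
emp_1.employees_1
emp_1.employees_2
emp_2.employees_1
emp_2.employees_2
EOF

# ... and the same line number on the Doris side names the target table.
cat > conf/doris/tables <<'EOF'
demo.all_employees_info
demo.all_employees_info
demo.all_employees_info
demo.all_employees_info
EOF

# The two files must have the same number of lines.
[ "$(wc -l < conf/mysql/tables)" -eq "$(wc -l < conf/doris/tables)" ] && echo "line counts match"
```

Here several MySQL tables map to one Doris table, which is exactly the multi-source, single-sink case the tool targets.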
File details
flink.conf under the mysql directory:

```
-- binlog consumption mode
'scan.startup.mode' = 'initial'
```

flink.conf under the flink directory:

```
-- run in YARN mode; requires Flink on YARN to be configured by the user
set 'execution.target' = 'yarn-per-job';
-- name of the YARN application
set 'yarn.application.name' = 'flinkjob_database';
-- checkpoint configuration
set 'state.backend' = 'filesystem';
set 'state.checkpoints.dir' = 'hdfs:///ck';
set 'execution.checkpointing.interval' = '6000';
set 'execution.checkpointing.tolerable-failed-checkpoints' = '1';
set 'execution.checkpointing.timeout' = '600000';
set 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
set 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
set 'execution.checkpointing.max-concurrent-checkpoints' = '1';
-- resource configuration
set 'jobmanager.memory.process.size' = '1600m';
set 'taskmanager.memory.process.size' = '1780m';
set 'taskmanager.memory.managed.size' = '100m';
set 'taskmanager.numberOfTaskSlots' = '1';
```

flink.conf under the doris directory:

```
-- two-phase commit
'sink.properties.two_phase_commit' = 'true'
```
To start, go to the bin directory and run the script that generates the files.
Generated file layout:
Each MySQL database maps to one Flink job, i.e. one Flink script file.
Each Flink script file consists of a MySQL source statement, a Doris mapping statement, and an INSERT synchronization statement.
So for a MySQL database named `mysqldb`, the generated files are:

mysql_mysqldb.sql doris_mysqldb.sql insert_mysqldb.sql flink_mysqldb.sql
The reasons are:
First, a unified naming format is easier to understand.
Second, if the files were named after the Doris database, ten MySQL databases might all sync into a single Doris database, which would generate only one Flink job.
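As a sketch of this naming rule (the `conf/mysql/tables` content below is hypothetical), the four file names can be derived from the distinct MySQL database names:

```shell
# Derive the four generated file names for every distinct MySQL database
# listed in conf/mysql/tables (example content, hypothetical names).
mkdir -p conf/mysql
printf 'emp_1.employees_1\nemp_1.employees_2\nemp_2.employees_1\n' > conf/mysql/tables

# One line of output (four files) per distinct database: emp_1 and emp_2.
for db in $(grep -v '#' conf/mysql/tables | awk -F '.' '{print $1}' | sort -u)
do
    echo "mysql_${db}.sql doris_${db}.sql insert_${db}.sql flink_${db}.sql"
done
```

Naming by the MySQL side guarantees one script set, and hence one Flink job, per source database.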
Before testing: prepare the INSERT INTO, DELETE, and UPDATE statements.
```sql
INSERT INTO emp_2.employees_2 VALUES
(11,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(22,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(33,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(44,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02');

delete from emp_2.employees_2 where emp_no = 11;
delete from emp_2.employees_2 where emp_no = 22;
delete from emp_2.employees_2 where emp_no = 33;
delete from emp_2.employees_2 where emp_no = 44;

update emp_2.employees_2 set first_name = 'toms' where emp_no = 10091;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10092;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10093;
update emp_2.employees_2 set first_name = 'toms' where emp_no = 10094;
```
Count statements:

```sql
use emp_1;
select count(1) from employees_1;
select count(1) from employees_2;
use emp_2;
select count(1) from employees_1;
select count(1) from employees_2;
```
Implementation of the insert side:
```shell
a=0
# one iteration per (mysql table, doris table) pair; the two tables files
# are aligned line by line
for m_table in $(grep -v '#' ../conf/mysql/tables | awk -F '\n' '{print $1}')
do
    let a++
    # mysql database name and the matching doris target table for this line
    m_d=$(grep -v '#' ../conf/mysql/tables | awk "NR==$a{print}" | awk -F '.' '{print $1}')
    d_table=$(grep -v '#' ../conf/doris/tables | awk "NR==$a{print}")
    # extract the column list from the generated doris DDL: take the lines
    # between the sink table header and the KEY( line, deduplicate, keep the
    # backtick-quoted names, append commas, and strip the trailing comma
    sed "/$d_table$sink/,/KEY(/{//d;s/ *//;p};d" ../result/doris_$m_d.sql \
        | awk '!x[$0]++' \
        | awk -F '`' '{print $2}' \
        | awk -F '\n' '{print $1","}' \
        | sed '$s/.$//' > a.c
    ac=$(cat a.c)
    m_d=$(echo $m_table | awk -F '.' '{print $1}')
    # append the insert statement for this table pair
    echo -e "insert into \`$d_table$sink\`\nselect\n${ac}\nfrom\n\`$m_table$src\`\nwhere 1=1;\n\n" >> ../result/insert_$m_d.sql
    rm -rf a.c
done
```
```sql
CREATE TABLE `demo.all_employees_info_sink1` (
    `emp_no` int NOT NULL COMMENT '',
    `birth_date` date NULL COMMENT '',
    `first_name` varchar(20) NULL COMMENT '',
    `last_name` varchar(20) NULL COMMENT '',
    `gender` string NULL COMMENT '',
    `hire_date` date NULL COMMENT '',
    `database_name` varchar(50) NULL COMMENT '',
    `table_name` varchar(200) NULL COMMENT '',
    PRIMARY KEY(`emp_no`, `birth_date`) NOT ENFORCED
) with (
    'sink.properties.two_phase_commit' = 'true',
    'fenodes' = '192.168.213.162:8031',
    'username' = 'root',
    'password' = 'zykj2021',
    'table.identifier' = 'demo.all_employees_info',
    'connector' = 'doris',
    'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1'
);
```
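The trickiest piece of the loop above is the sed/awk pipeline that pulls the column list out of the generated Doris DDL. A minimal, self-contained demo of the same idea (the file, table, and columns here are a cut-down stand-in for the real generated DDL):

```shell
# Stand-in for a generated Doris DDL file (shortened, hypothetical).
cat > doris_demo.sql <<'EOF'
CREATE TABLE `demo.all_employees_info_sink1` (
`emp_no` int NOT NULL,
`birth_date` date NULL,
`first_name` varchar(20) NULL,
PRIMARY KEY(`emp_no`, `birth_date`) NOT ENFORCED
) with ( 'connector' = 'doris' );
EOF

# Keep the lines between the table header and PRIMARY KEY (dropping both
# boundary lines via the empty-regex // idiom), take the first backtick-quoted
# field, deduplicate, append commas, and strip the comma on the last line.
sed -n '/CREATE TABLE/,/PRIMARY KEY(/{//d;p}' doris_demo.sql \
    | awk -F '`' '{print $2}' \
    | awk '!x[$0]++' \
    | sed 's/$/,/' | sed '$s/,$//' > columns.txt

cat columns.txt
# emp_no,
# birth_date,
# first_name
```

That comma-joined list is exactly what gets spliced between `select` and `from` in the generated insert statement.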
Implementation of the source side:
```shell
label=0
for t_name in $(awk -F '\n' '{print $1}' ../conf/doris/tables | awk -F '.' '{print $2}')
do
    let label++
    # values for the current line: underscored label, mysql db/table, doris db
    d_label=$(awk "NR==$label{print}" ../conf/mysql/tables | sed 's/\./\_/g')
    m_d=$(awk "NR==$label{print}" ../conf/mysql/tables | awk -F '.' '{print $1}')
    d_d=$(awk "NR==$label{print}" ../conf/doris/tables | awk -F '.' '{print $1}')
    m_t=$(awk "NR==$label{print}" ../conf/mysql/tables | awk -F '.' '{print $2}')
    # fill the placeholders in the generated Flink SQL file
    # ($t is set by the surrounding script)
    sed -i "0,/doris_username/s/doris_username/${fe_master_username}/" ../result/$t.sql
    sed -i "0,/doris_password/s/doris_password/${fe_master_password}/" ../result/$t.sql
    sed -i "0,/doris_table/s/doris_table/${d_d}.${t_name}/" ../result/$t.sql
    sed -i "0,/doris_connector/s/doris_connector/doris/" ../result/$t.sql
    sed -i "0,/doris_fenodes/s/doris_fenodes/${fe_master_host}:${fe_load_url_port}/" ../result/$t.sql
    # unique label prefix: <mysql_db>_<mysql_table>_<timestamp>_<line number>
    sed -i "0,/doris_label-prefix/s/doris_label-prefix/${m_d}_${m_t}_$(date "+%y%m%d_%H%M%S")_${label}/" ../result/$t.sql
done
```
```sql
create table if not exists `demo.all_employees_info_sink1` (
    `emp_no` int not null comment '',
    `birth_date` date null comment '',
    `first_name` varchar(20) null comment '',
    `last_name` varchar(20) null comment '',
    `gender` string null comment '',
    `hire_date` date null comment '',
    `database_name` varchar(50) null comment '',
    `table_name` varchar(200) null comment '',
    primary key(`emp_no`, `birth_date`) not enforced
) with (
    'sink.properties.two_phase_commit' = 'true',
    'fenodes' = '192.168.213.162:8031',
    'username' = 'root',
    'password' = 'zykj2021',
    'table.identifier' = 'demo.all_employees_info',
    'connector' = 'doris',
    'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1'
);
```
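The `sink.label-prefix` substitution above is the one value that has to be unique per run, since Doris rejects duplicate load labels under two-phase commit. Its construction can be tried standalone (the database and table names match the example DDL; the timestamp is whatever the clock says):

```shell
# Build a unique Doris sink label prefix:
# <mysql_db>_<mysql_table>_<yymmdd_HHMMSS>_<line number>
m_d=emp_1
m_t=employees_1
label=1
prefix="${m_d}_${m_t}_$(date '+%y%m%d_%H%M%S')_${label}"
echo "$prefix"
# e.g. emp_1_employees_1_220928_000418_1
```

The trailing line number keeps labels distinct even when two tables are processed within the same second.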
Implementation of the sink side:
```shell
for i in $(grep -v '#' ../conf/mysql/tables | awk -F '\n' '{print $1}' | awk -F '.' '{print $1}' | sort -u)
do
    # wrap each generated insert file in a Flink statement set so all the
    # INSERTs for one mysql database run as a single job
    sed -i '1iBEGIN STATEMENT SET;' ../result/insert_$i.sql
    sed -i '$aEND;' ../result/insert_$i.sql
    b=0
    # number the sink tables so each INSERT targets its own sink
    for table in $(grep -v '#' ../conf/doris/tables | awk -F '\n' '{print $1}')
    do
        let b++
        d_doris=$(grep -v '#' ../conf/doris/tables | awk "NR==$b")
        sed -i "0,/into \`${d_doris}_sink\`/s/into \`${d_doris}_sink\`/into \`${d_doris}_sink${b}\`/" ../result/insert_$i.sql
    done
done
```
```sql
create table if not exists `demo.all_employees_info_sink1` (
    `emp_no` int not null comment '',
    `birth_date` date null comment '',
    `first_name` varchar(20) null comment '',
    `last_name` varchar(20) null comment '',
    `gender` string null comment '',
    `hire_date` date null comment '',
    primary key(`emp_no`, `birth_date`) not enforced
) with (
    'sink.properties.two_phase_commit' = 'true',
    'fenodes' = '192.168.213.162:8031',
    'username' = 'root',
    'password' = 'zykj2021',
    'table.identifier' = 'demo.all_employees_info',
    'connector' = 'doris',
    'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1'
);
```
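The statement-set wrapping performed by the outer loop can be reproduced on a stand-in file; `BEGIN STATEMENT SET; ... END;` makes Flink submit all the enclosed INSERTs as one job:

```shell
# Stand-in for a generated insert_<db>.sql file (hypothetical content).
cat > insert_demo.sql <<'EOF'
insert into `demo.all_employees_info_sink1` select * from `emp_1.employees_1_src`;
insert into `demo.all_employees_info_sink2` select * from `emp_1.employees_2_src`;
EOF

sed -i '1iBEGIN STATEMENT SET;' insert_demo.sql   # prepend the opener
sed -i '$aEND;' insert_demo.sql                   # append the closer

head -n 1 insert_demo.sql
tail -n 1 insert_demo.sql
```

(`sed -i` as used here is GNU sed syntax, consistent with the scripts above.)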
Resuming a job from a checkpoint:
1. Minor changes:
- Get the checkpoint URL from the job's web UI (note that it changes every 30 s)
- Edit the file contents
- Update the label
- Update the final checkpoint URL
- Save the file
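Assuming the job file carries the Flink SQL client option `execution.savepoint.path` plus a `sink.label-prefix`, the "update the label" and "update the final checkpoint URL" steps reduce to two in-place substitutions (the file name, label, and paths below are illustrative):

```shell
# Stand-in for a generated Flink SQL job file (hypothetical content).
cat > flink_demo.sql <<'EOF'
set 'execution.savepoint.path' = 'hdfs:///ck/old/chk-1';
'sink.label-prefix' = 'emp_1_employees_1_220928_000418_1'
EOF

new_ck='hdfs:///ck/78f7cb6b577fe6db19648ca63607e640/chk-66'
new_label="emp_1_employees_1_$(date '+%y%m%d_%H%M%S')_1"

# Point the job at the new checkpoint and refresh the sink label so the
# restarted job does not collide with the previous run's load labels.
sed -i "s|= 'hdfs:///ck[^']*'|= '${new_ck}'|" flink_demo.sql
sed -i "s|'sink.label-prefix' = '[^']*'|'sink.label-prefix' = '${new_label}'|" flink_demo.sql

grep savepoint flink_demo.sql
```

Since the checkpoint URL changes every 30 s, this edit must use the URL read from the web UI immediately before resubmitting.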
Problem encountered: adjusting the SQL is effectively a change to the operator graph, so restoring from the checkpoint then fails, and the state for operators no longer present must be ignored:
```
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66. Cannot map checkpoint/savepoint state for operator e75d4004e6c5f0908bd4077fcf200fcd to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.
```
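As the message suggests, the resume command must pass `--allowNonRestoredState` (short form `-n`) so that state belonging to operators missing from the new graph is dropped instead of failing the restore. A sketch of the command, echoed rather than executed (the job artifact is whatever your setup submits):

```shell
# Resume from the checkpoint named in the error, skipping unmatched state.
ck='hdfs://127.0.0.1/fck/78f7cb6b577fe6db19648ca63607e640/chk-66'
cmd="flink run -s $ck --allowNonRestoredState"
echo "$cmd"
```

For jobs submitted through the SQL client, the equivalent knobs are, to my understanding, `execution.savepoint.path` together with `execution.savepoint.ignore-unclaimed-state' = 'true'`.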
2. Major changes
- cp a copy of mysql_to_doris
- Reconfigure the table structures and run sh flinkjob.sh
- Copy the generated files
- Paste them in as a whole
- Configure the final checkpoint URL
3. Outlook:
- Clean up the code: rename variables, tidy the formatting, and cut the amount of code by factoring into variables
- Standardize data-type conversion and keep extending it on that basis
- Improve the checkpoint workflow so that resuming a sync only requires changing configuration items