实验说明
通过本节实验,可以掌握使用Flink SQL将RDS MySQL数据同步到阿里云Elasticsearch(ES)的基本步骤和实现逻辑,并了解
ES中数据新增(Insert)与更新(Update)的同步效果。
实验环境
1. 准备环境
① RDS MySQL实例5.7.0
② 阿里云ES实例6.7.0
③ Flink实例
④ MySQL Catalog(本例中Catalog名为liutest,数据库名为liuhao)
2. 准备数据
① MySQL建表语句(Flink作业会读取liu_test_3、liu_test_4、liu_test_5三张表,需提前全部创建)
-- Source tables for the Flink job (catalog liutest, database liuhao).
-- The job reads liu_test_3, liu_test_4 and liu_test_5, and the test steps
-- insert the same column list (id, name, age, gender, city) into all three,
-- so liu_test_4 and liu_test_5 are created with the same definition.
-- `id` is the primary key; the Flink sinks carry it through as their own
-- primary key, i.e. the Elasticsearch document id.
CREATE TABLE `liu_test_3` (
    `id`     BIGINT(20) NOT NULL     COMMENT '序号',
    `name`   CHAR(50)   NOT NULL     COMMENT '姓名',
    `age`    INT(20)    DEFAULT NULL COMMENT '年龄',
    `gender` CHAR(50)   DEFAULT NULL COMMENT '性别',
    `city`   CHAR(50)   DEFAULT NULL COMMENT '所在地',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='小范围内户籍信息统计';

-- Empty copies of liu_test_3's definition (columns and primary key).
CREATE TABLE `liu_test_4` LIKE `liu_test_3`;
CREATE TABLE `liu_test_5` LIKE `liu_test_3`;
②MySQL导入数据
-- Seed rows (ids 1001-1026) for liu_test_5.
-- A single multi-row INSERT replaces 26 separate statements; the values are unchanged.
INSERT INTO liu_test_5 (id, name, age, gender, city) VALUES
    (1001, '大山', 27, 'male',   '苏州'),
    (1002, '小二', 23, 'male',   '北京'),
    (1003, '杉杉', 25, 'female', '上海'),
    (1004, '小四', 27, 'male',   '成都'),
    (1005, '小五', 21, 'male',   '西安'),
    (1006, '小六', 22, 'male',   '郑州'),
    (1007, '琪琪', 25, 'female', '洛阳'),
    (1008, '巴哥', 29, 'male',   '深圳'),
    (1009, '九儿', 27, 'female', '广州'),
    (1010, '杉杉', 25, 'female', '广州'),
    (1011, '琪琪', 26, 'female', '洛阳'),
    (1012, '仔仔', 24, 'male',   '苏州'),
    (1013, '娜娜', 23, 'male',   '上海'),
    (1014, '萧山', 27, 'female', '成都'),
    (1015, '大山', 27, 'male',   '苏州'),
    (1016, '小二', 23, 'male',   '北京'),
    (1017, '杉杉', 25, 'female', '上海'),
    (1018, '小四', 27, 'male',   '成都'),
    (1019, '小五', 21, 'male',   '西安'),
    (1020, '小六', 22, 'male',   '郑州'),
    (1021, '琪琪', 25, 'female', '洛阳'),
    (1022, '巴哥', 29, 'male',   '深圳'),
    (1023, '九儿', 27, 'female', '广州'),
    (1024, '杉杉', 25, 'female', '广州'),
    (1025, '琪琪', 26, 'female', '洛阳'),
    (1026, '仔仔', 24, 'male',   '苏州');
③ES索引创建(ES 6.x中一个索引只能有一个映射类型,该类型需与Flink作业中的'document-type'='_doc'一致,字段应与invoice_detail_sink的表结构对应)
PUT /dev_flink_invoice_detail
{
  "settings": {
    "number_of_shards": 5,
    "number_of_replicas": 1
  },
  "mappings": {
    "_doc": {
      "properties": {
        "id": {
          "type": "long"
        },
        "name": {
          "type": "text",
          "analyzer": "ik_smart",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "age": {
          "type": "long"
        },
        "gender": {
          "type": "keyword"
        },
        "city": {
          "type": "keyword"
        }
      }
    }
  }
}
④Flink SQL作业
-- Elasticsearch sink fed with liu_test_3 rows whose age = 26.
-- The PRIMARY KEY is optional: when declared, its value is used as the ES
-- document id; without it, the document id is a random value.
CREATE TEMPORARY TABLE es_sink (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    city   STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200',
    'index' = 'dev_test_flink_invoices',
    'document-type' = '_doc',
    'username' = 'elastic',
    'password' = '******',
    -- Optional: separator used to join a composite primary key into one
    -- document id (default '_'); the key here is the single column id.
    'document-id.key-delimiter' = '$',
    -- NOTE(review): presumably the max retries on update version conflicts;
    -- confirm against the Alibaba Cloud Flink Elasticsearch connector docs.
    'retry-on-conflict' = '1000'
);

-- Elasticsearch sink fed with one aggregated row per liu_test_3 id (age = 25).
-- Primary key handling is the same as es_sink (id becomes the document id).
-- NOTE(review): writes to the same index as es_sink; confirm that sharing
-- dev_test_flink_invoices between the two sinks is intended.
CREATE TEMPORARY TABLE es_sink_hwmcs (
    id   BIGINT,
    name STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200',
    'index' = 'dev_test_flink_invoices',
    'document-type' = '_doc',
    'username' = 'elastic',
    'password' = '******',
    -- Optional: separator for composite primary keys (default '_').
    'document-id.key-delimiter' = '$',
    'retry-on-conflict' = '1000'
);

-- Elasticsearch sink mirroring liu_test_5 into dev_flink_invoice_detail.
-- Primary key handling is the same as es_sink (id becomes the document id).
CREATE TEMPORARY TABLE invoice_detail_sink (
    id     BIGINT,
    name   STRING,
    age    BIGINT,
    gender STRING,
    city   STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'elasticsearch-6',
    'hosts' = 'es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200',
    'index' = 'dev_flink_invoice_detail',
    'document-type' = '_doc',
    'username' = 'elastic',
    'password' = '******',
    -- Optional: separator for composite primary keys (default '_').
    'document-id.key-delimiter' = '$',
    'retry-on-conflict' = '1000'
);

-- Run the three INSERTs below as one Flink job.
-- Every MySQL CDC source scan gets its own 'server-id' (5400-5404), because
-- each binlog client must use an id that is unique in the MySQL cluster.
BEGIN STATEMENT SET;

-- liu_test_3 rows aged 26. No liu_test_4 column is selected; the LEFT JOIN
-- only adds a second CDC source to the pipeline.
INSERT INTO es_sink
SELECT
    c.id,
    c.name,
    c.age,
    c.gender,
    c.city
FROM `liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5400') */ AS c
LEFT JOIN `liutest`.`liuhao`.`liu_test_4` /*+ OPTIONS('server-id'='5401') */ AS i
    ON c.id = i.id
WHERE c.age = 26;

-- One row per liu_test_3 id aged 25, with name = LISTAGG of the matching
-- liu_test_5 names (NULL when there is no match, because LISTAGG skips NULLs).
INSERT INTO es_sink_hwmcs
SELECT
    c.id AS id,
    LISTAGG(d.name) AS name
FROM `liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5402') */ AS c
LEFT JOIN `liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5403') */ AS d
    ON c.id = d.id
WHERE c.age = 25
GROUP BY c.id;

-- Full mirror of liu_test_5. An explicit column list keeps the sink mapping
-- stable if the MySQL table later gains columns.
INSERT INTO invoice_detail_sink
SELECT
    id,
    name,
    age,
    gender,
    city
FROM `liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5404') */;

END;
实验结果
1. 业务测试
①启动Flink作业
此时RDS中的存量数据已写入ES索引中。
②插入新的数据
-- Insert the same new row (id 1060) into all three source tables.
-- With age = 25, the job routes the liu_test_3 row to es_sink_hwmcs (filter age = 25),
-- not to es_sink (filter age = 26). The liu_test_5 row goes to invoice_detail_sink.
INSERT INTO liu_test_3 (id, name, age, gender, city) VALUES (1060, '娜娜', 25, 'female', '广东');
INSERT INTO liu_test_4 (id, name, age, gender, city) VALUES (1060, '娜娜', 25, 'female', '广东');
INSERT INTO liu_test_5 (id, name, age, gender, city) VALUES (1060, '娜娜', 25, 'female', '广东');
此时新增数据已同步写入ES中。
③更新插入的数据
-- Change name and city of the newly inserted row (id 1060) in every source table.
UPDATE liu_test_3
SET name = '小小', city = '大理'
WHERE id = 1060;

UPDATE liu_test_4
SET name = '小小', city = '大理'
WHERE id = 1060;

UPDATE liu_test_5
SET name = '小小', city = '大理'
WHERE id = 1060;
此时更新数据已同步到ES中
④更新历史数据
-- Change name and city of a pre-existing row (id 1007) in every source table.
-- id 1007 is part of the liu_test_5 seed data; the liu_test_3 and liu_test_4
-- updates affect 0 rows unless those tables also contain that id.
UPDATE liu_test_3
SET name = '甜甜', city = '深圳'
WHERE id = 1007;

UPDATE liu_test_4
SET name = '甜甜', city = '深圳'
WHERE id = 1007;

UPDATE liu_test_5
SET name = '甜甜', city = '深圳'
WHERE id = 1007;
此时查看ES,可见历史数据也已同步更新。