Flink同步RDS数据到Elasticsearch实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: Flink同步RDS数据到Elasticsearch实践

实验说明

通过本节实验可以掌握使用Flink SQL同步RDS MySQL数据到阿里云Elasticsearch基本步骤和实现逻辑,并了解

ES数据新增(Insert)与更新(update)的实践结果。

实验环境

1. 准备环境

① RDS MySQL实例5.7.0

② 阿里云ES实例6.7.0

③ Fllink实例

④ MySQL Catalog

2. 准备数据

MySQL建表语句

CREATETABLE `liu_test_3` (`id` bigint(20)NOTNULL COMMENT '序号',`name` char(50)NOTNULL COMMENT '姓名',`age` int(20) DEFAULT NULL COMMENT '年龄',`gender` char(50) DEFAULT NULL COMMENT '性别',`city` char(50) DEFAULT NULL COMMENT '所在地',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='小范围内户籍信息统计';

MySQL导入数据

INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1001,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1002,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1003,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1004,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1005,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1006,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1007,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1008,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1009,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1010,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1011,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1012,'仔仔',24,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1013,'娜娜',23,'male','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1014,'萧山',27,'female','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1015,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1016,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1017,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1018,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1019,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1020,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1021,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1022,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1023,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1024,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1025,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1026,'仔仔',24,'male','苏州');

ES索引创建

PUT /dev_flink_invoice_detail
{
"settings":{
"number_of_shards":5,
"number_of_replicas":1
},
"mappings":{
"products":{
"properties":{
"productName":{
"type":"text",
"analyzer":"ik_smart"
},
"annual_rate":{
"type":"keyword"
},
"describe":{
"type":"text",
"analyzer":"ik_smart"
}
}
}
}
}

Flink SQL作业

CREATE TEMPORARY TABLE es_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE es_sink_hwmcs (id BIGINT,name string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE invoice_detail_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_flink_invoice_detail','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');BEGIN STATEMENT SET;INSERTINTO es_sink
SELECTc.id,c.name,c.age,c.gender,c.cityfrom `liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5400') */ c
left join `liutest`.`liuhao`.`liu_test_4` /*+ OPTIONS('server-id'='5401') */ i
on c.id=i.idwherec.age=26;INSERTINTO es_sink_hwmcs
SELECT c.idAS id,LISTAGG (d.name)AS name
from`liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5402') */ c
left join `liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5403') */ d
on c.id= d.idwherec.age=25groupby c.id;INSERTINTO invoice_detail_sink
select*from`liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5404') */;END;


实验结果

1. 业务测试

启动Flink作业

此时RDS数据已经写入到ES索引中

插入新的数据

insertinto liu_test_3 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_4 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_5 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');

此时数据写入到ES

更新插入的数据

update liu_test_3 set name ='小小',city='大理'where id=1060;update liu_test_4 set name ='小小',city='大理'where id=1060;update liu_test_5 set name ='小小',city='大理'where id=1060;

此时更新数据已同步到ES

更新历史数据

update liu_test_3 set name ='甜甜',city='深圳'where id=1007;update liu_test_4 set name ='甜甜',city='深圳'where id=1007;update liu_test_5 set name ='甜甜',city='深圳'where id=1007;

此时查看历史数据也已经被更新到ES中

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
SQL 运维 网络安全
【实践】基于Hologres+Flink搭建GitHub实时数据查询
本文介绍了如何利用Flink和Hologres构建GitHub公开事件数据的实时数仓,并对接BI工具实现数据实时分析。流程包括创建VPC、Hologres、OSS、Flink实例,配置Hologres内部表,通过Flink实时写入数据至Hologres,查询实时数据,以及清理资源等步骤。
|
9天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
279 2
探索Flink动态CEP:杭州银行的实战案例
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
187 61
|
12天前
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
55 16
|
23天前
|
流计算 开发者
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
【开发者评测】实时计算Flink场景实践和核心功能体验测评获奖名单公布!
|
2月前
|
运维 数据挖掘 网络安全
场景实践 | 基于Flink+Hologres搭建GitHub实时数据分析
基于Flink和Hologres构建的实时数仓方案在数据开发运维体验、成本与收益等方面均表现出色。同时,该产品还具有与其他产品联动组合的可能性,能够为企业提供更全面、更智能的数据处理和分析解决方案。
|
3月前
|
消息中间件 监控 数据可视化
实时计算Flink场景实践和核心功能体验
本文详细评测了阿里云实时计算Flink版,从产品引导、文档帮助、功能满足度等方面进行了全面分析。产品界面设计友好,文档丰富实用,数据开发和运维体验优秀,具备出色的实时性和动态扩展性。同时,提出了针对业务场景的改进建议,包括功能定制化增强、高级分析功能拓展及可视化功能提升。文章还探讨了产品与阿里云内部产品及第三方工具的联动潜力,展示了其在多云架构和跨平台应用中的广阔前景。
104 9
|
3月前
|
运维 数据可视化 数据处理
实时计算Flink场景实践和核心功能体验 评测
实时计算Flink场景实践和核心功能体验 评测
79 5
|
3月前
|
运维 监控 安全
实时计算Flink场景实践和核心功能体验
实时计算Flink场景实践和核心功能体验
|
3月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版