Flink同步RDS数据到Elasticsearch实践

简介: Flink同步RDS数据到Elasticsearch实践

实验说明

通过本节实验可以掌握使用Flink SQL同步RDS MySQL数据到阿里云Elasticsearch基本步骤和实现逻辑,并了解

ES数据新增(Insert)与更新(update)的实践结果。

实验环境

1. 准备环境

① RDS MySQL实例5.7.0

② 阿里云ES实例6.7.0

③ Fllink实例

④ MySQL Catalog

2. 准备数据

MySQL建表语句

CREATETABLE `liu_test_3` (`id` bigint(20)NOTNULL COMMENT '序号',`name` char(50)NOTNULL COMMENT '姓名',`age` int(20) DEFAULT NULL COMMENT '年龄',`gender` char(50) DEFAULT NULL COMMENT '性别',`city` char(50) DEFAULT NULL COMMENT '所在地',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='小范围内户籍信息统计';

MySQL导入数据

INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1001,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1002,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1003,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1004,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1005,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1006,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1007,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1008,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1009,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1010,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1011,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1012,'仔仔',24,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1013,'娜娜',23,'male','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1014,'萧山',27,'female','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1015,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1016,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1017,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1018,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1019,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1020,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1021,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1022,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1023,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1024,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1025,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1026,'仔仔',24,'male','苏州');

ES索引创建

PUT /dev_flink_invoice_detail
{
"settings":{
"number_of_shards":5,
"number_of_replicas":1
},
"mappings":{
"products":{
"properties":{
"productName":{
"type":"text",
"analyzer":"ik_smart"
},
"annual_rate":{
"type":"keyword"
},
"describe":{
"type":"text",
"analyzer":"ik_smart"
}
}
}
}
}

Flink SQL作业

CREATE TEMPORARY TABLE es_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE es_sink_hwmcs (id BIGINT,name string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE invoice_detail_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_flink_invoice_detail','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');BEGIN STATEMENT SET;INSERTINTO es_sink
SELECTc.id,c.name,c.age,c.gender,c.cityfrom `liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5400') */ c
left join `liutest`.`liuhao`.`liu_test_4` /*+ OPTIONS('server-id'='5401') */ i
on c.id=i.idwherec.age=26;INSERTINTO es_sink_hwmcs
SELECT c.idAS id,LISTAGG (d.name)AS name
from`liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5402') */ c
left join `liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5403') */ d
on c.id= d.idwherec.age=25groupby c.id;INSERTINTO invoice_detail_sink
select*from`liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5404') */;END;


实验结果

1. 业务测试

启动Flink作业

此时RDS数据已经写入到ES索引中

插入新的数据

insertinto liu_test_3 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_4 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_5 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');

此时数据写入到ES

更新插入的数据

update liu_test_3 set name ='小小',city='大理'where id=1060;update liu_test_4 set name ='小小',city='大理'where id=1060;update liu_test_5 set name ='小小',city='大理'where id=1060;

此时更新数据已同步到ES

更新历史数据

update liu_test_3 set name ='甜甜',city='深圳'where id=1007;update liu_test_4 set name ='甜甜',city='深圳'where id=1007;update liu_test_5 set name ='甜甜',city='深圳'where id=1007;

此时查看历史数据也已经被更新到ES中

相关实践学习
以电商场景为例搭建AI语义搜索应用
本实验旨在通过阿里云Elasticsearch结合阿里云搜索开发工作台AI模型服务,构建一个高效、精准的语义搜索系统,模拟电商场景,深入理解AI搜索技术原理并掌握其实现过程。
ElasticSearch 最新快速入门教程
本课程由千锋教育提供。全文搜索的需求非常大。而开源的解决办法Elasricsearch(Elastic)就是一个非常好的工具。目前是全文搜索引擎的首选。本系列教程由浅入深讲解了在CentOS7系统下如何搭建ElasticSearch,如何使用Kibana实现各种方式的搜索并详细分析了搜索的原理,最后讲解了在Java应用中如何集成ElasticSearch并实现搜索。  
相关文章
|
安全 关系型数据库 MySQL
如何将数据从MySQL同步到其他系统
【10月更文挑战第17天】如何将数据从MySQL同步到其他系统
1726 0
|
2月前
|
NoSQL 算法 Redis
【Docker】(3)学习Docker中 镜像与容器数据卷、映射关系!手把手带你安装 MySql主从同步 和 Redis三主三从集群!并且进行主从切换与扩容操作,还有分析 哈希分区 等知识点!
Union文件系统(UnionFS)是一种**分层、轻量级并且高性能的文件系统**,它支持对文件系统的修改作为一次提交来一层层的叠加,同时可以将不同目录挂载到同一个虚拟文件系统下(unite several directories into a single virtual filesystem) Union 文件系统是 Docker 镜像的基础。 镜像可以通过分层来进行继承,基于基础镜像(没有父镜像),可以制作各种具体的应用镜像。
513 5
|
9月前
|
关系型数据库 MySQL Shell
MySQL 备份 Shell 脚本:支持远程同步与阿里云 OSS 备份
一款自动化 MySQL 备份 Shell 脚本,支持本地存储、远程服务器同步(SSH+rsync)、阿里云 OSS 备份,并自动清理过期备份。适用于数据库管理员和开发者,帮助确保数据安全。
|
消息中间件 关系型数据库 Kafka
一种小资源情况下RDS数据实时同步StarRocks方案
使用一台4C8 G服务器轻松实现2个MySQL实例中通过负责分库分表规则之后的5000多张表的数据实时同步到StarRocks
568 67
|
监控 关系型数据库 MySQL
Flink CDC MySQL同步MySQL错误记录
在使用Flink CDC同步MySQL数据时,常见的错误包括连接错误、权限错误、表结构变化、数据类型不匹配、主键冲突和
487 17
|
弹性计算 运维 Serverless
超值选择:阿里云Elasticsearch Serverless在企业数据检索与分析中的高性能与灵活性
本文介绍了阿里云Elasticsearch Serverless服务的高性价比与高度弹性灵活性。
524 8
|
Web App开发 JavaScript Java
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
这篇文章是关于如何使用Spring Boot整合Elasticsearch,并通过REST客户端操作Elasticsearch,实现一个简单的搜索前后端,以及如何爬取京东数据到Elasticsearch的案例教程。
874 0
elasticsearch学习五:springboot整合 rest 操作elasticsearch的 实际案例操作,编写搜索的前后端,爬取京东数据到elasticsearch中。
|
SQL 存储 关系型数据库
Mysql主从同步 清理二进制日志的技巧
Mysql主从同步 清理二进制日志的技巧
173 1
|
消息中间件 监控 关系型数据库
MySQL数据实时同步到Elasticsearch:技术深度解析与实践分享
在当今的数据驱动时代,实时数据同步成为许多应用系统的核心需求之一。MySQL作为关系型数据库的代表,以其强大的事务处理能力和数据完整性保障,广泛应用于各种业务场景中。然而,随着数据量的增长和查询复杂度的提升,单一依赖MySQL进行高效的数据检索和分析变得日益困难。这时,Elasticsearch(简称ES)以其卓越的搜索性能、灵活的数据模式以及强大的可扩展性,成为处理复杂查询需求的理想选择。本文将深入探讨MySQL数据实时同步到Elasticsearch的技术实现与最佳实践。
657 0
|
2月前
|
SQL 关系型数据库 MySQL
阿里云RDS云数据库全解析:产品功能、收费标准与活动参考
与云服务器ECS一样,关系型数据库RDS也是很多用户上云必买的热门云产品之一,阿里云的云数据库RDS主要包含RDS MySQL、RDS SQL Server、RDS PostgreSQL、RDS MariaDB等几个关系型数据库,并且提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,帮助您解决数据库运维的烦恼。本文为大家介绍阿里云的云数据库 RDS主要产品及计费方式、收费标准以及活动等相关情况,以供参考。

推荐镜像

更多