Flink同步RDS数据到Elasticsearch实践

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
简介: Flink同步RDS数据到Elasticsearch实践

实验说明

通过本节实验可以掌握使用Flink SQL同步RDS MySQL数据到阿里云Elasticsearch基本步骤和实现逻辑,并了解

ES数据新增(Insert)与更新(update)的实践结果。

实验环境

1. 准备环境

① RDS MySQL实例5.7.0

② 阿里云ES实例6.7.0

③ Fllink实例

④ MySQL Catalog

2. 准备数据

MySQL建表语句

CREATETABLE `liu_test_3` (`id` bigint(20)NOTNULL COMMENT '序号',`name` char(50)NOTNULL COMMENT '姓名',`age` int(20) DEFAULT NULL COMMENT '年龄',`gender` char(50) DEFAULT NULL COMMENT '性别',`city` char(50) DEFAULT NULL COMMENT '所在地',PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='小范围内户籍信息统计';

MySQL导入数据

INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1001,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1002,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1003,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1004,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1005,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1006,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1007,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1008,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1009,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1010,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1011,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1012,'仔仔',24,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1013,'娜娜',23,'male','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1014,'萧山',27,'female','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1015,'大山',27,'male','苏州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1016,'小二',23,'male','北京');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1017,'杉杉',25,'female','上海');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1018,'小四',27,'male','成都');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1019,'小五',21,'male','西安');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1020,'小六',22,'male','郑州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1021,'琪琪',25,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1022,'巴哥',29,'male','深圳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1023,'九儿',27,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1024,'杉杉',25,'female','广州');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1025,'琪琪',26,'female','洛阳');INSERTINTO liu_test_5 (id,name,age,gender,city)VALUES(1026,'仔仔',24,'male','苏州');

ES索引创建

PUT /dev_flink_invoice_detail
{
"settings":{
"number_of_shards":5,
"number_of_replicas":1
},
"mappings":{
"products":{
"properties":{
"productName":{
"type":"text",
"analyzer":"ik_smart"
},
"annual_rate":{
"type":"keyword"
},
"describe":{
"type":"text",
"analyzer":"ik_smart"
}
}
}
}
}

Flink SQL作业

CREATE TEMPORARY TABLE es_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE es_sink_hwmcs (id BIGINT,name string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_test_flink_invoices','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');CREATE TEMPORARY TABLE invoice_detail_sink (id BIGINT,name string,age BIGINT,gender string,city string,PRIMARY KEY (id)NOT ENFORCED -- 主键可选,如果定义了主键,则作为文档ID,否则文档ID将为随机值。)WITH
('connector'='elasticsearch-6','hosts'='es-cn-zvp*****qkh.elasticsearch.aliyuncs.com:9200','index'='dev_flink_invoice_detail','document-type'='_doc','username'='elastic','password'='******','document-id.key-delimiter'='$',-- 可选参数, 复合主键的连接字符 (默认是 _ 符号)'retry-on-conflict'='1000');BEGIN STATEMENT SET;INSERTINTO es_sink
SELECTc.id,c.name,c.age,c.gender,c.cityfrom `liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5400') */ c
left join `liutest`.`liuhao`.`liu_test_4` /*+ OPTIONS('server-id'='5401') */ i
on c.id=i.idwherec.age=26;INSERTINTO es_sink_hwmcs
SELECT c.idAS id,LISTAGG (d.name)AS name
from`liutest`.`liuhao`.`liu_test_3` /*+ OPTIONS('server-id'='5402') */ c
left join `liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5403') */ d
on c.id= d.idwherec.age=25groupby c.id;INSERTINTO invoice_detail_sink
select*from`liutest`.`liuhao`.`liu_test_5` /*+ OPTIONS('server-id'='5404') */;END;


实验结果

1. 业务测试

启动Flink作业

此时RDS数据已经写入到ES索引中

插入新的数据

insertinto liu_test_3 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_4 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');insertinto liu_test_5 (id,name,age,gender,city)values(1060,'娜娜',25,'female','广东');

此时数据写入到ES

更新插入的数据

update liu_test_3 set name ='小小',city='大理'where id=1060;update liu_test_4 set name ='小小',city='大理'where id=1060;update liu_test_5 set name ='小小',city='大理'where id=1060;

此时更新数据已同步到ES

更新历史数据

update liu_test_3 set name ='甜甜',city='深圳'where id=1007;update liu_test_4 set name ='甜甜',city='深圳'where id=1007;update liu_test_5 set name ='甜甜',city='深圳'where id=1007;

此时查看历史数据也已经被更新到ES中

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
21小时前
|
Oracle 关系型数据库 MySQL
实时计算 Flink版操作报错合集之用CTAS从mysql同步数据到hologres,改了字段长度,报错提示需要全部重新同步如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
21 8
|
22小时前
|
SQL Java 关系型数据库
实时计算 Flink版操作报错合集之通过flink sql形式同步数据到hudi中,本地启动mian方法报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
17 8
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之程序初始化mysql没有完成就报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 4
|
1天前
|
关系型数据库 MySQL 数据库
实时计算 Flink版操作报错合集之sqlserver mysql都用的胖包,sqlserver的成功了,mysql报这个错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
15 6
|
1天前
|
SQL Oracle 关系型数据库
实时计算 Flink版操作报错合集之连接器换成2.4.2之后,mysql作业一直报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 3
|
1天前
|
SQL Java 网络安全
实时计算 Flink版操作报错合集之SQLserver表没有主键,同步的时候报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
8 1
|
1天前
|
SQL 资源调度 关系型数据库
实时计算 Flink版产品使用合集之在抓取 MySQL binlog 数据时,datetime 字段会被自动转换为时间戳形式如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
8 2
|
1天前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用合集之支持在同步全量数据时使用checkpoint吗
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
13 2
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
1天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
8 1