[实战系列]SelectDB Cloud Flink Connector 最佳实践

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 随着云基础设施的不断完善,云原生已经成为各行业数字化转型的必选项,越来越多的应用开始进行云原生化架构升级和应用迁移。而云原生实时数仓的出现,让传统的数据仓库无论是成本、灵活性还是开放性等方面都显露出不足。拥有高性能、高可用性、可伸缩性、高安全性等特征的云原生数据库,正在成为企业的首选。SelectDB Cloud作为一款运行于多云之上的云原生实时数据仓库,可以为客户提供极简运维和极致性价比的数仓服务,为用户提供开箱即用的能力。同时,SelectDB Cloud 结合 Flink 流式计算,可以让用户将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,实时同步到 S

概述

随着云基础设施的不断完善,云原生已经成为各行业数字化转型的必选项,越来越多的应用开始进行云原生化架构升级和应用迁移。

而云原生实时数仓的出现,让传统的数据仓库无论是成本、灵活性还是开放性等方面都显露出不足。拥有高性能、高可用性、可伸缩性、高安全性等特征的云原生数据库,正在成为企业的首选。

SelectDB Cloud作为一款运行于多云之上的云原生实时数据仓库,可以为客户提供极简运维和极致性价比的数仓服务,为用户提供开箱即用的能力。

同时,SelectDB Cloud 结合 Flink 流式计算,可以让用户将 Kafka 中的非结构化数据以及 MySQL 等上游业务库中的变更数据,实时同步到 SelectDB Cloud中,同时 SelectDB Cloud 提供亚秒级分析查询的能力,可以有效地满足实时 OLAP、实时数据看板以及实时数据服务等场景的需求。

原理

架构设计

image.png



在实时计算中,通过Flink可以将业务数据库(MySQL、SQLServer、Oracle)或Kafka消息队列等其他上游数据作为Source读取出来,经过FlinkSQL或着DataStream加工计算,最后将清洗后的数据写入。

Sink写入时使用Flink SelectDB Connector,将数据先上传到internal stage,最后通过copy into的方式将文件一次性加载到表中。



Exactly-Once

在实时写入场景中,如何确保端到端的数据一致性是经常遇到的问题,Flink SelectDB Connector基于Flink的两阶段提交,实现了端到端的Exactly-Once。


image.png


  1. 以Kafka为例,接收到上游的数据后,会持续的发送到Sink端
  2. Sink端会将数据以文件的形式周期性的写入到SelectDB Cloud的InternalStage。
  3. 当Checkpoint完成后,会将刚写入Internal Stage的文件列表一次性通过Copy Into的方式导入到Table中。
  4. Flink任务意外挂掉后,从上个Checkpoint重启,会自动Commit上次未完成的Copy任务。

使用Flink CDC同步MySQL数据到SelectDB Cloud

  1. 下载安装Flink

这里以Flink1.15版本为例

wget https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz 
tar -zxvf flink-1.15.3-bin-scala_2.12.tgz cd flink-1.15.3 
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar -P ./lib/ 
wget https://selectdb.s3.amazonaws.com/connector/flink-selectdb-connector-1.15-1.0.0-SNAPSHOT.jar -P ./lib/
  1. 启动Flink Standalone集群
bin/start-cluster.sh 
Starting cluster. 
Starting standalonesession daemon on host flink. 
Starting taskexecutor daemon on host flink.
  1. 在MySQL和SelectDBCloud上创建表,并且在MySQL中预先插入数据
-- 创建MySQL表
CREATE TABLE test.employees (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
-- 插入数据
INSERT INTO test.employees VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12');
-- selectdb cloud中创建表
CREATE TABLE test.employees (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`)
DISTRIBUTED BY HASH(`emp_no`) BUCKETS 1
  1. 在Flink SQL Client上提交Flink任务
SET 'execution.checkpointing.interval' = '10s';
CREATE TABLE employees_source (
    emp_no INT,
    birth_date DATE,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date DATE,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '127.0.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'test',
    'table-name' = 'employees'
);
CREATE TABLE employees_sink (
    emp_no       INT ,
    birth_date   DATE,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    DATE
) 
WITH (
  'connector' = 'selectdb',
  'load-url' = '127.0.0.1:36252',
  'jdbc-url' = '127.0.0.1:19846',
  'cluster-name' = 'cluster1',
  'table.identifier' = 'test.employees',
  'username' = 'admin',
  'password' = '',
  'sink.enable-delete' = 'true'
);
INSERT INTO employees_sink select * from employees_source
  1. 在MySQL中执行增删改操作
INSERT INTO test.employees VALUES (10006,'1953-04-20','Anneke','Preusig','F','1989-06-02');
UPDATE test.employees set last_name = 'update' where emp_no = 10001;
DELETE FROM test.employees WHERE emp_no = 10002;
  1. 在SelectDB Cloud中验证
mysql> select * from employees;
+--------+------------+------------+-------------+--------+------------+
| emp_no | birth_date | first_name | last_name   | gender | hire_date  |
+--------+------------+------------+-------------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | update      | M      | 1986-06-26 |
|  10003 | 1959-12-03 | Parto      | Bamford     | M      | 1986-08-28 |
|  10004 | 1954-05-01 | Chirstian  | Koblick     | M      | 1986-12-01 |
|  10005 | 1955-01-21 | Kyoichi    | Maliniak    | M      | 1989-09-12 |
|  10006 | 1953-04-20 | Anneke     | Preusig     | F      | 1989-06-02 |
+--------+------------+------------+-------------+--------+------------+


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
17天前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
52 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
29天前
|
运维 监控 安全
选择主题1:实时计算Flink版最佳实践测评
本文介绍了使用实时计算Flink版进行用户行为分析的实践,涵盖用户行为趋势、留存分析、用户画像构建及异常检测等方面。与自建Flink集群相比,实时计算Flink版在稳定性、性能、开发运维和安全能力上表现更优,且显著降低了企业的IT支出和运维成本,提升了业务决策效率和系统可靠性,是企业级应用的理想选择。
76 32
|
5月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之selectDB是否可以做为源数据
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
29天前
|
运维 监控 安全
实时计算 Flink 版最佳实践测评
本文介绍了结合电商平台用户行为数据的实时计算Flink版实践,涵盖用户行为分析、标签画像构建、业务指标监控和数据分析预测等场景。文章还对比了实时计算Flink版与其他引擎及自建Flink集群在稳定性、性能、开发运维和安全能力方面的差异,分析了其成本与收益。最后,文章评估了实时计算Flink版的产品内引导、文档帮助、功能满足情况,并提出了针对不同业务场景的改进建议和与其他产品的联动可能性。
51 2
|
2月前
|
SQL 运维 监控
实时计算Flink版最佳实践测评报告
本报告旨在评估阿里云实时计算Flink版在实际应用中的表现,通过一系列的测试和分析来探讨其在稳定性、性能、开发运维及安全性方面的优势。同时,我们将结合具体的业务场景,如用户行为分析、标签画像构建等,来说明其实时数据处理能力,并对比自建Flink集群以及其他实时计算引擎。最后,从成本效益的角度出发,讨论采用全托管服务对企业运营的影响。
64 13
|
28天前
|
消息中间件 运维 分布式计算
实时计算Flink版最佳实践测评
本文介绍了使用阿里云实时计算Flink版进行用户行为分析的实践,详细探讨了其在性能、稳定性和成本方面的优势,以及与自建Flink集群的对比。通过实时计算,能够快速发现用户行为模式,优化产品功能,提升用户体验和市场竞争力。文章还提到了产品的易用性、功能满足度及改进建议,并与其他Flink实时计算产品进行了对比,强调了Flink在实时处理方面的优势。
|
2月前
|
存储 运维 监控
实时计算Flink版最佳实践测评
实时计算Flink版最佳实践测评
93 1
|
3月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
61 1
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之如何配置Connector来保持与MySOL一致
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。

热门文章

最新文章