实战|Flink不支持分库分表的改造之路

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 实战|Flink不支持分库分表的改造之路

1、背景


在flink提供的jdbc-connector中只支持单表的数据同步,但随着业务量的增大,单表记录数过多,会导致数据查询效率降低。


为了解决单表存在的性能瓶颈,会采用分库分表。例如将订单表order拆分为1024张分表:order -> order_0000~order_1023。


显然官方默认提供的flink jdbc插件并不适用这种情况,需要我们将会对flink插件进行改造,从而支持分库分表的数据同步。


2、技术方案


2.1 flink-jdbc-connector简介


我们在创建flink jdbc同步作业时,一般是通过下面的来声明一个table。

-- 在 Flink SQL 中注册一张 MySQL 表 'users'
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING,
  age INT,
  status BOOLEAN,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://localhost:3306/mydatabase',
   'table-name' = 'users',
);

并且提供了可选配置,可以针对一个sql在指定数据固定范围内(scan.partition.lower-bound,scan.partition.upper-bound)根据拆分字段(scan.partition.column)和数量(scan.partition.num),将sql进行等步长拆分。


可选配置如下:

scan.partition.column:用于将输入进行分区的列名
scan.partition.num:分区数。
scan.partition.lower-bound:第一个分区的最小值。
scan.partition.upper-bound:最后一个分区的最大值。

例如我们预估需要通过1000条订单数据,如果不做拆分,基于flink sql的同步语句如下:

select id,name from order

如果按照id拆分成两个子任务,则sql语句如下:

select id,name from order where id between 1 and 50
select id,name from order where id between 51 and 100

上面只是为了方便举例,在真实的生产环境,同步订单表都是千万级别,将一条大SQL拆分成小任务,一方面可以减少对数据表的锁操作,降低对源端数据库的压力,另一方面可以结合flink配置的并发度,并发同步数据,增大同步效率。


基于flink-jdbc-connector数据拆分的原理如下图所示:

2e9fe49b1ed47ba5e9ba586ae24f3b37.png


2.2 数据分库分表原理探究与技术方案


flink-jdbc-connector数据拆分属性原理如下:


在flink-jdbc-connector包中提供了JdbcParameterValuesProvider接口,被JdbcInputFormat用来计算要运行的并行查询列表(即拆分)。


每个查询将使用由每个JdbcParameterValuesProvider实现提供的矩阵行进行参数化。

public interface JdbcParameterValuesProvider {
 /** Returns the necessary parameters array to use for query in parallel a table. */
 Serializable[][] getParameterValues();
}

其中getParameterValues()的返回值:Serializable[x][y]x值即为SQL拆分的数据,每个x对应的y个元素的一维数组,包含的是每个sql的变量信息,例如上述根据id进行拆分数量为2。


第一个关键点Serializable[][]的二维数组结构为:

//结构 :x=0~1
//Serializable[x] = {{id_min},{id_max}}
 Serializable[0] = {1,50}
 Serializable[1] = {51,100}

SQL模版语句如下:

select id,name from order where id between ? and ?

那么对于分表来说,其变量相当于又增加了一个table_name,这样我们只需要改动两个地方,就可以实现分表的效果:


在构建Serializable [] [] 时,新增维度:table_name,其结构如下:

//结构 :x=0~2047
//Serializable[x] = {"order_{0000~1023}",{id_min},{id_max}}
 Serializable[0] = {"order_0000",1,50}
 Serializable[1] = {"order_0000",51,100}
 Serializable[2] = {"order_0001",1,50}
 Serializable[3] = {"order_0001",51,100}
 ...
 Serializable[2047] = {"order_1023",51,100}

对应SQL的模版为:

select id,name from ${table_name} where id between ? and ?

在分表的基础上继续再推导,例如如果实现2库(10.1.1.2、10.1.1.2),4个schema(order_00~order_03),1024张表,最终拆解如下:


Serializable [] [] 存储数据格式为:

//结构:x=0~2047
//Serializable[x] = {"{db_url}","{schema_name}","order_{0000~1023}",{id_min},{id_max}}
 Serializable[0] = {"jdbc://10.1.1.2","order_00","order_0000",1,50}
 ...
 Serializable[2047] = {"jdbc://10.1.1.3","order_03","order_1023",1,50}

对应的SQL模版如下:

select id,name from {table_name} where id between {id_min} and {id_max}


2.3 方案落地


经过上面的分析,主要涉及如下5个改造点。


2.3.1 改造flink-jdbc-connection配置


主要涉及url、table-name的改造:


  • url:支持多个数据库配置,并在schema支持正则表达式动态匹配数据库中的schema
  • table-name:表名支持正则匹配,可同时匹配多个表


具体代码如下:

'url' = 'jdbc:mysql://localhost:3306/order_([0-9]{1,}),jdbc:mysql://localhost:3306/order_([0-9]{1,})',
   'table-name' = 'order_([0-9]{1,})',


2.3.2 解析URL、Table名称


主要是根据配置的url,table_name表达式,基本的编码步骤如下:


  1. 查询数据库中所有schema
  2. 通过正则匹配schema
  3. 查询匹配schema下面的table
  4. 通过正则匹配表
  5. 返回数据库url与table的对应关系:List<TableItem>


2.3.3 实现分库分表JdbcMultiTableProvider


主要基于原有数据分片结果,根据分库分表,对Serializable[][]进行二次拆分,示例代码如下:

public class JdbcMultiTableProvider implements JdbcParameterValuesProvider {
    //匹配的数据库连接与table的对应关系
 private List<TableItem> tables;
    //原jdbc数据分片配置后的拆分结果
 private Serializable[][] partition;
 public JdbcMultiTableProvider(List<TableItem> tables) {
  this.tables = tables;
 }
 /**
  * @return 返回拆分后的分片和数据块的对应关系,Serializable[partition][parameterValues]
  * 启动partition为分片索引,parameterValues为每个分片对应的数据参数。
  */
 @Override
 public Serializable[][] getParameterValues() {
  int tableNum = tables.stream().mapToInt(item -> item.getTable().size()).sum();
  int splitCount = partition == null ? tableNum : tableNum * partition.length;
  int paramLength = partition == null ? 2 : 4;
  Serializable[][] parameters = new Serializable[splitCount][paramLength];
  int splitIndex = 0;
  for (TableItem tableItem : tables) {
   for (String table : tableItem.getTable()) {
    if (partition != null) {
     for (Serializable[] serializables : partition) {
      parameters[splitIndex][0] = tableItem.getUrl();
      parameters[splitIndex][1] = table;
      //数据分片配置
      parameters[splitIndex][2] = serializables[0];
      parameters[splitIndex][3] = serializables[1];
      splitIndex++;
     }
    } else {
     parameters[splitIndex][0] = tableItem.getUrl();
     parameters[splitIndex][1] = table;
     splitIndex++;
    }
   }
  }
  return parameters;
 }
 public JdbcParameterValuesProvider withPartition(JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider) {
  if (null == jdbcNumericBetweenParametersProvider) {
   return this;
  }
  this.partition = jdbcNumericBetweenParametersProvider.getParameterValues();
  return this;
 }
 public static class TableItem {
  private String url;
  private List<String> table;
        //get/set..
 }
}


2.3.4 改造JdbcDynamicTableSource


主要目的生成基于分库分表的JdbcRowDataInputFormat对象,示例代码如下:

@Override
 @SuppressWarnings("unchecked")
 public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
  final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
   .setDrivername(options.getDriverName())
   .setDBUrl(options.getDbURL())
   .setUsername(options.getUsername().orElse(null))
   .setPassword(options.getPassword().orElse(null));
  if (readOptions.getFetchSize() != 0) {
   builder.setFetchSize(readOptions.getFetchSize());
  }
  final JdbcDialect dialect = options.getDialect();
        JdbcNumericBetweenParametersProvider jdbcNumericBetweenParametersProvider = null;
  //数据分片配置
  if (readOptions.getPartitionColumnName().isPresent()) {
   long lowerBound = readOptions.getPartitionLowerBound().get();
   long upperBound = readOptions.getPartitionUpperBound().get();
   int numPartitions = readOptions.getNumPartitions().get();
   jdbcNumericBetweenParametersProvider = new JdbcNumericBetweenParametersProvider(lowerBound, upperBound).ofBatchNum(numPartitions);
  }
        //根据table分片
  List<TableItem>  tableItems = options.getTables();
  builder.setParametersProvider(new JdbcMultiTableProvider(tableItems)
    .withPartition(jdbcNumericBetweenParametersProvider, physicalSchema, readOptions.getPartitionColumnName().orElse(null)));
  final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
  builder.setRowConverter(dialect.getRowConverter(rowType));
  builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
   .createTypeInformation(physicalSchema.toRowDataType()));
  return InputFormatProvider.of(builder.build());
 }


2.3.5 改造JdbcRowDataInputFormat


在JdbcRowDataInputFormat的open(InputSplit inputSplit)中,初始化Connection、statement、以及sql查询模板。


JdbcRowDataInputFormat整个生命周期中,每个并行实例调用一次openInputFormat(),并对应关闭当前并行实例的方法:closeInputFormat())。


每次切换分片,都会调用一次open(InputSplit inputSplit)(对应关闭当前数据分片方法:close()),inputSplit的值对应Serializable[x][y]中x的值递增,并且每个并行实例不会重复执行,比如有1024个分表,每个表2个数据分片,那么inputSplit.getSplitNumber()值的范围是:[0~2047]。JdbcRowDataInputFormat对象持有Serializable[ x ] [y ],并且根据open(InputSplit inputSplit)来定位当前JdbcRowDataInputFormat处理对应分区的数据,从而达到数据分区根据并发度,并发查询的效果。


示例代码如下:

@Override
public void open(InputSplit inputSplit) throws IOException {
  try {
   //分库,分表逻辑
   Object[] params = parameterValues[inputSplit.getSplitNumber()];
   //初始化数据库连接,url= params[0].toString();
   initConnect(params);
   String url = params[0].toString();
   final JdbcDialect dialect = RdbsDialects.get(url).get();
   //数据查询模板,String table = params[1].toString();
   String queryTemplate = queryTemplate(params, dialect);
   statement = dbConn.prepareStatement(queryTemplate, resultSetType, resultSetConcurrency);
   if (inputSplit != null && parameterValues != null) {
   //从index=2 开始为数据分片配置
    for (int i = 2; i < parameterValues[inputSplit.getSplitNumber()].length; i++) {
     Object param = parameterValues[inputSplit.getSplitNumber()][i];
     if (param instanceof String) {
      statement.setString(i - 1, (String) param);
     } else if (param instanceof Long) {
      statement.setLong(i - 1, (Long) param);
     } else if (param instanceof Integer) {
      statement.setInt(i - 1, (Integer) param);
     ...
      //extends with other types if needed
      throw new IllegalArgumentException("open() failed. Parameter " + i + " of type " + param.getClass() + " is not handled (yet).");
     }
    }
    if (LOG.isDebugEnabled()) {
     LOG.debug(String.format("Executing '%s' with parameters %s", queryTemplate, Arrays.deepToString(parameterValues[inputSplit.getSplitNumber()])));
    }
   }
   resultSet = statement.executeQuery();
   hasNext = resultSet.next();
  } catch (SQLException se) {
   throw new IllegalArgumentException("open() failed." + se.getMessage(), se);
  }
}

基于上述步骤改造后,就可以实现从flink-jdbc-connector source端单库单表,到分库分表的改造。


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
8月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
227 3
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
199 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
25天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
396 2
探索Flink动态CEP:杭州银行的实战案例
|
8月前
|
NoSQL 关系型数据库 MySQL
实时计算 Flink版操作报错之同步MySQL分库分表500张表报连接超时,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
5月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
100 1
|
5月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
112 0
|
7月前
|
SQL 关系型数据库 数据处理
实时计算 Flink版产品使用问题之在同步数据时,是否可以使用正则表达式匹配进行源表到目标表的分库分表同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
8月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
209 1
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
146 0
Flink处理函数实战之四:窗口处理
|
8月前
|
存储 缓存 数据处理
Flink 2.0 状态存算分离改造实践
本文整理自阿里云智能 Flink 存储引擎团队兰兆千在 FFA 2023 核心技术(一)中 的分享,内容关于 Flink 2.0 状态存算分离改造实践的研究。
644 1
Flink 2.0 状态存算分离改造实践