使用 Canal 向 Tablestore 导入数据

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 可以使用 Canal 将数据传输进入 Tablestore。需要部署两部分内容,首先部署 canal.deployer,deployer 负责从上游拉取 binlog 数据,记录位点等。然后再部署 canal.adapter 包,这个服务负责对接 deployer 解析过的数据,并且将数据传输到下游数据库中,在本文中即 Tablestore 数据库。链路如图。Deployer部署部署步骤首先部署 

可以使用 Canal 将数据传输进入 Tablestore。


需要部署两部分内容,首先部署 canal.deployer,deployer 负责从上游拉取 binlog 数据,记录位点等。然后再部署 canal.adapter 包,这个服务负责对接 deployer 解析过的数据,并且将数据传输到下游数据库中,在本文中即 Tablestore 数据库。链路如图。


Deployer部署

部署步骤

首先部署 canal.deployer 包,这个包里面不存在任何专门为 Tablestore 定制的 jar 包或者配置,因此部署方式也和使用 Canal 对接 MySQL 数据库时的部署方式相同。部署 Canal Deployer 的具体步骤可以参考 Canal 官方文档的 QuickStart。大致步骤如下:

  1. 开启 MySQL binlog 功能,并且配置 binlog-format 为 ROW 模式。

  2. 在 Canal 官方 release页,下载 canal.deployer 包。

  3. 将包解压,可以看到项目路径下的 bin、conf、plugin、lib文件夹

  4. 修改 conf 路径下的配置文件

  5. 通过 bin 路径下的脚本启动项目

配置说明

对部署步骤中的第 4 步做一个更具体的说明。无论下游目标库为 MySQL 还是 Tablestore,canal.deployer 部分的配置文件配置方式并没有什么区别。可以参考 Canal 官方文档 QuickStart。本文再进行下更细致的说明。

canal.properties配置

需要关注的配置文件有两处,第一处为 conf 路径下的 canal.properties。在 canal.properties 中,其余配置使用默认项即可,只需要修改

canal.destinations = #{destinations}   // 默认值为example,填入给当前canal实例的命名即可

instance.properties配置

另一处需要关注的配置为 conf 路径下的#{destinations}/instance.properties,destinations为当前 canal 实例名,即 canal.properties 配置文件中的 canal.destinations 字段。假设 canal.destinations 的值为 test_ots,那么需要在conf 路径下创建 test_ots 文件夹,并且将 conf/example/instance.properties 该文件复制到 conf/test_ots/ 路径下。然后修改该配置文件。

instance.properties 中需要关注的参数如下。

参数

示例值

说明

canal.instance.master.address

host:port

数据库域名端口

canal.instance.rds.accesskey

***

本文 MySQL 为阿里云产品 RDS,需填入对应accessKey。若非 RDS 库,此项不用填写。

canal.instance.rds.secretkey

***

本文 MySQL 为阿里云产品 RDS,需填入对应secretkey。若非 RDS 库,此项不用填写。

canal.instance.rds.instanceId

rm-bp15p0713f7z6

本文 MySQL 为阿里云产品 RDS,需填入对应示例 id。若非 RDS 库,此项不用填写。

canal.instance.dbUsername

***

数据库账号用户名

canal.instance.dbPassword

***

数据库账号密码

canal.instance.filter.regex

.*\\..*

Canal 实例关注的表。通过正则表达式匹配。

这里匹配所有库下的所有表

canal.destinations

test_ots

canal 的实例名称,需要配置文件所在上层路径相同,本例路径为 conf/test_ots /instance.properties,那么实例名为test_ots


ClientAdapter部署

部署步骤

常规步骤

canal.adapter的部署方式与常规部署对接下游 MySQL 时略有不同。详细的部署步骤见 ClientAdapter。这里也做一下针对 Tablestore 的详细说明。大致步骤:

  1. 在 Canal 官方 release页,下载 canal.adapter 包。

  2. 将包解压后,可以看到项目路径下的 bin、conf、plugin、lib文件夹。

  3. 配置 conf 路径下的配置文件。

  4. 然后就可以通过 bin 路径下的启动脚本启动。

在第 2 步中,在plugin路径下,如果存在名字以 client-adapter.tablestore 开头的 jar 包,说明部署包中已经包含了 Canal 对接 Tablestore 部分的适配器代码。可以直接使用该部署包。如果不存在名字以 client-adapter.tablestore 开头的 jar 包,则需要使用额外提供的部署包进行部署。


异常处理

若plugin路径下不存在以 client-adapter.tablestore 开头的 jar 包。需要执行以下步骤

  1. 拉取canal-adapter下的 zip 包。用此 zip 包替代从 release页下载 canal.adapter 包。

  2. 执行部署步骤中的 2、3、4 步,此时可以在 plugin 路径下看到名字以 client-adapter.tablestore 开头的 jar 包。

配置说明

部署步骤中的第 4 步修改配置文件,需要关心两处配置文件修改。

application.yml配置

首先是配置文件 conf/application.yml。这个文件中的配置项说明可以参考官方文档 ClientAdapter。另外,在 Tablestore 对接中存在一些独有的配置项,在下表中进行了说明。

参数

示例值

说明

是否必填

canal.conf:

canalAdapters:instance

test_ots

与Depolyer中的destinations保持一致

canal.conf:

canalAdapters:outerAdapters:

-name:

tablestore

定义适配器类型,填入 tablestore,说明此适配器下游写入 Tablestore 库。

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.endpoint

                    Tablestore endpoint

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretId

****

AccessSecretId

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.accessSecretKey

****

AccessSecretKey

canal.conf:

canalAdapters:outerAdapters: properties:tablestore.instanceName

test-2009

Tablestore 中的 InstanceName

canal.conf: terminateOnException

true

默认为false。若配置为true,则若数据同步重试后仍失败,程序会暂停实时同步任务,等待用户手动处理

完整 application.yml 配置如下

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 3
  timeout:
  accessKey:
  secretKey:
  terminateOnException: true
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://rm-bp15po.mysql.rds.aliyuncs.com:3306/test_ots?useUnicode=true
      username: ****
      password: ****
  canalAdapters:
  - instance: test_ots # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: tablestore
        key: ts
        properties:
          tablestore.endpoint: https://test-2009.cn-hangzhou.ots.aliyuncs.com
          tablestore.accessSecretId: ****
          tablestore.accessSecretKey: ****
          tablestore.instanceName: test-2009

conf/tablestore路径下配置

然后需要关注 conf/tablestore 路径下的配置文件。如果没有该路径,需要手动创建 conf/tablestore 这个路径。

在 conf/tablestore 路径下,创建一个 yml 文件,文件名自定,填入以下格式的内容。然后根据自己的项目配置修改配置文件。

dataSourceKey: defaultDS
destination: test_ots
groupId: g1
outerAdapterKey: ts
threads: 8
updateChangeColumns: false
dbMapping:
  database: test_ots
  table: order_contract_canal
  targetTable: canal_target_order
  targetPk:
    oId: oId
  targetColumns:
    oId:
    create_time: createTime$string
    pay_time: $string
    update_time: updateTime
  etlCondition: 
  commitBatch: 200 # 批量提交的大小

其中各参数含义见表。

参数

说明

是否必填

dataSourceKey

该任务的源数据库标识,在 application.yml 中 srcDataSources 下可以找到该标识对应的数据库。

destination

canal 实例名,与 application.yml 下的

canal.conf: canalAdapters:instance参数相同

groupId

分组 id,MQ 模式下使用,这里不关心,配置成 application.yml 中 canalAdapters 中相同即可。

outerAdapterKey

使用的 Adapter 标识,应与 application.yml 中 outerAdapters 下的 key 值相同。

threads

筒数量,默认为 1,对应 tablestorewriter 中的 bucket 数量。

dbMapping.database

源库名

dbMapping.table

源表名

dbMapping.targetTable

目标表

dbMapping.targetPk

主键配置,格式如下:

id: target_id 源表主键:目标表主键。多主键可以配置多个,多主键配置顺序需要与 Tablestore 中的主键顺序相同。

dbMapping.targetColumns

配置需要同步的列名,以及列映射,可以配置类型转换。有如下 4 种格式。

id: target_id$string,表示id字段同步后为target_id字段,且类型映射为 string;

id: target_id,表示id字段同步后为 target_id 字段;

id: ,表示 id 字段同步前后字段名不变,字段类型采用默认映射;

id: $string 功能等同于 id: id$string

需要注意,在 targetPk 中配置过的主键也需要在这里再配置一次。

dbMapping.etlCondition

全量抽取数据时的过滤条件

dbMapping.commitBatch

一次批量 RPC 请求导入的行数,对应 tablestorewriter 中的 maxBatchRowsCount,默认取 writerConfig 中的默认值200

updateChangeColumns

行覆盖或行更新。

默认为 false,为行覆盖,即记录更新时,使用该记录最新整行值覆盖 Tablestore 中的老记录。若为 true,为行更新,即记录更新时,只对变化的字段进行操作。

Canal 支持源表到目标 Tablestore 表的字段名映射以及字段类型映射(dbMapping.targetColumns 字段中的映射配置)。可以作为目标类型配置在 $ 后面的有

配置项(大小写不敏感)

目标类型

string

string

int,integer

int

bool,boolean

bool

binary

binary

double,float,decimal

double

若不配置映射,会根据原始字段类型推断目标字段类型。

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 索引
表格存储根据多元索引查询条件直接更新数据
表格存储是否可以根据多元索引查询条件直接更新数据?
118 3
|
SQL NoSQL 数据可视化
玩转Tablestore:使用Grafana快速展示时序数据
Grafana 是一款采用 go 语言编写的开源应用,主要用于大规模指标数据的可视化展现,是网络架构和应用分析中最流行的时序数据展示工具,可以通过将采集的数据查询然后可视化的展示,实现报警通知;Grafana拥有丰富的数据源,官方支持以下数据源:Graphite,Elasticsearch,InfluxDB,Prometheus,Cloudwatch,MySQ
1766 0
玩转Tablestore:使用Grafana快速展示时序数据
|
5月前
|
DataWorks NoSQL 关系型数据库
DataWorks产品使用合集之如何从Tablestore同步数据到MySQL
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
7月前
|
分布式计算 DataWorks API
DataWorks常见问题之按指定条件物理删除OTS中的数据失败如何解决
DataWorks是阿里云提供的一站式大数据开发与管理平台,支持数据集成、数据开发、数据治理等功能;在本汇总中,我们梳理了DataWorks产品在使用过程中经常遇到的问题及解答,以助用户在数据处理和分析工作中提高效率,降低难度。
|
7月前
|
DataWorks NoSQL 关系型数据库
可以使用dataworks从tablestore同步数据到mysql吗?
可以使用dataworks从tablestore同步数据到mysql吗?
79 1
|
NoSQL 开发工具
TableStore表格存储(阿里云OTS)多行数据操作查询,支持倒序,过滤条件和分页
1. 批量读取操作 批量读取操作可以通过多种方式进行,包括: GetRow:根据主键读取一行数据。 BatchGetRow:批量读取多行数据。 GetRange:根据范围读取多行数据。
921 0
|
存储 消息中间件 NoSQL
物联网数据通过规则引擎流转到OTS|学习笔记
快速学习物联网数据通过规则引擎流转到OTS
343 15
物联网数据通过规则引擎流转到OTS|学习笔记
|
存储 负载均衡 开发者
表格存储数据多版本介绍| 学习笔记
快速学习表格存储数据多版本介绍。
表格存储数据多版本介绍| 学习笔记
|
存储 NoSQL 关系型数据库
基于TableStore的海量气象格点数据解决方案实战 王怀远
基于TableStore的海量气象格点数据解决方案实战 王怀远
432 0
基于TableStore的海量气象格点数据解决方案实战 王怀远
|
存储 SQL 运维
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践
前言:从最早的互联网高速发展、到移动互联网的爆发式增长,再到今天的产业互联网、物联网的快速崛起,各种各样新应用、新系统产生了众多订单类型的需求,比如电商购物订单、银行流水、运营商话费账单、外卖订单、设备信息等,产生的数据种类和数据量越来越多;其中订单系统就是一个非常广泛、通用的系统。而随着数据规模的快速增长、大数据技术的发展、运营水平的不断提高,包括数据消费的能力要求越来越高,这对支撑订单系统的数据库设计、存储系统也提出了更多的要求。在新的需求下,传统的经典架构面临着诸多挑战,需要进一步思考架构优化,以更好支撑业务发展;
817 0
基于Tablestore 实现大规模订单系统海量订单/日志数据分类存储的实践