基于Tablestore Tunnel的数据复制实战

本文涉及的产品
对象存储 OSS,20GB 3个月
云备份 Cloud Backup,100GB 3个月
文件存储 NAS,50GB 3个月
简介: ### 前言 数据复制主要指通过互联的网络在多台机器上保存相同数据的副本,通过数据复制方案,人们通常希望达到以下目的:1)使数据在地理位置上更接近用户,进而降低访问延迟;2)当部分组件出现故障时,系统依旧可以继续工作,提高可用性;3)扩展至多台机器以同时提供数据访问服务,从而提升读吞吐量。

前言

数据复制主要指通过互联的网络在多台机器上保存相同数据的副本,通过数据复制方案,人们通常希望达到以下目的:1)使数据在地理位置上更接近用户,进而降低访问延迟;2)当部分组件出现故障时,系统依旧可以继续工作,提高可用性;3)扩展至多台机器以同时提供数据访问服务,从而提升读吞吐量。
如果复制的数据一成不变,那么数据复制就非常容易,只需要将数据复制到每个节点,一次性即可搞定,面对持续更改的数据如何正确而有效的完成数据复制是一个不小的挑战。

使用DataX进行Tablestore数据复制

表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎能够提供PB级存储、千万TPS以及毫秒级延迟的服务能力。DataX是阿里巴巴集团内被广泛使用的离线数据同步工具,DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。
通过使用DataX可以完成Tablestore表的数据复制,如下图所示,otsreader插件实现了从Tablestore读取数据,并可以通过用户指定抽取数据范围可方便的实现数据增量抽取的需求,otsstreamreader插件实现了Tablestore的增量数据导出,而otswriter插件则实现了向Tablestore中写入数据。通过在DataX中配置Tablestore相关的Reader和Writer插件,即可以完成Tablestore的表数据复制。
1560928635687-14be34ff-a7f2-4b1a-9f6b-a6cb6dad4710.png

使用通道服务进行Tablestore数据复制

通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,可以简单地实现对表中历史存量和新增数据的消费处理。
1561002236480-60d504d2-dfbd-4b8d-84dd-0441988f2143.png
借助于全增量一体的通道服务,我们可以轻松构建高效、弹性的数据复制解决方案。本文将逐步介绍如何结合通道服务进行Tablestore的数据复制,完整代码开源在github上的 tablestore-examples中。本次的实战将基于通道服务的Java SDK来完成,推荐先阅读下通道服务的相关文档,包括快速开始等。

1. 配置抽取

配置抽取其实对应的是数据同步所具备的功能,在本次实战中,我们将完成指定时间点之前的表数据同步,指定的时间点可以是现在或者未来的某个时刻。具体的配置如下所示,ots-reader中记录的是源表的相关配置,ots-writer中记录的是目的表的相关配置。

{
  "ots-reader": {
    "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-high",
    "tableName": "testSrcTable",
    "accessId": "",
    "accessKey": "",
    "tunnelName": "testTunnel",
    "endTime": "2019-06-19 17:00:00"
  },
  "ots-writer": {
    "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-search",
    "tableName": "testDstTable",
    "accessId": "",
    "accessKey": "",
    "batchWriteCount": 100
  }
}

ots-reader中各参数的说明如下:

  • endpoint: Tablestore服务的Endpoint地址,在进行数据复制前,请检查下连通性(可以使用curl命令)。
  • instanceName: Tablestore的实例名。
  • tableName: Tablestore的表名。
  • accessId: 访问Tablestore的云账号accessId。
  • accessKey: 访问Tablestore的云账号accessKey。
  • tunnelName: Tablestore的通道名,配置
  • endTime: 数据同步的截止时间点,对应到Java里SimpleFormat的格式为:yyyy-MM-dd HH:mm:ss 。

ots-writer中各参数的说明如下(略去相同的参数):

  • batchWriteCount: Tablestore单次批量写入的条数,最大值为200。
注:未来会开放更多的功能配置,比如指定时间范围的数据复制等。

2. 编写主逻辑

数据复制的主逻辑主要分为以下4步,在第一次运行时,会完整的进行所有步骤,而在程序重启或者断点续传场景时,只需要进行第3步和第4步。

  1. 创建复制目的表

通过使用DesribeTable接口,我们可以获取到源表的Schema,借此可以创建出目的表,值得注意的是需要把目的表的有效版本偏差设成一个足够大的值(默认为86400秒),因为服务端在处理写请求时会对属性列的版本号进行检查,写入的版本号需要在一个范围内才能写入成功,对于源表中的历史存量数据而言,时间戳往往是比较小的,会被服务端过滤掉,最终导致同步数据的丢失。

sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
    config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
      System.out.println("Table is already exist: " + config.getWriteConf().getTableName());
} else {
    DescribeTableResponse describeTableResponse = sourceClient.describeTable(
        new DescribeTableRequest(config.getReadConf().getTableName()));
    describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
    describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
    CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
        describeTableResponse.getTableOptions(),
        new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
    destClient.createTable(createTableRequest);
    System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
  1. 在源表上创建通道

使用通道服务的CreateTunnel接口可以创建通道,此处我们创建全量加增量类型(TunnelType.BaseAndStream)类型的通道。

sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
    new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
    tunnelId = tunnelInfo.getTunnelId();
    System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",
        config.getReadConf().getTunnelName(), tunnelId));
} else {
    CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
        new CreateTunnelRequest(config.getReadConf().getTableName(),
            config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
    System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId());
}
  1. 启动定时任务来监测备份进度

备份进度的监测可以通过DesribeTunnel接口来完成,DescribeTunnel接口可以获取到最新消费到的时间点,通过和配置里的备份结束时间对比,我们可以获取到当前同步的进度。在到达结束时间后,即可退出备份程序。

backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "background-checker-" + counter.getAndIncrement());
    }
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
            config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
        ));
        // 已同步完成
        if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
            System.out.println("Table copy finished, program exit!");
            // 退出备份程序
            shutdown();
        }
    }
}, 0, 2, TimeUnit.SECONDS);
  1. 启动数据复制

启动通道服务的自动化消费框架,开始自动化的数据同步,其中OtsReaderProcessor中完成的是源表数据的解析和目的表的写入,处理逻辑将会在后文中介绍。

if (tunnelId != null) {
    sourceWorkerConfig = new TunnelWorkerConfig(
        new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
    sourceWorkerConfig.setHeartbeatIntervalInSec(15);
    sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
    sourceWorker.connectAndWorking();
}

3. 数据同步逻辑(OtsReaderProcessor)

使用通道服务,我们需要编写数据的Process逻辑和Shutdown逻辑,数据同步中的核心在于解析数据并将其写入到目的表中,处理数据的完整代码如下所示,主要逻辑还是比较清晰的,首先会检查数据的时间戳是否在合理的时间范围内,然后将StreamRecord转化为BatchWrite里对应的行,最后将数据串行写入到目的表中。

public void process(ProcessRecordsInput input) {
    System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
    BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
    int count = 0;
    for (StreamRecord record : input.getRecords()) {
        if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
            System.out.println(String.format("skip record timestamp %d larger than endTime %d",
                record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
            continue;
        }
        count++;
        switch (record.getRecordType()) {
            case PUT:
                RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
                putChange.addColumns(getColumns(record));
                batchWriteRowRequest.addRowChange(putChange);
                break;
            case UPDATE:
                RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                for (RecordColumn column : record.getColumns()) {
                    switch (column.getColumnType()) {
                        case PUT:
                            updateChange.put(column.getColumn());
                            break;
                        case DELETE_ONE_VERSION:
                            updateChange.deleteColumn(column.getColumn().getName(),
                                column.getColumn().getTimestamp());
                            break;
                        case DELETE_ALL_VERSION:
                            updateChange.deleteColumns(column.getColumn().getName());
                            break;
                        default:
                            break;
                    }
                }
                batchWriteRowRequest.addRowChange(updateChange);
                break;
            case DELETE:
                RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                batchWriteRowRequest.addRowChange(deleteChange);
                break;
            default:
                break;
        }

        if (count == writeConf.getBatchWriteCount()) {
            System.out.println("BatchWriteRow: " + count);
            writeClient.batchWriteRow(batchWriteRowRequest);
            batchWriteRowRequest = new BatchWriteRowRequest();
            count = 0;
        }
    }

    // 写最后一次的数据。
    if (!batchWriteRowRequest.isEmpty()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
    }
}

4. 技术注解

  1. 如何保障备份性能?

备份过程分为全量(存量)和增量阶段,对于全量阶段,通道服务会自动将全表的数据在逻辑上划分成接近指定大小的若干分片,全量阶段的数据同步的整体并行度和分片数相关,能够有效的保障吞吐量。而对于增量阶段,为了保障数据的有序性,单分区内的数据我们需要串行处理数据,增量阶段的性能和分区数成正比关系(增量同步性能白皮书),如果需要提速(增加分区)可以联系表格存储技术支持。

  1. 如何做到数据同步的水平扩展?

运行多个TunnelWorker(客户端)对同一个Tunnel进行消费时(TunnelId相同), 在TunnelWorker执行Heartbeat时,通道服务端会自动的对Channel(分区)资源进行重分配,让活跃的Channel尽可能的均摊到每一个TunnelWorker上,达到资源负载均衡的目的。同时,在水平扩展性方面,用户可以很容易的通过增加TunnelWorker的数量来完成,TunnelWorker可以在同一个机器或者不同机器上。更多的原理可以参见数据消费框架原理介绍

  1. 如何做到数据的最终一致性?

数据的一致性建立在通道服务的保序协议基础上,通过全量和增量数据同步的幂等性可以保障备份数据的最终一致。

  1. 如何完成断点续传功能?

通道服务的客户端会定期将已同步(消费)完成的数据的时间位点定期发送到服务端进行持久化,在发生Failover或者重启程序后,下一次的数据消费会从记录的checkpoint开始数据处理,不会造成数据的丢失。

未来展望

在本次的实战中,我们结合通道服务完成一个简洁而有效的数据复制方案,实现了指定时间点的表数据复制。借助于本次的实战样例代码,用户仅需要配置源表和目的表的相关参数,即可以高效的完成的表数据的复制和数据的迁移。
在未来的演进中,通道服务还将支持创建指定时间段的通道,这样可以更加灵活的制定数据备份的计划,也可以完成持续备份和按时间点恢复等更加丰富的功能。

参考文献

  1. Desiging Data-Intensive Applications.

写在最后

如果您对表格存储感兴趣,欢迎加入【表格存储公开交流群】交流探讨,群号:11789671。
1561450830671-17c6be54-488f-4401-9c88-3b9f2eae0819.png

相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
3月前
|
分布式计算 DataWorks 调度
oss数据同步maxcompute报错
在使用阿里云DataWorks同步OSS数据至MaxCompute时,遇到“Input is not in the .gz format”的报错。问题源于目标目录中存在一个空文件,导致同步时识别错误。
|
索引 存储 NoSQL
表格存储(Tablestore)入门指南
表格存储(Tablestore)入门指南内容简介了表格存储(Tablestore)是阿里云自研的 NoSQL 多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务。
18550 2
|
存储 SQL NoSQL
Tablestore
Tablestore(表格存储)是阿里云提供的一种云原生、高性能、可扩展的 NoSQL 数据库服务。它支持海量数据存储和快速查询,适用于大数据分析、数据仓库、日志收集等场景。
642 1
|
存储 SQL 分布式计算
《阿里云存储手册》——表格存储Tablestore
《阿里云存储手册》——表格存储Tablestore
297 0
|
canal NoSQL 关系型数据库
使用 Canal 向 Tablestore 导入数据
可以使用 Canal 将数据传输进入 Tablestore。需要部署两部分内容,首先部署 canal.deployer,deployer 负责从上游拉取 binlog 数据,记录位点等。然后再部署 canal.adapter 包,这个服务负责对接 deployer 解析过的数据,并且将数据传输到下游数据库中,在本文中即 Tablestore 数据库。链路如图。Deployer部署部署步骤首先部署 
560 0
使用 Canal 向 Tablestore 导入数据
|
编解码 分布式计算 Java
Maxcompute tunnel 上传典型问题 | 学习笔记
快速学习 Maxcompute tunnel 上传典型问题
848 0
|
缓存 弹性计算 算法
Flink 高效 sink 写入 oss | 学习笔记
快速学习 Flink 高效 sink 写入 oss。
1072 0
Flink 高效 sink 写入 oss | 学习笔记
|
存储 运维 NoSQL
表格存储 Tablestore 简介
近十年来互联网技术得到了飞速的发展,越来越多的行业逐渐加入到了互联网的阵营中来,同时也产生了更丰富、更复杂的业务场景和需求,这对于数据应用系统的性能无疑是巨大的挑战。传统关系型数据库有什么瓶颈,如何通过分布式数据库表格存储 Tablestore 进行优化?
945 0
|
缓存 运维 NoSQL
使用 Blink 访问表格存储 Tablestore
本文介绍如何使用实时计算 Blink 服务访问表格存储服务(Tablestore),并进行开发。背景Blink 产品介绍阿里云实时计算Flink版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于Apache Flink构建的企业级、高性能实时大数据处理系统,由Apache Flink创始团队官方
571 0
使用 Blink 访问表格存储 Tablestore
|
SQL NoSQL 大数据
使用MaxCompute访问TableStore(OTS) 简明手册
大数据计算服务 MaxCompute 能够提供强大的分析能力,而分布式 NoSQL 数据库表格存储在行级别上的实时更新和可覆盖性写入等特性,相对于 MaxCompute 内置表 append-only 批量操作,提供了一个很好的补充。
10003 1