基于Tablestore Tunnel的数据复制实战

本文涉及的产品
对象存储 OSS,标准 - 本地冗余存储 20GB 3个月
对象存储OSS,敏感数据保护2.0 200GB 1年
对象存储 OSS,内容安全 1000 次 1年
简介: ### 前言 数据复制主要指通过互联的网络在多台机器上保存相同数据的副本,通过数据复制方案,人们通常希望达到以下目的:1)使数据在地理位置上更接近用户,进而降低访问延迟;2)当部分组件出现故障时,系统依旧可以继续工作,提高可用性;3)扩展至多台机器以同时提供数据访问服务,从而提升读吞吐量。

前言

数据复制主要指通过互联的网络在多台机器上保存相同数据的副本,通过数据复制方案,人们通常希望达到以下目的:1)使数据在地理位置上更接近用户,进而降低访问延迟;2)当部分组件出现故障时,系统依旧可以继续工作,提高可用性;3)扩展至多台机器以同时提供数据访问服务,从而提升读吞吐量。
如果复制的数据一成不变,那么数据复制就非常容易,只需要将数据复制到每个节点,一次性即可搞定,面对持续更改的数据如何正确而有效的完成数据复制是一个不小的挑战。

使用DataX进行Tablestore数据复制

表格存储(Tablestore)是阿里云自研的NoSQL多模型数据库,提供海量结构化数据存储以及快速的查询和分析服务,表格存储的分布式存储和强大的索引引擎能够提供PB级存储、千万TPS以及毫秒级延迟的服务能力。DataX是阿里巴巴集团内被广泛使用的离线数据同步工具,DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。
通过使用DataX可以完成Tablestore表的数据复制,如下图所示,otsreader插件实现了从Tablestore读取数据,并可以通过用户指定抽取数据范围可方便的实现数据增量抽取的需求,otsstreamreader插件实现了Tablestore的增量数据导出,而otswriter插件则实现了向Tablestore中写入数据。通过在DataX中配置Tablestore相关的Reader和Writer插件,即可以完成Tablestore的表数据复制。
1560928635687-14be34ff-a7f2-4b1a-9f6b-a6cb6dad4710.png

使用通道服务进行Tablestore数据复制

通道服务(Tunnel Service)是基于表格存储数据接口之上的全增量一体化服务。通道服务为您提供了增量、全量、增量加全量三种类型的分布式数据实时消费通道。通过为数据表建立数据通道,可以简单地实现对表中历史存量和新增数据的消费处理。
1561002236480-60d504d2-dfbd-4b8d-84dd-0441988f2143.png
借助于全增量一体的通道服务,我们可以轻松构建高效、弹性的数据复制解决方案。本文将逐步介绍如何结合通道服务进行Tablestore的数据复制,完整代码开源在github上的 tablestore-examples中。本次的实战将基于通道服务的Java SDK来完成,推荐先阅读下通道服务的相关文档,包括快速开始等。

1. 配置抽取

配置抽取其实对应的是数据同步所具备的功能,在本次实战中,我们将完成指定时间点之前的表数据同步,指定的时间点可以是现在或者未来的某个时刻。具体的配置如下所示,ots-reader中记录的是源表的相关配置,ots-writer中记录的是目的表的相关配置。

{
  "ots-reader": {
    "endpoint": "https://zhuoran-high.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-high",
    "tableName": "testSrcTable",
    "accessId": "",
    "accessKey": "",
    "tunnelName": "testTunnel",
    "endTime": "2019-06-19 17:00:00"
  },
  "ots-writer": {
    "endpoint": "https://zhuoran-search.cn-hangzhou.ots.aliyuncs.com",
    "instanceName": "zhuoran-search",
    "tableName": "testDstTable",
    "accessId": "",
    "accessKey": "",
    "batchWriteCount": 100
  }
}

ots-reader中各参数的说明如下:

  • endpoint: Tablestore服务的Endpoint地址,在进行数据复制前,请检查下连通性(可以使用curl命令)。
  • instanceName: Tablestore的实例名。
  • tableName: Tablestore的表名。
  • accessId: 访问Tablestore的云账号accessId。
  • accessKey: 访问Tablestore的云账号accessKey。
  • tunnelName: Tablestore的通道名,配置
  • endTime: 数据同步的截止时间点,对应到Java里SimpleFormat的格式为:yyyy-MM-dd HH:mm:ss 。

ots-writer中各参数的说明如下(略去相同的参数):

  • batchWriteCount: Tablestore单次批量写入的条数,最大值为200。
注:未来会开放更多的功能配置,比如指定时间范围的数据复制等。

2. 编写主逻辑

数据复制的主逻辑主要分为以下4步,在第一次运行时,会完整的进行所有步骤,而在程序重启或者断点续传场景时,只需要进行第3步和第4步。

  1. 创建复制目的表

通过使用DesribeTable接口,我们可以获取到源表的Schema,借此可以创建出目的表,值得注意的是需要把目的表的有效版本偏差设成一个足够大的值(默认为86400秒),因为服务端在处理写请求时会对属性列的版本号进行检查,写入的版本号需要在一个范围内才能写入成功,对于源表中的历史存量数据而言,时间戳往往是比较小的,会被服务端过滤掉,最终导致同步数据的丢失。

sourceClient = new SyncClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
destClient = new SyncClient(config.getWriteConf().getEndpoint(), config.getWriteConf().getAccessId(),
    config.getWriteConf().getAccessKey(), config.getWriteConf().getInstanceName());
if (destClient.listTable().getTableNames().contains(config.getWriteConf().getTableName())) {
      System.out.println("Table is already exist: " + config.getWriteConf().getTableName());
} else {
    DescribeTableResponse describeTableResponse = sourceClient.describeTable(
        new DescribeTableRequest(config.getReadConf().getTableName()));
    describeTableResponse.getTableMeta().setTableName(config.getWriteConf().getTableName());
    describeTableResponse.getTableOptions().setMaxTimeDeviation(Long.MAX_VALUE / 1000000);
    CreateTableRequest createTableRequest = new CreateTableRequest(describeTableResponse.getTableMeta(),
        describeTableResponse.getTableOptions(),
        new ReservedThroughput(describeTableResponse.getReservedThroughputDetails().getCapacityUnit()));
    destClient.createTable(createTableRequest);
    System.out.println("Create table success: " + config.getWriteConf().getTableName());
}
  1. 在源表上创建通道

使用通道服务的CreateTunnel接口可以创建通道,此处我们创建全量加增量类型(TunnelType.BaseAndStream)类型的通道。

sourceTunnelClient = new TunnelClient(config.getReadConf().getEndpoint(), config.getReadConf().getAccessId(),
    config.getReadConf().getAccessKey(), config.getReadConf().getInstanceName());
List<TunnelInfo> tunnelInfos = sourceTunnelClient.listTunnel(
    new ListTunnelRequest(config.getReadConf().getTableName())).getTunnelInfos();
String tunnelId = null;
TunnelInfo tunnelInfo = getTunnelInfo(config.getReadConf().getTunnelName(), tunnelInfos);
if (tunnelInfo != null) {
    tunnelId = tunnelInfo.getTunnelId();
    System.out.println(String.format("Tunnel is already exist, TunnelName: %s, TunnelId: %s",
        config.getReadConf().getTunnelName(), tunnelId));
} else {
    CreateTunnelResponse createTunnelResponse = sourceTunnelClient.createTunnel(
        new CreateTunnelRequest(config.getReadConf().getTableName(),
            config.getReadConf().getTunnelName(), TunnelType.BaseAndStream));
    System.out.println("Create tunnel success: " + createTunnelResponse.getTunnelId());
}
  1. 启动定时任务来监测备份进度

备份进度的监测可以通过DesribeTunnel接口来完成,DescribeTunnel接口可以获取到最新消费到的时间点,通过和配置里的备份结束时间对比,我们可以获取到当前同步的进度。在到达结束时间后,即可退出备份程序。

backgroundExecutor = Executors.newScheduledThreadPool(2, new ThreadFactory() {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "background-checker-" + counter.getAndIncrement());
    }
});
backgroundExecutor.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DescribeTunnelResponse resp = sourceTunnelClient.describeTunnel(new DescribeTunnelRequest(
            config.getReadConf().getTableName(), config.getReadConf().getTunnelName()
        ));
        // 已同步完成
        if (resp.getTunnelConsumePoint().getTime() > config.getReadConf().getEndTime()) {
            System.out.println("Table copy finished, program exit!");
            // 退出备份程序
            shutdown();
        }
    }
}, 0, 2, TimeUnit.SECONDS);
  1. 启动数据复制

启动通道服务的自动化消费框架,开始自动化的数据同步,其中OtsReaderProcessor中完成的是源表数据的解析和目的表的写入,处理逻辑将会在后文中介绍。

if (tunnelId != null) {
    sourceWorkerConfig = new TunnelWorkerConfig(
        new OtsReaderProcessor(config.getReadConf(), config.getWriteConf(), destClient));
    sourceWorkerConfig.setHeartbeatIntervalInSec(15);
    sourceWorker = new TunnelWorker(tunnelId, sourceTunnelClient, sourceWorkerConfig);
    sourceWorker.connectAndWorking();
}

3. 数据同步逻辑(OtsReaderProcessor)

使用通道服务,我们需要编写数据的Process逻辑和Shutdown逻辑,数据同步中的核心在于解析数据并将其写入到目的表中,处理数据的完整代码如下所示,主要逻辑还是比较清晰的,首先会检查数据的时间戳是否在合理的时间范围内,然后将StreamRecord转化为BatchWrite里对应的行,最后将数据串行写入到目的表中。

public void process(ProcessRecordsInput input) {
    System.out.println(String.format("Begin process %d records.", input.getRecords().size()));
    BatchWriteRowRequest batchWriteRowRequest = new BatchWriteRowRequest();
    int count = 0;
    for (StreamRecord record : input.getRecords()) {
        if (record.getSequenceInfo().getTimestamp() / 1000 > readConf.getEndTime()) {
            System.out.println(String.format("skip record timestamp %d larger than endTime %d",
                record.getSequenceInfo().getTimestamp() / 1000, readConf.getEndTime()));
            continue;
        }
        count++;
        switch (record.getRecordType()) {
            case PUT:
                RowPutChange putChange = new RowPutChange(writeConf.getTableName(), record.getPrimaryKey());
                putChange.addColumns(getColumns(record));
                batchWriteRowRequest.addRowChange(putChange);
                break;
            case UPDATE:
                RowUpdateChange updateChange = new RowUpdateChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                for (RecordColumn column : record.getColumns()) {
                    switch (column.getColumnType()) {
                        case PUT:
                            updateChange.put(column.getColumn());
                            break;
                        case DELETE_ONE_VERSION:
                            updateChange.deleteColumn(column.getColumn().getName(),
                                column.getColumn().getTimestamp());
                            break;
                        case DELETE_ALL_VERSION:
                            updateChange.deleteColumns(column.getColumn().getName());
                            break;
                        default:
                            break;
                    }
                }
                batchWriteRowRequest.addRowChange(updateChange);
                break;
            case DELETE:
                RowDeleteChange deleteChange = new RowDeleteChange(writeConf.getTableName(),
                    record.getPrimaryKey());
                batchWriteRowRequest.addRowChange(deleteChange);
                break;
            default:
                break;
        }

        if (count == writeConf.getBatchWriteCount()) {
            System.out.println("BatchWriteRow: " + count);
            writeClient.batchWriteRow(batchWriteRowRequest);
            batchWriteRowRequest = new BatchWriteRowRequest();
            count = 0;
        }
    }

    // 写最后一次的数据。
    if (!batchWriteRowRequest.isEmpty()) {
        System.out.println("BatchWriteRow: " + count);
        writeClient.batchWriteRow(batchWriteRowRequest);
    }
}

4. 技术注解

  1. 如何保障备份性能?

备份过程分为全量(存量)和增量阶段,对于全量阶段,通道服务会自动将全表的数据在逻辑上划分成接近指定大小的若干分片,全量阶段的数据同步的整体并行度和分片数相关,能够有效的保障吞吐量。而对于增量阶段,为了保障数据的有序性,单分区内的数据我们需要串行处理数据,增量阶段的性能和分区数成正比关系(增量同步性能白皮书),如果需要提速(增加分区)可以联系表格存储技术支持。

  1. 如何做到数据同步的水平扩展?

运行多个TunnelWorker(客户端)对同一个Tunnel进行消费时(TunnelId相同), 在TunnelWorker执行Heartbeat时,通道服务端会自动的对Channel(分区)资源进行重分配,让活跃的Channel尽可能的均摊到每一个TunnelWorker上,达到资源负载均衡的目的。同时,在水平扩展性方面,用户可以很容易的通过增加TunnelWorker的数量来完成,TunnelWorker可以在同一个机器或者不同机器上。更多的原理可以参见数据消费框架原理介绍

  1. 如何做到数据的最终一致性?

数据的一致性建立在通道服务的保序协议基础上,通过全量和增量数据同步的幂等性可以保障备份数据的最终一致。

  1. 如何完成断点续传功能?

通道服务的客户端会定期将已同步(消费)完成的数据的时间位点定期发送到服务端进行持久化,在发生Failover或者重启程序后,下一次的数据消费会从记录的checkpoint开始数据处理,不会造成数据的丢失。

未来展望

在本次的实战中,我们结合通道服务完成一个简洁而有效的数据复制方案,实现了指定时间点的表数据复制。借助于本次的实战样例代码,用户仅需要配置源表和目的表的相关参数,即可以高效的完成的表数据的复制和数据的迁移。
在未来的演进中,通道服务还将支持创建指定时间段的通道,这样可以更加灵活的制定数据备份的计划,也可以完成持续备份和按时间点恢复等更加丰富的功能。

参考文献

  1. Desiging Data-Intensive Applications.

写在最后

如果您对表格存储感兴趣,欢迎加入【表格存储公开交流群】交流探讨,群号:11789671。
1561450830671-17c6be54-488f-4401-9c88-3b9f2eae0819.png

相关实践学习
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
存储 SQL 运维
基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-架构篇
背景订单系统存在于各行各业,如电商订单、银行流水、运营商话费账单等,是一个非常广泛、通用的系统。对于这类系统,在过去十几年发展中已经形成了经典的做法。但是随着互联网的发展,以及各企业对数据的重视,需要存储和持久化的订单量越来越大,数据的重视程度与数据规模的膨胀带来了新的挑战。首先,订单量对于数据的存储、持久化、访问带来了挑战,这不仅增加了开发面对的困难,也为系统的运维带来了挑战。其次,随着大数据技
3316 0
基于 MySQL + Tablestore 分层存储架构的大规模订单系统实践-架构篇
|
存储 运维 Cloud Native
教你如何免费使用一款免运维、无限容量的表存储服务
作者:李欣前言表格存储是一款用于存储海量非关系型(NoSQL)结构化数据的云原生表存储服务,提供 Schemaless 表结构设计、多元化索引以及数据更新实时订阅通道,支撑 PB 级数据存储的同时能提供丰富且灵活的数据查询、检索和分析能力。对接了各大主流开源计算引擎,能灵活的实现流批一体分析。通过阅读本文您将了解如何免费开通和使用表格存储服务,即刻拥有一个完全免运维、弹性、高性能、低成本的表存储服
3866 0
教你如何免费使用一款免运维、无限容量的表存储服务
|
存储 SQL 自然语言处理
详解TableStore模糊查询——以订单场景为例
# 背景 订单系统在各行各业中广泛应用,为消费者、商家后台、促销系统等第三方提供用户、产品、订单等多维度的管理和查询服务。为了挖掘出海量订单数据的潜能,丰富高效的查询必不可少。然而很多时候并不能给出完整准确的查询关键字,例如,只知道收货人姓氏,或是产品名称部分关键字,或是根据收货人手机尾号找到订单,我们将这类查询归为“模糊查询”。
5711 0
|
存储 NoSQL 分布式数据库
深入对比 HBase 与阿里云的表格存储服务
谷歌的 Bigtable 于 2016 年推出了兼容 HBase 的接口,而作为国内最早推出分布式 NoSQL 数据存储服务的阿里云表格存储也在最近正式发布了HBase Client,能够帮助用户将业务轻松从 HBase 迁移至表格存储。
21457 1
深入对比 HBase 与阿里云的表格存储服务
|
NoSQL 数据库 索引
海量结构化数据存储技术揭秘:Tablestore表设计最佳实践
前言 表格存储Tablestore是阿里云自研的面向海量结构化数据存储的Serverless NoSQL多模型数据库。在处理海量数据时,方案设计非常重要,合理的设计才能够发挥出数据库的性能水平。本文主要介绍Tablestore在表设计方面的一些实践经验,供大家参考。
10743 1
|
存储 NoSQL 关系型数据库
基于Tablestore打造亿量级订单管理解决方案
一、方案背景 订单系统存在于各行各业,如电商订单、银行流水、运营商话费账单等,是一个非常广泛、通用的系统。对于这类系统,在过去十几年发展中已经形成了经典的做法。但是随着互联网的发展,以及各企业对数据的重视,需要存储和持久化的订单量越来越大。
19225 0
|
索引 存储 NoSQL
海量结构化数据存储技术揭秘:Tablestore存储和索引引擎详解
前言 表格存储Tablestore是阿里云自研的面向海量结构化数据存储的Serverless NoSQL多模型数据库。Tablestore在阿里云官网上有各种文档介绍,也发布了很多场景案例文章,这些文章收录在这个合集中《表格存储Tablestore权威指南》。
25976 1
|
存储 关系型数据库 移动开发
表格存储数据模型和查询操作
本篇文章主要会详细聊一下表格存储的查询操作,以及如何根据业务的需求来设计表结构以支持特定条件的查询。        在理解查询操作之前,会简单描述一下表格存储的数据模型,以加深对查询操作的理解。
24059 2
|
存储 NoSQL Java
TableStore Timeline:轻松构建千万级IM和Feed流系统
在文章《现代IM系统中消息推送和存储架构的实现》中介绍了一种适用于IM的消息存储和推送模型Timeline,在本篇文章中,会扩展Timeline模型到IM和Feed流系统中,并且提供成熟的LIB实现。用户基于TableStore-Timeline LIB可轻松实现千万级的IM和Feed流系统。
26116 1
|
存储 NoSQL 物联网
TableStore数据模型 - WideColumn和Timeline
前言 TableStore是阿里云自研的一款分布式NoSQL数据库,提到NoSQL数据库,现在对很多应用研发来说都已经不再陌生。当前很多应用系统底层不会再仅仅依赖于关系型数据库,而是会根据不同的业务场景,来选型使用不同类型的数据库,例如缓存型KeyValue数据会存储在Redis,文档型数据会存储在MongoDB,图数据会存储在Neo4J等。
12212 0