易仓跨境Saas全球租户,如何做到数据秒级响应?

简介: 易仓科技面对数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?
+关注继续查看

前言


易仓科技成立于2013年,致力于构建智能协同的跨境网络,让全球贸易更轻松。经过9年的积累沉淀,产业链SaaS+生态链协同的服务模式,已发展成为跨境电商行业的头部企业。目前租户分布全球,数据来自各大洲,汇集国外各大电商平台各个站点销售数据。面对这种数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?


问题


数据分布三大洲,如何集中统一计算?


Saas租户数据库级别物理隔离,分系统分库分表,百万张表如何同步?

(1) A客户,A系统,两个库总共1000+张表与B客户,A系统同样的1000+张表

(2) Saas租户不同套餐保护不同功能系统,各系统表数量在100、200到500张不等

(3) A、B客户在同系统在同一个数据库实例与单客户单数据库实例(rds实例,polarDB实例)

(4) 几百个数据库实例分布在不同地域,不同国家


如何在保证数据准确的基础上达到毫秒级别同步入仓?


技术选项

image.png


针对以上问题的特殊性,复杂性,我们在技术选型上做了大量的调研以及深度测试,并与云服务团队深度交流做出了大量优化。


一、对于问题①②,数据分布以及多租户多库多表,如何处理?


image.png


1.租户数据库分布在阿里云rds以及polarDB的几百个实例,分散在国内外各大Region,dts同时支持全库全表同步以及可选择库表。单个dts同步表数量没有限制,高度匹配易仓复杂的业务场景,在实施过程的功能以及性能优化是问题解决关键。


2.dts数据同步将国内外数据集中实时同步到统一华南区域的Kafka,保证数据准确,时效在毫秒级别。


二、数据准确毫秒级别同步入仓,关键点在Flink的高并发pipeline处理数据,时延毫秒级,且兼具可靠性


3.由Flink实时消费Kafka数写到实时数仓Hologres。


4.借助Maxcompute(下文简称MC)与实时数仓Hologres的互通能力,完成复杂的分析计算。


遇到的坑


1.DTS规格预估不准导致数据同步不稳定且没有及时的告警机制无法及时处理,无法保证数据准确性。


2.底层数仓建模不清晰,对于物理删除的客户数据数据中心采用逻辑删除进行标记后利用MC进行合并计算,以至于跟数据源无法一比一比对。无法判断数据同步的准确性。


3.以上导致了频繁的客户数据重推,数据初始化,以至于Kafka数据大量堆积Flink消费不及时。


4.以上步骤环环相扣,陷入数据异常同步的死循环,导致数据不可用。


如何解决


1.根据数据库实例实际RPS采用DTS对应的规格,推动DTS完善告警机制。数据同步稳定性实时性都大幅度提高。


(1) 推动DTS团队解决百万表级别数量的Api接口支持,开发互相支持快速迭代

(2) 定位大数据量表初始化任务频繁重启问题,优化dts任务元数据存储逻辑策略


image.png


2.数据贴源层跟数据库一比一,重构底层设计方案,每个数据原表在原建表基础上通过sharing_seq、db_seq、company_code加上原表的主键组成唯一索引构成数据中心的目的表。


image.png


-- 租户源表
CREATE TABLE `erp_test` (  
  `id` int(11) NOT NULL AUTO_INCREMENT,  
  `d_key` varchar(50) NOT NULL DEFAULT '' COMMENT '字典key',  
  `d_val` varchar(100) NOT NULL DEFAULT '' COMMENT '字典val',  
  `d_desc` varchar(100) NOT NULL DEFAULT '' COMMENT '字典描述',  
  `ec_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT '更新时间',  
  `ec_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  
  
  PRIMARY KEY (`id`),  
  KEY `idxx_ec_update_time` (`ec_update_time`),  
  KEY `idxx_ec_create_time` (`ec_create_time`)
  ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT="erp_test表';
  
-- 数据中心目的表
CREATE TABLE `erp_test` (  
 `sharing_seq` text,  
 `db_seq` text,  
 `company_code` text,  
 `id` bigint NOT NULL,  
 `d_desc` text,  
 `d_key` text,  
 `d_val` text,  
 `ec_create_time` timestamptz,  
 `ec_update_time` timestamptz
 ,PRIMARY KEY (`sharing_seq`, `db_seq`, `company_code`, `id`)
 );



3.Flink核心代码逻辑优化,达到数据一比一还原数据库大量的新增、更新以及物理删除操作。数据可对比,有迹可循。通过大量数据对比,数据准确可靠并且实时性达到秒级


class DtsToDataworksHoloStream(fromSource: TSource[JSONObject], toSink: TSink[JSONObject]) extends TStream[JSONObject, JSONObject] {  
  override val source: TSource[JSONObject] = fromSource  
  override val sink: TSink[JSONObject] = toSink
  
  override protected def transform(dataStream: DataStream[JSONObject], others: DataStream[_]*): DataStream[JSONObject] = {
    val parallelism: Int = AppUtil.env.getParallelism    
    val confStream: DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = others.head.asInstanceOf[DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]]]    
    //获取各租户广播配置信息    
    val confStreamDescriptor = new MapStateDescriptor[String, DbInfo2DataworksHoloBean]("DtsConfig", classOf[String], classOf[DbInfo2DataworksHoloBean])
    val confBroadcast: BroadcastStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = confStream.broadcast(confStreamDescriptor)    
    //Dts推送kafka的canalJson与租户广播配置connect    
    val result: DataStream[JSONObject] = dataStream.connect(confBroadcast).process(new Dts2DataworksHoloTransformFunction)    
    .keyBy(_.getString("table").hashCode % parallelism).map(s => s)
    result  
  }
}
//还原数据库增删改
def upsertOrDelTable(tableName: String, data: Array[(String, Any)], opType: String): Unit = {
    val put = new Put(holoclient.getTableSchema(tableName))    
    if (opType == "DELETE") {      
      put.getRecord.setType(SqlCommandType.DELETE)    
    }    
    for (kv <- data) {     
      put.setObject(kv._1.toLowerCase, kv._2)    
    }    
    holoclient.put(put)  
  }



亿万数据量级的业务订单表,实现了从数据源端到数据中心的业务延迟控制在毫秒级别,基于此数据中心秒级别的业务响应更是不在话下。


image.png


后记


DTS团队针对易仓复杂的单实例多租户场景优化:
1.单实例多租户数10万张表初始化,DTS提供的API接口完全无法满足需求!

问题排查:表数量太大,接口参数体超过nginx网关限制,tomcat限制。限制调整后仍然无法满足。
最后解决:开发新Api接口,经过与dts开发反复联调,最终采用oss文件中转方式绕过网关,实现10万级别表的初始化推送。
2.DTS实例单次初始化表数据了限制以及DTS元数据策略优化
问题排查:单次初始化表达到一定限制,dts对应的reader模块内存溢出。任务无限重启,延迟逐步增大。
最后解决:dts开发、GTS TAM服务团队日夜坚守,出现问题快速响应,临时调大reader模块内存并排查根本原因,最后专门针对SaaS这种多表的场景在元数据存储策略方面进行了大量优化并彻底解决问题。


注:

Flink:一个批处理和流处理结合的统一计算框架

Hologres:一站式实时数仓引擎(Real-Time Data Warehouse

Maxcompute:大数据计算服务,快速、完全托管的PB级数据仓库解决方案



相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
缓存 运维 NoSQL
中泰证券引入阿里云瑶池数据库:完成核心业务系统自主创新,实现业务微秒级响应
一举打破自建Redis架构的性能瓶颈,并带来了稳定性提升
550 1
|
消息中间件 分布式计算 关系型数据库
易仓跨境Saas全球租户,如何做到数据秒级响应?
易仓科技成立于2013年,致力于构建智能协同的跨境网络,让全球贸易更轻松。经过9年的积累沉淀,产业链SaaS+生态链协同的服务模式,已发展成为跨境电商行业的头部企业。目前租户分布全球,数据来自各大洲,汇集国外各大电商平台各个站点销售数据。面对这种数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?
240 0
易仓跨境Saas全球租户,如何做到数据秒级响应?
|
弹性计算 资源调度 运维
最佳实践丨云上虚拟IDC(私有池)如何为客户业务的确定性、连续性保驾护航
企业业务上云后,还面临特定可用区购买云上特定计算产品实例失败的困境?云上私有池pick一下
|
弹性计算 负载均衡 网络虚拟化
面向传统行业客户的共享带宽方案
告诉你为什么共享流量包相对于共享带宽更适合传统行业客户。
222 0
面向传统行业客户的共享带宽方案
|
存储 人工智能 运维
云维 - 灾备存储云端管控新体验
本文介绍了“阿里云存储-混合云备份”推出的“灾备存储-云维”服务。用户可以使用云维服务,随时随地在阿里云控制台统一管控用户所有的灾备存储设备,灵活定制设备指标报警规则,带来了灾备存储云端运维的新体验。
1120 0
云维 - 灾备存储云端管控新体验
|
数据中心 运维 API
一文揭秘存量云资源的管理难题
开源生态工具Terraform通过简单的客户端命令即可实现对阿里云资源的创建,更改和删除等操作,但对于很多以非Terraform创建的云资源,是否有办法统一管理呢?本文将向你揭秘如何使用Terraform Import命令来实现对存量云资源的导入和统一管理。
787 0
一文揭秘存量云资源的管理难题
|
关系型数据库 数据库
③云上场景:天弘基金,支撑余额宝的弹性扩展架构
天弘基金将系统迁移到金融云进行了重新部署,又陆续对以余额宝为基础的服务、监控、查询进行了逐步部署。
3827 0
【电信增值业务学习笔记】10基于业务节点的增值业务提供技术
作者:gnuhpc 出处:http://www.cnblogs.com/gnuhpc/   1.业务节点概念: 智能网有如下问题 交换机需要升级支持SSP和INAP; SCP系统技术门槛较高,成本较高; 基于智能网的业务开发技术门槛较高; 只有基本的语音资源功能,缺乏对语音内容的灵活处理能力; 对客户化业务支持弱,很难根据具体业务需求引入相应业务。
933 0
相关产品
实时计算 Flink版
推荐文章
更多