开发者社区> 阿里云实时计算Flink> 正文

易仓跨境Saas全球租户,如何做到数据秒级响应?

简介: 易仓科技面对数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?
+关注继续查看

前言


易仓科技成立于2013年,致力于构建智能协同的跨境网络,让全球贸易更轻松。经过9年的积累沉淀,产业链SaaS+生态链协同的服务模式,已发展成为跨境电商行业的头部企业。目前租户分布全球,数据来自各大洲,汇集国外各大电商平台各个站点销售数据。面对这种数据体量大,跨全球各区域的复杂场景易仓大数据团队是如何做到秒级响应的?


问题


数据分布三大洲,如何集中统一计算?


Saas租户数据库级别物理隔离,分系统分库分表,百万张表如何同步?

(1) A客户,A系统,两个库总共1000+张表与B客户,A系统同样的1000+张表

(2) Saas租户不同套餐保护不同功能系统,各系统表数量在100、200到500张不等

(3) A、B客户在同系统在同一个数据库实例与单客户单数据库实例(rds实例,polarDB实例)

(4) 几百个数据库实例分布在不同地域,不同国家


如何在保证数据准确的基础上达到毫秒级别同步入仓?


技术选项

image.png


针对以上问题的特殊性,复杂性,我们在技术选型上做了大量的调研以及深度测试,并与云服务团队深度交流做出了大量优化。


一、对于问题①②,数据分布以及多租户多库多表,如何处理?


image.png


1.租户数据库分布在阿里云rds以及polarDB的几百个实例,分散在国内外各大Region,dts同时支持全库全表同步以及可选择库表。单个dts同步表数量没有限制,高度匹配易仓复杂的业务场景,在实施过程的功能以及性能优化是问题解决关键。


2.dts数据同步将国内外数据集中实时同步到统一华南区域的Kafka,保证数据准确,时效在毫秒级别。


二、数据准确毫秒级别同步入仓,关键点在Flink的高并发pipeline处理数据,时延毫秒级,且兼具可靠性


3.由Flink实时消费Kafka数写到实时数仓Hologres。


4.借助Maxcompute(下文简称MC)与实时数仓Hologres的互通能力,完成复杂的分析计算。


遇到的坑


1.DTS规格预估不准导致数据同步不稳定且没有及时的告警机制无法及时处理,无法保证数据准确性。


2.底层数仓建模不清晰,对于物理删除的客户数据数据中心采用逻辑删除进行标记后利用MC进行合并计算,以至于跟数据源无法一比一比对。无法判断数据同步的准确性。


3.以上导致了频繁的客户数据重推,数据初始化,以至于Kafka数据大量堆积Flink消费不及时。


4.以上步骤环环相扣,陷入数据异常同步的死循环,导致数据不可用。


如何解决


1.根据数据库实例实际RPS采用DTS对应的规格,推动DTS完善告警机制。数据同步稳定性实时性都大幅度提高。


(1) 推动DTS团队解决百万表级别数量的Api接口支持,开发互相支持快速迭代

(2) 定位大数据量表初始化任务频繁重启问题,优化dts任务元数据存储逻辑策略


image.png


2.数据贴源层跟数据库一比一,重构底层设计方案,每个数据原表在原建表基础上通过sharing_seq、db_seq、company_code加上原表的主键组成唯一索引构成数据中心的目的表。


image.png


-- 租户源表
CREATE TABLE `erp_test` (  
  `id` int(11) NOT NULL AUTO_INCREMENT,  
  `d_key` varchar(50) NOT NULL DEFAULT '' COMMENT '字典key',  
  `d_val` varchar(100) NOT NULL DEFAULT '' COMMENT '字典val',  
  `d_desc` varchar(100) NOT NULL DEFAULT '' COMMENT '字典描述',  
  `ec_update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE
CURRENT_TIMESTAMP COMMENT '更新时间',  
  `ec_create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',  
  
  PRIMARY KEY (`id`),  
  KEY `idxx_ec_update_time` (`ec_update_time`),  
  KEY `idxx_ec_create_time` (`ec_create_time`)
  ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT="erp_test表';
  
-- 数据中心目的表
CREATE TABLE `erp_test` (  
 `sharing_seq` text,  
 `db_seq` text,  
 `company_code` text,  
 `id` bigint NOT NULL,  
 `d_desc` text,  
 `d_key` text,  
 `d_val` text,  
 `ec_create_time` timestamptz,  
 `ec_update_time` timestamptz
 ,PRIMARY KEY (`sharing_seq`, `db_seq`, `company_code`, `id`)
 );



3.Flink核心代码逻辑优化,达到数据一比一还原数据库大量的新增、更新以及物理删除操作。数据可对比,有迹可循。通过大量数据对比,数据准确可靠并且实时性达到秒级


class DtsToDataworksHoloStream(fromSource: TSource[JSONObject], toSink: TSink[JSONObject]) extends TStream[JSONObject, JSONObject] {  
  override val source: TSource[JSONObject] = fromSource  
  override val sink: TSink[JSONObject] = toSink
  
  override protected def transform(dataStream: DataStream[JSONObject], others: DataStream[_]*): DataStream[JSONObject] = {
    val parallelism: Int = AppUtil.env.getParallelism    
    val confStream: DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = others.head.asInstanceOf[DataStream[mutable.Map[String, DbInfo2DataworksHoloBean]]]    
    //获取各租户广播配置信息    
    val confStreamDescriptor = new MapStateDescriptor[String, DbInfo2DataworksHoloBean]("DtsConfig", classOf[String], classOf[DbInfo2DataworksHoloBean])
    val confBroadcast: BroadcastStream[mutable.Map[String, DbInfo2DataworksHoloBean]] = confStream.broadcast(confStreamDescriptor)    
    //Dts推送kafka的canalJson与租户广播配置connect    
    val result: DataStream[JSONObject] = dataStream.connect(confBroadcast).process(new Dts2DataworksHoloTransformFunction)    
    .keyBy(_.getString("table").hashCode % parallelism).map(s => s)
    result  
  }
}
//还原数据库增删改
def upsertOrDelTable(tableName: String, data: Array[(String, Any)], opType: String): Unit = {
    val put = new Put(holoclient.getTableSchema(tableName))    
    if (opType == "DELETE") {      
      put.getRecord.setType(SqlCommandType.DELETE)    
    }    
    for (kv <- data) {     
      put.setObject(kv._1.toLowerCase, kv._2)    
    }    
    holoclient.put(put)  
  }



亿万数据量级的业务订单表,实现了从数据源端到数据中心的业务延迟控制在毫秒级别,基于此数据中心秒级别的业务响应更是不在话下。


image.png


后记


DTS团队针对易仓复杂的单实例多租户场景优化:
1.单实例多租户数10万张表初始化,DTS提供的API接口完全无法满足需求!

问题排查:表数量太大,接口参数体超过nginx网关限制,tomcat限制。限制调整后仍然无法满足。
最后解决:开发新Api接口,经过与dts开发反复联调,最终采用oss文件中转方式绕过网关,实现10万级别表的初始化推送。
2.DTS实例单次初始化表数据了限制以及DTS元数据策略优化
问题排查:单次初始化表达到一定限制,dts对应的reader模块内存溢出。任务无限重启,延迟逐步增大。
最后解决:dts开发、GTS TAM服务团队日夜坚守,出现问题快速响应,临时调大reader模块内存并排查根本原因,最后专门针对SaaS这种多表的场景在元数据存储策略方面进行了大量优化并彻底解决问题。


注:

Flink:一个批处理和流处理结合的统一计算框架

Hologres:一站式实时数仓引擎(Real-Time Data Warehouse

Maxcompute:大数据计算服务,快速、完全托管的PB级数据仓库解决方案



版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
如何设置阿里云服务器安全组?阿里云安全组规则详细解说
阿里云安全组设置详细图文教程(收藏起来) 阿里云服务器安全组设置规则分享,阿里云服务器安全组如何放行端口设置教程。阿里云会要求客户设置安全组,如果不设置,阿里云会指定默认的安全组。那么,这个安全组是什么呢?顾名思义,就是为了服务器安全设置的。安全组其实就是一个虚拟的防火墙,可以让用户从端口、IP的维度来筛选对应服务器的访问者,从而形成一个云上的安全域。
18577 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,阿里云优惠总结大概有三种登录方式: 登录到ECS云服务器控制台 在ECS云服务器控制台用户可以更改密码、更换系.
27705 0
阿里云服务器如何登录?阿里云服务器的三种登录方法
购买阿里云ECS云服务器后如何登录?场景不同,大概有三种登录方式:
12964 0
阿里云服务器安全组设置内网互通的方法
虽然0.0.0.0/0使用非常方便,但是发现很多同学使用它来做内网互通,这是有安全风险的,实例有可能会在经典网络被内网IP访问到。下面介绍一下四种安全的内网互联设置方法。 购买前请先:领取阿里云幸运券,有很多优惠,可到下文中领取。
21932 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
19978 0
阿里云服务器ECS登录用户名是什么?系统不同默认账号也不同
阿里云服务器Windows系统默认用户名administrator,Linux镜像服务器用户名root
15287 0
650
文章
18
问答
来源圈子
更多
实时计算 Flink 版(Alibaba Cloud Realtime Compute for Apache Flink,Powered by Ververica)是阿里云基于 Apache Flink 构建的企业级、高性能实时大数据处理系统,由 Apache Flink 创始团队官方出品,拥有全球统一商业化品牌,完全兼容开源 Flink API,提供丰富的企业级增值功能。
+ 订阅
相关文档: 实时计算(流计算)
文章排行榜
最热
最新
相关电子书
更多
JS零基础入门教程(上册)
立即下载
性能优化方法论
立即下载
手把手学习日志服务SLS,云启实验室实战指南
立即下载