作者介绍
李猛,Elastic Stack 深度用户,通过 Elastic 工程师认证,2012年接触 Elasticsearch,对 Elastic Stack 技术栈开发、架构、运维等方面有深入体验,实践过多种大中型项目;为企业提供 Elastic Stack 咨询培训以及调优实施;多年实战经验,爱捣腾各种技术产品,擅长大数据,机器学习,系统架构。
序言
前一篇文章 《DB与ES混合之应用系统场景分析探讨》,我们主要探讨了混合场景下的多种模型映射类型,基本覆盖了应用业务系统如何借助Elasticsearch 来解决DB局限性。
下面这篇文章,我们主要解决 DB 到 Elasticsearch 数据实时同步问题。
背景需求
DB与ES本质上是属于不同应用领域的数据库产品,混合应用在一起主要面临2个问题 :
1、同步实时性,数据在DB更新之后,需要多久才能更新到 Elasticsearch,多久的时间是应用系统可以接受的范围,一般需要控制在1s以内,如果是分钟以上,那这就属于离线同步。
2、数据一致性,数据频繁在DB变更修改,更新到Elasticsearch之后如何保证数据与DB一致,在容许的时间范围内应用系统查询的数据有效的,可接受的,如果变更出现覆盖等,那数据是无效的,应用系统是不可接受的。
同步模式
在数据同步方面,主要有3种同步模式
1、推送模式,数据源将变更数据推送到目标源,如RabbitMQ产品,服务端会主动MQ发送到客户端。
2、拉取模式,目标源定时去数据源拉取变更数据,如Mysql数据库的数据主从同步机制,Slave会去Master拉取变更数据。
3、推拉结合,数据源与目标源之间,既有推送方式,也有拉取方式,此种模式一般会借助于中间媒介实现,如基于Kafka产品的日志应用,数据源(采集端)会将日志数据发送到Kafka集群,目标源会定期的从Kafka拉取数据更新。
技术方案
从技术层面看,DB同步到ES有好多种方式
1、同步双写,更新DB时同步更新ES。此技术方案最简单,附带问题最多,数据冲突,数据覆盖,数据丢失,处处是坑,谨慎选择。
2、异步双写,更新DB之后,记录一条MQ,MQ通知消费端,消费端反向查询DB数据,最后更新到ES。此技术方案与业务系统耦合严重,需要更加业务需求独立编写,且每个业务都需要专门编写相关程序,非常不利于快速响应需求。
3、CDC,全称Change Data Capture,变更数据捕捉,从数据库内部捕捉变更数据,将变更数据推送到中间程序中,中间程序逻辑实现同步推送到ES。CDC机制速度极快,数据精准,且与应用程序耦合少,可抽象脱离业务系统,适合大规模使用。 如图:
CDC机制原有设计是为了同类型数据库之间数据同步,应用在主从同步高可用方面,所以同类型数据库之间数据同步非常容易实现,数据库厂商本身天然支持,经过多年实战验证高效可靠。相反,在异构数据库之间实现数据同步是比较复杂的,数据链路长,中间涉及到的技术点特别多,且每一步都非常关键,下面就以本人所在公司采用的技术栈讲述如何实现,以及一些技术关键点。
案例:MySQL 同步到 Elasticsearch
数据从Mysql同步到Elasticsearch主要涉及到以下几个技术关键点
• Binlog机制
• Canal中间件
• Kakfa中间件
• 同步应用程序(业务型开发成中间件)
Binlog机制
Binlog是Mysql自带功能机制,设计之初是为了数据库之间主从同步
• Master主库,启动Binlog机制,将变更数据写入Binlog文件
• Slave从库,从Master主库拉取binlon数据,回放Binlog,更新从库数据
• 启用Binlog注意以下几点
• Master主库一般会有多台Slave订阅,且Master主库要支持业务系统实时变更操作,服务器资源会有瓶颈
• 需要同步的数据表一定要有主键
Canal中间件
Canal是Mysql的中间件产品,专门应用在数据同步
• 伪装Slave从库,
• 拉取Binlog数据,
• 回放Binlog数据,
• 解析Binlog数据为Json,报文记录了新旧数据,数据库数据表,更新方式
• 输出数据,并保证变更顺序,输出目的源支持很多,常规的一般输出到kafka
• 配置cannl注意以下几点
• Canal基于Jvm运行,数据处理能力不如Mysql,Canal需要配置集群模式。一组Canal集群不能支持太多的事数据库实例。
• 若是数据库做了水平的分库分表 ,原有Canal是不能识别为一类数据源,需要稍微修改部分代码
• 建议Canal订阅的Slave从机,因为Master是业务主库,主库承担的业务系统职责太多
• Binlog日志模式建议启动Gtid,Canal订阅的数据库如果出现故障,需要基于此切换到其他数据库。
• 数据输出到Kafka,若数据库是做了分库分表的,需要修改Canal部分代码。
• 设置Kafka分区键相同,保障相同数据变更顺序。
Kafka中间件
使用Kafka作为中间缓存,主要基于以下几个方面考虑
• 分区特性,Kafka支持分区,并发性能好,数据吞吐能力超过mysql,性能不会成为瓶颈
• 分区顺序存储,Kafka数据存储是有顺序的,设置好主键,保障Binlog变更顺序
• 消费顺序,客户端消费Kafk数据,会基于Offset,按顺序消费,保障Binlog变更顺序
• 消费组,严格意义上讲,Kafka并非熟悉消息队列,应该算消息流,我们在上一篇文中讨论数据模型映射需求,一个数据表可能会映射到多个索引,这就需要设置不同的消费组,保障多个消费组之间不冲突覆盖,同样的变更数据有多次重复消费
同步程序
同步程序当前基于Java自主开发,当前的主流同步工具不能很好支持自定义需求,主要包括两大程序
同步任务调度程序
• 同步调度配置,配置同步任务,配置同步映射关系,DB到ES的映射,Kafka到ES的映射
• 同步调度分配,同步任务操作,启动、停止、重新设置;同步任务分配,指定并行度等。
同步任务执行程序
• 执行任务,将数据从Kafka经过映射写入到Elasticsearch中,主要由4大模块组成
• Kafka模块,拉取消费数据,记录消费位置
• Mapper模块,执行映射过程,数据表与索引映射,表字段与索引字段映射,生成指定的Json格式数据
• Elastic模块,将Mapper生成好的Json数据提交到Elasticsearch中,成功则提交消费记录位置,失败则走另外逻辑
• Schedule模块,基于线程级别维度执行同步任务,支持同步任务启动暂停等操作,实时汇总同步任务的指标数据
数据同步全过程回顾
数据从DB更新到ES,中间经过多个环节,同步模式既有推送,也拉取,且多次结合完成。
• Mysql写入到本地binlog,推送模式
• Canal读取binlog写入Kafka,先是拉取模式,后是推送模式
• Worker同步程序从Kafka读写数据,经过处理写入到Elasticsearch,先是拉取模式,后是推送模式
注意事项
DB到ES实时同步整体项目链路很长,且涉及技术点较多,任意环境都会导致一些问题,有一些特别注意
• DB刷数据问题,由于DB是批量更新,后面几个技术节点会出现部分性能瓶颈
• DB多表关联深度问题,DB多表直接关联最好的关系是1对1,主要ES映射也可以基于主键关联更新,无需反向查询
• ES高级类型限制问题,ES本身支持很多高级数据类型,但这些在同步程序中最好不要使用
遗留问题
项目推进过程中,遇到不少问题,有的已经解决,有的是无法解决,有的会改善解决
• 数据校验,DB数据虽然同步到ES中,但目前是没有有效的方法去校验正确性的,传统的方式校验方式是随机两边查询对比,非常的低效,此处需要探讨更好的数据比对方法,
• 数据修复,当数据发现不正确时需要自动能够修复,但由于数据校验的低效,数据修复的准确性也有待考量
• 技术演进,数据同步程序是自主基于Java开发,但做了很多非业务时间工作,程序大量的工作在调度且涉及很传统,考虑引入Flink平台,由平台负责底层资源掉度,上层只需配置同步映射,当前正在测试中。
结语
经验总结
DB到ES数据实时同步,前后经过的时间很长,经过了好几次的技术方案演进,未来还有很大的优化改进空间。
最终进化到CDC这种方案也是基于已有的经验分享(参考马蜂窝技术博客),设计思路一样,技术实现不一样。
数据同步整体技术实现中间环节很多,任意技术点都需要了解透彻,否则会出现很多致命事故,需要多人团队协作完成。
CDC并非新概念,几乎所有数据库产品都支持,如下:
• Postgresql 有 Logical Decoding
• Sqlserver 有 Change data capture 和 Change Tracking
• Oralce 有 Redo log和 Oralce Golden Gate
• Mongodb 有 Replicate sets
• Elasticsearch 有 Translog
若之后遇到类似数据实时同步需求,优先选择CDC技术方案 ,我们需要更强大的数据实时交换平台,欢迎讨论一起干。
声明:本文由原文作者“李猛”授权转载,对未经许可擅自使用者,保留追究其法律责任的权利。
【阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费X-pack服务(单节点价值$6000)
相关活动
更多折扣活动,请访问阿里云 Elasticsearch 官网
阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费