PostgreSQL11 CDC的分布式文件采集架构实战

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: PostgreSQL11 CDC的分布式文件采集架构实战

PostgreSQL Wal2json的变更捕获


通过CDC(数据变更捕获)机制将PostgreSQL主表变更数据以PG逻辑复制机制迁移到异构的数据库中。


先看我们的PG CDC方案:PG逻辑复制经历Wal2json解码,由pg_recvlogical监控源源不断输出到文件,然后由Storm的自定义Java程序监测文件(后续聊如何监测),并分流到Kafka Topic,来实现变更数据捕获(CDC)方案。


77eeb7a09f394b81991bbfce473a0d52.png


第一部分先安装PostgreSQL11所需的wal2json插件,并进行相关的配置

yum install postgresql11-contrib
yum install wal2json_11

编辑postgresql.conf

shared_preload_libraries = 'wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

编辑pg_hba.conf

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

第二部分,我们需要创建health_master角色,具有复制权限

create role health_master replication login password '123456';

第三步,我们需要对监控的healthcare_db数据库建立复制槽health_db_slot

#通过命令pg_recvlogical创建slot
pg_recvlogical -d healthcare_db --slot health_db_slot --create-slot -P wal2json
#slot捕获输出文件
pg_recvlogical -U healthcare_master -d healthcare_db --slot health_db_slot --start --file=healthcare_db.log

启动复制槽后, healthcare_db数据库的变化的日志都会进入healthcare_db.log文件


696eec5540dc4e4fb18df2bac2259e60.png


下来就是跟文件打交道,很有趣,每一步都不会顺心如意,但每一步的解决都有所获益,首先是对文件变化的监测,能找到很多办法,例如通过ELK家族的Filebeat工具来探测,但是外部工具不好融合进Storm,最好是自己写Java程序来监测。


Java NIO WatchService监控文件变化


其实JDK7以上版本就有一个比较不错的选择,那就是NIO包里的WatchService监控器,我觉得它有两方面的优点,其一就是由操作系统的信号通知机制,当文件目录中出现变化就发信号给应用层监控器,那么这种由操作系统主动通知的效率就远好于应用程序对文件的反复轮巡,而且不占用过多系统资源;其二编程模型并不采用观察者模式注册监听器的方案,而是将多线程问题隐藏起来,客户端对api采取循环阻塞的直观调用,这就非常有利于嵌入到各种运行容器当中去执行文件采集监控。


另外监测文件变化后按行采集变化记录我采用了RadmonAccessFile对象,这个文件操作对象常用于断点续传此类的需求,很方便,关键要设计一个可持久化的位移记录文件,保证采集器重启后总能从未读取的最新变化数据点位置开始采集数据。


19a7175c08714597a9423d636d1f6322.png



文件监控与采集功能嵌入storm集群之后又出现了一个新问题,那就是storm spout实例不会如你所愿地运行在指定的机器上,而是完全由storm集群随机地在节点上指定运行,但被监测的文件位置是固定的,反正总有笨办法:当storm集群启动后,确定spout运行的机器节点,再由该机器执行cdc文件输出程序,但是这样耦合性太强,必须跟随storm对spout实例的安排而变化采集位置,维护管理就会很麻烦,而且很容易出错。


我为Storm Spout创建了名为CDCFileSpout.java的类,下面的源代码示例是监测文件并发射流数据的核心部分:

    @Override
    public void nextTuple() {
        //读取文件位移点
        long oldpos = loadPos();
        long newpos = oldpos;
        try {
            //通过JAVA NIO包提供的WatchService监控器对文件目录的变化监控
            //阻塞式等待变化
            while ((key = watchService.take()) != null) {
                List<WatchEvent<?>> events = key.pollEvents();
                log.info("take pollEvents size : " + events.size());
                //遍历文件变化事件
                for (WatchEvent<?> event : events) {
                    //只处理PG变更输出文件
                    if (event.context().toString().equals(fileName)) {
                        //通过文件随机访问对象的创建,定位捕获文件新增记录
                        raf = new RandomAccessFile(dirName + fileName, "r");
                        if(raf.length() == 0) {
                            log.info("Empty file detected! ");
                            newpos = 0;
                            oldpos = -1;
                            break;
                        }
                        //定位位移
                        raf.seek(newpos);
                        log.info("seek pos : " + newpos);
                        String change = null;
                        //按行读取变化
                        while ((change = raf.readLine()) != null) {
                            //将变化字符串记录发射出去
                            collector.emit(new Values(change));
                        }
                        //更新位移
                        newpos = raf.getFilePointer();
                    }
                }
                //完成一次监测读取,持久化位移量
                if (newpos > oldpos) {
                    writerPos(newpos);
                    oldpos = newpos;
                    log.info("write pos : " + newpos);
                }
                key.reset();
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if(raf != null) {
                try {
                    raf.close();
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


GlusterFS解决Storm Spout的分布问题


因此我就引出了一个新的假设:通过分布式文件系统(dfs)来解决此问题,但是dfs的选型很重要,Hadoop hdfs肯定不行,它脱离了普通文件系统的操作方式,最终我挑选了两款dfs,一是clusterfs,二是moosefs,它们都具有fuse结合功能,通过mount dfs到本地目录的方式,让访问dfs如同访问本地目录文件一样无缝结合,dfs的任一客户端节点对文件的修改,都会在所有dfs客户端节点上被通知,因此我让storm的所有节点都成为dfs的客户端,这样无论spout随机运行在任何节点上,都可以在本节点的相同目录中去访问dfs中的被监测的文件,同时被监测文件还具有了多副本的高可靠性。


ac572c03b0c64a14b629b63220f2ec00.png


这种解决分布式计算过程中与分布式存储结合的方案,也就是storm计算节点由于是集群动态分配位置,无法固定住storm spout的文件采集位置,因此我选择了分布式文件系统的思路,主要是利用了GlusterFS连接Linux fuse(用户空间文件系统)的办法,使得每一个spout节点都是dfs客户端,那么无论spout被分配在哪个节点,都可以通过监测并读取本节点的GlusterFS客户端挂载(mount)的目录来实现对pg cdc输出文件副本的数据采集。


制服Bug的艺术


但是测试中发现一个大bug,让我虎躯一震,bug原因分析:


内置在spout中的Java文件监控器(watchservice)监控目录变化是通过操作系统传递来的信号驱动的,这样spout就可以等待式文件变化实现监控,可是我想当然的以为就算pgsql cdc输出节点与spout文件采集监控节点不是一台机器也可以,只要通过分布式文件系统同步副本,spout节点就一定能感知到当前目录副本的变化,事实上我错了,spout中的watchservice根本就感知不到目录副本的变化,因此想要得到操作系统的文件变化信号通知,必须对文件目录的读写是在一台机器上,才会有文件变化信号发送给上层应用,我之前的测试正确仅仅是因为pgsql输出和spout监控是同一台服务器。

    @Override
    protected int run(String[] args) throws Exception {
        //创建拓扑构建者
        TopologyBuilder builder = new TopologyBuilder();
        //创建并设置文件监控Spout,3个实例分配在3台不同的机器,同时只有一个实例采集
        builder.setSpout("fileSpout", new CDCFileSpout(), 3);
        //创建并设置PG变更JSON解码Bolt,只会有一个Spout传递数据
        builder.setBolt("decodeGroup", new CDCDecodeBlot(),1)
                .shuffleGrouping("fileSpout");
        //基于Storm-Kafka插件,创建并设置Kafka Producer Bolt
        KafkaBolt<String, String> kafkaBolt = configureKafkaBolt(builder);
        //KafkaBolt以数据表名字段分组接收上游分流数据,可以由3个KafkaBolt发送数据
        //每个KafkaBolt进程只发送所属“table”的数据到指定的分区(按Kafka Table名自定义分区)
        builder.setBolt("KafkaBolt", kafkaBolt, 3)
            .fieldsGrouping("decodeGroup", new Fields("table"));
        conf.setDebug(true);
        String topologyName = "PGSQLCDC";
        //计算拓扑分配3个工作进程
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

那么问题就来了,我的假设就是spout不用考虑采集点的目录位置,否则逆向根据storm集群分配好spout节点地址后才能进行pg监控,显然这是颠倒流程了,又试过moosefs和nfs,结果一样,nfs还不如分布式文件系统高效,当无路可走的时候,认为自己的假设即将失败的时候,一个新的思路开启了我的灵感,为什么非要spout只设置1个并行度呢?按照参与storm集群拓扑的工作数是3个,那就设置spout并行度为3,这样每一个机器就都会有一个spout监控本地GlusterFS挂载目录,那么无论我的pg cdc输出程序是在哪个节点启动,同时只会有一个spout感应到副本变化开始推送数据,其他都是wait,这样就解决了问题,同样也保证了即便是换一个节点进行pg cdc文件输出,前一个spout实例自然wait,新的spout就工作了,依然完美地保证了pg cdc程序与spout的可靠性冗余。


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
2天前
|
设计模式 监控 Java
分布式系统架构4:容错设计模式
这是小卷对分布式系统架构学习的第4篇文章,重点介绍了三种常见的容错设计模式:断路器模式、舱壁隔离模式和重试模式。断路器模式防止服务故障蔓延,舱壁隔离模式通过资源隔离避免全局影响,重试模式提升短期故障下的调用成功率。文章还对比了这些模式的优缺点及适用场景,并解释了服务熔断与服务降级的区别。尽管技术文章阅读量不高,但小卷坚持每日更新以促进个人成长。
22 11
|
3天前
|
消息中间件 存储 安全
分布式系统架构3:服务容错
分布式系统因其复杂性,故障几乎是必然的。那么如何让系统在不可避免的故障中依然保持稳定?本文详细介绍了分布式架构中7种核心的服务容错策略,包括故障转移、快速失败、安全失败等,以及它们在实际业务场景中的应用。无论是支付场景的快速失败,还是日志采集的安全失败,每种策略都有自己的适用领域和优缺点。此外,文章还为技术面试提供了解题思路,助你在关键时刻脱颖而出。掌握这些策略,不仅能提升系统健壮性,还能让你的技术栈更上一层楼!快来深入学习,走向架构师之路吧!
31 11
|
5天前
|
自然语言处理 负载均衡 Kubernetes
分布式系统架构2:服务发现
服务发现是分布式系统中服务实例动态注册和发现机制,确保服务间通信。主要由注册中心和服务消费者组成,支持客户端和服务端两种发现模式。注册中心需具备高可用性,常用框架有Eureka、Zookeeper、Consul等。服务注册方式包括主动注册和被动注册,核心流程涵盖服务注册、心跳检测、服务发现、服务调用和注销。
38 12
|
2天前
|
弹性计算 Java 数据库
Web应用上云经典架构实战
本课程详细介绍了Web应用上云的经典架构实战,涵盖前期准备、配置ALB、创建服务器组和监听、验证ECS公网能力、环境配置(JDK、Maven、Node、Git)、下载并运行若依框架、操作第二台ECS以及验证高可用性。通过具体步骤和命令,帮助学员快速掌握云上部署的全流程。
|
13天前
|
存储 算法 安全
分布式系统架构1:共识算法Paxos
本文介绍了分布式系统中实现数据一致性的重要算法——Paxos及其改进版Multi Paxos。Paxos算法由Leslie Lamport提出,旨在解决分布式环境下的共识问题,通过提案节点、决策节点和记录节点的协作,确保数据在多台机器间的一致性和可用性。Multi Paxos通过引入主节点选举机制,优化了基本Paxos的效率,减少了网络通信次数,提高了系统的性能和可靠性。文中还简要讨论了数据复制的安全性和一致性保障措施。
31 1
|
26天前
|
人工智能 运维 算法
引领企业未来数字基础架构浪潮,中国铁塔探索超大规模分布式算力
引领企业未来数字基础架构浪潮,中国铁塔探索超大规模分布式算力
|
21天前
|
NoSQL Java 数据处理
基于Redis海量数据场景分布式ID架构实践
【11月更文挑战第30天】在现代分布式系统中,生成全局唯一的ID是一个常见且重要的需求。在微服务架构中,各个服务可能需要生成唯一标识符,如用户ID、订单ID等。传统的自增ID已经无法满足在集群环境下保持唯一性的要求,而分布式ID解决方案能够确保即使在多个实例间也能生成全局唯一的标识符。本文将深入探讨如何利用Redis实现分布式ID生成,并通过Java语言展示多个示例,同时分析每个实践方案的优缺点。
39 8
|
17天前
|
消息中间件 架构师 数据库
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
45岁资深架构师尼恩分享了一篇关于分布式事务的文章,详细解析了如何在10Wqps高并发场景下实现分布式事务。文章从传统单体架构到微服务架构下分布式事务的需求背景出发,介绍了Seata这一开源分布式事务解决方案及其AT和TCC两种模式。随后,文章深入探讨了经典ebay本地消息表方案,以及如何使用RocketMQ消息队列替代数据库表来提高性能和可靠性。尼恩还分享了如何结合延迟消息进行事务数据的定时对账,确保最终一致性。最后,尼恩强调了高端面试中需要准备“高大上”的答案,并提供了多个技术领域的深度学习资料,帮助读者提升技术水平,顺利通过面试。
本地消息表事务:10Wqps 高并发分布式事务的 终极方案,大厂架构师的 必备方案
|
27天前
|
消息中间件 Java Kafka
实时数仓Kappa架构:从入门到实战
【11月更文挑战第24天】随着大数据技术的不断发展,企业对实时数据处理和分析的需求日益增长。实时数仓(Real-Time Data Warehouse, RTDW)应运而生,其中Kappa架构作为一种简化的数据处理架构,通过统一的流处理框架,解决了传统Lambda架构中批处理和实时处理的复杂性。本文将深入探讨Kappa架构的历史背景、业务场景、功能点、优缺点、解决的问题以及底层原理,并详细介绍如何使用Java语言快速搭建一套实时数仓。
137 4
|
2月前
|
运维 供应链 安全
SD-WAN分布式组网:构建高效、灵活的企业网络架构
本文介绍了SD-WAN(软件定义广域网)在企业分布式组网中的应用,强调其智能化流量管理、简化的网络部署、弹性扩展能力和增强的安全性等核心优势,以及在跨国企业、多云环境、零售连锁和制造业中的典型应用场景。通过合理设计网络架构、选择合适的网络连接类型、优化应用流量优先级和定期评估网络性能等最佳实践,SD-WAN助力企业实现高效、稳定的业务连接,加速数字化转型。
SD-WAN分布式组网:构建高效、灵活的企业网络架构
下一篇
DataWorks