PostgreSQL11 CDC的分布式文件采集架构实战

本文涉及的产品
PolarDB Agent Express,2核4GB
PolarDB Agent Flow,2核4GB
云数据库 PolarDB MySQL 版,列存表分析加速 8核16GB
简介: PostgreSQL11 CDC的分布式文件采集架构实战

PostgreSQL Wal2json的变更捕获


通过CDC(数据变更捕获)机制将PostgreSQL主表变更数据以PG逻辑复制机制迁移到异构的数据库中。


先看我们的PG CDC方案:PG逻辑复制经历Wal2json解码,由pg_recvlogical监控源源不断输出到文件,然后由Storm的自定义Java程序监测文件(后续聊如何监测),并分流到Kafka Topic,来实现变更数据捕获(CDC)方案。


77eeb7a09f394b81991bbfce473a0d52.png


第一部分先安装PostgreSQL11所需的wal2json插件,并进行相关的配置

yum install postgresql11-contrib
yum install wal2json_11

编辑postgresql.conf

shared_preload_libraries = 'wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

编辑pg_hba.conf

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

第二部分,我们需要创建health_master角色,具有复制权限

create role health_master replication login password '123456';

第三步,我们需要对监控的healthcare_db数据库建立复制槽health_db_slot

#通过命令pg_recvlogical创建slot
pg_recvlogical -d healthcare_db --slot health_db_slot --create-slot -P wal2json
#slot捕获输出文件
pg_recvlogical -U healthcare_master -d healthcare_db --slot health_db_slot --start --file=healthcare_db.log

启动复制槽后, healthcare_db数据库的变化的日志都会进入healthcare_db.log文件


696eec5540dc4e4fb18df2bac2259e60.png


下来就是跟文件打交道,很有趣,每一步都不会顺心如意,但每一步的解决都有所获益,首先是对文件变化的监测,能找到很多办法,例如通过ELK家族的Filebeat工具来探测,但是外部工具不好融合进Storm,最好是自己写Java程序来监测。


Java NIO WatchService监控文件变化


其实JDK7以上版本就有一个比较不错的选择,那就是NIO包里的WatchService监控器,我觉得它有两方面的优点,其一就是由操作系统的信号通知机制,当文件目录中出现变化就发信号给应用层监控器,那么这种由操作系统主动通知的效率就远好于应用程序对文件的反复轮巡,而且不占用过多系统资源;其二编程模型并不采用观察者模式注册监听器的方案,而是将多线程问题隐藏起来,客户端对api采取循环阻塞的直观调用,这就非常有利于嵌入到各种运行容器当中去执行文件采集监控。


另外监测文件变化后按行采集变化记录我采用了RadmonAccessFile对象,这个文件操作对象常用于断点续传此类的需求,很方便,关键要设计一个可持久化的位移记录文件,保证采集器重启后总能从未读取的最新变化数据点位置开始采集数据。


19a7175c08714597a9423d636d1f6322.png



文件监控与采集功能嵌入storm集群之后又出现了一个新问题,那就是storm spout实例不会如你所愿地运行在指定的机器上,而是完全由storm集群随机地在节点上指定运行,但被监测的文件位置是固定的,反正总有笨办法:当storm集群启动后,确定spout运行的机器节点,再由该机器执行cdc文件输出程序,但是这样耦合性太强,必须跟随storm对spout实例的安排而变化采集位置,维护管理就会很麻烦,而且很容易出错。


我为Storm Spout创建了名为CDCFileSpout.java的类,下面的源代码示例是监测文件并发射流数据的核心部分:

    @Override
    public void nextTuple() {
        //读取文件位移点
        long oldpos = loadPos();
        long newpos = oldpos;
        try {
            //通过JAVA NIO包提供的WatchService监控器对文件目录的变化监控
            //阻塞式等待变化
            while ((key = watchService.take()) != null) {
                List<WatchEvent<?>> events = key.pollEvents();
                log.info("take pollEvents size : " + events.size());
                //遍历文件变化事件
                for (WatchEvent<?> event : events) {
                    //只处理PG变更输出文件
                    if (event.context().toString().equals(fileName)) {
                        //通过文件随机访问对象的创建,定位捕获文件新增记录
                        raf = new RandomAccessFile(dirName + fileName, "r");
                        if(raf.length() == 0) {
                            log.info("Empty file detected! ");
                            newpos = 0;
                            oldpos = -1;
                            break;
                        }
                        //定位位移
                        raf.seek(newpos);
                        log.info("seek pos : " + newpos);
                        String change = null;
                        //按行读取变化
                        while ((change = raf.readLine()) != null) {
                            //将变化字符串记录发射出去
                            collector.emit(new Values(change));
                        }
                        //更新位移
                        newpos = raf.getFilePointer();
                    }
                }
                //完成一次监测读取,持久化位移量
                if (newpos > oldpos) {
                    writerPos(newpos);
                    oldpos = newpos;
                    log.info("write pos : " + newpos);
                }
                key.reset();
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if(raf != null) {
                try {
                    raf.close();
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


GlusterFS解决Storm Spout的分布问题


因此我就引出了一个新的假设:通过分布式文件系统(dfs)来解决此问题,但是dfs的选型很重要,Hadoop hdfs肯定不行,它脱离了普通文件系统的操作方式,最终我挑选了两款dfs,一是clusterfs,二是moosefs,它们都具有fuse结合功能,通过mount dfs到本地目录的方式,让访问dfs如同访问本地目录文件一样无缝结合,dfs的任一客户端节点对文件的修改,都会在所有dfs客户端节点上被通知,因此我让storm的所有节点都成为dfs的客户端,这样无论spout随机运行在任何节点上,都可以在本节点的相同目录中去访问dfs中的被监测的文件,同时被监测文件还具有了多副本的高可靠性。


ac572c03b0c64a14b629b63220f2ec00.png


这种解决分布式计算过程中与分布式存储结合的方案,也就是storm计算节点由于是集群动态分配位置,无法固定住storm spout的文件采集位置,因此我选择了分布式文件系统的思路,主要是利用了GlusterFS连接Linux fuse(用户空间文件系统)的办法,使得每一个spout节点都是dfs客户端,那么无论spout被分配在哪个节点,都可以通过监测并读取本节点的GlusterFS客户端挂载(mount)的目录来实现对pg cdc输出文件副本的数据采集。


制服Bug的艺术


但是测试中发现一个大bug,让我虎躯一震,bug原因分析:


内置在spout中的Java文件监控器(watchservice)监控目录变化是通过操作系统传递来的信号驱动的,这样spout就可以等待式文件变化实现监控,可是我想当然的以为就算pgsql cdc输出节点与spout文件采集监控节点不是一台机器也可以,只要通过分布式文件系统同步副本,spout节点就一定能感知到当前目录副本的变化,事实上我错了,spout中的watchservice根本就感知不到目录副本的变化,因此想要得到操作系统的文件变化信号通知,必须对文件目录的读写是在一台机器上,才会有文件变化信号发送给上层应用,我之前的测试正确仅仅是因为pgsql输出和spout监控是同一台服务器。

    @Override
    protected int run(String[] args) throws Exception {
        //创建拓扑构建者
        TopologyBuilder builder = new TopologyBuilder();
        //创建并设置文件监控Spout,3个实例分配在3台不同的机器,同时只有一个实例采集
        builder.setSpout("fileSpout", new CDCFileSpout(), 3);
        //创建并设置PG变更JSON解码Bolt,只会有一个Spout传递数据
        builder.setBolt("decodeGroup", new CDCDecodeBlot(),1)
                .shuffleGrouping("fileSpout");
        //基于Storm-Kafka插件,创建并设置Kafka Producer Bolt
        KafkaBolt<String, String> kafkaBolt = configureKafkaBolt(builder);
        //KafkaBolt以数据表名字段分组接收上游分流数据,可以由3个KafkaBolt发送数据
        //每个KafkaBolt进程只发送所属“table”的数据到指定的分区(按Kafka Table名自定义分区)
        builder.setBolt("KafkaBolt", kafkaBolt, 3)
            .fieldsGrouping("decodeGroup", new Fields("table"));
        conf.setDebug(true);
        String topologyName = "PGSQLCDC";
        //计算拓扑分配3个工作进程
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

那么问题就来了,我的假设就是spout不用考虑采集点的目录位置,否则逆向根据storm集群分配好spout节点地址后才能进行pg监控,显然这是颠倒流程了,又试过moosefs和nfs,结果一样,nfs还不如分布式文件系统高效,当无路可走的时候,认为自己的假设即将失败的时候,一个新的思路开启了我的灵感,为什么非要spout只设置1个并行度呢?按照参与storm集群拓扑的工作数是3个,那就设置spout并行度为3,这样每一个机器就都会有一个spout监控本地GlusterFS挂载目录,那么无论我的pg cdc输出程序是在哪个节点启动,同时只会有一个spout感应到副本变化开始推送数据,其他都是wait,这样就解决了问题,同样也保证了即便是换一个节点进行pg cdc文件输出,前一个spout实例自然wait,新的spout就工作了,依然完美地保证了pg cdc程序与spout的可靠性冗余。


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍如何基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
10月前
|
人工智能 监控 前端开发
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
支付宝「AI 出行助手」是一款集成公交、地铁、火车票、机票、打车等多项功能的智能出行产品。
1465 21
支付宝 AI 出行助手高效研发指南:4 人团队的架构迁移与提效实战
|
10月前
|
消息中间件 Java 数据库
Java 基于 DDD 分层架构实战从基础到精通最新实操全流程指南
本文详解基于Java的领域驱动设计(DDD)分层架构实战,结合Spring Boot 3.x、Spring Data JPA 3.x等最新技术栈,通过电商订单系统案例展示如何构建清晰、可维护的微服务架构。内容涵盖项目结构设计、各层实现细节及关键技术点,助力开发者掌握DDD在复杂业务系统中的应用。
1834 0
|
8月前
|
Cloud Native Serverless API
微服务架构实战指南:从单体应用到云原生的蜕变之路
🌟蒋星熠Jaxonic,代码为舟的星际旅人。深耕微服务架构,擅以DDD拆分服务、构建高可用通信与治理体系。分享从单体到云原生的实战经验,探索技术演进的无限可能。
微服务架构实战指南:从单体应用到云原生的蜕变之路
|
8月前
|
缓存 Cloud Native 中间件
《聊聊分布式》从单体到分布式:电商系统架构演进之路
本文系统阐述了电商平台从单体到分布式架构的演进历程,剖析了单体架构的局限性与分布式架构的优势,结合淘宝、京东等真实案例,深入探讨了服务拆分、数据库分片、中间件体系等关键技术实践,并总结了渐进式迁移策略与核心经验,为大型应用架构升级提供了全面参考。
|
8月前
|
监控 Cloud Native Java
Spring Boot 3.x 微服务架构实战指南
🌟蒋星熠Jaxonic,技术宇宙中的星际旅人。深耕Spring Boot 3.x与微服务架构,探索云原生、性能优化与高可用系统设计。以代码为笔,在二进制星河中谱写极客诗篇。关注我,共赴技术星辰大海!(238字)
1314 2
Spring Boot 3.x 微服务架构实战指南
|
9月前
|
消息中间件 数据采集 NoSQL
秒级行情推送系统实战:从触发、采集到入库的端到端架构
本文设计了一套秒级实时行情推送系统,涵盖触发、采集、缓冲、入库与推送五层架构,结合动态代理IP、Kafka/Redis缓冲及WebSocket推送,实现金融数据低延迟、高并发处理,适用于股票、数字货币等实时行情场景。
1381 3
秒级行情推送系统实战:从触发、采集到入库的端到端架构
|
9月前
|
设计模式 人工智能 API
AI智能体开发实战:17种核心架构模式详解与Python代码实现
本文系统解析17种智能体架构设计模式,涵盖多智能体协作、思维树、反思优化与工具调用等核心范式,结合LangChain与LangGraph实现代码工作流,并通过真实案例验证效果,助力构建高效AI系统。
970 7
|
8月前
|
存储 NoSQL 前端开发
【赵渝强老师】MongoDB的分布式存储架构
MongoDB分片通过将数据分布到多台服务器,实现海量数据的高效存储与读写。其架构包含路由、配置服务器和分片服务器,支持水平扩展,结合复制集保障高可用性,适用于大规模生产环境。
550 1

热门文章

最新文章

推荐镜像

更多