PostgreSQL11 CDC的分布式文件采集架构实战

本文涉及的产品
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: PostgreSQL11 CDC的分布式文件采集架构实战

PostgreSQL Wal2json的变更捕获


通过CDC(数据变更捕获)机制将PostgreSQL主表变更数据以PG逻辑复制机制迁移到异构的数据库中。


先看我们的PG CDC方案:PG逻辑复制经历Wal2json解码,由pg_recvlogical监控源源不断输出到文件,然后由Storm的自定义Java程序监测文件(后续聊如何监测),并分流到Kafka Topic,来实现变更数据捕获(CDC)方案。


77eeb7a09f394b81991bbfce473a0d52.png


第一部分先安装PostgreSQL11所需的wal2json插件,并进行相关的配置

yum install postgresql11-contrib
yum install wal2json_11

编辑postgresql.conf

shared_preload_libraries = 'wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

编辑pg_hba.conf

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

第二部分,我们需要创建health_master角色,具有复制权限

create role health_master replication login password '123456';

第三步,我们需要对监控的healthcare_db数据库建立复制槽health_db_slot

#通过命令pg_recvlogical创建slot
pg_recvlogical -d healthcare_db --slot health_db_slot --create-slot -P wal2json
#slot捕获输出文件
pg_recvlogical -U healthcare_master -d healthcare_db --slot health_db_slot --start --file=healthcare_db.log

启动复制槽后, healthcare_db数据库的变化的日志都会进入healthcare_db.log文件


696eec5540dc4e4fb18df2bac2259e60.png


下来就是跟文件打交道,很有趣,每一步都不会顺心如意,但每一步的解决都有所获益,首先是对文件变化的监测,能找到很多办法,例如通过ELK家族的Filebeat工具来探测,但是外部工具不好融合进Storm,最好是自己写Java程序来监测。


Java NIO WatchService监控文件变化


其实JDK7以上版本就有一个比较不错的选择,那就是NIO包里的WatchService监控器,我觉得它有两方面的优点,其一就是由操作系统的信号通知机制,当文件目录中出现变化就发信号给应用层监控器,那么这种由操作系统主动通知的效率就远好于应用程序对文件的反复轮巡,而且不占用过多系统资源;其二编程模型并不采用观察者模式注册监听器的方案,而是将多线程问题隐藏起来,客户端对api采取循环阻塞的直观调用,这就非常有利于嵌入到各种运行容器当中去执行文件采集监控。


另外监测文件变化后按行采集变化记录我采用了RadmonAccessFile对象,这个文件操作对象常用于断点续传此类的需求,很方便,关键要设计一个可持久化的位移记录文件,保证采集器重启后总能从未读取的最新变化数据点位置开始采集数据。


19a7175c08714597a9423d636d1f6322.png



文件监控与采集功能嵌入storm集群之后又出现了一个新问题,那就是storm spout实例不会如你所愿地运行在指定的机器上,而是完全由storm集群随机地在节点上指定运行,但被监测的文件位置是固定的,反正总有笨办法:当storm集群启动后,确定spout运行的机器节点,再由该机器执行cdc文件输出程序,但是这样耦合性太强,必须跟随storm对spout实例的安排而变化采集位置,维护管理就会很麻烦,而且很容易出错。


我为Storm Spout创建了名为CDCFileSpout.java的类,下面的源代码示例是监测文件并发射流数据的核心部分:

    @Override
    public void nextTuple() {
        //读取文件位移点
        long oldpos = loadPos();
        long newpos = oldpos;
        try {
            //通过JAVA NIO包提供的WatchService监控器对文件目录的变化监控
            //阻塞式等待变化
            while ((key = watchService.take()) != null) {
                List<WatchEvent<?>> events = key.pollEvents();
                log.info("take pollEvents size : " + events.size());
                //遍历文件变化事件
                for (WatchEvent<?> event : events) {
                    //只处理PG变更输出文件
                    if (event.context().toString().equals(fileName)) {
                        //通过文件随机访问对象的创建,定位捕获文件新增记录
                        raf = new RandomAccessFile(dirName + fileName, "r");
                        if(raf.length() == 0) {
                            log.info("Empty file detected! ");
                            newpos = 0;
                            oldpos = -1;
                            break;
                        }
                        //定位位移
                        raf.seek(newpos);
                        log.info("seek pos : " + newpos);
                        String change = null;
                        //按行读取变化
                        while ((change = raf.readLine()) != null) {
                            //将变化字符串记录发射出去
                            collector.emit(new Values(change));
                        }
                        //更新位移
                        newpos = raf.getFilePointer();
                    }
                }
                //完成一次监测读取,持久化位移量
                if (newpos > oldpos) {
                    writerPos(newpos);
                    oldpos = newpos;
                    log.info("write pos : " + newpos);
                }
                key.reset();
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if(raf != null) {
                try {
                    raf.close();
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


GlusterFS解决Storm Spout的分布问题


因此我就引出了一个新的假设:通过分布式文件系统(dfs)来解决此问题,但是dfs的选型很重要,Hadoop hdfs肯定不行,它脱离了普通文件系统的操作方式,最终我挑选了两款dfs,一是clusterfs,二是moosefs,它们都具有fuse结合功能,通过mount dfs到本地目录的方式,让访问dfs如同访问本地目录文件一样无缝结合,dfs的任一客户端节点对文件的修改,都会在所有dfs客户端节点上被通知,因此我让storm的所有节点都成为dfs的客户端,这样无论spout随机运行在任何节点上,都可以在本节点的相同目录中去访问dfs中的被监测的文件,同时被监测文件还具有了多副本的高可靠性。


ac572c03b0c64a14b629b63220f2ec00.png


这种解决分布式计算过程中与分布式存储结合的方案,也就是storm计算节点由于是集群动态分配位置,无法固定住storm spout的文件采集位置,因此我选择了分布式文件系统的思路,主要是利用了GlusterFS连接Linux fuse(用户空间文件系统)的办法,使得每一个spout节点都是dfs客户端,那么无论spout被分配在哪个节点,都可以通过监测并读取本节点的GlusterFS客户端挂载(mount)的目录来实现对pg cdc输出文件副本的数据采集。


制服Bug的艺术


但是测试中发现一个大bug,让我虎躯一震,bug原因分析:


内置在spout中的Java文件监控器(watchservice)监控目录变化是通过操作系统传递来的信号驱动的,这样spout就可以等待式文件变化实现监控,可是我想当然的以为就算pgsql cdc输出节点与spout文件采集监控节点不是一台机器也可以,只要通过分布式文件系统同步副本,spout节点就一定能感知到当前目录副本的变化,事实上我错了,spout中的watchservice根本就感知不到目录副本的变化,因此想要得到操作系统的文件变化信号通知,必须对文件目录的读写是在一台机器上,才会有文件变化信号发送给上层应用,我之前的测试正确仅仅是因为pgsql输出和spout监控是同一台服务器。

    @Override
    protected int run(String[] args) throws Exception {
        //创建拓扑构建者
        TopologyBuilder builder = new TopologyBuilder();
        //创建并设置文件监控Spout,3个实例分配在3台不同的机器,同时只有一个实例采集
        builder.setSpout("fileSpout", new CDCFileSpout(), 3);
        //创建并设置PG变更JSON解码Bolt,只会有一个Spout传递数据
        builder.setBolt("decodeGroup", new CDCDecodeBlot(),1)
                .shuffleGrouping("fileSpout");
        //基于Storm-Kafka插件,创建并设置Kafka Producer Bolt
        KafkaBolt<String, String> kafkaBolt = configureKafkaBolt(builder);
        //KafkaBolt以数据表名字段分组接收上游分流数据,可以由3个KafkaBolt发送数据
        //每个KafkaBolt进程只发送所属“table”的数据到指定的分区(按Kafka Table名自定义分区)
        builder.setBolt("KafkaBolt", kafkaBolt, 3)
            .fieldsGrouping("decodeGroup", new Fields("table"));
        conf.setDebug(true);
        String topologyName = "PGSQLCDC";
        //计算拓扑分配3个工作进程
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

那么问题就来了,我的假设就是spout不用考虑采集点的目录位置,否则逆向根据storm集群分配好spout节点地址后才能进行pg监控,显然这是颠倒流程了,又试过moosefs和nfs,结果一样,nfs还不如分布式文件系统高效,当无路可走的时候,认为自己的假设即将失败的时候,一个新的思路开启了我的灵感,为什么非要spout只设置1个并行度呢?按照参与storm集群拓扑的工作数是3个,那就设置spout并行度为3,这样每一个机器就都会有一个spout监控本地GlusterFS挂载目录,那么无论我的pg cdc输出程序是在哪个节点启动,同时只会有一个spout感应到副本变化开始推送数据,其他都是wait,这样就解决了问题,同样也保证了即便是换一个节点进行pg cdc文件输出,前一个spout实例自然wait,新的spout就工作了,依然完美地保证了pg cdc程序与spout的可靠性冗余。


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
8天前
|
弹性计算 Java 数据库
Web应用上云经典架构实战
本课程详细介绍了Web应用上云的经典架构实战,涵盖前期准备、配置ALB、创建服务器组和监听、验证ECS公网能力、环境配置(JDK、Maven、Node、Git)、下载并运行若依框架、操作第二台ECS以及验证高可用性。通过具体步骤和命令,帮助学员快速掌握云上部署的全流程。
|
1月前
|
消息中间件 Java Kafka
实时数仓Kappa架构:从入门到实战
【11月更文挑战第24天】随着大数据技术的不断发展,企业对实时数据处理和分析的需求日益增长。实时数仓(Real-Time Data Warehouse, RTDW)应运而生,其中Kappa架构作为一种简化的数据处理架构,通过统一的流处理框架,解决了传统Lambda架构中批处理和实时处理的复杂性。本文将深入探讨Kappa架构的历史背景、业务场景、功能点、优缺点、解决的问题以及底层原理,并详细介绍如何使用Java语言快速搭建一套实时数仓。
180 4
|
1月前
|
运维 NoSQL Java
后端架构演进:微服务架构的优缺点与实战案例分析
【10月更文挑战第28天】本文探讨了微服务架构与单体架构的优缺点,并通过实战案例分析了微服务架构在实际应用中的表现。微服务架构具有高内聚、低耦合、独立部署等优势,但也面临分布式系统的复杂性和较高的运维成本。通过某电商平台的实际案例,展示了微服务架构在提升系统性能和团队协作效率方面的显著效果,同时也指出了其带来的挑战。
86 4
|
2月前
|
存储 前端开发 API
DDD领域驱动设计实战-分层架构
DDD分层架构通过明确各层职责及交互规则,有效降低了层间依赖。其基本原则是每层仅与下方层耦合,分为严格和松散两种形式。架构演进包括传统四层架构与改良版四层架构,后者采用依赖反转设计原则优化基础设施层位置。各层职责分明:用户接口层处理显示与请求;应用层负责服务编排与组合;领域层实现业务逻辑;基础层提供技术基础服务。通过合理设计聚合与依赖关系,DDD支持微服务架构灵活演进,提升系统适应性和可维护性。
|
2月前
|
存储 开发框架 .NET
C#语言如何搭建分布式文件存储系统
C#语言如何搭建分布式文件存储系统
87 2
|
2月前
|
存储 数据采集 分布式计算
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
Hadoop-17 Flume 介绍与环境配置 实机云服务器测试 分布式日志信息收集 海量数据 实时采集引擎 Source Channel Sink 串行复制负载均衡
57 1
|
2月前
|
分布式计算 Hadoop 网络安全
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-08-HDFS集群 基础知识 命令行上机实操 hadoop fs 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
46 1
|
2月前
|
存储 机器学习/深度学习 缓存
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
Hadoop-07-HDFS集群 基础知识 分布式文件系统 读写原理 读流程与写流程 基本语法上传下载拷贝移动文件
58 1
|
3月前
|
运维 持续交付 API
深入理解并实践微服务架构:从理论到实战
深入理解并实践微服务架构:从理论到实战
155 3
|
3月前
|
存储 缓存 负载均衡
亿级流量架构理论+秒杀实战系列(二)
亿级流量架构理论+秒杀实战系列(二)