PostgreSQL11 CDC的分布式文件采集架构实战

本文涉及的产品
云原生数据库 PolarDB MySQL 版,Serverless 5000PCU 100GB
云原生数据库 PolarDB MySQL 版,通用型 2核4GB 50GB
云原生数据库 PolarDB PostgreSQL 版,标准版 2核4GB 50GB
简介: PostgreSQL11 CDC的分布式文件采集架构实战

PostgreSQL Wal2json的变更捕获


通过CDC(数据变更捕获)机制将PostgreSQL主表变更数据以PG逻辑复制机制迁移到异构的数据库中。


先看我们的PG CDC方案:PG逻辑复制经历Wal2json解码,由pg_recvlogical监控源源不断输出到文件,然后由Storm的自定义Java程序监测文件(后续聊如何监测),并分流到Kafka Topic,来实现变更数据捕获(CDC)方案。


77eeb7a09f394b81991bbfce473a0d52.png


第一部分先安装PostgreSQL11所需的wal2json插件,并进行相关的配置

yum install postgresql11-contrib
yum install wal2json_11

编辑postgresql.conf

shared_preload_libraries = 'wal2json'
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4

编辑pg_hba.conf

local   replication     all                                     trust
host    replication     all             127.0.0.1/32            trust
host    replication     all             ::1/128                 trust

第二部分,我们需要创建health_master角色,具有复制权限

create role health_master replication login password '123456';

第三步,我们需要对监控的healthcare_db数据库建立复制槽health_db_slot

#通过命令pg_recvlogical创建slot
pg_recvlogical -d healthcare_db --slot health_db_slot --create-slot -P wal2json
#slot捕获输出文件
pg_recvlogical -U healthcare_master -d healthcare_db --slot health_db_slot --start --file=healthcare_db.log

启动复制槽后, healthcare_db数据库的变化的日志都会进入healthcare_db.log文件


696eec5540dc4e4fb18df2bac2259e60.png


下来就是跟文件打交道,很有趣,每一步都不会顺心如意,但每一步的解决都有所获益,首先是对文件变化的监测,能找到很多办法,例如通过ELK家族的Filebeat工具来探测,但是外部工具不好融合进Storm,最好是自己写Java程序来监测。


Java NIO WatchService监控文件变化


其实JDK7以上版本就有一个比较不错的选择,那就是NIO包里的WatchService监控器,我觉得它有两方面的优点,其一就是由操作系统的信号通知机制,当文件目录中出现变化就发信号给应用层监控器,那么这种由操作系统主动通知的效率就远好于应用程序对文件的反复轮巡,而且不占用过多系统资源;其二编程模型并不采用观察者模式注册监听器的方案,而是将多线程问题隐藏起来,客户端对api采取循环阻塞的直观调用,这就非常有利于嵌入到各种运行容器当中去执行文件采集监控。


另外监测文件变化后按行采集变化记录我采用了RadmonAccessFile对象,这个文件操作对象常用于断点续传此类的需求,很方便,关键要设计一个可持久化的位移记录文件,保证采集器重启后总能从未读取的最新变化数据点位置开始采集数据。


19a7175c08714597a9423d636d1f6322.png



文件监控与采集功能嵌入storm集群之后又出现了一个新问题,那就是storm spout实例不会如你所愿地运行在指定的机器上,而是完全由storm集群随机地在节点上指定运行,但被监测的文件位置是固定的,反正总有笨办法:当storm集群启动后,确定spout运行的机器节点,再由该机器执行cdc文件输出程序,但是这样耦合性太强,必须跟随storm对spout实例的安排而变化采集位置,维护管理就会很麻烦,而且很容易出错。


我为Storm Spout创建了名为CDCFileSpout.java的类,下面的源代码示例是监测文件并发射流数据的核心部分:

    @Override
    public void nextTuple() {
        //读取文件位移点
        long oldpos = loadPos();
        long newpos = oldpos;
        try {
            //通过JAVA NIO包提供的WatchService监控器对文件目录的变化监控
            //阻塞式等待变化
            while ((key = watchService.take()) != null) {
                List<WatchEvent<?>> events = key.pollEvents();
                log.info("take pollEvents size : " + events.size());
                //遍历文件变化事件
                for (WatchEvent<?> event : events) {
                    //只处理PG变更输出文件
                    if (event.context().toString().equals(fileName)) {
                        //通过文件随机访问对象的创建,定位捕获文件新增记录
                        raf = new RandomAccessFile(dirName + fileName, "r");
                        if(raf.length() == 0) {
                            log.info("Empty file detected! ");
                            newpos = 0;
                            oldpos = -1;
                            break;
                        }
                        //定位位移
                        raf.seek(newpos);
                        log.info("seek pos : " + newpos);
                        String change = null;
                        //按行读取变化
                        while ((change = raf.readLine()) != null) {
                            //将变化字符串记录发射出去
                            collector.emit(new Values(change));
                        }
                        //更新位移
                        newpos = raf.getFilePointer();
                    }
                }
                //完成一次监测读取,持久化位移量
                if (newpos > oldpos) {
                    writerPos(newpos);
                    oldpos = newpos;
                    log.info("write pos : " + newpos);
                }
                key.reset();
            }
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            if(raf != null) {
                try {
                    raf.close();
                }catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


GlusterFS解决Storm Spout的分布问题


因此我就引出了一个新的假设:通过分布式文件系统(dfs)来解决此问题,但是dfs的选型很重要,Hadoop hdfs肯定不行,它脱离了普通文件系统的操作方式,最终我挑选了两款dfs,一是clusterfs,二是moosefs,它们都具有fuse结合功能,通过mount dfs到本地目录的方式,让访问dfs如同访问本地目录文件一样无缝结合,dfs的任一客户端节点对文件的修改,都会在所有dfs客户端节点上被通知,因此我让storm的所有节点都成为dfs的客户端,这样无论spout随机运行在任何节点上,都可以在本节点的相同目录中去访问dfs中的被监测的文件,同时被监测文件还具有了多副本的高可靠性。


ac572c03b0c64a14b629b63220f2ec00.png


这种解决分布式计算过程中与分布式存储结合的方案,也就是storm计算节点由于是集群动态分配位置,无法固定住storm spout的文件采集位置,因此我选择了分布式文件系统的思路,主要是利用了GlusterFS连接Linux fuse(用户空间文件系统)的办法,使得每一个spout节点都是dfs客户端,那么无论spout被分配在哪个节点,都可以通过监测并读取本节点的GlusterFS客户端挂载(mount)的目录来实现对pg cdc输出文件副本的数据采集。


制服Bug的艺术


但是测试中发现一个大bug,让我虎躯一震,bug原因分析:


内置在spout中的Java文件监控器(watchservice)监控目录变化是通过操作系统传递来的信号驱动的,这样spout就可以等待式文件变化实现监控,可是我想当然的以为就算pgsql cdc输出节点与spout文件采集监控节点不是一台机器也可以,只要通过分布式文件系统同步副本,spout节点就一定能感知到当前目录副本的变化,事实上我错了,spout中的watchservice根本就感知不到目录副本的变化,因此想要得到操作系统的文件变化信号通知,必须对文件目录的读写是在一台机器上,才会有文件变化信号发送给上层应用,我之前的测试正确仅仅是因为pgsql输出和spout监控是同一台服务器。

    @Override
    protected int run(String[] args) throws Exception {
        //创建拓扑构建者
        TopologyBuilder builder = new TopologyBuilder();
        //创建并设置文件监控Spout,3个实例分配在3台不同的机器,同时只有一个实例采集
        builder.setSpout("fileSpout", new CDCFileSpout(), 3);
        //创建并设置PG变更JSON解码Bolt,只会有一个Spout传递数据
        builder.setBolt("decodeGroup", new CDCDecodeBlot(),1)
                .shuffleGrouping("fileSpout");
        //基于Storm-Kafka插件,创建并设置Kafka Producer Bolt
        KafkaBolt<String, String> kafkaBolt = configureKafkaBolt(builder);
        //KafkaBolt以数据表名字段分组接收上游分流数据,可以由3个KafkaBolt发送数据
        //每个KafkaBolt进程只发送所属“table”的数据到指定的分区(按Kafka Table名自定义分区)
        builder.setBolt("KafkaBolt", kafkaBolt, 3)
            .fieldsGrouping("decodeGroup", new Fields("table"));
        conf.setDebug(true);
        String topologyName = "PGSQLCDC";
        //计算拓扑分配3个工作进程
        conf.setNumWorkers(3);
        if (args != null && args.length > 0) {
            topologyName = args[0];
        }
        return submit(topologyName, conf, builder);
    }

那么问题就来了,我的假设就是spout不用考虑采集点的目录位置,否则逆向根据storm集群分配好spout节点地址后才能进行pg监控,显然这是颠倒流程了,又试过moosefs和nfs,结果一样,nfs还不如分布式文件系统高效,当无路可走的时候,认为自己的假设即将失败的时候,一个新的思路开启了我的灵感,为什么非要spout只设置1个并行度呢?按照参与storm集群拓扑的工作数是3个,那就设置spout并行度为3,这样每一个机器就都会有一个spout监控本地GlusterFS挂载目录,那么无论我的pg cdc输出程序是在哪个节点启动,同时只会有一个spout感应到副本变化开始推送数据,其他都是wait,这样就解决了问题,同样也保证了即便是换一个节点进行pg cdc文件输出,前一个spout实例自然wait,新的spout就工作了,依然完美地保证了pg cdc程序与spout的可靠性冗余。


相关实践学习
使用PolarDB和ECS搭建门户网站
本场景主要介绍基于PolarDB和ECS实现搭建门户网站。
阿里云数据库产品家族及特性
阿里云智能数据库产品团队一直致力于不断健全产品体系,提升产品性能,打磨产品功能,从而帮助客户实现更加极致的弹性能力、具备更强的扩展能力、并利用云设施进一步降低企业成本。以云原生+分布式为核心技术抓手,打造以自研的在线事务型(OLTP)数据库Polar DB和在线分析型(OLAP)数据库Analytic DB为代表的新一代企业级云原生数据库产品体系, 结合NoSQL数据库、数据库生态工具、云原生智能化数据库管控平台,为阿里巴巴经济体以及各个行业的企业客户和开发者提供从公共云到混合云再到私有云的完整解决方案,提供基于云基础设施进行数据从处理、到存储、再到计算与分析的一体化解决方案。本节课带你了解阿里云数据库产品家族及特性。
相关文章
|
6天前
|
存储 NoSQL Java
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
这篇文章是关于Java面试中的分布式架构问题的笔记,包括分布式架构下的Session共享方案、RPC和RMI的理解、分布式ID生成方案、分布式锁解决方案以及分布式事务解决方案。
一天五道Java面试题----第十一天(分布式架构下,Session共享有什么方案--------->分布式事务解决方案)
|
1天前
|
机器学习/深度学习 分布式计算 Cloud Native
云原生架构下的高性能计算解决方案:利用分布式计算资源加速机器学习训练
【8月更文第19天】随着大数据和人工智能技术的发展,机器学习模型的训练数据量和复杂度都在迅速增长。传统的单机训练方式已经无法满足日益增长的计算需求。云原生架构为高性能计算提供了新的可能性,通过利用分布式计算资源,可以在短时间内完成大规模数据集的训练任务。本文将探讨如何在云原生环境下搭建高性能计算平台,并展示如何使用 PyTorch 和 TensorFlow 这样的流行框架进行分布式训练。
6 2
|
1天前
|
监控 Java 开发者
随着软件开发的发展,传统单体应用已难以适应现代业务需求,微服务架构因此兴起,成为构建可伸缩、分布式系统的主流
随着软件开发的发展,传统单体应用已难以适应现代业务需求,微服务架构因此兴起,成为构建可伸缩、分布式系统的主流。本文探讨Java微服务架构的设计原则与实践。核心思想是将应用拆分为独立服务单元,增强模块化与扩展性。Java开发者可利用Spring Boot等框架简化开发流程。设计时需遵循单一职责、自治性和面向接口编程的原则。以电商系统为例,将订单处理、商品管理和用户认证等拆分为独立服务,提高可维护性和容错能力。还需考虑服务间通信、数据一致性及监控等高级话题。掌握这些原则和工具,开发者能构建高效、可维护的微服务应用,更好地应对未来挑战。
5 1
|
3天前
|
弹性计算 监控 数据挖掘
事件驱动架构的优势与应用:深度解析与实战应用
【8月更文挑战第17天】事件驱动架构以其松耦合、可扩展性、异步处理、实时性和高可靠性等优势,在实时数据处理、复杂业务流程、弹性伸缩和实时通信等多个领域展现出巨大的应用潜力。通过合理应用事件驱动架构,可以构建灵活、可扩展和可维护的系统架构,满足不断变化的业务需求和技术挑战。对于开发者而言,深入理解事件驱动架构的核心概念和优势,将有助于更好地设计和实现高质量的软件系统。
|
9天前
|
消息中间件 Java Kafka
"Kafka快速上手:从环境搭建到Java Producer与Consumer实战,轻松掌握分布式流处理平台"
【8月更文挑战第10天】Apache Kafka作为分布式流处理平台的领头羊,凭借其高吞吐量、可扩展性和容错性,在大数据处理、实时日志收集及消息队列领域表现卓越。初学者需掌握Kafka基本概念与操作。Kafka的核心组件包括Producer(生产者)、Broker(服务器)和Consumer(消费者)。Producer发送消息到Topic,Broker负责存储与转发,Consumer则读取这些消息。首先确保已安装Java和Kafka,并启动服务。接着可通过命令行创建Topic,并使用提供的Java API实现Producer发送消息和Consumer读取消息的功能。
32 8
|
12天前
|
Cloud Native 云计算 微服务
云原生时代:企业分布式应用架构的惊人蜕变,从SOA到微服务的大逃亡!
【8月更文挑战第8天】在云计算与容器技术推动下,企业分布式应用架构正经历从SOA到微服务再到云原生的深刻变革。SOA强调服务重用与组合,通过标准化接口实现服务解耦;微服务以细粒度划分服务,增强系统灵活性;云原生架构借助容器化与自动化技术简化部署与管理。每一步演进都为企业带来新的技术挑战与机遇。
50 6
|
18天前
|
XML 存储 Android开发
Android实战经验之Kotlin中快速实现MVI架构
本文介绍MVI(Model-View-Intent)架构模式,强调单向数据流与不可变状态管理,提升Android应用的可维护性和可测试性。MVI分为Model(存储数据)、View(展示UI)、Intent(用户动作)、State(UI状态)与ViewModel(处理逻辑)。通过Kotlin示例展示了MVI的实现过程,包括定义Model、State、Intent及创建ViewModel,并在View中观察状态更新UI。
56 12
|
14天前
|
Kubernetes 负载均衡 算法
如何在kubernetes中实现分布式可扩展的WebSocket服务架构
如何在kubernetes中实现分布式可扩展的WebSocket服务架构
26 1
|
18天前
|
存储 监控 安全
|
21天前
|
NoSQL 算法 Java
(十三)全面理解并发编程之分布式架构下Redis、ZK分布式锁的前世今生
本文探讨了从单体架构下的锁机制到分布式架构下的线程安全问题,并详细分析了分布式锁的实现原理和过程。