PostgreSQL Wal2json的变更捕获
通过CDC(数据变更捕获)机制将PostgreSQL主表变更数据以PG逻辑复制机制迁移到异构的数据库中。
先看我们的PG CDC方案:PG逻辑复制经历Wal2json解码,由pg_recvlogical监控源源不断输出到文件,然后由Storm的自定义Java程序监测文件(后续聊如何监测),并分流到Kafka Topic,来实现变更数据捕获(CDC)方案。
第一部分先安装PostgreSQL11所需的wal2json插件,并进行相关的配置
yum install postgresql11-contrib yum install wal2json_11
编辑postgresql.conf
shared_preload_libraries = 'wal2json' wal_level = logical max_wal_senders = 4 max_replication_slots = 4
编辑pg_hba.conf
local replication all trust host replication all 127.0.0.1/32 trust host replication all ::1/128 trust
第二部分,我们需要创建health_master角色,具有复制权限
create role health_master replication login password '123456';
第三步,我们需要对监控的healthcare_db数据库建立复制槽health_db_slot
#通过命令pg_recvlogical创建slot pg_recvlogical -d healthcare_db --slot health_db_slot --create-slot -P wal2json #slot捕获输出文件 pg_recvlogical -U healthcare_master -d healthcare_db --slot health_db_slot --start --file=healthcare_db.log
启动复制槽后, healthcare_db数据库的变化的日志都会进入healthcare_db.log文件
下来就是跟文件打交道,很有趣,每一步都不会顺心如意,但每一步的解决都有所获益,首先是对文件变化的监测,能找到很多办法,例如通过ELK家族的Filebeat工具来探测,但是外部工具不好融合进Storm,最好是自己写Java程序来监测。
Java NIO WatchService监控文件变化
其实JDK7以上版本就有一个比较不错的选择,那就是NIO包里的WatchService监控器,我觉得它有两方面的优点,其一就是由操作系统的信号通知机制,当文件目录中出现变化就发信号给应用层监控器,那么这种由操作系统主动通知的效率就远好于应用程序对文件的反复轮巡,而且不占用过多系统资源;其二编程模型并不采用观察者模式注册监听器的方案,而是将多线程问题隐藏起来,客户端对api采取循环阻塞的直观调用,这就非常有利于嵌入到各种运行容器当中去执行文件采集监控。
另外监测文件变化后按行采集变化记录我采用了RadmonAccessFile对象,这个文件操作对象常用于断点续传此类的需求,很方便,关键要设计一个可持久化的位移记录文件,保证采集器重启后总能从未读取的最新变化数据点位置开始采集数据。
文件监控与采集功能嵌入storm集群之后又出现了一个新问题,那就是storm spout实例不会如你所愿地运行在指定的机器上,而是完全由storm集群随机地在节点上指定运行,但被监测的文件位置是固定的,反正总有笨办法:当storm集群启动后,确定spout运行的机器节点,再由该机器执行cdc文件输出程序,但是这样耦合性太强,必须跟随storm对spout实例的安排而变化采集位置,维护管理就会很麻烦,而且很容易出错。
我为Storm Spout创建了名为CDCFileSpout.java的类,下面的源代码示例是监测文件并发射流数据的核心部分:
@Override public void nextTuple() { //读取文件位移点 long oldpos = loadPos(); long newpos = oldpos; try { //通过JAVA NIO包提供的WatchService监控器对文件目录的变化监控 //阻塞式等待变化 while ((key = watchService.take()) != null) { List<WatchEvent<?>> events = key.pollEvents(); log.info("take pollEvents size : " + events.size()); //遍历文件变化事件 for (WatchEvent<?> event : events) { //只处理PG变更输出文件 if (event.context().toString().equals(fileName)) { //通过文件随机访问对象的创建,定位捕获文件新增记录 raf = new RandomAccessFile(dirName + fileName, "r"); if(raf.length() == 0) { log.info("Empty file detected! "); newpos = 0; oldpos = -1; break; } //定位位移 raf.seek(newpos); log.info("seek pos : " + newpos); String change = null; //按行读取变化 while ((change = raf.readLine()) != null) { //将变化字符串记录发射出去 collector.emit(new Values(change)); } //更新位移 newpos = raf.getFilePointer(); } } //完成一次监测读取,持久化位移量 if (newpos > oldpos) { writerPos(newpos); oldpos = newpos; log.info("write pos : " + newpos); } key.reset(); } } catch (IOException | InterruptedException e) { throw new RuntimeException(e); } finally { if(raf != null) { try { raf.close(); }catch (IOException e) { e.printStackTrace(); } } } }
GlusterFS解决Storm Spout的分布问题
因此我就引出了一个新的假设:通过分布式文件系统(dfs)来解决此问题,但是dfs的选型很重要,Hadoop hdfs肯定不行,它脱离了普通文件系统的操作方式,最终我挑选了两款dfs,一是clusterfs,二是moosefs,它们都具有fuse结合功能,通过mount dfs到本地目录的方式,让访问dfs如同访问本地目录文件一样无缝结合,dfs的任一客户端节点对文件的修改,都会在所有dfs客户端节点上被通知,因此我让storm的所有节点都成为dfs的客户端,这样无论spout随机运行在任何节点上,都可以在本节点的相同目录中去访问dfs中的被监测的文件,同时被监测文件还具有了多副本的高可靠性。
这种解决分布式计算过程中与分布式存储结合的方案,也就是storm计算节点由于是集群动态分配位置,无法固定住storm spout的文件采集位置,因此我选择了分布式文件系统的思路,主要是利用了GlusterFS连接Linux fuse(用户空间文件系统)的办法,使得每一个spout节点都是dfs客户端,那么无论spout被分配在哪个节点,都可以通过监测并读取本节点的GlusterFS客户端挂载(mount)的目录来实现对pg cdc输出文件副本的数据采集。
制服Bug的艺术
但是测试中发现一个大bug,让我虎躯一震,bug原因分析:
内置在spout中的Java文件监控器(watchservice)监控目录变化是通过操作系统传递来的信号驱动的,这样spout就可以等待式文件变化实现监控,可是我想当然的以为就算pgsql cdc输出节点与spout文件采集监控节点不是一台机器也可以,只要通过分布式文件系统同步副本,spout节点就一定能感知到当前目录副本的变化,事实上我错了,spout中的watchservice根本就感知不到目录副本的变化,因此想要得到操作系统的文件变化信号通知,必须对文件目录的读写是在一台机器上,才会有文件变化信号发送给上层应用,我之前的测试正确仅仅是因为pgsql输出和spout监控是同一台服务器。
@Override protected int run(String[] args) throws Exception { //创建拓扑构建者 TopologyBuilder builder = new TopologyBuilder(); //创建并设置文件监控Spout,3个实例分配在3台不同的机器,同时只有一个实例采集 builder.setSpout("fileSpout", new CDCFileSpout(), 3); //创建并设置PG变更JSON解码Bolt,只会有一个Spout传递数据 builder.setBolt("decodeGroup", new CDCDecodeBlot(),1) .shuffleGrouping("fileSpout"); //基于Storm-Kafka插件,创建并设置Kafka Producer Bolt KafkaBolt<String, String> kafkaBolt = configureKafkaBolt(builder); //KafkaBolt以数据表名字段分组接收上游分流数据,可以由3个KafkaBolt发送数据 //每个KafkaBolt进程只发送所属“table”的数据到指定的分区(按Kafka Table名自定义分区) builder.setBolt("KafkaBolt", kafkaBolt, 3) .fieldsGrouping("decodeGroup", new Fields("table")); conf.setDebug(true); String topologyName = "PGSQLCDC"; //计算拓扑分配3个工作进程 conf.setNumWorkers(3); if (args != null && args.length > 0) { topologyName = args[0]; } return submit(topologyName, conf, builder); }
那么问题就来了,我的假设就是spout不用考虑采集点的目录位置,否则逆向根据storm集群分配好spout节点地址后才能进行pg监控,显然这是颠倒流程了,又试过moosefs和nfs,结果一样,nfs还不如分布式文件系统高效,当无路可走的时候,认为自己的假设即将失败的时候,一个新的思路开启了我的灵感,为什么非要spout只设置1个并行度呢?按照参与storm集群拓扑的工作数是3个,那就设置spout并行度为3,这样每一个机器就都会有一个spout监控本地GlusterFS挂载目录,那么无论我的pg cdc输出程序是在哪个节点启动,同时只会有一个spout感应到副本变化开始推送数据,其他都是wait,这样就解决了问题,同样也保证了即便是换一个节点进行pg cdc文件输出,前一个spout实例自然wait,新的spout就工作了,依然完美地保证了pg cdc程序与spout的可靠性冗余。