3.事件处理优化 MultiStageCoprocessor
我们前面说过,parallel为false的,是单线程交给sinkHandler处理,parallel为true的,交给MultiStageCoprocessor处理。
这里展开看看并行处理是如何实现的。
实现类是MysqlMultiStageCoprocessor, 看下基本结构,持有了EventTransactionBuffer(前文提到过的存储事务内多个evnet的buffer)、RingBuffer<MessageEvent>、几个线程池、两个BatchEventProcessor<MessageEvent>。
这些属性类型基本是跟Disruptor框架相关。
start()方法里面对一系列属性做了初始化配置并进行启动,要理解这里的逻辑,其实主要是使用Disruptor框架做的任务队列。
如果了解了Disruptor框架的使用,就能明白这里所做的任务队列处理模型了。
start()源码如下:
首先,这里用了Disruptor框架的典型单生产者-多消费者模型。
这里创建生产者的时候,就创建了RingBuffer和Sequencer,全局唯一。
上面在dump方法内,订阅到binlog事件后,通过multiStageCoprocessor的publish方法写入RingBuffer,作为单一的生产者。
多消费者主要通过Disruptor的Sequencer管理。
Sequencer 接口有两种实现,SingleProducerSequencer 和 MultiProducerSequencer,分别来处理单个生产者和多个生产者的情况,这里使用了SingleProducerSequencer。
在 Sequencer 中有一个 next() 方法,就是这个方法来产生 Sequence 中的 value。Sequence 本质上可以认为是一个 AtomicLong,消费者和生产者都会维护自己的 Sequence。
Sequencer 的核心就是解决了两个问题,第一个是对于所有的消费者,在 RingBuffer 为空时,就不能再从中取数据,对于生产者,新生产的内容不能把未消费的数据覆盖掉。
上图中 C 代表消费者,P 代表生产者。
当然,在多消费者模型中,一个关键的问题是控制消费者的消费顺序。
这里主要通过消费者之间控制依赖关系其实就是控制 sequence 的大小,如果说 C2 消费者 依赖 C1,那就表示 C2 中 Sequence 的值一定小于等于 C1 的 Sequence。
具体的方法是通过RingBuffer的addGatingSequences( )进行的。
具体Disruptor的原理和使用就不展开说明了,这里了解这些关键问题即可。
通过这样的编程模型,parser实现了解析器的多阶段顺序处理。
- Stage1: 网络接收 (单线程),publish投递到RingBuffer
- Stage2: 从RingBuffe获取事件,使用SimpleParserStage进行基本解析 (单线程,事件类型、DDL解析构造TableMeta、维护位点信息)
- Stage3: 事件深度解析 ,用workpool进行多线程, 使用DmlParserStage进行DML事件数据的完整解析
- Stage4: SinkStoreStage单线程投递到store
SimpleParserStage和SinkStoreStage使用了stageExecutor这个线程池进行管理,DmlParserStage使用了workpool进行管理。
这三个类都是MysqlMultiStageCoprocessor的内部类,通过实现OnEvent方法进行逻辑处理,具体处理逻辑就不展开了,大家有兴趣可以看下源码。
4.总结
这个模块是非常核心的,涉及到了对binlog事件的抓取和处理,以及相关位点信息的处理。
回头看开头几个问题,相信也都有了答案:
- 如何抓取binlog
dump方法在MysqlConnection类中实现,主要就是把自己注册到数据库作为一个slave,然后获取binlog变更(具体的协议我们就不展开分析了)。
- 对binlog消息处理做了怎样的性能优化
利用disruptor框架,基于RingBuffer实现了
单线程接受 -> 单线程解析事件 -> 多线程深度解析事件 -> 单线程投递store 这样的一个流程。
(这里有点疑惑,单线程接受事件后,为什么需要一个单线程先解析一下再多线程深度解析,而不是直接多线程深度解析?有了解的朋友可以给我留言指点一下,谢谢)
- 如何控制位点信息
有多种CanalLogPositionManager可以选择。
默认采用FailbackLogPositionManager,获取位点信息时,先尝试从内存memory中找到lastest position,如果不存在才尝试找一下zookeeper里的位点信息。
- 如何兼容阿里云RDS的高可用模式下的主备切换问题
RdsBinlogEventParserProxy如果发现了PositionNotFoundException异常,就委托rdsLocalBinlogEventParser通过下载binlog的oss备份,找到目标binlog文件和位置。