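Syncing binlog with Alibaba Cloud DTS: a hands-on guide
First, configure the databases and tables you want to subscribe to in an Alibaba Cloud DTS subscription; next, create a consumer group; finally, write code to consume the binlog.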
1. Configuration details
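The subscription configuration in the DTS console provides the topic and broker information.
The consumer group you created provides the consumer group ID.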
2. Writing the consumer code
- Create the consumer class
import com.aliyun.dts.subscribe.clients.ConsumerContext;
import com.aliyun.dts.subscribe.clients.DTSConsumer;
import com.aliyun.dts.subscribe.clients.DefaultDTSConsumer;
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.record.OperationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.Map;

@Component
public class DTSConsumerSubscribe {

    private static final Logger log = LoggerFactory.getLogger(DTSConsumerSubscribe.class);

    // Your own binlog-parsing component
    @Resource
    MysqlRecordConsumeListener mysqlRecordConsumeListener;

    private final DTSConsumer dtsConsumer; // DTS consumer instance

    public DTSConsumerSubscribe() {
        // kafka broker url
        String brokerUrl = "xxxxxxx";
        // topic to consume, partition is 0
        String topic = "xxxxxx";
        String sid = "ccc";      // consumer group ID
        String userName = "xxx"; // consumer group user name
        String password = "xxx"; // consumer group password
        // Initial checkpoint for the first seek: a Unix timestamp in seconds,
        // e.g. 1566180200 for Mon Aug 19 10:03:20 CST 2019
        String initCheckpoint = "1693567696";
        // When using subscribe mode, group config is required because the
        // Kafka consumer group is enabled; this example uses ASSIGN.
        ConsumerContext.ConsumerSubscribeMode subscribeMode = ConsumerContext.ConsumerSubscribeMode.ASSIGN;
        this.dtsConsumer = initDTSClient(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
    }

    public DTSConsumerSubscribe(String brokerUrl, String topic, String sid, String userName, String password,
                                String checkpoint, ConsumerContext.ConsumerSubscribeMode subscribeMode) {
        this.dtsConsumer = initDTSClient(brokerUrl, topic, sid, userName, password, checkpoint, subscribeMode);
    }

    // Initialize the DTS client
    private DTSConsumer initDTSClient(String brokerUrl, String topic, String sid, String userName, String password,
                                      String initCheckpoint, ConsumerContext.ConsumerSubscribeMode subscribeMode) {
        ConsumerContext consumerContext =
                new ConsumerContext(brokerUrl, topic, sid, userName, password, initCheckpoint, subscribeMode);
        consumerContext.setForceUseCheckpoint(false);
        DTSConsumer dtsConsumer = new DefaultDTSConsumer(consumerContext);
        dtsConsumer.addRecordListeners(buildRecordListener());
        return dtsConsumer;
    }

    // Build the binlog listener
    public Map<String, RecordListener> buildRecordListener() {
        // Users can implement their own listener
        RecordListener mysqlRecordPrintListener = record -> {
            OperationType operationType = record.getOperationType();
            if (operationType.equals(OperationType.INSERT)
                    || operationType.equals(OperationType.UPDATE)
                    || operationType.equals(OperationType.DELETE)
                    || operationType.equals(OperationType.DDL)) {
                // Consume the record
                mysqlRecordConsumeListener.consume(record);
                record.commit(""); // commit the consumption checkpoint
            }
        };
        return Collections.singletonMap("mysqlRecordPrinter", mysqlRecordPrintListener);
    }

    // Start consuming
    public void start() {
        log.info("Start DTS subscription client...");
        dtsConsumer.start();
    }

    // Stop consuming
    public void stop() {
        log.info("Stop DTS subscription client");
        dtsConsumer.close();
    }
}
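The `initCheckpoint` value in the consumer class is a Unix timestamp in seconds, passed as a string. Rather than working it out by hand, it can be derived from a readable date. The following is a minimal sketch using only `java.time`; the class name `CheckpointExample` and the helper `toCheckpoint` are hypothetical names for illustration and are not part of the DTS SDK.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

public class CheckpointExample {

    // Converts a local date-time in the given time zone to epoch seconds,
    // rendered as the string form used for the initial checkpoint.
    static String toCheckpoint(LocalDateTime dateTime, ZoneId zone) {
        return String.valueOf(dateTime.atZone(zone).toEpochSecond());
    }

    public static void main(String[] args) {
        // 2019-08-19 10:03:20 in China Standard Time (UTC+8)
        String checkpoint = toCheckpoint(
                LocalDateTime.of(2019, 8, 19, 10, 3, 20),
                ZoneId.of("Asia/Shanghai"));
        System.out.println(checkpoint); // prints 1566180200
    }
}
```

The resulting string could then be assigned to `initCheckpoint` in place of the hard-coded value.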
- Create your own parser
import com.aliyun.dts.subscribe.clients.common.RecordListener;
import com.aliyun.dts.subscribe.clients.record.DefaultUserRecord;
import com.aliyun.dts.subscribe.clients.record.OperationType;
import com.aliyun.dts.subscribe.clients.record.RecordSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
public class MysqlRecordConsumeListener implements RecordListener {

    private static final Logger log = LoggerFactory.getLogger(MysqlRecordConsumeListener.class);

    @Override
    public void consume(DefaultUserRecord record) {
        OperationType operationType = record.getOperationType();
        RecordSchema recordSchema = record.getSchema();
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append("\n")
                .append("RecordID [").append(record.getId()).append("]\n")
                .append("RecordTimestamp [").append(record.getSourceTimestamp()).append("] \n")
                .append("Source [").append(recordSchema.getDatabaseInfo()).append("]\n")
                .append("RecordType [").append(record.getOperationType()).append("]\n");
        if (operationType.equals(OperationType.INSERT)
                || operationType.equals(OperationType.UPDATE)
                || operationType.equals(OperationType.DELETE)
                || operationType.equals(OperationType.DDL)) {
            stringBuilder.append("Schema info [").append(recordSchema.toString()).append("]\n")
                    .append("Before image {").append(record.getBeforeImage()).append("}\n")
                    .append("After image {").append(record.getAfterImage()).append("}\n");
            parseRecord(operationType, record);
        }
        log.info(stringBuilder.toString());
    }

    public void parseRecord(OperationType operationType, DefaultUserRecord record) {
        log.info("Parsing binlog record: {}", record);
        if (OperationType.INSERT.equals(operationType)) {
            // INSERT has only an after image; read the auto-increment id from it and query by that id
            log.info("=====INSERT getValues=====");
        } else if (OperationType.UPDATE.equals(operationType)) {
            // UPDATE has both before and after images; read the auto-increment id and query by that id
            log.info("=====UPDATE getValues=====");
        } else if (OperationType.DELETE.equals(operationType)) {
            // DELETE has only a before image; read the auto-increment id from it and query by that id
            log.info("=====DELETE getValues=====");
        }
    }
}
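Once you have the binlog record, you can easily join it against related tables and act on the change. Here is the log output for an UPDATE binlog record: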
Before image {[Field [id] [4] Field [user_id] [1002316] Field [user_name] [ascacac] Field [goods_id] [1054] Field [course_id] [null] Field [course_name] [null] Field [current_status] [null] ]}
After image {[Field [id] [4] Field [user_id] [1002316] Field [user_name] [jamlee] Field [goods_id] [1054] Field [course_id] [null] Field [course_name] [null] Field [current_status] [null] ]}
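In the UPDATE output above, only `user_name` differs between the before and after images. One way to find the changed columns is to compare the two images field by field. The sketch below assumes the images have already been copied into plain `Map<String, String>` objects (column name to value); that conversion step, the class name `ImageDiffExample`, and the helper `changedColumns` are hypothetical and do not come from the DTS SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public class ImageDiffExample {

    // Returns column -> [beforeValue, afterValue] for every column in the
    // after image whose value differs from the before image (null-safe).
    static Map<String, String[]> changedColumns(Map<String, String> before, Map<String, String> after) {
        Map<String, String[]> changes = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : after.entrySet()) {
            String oldValue = before.get(entry.getKey());
            if (!Objects.equals(oldValue, entry.getValue())) {
                changes.put(entry.getKey(), new String[] {oldValue, entry.getValue()});
            }
        }
        return changes;
    }

    public static void main(String[] args) {
        // Values taken from the UPDATE log output shown above
        Map<String, String> before = new LinkedHashMap<>();
        before.put("id", "4");
        before.put("user_id", "1002316");
        before.put("user_name", "ascacac");
        before.put("goods_id", "1054");
        before.put("course_id", null);

        Map<String, String> after = new LinkedHashMap<>(before);
        after.put("user_name", "jamlee");

        changedColumns(before, after).forEach((column, values) ->
                System.out.println(column + ": " + values[0] + " -> " + values[1]));
        // prints: user_name: ascacac -> jamlee
    }
}
```

Comparing with `Objects.equals` keeps the check safe for columns that are null in one or both images, such as `course_id` here.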
- Write a task to run it
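import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class UserRightDtsSubscribe implements ApplicationRunner, DisposableBean {

    private static final Logger log = LoggerFactory.getLogger(UserRightDtsSubscribe.class);

    @Resource
    DTSConsumerSubscribe dtsConsumerSubscribe;

    // Called when the Spring context shuts down
    @Override
    public void destroy() throws Exception {
        log.info("Stopping user order data sync...");
        dtsConsumerSubscribe.stop();
    }

    // Called once the Spring Boot application has started
    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("Starting user order data sync...");
        dtsConsumerSubscribe.start();
    }
}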
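3. Summary
Subscribing to and consuming binlog through Alibaba Cloud DTS is very convenient, especially for incremental data synchronization or for applying follow-up changes. Binlog can also be synced by other means, such as canal. But does this binlog-based approach have drawbacks, and what problems can it cause?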