创建MQ消费者进行同步
在application.yml配置文件加上kafka的配置信息:
spring: kafka: # Kafka服务地址 bootstrap-servers: 127.0.0.1:9092 consumer: # 指定一个默认的组名 group-id: consumer-group1 #序列化反序列化 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: key-serializer: org.apache.kafka.common.serialization.StringDeserializer value-serializer: org.apache.kafka.common.serialization.StringDeserializer # 批量抓取 batch-size: 65536 # 缓存容量 buffer-memory: 524288
根据上面Kafka消费命令那里,我们知道了json数据的结构,可以创建一个CanalBean对象进行接收:
public class CanalBean { //数据 private List<TbCommodityInfo> data; //数据库名称 private String database; private long es; //递增,从1开始 private int id; //是否是DDL语句 private boolean isDdl; //表结构的字段类型 private MysqlType mysqlType; //UPDATE语句,旧数据 private String old; //主键名称 private List<String> pkNames; //sql语句 private String sql; private SqlType sqlType; //表名 private String table; private long ts; //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等 private String type; //getter、setter方法 } public class MysqlType { private String id; private String commodity_name; private String commodity_price; private String number; private String description; //getter、setter方法 } public class SqlType { private int id; private int commodity_name; private int commodity_price; private int number; private int description; }
最后就可以创建一个消费者CanalConsumer进行消费:
@Component public class CanalConsumer { //日志记录 private static Logger log = LoggerFactory.getLogger(CanalConsumer.class); //redis操作工具类 @Resource private RedisClient redisClient; //监听的队列名称为:canaltopic @KafkaListener(topics = "canaltopic") public void receive(ConsumerRecord<?, ?> consumer) { String value = (String) consumer.value(); log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),consumer.partition(), consumer.offset(), value); //转换为javaBean CanalBean canalBean = JSONObject.parseObject(value, CanalBean.class); //获取是否是DDL语句 boolean isDdl = canalBean.getIsDdl(); //获取类型 String type = canalBean.getType(); //不是DDL语句 if (!isDdl) { List<TbCommodityInfo> tbCommodityInfos = canalBean.getData(); //过期时间 long TIME_OUT = 600L; if ("INSERT".equals(type)) { //新增语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); //新增到redis中,过期时间是10分钟 redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT); } } else if ("UPDATE".equals(type)) { //更新语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); //更新到redis中,过期时间是10分钟 redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT); } } else { //删除语句 for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) { String id = tbCommodityInfo.getId(); //从redis中删除 redisClient.deleteKey(id); } } } } }
测试MySQL与Redis同步
mysql对应的表结构如下:
CREATE TABLE `tb_commodity_info` ( `id` varchar(32) NOT NULL, `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称', `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格', `number` int(10) DEFAULT '0' COMMENT '商品数量', `description` varchar(2048) DEFAULT '' COMMENT '商品描述', PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';
首先在MySQL创建表。然后启动项目,接着新增一条数据:
INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉包', '3.99', '3', '大叉包,老喜欢');
tb_commodity_info表查到新增的数据:
Redis也查到了对应的数据,证明同步成功!
如果更新呢?试一下Update语句:
UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='便宜的青菜包' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';
没有问题!
总结
canal的缺点:
- canal只能同步增量数据。
- 不是实时同步,是准实时同步。
- 存在一些bug,不过社区活跃度较高,对于提出的bug能及时修复。
- MQ顺序性问题。
网的回答,大家参考一下
尽管有一些缺点,毕竟没有一样技术(产品)是完美的,合适最重要。