正文
一、业务场景
在实际生产过程中,由于网络问题或者Redis故障等原因,会出现Redis与数据库数据不一致的问题,这个时候我们怎么办?或许我们可以写一个定时Job定时去跑数据,或者找到不一致的那条数据,然后删除Redis中对应的缓存,但是这些做法都有缺点:定时任务跑数据,延时太大;手动删除缓存,数据量少还可以,数据量多了就得写脚本,而脚本执行的时候一定会影响Redis性能。Canal这个开源框架可以很好地解决上面这些问题。
二、Canal介绍
Canal是阿里巴巴的开源项目,其原理是使用数据库的主从复制。
当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
canal 解析 binary log 对象(原始为 byte 流)
三、搭建Canal
传统方式
1、搭建mysql数据库
2、配置主数据库
[mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replication 需要定义,不要和 canal 的 slaveId 重复。 #授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限 CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
3、搭建CanalServer
#下载canal安装包 [root@localhost ~]# wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
4、创建安装目录并解压文件
[root@localhost mysql]# mkdir -p /usr/local/canal [root@localhost ~]# tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal/
5、配置(基于RocketMQ)
Rocketmq安装
本人使用的是JDK 17,需要修改启动脚本startup.sh;JDK 8则不需要修改。
压缩包下载: spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com
修改源码maven中的依赖,打包之后修改配置文件即可
<!-- https://mvnrepository.com/artifact/io.netty/netty-all --> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.70.Final</version> </dependency>
#!/bin/bash
# Startup script for the canal deployer (bin/startup.sh).
#
# Usage:
#   startup.sh                    start with conf/canal.properties
#   startup.sh local              start with conf/canal_local.properties
#   startup.sh <conf-file>        start with the given properties file
#   startup.sh debug <port>       start with a JDWP debug agent on <port>
#
# Environment:
#   JAVA   optional path to the java binary; looked up on PATH when unset.
#
# Exit status: 0 when the JVM was launched in the background, 1 on any
# precondition failure (stale pid file, no java, bad arguments, missing config).
#
# Only POSIX sh constructs are used, so the script also works when it is
# invoked as `sh bin/startup.sh` instead of being executed directly.

current_path=$(pwd)

# Resolve the directory that contains this script.
case "$(uname)" in
  Linux)
    bin_abs_path=$(readlink -f "$(dirname "$0")")
    ;;
  *)
    bin_abs_path=$(cd "$(dirname "$0")" && pwd)
    ;;
esac

base=${bin_abs_path}/..
canal_conf=$base/conf/canal.properties
canal_local_conf=$base/conf/canal_local.properties
logback_configurationFile=$base/conf/logback.xml
export LANG=en_US.UTF-8
export BASE=$base

# A pid file means a previous instance was not stopped through stop.sh.
if [ -f "$base/bin/canal.pid" ]; then
  echo "found canal.pid , Please run stop.sh first ,then startup.sh" >&2
  exit 1
fi

if [ ! -d "$base/logs/canal" ]; then
  mkdir -p "$base/logs/canal"
fi

## set java path
if [ -z "$JAVA" ]; then
  JAVA=$(command -v java)
fi

ALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; then
  if [ -f "$ALIBABA_JAVA" ]; then
    JAVA=$ALIBABA_JAVA
  elif [ -f "$TAOBAO_JAVA" ]; then
    JAVA=$TAOBAO_JAVA
  else
    echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." >&2
    exit 1
  fi
fi

# Argument handling: select the properties file and/or enable remote debugging.
case "$#" in
  0)
    ;;
  1)
    var=$1
    if [ "$var" = "local" ]; then
      canal_conf=$canal_local_conf
    elif [ -f "$var" ]; then
      canal_conf=$var
    else
      echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN." >&2
      exit 1
    fi
    ;;
  2)
    var=$1
    if [ "$var" = "local" ]; then
      canal_conf=$canal_local_conf
    elif [ -f "$var" ]; then
      canal_conf=$var
    elif [ "$var" = "debug" ]; then
      DEBUG_PORT=$2
      DEBUG_SUSPEND="n"
      JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
    fi
    ;;
  *)
    echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN." >&2
    exit 1
    ;;
esac

# Non-empty when the java binary is a 64-bit executable.
str=$(file -L "$JAVA" | grep 64-bit)

# Previous JVM options, kept for reference only (they rely on PermGen/CMS flags):
#if [ -n "$str" ]; then
#  JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
#else
#  JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
#fi
if [ -n "$str" ]; then
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:SurvivorRatio=2 -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 "
else
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:MaxNewSize=256m "
fi

JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"

if [ -e "$canal_conf" ] && [ -e "$logback_configurationFile" ]; then
  # Prepend every entry under lib/ to the classpath, then conf/ in front of all.
  for i in "$base"/lib/*; do
    CLASSPATH=$i:"$CLASSPATH"
  done
  CLASSPATH="$base/conf:$CLASSPATH"

  echo "cd to $bin_abs_path for workaround relative path"
  cd "$bin_abs_path" || exit 1

  echo "LOG CONFIGURATION : $logback_configurationFile"
  echo "canal conf : $canal_conf"
  echo "CLASSPATH :$CLASSPATH"
  # JAVA_OPTS / JAVA_DEBUG_OPT are intentionally unquoted: each holds several
  # space-separated JVM flags that must be split into separate arguments.
  # The path-carrying -D options are passed as individually quoted words.
  # shellcheck disable=SC2086
  "$JAVA" $JAVA_OPTS $JAVA_DEBUG_OPT \
    -DappName=otter-canal \
    "-Dlogback.configurationFile=$logback_configurationFile" \
    "-Dcanal.conf=$canal_conf" \
    -classpath ".:$CLASSPATH" \
    com.alibaba.otter.canal.deployer.CanalLauncher \
    1>>"$base/logs/canal/canal_stdout.log" 2>&1 &
  # Record the background JVM's pid.
  echo $! > "$base/bin/canal.pid"

  echo "cd to $current_path for continue"
  cd "$current_path" || exit 1
else
  echo "canal conf($canal_conf) OR log configration file($logback_configurationFile) is not exist,please create then first!" >&2
  exit 1
fi
canal.properties
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register #canal.admin.register.auto = true #canal.admin.register.cluster = #canal.admin.register.name = #集群使用 canal.zkServers = # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, rocketMQ, rabbitMQ #需要修改的地方 canal.serverMode = rocketMQ # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 
canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false canal.instance.filter.dml.insert = false canal.instance.filter.dml.update = false canal.instance.filter.dml.delete = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 ################################################# ######### destinations ############# ################################################# #canal实例,对应example文件,如果需要多个请将example也复制多分canal.destinations = example,example1,example2 canal.destinations = example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 # set this value to 'true' means that when binlog pos not 
found, skip to latest. # WARN: pls keep 'false' in production env, or if you know what you want. canal.auto.reset.latest.pos.mode = false canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ Properties ############# ################################################## # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = canal.aliyun.uid= canal.mq.flatMessage = true canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 # Set this value to "cloud", if you want open message trace feature in aliyun. 
canal.mq.accessChannel = local canal.mq.database.hash = true canal.mq.send.thread.size = 30 canal.mq.build.thread.size = 8 ################################################## ######### Kafka ############# ################################################## kafka.bootstrap.servers = 127.0.0.1:9092 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0 kafka.kerberos.enable = false kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf" kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf" ################################################## ######### RocketMQ ############# ################################################## rocketmq.producer.group = test rocketmq.enable.message.trace = false rocketmq.customized.trace.topic = rocketmq.namespace = #需要修改的地方,多个值用分号隔开 rocketmq.namesrv.addr = 192.168.6.145:9876;192.168.6.145:9877 rocketmq.retry.times.when.send.failed = 0 rocketmq.vip.channel.enabled = false rocketmq.tag = canal ################################################## ######### RabbitMQ ############# ################################################## rabbitmq.host = rabbitmq.virtual.host = rabbitmq.exchange = rabbitmq.username = rabbitmq.password = rabbitmq.deliveryMode =
instance.properties
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info #修改成自己的数据库连接 canal.instance.master.address=192.168.6.145:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password #数据库连接账号和密码 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #mysql 数据解析关注的表,Perl正则表达式. #多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\) #常见例子: #1. 所有表:.* or .*\\..* #2. canal schema下所有表: canal\\..* #3. canal下的以canal打头的表:canal\\.canal.* #4. canal schema下的一张表:canal\\.test1 #5. 
多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔) canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config #定义的topic主题 canal.mq.topic=canal-topic # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 #################################################
6、启动
#启动 sh bin/startup.sh #停止 sh bin/stop.sh
Docker方式
1、拉取镜像
docker pull canal/canal-server:v1.1.5
2、创建canal-docker目录存放启动脚本
[root@bogon ~]# mkdir /usr/local/canal-docker
#!/bin/bash
# Launches the canal-server Docker container.
#
# Usage: run.sh [CONFIG]
#   CONFIG is passed through to `docker run`, typically as a list of
#   `-e key=value` pairs. With no arguments, `-h` or `help`, usage is printed.
#
# The script avoids bash-only syntax so it also runs when invoked as
# `sh run.sh` (it still relies on `local`, which dash and busybox ash provide).

# Prints usage examples and exits.
usage() {
  # printf with single-quoted literals keeps the backslashes exactly as shown,
  # independent of the shell's echo implementation.
  printf '%s\n' \
    'Usage:' \
    ' run.sh [CONFIG]' \
    'example 1 :' \
    ' run.sh -e canal.instance.master.address=127.0.0.1:3306 \' \
    ' -e canal.instance.dbUsername=canal \' \
    ' -e canal.instance.dbPassword=canal \' \
    ' -e canal.instance.connectionCharset=UTF-8 \' \
    ' -e canal.instance.tsdb.enable=true \' \
    ' -e canal.instance.gtidon=false \' \
    ' -e canal.instance.filter.regex=.*\\\..* ' \
    'example 2 :' \
    ' run.sh -e canal.admin.manager=127.0.0.1:8089 \' \
    ' -e canal.admin.port=11110 \' \
    ' -e canal.admin.user=admin \' \
    ' -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441'
  exit
}

# Prints "0" when nothing accepts connections on 127.0.0.1:$1, otherwise a
# non-zero count. Tries telnet first, then nc; prints "0" if neither exists.
check_port() {
  local port=$1
  local hits
  # `command -v` fails cleanly when the tool is absent; testing `[ -f $(which x) ]`
  # is true for an empty result and would never reach the nc fallback.
  if command -v telnet >/dev/null 2>&1; then
    hits=$(echo quit | telnet 127.0.0.1 "$port" | grep -ic connected)
    echo "$hits"
    return
  fi
  if command -v nc >/dev/null 2>&1; then
    # Use nc's exit status: `nc -z` usually prints nothing on stdout to grep.
    if nc -z -w 1 127.0.0.1 "$port" >/dev/null 2>&1; then
      echo "1"
    else
      echo "0"
    fi
    return
  fi
  echo "0"
  return
}

# Prints the primary local IPv4 address (empty output if it cannot be found).
getMyIp() {
  local myip
  case "$(uname)" in
    Darwin)
      myip=$(echo "show State:/Network/Global/IPv4" | scutil | grep PrimaryInterface | awk '{print $3}' | xargs ifconfig | grep inet | grep -v inet6 | awk '{print $2}')
      ;;
    *)
      # Take the field after "src": the last field of `ip route get` is not
      # reliable because newer iproute2 appends "uid <n>" after the address.
      myip=$(ip route get 1 | awk '{for (i = 1; i < NF; i++) if ($i == "src") {print $(i + 1); exit}}')
      ;;
  esac
  printf '%s\n' "$myip"
}

if [ $# -eq 0 ] || [ "$1" = "-h" ] || [ "$1" = "help" ]; then
  usage
fi

# All arguments joined by spaces; they are re-parsed by eval below.
CONFIG="$*"
#VOLUMNS="-v $DATA:/home/admin/canal-server/logs"
PORTLIST="11110 11111 11112 9100"
PORTS=""
for PORT in $PORTLIST; do
  # Port probing is disabled; every port is treated as free.
  #exist=$(check_port $PORT)
  exist="0"
  if [ "$exist" = "0" ]; then
    PORTS="$PORTS -p $PORT:$PORT"
  else
    echo "port $PORT is used , pls check"
    exit 1
  fi
done

NET_MODE=""
case "$(uname)" in
  Darwin)
    bin_abs_path=$(cd "$(dirname "$0")" && pwd)
    ;;
  Linux)
    bin_abs_path=$(readlink -f "$(dirname "$0")")
    # Host networking exposes the ports directly, so no -p mappings are passed.
    NET_MODE="--net=host"
    PORTS=""
    ;;
  *)
    bin_abs_path=$(cd "$(dirname "$0")" && pwd)
    NET_MODE="--net=host"
    PORTS=""
    ;;
esac
BASE=${bin_abs_path}
DATA="$BASE/data"
mkdir -p "$DATA"

MEMORY="-m 4096m"
LOCALHOST=$(getMyIp)
# Only pass -h when an address was detected; an empty value would make docker
# consume the next option as the hostname.
HOST_OPT=""
if [ -n "$LOCALHOST" ]; then
  HOST_OPT="-h $LOCALHOST"
fi

cmd="docker run -d -it $HOST_OPT $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.5"
echo "$cmd"
# NOTE(review): eval re-parses the caller's arguments as shell code. It is kept
# because the usage examples' escaping depends on that second parse; do not
# feed this script untrusted input.
eval "$cmd"
3、创建docker容器
sh run.sh -e canal.auto.scan=false \ -e canal.destinations=test \ -e canal.instance.master.address=127.0.0.1:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.serverMode=rocketMQ \ -e rocketmq.namesrv.addr=192.168.139.156:9876 \ -e rocketmq.producer.group=test \ -e canal.mq.topic=canal-topic
参数说明
canal.auto.scan=false #是否开启自动扫描
canal.destinations=test #对应配置文件中的canal.destinations=example
canal.instance.master.address=127.0.0.1:3306 #数据库链接地址
canal.instance.dbUsername=canal #数据库用户名
canal.instance.dbPassword=canal #数据库密码
canal.instance.connectionCharset=UTF-8 #编码格式
canal.instance.tsdb.enable=true #是否启用表结构元数据(table meta)的tsdb存储
canal.instance.gtidon=false #是否启用GTID方式(true/false)
canal.serverMode=rocketMQ #有tcp, kafka, rocketMQ, rabbitMQ
rocketmq.namesrv.addr=192.168.139.156:9876 #RocketMQ的NameServer地址;配置多个地址启动之后连不上rocketmq,其他的mq没有尝试,情况未知
rocketmq.producer.group=test #生产者组
canal.mq.topic=canal-topic #对应的topic
四、代码演示
package com.xiaojie.canal; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xiaojie.entity.User; import com.xiaojie.utils.RedisUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.annotation.SelectorType; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @Description:解决redis与数据库数据不一致问题 * @author: yan * @date: 2021.11.25 */ @Component @RocketMQMessageListener(consumerGroup = "canal1", topic = "canal-topic", consumeMode = ConsumeMode.ORDERLY , selectorType = SelectorType.TAG, selectorExpression = "*") @Slf4j public class CanalConsumer implements RocketMQListener<String> { @Autowired private RedisUtil redisUtil; @Override public void onMessage(String message) { try { log.info("接收到的数据是:》》》》》》{}", message); JSONObject jsonObj = JSONObject.parseObject(message); JSONArray jsonArray = (JSONArray) jsonObj.get("data"); if (null != jsonArray && jsonArray.size() > 0) { User user = JSONObject.parseObject(jsonArray.get(0).toString(), User.class); String database = jsonObj.getString("database"); String table = jsonObj.getString("table"); String type = jsonObj.getString("type"); if (StringUtils.isEmpty(type)) { log.info("不用同步数据。。。。。。。。。。。。。。"); } if (type.equals("INSERT") || type.equals("UPDATE")) { log.info("更新数据》》》》》》》》》》》》"); redisUtil.set(database + "_" + table + "_" + user.getId(), JSONObject.toJSONString(user), 24 * 60 * 60); } else { if (redisUtil.hasKey(database + "_" + table + "_" + user.getId())) { log.info("删除数据》》》》》》》》》》》》"); redisUtil.del(database + "_" + table + "_" + user.getId()); } } } } catch (Exception e) { log.info("数据更新异常》》》》》》》》》》》》", e); } } }
完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码