基于canal 解决Redis与数据库数据不一致问题

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 基于canal 解决Redis与数据库数据不一致问题

正文


一、业务场景


       在实际生产过程中,由于网络原因或者Redis故障等等原因会导致Redis与数据库数据不一致的问题,这个时候我们怎么办?或许我们可以写一个定时Job然后定时去跑数据,或者找到那条数据不一致,然后删除Redis中的缓存,但是这些都有缺点,定时任务跑数据,延时性太大了。如果是删除缓存,数据量少还可以,数据量多了呢,写脚本?那脚本执行时候一定会影响Redis性能。Canal这个开源框架可以很友好的解决我们上面这些问题。


二、Canal介绍


Canal是阿里巴巴的开源项目,其原理是使用数据库的主从复制。


当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。


canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)


111.png


三、搭建Canal


传统方式


1、搭建mysql数据库


2、配置主数据库


[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复。 
#授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;


3、搭建CanalServer


#下载canal安装包
[root@localhost ~] wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz


4、创建安装目录并解压文件


[root@localhost mysql]# mkdir -p /usr/local/canal
[root@localhost ~]# tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal/

5、配置(基于RocketMQ)


Rocketmq安装


本人使用的是jdk17需要修改启动脚本startup.sh ,jdk8不需要修改,

压缩包下载: spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com

修改源码maven中的依赖,打包之后修改配合文件即可


 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.70.Final</version>
</dependency>


#启动脚本
#!/bin/bash 
current_path=`pwd`
case "`uname`" in
    Linux)
    bin_abs_path=$(readlink -f $(dirname $0))
    ;;
  *)
    bin_abs_path=`cd $(dirname $0); pwd`
    ;;
esac
base=${bin_abs_path}/..
canal_conf=$base/conf/canal.properties
canal_local_conf=$base/conf/canal_local.properties
logback_configurationFile=$base/conf/logback.xml
export LANG=en_US.UTF-8
export BASE=$base
if [ -f $base/bin/canal.pid ] ; then
  echo "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2
    exit 1
fi
if [ ! -d $base/logs/canal ] ; then 
  mkdir -p $base/logs/canal
fi
## set java path
if [ -z "$JAVA" ] ; then
  JAVA=$(which java)
fi
ALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; then
  if [ -f $ALIBABA_JAVA ] ; then
    JAVA=$ALIBABA_JAVA
  elif [ -f $TAOBAO_JAVA ] ; then
    JAVA=$TAOBAO_JAVA
  else
    echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
    exit 1
  fi
fi
case "$#" 
in
0 ) 
  ;;
1 ) 
  var=$*
  if [ "$var" = "local" ]; then
    canal_conf=$canal_local_conf
  else
    if [ -f $var ] ; then 
      canal_conf=$var
    else
      echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
      exit
    fi
  fi;;
2 ) 
  var=$1
  if [ "$var" = "local" ]; then
    canal_conf=$canal_local_conf
  else
    if [ -f $var ] ; then
      canal_conf=$var
    else 
      if [ "$1" = "debug" ]; then
        DEBUG_PORT=$2
        DEBUG_SUSPEND="n"
        JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
      fi
    fi
     fi;;
* )
  echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
  exit;;
esac
str=`file -L $JAVA | grep 64-bit`
#if [ -n "$str" ]; then
# JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
#else
# JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
#fi
if [ -n "$str" ]; then
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:SurvivorRatio=2  -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 "
else
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:MaxNewSize=256m  "
fi
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
if [ -e $canal_conf -a -e $logback_configurationFile ]
then 
  for i in $base/lib/*;
    do CLASSPATH=$i:"$CLASSPATH";
  done
  CLASSPATH="$base/conf:$CLASSPATH";
  echo "cd to $bin_abs_path for workaround relative path"
    cd $bin_abs_path
  echo LOG CONFIGURATION : $logback_configurationFile
  echo canal conf : $canal_conf 
  echo CLASSPATH :$CLASSPATH
  $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal_stdout.log 2>&1 &
  echo $! > $base/bin/canal.pid 
  echo "cd to $current_path for continue"
    cd $current_path
else 
  echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
fi


canal.properties


#################################################
#########     common argument   #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
#集群使用
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
#需要修改的地方
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
#########     destinations    #############
#################################################
#canal实例,对应example文件,如果需要多个请将example也复制多分canal.destinations = example,example1,example2
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
#########          Kafka         #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
#########         RocketMQ       #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
#需要修改的地方,多个值用分号隔开
rocketmq.namesrv.addr = 192.168.6.145:9876;192.168.6.145:9877
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = canal
##################################################
#########         RabbitMQ       #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =


instance.properties


#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
#修改成自己的数据库连接
canal.instance.master.address=192.168.6.145:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
#数据库连接账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex    
#mysql 数据解析关注的表,Perl正则表达式.
#多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
#常见例子:
#1.  所有表:.*   or  .*\\..*
#2.  canal schema下所有表: canal\\..*
#3.  canal下的以canal打头的表:canal\\.canal.*
#4.  canal schema下的一张表:canal\\.test1
#5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#定义的topic主题
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################


6、启动


#启动
sh bin/startup.sh
#停止
sh bin/stop.sh



Docker方式


1、拉取镜像


docker pull canal/canal-server:v1.1.5


2、创建canal-docker目录存放启动脚本


[root@bogon ~]# mkdir /usr/local/canal-docker


#!/bin/bash
function usage() {
    echo "Usage:"
    echo "  run.sh [CONFIG]"
    echo "example 1 :"
    echo "  run.sh -e canal.instance.master.address=127.0.0.1:3306 \\"
    echo "         -e canal.instance.dbUsername=canal \\"
    echo "         -e canal.instance.dbPassword=canal \\"
    echo "         -e canal.instance.connectionCharset=UTF-8 \\"
    echo "         -e canal.instance.tsdb.enable=true \\"
    echo "         -e canal.instance.gtidon=false \\"
    echo "         -e canal.instance.filter.regex=.*\\\\\\..* "
    echo "example 2 :"
    echo "  run.sh -e canal.admin.manager=127.0.0.1:8089 \\"
    echo "         -e canal.admin.port=11110 \\"
    echo "         -e canal.admin.user=admin \\"
    echo "         -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441"
    exit
}
function check_port() {
    local port=$1
    local TL=$(which telnet)
    if [ -f $TL ]; then
        data=`echo quit | telnet 127.0.0.1 $port| grep -ic connected`
        echo $data
        return
    fi
    local NC=$(which nc)
    if [ -f $NC ]; then
        data=`nc -z -w 1 127.0.0.1 $port | grep -ic succeeded`
        echo $data
        return
    fi
    echo "0"
    return
}
function getMyIp() {
    case "`uname`" in
        Darwin)
         myip=`echo "show State:/Network/Global/IPv4" | scutil | grep PrimaryInterface | awk '{print $3}' | xargs ifconfig | grep inet | grep -v inet6 | awk '{print $2}'`
         ;;
        *)
         myip=`ip route get 1 | awk '{print $NF;exit}'`
         ;;
  esac
  echo $myip
}
CONFIG=${@:1}
#VOLUMNS="-v $DATA:/home/admin/canal-server/logs"
PORTLIST="11110 11111 11112 9100"
PORTS=""
for PORT in $PORTLIST ; do
    #exist=`check_port $PORT`
    exist="0"
    if [ "$exist" == "0" ]; then
        PORTS="$PORTS -p $PORT:$PORT"
    else
        echo "port $PORT is used , pls check"
        exit 1
    fi
done
NET_MODE=""
case "`uname`" in
    Darwin)
        bin_abs_path=`cd $(dirname $0); pwd`
        ;;
    Linux)
        bin_abs_path=$(readlink -f $(dirname $0))
        NET_MODE="--net=host"
        PORTS=""
        ;;
    *)
        bin_abs_path=`cd $(dirname $0); pwd`
        NET_MODE="--net=host"
        PORTS=""
        ;;
esac
BASE=${bin_abs_path}
DATA="$BASE/data"
mkdir -p $DATA
if [ $# -eq 0 ]; then
    usage
elif [ "$1" == "-h" ] ; then
    usage
elif [ "$1" == "help" ] ; then
    usage
fi
MEMORY="-m 4096m"
LOCALHOST=`getMyIp`
cmd="docker run -d -it -h $LOCALHOST $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.5"
echo $cmd
eval $cmd

3、创建docker容器


sh run.sh -e canal.auto.scan=false \
    -e canal.destinations=test \
    -e canal.instance.master.address=127.0.0.1:3306  \
    -e canal.instance.dbUsername=canal  \
    -e canal.instance.dbPassword=canal  \
    -e canal.instance.connectionCharset=UTF-8 \
    -e canal.instance.tsdb.enable=true \
    -e canal.instance.gtidon=false  \
    -e canal.serverMode=rocketMQ \
    -e rocketmq.namesrv.addr=192.168.139.156:9876 \
    -e rocketmq.producer.group=test \
    -e canal.mq.topic=canal-topic 


参数说明


canal.auto.scan=false   #是否开启自动扫描

canal.destinations=test #对应配置文件中的canal.destinations=example

canal.instance.master.address=127.0.0.1:3306  #数据库链接地址

canal.instance.dbUsername=canal #数据库用户名

canal.instance.dbPassword=canal  #数据库密码

canal.instance.connectionCharset=UTF-8 #编码格式

canal.instance.tsdb.enable=true #切换数据库

canal.instance.gtidon=false  #没搞明白

canal.serverMode=rocketMQ #有tcp, kafka, rocketMQ, rabbitMQ

rocketmq.namesrv.addr=rocketmq.namesrv.addr=192.168.139.156:9876 #多个地址启动之后,连不上rocketmq,其他的mq没有尝试未知

rocketmq.producer.group=test #生产者组

canal.mq.topic=canal-topic#对应的topic


四、代码演示


package com.xiaojie.canal;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.entity.User;
import com.xiaojie.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @Description:解决redis与数据库数据不一致问题
 * @author: yan
 * @date: 2021.11.25
 */
@Component
@RocketMQMessageListener(consumerGroup = "canal1", topic = "canal-topic", consumeMode = ConsumeMode.ORDERLY
        , selectorType = SelectorType.TAG, selectorExpression = "*")
@Slf4j
public class CanalConsumer implements RocketMQListener<String> {
    @Autowired
    private RedisUtil redisUtil;
    @Override
    public void onMessage(String message) {
        try {
            log.info("接收到的数据是:》》》》》》{}", message);
            JSONObject jsonObj = JSONObject.parseObject(message);
            JSONArray jsonArray = (JSONArray) jsonObj.get("data");
            if (null != jsonArray && jsonArray.size() > 0) {
                User user = JSONObject.parseObject(jsonArray.get(0).toString(), User.class);
                String database = jsonObj.getString("database");
                String table = jsonObj.getString("table");
                String type = jsonObj.getString("type");
                if (StringUtils.isEmpty(type)) {
                    log.info("不用同步数据。。。。。。。。。。。。。。");
                }
                if (type.equals("INSERT") || type.equals("UPDATE")) {
                    log.info("更新数据》》》》》》》》》》》》");
                    redisUtil.set(database + "_" + table + "_" + user.getId(), JSONObject.toJSONString(user), 24 * 60 * 60);
                } else {
                    if (redisUtil.hasKey(database + "_" + table + "_" + user.getId())) {
                        log.info("删除数据》》》》》》》》》》》》");
                        redisUtil.del(database + "_" + table + "_" + user.getId());
                    }
                }
            }
        } catch (Exception e) {
            log.info("数据更新异常》》》》》》》》》》》》", e);
        }
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
2天前
|
数据采集 数据库 Python
有哪些方法可以验证用户输入数据的格式是否符合数据库的要求?
有哪些方法可以验证用户输入数据的格式是否符合数据库的要求?
48 31
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
215 61
|
1天前
|
存储 缓存 NoSQL
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
云端问道21期方案教学-应对高并发,利用云数据库 Tair(兼容 Redis®*)缓存实现极速响应
|
15天前
|
SQL 存储 运维
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
27 1
|
20天前
|
存储 Java easyexcel
招行面试:100万级别数据的Excel,如何秒级导入到数据库?
本文由40岁老架构师尼恩撰写,分享了应对招商银行Java后端面试绝命12题的经验。文章详细介绍了如何通过系统化准备,在面试中展示强大的技术实力。针对百万级数据的Excel导入难题,尼恩推荐使用阿里巴巴开源的EasyExcel框架,并结合高性能分片读取、Disruptor队列缓冲和高并发批量写入的架构方案,实现高效的数据处理。此外,文章还提供了完整的代码示例和配置说明,帮助读者快速掌握相关技能。建议读者参考《尼恩Java面试宝典PDF》进行系统化刷题,提升面试竞争力。关注公众号【技术自由圈】可获取更多技术资源和指导。
|
23天前
|
前端开发 JavaScript 数据库
获取数据库中字段的数据作为下拉框选项
获取数据库中字段的数据作为下拉框选项
52 5
|
23天前
|
缓存 NoSQL Redis
Redis经典问题:数据并发竞争
数据并发竞争是大流量系统(如火车票系统、微博平台)中常见的问题,可能导致用户体验下降甚至系统崩溃。本文介绍了两种解决方案:1) 加写回操作加互斥锁,查询失败快速返回默认值;2) 保持多个缓存备份,减少并发竞争概率。通过实践案例展示,成功提高了系统的稳定性和性能。
|
23天前
|
缓存 监控 NoSQL
Redis经典问题:数据不一致
在使用Redis时,缓存与数据库数据不一致会导致应用异常。主要原因包括缓存更新失败、Rehash异常等。解决方案有:重试机制、缩短缓存时间、优化写入策略、建立监控报警、定期验证一致性、采用缓存分层及数据回滚恢复机制。这些措施可确保数据最终一致性,提升应用稳定性和性能。
|
2月前
|
关系型数据库 MySQL 数据库
GBase 数据库如何像MYSQL一样存放多行数据
GBase 数据库如何像MYSQL一样存放多行数据
|
2月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
76 14

热门文章

最新文章