基于canal 解决Redis与数据库数据不一致问题

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 基于canal 解决Redis与数据库数据不一致问题

正文


一、业务场景


       在实际生产过程中,由于网络原因或者Redis故障等等原因会导致Redis与数据库数据不一致的问题,这个时候我们怎么办?或许我们可以写一个定时Job然后定时去跑数据,或者找到那条数据不一致,然后删除Redis中的缓存,但是这些都有缺点,定时任务跑数据,延时性太大了。如果是删除缓存,数据量少还可以,数据量多了呢,写脚本?那脚本执行时候一定会影响Redis性能。Canal这个开源框架可以很友好的解决我们上面这些问题。


二、Canal介绍


Canal是阿里巴巴的开源项目,其原理是使用数据库的主从复制。


当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。


canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

canal 解析 binary log 对象(原始为 byte 流)


111.png


三、搭建Canal


传统方式


1、搭建mysql数据库


2、配置主数据库


[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复。 
#授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;


3、搭建CanalServer


#下载canal安装包
[root@localhost ~] wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz


4、创建安装目录并解压文件


[root@localhost mysql]# mkdir -p /usr/local/canal
[root@localhost ~]# tar -zxvf canal.deployer-1.1.5.tar.gz -C /usr/local/canal/

5、配置(基于RocketMQ)


Rocketmq安装


本人使用的是jdk17需要修改启动脚本startup.sh ,jdk8不需要修改,

压缩包下载: spring-boot: Springboot整合redis、消息中间件等相关代码 - Gitee.com

修改源码maven中的依赖,打包之后修改配合文件即可


 <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
     <groupId>io.netty</groupId>
     <artifactId>netty-all</artifactId>
     <version>4.1.70.Final</version>
</dependency>


#启动脚本
#!/bin/bash 
current_path=`pwd`
case "`uname`" in
    Linux)
    bin_abs_path=$(readlink -f $(dirname $0))
    ;;
  *)
    bin_abs_path=`cd $(dirname $0); pwd`
    ;;
esac
base=${bin_abs_path}/..
canal_conf=$base/conf/canal.properties
canal_local_conf=$base/conf/canal_local.properties
logback_configurationFile=$base/conf/logback.xml
export LANG=en_US.UTF-8
export BASE=$base
if [ -f $base/bin/canal.pid ] ; then
  echo "found canal.pid , Please run stop.sh first ,then startup.sh" 2>&2
    exit 1
fi
if [ ! -d $base/logs/canal ] ; then 
  mkdir -p $base/logs/canal
fi
## set java path
if [ -z "$JAVA" ] ; then
  JAVA=$(which java)
fi
ALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; then
  if [ -f $ALIBABA_JAVA ] ; then
    JAVA=$ALIBABA_JAVA
  elif [ -f $TAOBAO_JAVA ] ; then
    JAVA=$TAOBAO_JAVA
  else
    echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." 2>&2
    exit 1
  fi
fi
case "$#" 
in
0 ) 
  ;;
1 ) 
  var=$*
  if [ "$var" = "local" ]; then
    canal_conf=$canal_local_conf
  else
    if [ -f $var ] ; then 
      canal_conf=$var
    else
      echo "THE PARAMETER IS NOT CORRECT.PLEASE CHECK AGAIN."
      exit
    fi
  fi;;
2 ) 
  var=$1
  if [ "$var" = "local" ]; then
    canal_conf=$canal_local_conf
  else
    if [ -f $var ] ; then
      canal_conf=$var
    else 
      if [ "$1" = "debug" ]; then
        DEBUG_PORT=$2
        DEBUG_SUSPEND="n"
        JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
      fi
    fi
     fi;;
* )
  echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
  exit;;
esac
str=`file -L $JAVA | grep 64-bit`
#if [ -n "$str" ]; then
# JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
#else
# JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
#fi
if [ -n "$str" ]; then
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:SurvivorRatio=2  -Xss256k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 "
else
  JAVA_OPTS="-server -Xms256m -Xmx256m -Xmn128m -XX:MaxNewSize=256m  "
fi
JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
CANAL_OPTS="-DappName=otter-canal -Dlogback.configurationFile=$logback_configurationFile -Dcanal.conf=$canal_conf"
if [ -e $canal_conf -a -e $logback_configurationFile ]
then 
  for i in $base/lib/*;
    do CLASSPATH=$i:"$CLASSPATH";
  done
  CLASSPATH="$base/conf:$CLASSPATH";
  echo "cd to $bin_abs_path for workaround relative path"
    cd $bin_abs_path
  echo LOG CONFIGURATION : $logback_configurationFile
  echo canal conf : $canal_conf 
  echo CLASSPATH :$CLASSPATH
  $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $CANAL_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.deployer.CanalLauncher 1>>$base/logs/canal/canal_stdout.log 2>&1 &
  echo $! > $base/bin/canal.pid 
  echo "cd to $current_path for continue"
    cd $current_path
else 
  echo "canal conf("$canal_conf") OR log configration file($logback_configurationFile) is not exist,please create then first!"
fi


canal.properties


#################################################
#########     common argument   #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =
#集群使用
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
#需要修改的地方
canal.serverMode = rocketMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true
## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false
# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60
# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30
# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false
# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
# binlog ddl isolation
canal.instance.get.ddl.isolation = false
# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256
# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
#################################################
#########     destinations    #############
#################################################
#canal实例,对应example文件,如果需要多个请将example也复制多分canal.destinations = example,example1,example2
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml
##################################################
#########         MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8
##################################################
#########          Kafka         #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0
kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"
##################################################
#########         RocketMQ       #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
#需要修改的地方,多个值用分号隔开
rocketmq.namesrv.addr = 192.168.6.145:9876;192.168.6.145:9877
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = canal
##################################################
#########         RabbitMQ       #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =


instance.properties


#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
#修改成自己的数据库连接
canal.instance.master.address=192.168.6.145:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
#数据库连接账号和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex    
#mysql 数据解析关注的表,Perl正则表达式.
#多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
#常见例子:
#1.  所有表:.*   or  .*\\..*
#2.  canal schema下所有表: canal\\..*
#3.  canal下的以canal打头的表:canal\\.canal.*
#4.  canal schema下的一张表:canal\\.test1
#5.  多个规则组合使用:canal\\..*,mysql.test1,mysql.test2 (逗号分隔)
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#定义的topic主题
canal.mq.topic=canal-topic
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################


6、启动


#启动
sh bin/startup.sh
#停止
sh bin/stop.sh



Docker方式


1、拉取镜像


docker pull canal/canal-server:v1.1.5


2、创建canal-docker目录存放启动脚本


[root@bogon ~]# mkdir /usr/local/canal-docker


#!/bin/bash
function usage() {
    echo "Usage:"
    echo "  run.sh [CONFIG]"
    echo "example 1 :"
    echo "  run.sh -e canal.instance.master.address=127.0.0.1:3306 \\"
    echo "         -e canal.instance.dbUsername=canal \\"
    echo "         -e canal.instance.dbPassword=canal \\"
    echo "         -e canal.instance.connectionCharset=UTF-8 \\"
    echo "         -e canal.instance.tsdb.enable=true \\"
    echo "         -e canal.instance.gtidon=false \\"
    echo "         -e canal.instance.filter.regex=.*\\\\\\..* "
    echo "example 2 :"
    echo "  run.sh -e canal.admin.manager=127.0.0.1:8089 \\"
    echo "         -e canal.admin.port=11110 \\"
    echo "         -e canal.admin.user=admin \\"
    echo "         -e canal.admin.passwd=4ACFE3202A5FF5CF467898FC58AAB1D615029441"
    exit
}
function check_port() {
    local port=$1
    local TL=$(which telnet)
    if [ -f $TL ]; then
        data=`echo quit | telnet 127.0.0.1 $port| grep -ic connected`
        echo $data
        return
    fi
    local NC=$(which nc)
    if [ -f $NC ]; then
        data=`nc -z -w 1 127.0.0.1 $port | grep -ic succeeded`
        echo $data
        return
    fi
    echo "0"
    return
}
function getMyIp() {
    case "`uname`" in
        Darwin)
         myip=`echo "show State:/Network/Global/IPv4" | scutil | grep PrimaryInterface | awk '{print $3}' | xargs ifconfig | grep inet | grep -v inet6 | awk '{print $2}'`
         ;;
        *)
         myip=`ip route get 1 | awk '{print $NF;exit}'`
         ;;
  esac
  echo $myip
}
CONFIG=${@:1}
#VOLUMNS="-v $DATA:/home/admin/canal-server/logs"
PORTLIST="11110 11111 11112 9100"
PORTS=""
for PORT in $PORTLIST ; do
    #exist=`check_port $PORT`
    exist="0"
    if [ "$exist" == "0" ]; then
        PORTS="$PORTS -p $PORT:$PORT"
    else
        echo "port $PORT is used , pls check"
        exit 1
    fi
done
NET_MODE=""
case "`uname`" in
    Darwin)
        bin_abs_path=`cd $(dirname $0); pwd`
        ;;
    Linux)
        bin_abs_path=$(readlink -f $(dirname $0))
        NET_MODE="--net=host"
        PORTS=""
        ;;
    *)
        bin_abs_path=`cd $(dirname $0); pwd`
        NET_MODE="--net=host"
        PORTS=""
        ;;
esac
BASE=${bin_abs_path}
DATA="$BASE/data"
mkdir -p $DATA
if [ $# -eq 0 ]; then
    usage
elif [ "$1" == "-h" ] ; then
    usage
elif [ "$1" == "help" ] ; then
    usage
fi
MEMORY="-m 4096m"
LOCALHOST=`getMyIp`
cmd="docker run -d -it -h $LOCALHOST $CONFIG --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.5"
echo $cmd
eval $cmd

3、创建docker容器


sh run.sh -e canal.auto.scan=false \
    -e canal.destinations=test \
    -e canal.instance.master.address=127.0.0.1:3306  \
    -e canal.instance.dbUsername=canal  \
    -e canal.instance.dbPassword=canal  \
    -e canal.instance.connectionCharset=UTF-8 \
    -e canal.instance.tsdb.enable=true \
    -e canal.instance.gtidon=false  \
    -e canal.serverMode=rocketMQ \
    -e rocketmq.namesrv.addr=192.168.139.156:9876 \
    -e rocketmq.producer.group=test \
    -e canal.mq.topic=canal-topic 


参数说明


canal.auto.scan=false   #是否开启自动扫描

canal.destinations=test #对应配置文件中的canal.destinations=example

canal.instance.master.address=127.0.0.1:3306  #数据库链接地址

canal.instance.dbUsername=canal #数据库用户名

canal.instance.dbPassword=canal  #数据库密码

canal.instance.connectionCharset=UTF-8 #编码格式

canal.instance.tsdb.enable=true #切换数据库

canal.instance.gtidon=false  #没搞明白

canal.serverMode=rocketMQ #有tcp, kafka, rocketMQ, rabbitMQ

rocketmq.namesrv.addr=rocketmq.namesrv.addr=192.168.139.156:9876 #多个地址启动之后,连不上rocketmq,其他的mq没有尝试未知

rocketmq.producer.group=test #生产者组

canal.mq.topic=canal-topic#对应的topic


四、代码演示


package com.xiaojie.canal;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xiaojie.entity.User;
import com.xiaojie.utils.RedisUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @Description:解决redis与数据库数据不一致问题
 * @author: yan
 * @date: 2021.11.25
 */
@Component
@RocketMQMessageListener(consumerGroup = "canal1", topic = "canal-topic", consumeMode = ConsumeMode.ORDERLY
        , selectorType = SelectorType.TAG, selectorExpression = "*")
@Slf4j
public class CanalConsumer implements RocketMQListener<String> {
    @Autowired
    private RedisUtil redisUtil;
    @Override
    public void onMessage(String message) {
        try {
            log.info("接收到的数据是:》》》》》》{}", message);
            JSONObject jsonObj = JSONObject.parseObject(message);
            JSONArray jsonArray = (JSONArray) jsonObj.get("data");
            if (null != jsonArray && jsonArray.size() > 0) {
                User user = JSONObject.parseObject(jsonArray.get(0).toString(), User.class);
                String database = jsonObj.getString("database");
                String table = jsonObj.getString("table");
                String type = jsonObj.getString("type");
                if (StringUtils.isEmpty(type)) {
                    log.info("不用同步数据。。。。。。。。。。。。。。");
                }
                if (type.equals("INSERT") || type.equals("UPDATE")) {
                    log.info("更新数据》》》》》》》》》》》》");
                    redisUtil.set(database + "_" + table + "_" + user.getId(), JSONObject.toJSONString(user), 24 * 60 * 60);
                } else {
                    if (redisUtil.hasKey(database + "_" + table + "_" + user.getId())) {
                        log.info("删除数据》》》》》》》》》》》》");
                        redisUtil.del(database + "_" + table + "_" + user.getId());
                    }
                }
            }
        } catch (Exception e) {
            log.info("数据更新异常》》》》》》》》》》》》", e);
        }
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
17天前
|
存储 人工智能 Cloud Native
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
在9月20日2024云栖大会上,阿里云智能集团副总裁,数据库产品事业部负责人,ACM、CCF、IEEE会士(Fellow)李飞飞发表《从数据到智能:Data+AI驱动的云原生数据库》主题演讲。他表示,数据是生成式AI的核心资产,大模型时代的数据管理系统需具备多模处理和实时分析能力。阿里云瑶池将数据+AI全面融合,构建一站式多模数据管理平台,以数据驱动决策与创新,为用户提供像“搭积木”一样易用、好用、高可用的使用体验。
云栖重磅|从数据到智能:Data+AI驱动的云原生数据库
|
9天前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供了 8 种数据淘汰策略,分为淘汰易失数据和淘汰全库数据两大类。易失数据淘汰策略包括:volatile-lru、volatile-lfu、volatile-ttl 和 volatile-random;全库数据淘汰策略包括:allkeys-lru、allkeys-lfu 和 allkeys-random。此外,还有 no-eviction 策略,禁止驱逐数据,当内存不足时新写入操作会报错。
43 16
|
29天前
|
监控 NoSQL Java
场景题:百万数据插入Redis有哪些实现方案?
场景题:百万数据插入Redis有哪些实现方案?
39 1
场景题:百万数据插入Redis有哪些实现方案?
|
19天前
|
SQL 关系型数据库 数据库
国产数据实战之docker部署MyWebSQL数据库管理工具
【10月更文挑战第23天】国产数据实战之docker部署MyWebSQL数据库管理工具
60 4
国产数据实战之docker部署MyWebSQL数据库管理工具
|
9天前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
在项目中,为了解决Redis与Mysql的数据一致性问题,我们采用了多种策略:对于低一致性要求的数据,不做特别处理;时效性数据通过设置缓存过期时间来减少不一致风险;高一致性但时效性要求不高的数据,利用MQ异步同步确保最终一致性;而对一致性和时效性都有高要求的数据,则采用分布式事务(如Seata TCC模式)来保障。
43 14
|
9天前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用哈希槽分区算法,共有16384个哈希槽,每个槽分配到不同的Redis节点上。数据操作时,通过CRC16算法对key计算并取模,确定其所属的槽和对应的节点,从而实现高效的数据存取。
37 13
|
9天前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
Redis 采用两种过期键删除策略:惰性删除和定期删除。惰性删除在读取键时检查是否过期并删除,对 CPU 友好但可能积压大量过期键。定期删除则定时抽样检查并删除过期键,对内存更友好。默认每秒扫描 10 次,每次检查 20 个键,若超过 25% 过期则继续检查,单次最大执行时间 25ms。两者结合使用以平衡性能和资源占用。
33 11
|
9天前
|
监控 NoSQL 测试技术
【赵渝强老师】Redis的AOF数据持久化
Redis 是内存数据库,提供数据持久化功能,支持 RDB 和 AOF 两种方式。AOF 以日志形式记录每个写操作,支持定期重写以压缩文件。默认情况下,AOF 功能关闭,需在 `redis.conf` 中启用。通过 `info` 命令可监控 AOF 状态。AOF 重写功能可有效控制文件大小,避免性能下降。
|
9天前
|
存储 监控 NoSQL
【赵渝强老师】Redis的RDB数据持久化
Redis 是内存数据库,提供数据持久化功能以防止服务器进程退出导致数据丢失。Redis 支持 RDB 和 AOF 两种持久化方式,其中 RDB 是默认的持久化方式。RDB 通过在指定时间间隔内将内存中的数据快照写入磁盘,确保数据的安全性和恢复能力。RDB 持久化机制包括创建子进程、将数据写入临时文件并替换旧文件等步骤。优点包括适合大规模数据恢复和低数据完整性要求的场景,但也有数据完整性和一致性较低及备份时占用内存的缺点。
|
17天前
|
关系型数据库 分布式数据库 数据库
云栖大会|从数据到决策:AI时代数据库如何实现高效数据管理?
在2024云栖大会「海量数据的高效存储与管理」专场,阿里云瑶池讲师团携手AMD、FunPlus、太美医疗科技、中石化、平安科技以及小赢科技、迅雷集团的资深技术专家深入分享了阿里云在OLTP方向的最新技术进展和行业最佳实践。