使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云数据库 Tair(兼容Redis),内存型 2GB
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致

使用Kafka实现Java异步更新通知解决Redis与MySQL数据不一致

背景

在高并发的应用场景中,秒杀系统等业务可能导致Redis与MySQL中的数据不一致。通过异步更新通知,我们可以及时发现不一致并采取相应措施,确保系统的稳定性和一致性。

设计思路

我们将设计一个Java程序,定期巡检Redis和MySQL中的库存数据。当发现不一致时,通过Kafka发送异步通知,以便其他系统及时进行处理。

1. Maven依赖

首先,确保在项目的pom.xml文件中添加以下Maven依赖:

<dependencies>
    <!-- MySQL连接驱动 -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.23</version>
    </dependency>
    <!-- Jedis:Java连接Redis的客户端库 -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>3.7.0</version>
    </dependency>
    <!-- Kafka客户端 -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.8.1</version>
    </dependency>
</dependencies>
2. Java代码实现
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
public class AsyncInventoryNotifier {
    // Redis连接信息
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;
    private static final int REDIS_DB = 0;
    // MySQL连接信息
    private static final String MYSQL_URL = "jdbc:mysql://localhost:3306/ecommerce";
    private static final String MYSQL_USER = "user";
    private static final String MYSQL_PASSWORD = "password";
    // Kafka连接信息
    private static final String KAFKA_BROKER = "localhost:9092";
    private static final String KAFKA_TOPIC = "inventory_updates";
    public static void main(String[] args) {
        // 创建定时任务
        Timer timer = new Timer();
        timer.schedule(new InventoryNotifierTask(), 0, 30 * 60 * 1000); // 每30分钟执行一次
    }
    static class InventoryNotifierTask extends TimerTask {
        @Override
        public void run() {
            System.out.println("Starting async inventory notification...");
            try {
                // 连接Redis
                Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
                jedis.select(REDIS_DB);
                // 连接MySQL
                Connection mysqlConnection = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASSWORD);
                // 连接Kafka
                Properties kafkaProps = new Properties();
                kafkaProps.put("bootstrap.servers", KAFKA_BROKER);
                kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
                KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
                // 查询所有商品ID
                PreparedStatement preparedStatement = mysqlConnection.prepareStatement("SELECT id FROM products");
                ResultSet resultSet = preparedStatement.executeQuery();
                while (resultSet.next()) {
                    int productId = resultSet.getInt("id");
                    // 从Redis获取缓存库存
                    int redisStock = Integer.parseInt(jedis.get("product:" + productId + ":stock"));
                    // 从MySQL获取实际库存
                    PreparedStatement stockStatement = mysqlConnection.prepareStatement("SELECT stock FROM products WHERE id = ?");
                    stockStatement.setInt(1, productId);
                    ResultSet stockResultSet = stockStatement.executeQuery();
                    int mysqlStock = 0;
                    if (stockResultSet.next()) {
                        mysqlStock = stockResultSet.getInt("stock");
                    }
                    // 检测库存一致性
                    if (redisStock != mysqlStock) {
                        System.out.println("Inventory inconsistency detected for product " + productId +
                                ". Redis: " + redisStock + ", MySQL: " + mysqlStock);
                        // 发送异步通知到Kafka
                        ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, String.valueOf(productId), "inventory_inconsistency");
                        producer.send(record);
                        System.out.println("Async notification sent to Kafka.");
                    }
                }
                // 关闭连接
                jedis.close();
                mysqlConnection.close();
                producer.close();
            } catch (SQLException e) {
                System.err.println("Error during async inventory notification: " + e.getMessage());
            }
        }
    }
}

代码设计思路解释

  1. 连接到Redis、MySQL和Kafka:
Jedis jedis = new Jedis(REDIS_HOST, REDIS_PORT);
jedis.select(REDIS_DB);
Connection mysqlConnection = DriverManager.getConnection(MYSQL_URL, MYSQL_USER, MYSQL_PASSWORD);
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", KAFKA_BROKER);
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
  1. 查询商品ID并检测库存一致性:
PreparedStatement preparedStatement = mysqlConnection.prepareStatement("SELECT id FROM products");
ResultSet resultSet = preparedStatement.executeQuery();
while (resultSet.next()) {
    int productId = resultSet.getInt("id");
    int redisStock = Integer.parseInt(jedis.get("product:" + productId + ":stock"));
    PreparedStatement stockStatement = mysqlConnection.prepareStatement("SELECT stock FROM products WHERE id = ?");
    stockStatement.setInt(1, productId);
    ResultSet stockResultSet = stockStatement.executeQuery();
    int mysqlStock = 0;
    if (stockResultSet.next()) {
        mysqlStock = stockResultSet.getInt("stock");
    }
    if (redisStock != mysqlStock) {
        // 发送异步通知到Kafka
        ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, String.valueOf(productId), "inventory_inconsistency");
        producer.send(record

);

System.out.println(“Async notification sent to Kafka.”);

}

}

3. **异步通知:**
```java
ProducerRecord<String, String> record = new ProducerRecord<>(KAFKA_TOPIC, String.valueOf(productId), "inventory_inconsistency");
producer.send(record);
System.out.println("Async notification sent to Kafka.");
  1. 关闭连接:
jedis.close();
mysqlConnection.close();
producer.close();

通过这个异步更新通知的设计,我们能够在检测到Redis与MySQL数据不一致的情况时,及时发送异步通知到Kafka,以便其他系统能够实时处理这些不一致性。这种设计适用于高并发应用场景,可以在实际生产环境中部署并根据业务需求调整执行频率。

相关文章
|
22天前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
1月前
|
数据采集 存储 NoSQL
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
基于Scrapy-Redis的分布式景点数据爬取与热力图生成
193 67
|
16天前
|
存储 缓存 NoSQL
告别数据僵尸!Redis实现自动清理过期键值对
在数据激增的时代,Redis如同内存管理的智能管家,支持键值对的自动过期功能,实现“数据保鲜”。通过`EXPIRE`设定生命倒计时、`TTL`查询剩余时间,结合惰性删除与定期清理策略,Redis高效维护内存秩序。本文以Python实战演示其过期机制,并提供最佳实践指南,助你掌握数据生命周期管理的艺术,让数据优雅退场。
69 0
|
3月前
|
存储 NoSQL 算法
Redis分片集群中数据是怎么存储和读取的 ?
Redis集群采用的算法是哈希槽分区算法。Redis集群中有16384个哈希槽(槽的范围是 0 -16383,哈希槽),将不同的哈希槽分布在不同的Redis节点上面进行管理,也就是说每个Redis节点只负责一部分的哈希槽。在对数据进行操作的时候,集群会对使用CRC16算法对key进行计算并对16384取模(slot = CRC16(key)%16383),得到的结果就是 Key-Value 所放入的槽,通过这个值,去找到对应的槽所对应的Redis节点,然后直接到这个对应的节点上进行存取操作
|
3月前
|
缓存 NoSQL 关系型数据库
Redis和Mysql如何保证数据⼀致?
1. 先更新Mysql,再更新Redis,如果更新Redis失败,可能仍然不⼀致 2. 先删除Redis缓存数据,再更新Mysql,再次查询的时候在将数据添加到缓存中 这种⽅案能解决1 ⽅案的问题,但是在⾼并发下性能较低,⽽且仍然会出现数据不⼀致的问题,⽐如线程1删除了 Redis缓存数据,正在更新Mysql,此时另外⼀个查询再查询,那么就会把Mysql中⽼数据⼜查到 Redis中 1. 使用MQ异步同步, 保证数据的最终一致性 我们项目中会根据业务情况 , 使用不同的方案来解决Redis和Mysql的一致性问题 : 1. 对于一些一致性要求不高的场景 , 不做处理例如 : 用户行为数据 ,
|
3月前
|
NoSQL Redis
Redis的数据淘汰策略有哪些 ?
Redis 提供 8 种数据淘汰策略: 淘汰易失数据(具有过期时间的数据) 1. volatile-lru(least recently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰 2. volatile-lfu(least frequently used):从已设置过期时间的数据集(server.db[i].expires)中挑选最不经常使用的数据淘汰 3. volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰 4. volatile-random:从已设置过期
|
3月前
|
NoSQL Redis
Redis的数据持久化策略有哪些 ?
Redis 提供了两种方式,实现数据的持久化到硬盘。 1. RDB 持久化(全量),是指在指定的时间间隔内将内存中的数据集快照写入磁盘。 2. AOF持久化(增量),以日志的形式记录服务器所处理的每一个写、删除操作 RDB和AOF一起使用, 在Redis4.0版本支持混合持久化方式 ( 设置 aof-use-rdb-preamble yes )
|
3月前
|
存储 NoSQL Redis
Redis的数据过期策略有哪些 ?
1. 惰性删除 :只会在取出 key 的时候才对数据进行过期检查。这样对 CPU 最友好,但是可能会造成太多过期 key 没有被删除。数据到达过期时间,不做处理。等下次访问该数据时,我们需要判断 a. 如果未过期,返回数据 b. 发现已过期,删除,返回nil 2. 定期删除 : 每隔一段时间抽取一批 key 执行删除过期 key 操作。并且,Redis 底层会通过限制删除操作执行的时长和频率来减少删除操作对 CPU 时间的影响。默认情况下 Redis 定期检查的频率是每秒扫描 10 次,用于定期清除过期键。当然此值还可以通过配置文件进行设置,在 redis.conf 中修改配置“hz”
|
5月前
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
8月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
302 1

推荐镜像

更多