分布式项目中锁的应用(本地锁-_redis【setnx】-_redisson-_springcache)-fen-bu-shi-xiang-mu-zhong-suo-de-ying-yong--ben-de-suo--redissetnx-springcache-redisson(一)https://developer.aliyun.com/article/1469578
数据一致性
写模式,会存在数据一致性问题: 1.加读写锁实现(所以对一致性高的数据不要放在缓存里) 2.引入canal,感知mysql更新去更新缓存 3.读多写多,直接查数据库
1.双写模式和失效模式与最终一致性(指修改数据方案)
注:双写模式和失效模式都会导致数据一致性问题(写和读操作并发时导致,解决,读与写操作加读写锁) 双写模式: 描述:同时写 漏洞:缓存有脏数据。操作1写缓存慢于操作2写缓存,导致缓存与DB数据不一致 解决: 方案1:写数据库+写缓存整个加锁 方案2:业务是否允许暂时性数据不一致问题,若允许则给数据设置一个过期时间即可 失效模式: 描述:DB写完,删除缓存 注:下图有错误,用户3先读db-1,然后用户2再写db-2,用户2删缓存,用户3写缓存【写入脏数据1】 漏洞:缓存有脏数据。用户3将db-1写入了缓存 解决: 方案1:写数据库+写缓存整个加锁 方案2:业务是否允许暂时性数据不一致问题,若允许则给数据设置一个过期时间即可 二者都有脏数据的可能性
2.解决方案(选用失效模式)
三种方案: 1.仅加过期时间即可(首先考虑业务造成脏数据的概率,例如用户维度数据(订单数据、用户数据)并发几率很小,每过一段时间触发读的主动更新) 2.canal订阅binlog的方式(菜单、商品介绍等基础数据)【完美解决】 3.加读写锁 4.实时性、一致性要求高的数据,应该直接查数据库 最终方案: 1.所有数据加上过期时间 2.读写数据加分布式读写锁(经常写的数据不要放在缓存里)
2.1.canal
canal: 阿里开源的中间件,可以作为数据库的从服务器,订阅数据库的binlog日志,数据更新canal也同步更新redis 另一作用: 解析不同的表日志分析计算生成一张新的表记录 案例: 根据用户访问的商品记录、订单记录 + 商品记录表共同生成一张用户推荐表,展示首页的数据(每个用户的首页推荐数据是不一样的)
SpringCache
简介: 通过注解实现缓存;属于spring内容不是springboot 文档: https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#spring-integration
1.整合
注:name::key,缓存区域化指name,key是键 1.引入SpringCache依赖 <!--Spring Cache,使用注解简化开发--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency> 2.引入redis依赖 <!--redis启动器--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 3.这一步只是查看一下自动配置类+属性类,没有实际编码动作 1)自动配置以下内容: 属性类:CacheProperties.java【属性以spring.cache开头】 自动配置类:CacheAutoConfiguration.java【会导入RedisCacheConfiguration配置】 redis自动配置类:RedisCacheConfiguration.java【往IOC注入了redis缓存管理器】 redis缓存管理器:RedisCacheManager【会初始化所有缓存(决定每个缓存使用什么配置)】 【如果RedisCacheConfiguration有就使用,没有就使用默认的(导致缓存使用默认配置,默认配置值来自于this.cacheProperties.getRedis())】 注:缓存区域化只是springcache的内容,在redis里数据存放没有区域化的概念,体现为 name::key 4.注解解释: @Cacheable:更新缓存【读操作:如果当前缓存存在方法不被执行,不存在则执行get方法并更新缓存】 @CacheEvict:删除缓存【写操作:失效模式,方法执行完删除缓存】 @CachePut:更新缓存【写操作:双写模式,方法执行完更新缓存】 @Caching:组合以上多个缓存操作 @CacheConfig:在类级别共享缓存的相同配置 5.属性 spring: redis: host: 192.168.56.10 port: 6379 cache: type: redis # 使用redis作为缓存 redis: time-to-live: 3600s # 过期时间 # key-prefix: CACHE_ # 会导致自己在@Cacheable里设置的名字失效,所以这里不指定 use-key-prefix: true # key值加前缀 cache-null-values: true # 缓存控制 6.默认行为: key自动生成:缓存名字::key值 默认过期时间:-1 value值默认序列化方式:jdk序列化【值使用jdk序列化后存放到redis】 7.自定义行为 缓存名字:value = {"category"}【区域划分】 key值:key = "'levelCategorys'" 【接收一个SpEl表达式,可以获取当前方法名,参数列表,单引号表字符串】 【使用方法名作为key:"#root.method.name"】 过期时间:在application.yml中指定 修改序列化方式要在配置类中修改 8.配置类【添加@EnableCache使用springcache】 @EnableConfigurationProperties(CacheProperties.class) @EnableCaching @Configuration public class MyCacheConfig { // @Autowired // CacheProperties cacheProperties; /** * 需要将配置文件中的配置设置上 * 1、使配置类生效 * 1)开启配置类与属性绑定功能EnableConfigurationProperties * * @ConfigurationProperties(prefix = "spring.cache") public class CacheProperties * 2)注入就可以使用了 * @Autowired CacheProperties cacheProperties; * 3)直接在方法参数上加入属性参数redisCacheConfiguration(CacheProperties redisProperties) * 自动从IOC容器中找 * <p> * 2、给config设置上 */ @Bean RedisCacheConfiguration redisCacheConfiguration(CacheProperties cacheProperties) { RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig(); config = config.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(new StringRedisSerializer())); config = config.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(new GenericJackson2JsonRedisSerializer())); // 当自己往IOC注入了RedisCacheConfiguration配置类时,以下参数全都失效,需要手动设置 CacheProperties.Redis redisProperties = cacheProperties.getRedis(); if (redisProperties.getTimeToLive() != null) { config = config.entryTtl(redisProperties.getTimeToLive()); } if (redisProperties.getKeyPrefix() != null) { config = config.prefixCacheNameWith(redisProperties.getKeyPrefix()); } if (!redisProperties.isCacheNullValues()) { config = config.disableCachingNullValues(); } if (!redisProperties.isUseKeyPrefix()) { config = config.disableKeyPrefix(); } return config; } } 9.使用案例:在service层代码上添加注解 /** * 查出所有1级分类 */ @Cacheable(value = {"category"}, key = "'level1Categorys'") @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("调用了getLevel1Categorys..."); // 查询父id=0 return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); }
redis缓存管理器源码,会初始化过期时间、key前缀、空数据是否缓存、是否使用缓存前缀
2.读模式与写模式
2.1.读模式
直接在get方法上添加@Cacheable即可 /** * 查出所有1级分类 */ @Cacheable(value = {"category"}, key = "'level1Categorys'") @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("调用了getLevel1Categorys..."); // 查询父id=0 return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); }
2.2.写模式
失效模式
/** * 级联更新 * 缓存策略:失效模式,方法执行完删除缓存 */ @CacheEvict(value = "category", key = "'level1Categorys'") @Transactional @Override public void updateCascade(CategoryEntity category) { this.updateById(category); if (!StringUtils.isEmpty(category.getName())) { // 更新冗余表 categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); // TODO 更新其他冗余表 } }
双写模式
/** * 级联更新 * 缓存策略:双写模式,方法执行完更新缓存 */ @CachePut(value = "category", key = "'level1Categorys'") @Transactional @Override public void updateCascade(CategoryEntity category) { this.updateById(category); if (!StringUtils.isEmpty(category.getName())) { // 更新冗余表 categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); // TODO 更新其他冗余表 } }
2.3.@Caching+失效模式+解决击穿、雪崩、穿透(分布式锁)
失效模式,级联更新类型时,删除与类型相关的所有缓存 两种方式: 方式1:指定每个key @Caching(evict = { @CacheEvict(value = "category", key = "'getLevel1Categorys'"), @CacheEvict(value = "category", key = "'getCatalogJson'") }) 方式2:直接删除区域化内所有缓存 @CacheEvict(value = {"category"}, allEntries = true)
/** * 级联更新所有关联表的冗余数据 * 缓存策略:失效模式,方法执行完删除缓存 */ @CacheEvict(value = {"category"}, allEntries = true) @Transactional @Override public void updateCascade(CategoryEntity category) { this.updateById(category); if (!StringUtils.isEmpty(category.getName())) { // 更新冗余表 categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); // TODO 更新其他冗余表 } } /** * 查出所有1级分类 */ @Cacheable(value = {"category"}, key = "'getLevel1Categorys'") @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("调用了getLevel1Categorys..."); // 查询父id=0 return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); } /** * 查询三级分类并封装成Map返回 * 使用SpringCache注解方式简化缓存设置 */ @Cacheable(value = {"category"}, key = "'getCatalogJson'") @Override public Map<String, List<Catalog2VO>> getCatalogJsonWithSpringCache() { // 未命中缓存 // 1.抢占分布式锁,同时设置过期时间【不使用读写锁,因为就是为了防止缓存击穿】 RLock lock = redisson.getLock(CategoryConstant.LOCK_KEY_CATALOG_JSON); lock.lock(30, TimeUnit.SECONDS); try { // 2.double check,占锁成功需要再次检查缓存 // 查询非空即返回 String catlogJSON = redisTemplate.opsForValue().get("getCatalogJson"); if (!StringUtils.isEmpty(catlogJSON)) { // 查询成功直接返回不需要查询DB Map<String, List<Catalog2VO>> result = JSON.parseObject(catlogJSON, new TypeReference<Map<String, List<Catalog2VO>>>() { }); return result; } // 3.查询所有分类,按照parentCid分组 Map<Long, List<CategoryEntity>> categoryMap = baseMapper.selectList(null).stream() .collect(Collectors.groupingBy(key -> key.getParentCid())); // 4.获取1级分类 List<CategoryEntity> level1Categorys = categoryMap.get(0L); // 5.封装数据 Map<String, List<Catalog2VO>> result = level1Categorys.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), l1Category -> { // 6.查询2级分类,并封装成List<Catalog2VO> List<Catalog2VO> catalog2VOS = categoryMap.get(l1Category.getCatId()) .stream().map(l2Category -> { // 7.查询3级分类,并封装成List<Catalog3VO> List<Catalog2VO.Catalog3Vo> catalog3Vos = categoryMap.get(l2Category.getCatId()) .stream().map(l3Category -> { // 封装3级分类VO Catalog2VO.Catalog3Vo catalog3Vo = new Catalog2VO.Catalog3Vo(l2Category.getCatId().toString(), l3Category.getCatId().toString(), l3Category.getName()); return catalog3Vo; }).collect(Collectors.toList()); // 封装2级分类VO返回 Catalog2VO catalog2VO = new Catalog2VO(l1Category.getCatId().toString(), catalog3Vos, l2Category.getCatId().toString(), l2Category.getName()); return catalog2VO; }).collect(Collectors.toList()); return catalog2VOS; })); return result; } finally { // 8.释放锁 lock.unlock(); } }
4.细节
2.1.@ConfigurationProperties标注方法上使用
使用@ConfigurationProperties标注在方法上使用时必须配合@Bean + @Configuration使用 @Configuration public class DruidDataSourceConfig { /** * DataSource 配置 * @return */ @ConfigurationProperties(prefix = "spring.datasource.druid.read") @Bean(name = "readDruidDataSource") public DataSource readDruidDataSource() { return new DruidDataSource(); } /** * DataSource 配置 * @return */ @ConfigurationProperties(prefix = "spring.datasource.druid.write") @Bean(name = "writeDruidDataSource") @Primary public DataSource writeDruidDataSource() { return new DruidDataSource(); } }
spring.datasource.druid.write.username=root spring.datasource.druid.write.password=1 spring.datasource.druid.write.driver-class-name=com.mysql.jdbc.Driver spring.datasource.druid.read.url=jdbc:mysql://localhost:3306/jpa spring.datasource.druid.read.username=root spring.datasource.druid.read.password=1 spring.datasource.druid.read.driver-class-name=com.mysql.jdbc.Driver
2.2.@ConfigurationProperties标注类上使用
@ConfigurationProperties(prefix = "spring.datasource") @Component @Setter @Getter public class DatasourcePro { private String url; private String username; private String password; // 配置文件中是driver-class-name, 转驼峰命名便可以绑定成 private String driverClassName; private String type; } @Controller @RequestMapping(value = "/config") public class ConfigurationPropertiesController { @Autowired private DatasourcePro datasourcePro; @RequestMapping("/test") @ResponseBody public Map<String, Object> test(){ Map<String, Object> map = new HashMap<>(); map.put("url", datasourcePro.getUrl()); map.put("userName", datasourcePro.getUsername()); map.put("password", datasourcePro.getPassword()); map.put("className", datasourcePro.getDriverClassName()); map.put("type", datasourcePro.getType()); return map; } }
spring.datasource.url=jdbc:mysql://127.0.0.1:8888/test?useUnicode=false&autoReconnect=true&characterEncoding=utf-8 spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.jdbc.Driver spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
2.3. @EnableConfigurationProperties标注在类上使用
@EnableConfigurationProperties(prefix = "spring.datasource.druid.read") @Configuration public class DruidDataSourceConfig { /** * DataSource 配置 * @return */ @ConfigurationProperties(prefix = "spring.datasource.druid.read") @Bean(name = "readDruidDataSource") public DataSource readDruidDataSource(JDBCProperties properties) { DruidDataSource dataSource = new DruidDataSource(); // dataSource.setUrl(properties.getXX) return dataSource; } /** * DataSource 配置 * @return */ @ConfigurationProperties(prefix = "spring.datasource.druid.write") @Bean(name = "writeDruidDataSource") @Primary public DataSource writeDruidDataSource() { return new DruidDataSource(); } }
5.spring-cache不足
1、读模式: 缓存穿透:查询一个DB不存在的数据。解决:缓存空数据;ache-null-values=true【布隆过滤器】 缓存击穿:大量并发进来同时查询一个正好过期的数据。解决:加锁; 默认未加锁【sync = true】只是本地锁 缓存雪崩:大量的key同时过期。解决:加上过期时间。: spring.cache.redis.time-to-live= 360000s 2、写模式:(缓存与数据库一致)(没有解决) 1)、手动读写加锁。 2)、引入canal,感知mysql的更新去更新缓存 3)、读多写多,直接去查询数据库就行 总结: 常规数据(读多写少,即时性,一致性要求不高的数据)﹔完全可以使用Spring-Cache,写模式(只要缓存的数据有过期时间就可以) 特殊数据:特殊设计(canal、读写锁) 在RedisCache里面打断点查看get同步方法
最终版:失效模式+解决击穿、雪崩、穿透(本地锁)
/** * 级联更新所有关联表的冗余数据 * 缓存策略:失效模式,方法执行完删除缓存 */ @CacheEvict(value = {"category"}, allEntries = true) @Transactional @Override public void updateCascade(CategoryEntity category) { this.updateById(category); if (!StringUtils.isEmpty(category.getName())) { // 更新冗余表 categoryBrandRelationService.updateCategory(category.getCatId(), category.getName()); // TODO 更新其他冗余表 } } /** * 查出所有1级分类 */ @Cacheable(value = {"category"}, key = "'getLevel1Categorys'", sync = true) @Override public List<CategoryEntity> getLevel1Categorys() { System.out.println("调用了getLevel1Categorys..."); // 查询父id=0 return baseMapper.selectList(new QueryWrapper<CategoryEntity>().eq("parent_cid", 0)); } /** * 查询三级分类并封装成Map返回 * 使用SpringCache注解方式简化缓存设置 */ @Cacheable(value = {"category"}, key = "'getCatalogJson'", sync = true) @Override public Map<String, List<Catalog2VO>> getCatalogJsonWithSpringCache() { // 未命中缓存 // 1.double check,占锁成功需要再次检查缓存(springcache使用本地锁) // 查询非空即返回 String catlogJSON = redisTemplate.opsForValue().get("getCatalogJson"); if (!StringUtils.isEmpty(catlogJSON)) { // 查询成功直接返回不需要查询DB Map<String, List<Catalog2VO>> result = JSON.parseObject(catlogJSON, new TypeReference<Map<String, List<Catalog2VO>>>() { }); return result; } // 2.查询所有分类,按照parentCid分组 Map<Long, List<CategoryEntity>> categoryMap = baseMapper.selectList(null).stream() .collect(Collectors.groupingBy(key -> key.getParentCid())); // 3.获取1级分类 List<CategoryEntity> level1Categorys = categoryMap.get(0L); // 4.封装数据 Map<String, List<Catalog2VO>> result = level1Categorys.stream().collect(Collectors.toMap(key -> key.getCatId().toString(), l1Category -> { // 5.查询2级分类,并封装成List<Catalog2VO> List<Catalog2VO> catalog2VOS = categoryMap.get(l1Category.getCatId()) .stream().map(l2Category -> { // 7.查询3级分类,并封装成List<Catalog3VO> List<Catalog2VO.Catalog3Vo> catalog3Vos = categoryMap.get(l2Category.getCatId()) .stream().map(l3Category -> { // 封装3级分类VO Catalog2VO.Catalog3Vo catalog3Vo = new Catalog2VO.Catalog3Vo(l2Category.getCatId().toString(), l3Category.getCatId().toString(), l3Category.getName()); return catalog3Vo; }).collect(Collectors.toList()); // 封装2级分类VO返回 Catalog2VO catalog2VO = new Catalog2VO(l1Category.getCatId().toString(), catalog3Vos, l2Category.getCatId().toString(), l2Category.getName()); return catalog2VO; }).collect(Collectors.toList()); return catalog2VOS; })); return result; }
StringRedisTemplate
1.一些使用案例
1.1.BoundHashOperations
/** * 根据用户信息获取购物车redis操作对象 */ private BoundHashOperations<String, Object, Object> getCartOps() { // 获取用户登录信息 UserInfoTO userInfo = CartInterceptor.threadLocal.get(); String cartKey = ""; if (userInfo.getUserId() != null) { // 登录态,使用用户购物车 cartKey = CartConstant.CART_PREFIX + userInfo.getUserId(); } else { // 非登录态,使用游客购物车 cartKey = CartConstant.CART_PREFIX + userInfo.getUserKey(); } // 绑定购物车的key操作Redis BoundHashOperations<String, Object, Object> operations = redisTemplate.boundHashOps(cartKey); return operations; }
get方法:
/** * 根据skuId获取购物车商品信息 */ @Override public CartItemVO getCartItem(Long skuId) { // 获取购物车redis操作对象 BoundHashOperations<String, Object, Object> cartOps = getCartOps(); String cartItemJSONString = (String) cartOps.get(skuId.toString()); CartItemVO cartItemVo = JSON.parseObject(cartItemJSONString, CartItemVO.class); return cartItemVo; }
put方法:
/** * 添加sku商品到购物车 */ @Override public CartItemVO addToCart(Long skuId, Integer num) throws ExecutionException, InterruptedException { // 获取购物车redis操作对象 BoundHashOperations<String, Object, Object> operations = getCartOps(); // 获取商品 String cartItemJSONString = (String) operations.get(skuId.toString()); if (StringUtils.isEmpty(cartItemJSONString)) { // 购物车不存在此商品,需要将当前商品添加到购物车中 CartItemVO cartItem = new CartItemVO(); CompletableFuture<Void> getSkuInfoFuture = CompletableFuture.runAsync(() -> { // 远程查询当前商品信息 R r = productFeignService.getInfo(skuId); SkuInfoVO skuInfo = r.getData("skuInfo", new TypeReference<SkuInfoVO>() { }); cartItem.setSkuId(skuInfo.getSkuId());// 商品ID cartItem.setTitle(skuInfo.getSkuTitle());// 商品标题 cartItem.setImage(skuInfo.getSkuDefaultImg());// 商品默认图片 cartItem.setPrice(skuInfo.getPrice());// 商品单价 cartItem.setCount(num);// 商品件数 cartItem.setCheck(true);// 是否选中 }, executor); CompletableFuture<Void> getSkuAttrValuesFuture = CompletableFuture.runAsync(() -> { // 远程查询attrName:attrValue信息 List<String> skuSaleAttrValues = productFeignService.getSkuSaleAttrValues(skuId); cartItem.setSkuAttrValues(skuSaleAttrValues); }, executor); CompletableFuture.allOf(getSkuInfoFuture, getSkuAttrValuesFuture).get(); operations.put(skuId.toString(), JSON.toJSONString(cartItem)); return cartItem; } else { // 当前购物车已存在此商品,修改当前商品数量 CartItemVO cartItem = JSON.parseObject(cartItemJSONString, CartItemVO.class); cartItem.setCount(cartItem.getCount() + num); operations.put(skuId.toString(), JSON.toJSONString(cartItem)); return cartItem; } }