前言
该篇为redis使用系列的第六篇,在springboot整合redis基础上使用BloomFilter 布隆过滤器。
至于布隆过滤器的作用和实现的简单原理,该篇不做讲述,还不了解的可以先看我这篇
《JAVA 你应该有所了解的布隆过滤器》
OK,再啰嗦一下,相信点进来这篇的小伙伴,多半都跟redis缓存穿透有点渊源,是的,查询redis,为了防止他人恶意使用不存在的key访问redis,造成大批量的出现缓存穿透现象(直接查询数据库,导致数据库扛不住)。
而加入布隆过滤器,能很大程度去解决这个问题。
正文
首先是pom.xml文件,加入我们这次使用redis & BloomFilter 的核心依赖包:
<!--使用Redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!--借助guava的布隆过滤器--> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>19.0</version> </dependency>
然后是yml的redis连接信息:
spring: redis: database: 3 host: 127.0.0.1 port: 6379 password: 12345 jedis.pool.max-idle: 100 jedis.pool.max-wait: -1ms jedis.pool.min-idle: 2 timeout: 2000ms
如果是一般的使用redis存字符串的话,使用StringRedisTemplate,就不需要配置序列化。
但是咱们这里使用的是RedisTemplate<String, Object> redisTemplate ,存储的是对象,所以为了防止存入的对象值在查看的时候不显示乱码,就需要配置相关的序列化(其实我们存的bit结构数据,布隆过滤器存值分分钟都是百万级别的,会因为数据量太大redis客户端也没办法显示,不过不影响使用)。
RedisConfig.class:
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; import com.google.common.hash.Funnel; import com.jc.mytest.util.BloomFilterHelper; import org.springframework.cache.CacheManager; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.cache.RedisCacheManager; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; /** * @Author: JCccc * @CreateTime: 2018-09-11 * @Description: */ @Configuration @EnableCaching public class RedisConfig { @Bean public CacheManager cacheManager(RedisConnectionFactory connectionFactory) { RedisCacheManager rcm=RedisCacheManager.create(connectionFactory); return rcm; } @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>(); redisTemplate.setConnectionFactory(factory); Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(om); //序列化设置 ,这样计算是正常显示的数据,也能正常存储和获取 redisTemplate.setKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); return redisTemplate; } @Bean public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) { StringRedisTemplate stringRedisTemplate = new StringRedisTemplate(); stringRedisTemplate.setConnectionFactory(factory); return stringRedisTemplate; } //初始化布隆过滤器,放入到spring容器里面 @Bean public BloomFilterHelper<String> initBloomFilterHelper() { return new BloomFilterHelper<>((Funnel<String>) (from, into) -> into.putString(from, Charsets.UTF_8).putString(from, Charsets.UTF_8), 1000000, 0.01); } }
BloomFilterHelper .calss:
import com.google.common.base.Preconditions; import com.google.common.hash.Funnel; import com.google.common.hash.Hashing; public class BloomFilterHelper<T> { private int numHashFunctions; private int bitSize; private Funnel<T> funnel; public BloomFilterHelper(Funnel<T> funnel, int expectedInsertions, double fpp) { Preconditions.checkArgument(funnel != null, "funnel不能为空"); this.funnel = funnel; // 计算bit数组长度 bitSize = optimalNumOfBits(expectedInsertions, fpp); // 计算hash方法执行次数 numHashFunctions = optimalNumOfHashFunctions(expectedInsertions, bitSize); } public int[] murmurHashOffset(T value) { int[] offset = new int[numHashFunctions]; long hash64 = Hashing.murmur3_128().hashObject(value, funnel).asLong(); int hash1 = (int) hash64; int hash2 = (int) (hash64 >>> 32); for (int i = 1; i <= numHashFunctions; i++) { int nextHash = hash1 + i * hash2; if (nextHash < 0) { nextHash = ~nextHash; } offset[i - 1] = nextHash % bitSize; } return offset; } /** * 计算bit数组长度 */ private int optimalNumOfBits(long n, double p) { if (p == 0) { // 设定最小期望长度 p = Double.MIN_VALUE; } int sizeOfBitArray = (int) (-n * Math.log(p) / (Math.log(2) * Math.log(2))); return sizeOfBitArray; } /** * 计算hash方法执行次数 */ private int optimalNumOfHashFunctions(long n, long m) { int countOfHash = Math.max(1, (int) Math.round((double) m / n * Math.log(2))); return countOfHash; } }
然后是具体的布隆过滤器配合redis使用的 方法类,RedisBloomFilter.class :
import com.google.common.base.Preconditions; import com.jc.mytest.util.BloomFilterHelper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; /** * @Author : JCccc * @CreateTime : 2020/4/23 * @Description : **/ @Service public class RedisBloomFilter { @Autowired private RedisTemplate redisTemplate; /** * 根据给定的布隆过滤器添加值 */ public <T> void addByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空"); int[] offset = bloomFilterHelper.murmurHashOffset(value); for (int i : offset) { System.out.println("key : " + key + " " + "value : " + i); redisTemplate.opsForValue().setBit(key, i, true); } } /** * 根据给定的布隆过滤器判断值是否存在 */ public <T> boolean includeByBloomFilter(BloomFilterHelper<T> bloomFilterHelper, String key, T value) { Preconditions.checkArgument(bloomFilterHelper != null, "bloomFilterHelper不能为空"); int[] offset = bloomFilterHelper.murmurHashOffset(value); for (int i : offset) { System.out.println("key : " + key + " " + "value : " + i); if (!redisTemplate.opsForValue().getBit(key, i)) { return false; } } return true; } }
到这里,其实整合redis并使用BloomFilter布隆过滤器 的代码都已经完毕了。
存入值代码分析
在使用测试之前,稍微讲讲这里的流程、思路,我们把目光放到 存入值到布隆过滤器的方法addByBloomFilter上,
如果你了解过或者看过我文章开头说的那篇《JAVA 你应该有所了解的布隆过滤器》 ,那么你对这个方法的阅读就是一目了然。
红色框内的代码,就是把我们需要存入的value,通过算法计算出相关需要绑定 1的 bit位 的数组。
而蓝色框内的代码,就是将计算完得到的bit位数组,存入redis里面的bit结构里面,i就是数组内的bit位位置,每个都设置为true。
校验值代码分析
红色框内的代码,就是把我们需要校验的value,通过算法计算出相关需要绑定 1的 bit位 的数组。
而蓝色框内的代码,就是遍历计算完得到的bit位数组,检查在redis的bit结构里,是不是每一个都绑定了1(是不是都是true),
根据布隆过滤器的原则,只要有一个不是1(true),那么就是这个值不存在!
简单写两个接口,展示一下效果(测试直接写了两个get接口,没有结合其他业务逻辑):
@Autowired RedisBloomFilter redisBloomFilter; @Autowired private BloomFilterHelper bloomFilterHelper; @ResponseBody @RequestMapping("/add") public String addBloomFilter(@RequestParam ("orderNum") String orderNum) { try { redisBloomFilter.addByBloomFilter(bloomFilterHelper,"bloom",orderNum); } catch (Exception e) { e.printStackTrace(); return "添加失败"; } return "添加成功"; } @ResponseBody @RequestMapping("/check") public boolean checkBloomFilter(@RequestParam ("orderNum") String orderNum) { boolean b = redisBloomFilter.includeByBloomFilter(bloomFilterHelper, "bloom", orderNum); return b; }
先调用存值接口:
可以看到对于 存入的值,order20200423 ,计算出来7个bit位,这些都设置成true了。
接下来调用一下校验值接口,可以看到同样的值order20200423 计算出来的bit位一样,而且redis里面都是true,所以返回了存在(但是咱们知道布隆过滤器年迈,对于存在的检测,会随着存入的数据量的增大而慢慢出现误判):
那么咱们校验一个不存在的 值, 因为缓存穿透就是恶意查询不存在的值,例如id为 -1 这种情况(毕竟很多项目里,id的存值或者有一些key都是不考虑到负数的,而且接口还不做校验,所以-1基本不存在),
这时候布隆过滤器 校验结果,不存在!这个非常值得信赖,百分百是不存在的:
ps:
那么很多小伙伴是不是觉得布隆过滤器只能用来判断不存在 ,因为这个可信。 感觉有点不是滋味。
布隆过滤器因为保证效率,导致误判存在的情况的出现,这种情况其实也是能补救,解决方案思路很多,
我个人简单说一个,如果某个值判断存在,失误了,你发现了。 你可以把这个值存起来,例如就是一个误判列表那种,用redis的list结构也行。这样再加一层误判查询检索的逻辑环节,这样也是能起到一定程度的解决。
毕竟100万条数据,出现误判总量也就1000条, 也就是误判列表也就存1000个数据。
具体需不需使用到它的判断存在? 如果业务范围允许误判率跟布隆过滤器的误判率是相差不大的,也能使用。
对于一般的场景,咱们就是使用它来筛选不存在的值的。 因为它的 不存在 是肯定的。
OK,该篇就到此。