Redis大总结,要言不烦,字字珠玑(下)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
日志服务 SLS,月写入数据量 50GB 1个月
简介: Redis大总结,要言不烦,字字珠玑(下)

🐴 五、SpringBoot整合



🌈 5.1 源码分析


@Bean
//我们自己可以定义一个RedisTemplate来替换默认的
@ConditionalOnMissingBean(name = "redisTemplate")
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory
redisConnectionFactory) {
  //默认的RedisTemplate没有过多的设置,redis对象都是需要序列化
  //两个泛型都是Object, Object的类型,我们后续需要强制转换<String, Object>
  RedisTemplate<Object, Object> template = new RedisTemplate<>();
  template.setConnectionFactory(redisConnectionFactory);
  return template;
}
@Bean
//由于String类型是redis中最常用的类型,所以单独出来一个bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(RedisConnectionFactory.class)
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory
redisConnectionFactory) {
  StringRedisTemplate template = new StringRedisTemplate();
  template.setConnectionFactory(redisConnectionFactory);
  return template;
}


🚀🚀🚀 导入依赖


<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>


🚀🚀🚀 yaml文件


spring:
 redis:
  host: 127.0.0.1
  port: 6379
  password:
  jedis:
   pool:
    max-active: 8
    max-wait: -1ms
    max-idle: 500
    min-idle: 0
  lettuce:
   shutdown-timeout: 0ms


🚀🚀🚀 测试类


//连接redis
//    RedisConnection connection =
redisTemplate.getConnectionFactory().getConnection();
//    connection.flushAll();
//    connection.flushAll();
//redisTemplate 操作不同的数据类型,api和我们的指令是一样的
//opsForValue 操作字符串 类似String
//opsForList 操作list 类似List
redisTemplate.opsForValue().set("myKey","myValue");
System.out.println(redisTemplate.opsForValue().get("myKey"));


🌈 5.2 RedisConfig


RedisTemplate,固定模板,可以直接使用


@Configuration
public class RedisConfig {
  @Bean
  @SuppressWarnings("all")
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory
factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<String,
Object>();
    template.setConnectionFactory(factory);
    Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(om);
    StringRedisSerializer stringRedisSerializer = new
StringRedisSerializer();
    // key采用String的序列化方式
    template.setKeySerializer(stringRedisSerializer);
    // hash的key也采用String的序列化方式
    template.setHashKeySerializer(stringRedisSerializer);
// value序列化方式采用jackson
    template.setValueSerializer(jackson2JsonRedisSerializer);
    // hash的value序列化方式采用jackson
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    template.afterPropertiesSet();
    return template;
 }
}


🌈 5.3 RedisUtil


在企业开发中80%都不会使用原生的方式去编写代码,使用RedisUtil

public class RedisUtil {
  @Autowired
  private RedisTemplate<String, Object> redisTemplate;
  // =============================common============================
  /**
  * 指定缓存失效时间
  * @param key 键
  * @param time 时间(秒)
  */
  public boolean expire(String key, long time) {
    try {
      if (time > 0) {
        redisTemplate.expire(key, time, TimeUnit.SECONDS);
     }
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 根据key 获取过期时间
  * @param key 键 不能为null
  * @return 时间(秒) 返回0代表为永久有效
  */
  public long getExpire(String key) {
    return redisTemplate.getExpire(key, TimeUnit.SECONDS);
 }
  /**
  * 判断key是否存在
  * @param key 键
  * @return true 存在 false不存在
  */
  public boolean hasKey(String key) {
    try {
      return redisTemplate.hasKey(key);
   } catch (Exception e) {
e.printStackTrace();
      return false;
   }
 }
  /**
  * 删除缓存
  * @param key 可以传一个值 或多个
  */
  @SuppressWarnings("unchecked")
  public void del(String... key) {
    if (key != null && key.length > 0) {
      if (key.length == 1) {
        redisTemplate.delete(key[0]);
     } else {
        redisTemplate.delete((Collection<String>)
CollectionUtils.arrayToList(key));
     }
   }
 }
  // ============================String=============================
  /**
  * 普通缓存获取
  * @param key 键
  * @return 值
  */
  public Object get(String key) {
    return key == null ? null : redisTemplate.opsForValue().get(key);
 }
  /**
  * 普通缓存放入
  * @param key  键
  * @param value 值
  * @return true成功 false失败
  */
  public boolean set(String key, Object value) {
    try {
      redisTemplate.opsForValue().set(key, value);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 普通缓存放入并设置时间
  * @param key  键
  * @param value 值
  * @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
  * @return true成功 false 失败
*/
  public boolean set(String key, Object value, long time) {
    try {
      if (time > 0) {
        redisTemplate.opsForValue().set(key, value, time,
TimeUnit.SECONDS);
     } else {
        set(key, value);
     }
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 递增
  * @param key  键
  * @param delta 要增加几(大于0)
  */
  public long incr(String key, long delta) {
    if (delta < 0) {
      throw new RuntimeException("递增因子必须大于0");
   }
    return redisTemplate.opsForValue().increment(key, delta);
 }
  /**
  * 递减
  * @param key  键
  * @param delta 要减少几(小于0)
  */
  public long decr(String key, long delta) {
    if (delta < 0) {
      throw new RuntimeException("递减因子必须大于0");
   }
    return redisTemplate.opsForValue().increment(key, -delta);
 }
  // ================================Map=================================
  /**
  * HashGet
  * @param key 键 不能为null
  * @param item 项 不能为null
  */
  public Object hget(String key, String item) {
    return redisTemplate.opsForHash().get(key, item);
 }
  /**
  * 获取hashKey对应的所有键值
  * @param key 键
* @return 对应的多个键值
  */
  public Map<Object, Object> hmget(String key) {
    return redisTemplate.opsForHash().entries(key);
 }
  /**
  * HashSet
  * @param key 键
  * @param map 对应多个键值
  */
  public boolean hmset(String key, Map<String, Object> map) {
    try {
      redisTemplate.opsForHash().putAll(key, map);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * HashSet 并设置时间
  * @param key 键
  * @param map 对应多个键值
  * @param time 时间(秒)
  * @return true成功 false失败
  */
  public boolean hmset(String key, Map<String, Object> map, long time) {
    try {
      redisTemplate.opsForHash().putAll(key, map);
      if (time > 0) {
        expire(key, time);
     }
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 向一张hash表中放入数据,如果不存在将创建
  *
  * @param key  键
  * @param item 项
  * @param value 值
  * @return true 成功 false失败
  */
  public boolean hset(String key, String item, Object value) {
    try {
      redisTemplate.opsForHash().put(key, item, value);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
 }
 }
  /**
  * 向一张hash表中放入数据,如果不存在将创建
  *
  * @param key  键
  * @param item 项
  * @param value 值
  * @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
  * @return true 成功 false失败
  */
  public boolean hset(String key, String item, Object value, long time) {
    try {
      redisTemplate.opsForHash().put(key, item, value);
      if (time > 0) {
        expire(key, time);
     }
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 删除hash表中的值
  *
  * @param key 键 不能为null
  * @param item 项 可以使多个 不能为null
  */
  public void hdel(String key, Object... item) {
    redisTemplate.opsForHash().delete(key, item);
 }
  /**
  * 判断hash表中是否有该项的值
  *
  * @param key 键 不能为null
  * @param item 项 不能为null
  * @return true 存在 false不存在
  */
  public boolean hHasKey(String key, String item) {
    return redisTemplate.opsForHash().hasKey(key, item);
 }
  /**
  * hash递增 如果不存在,就会创建一个 并把新增后的值返回
  *
  * @param key 键
  * @param item 项
  * @param by  要增加几(大于0)
  */
  public double hincr(String key, String item, double by) {
    return redisTemplate.opsForHash().increment(key, item, by);
}
  /**
  * hash递减
  *
  * @param key 键
  * @param item 项
  * @param by  要减少记(小于0)
  */
  public double hdecr(String key, String item, double by) {
    return redisTemplate.opsForHash().increment(key, item, -by);
 }
  // ============================set=============================
  /**
  * 根据key获取Set中的所有值
  * @param key 键
  */
  public Set<Object> sGet(String key) {
    try {
      return redisTemplate.opsForSet().members(key);
   } catch (Exception e) {
      e.printStackTrace();
      return null;
   }
 }
  /**
  * 根据value从一个set中查询,是否存在
  *
  * @param key  键
  * @param value 值
  * @return true 存在 false不存在
  */
  public boolean sHasKey(String key, Object value) {
    try {
      return redisTemplate.opsForSet().isMember(key, value);
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 将数据放入set缓存
  *
  * @param key  键
  * @param values 值 可以是多个
  * @return 成功个数
  */
  public long sSet(String key, Object... values) {
    try {
      return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
      e.printStackTrace();
      return 0;
   }
 }
  /**
  * 将set数据放入缓存
  *
  * @param key  键
  * @param time  时间(秒)
  * @param values 值 可以是多个
  * @return 成功个数
  */
  public long sSetAndTime(String key, long time, Object... values) {
    try {
      Long count = redisTemplate.opsForSet().add(key, values);
      if (time > 0)
        expire(key, time);
      return count;
   } catch (Exception e) {
      e.printStackTrace();
      return 0;
   }
 }
  /**
  * 获取set缓存的长度
  *
  * @param key 键
  */
  public long sGetSetSize(String key) {
    try {
      return redisTemplate.opsForSet().size(key);
   } catch (Exception e) {
      e.printStackTrace();
      return 0;
   }
 }
  /**
  * 移除值为value的
  *
  * @param key  键
  * @param values 值 可以是多个
  * @return 移除的个数
  */
  public long setRemove(String key, Object... values) {
    try {
      Long count = redisTemplate.opsForSet().remove(key, values);
      return count;
   } catch (Exception e) {
      e.printStackTrace();
      return 0;
 }
 }
  // ===============================list=================================
  /**
  * 获取list缓存的内容
  *
  * @param key  键
  * @param start 开始
  * @param end  结束 0 到 -1代表所有值
  */
  public List<Object> lGet(String key, long start, long end) {
    try {
      return redisTemplate.opsForList().range(key, start, end);
   } catch (Exception e) {
      e.printStackTrace();
      return null;
   }
 }
  /**
  * 获取list缓存的长度
  *
  * @param key 键
  */
  public long lGetListSize(String key) {
    try {
      return redisTemplate.opsForList().size(key);
   } catch (Exception e) {
      e.printStackTrace();
      return 0;
   }
 }
  /**
  * 通过索引 获取list中的值
  *
  * @param key  键
  * @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表
尾,-2倒数第二个元素,依次类推
  */
  public Object lGetIndex(String key, long index) {
    try {
      return redisTemplate.opsForList().index(key, index);
   } catch (Exception e) {
      e.printStackTrace();
      return null;
   }
 }
  /**
  * 将list放入缓存
  *
  * @param key  键
* @param value 值
  */
  public boolean lSet(String key, Object value) {
    try {
      redisTemplate.opsForList().rightPush(key, value);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 将list放入缓存
  * @param key  键
  * @param value 值
  * @param time 时间(秒)
  */
  public boolean lSet(String key, Object value, long time) {
    try {
      redisTemplate.opsForList().rightPush(key, value);
      if (time > 0)
        expire(key, time);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 将list放入缓存
  *
  * @param key  键
  * @param value 值
  * @return
  */
  public boolean lSet(String key, List<Object> value) {
    try {
      redisTemplate.opsForList().rightPushAll(key, value);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 将list放入缓存
  *
  * @param key  键
  * @param value 值
  * @param time 时间(秒)
* @return
  */
  public boolean lSet(String key, List<Object> value, long time) {
    try {
      redisTemplate.opsForList().rightPushAll(key, value);
      if (time > 0)
        expire(key, time);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 根据索引修改list中的某条数据
  *
  * @param key  键
  * @param index 索引
  * @param value 值
  * @return
  */
  public boolean lUpdateIndex(String key, long index, Object value) {
    try {
      redisTemplate.opsForList().set(key, index, value);
      return true;
   } catch (Exception e) {
      e.printStackTrace();
      return false;
   }
 }
  /**
  * 移除N个值为value
  *
  * @param key  键
  * @param count 移除多少个
  * @param value 值
  * @return 移除的个数
  */
  public long lRemove(String key, long count, Object value) {
    try {
      Long remove = redisTemplate.opsForList().remove(key, count, value);
      return remove;
   } catch (Exception e) {
      e.printStackTrace();
      return 0;
   }
 }
}

🐴 六、redis.conf详解



单位,大小写不敏感


# Note on units: when memory size is needed, it is possible to specify
# it in the usual form of 1k 5GB 4M and so forth:
#
# 1k => 1000 bytes
# 1kb => 1024 bytes
# 1m => 1000000 bytes
# 1mb => 1024*1024 bytes
# 1g => 1000000000 bytes
# 1gb => 1024*1024*1024 bytes
#
# units are case insensitive so 1GB 1Gb 1gB are all the same.


包含


# include /path/to/local.conf
# include /path/to/other.conf


网络


#绑定IP
bind 127.0.0.1
#保护模式
protected-mode yes
#端口设置
port 6379


通用配置


#以守护进程的方式运行,默认是no,我们需要自己开启为yes
daemonize yes
#如果是以后台的方式运行,我们需要自己开启一个pid文件
pidfile /var/run/redis_6379.pid
#日志
# Specify the server verbosity level.
# This can be one of:
# debug (a lot of information, useful for development/testing)
# verbose (many rarely useful info, but not a mess like the debug level)
# notice (moderately verbose, what you want in production probably) 生产环境
# warning (only very important / critical messages are logged)
loglevel notice
#日志的文件位置名
logfile ""
#数据库的数量,默认是16的数据库
databases 16
#总是显示Logo
always-show-logo yes


快照,持久化:在指定时间内,执行了多少次操作,则会持久化到文件.rdb.aof

redis是内存数据库,如果没有持久化,那么数据断电即

#如果900s内,至少有1个key进行了修改,我们及进行持久化操作
save 900 1
#300,至少有10个key进行了修改,我们及进行持久化操作
save 300 10
#60,至少有10000个key进行了修改,我们及进行持久化操作
save 60 10000
#持久化如果出错,是否还需要继续工作
stop-writes-on-bgsave-error yes
#是否压缩rdb文件,需要消耗一些cpu资源
rdbcompression yes
#保存rdb文件的时候是否进行错误的检查校验
rdbchecksum yes
#rdb文件保存的目录
dir ./


SECURITY 安全


#查看和设置redis密码
127.0.0.1:6379> config get requirepass
1) "requirepass"
2) ""
127.0.0.1:6379> config set requirepass "1234"
OK
#登录redis
127.0.0.1:6379> auth 1234
OK



CLIENTS 设置客户端


#设置能够连接redis的客户端登录数量
maxclients 10000
#redis配置最大内存
maxmemory <bytes>
#内存到达上限后的处理策略
maxmemory-policy noeviction
1、volatile-lru:只对设置了过期时间的key进行LRU(默认值)
2、allkeys-lru : 删除lru算法的key 
3、volatile-random:随机删除即将过期key 
4、allkeys-random:随机删除 
5、volatile-ttl : 删除即将过期的 
6、noeviction : 永不过期,返回错误


APPEND ONLY MODE aof配

#默认是不开启aof的,默认是使用rdb方式持久化的,在大部分的情况下rdb完全够用
appendonly no
#持久化文件的名字
appendfilename "appendonly.aof"
# appendfsync always #每次修改都会sync,消耗性能
appendfsync everysec #每秒执行一次sync,可能会对视着1s的数据
# appendfsync no #不执行sunc,这个时候操作系统会自己同步数据,速度最快


🐴 七、Redis持久化



🌈 7.1 RDB


什么是RDB

80427c78e44c4fed81e8c2ad63511d1d.png


在指定的时间间隔内将内存中的数据集快照写入磁盘,也就是行话讲的Snapshot快照,它恢复时是将快照文件直接读到内存里。Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写入到一个临时文件中,待持久化过程都结束了,再用这个临时文 件替换.上次持久化好的文件。整个过程中,主进程是不进行任何I0操作的。这就确保了极高的性能。如果需要进行大规模数据的恢复,且对于数据恢复的完整性不是非常敏感,那RDB方式要比AOF方式更加的高效。RDB的缺点是最后一次持久化后的数据可能丟失。

RDB持久化是把当前进程数据生成快照保存到硬盘的过程,触发RDB持久化过程分为手动触发和自动触

发rdb保存的文件时dump.rdb 都是在我们配置文件快照中进行配置的


🚀🚀🚀 触发机制


  1. save的规则满足的情况下,会自动触发rdb规则
  2. 执行flushall命令,也会触发
  3. 退出redis


3ec8ee84b9ed44c5a8a815c89f1ad46a.png


🚀🚀🚀 如何恢复rdb文件


1. 将dump.rdb放在我们的redis启动目录下就可以,redis启动的时候会自动检查dump.rdb恢复数据
2. 查看需要存在的位置
127.0.0.1:6379> config get dir
1) "dir"
2) "/usr/local/bin"


🚀🚀🚀 优缺点


优点:
1. 适合大规模的数恢复
2. 对数据的完整性要求不高
缺点:
1. 需要一定的时间间隔进程操作,如果redis意外宕机了,这个最后一场修改数据就没有了
2. fork进程的时候会占用一定的内容空间


🌈 7.2 AOF(Append Only File)


将我们的所有命令记录下来,恢复的时候就把这个文件里面的命令全部执行一遍


84ba44abaff842058f3308cd08e33af7.png


以日志的形式来记录每个写操作,将Redis执行过得所有指令记录下来(读操作不记录),只追加文件但不可以改写文件,redis启动之初读取该文件重新构建数据库,即redis重启的话根据日志文件的内容将写指令从前到后执行一次完成数据库的恢复工作


🚀🚀🚀 优缺点

优点
1. 每一次修改都要同步,文件的完整性更好
2. 每秒同步一次,可能会丢失这一秒的数据
3. 从不同步,效率最高
缺点
1. 相对于数据文件,AOF远远大于RDB,修复速度也比rdb慢
2. AOF效率也比RDB慢


🐴 八、发布和订阅消息



使用 subscribe 频道名 进行订阅监听

使用 publish 频道名 发布的内容 进行发布消息广播;


🚀🚀🚀 订阅监听


127.0.0.1:6379> subscribe wangqin
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "wangqin"
3) (integer) 1
1) "message"
2) "wangqin"
3) "hello, qin"


🚀🚀🚀 发布消息广播


127.0.0.1:6379> publish wangqin "hello, qin"
(integer) 1


🐴 九、主从复制



🌈 9.1 主从复制


主从复制,是指将一台Redis服务器的数据 ,复制到其他的Redis服务器I前者称为主节点(master/leader),

后者称为从节点(slave/follower)数据的复制是单向的,只能由主节点到从节点,Master以写为主, Slave

以读为主。

默认情况下,每台Redis服务器都是主节点,且一个主节点可以有多个从节点(或没有从节点) ,但一个从节

点只能有一一个主节点。 主从复制的作用主要包括:


数据冗余:主从复制实现了数据的热备份,是持久化之外的一种数据冗余方式。

故障恢复:当主节点出现问题时,可以由从节点提供服务,实现快速的故障恢复;实际上是一种服务的

冗余。

负载均衡:在主从复制的基础上,配合读写分离,可以由主节点提供写服务,由从节点提供读服务(即写

Redis数据时应用连接主节点,读Redis数据时应用连接从节点) , 分担服务器负载;尤其是在写少读多

的场景下,通过多个从节点分担读负载,可以大大提高Redis服务器的并发量。

高可用基石:除了.上述作用以外,主从复制还是哨兵和集群能够实施的基础,因此说主从复制是Redis

高可用的基础。

一般来说,要将Redis运用于工程项目中,只使用一台Redis是万万不能的,原因如下:

从结构上,单个Redis服务器会发生单点故障,并且一台服务器需要处理所有的请求负载,压力较大;

从容量上,单个Redis服务器内存容量有限,就算一台Redis服务器内存容量为256G ,也不能将所有内

存用作Redis存储内存, 一般来说 ,单台Redis最大使用内存不应该超过20G。

电商网站上的商品,一般都是一次上传,无数次浏览的,说专业点也就是"多读少写"


573ac12eb067425893d3373be99ea657.png

[root@qin bin]# redis-cli -p 6380
127.0.0.1:6380> info replication
# Replication
role:master  #主机
connected_slaves:2  #从机两个 6379和6380
slave0:ip=127.0.0.1,port=6379,state=online,offset=266,lag=1
slave1:ip=127.0.0.1,port=6381,state=online,offset=266,lag=1
master_replid:ee9893cc165118d4639a473c3db9bb55b7e29248
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:266
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:266

🚀🚀🚀 主从复制配置过程


在redis-cli应用中配置主从 slaveof 127.0.0.1 6380
这里以主服务器:192.168.238.143 ;从服务器:192.168.238.144 为例;
配置从服务器的 redis.conf ,添加:
配置主机: replicaof 127.0.0.1 6380
密码: masterauth <master-password>
配置从机只能做读操作 replica-read-only yes
如果主机断开连接,从机依旧连接到主机的,但是没有写操作。如果主机回来,从机依旧可以直接获取
到主机写的信息


🚀🚀🚀 复制原理


Slave启动成功连接到master后会发送一个sync同步命令
Master接到命令,启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令,在后台进程执行
完毕之后,master将传送整个数据文件到slave,并完成一次完全同步。
全量复制:而slave服务在接收到数据库文件数据后,将其存盘并加载到内存中。
[root@qin bin]# redis-cli -p 6380
127.0.0.1:6380> info replication
# Replication
role:master  #主机
connected_slaves:2  #从机两个 6379和6380
slave0:ip=127.0.0.1,port=6379,state=online,offset=266,lag=1
slave1:ip=127.0.0.1,port=6381,state=online,offset=266,lag=1
master_replid:ee9893cc165118d4639a473c3db9bb55b7e29248
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:266
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:266
增量复制:Master继续将新的所有收集到的修改命令依次传给slave,完成同步
但是只要是重新连接master,一次完全同步(全量复制)将被自动执行


🚀🚀🚀 层层链


7d64923ff18b4f3689060ae03acd9054.png


🌈 9.2 哨兵模式


假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover过程,仅仅是哨兵1主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个过程称为客观下线。

942c01da6aa145d3872175ba4255b4aa.png

主要是在 Redis 2.0 中的,用于对主服务器进行监控

功能一:监控主数据库和从数据库是否正常运行;

功能二:主数据库出现故障时候,可以自动将从数据库转换为主数据库,实现自动切换;

实现步骤:

修改从服务器的 sentinel.conf

sentinel monitor myredis 192.168.238.143 6379 1 # 主数据库名称、IP、端口、

投票选举次数

sentinel down-after-milliseconds myredis 5000 #默认 1 秒检查一次,这里配置超

时 5000 毫秒为宕机;

sentinel failover-timeout myredis 900000

sentinel parallel-syncs myredis 2

sentinel can-failover myredis yes



🚀🚀🚀 启动 sentinel 哨兵


[root@qin bin]# redis-sentinel qconfig/sentinel.conf
72466:X 08 Dec 2020 02:41:53.764 # oO0OoO0OoO0Oo Redis is starting
oO0OoO0OoO0Oo
72466:X 08 Dec 2020 02:41:53.764 # Redis version=5.0.7, bits=64,
commit=00000000, modified=0, pid=72466, just started
72466:X 08 Dec 2020 02:41:53.764 # Configuration loaded
       _._                         
     _.-``__ ''-._                      
  _.-``   `. `_.  ''-._      Redis 5.0.7 (00000000/0) 64 bit
.-`` .-```. ```\/  _.,_ ''-._                 
(   '   ,    .-` | `,  )   Running in sentinel mode
|`-._`-...-` __...-.``-._|'` _.-'|   Port: 26379
|   `-._  `._  /   _.-'  |   PID: 72466
 `-._  `-._  `-./ _.-'  _.-'                 
|`-._`-._   `-.__.-'  _.-'_.-'|                 
|   `-._`-._    _.-'_.-'  |      http://redis.io    
 `-._  `-._`-.__.-'_.-'  _.-'                 
|`-._`-._   `-.__.-'  _.-'_.-'|                 
|   `-._`-._    _.-'_.-'  |                 
 `-._  `-._`-.__.-'_.-'  _.-'                 
   `-._  `-.__.-'  _.-'                   
     `-._    _.-'                     
       `-.__.-'                       
72466:X 08 Dec 2020 02:41:53.765 # WARNING: The TCP backlog setting of 511
cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower
value of 128.
72466:X 08 Dec 2020 02:41:53.765 # Sentinel ID is
df5d73e7709db2ce099fa9b7432a58281b982f48
72466:X 08 Dec 2020 02:41:53.765 # +monitor master myredis 127.0.0.1 6380
quorum 1



若主机断开了,哨兵会选举一个从机作为主机。如果主机后来回来了,只能归并到新的主机下当做

从机。

优点:


哨兵集群,基于主从复制模式,所有的主从配置优点,它全有

主从可以切换 ,故障可以转移,系统的可用性就会更好

哨兵模式就是主从模式的升级,手动到自动,更加健壮。

缺点:

Redis 不好啊在线扩容的,集群容量一旦到达上限,在线扩容就十分麻烦!

实现哨兵模式的配置其实是很麻烦的,里面有很多选择

397cd96798264411898564572ef68113.png


假设主服务器宕机,哨兵1先检测到这个结果,系统并不会马上进行failover故障转移过程,仅仅是哨兵1

主观的认为主服务器不可用,这个现象成为主观下线。当后面的哨兵也检测到主服务器不可用,并且数

量达到一定值时,那么哨兵之间就会进行一次投票,投票的结果由一个哨兵发起,进行failover[故障转

移]操作。切换成功后,就会通过发布订阅模式,让各个哨兵把自己监控的从服务器实现切换主机,这个

过程称为客观下线。


🚀🚀🚀 配置文件


# Example sentinel.conf 
# 哨兵sentinel实例运行的端口 默认26379 
port 26379 
# 哨兵sentinel的工作目录 
dir /tmp 
# 哨兵sentinel监控的redis主节点的 ip port 
# master-name 可以自己命名的主节点名字 只能由字母A-z、数字0-9 、这三个字符".-_"组成。 
# quorum 当这些quorum个数sentinel哨兵认为master主节点失联 那么这时 客观上认为主节点失联了
# sentinel monitor <master-name> <ip> <redis-port> <quorum> 
sentinel monitor mymaster 127.0.0.1 6379 2 
# 当在Redis实例中开启了requirepass foobared 授权密码 这样所有连接Redis实例的客户端都要提供
密码 
# 设置哨兵sentinel 连接主从的密码 注意必须为主从设置一样的验证密码 
# sentinel auth-pass <master-name> <password> 
sentinel auth-pass mymaster MySUPER--secret-0123password 
# 指定多少毫秒之后 主节点没有应答哨兵sentinel 此时 哨兵主观上认为主节点下线 默认30秒 
# sentinel down-after-milliseconds <master-name> <milliseconds> 
sentinel down-after-milliseconds mymaster 30000 
# 这个配置项指定了在发生failover主备切换时最多可以有多少个slave同时对新的master进行 同步, 
这个数字越小,完成failover所需的时间就越长, 
但是如果这个数字越大,就意味着越 多的slave因为replication而不可用。 
可以通过将这个值设为 1 来保证每次只有一个slave 处于不能处理命令请求的状态。 
# sentinel parallel-syncs <master-name> <numslaves>
sentinel parallel-syncs mymaster 1 
# 故障转移的超时时间 failover-timeout 可以用在以下这些方面: 
#1. 同一个sentinel对同一个master两次failover之间的间隔时间。 
#2. 当一个slave从一个错误的master那里同步数据开始计算时间。直到slave被纠正为向正确的master那
里同步数据时。 
#3.当想要取消一个正在进行的failover所需要的时间。  
#4.当进行failover时,配置所有slaves指向新的master所需的最大时间。不过,即使过了这个超时,
slaves依然会被正确配置为指向master,但是就不按parallel-syncs所配置的规则来了 
# 默认三分钟 
# sentinel failover-timeout <master-name> <milliseconds> 
sentinel failover-timeout mymaster 180000 
# SCRIPTS EXECUTION 
#配置当某一事件发生时所需要执行的脚本,可以通过脚本来通知管理员,例如当系统运行不正常时发邮件通知
相关人员。 
#对于脚本的运行结果有以下规则: 
#若脚本执行后返回1,那么该脚本稍后将会被再次执行,重复次数目前默认为10 
#若脚本执行后返回2,或者比2更高的一个返回值,脚本将不会重复执行。 
#如果脚本在执行过程中由于收到系统中断信号被终止了,则同返回值为1时的行为相同。 
#一个脚本的最大执行时间为60s,如果超过这个时间,脚本将会被一个SIGKILL信号终止,之后重新执行。
#通知型脚本:当sentinel有任何警告级别的事件发生时(比如说redis实例的主观失效和客观失效等等),
将会去调用这个脚本, 
这时这个脚本应该通过邮件,SMS等方式去通知系统管理员关于系统不正常运行的信息。调用该脚本时,将传
给脚本两个参数, 
一个是事件的类型, 
一个是事件的描述。 
如果sentinel.conf配置文件中配置了这个脚本路径,那么必须保证这个脚本存在于这个路径,并且是可执
行的,否则sentinel无法正常启动成功。 
#通知脚本 
# sentinel notification-script <master-name> <script-path> 
sentinel notification-script mymaster /var/redis/notify.sh 
# 客户端重新配置主节点参数脚本 
# 当一个master由于failover而发生改变时,这个脚本将会被调用,通知相关的客户端关于master地址已
经发生改变的信息。 
# 以下参数将会在调用脚本时传给脚本: 
# <master-name> <role> <state> <from-ip> <from-port> <to-ip> <to-port> 
# 目前<state>总是“failover”, 
# <role>是“leader”或者“observer”中的一个。 
# 参数 from-ip, from-port, to-ip, to-port是用来和旧的master和新的master(即旧的slave)通
信的 
# 这个脚本应该是通用的,能被多次调用,不是针对性的。 
# sentinel client-reconfig-script <master-name> <script-path> 
sentinel client-reconfig-script mymaster /var/redis/reconfig.sh  


🐴 十、Redis缓存穿透和雪崩



Redis缓存的使用,极大的提升了应用程序的性能和效率,特别是数据查询方面。但同时,它也带来了一些问题。其中,最要害的 问题,就是数据的一致性问题,从严格意义上讲,这个问题无解。如果对数据的一致性要求很高,那么就不能使用缓存。

另外的一些典型问题就是,缓存穿透、缓存雪崩和缓存击穿。目前,业界也都有比较流行的解决方案。


🌈 10.1 缓存穿透(查不到)


🚀🚀🚀 概念


缓存穿透的概念很简单,用户想要查询一个数据,发现redis内存数据库没有,也就是缓存没有命中,于是向持久层数据库查询。发 现也没有,于是本次查询失败。当用户很多的时候,缓存都没有命中(秒

杀),于是都去请求了持久层数据库。这会给持久层数据库造成很 大的压力,这时候就相当于出现了缓存


🚀🚀🚀 解决方案


布隆过滤器
当存储层不命中后,即使返回的空对象也要将其缓存起来,同时设置一个过期时间,之后再访问这个数
据库将会从缓存中获取,保护了后端数据库
但是这种方法会存在两个问题:
1. 如果空值能够被缓存起来,这就意味着缓存需要更多的空间存储更多的键,因为这当中可能会有很
多的空值的键;
2. 即使对空值设置了过期时间,还是会存在缓存层和存储层的数据会有一段时间窗口的不一致,这对
于需要保持一致性的业务会 有影响。


🌈 10.2 缓存击穿(量太大)

🚀🚀🚀 概述


这里需要注意和缓存击穿的区别,缓存击穿,是指一个key非常热点,在不停的扛着大并发,大并发集中对这一个点进行访问,当这个key在失效的瞬间,持续的大并发就穿破缓存,直接请求数据库,就像在一个屏障上凿开了一个洞。当某个key在过期的瞬间,有大量的请求并发访问,这类数据一般是热点数据,由于缓存过期,会同时访问数据库来查询最新数 据,并且回写缓存,会导使数据库瞬间压力过大。


🚀🚀🚀 解决方案


设置热点数据永不过期从缓存层面来看,没有设置过期时间,所以不会出现热点key过期后产生的问题。加互斥锁

分布式锁:使用分布式锁,保证对于每个key同时只有一一个线程去查询后端服务,其他线程没有获得分布式锁的权限,因此只需要等待即可。这种方式将高并发的压力转移到了分布式锁,因此对分布式锁的考验很大。


🌈 10.3 缓存雪崩

🚀🚀🚀 概念


缓存雪崩,是指在某一个时间段,缓存集中过期失效。Redis宕机!

产生雪崩的原因之一,比如在写本文的时候,马上就要到双十二零点,很快就会迎来一波抢购,这波商

品时间比较集中的放入了缓 存,假设缓存一个小时。那么到了凌晨一点钟的时候,这批商品的缓存就都

过期了。而对这批商品的访问查询,都落到了数据库 上,对于数据库而言,就会产生周期性的压力波

峰。于是所有的请求都会达到存储层,存储层的调用量会暴增,造成存储层也会挂 掉的情况。


🚀🚀🚀 解决方案


redis高可用

这个思想的含义是,既然redis有可能挂掉,那我多增设几台redis ,这样-台挂掉之后其他的还可以继续工作,

其实就是搭建的集 群。(异地多活!|)

限流降级

这个解决方案的思想是,在缓存失效后,通过加锁或者队列来控制读数据库写缓存的线程数量。比如对某个

key只允许一个线程查 询数据和写缓存,其他线程等待。

数据预热.

数据加热的含义就是在正式部署之前,我先把可能的数据先预先访问一遍,这样部分可能大量访问的数据就

会加载到缓存中。在即. 将发生大并发访问前手动触发加载缓存不同的key ,设置不同的过期时间,让缓存

失效的时间点尽量均匀。


🐴 十一、redis 在项目中的应用



🌈 11.1 redis缓存注解


@Cacheable:根据方法对其返回结果进行缓存,下次请求时,如果缓存存在,则直接读取缓存数

据返回;如果缓存不存在,则执行方法,并把返回的结果存入缓存中,一般用在查询方法上。

value:缓存名,必填,它指定了你的缓存存放在哪块命名空间

cacheNames:与 value 差不多,二选一即可

key:可选属性,可以使用 SpEL 标签自定义缓存的key

@CachePut:使用该注解标志的方法,每次都会执行,并将结果存入指定的缓存中。其他方法可

以直接从响应的缓存中读取缓存数据,而不需要再去查询数据库,一般用在新增方法上。

value:缓存名,必填,它指定了你的缓存存放在哪块命名空间

cacheNames:与 value 差不多,二选一即可

key:可选属性,可以使用 SpEL 标签自定义缓存的key

@CacheEvict:使用该注解标志的方法,会清空指定的缓存,一般用在更新或者删除方法上

value:缓存名,必填,它指定了你的缓存存放在哪块命名空间

cacheNames:与 value 差不多,二选一即可

key:可选属性,可以使用 SpEL 标签自定义缓存的key

allEntries:是否清空所有缓存,默认为 false。如果指定为 true,则方法调用后将立即清空所

有的缓存

beforeInvocation:是否在方法执行前就清空,默认为 false。如果指定为 true,则在方法执行

前就会清空缓存


🌈 11.2 环境配置


<!-- redis -->
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- spring2.X集成redis所需common-pool2 -->
<dependency>
  <groupId>org.apache.commons</groupId>
  <artifactId>commons-pool2</artifactId>
  <version>2.6.0</version>
</dependency>


🚀🚀🚀 在service-base模块添加redis配置类


@EnableCaching //开启缓存
@Configuration //配置类
public class RedisConfig extends CachingConfigurerSupport {
  @Bean
  public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory
factory) {
    RedisTemplate<String, Object> template = new RedisTemplate<>();
    RedisSerializer<String> redisSerializer = new StringRedisSerializer();
    Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(om);
    template.setConnectionFactory(factory);
    //key序列化方式
    template.setKeySerializer(redisSerializer);
    //value序列化
    template.setValueSerializer(jackson2JsonRedisSerializer);
    //value hashmap序列化
    template.setHashValueSerializer(jackson2JsonRedisSerializer);
    return template;
 }
  @Bean
  public CacheManager cacheManager(RedisConnectionFactory factory) {
    RedisSerializer<String> redisSerializer = new StringRedisSerializer();
    Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
    //解决查询缓存转换异常的问题
    ObjectMapper om = new ObjectMapper();
    om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
    om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
    jackson2JsonRedisSerializer.setObjectMapper(om);
    // 配置序列化(解决乱码的问题),过期时间600秒
    RedisCacheConfiguration config =
RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofSeconds(600))      
.serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(re
disSerializer))     
.serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(
jackson2JsonRedisSerializer)).disableCachingNullValues();
    RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
     .cacheDefaults(config)
     .build();
    return cacheManager;
 }
}


🚀🚀🚀 Linux启动Redis要修改的配置


  1. 关闭Linux防火墙
  2. 配置文件注释:# bind 127.0.0.1
  3. protected-mode no


🌈 11.3 banner接口改造


由于首页数据变化不是很频繁,而且首页访问量相对较大,所以我们有必要把首页接口数据缓存到redis缓存中,减少数据库压力和提高访问速度。

改造service-cms模块首页banner接口,首页课程与讲师接口类似


🚀🚀🚀 properies文件配置,在service-cms模块配置文件添加redis配置


spring.redis.host=120.55.59.163

spring.redis.port=6379

spring.redis.database= 0

spring.redis.timeout=1800000

spring.redis.lettuce.pool.max-active=20

spring.redis.lettuce.pool.max-wait=-1

#最大阻塞等待时间(负数表示没限制)

spring.redis.lettuce.pool.max-idle=5

spring.redis.lettuce.pool.min-idle=0


🚀🚀🚀 修改CrmBannerServiceImpl,添加redis缓存注解


@Service
public class CrmBannerServiceImpl extends ServiceImpl<CrmBannerMapper,
CrmBanner> implements CrmBannerService {
  /**
  * 查询所有banner
  * @return
  */
  @Cacheable(value = "banner",key = "'selectIndexList'")
  @Override
  public List<CrmBanner> selectAllBanner() {
    //根据id进行降序排列,显示排列之后前两条记录
    QueryWrapper<CrmBanner> wrapper = new QueryWrapper<>();
    wrapper.orderByDesc("id");
    //last方法,拼接sql语句
    wrapper.last("limit 2");
    List<CrmBanner> list = baseMapper.selectList(null);
    return list;
 }
}


🚀🚀🚀 在redis添加了key


127.0.0.1:6379> keys *
1) "banner::selectIndexList"
2) "name"
127.0.0.1:6379> get banner::selectIndexList
"[\"java.util.ArrayList\",[[\"com.wq.educms.entity.CrmBanner\",
{\"id\":\"1194556896025845762\",\"title\":\"test1\",\"imageUrl\":\"https://onlin
e-teach-file.oss-cn-beijing.aliyuncs.com/cms/2019/11/14/297acd3b-b592-4cfb-a446-
a28310369675.jpg\",\"linkUrl\":\"/course\",\"sort\":1,\"isDeleted\":false,\"gmtC
reate\":[\"java.util.Date\",1573639532000],\"gmtModified\":
[\"java.util.Date\",1574044102000]}],[\"com.wq.educms.entity.CrmBanner\",
{\"id\":\"1194607458461216769\",\"title\":\"test2\",\"imageUrl\":\"https://onlin
e-teach-file.oss-cn-beijing.aliyuncs.com/cms/2019/11/13/8f80790d-d736-4842-a6a4-
4dcb0d684d4e.jpg\",\"linkUrl\":\"/teacher\",\"sort\":2,\"isDeleted\":false,\"gmt
Create\":[\"java.util.Date\",1573651587000],\"gmtModified\":
[\"java.util.Date\",1573693935000]}]]]"


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
缓存 监控 NoSQL
Redis高可用总结:Redis主从复制、哨兵集群、脑裂...
在实际的项目中,服务高可用非常重要,如,当Redis作为缓存服务使用时, 缓解数据库的压力,提高数据的访问速度,提高网站的性能 ,但如果使用Redis 是单机模式运行 ,只要一个服务器宕机就不可以提供服务,这样会可能造成服务效率低下,甚至出现其相对应的服务应用不可用。
523 0
Redis高可用总结:Redis主从复制、哨兵集群、脑裂...
|
NoSQL Redis
Redis学习7:按次结算的服务控制、微信会话顺序管理(应用场景总结)
现在数据类型五种基本的已经学完了,现在开始应用一个简单的业务场景。
Redis学习7:按次结算的服务控制、微信会话顺序管理(应用场景总结)
|
存储 SQL NoSQL
新人入坑Redis必会的吐血总结(一)
新人入坑Redis必会的吐血总结
181 0
新人入坑Redis必会的吐血总结(一)
|
缓存 监控 NoSQL
Redis常见面试题总结
Redis常见面试题总结
109 0
Redis常见面试题总结
|
数据采集 分布式计算 NoSQL
爬虫识别-爬虫写入 Redis-效果及总结| 学习笔记
快速学习爬虫识别-爬虫写入 Redis-效果及总结
爬虫识别-爬虫写入 Redis-效果及总结| 学习笔记
|
存储 缓存 NoSQL
浅浅总结Redis
Redis作为开源数据库,为开发者提供了多种语言的API。而Redis应用在实际开发中已经很常见了,不仅能作为缓存存储数据,也由于其键值对存储数据的形式而可以作为持久化数据存储。接下来我们浅谈一下Redis的集群和缓存。
131 0
浅浅总结Redis
|
存储 缓存 监控
redis总结万能手册,熟悉不等于精通;
日常总结:redis线程模型,多路复用原理;单体,哨兵架构,集群架构的自动化安装,解释说明,集群扩容,缩容,选举,主从自动切换策略,应用程序接入……
293 0
redis总结万能手册,熟悉不等于精通;
|
NoSQL Redis
熬夜爆肝总结Liunx环境源码安装Redis
熬夜爆肝总结Liunx环境源码安装Redis
148 0
熬夜爆肝总结Liunx环境源码安装Redis
|
存储 NoSQL 安全
新人入坑Redis必会的吐血总结(二)
新人入坑Redis必会的吐血总结(二)
137 0
|
存储 缓存 负载均衡
Redis cluster去中心化设计的思考与总结
Redis cluster去中心化设计的思考与总结
291 0
下一篇
DataWorks