暂时未有相关云产品技术能力~
1.DMA技术详解(1)应用程序 从 磁盘读写数据 的时序图(未用DMA技术前)(2)什么是DMA 技术 (Direct Memory Access)直接内存访问,直接内存访问是计算机科学中的一种内存访问技术。DMA之前:要把外设的数据读入内存或把内存的数据传送到外设,一般都要通过CPU控制完成,利用中断技术。允许某些硬件系统能够独立于CPU直接读写操作系统的内存,不需要中处理器(CPU)介入处理。数据传输操作在一个DM控制器(DMAC)的控制下进行,在传输过程中CPU可以继续进行其他的工作。在大部分时间CPU和I/O操作都处于并行状态,系统的效率更高。(3)应用程序的读写数据读本地磁盘操作系统检查内存缓冲区读取,如果存在则直接把内核空间的数据copy到用户空间(CPU负责),应用程序即可使用。上步没数据,则从磁盘中读取到内核缓冲(DMA负责),再把内核空间的数据copy到用户空间(CPU负责),应用程序即可使用硬盘->内核缓冲区->用户缓冲区写操作本地磁盘根据操作系统的写入方式不一样,buffer IO 和 direct IO ,写入磁盘时机不一样。buffer IO应用程序把数据从用户空间copy到内核空间的缓冲区(CPU负责),再把内核缓冲区的数据写到磁盘(DMA负责)。direct IO应用程序把数据直接从用户态地址空间写入到磁盘中,直接跳过内核空间缓冲区。减少操作系统缓冲区和用户地址空间的拷贝次数,降低了CPU和内存开销。用户缓冲区->内核缓冲区->硬盘读网络数据网卡Socket(类似磁盘)中读取客户端发送的数据到内核空间(DMA负责)。把内核空间的数据copy到用户空间(CPU负责),然后应用程序即可使用。写网络数据用户缓冲区中的数据copy到内核缓冲区的Socket Buffer 中(CPU负责)将内核空间中的Socket Buffer 拷贝到Socket协议栈(网卡设备)进行传输(DMA负责)(4)DMA的工作总结从磁盘的缓冲区到内核缓冲区的拷贝工作。从网卡设备到内核的socket buffer 的拷贝工作。从内核缓冲区到磁盘缓冲区的拷贝工作。从内核的socket buffer到网卡设备的拷贝工作。注意:内核缓冲区到用户缓冲区之间的拷贝工作仍然由CPU负责(5)DMA技术带来的性能损耗上图应用程序从磁盘读取数据发送到网络上的损耗,程序需要两个命令 先read读取,再write写出四次内核态和用户态的切换四次缓冲区的拷贝(2次DMA拷贝、2次CPU拷贝)读取:磁盘缓冲区到内核缓冲区(DMA)读取:内核缓冲区到用户缓冲区(CPU)写出:用户缓冲区到内核缓冲区Socket Buffer(CPU)写出:内核缓冲区的Socket Buffer到网卡设备(DMA)为了解决这种性能的损耗所以就诞生了零拷贝。2.ZeroCopy零拷贝技术简介(1)什么是零拷贝ZeroCopy 减少不必要的内核缓冲区跟用户缓冲区之间的拷贝工作,从而减少CPU的开销和减少kernel和user模式的上下文切换,达到性能的提升。从磁盘中读取文件通过网络发送出去,只需要拷贝2\3次和2\4的内核态和用户态的切换即可。ZeroCopy技术实现方式有两种(内核态和用户态切换次数不一样)方式一:mmap+write方式二:sendfile(2)ZeroCopy的实现底层 mmap + write操作系统都使用虚拟内存,虚拟地址通过多级页表映射物理地址。多个虚拟内存可以指向同一个物理地址,虚拟内存的总空间远大于物理内存空间。如果把内核空间和用户空间的虚拟地址映射到同一个物理地址,就不需要来回复制数据。mmap系统调用函数会直接把内核缓冲区的数据映射到用户空间,内核空间和用户空间就不需要在进行数据拷贝的操作了,节省了CPU开销。mmap()负责读取,write()负责写出执行流程应用程序先调用mmap()方法,将数据从磁盘拷贝到内核缓冲区,返回结束(DMA负责)。在调用write(),内核缓冲区的数据直接拷贝到内核socket buffer (CPU负责),然后把内核缓冲区的Socket Buffer 给直接拷贝给Socket协议线,即网卡设备中,返回结束(DMA负责)采用mmap之后,CPU用户态和内核态上下文切换依旧是4次和全程有3次数据拷贝2次DMA拷贝、1次CPU拷贝、4次内核态用户态切换,减少了1次CPU拷贝(3)ZeroCopy的实现底层 sendfileLinux kernal 2.1新增发送文件的系统调用函数sendfile()。执行流程替代read()和write()两个系统调用,减少一次系统调用,即减少2次CPU上下文切换的开销,调用sendfile(),从磁盘读取到内核缓冲区,然后直接把内核缓冲区的数据拷贝到socket buffer缓冲区里,再把内核缓冲区的SocketBuffer给直接拷贝给Socket协议栈,即网卡设备中(DMA负责)。采用sendfile后,CPU用户态和内核态上下文切换是2次 和 全程3次的数据拷贝,2次DMA拷贝、1次的CPU拷贝、2次内核态用户态切换。Linux 2.4+ 版本之后改进sendfile,利用DMA Gather(带有收集功能的DMA),变成了真正的零拷贝(没有CPU Copy)应用程序先调用sendfile()方法,将数据从磁盘拷贝到内核缓冲区(DMA负责)把内存地址、偏移量的缓冲区fd描述符拷贝到Socket Buffer中去 拷贝很少的数据,可忽略本质和虚拟内存的解决方法思路一样,就是内存地址的记录然后把内核缓冲区的Socket Buffer给直接拷贝给Socket协议栈 即网卡设备中,返回结束(DMA负责)3.Java和主流中间件里的零拷贝技术(1)Java中有哪些零拷贝技术Java NIO对mmap的实现 fileChannel.map()Java NIO对sendfile的实现 fileChannel.transferTo() 和 fileChannel.transferFrom()(2)什么是FileChannelFileChannel是一个连接到文件的通道,可以通过文件通道读写文件,该常被用于搞笑的网络/文件的数据传输和大文件拷贝应用程序使用FileChannel写完以后,数据是在PageCache上的,操作系统不定时的把PageCache的数据写入到磁盘。为了避免宕机数据丢失,使用channel.force(true) 把文件相关的数据强制刷入磁盘上去。使用之前必须先打开它,但是无法直接new一个FileChannel。常规通过使用一个InputStream、OutputStream或者RandomAccessFile来获取一个FileChannel实例。RandomAccessFile randomAccessFile = new RandomAccessFile("文件路径","rw"); FileChannel inChannel = randomAccessFile.getChannel(); (3)mmap方式实现map方法,把文件映射成内存映射文件MappedByteBuffer,是抽象类也是ByteBuffer的子类,具体实现子类是DirectByteBuffer,可被通道进行读写。一次map大小要限制在2G内,过大map会增加虚拟内存回收和重新分配的压力,直接报错。FileChannel.java中的map对long size 进行了限制,不能大于Integer.MAX_VALUE,否则就报错JDK层做限制是因为底层C++的类型,无符号int类型最大是2^31 -1, 2^31 -1 字节就是 2GB - 1B。MappedByteBuffer map(int mode,long position,long size) position:文件开始位置 size:映射文件区域大小 mode:访问该内存映射文件的方式,READ_ONLY(只读)、READ_WRITE(读写)、PRIVATE(创建一个读写副本) (4)sendfile方式实现fileChannel.transferTo(long postition,long count,WritableByteChannel target)将字节从此通道的文件传输到给定的可写入字节通道。返回值为真实拷贝的size,最大拷贝2G,超出2G的部分将丢弃。position:文件中的位置,从此位置开始传输,必须非负数 count:要传输的最大字节数,必须非负数 target:目标通道 返回:实际已传输的字节数,可能为零 fileChannel.transferFrom(ReadableByteChannel src, long position, long count)将字节从给定的可读取字节通道传输到此通道的文件中对比 从源通道读取并将内容写入此通道的循环语句相比,此方法更高效src:源通道 position:文件中的位置,从此位置开始传输,必须非负数 count:要传输的最大字节数, 必须非负数 返回:实际已传输的字节数,可能为零 transferFrom允许将一个通道连接到另一个通道,不需要在用户态和内核态来回复制,同时通道的内核态数据也无需复制,transferTo只有源为FileChannel才支持transfer这种搞笑的复制方式,其他如SocketChannel都不支持transfer模式。一般可以做FileChannel->FileChannel->FileChannel 和 FileChannel->SocketChannel的transfer零拷贝4.文件IO性能对比实战实现一个文件拷贝,对比不同IO方式性能差异,文件大小 200MB~5GB编码实现:普通java的io流普通java的带buffer的io零拷贝实现之mmap的io零拷贝实现之sendfile的io运行环境准备Linux CentOS7.X安装JDK11 配置全局环境变量 vi /etc/profileJAVA_HOME=/usr/local/jdk11 CLASSPATH=$JAVA_HOME/lib/ PATH=$PATH:$JAVA_HOME/bin export PATH JAVA_HOME CLASSPATH 环境变量立刻生效source /etc/profile查看安装情况 java -version准备1.34G测试文件(1)普通java的io验证public class IOTest { public static void main(String[] args) { String inputFilePath = args[0]; String outputFilePath = args[1]; long start = System.currentTimeMillis(); try ( FileInputStream fis = new FileInputStream(inputFilePath); FileOutputStream fos = new FileOutputStream(outputFilePath) ) { byte[] buf = new byte[1]; while(fis.read(buf) != -1){ fos.write(buf); } } catch (IOException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("普通IO耗时:"+(end-start)); } 测试:java IOTest.java "/usr/local/music.zip" "/usr/local/io-music.zip" (2)普通java的带buffer的iopublic class BufferIOTest { public static void main(String[] args) { String inputFilePath = args[0]; String outputFilePath = args[1]; long start = System.currentTimeMillis(); try ( BufferedInputStream bis = new BufferedInputStream(new FileInputStream(inputFilePath)); BufferedOutputStream bos = new BufferedOutputStream(new FileOutputStream(outputFilePath)) ) { byte[] buf = new byte[1]; while(bis.read(buf) != -1){ bos.write(buf); } } catch (IOException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("Buffer IO耗时:"+(end-start)); } } (3)零拷贝实现之mmap的io一次 map 最大支持2GB,超过2GB会报错public class MmapIOTest { public static void main(String[] args) { String inputFilePathStr = args[0]; String outputFilePathStr = args[1]; long start = System.currentTimeMillis(); try ( FileChannel channelIn = new FileInputStream(inputFilePathStr).getChannel(); FileChannel channelOut = new RandomAccessFile(outputFilePathStr, "rw").getChannel() ) { long size = channelIn.size(); System.out.println("mappedFile:"+size); MappedByteBuffer mbbi = channelIn.map(FileChannel.MapMode.READ_ONLY, 0, size); MappedByteBuffer mbbo = channelOut.map(FileChannel.MapMode.READ_WRITE, 0, size); for (int i = 0; i < size; i++) { byte b = mbbi.get(i); mbbo.put(i,b); } } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("mmap 零拷贝 IO 耗时:"+(end-start)); } } (4)零拷贝实现之sendfile的io最大拷贝2G,超出2G的部分将丢弃,最终拷贝的文件大小只有2GB多点,超过2GB可以考虑多次执行public class SendFileIOTest { public static void main(String[] args) { String inputFilePathStr = args[0]; String outputFilePathStr = args[1]; long start = System.currentTimeMillis(); try ( FileChannel channelIn = new FileInputStream(inputFilePathStr).getChannel(); FileChannel channelOut = new FileOutputStream(outputFilePathStr).getChannel() ) { // 代码一:针对小于2GB的问题,返回值为真实拷贝的size,最大拷贝2G,超出2G的部分将丢弃,最终拷贝文件大小只有2GB //channelIn.transferTo(0,channelIn.size(),channelOut); // 代码二:针对大于2GB的文件 long size = channelIn.size(); for (long left = size;left>0;){ //transferSize所拷贝过去的真实长度,size - left 计算出下次要拷贝的位置 long transferSize = channelIn.transferTo((size - left),left,channelOut); System.out.println("总大小:"+size+",拷贝大小:"+transferSize); //left剩余字节多少 left = left - transferSize; } } catch (Exception e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println("sendfile 零拷贝 IO 耗时:"+(end-start)); } } (5)测试结果分析1~2GB的文件普通拷贝普通java的io流【慢】3973924秒普通java的带buffer的io【快】33196秒零拷贝零拷贝实现之mmap的io【快】7131秒零拷贝实现之sendfile的io【快】1784秒分析原因之前,我们先来了解一下局部性原理局部性原理:指计算机在执行某个程序时,倾向于使用最近使用的数据 时间局部性:如果程序中的某条指令一旦被执行,则不久的将来该指令可能再次被执行 空间局部性:一旦程序访问了某个存储单元,在不久的将来附近的存储单元也有可能被访问 普通的IO和Buffer IO,为什么带有Buffer的IO要比普通的IO性能高?每次读取数据的时候,系统根据局部性原理,通过DMA会读入更多的数据到内核缓冲区里面 OS根据局部性原理会在一次read(),系统调用过程中预读更多的文件数据缓存在内核IO缓冲区中 当继续访问的文件数据在缓冲区中时便直接拷贝数据到进程缓冲区,避免了再次的抵消磁盘IO操作 OS已经帮减少磁盘IO操作次数,提高了性能 两种零拷贝的方式对比(1)sendfile 无法在调用过程中修改数据,只适用于应用程序不需要对所访问数据进行处理修改情况,适合静态文件传输,MQ的Broker发送消息给消费者。适合大文件传输,2次上下文切换,最少2次数据拷贝。 (2)mmap 在mmap调用可以在应用程序中直接修改Page Cache中的数据,使用的是mmap+write两步。调用比sendfile成本高,但由于传统的拷贝方式,适用于多个线程以只读的方式同时访问同一个文件,mmap机制下多线程共享同一个物理内存空间,节约内存。适合小数据量续写,4次上下文切换,3次数据拷贝。 5.主流中间件中用到的ZeroCopy技术(1)Nginx使用的是sendfile 零拷贝WebServer处理静态页面请求时,是从磁盘中读取网页的内容,因为sendfile不能在应用程序中修改数据,所以最适合静态文件服务器或者是直接转发数据的代理服务器。(2)rocketmq主要是mmap,也有小部分使用sendfilerocketMQ在消息存盘和网络发送使用mmap, 单个CommitLog文件大小默认1GB要在用户进程内处理数据,然后再发送出去的话,用户空间和内核空间的数据传输就是不可避免的(3)Kafka主要是sendfile,也有小部分使用mmapkafka 在客户端和 broker 进行数据传输时,broker 使用 sendfile 系统调用,类似【FileChannel.transferTo】 API,将磁盘文件读到 OS 内核缓冲区后,直接转到 socket buffer 进行网络发送,即 Linux 的 sendfile。中读取网页的内容,因为sendfile不能在应用程序中修改数据,所以最适合静态文件服务器或者是直接转发数据的代理服务器。2)rocketmq主要是mmap,也有小部分使用sendfilerocketMQ在消息存盘和网络发送使用mmap, 单个CommitLog文件大小默认1GB要在用户进程内处理数据,然后再发送出去的话,用户空间和内核空间的数据传输就是不可避免的(3)Kafka主要是sendfile,也有小部分使用mmapkafka 在客户端和 broker 进行数据传输时,broker 使用 sendfile 系统调用,类似【FileChannel.transferTo】 API,将磁盘文件读到 OS 内核缓冲区后,直接转到 socket buffer 进行网络发送,即 Linux 的 sendfile。
1.需求背景很多人都用手机注册一些网站的验证了,比如手机验证码。先填手机号,然后发一条验证码过去,输入验证码,完成验证,注册成功。为了避免自己的网站被刷,增加图形验证码。当然为了防止网站被刷还不止这一种办法。2.Docker急速部署Redis#创建文件夹 mkdir -p /usr/local/data/redis/data docker run -itd --name redis -p 6379:6379 -v /usr/local/data/redis/data:/data redis:6.2.4 --requirepass 123456 3.搭建SpringBoot项目(1)添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.7</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>compile</scope> </dependency> </dependencies> (2)创建启动主类@SpringBootApplication public class KaptchaApplication { public static void main(String[] args) { SpringApplication.run(KaptchaApplication.class, args); } } (3)创建ymlserver: port: 8012 spring: application: name: kaptcha-server (4)启动验证4.开发图形验证码(1)添加Maven依赖 <!--redis客户端--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> <!--kaptcha依赖包--> <dependency> <groupId>com.baomidou</groupId> <artifactId>kaptcha-spring-boot-starter</artifactId> <version>1.1.0</version> </dependency> (2)yml配置Redis # 配置redis redis: # redis类型,jedis和lettuce client-type: jedis host: 192.168.139.101 password: 123456 port: 6379 jedis: pool: # 连接池最大连接数(使用负值表示没有限制) max-active: 100 # 连接池中的最大空闲连接 max-idle: 100 # 连接池中的最小空闲连接 min-idle: 100 # 连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: 60000 (3)配置CaptchaConfig/** * 图形验证码配置类 * @author lixiang */ @Configuration public class CaptchaConfig { /** * 验证码配置 * Kaptcha配置类名 * @return */ @Bean @Qualifier("captchaProducer") public DefaultKaptcha kaptcha() { DefaultKaptcha kaptcha = new DefaultKaptcha(); Properties properties = new Properties(); //properties.setProperty(Constants.KAPTCHA_BORDER, "yes"); //properties.setProperty(Constants.KAPTCHA_BORDER_COLOR, "220,220,220"); //properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_FONT_COLOR, "38,29,12"); //properties.setProperty(Constants.KAPTCHA_IMAGE_WIDTH, "147"); //properties.setProperty(Constants.KAPTCHA_IMAGE_HEIGHT, "34"); //properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_FONT_SIZE, "25"); //properties.setProperty(Constants.KAPTCHA_SESSION_KEY, "code"); //验证码个数 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_LENGTH, "4"); //properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_FONT_NAMES, "Courier"); //字体间隔 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_SPACE,"8"); //干扰线颜色 //properties.setProperty(Constants.KAPTCHA_NOISE_COLOR, "white"); //干扰实现类 properties.setProperty(Constants.KAPTCHA_NOISE_IMPL, "com.google.code.kaptcha.impl.NoNoise"); //图片样式 properties.setProperty(Constants.KAPTCHA_OBSCURIFICATOR_IMPL, "com.google.code.kaptcha.impl.WaterRipple"); //文字来源 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_STRING, "0123456789"); Config config = new Config(properties); kaptcha.setConfig(config); return kaptcha; } } (4)配置RedisTemplate/** * redisTemplate配置类 * @author lixiang */ @Configuration public class RedisTemplateConfiguration { @Bean public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){ RedisTemplate<Object,Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); //配置序列化规则 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); //设置key-value的序列化规则 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer); redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); return redisTemplate; } } (5)编写Controller测试@RestController @RequestMapping("/kaptcha") @Slf4j public class KaptchaController { @Autowired private StringRedisTemplate redisTemplate; @Autowired private Producer captchaProducer; /** * 设置过期时间10min */ private static final long CAPTCHA_CODE_EXPIRED = 60 * 1000; /** * 获取图形验证码 * @param request * @param response */ @GetMapping("/get_captcha") public void getCaptcha(HttpServletRequest request, HttpServletResponse response){ //创建验证码内容 String captchaText = captchaProducer.createText(); log.info("图形验证码内容为:{}",captchaText); redisTemplate.opsForValue().set(getCaptchaKey(request),captchaText,CAPTCHA_CODE_EXPIRED, TimeUnit.MILLISECONDS); //生成图片 BufferedImage captchaImage = captchaProducer.createImage(captchaText); try(ServletOutputStream outputStream = response.getOutputStream()){ ImageIO.write(captchaImage,"jpg",outputStream); }catch (IOException e){ log.error("获取流出错:"+e.getMessage()); } } /** * 获取图形验证码的缓存key * @param request * @return */ private String getCaptchaKey(HttpServletRequest request) { String userAgent = request.getHeader("User-Agent"); return "account:captcha:"+ userAgent; } }
1.ThreadLocal简介多线程访问同一个共享变量的时候容易出现并发问题,特别是多个线程对一个变量进行写入的时候,为了保证线程安全,一般使用者在访问共享变量的时候需要进行额外的同步措施才能保证线程安全性。ThreadLocal是除了加锁这种同步方式之外的一种保证一种规避多线程访问出现线程不安全的方法,当我们在创建一个变量后,如果每个线程对其进行访问的时候访问的都是线程自己的变量这样就不会存在线程不安全问题。ThreadLocal是JDK包提供的,它提供线程本地变量,如果创建一个ThreadLocal变量,那么访问这个变量的每个线程都会有这个变量的一个副本,在实际多线程操作的时候,操作的是自己本地内存中的变量,从而规避了线程安全问题。2.ThreadLocal的简单使用启用两个线程分别对自己工作空间的num进行++public class ThreadLocalDemo { private ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(()->0); public void add(){ threadLocal.set(threadLocal.get()+1); } public Integer get(){ return threadLocal.get(); } public static void main(String[] args) { ThreadLocalDemo threadLocalDemo = new ThreadLocalDemo(); for (int i = 0; i < 2; i++) { new Thread(()->{ while (true){ try { Thread.sleep(5000L); } catch (InterruptedException e) { e.printStackTrace(); } threadLocalDemo.add(); System.out.println(Thread.currentThread().getName()+":线程当前获取num值为:"+threadLocalDemo.get()); } }).start(); } } } 运行结果3.ThreadLocal的实现原理下面是ThreadLocal的类图结构,从图中可知:Thread类中有两个变量threadlocals和inheritableThreadLocals,二者都是ThreadLocal内部类ThreadLocalMap类型变量,我们通过查看内部类ThreadLocalMap可以发现实际上它类似于一个HashMap。在默认情况下,每个线程中的两个变量都是null。class Thread implements Runnable { ThreadLocal.ThreadLocalMap threadLocals = null; ThreadLocal.ThreadLocalMap inheritableThreadLocals = null; } 只有当线程第一次调用ThreadLocal的set或者get方法的时候才会创建他们。除此之外,每个线程的本地变量不是存放在ThreadLocal实例中,而是存放在调用线程的Thread实例的threadLocals变量里面,也就是说ThreadLocal类型的本地变量是存放在具体的线程空间上,其本身相当于一个装载线程本地变量的容器,通过set方法将value添加到调用线程的threadLocals中,当调用线程调用get方法的时候,从它的threadLocals中取出变量。如果调用线程一直不终止,那么这个本地变量将会一直存放在它的threadLocals中,所以不使用本地变量的时候调用remove()方法,将threadLocals中删除不使用的变量。类关系图set方法源码分析public void set(T value){ //先获取当前线程实例(调用者线程) Thread t = Thread.currentThread(); //先通过当前线程实例作为key去获取ThreadLocalMap ThreadLocalMap map = getMap(t); //如果获取不是null,直接set值,key为当前定义的ThreadLocal变量的this引用。 if(map !=null ) map.set(this,value); //如果为null说明是首次添加,要先创建出对应的map else createMap(t,value); } getMap(Thread t)方法获取的是当前线程对应的threadLocals。ThreadLocalMap getMap(Thread t){ //获取的是自己的线程变量threadLocals,并且绑定到调用线程的的成员变量threadLocals上。 return t.threadLocals; } 如果调用getMap方法返回null,就直接将value值设置到threadLocals中(key为当前线程的引用,值为本地变量);如果getMap方法返回null说明是第一次调用set方法(因为threadLoclas默认值为null),这个时候就要调用createMap方法创建threadLocals。createMap方法不仅创建了threadLocals,同时也将本地变量添加到threadLocals中。void createMap(Thread t,T firstValue){ t.threadLocals = new ThreadLocalMap(this,firstValue); } get方法源码分析 在get方法的实现中,首先获取当前调用者线程,如果当前线程的threadLocals不为null,就直接返回当前线程绑定的本地变量的值,否则执行setInitialValue方法初始化threadLocals变量。setInitialValue方法中,类似于set方法的实现,都是判断当前线程的threadLocals变量是否为null,是则添加本地变量(这个时候由于初始化,所以添加的值也为null),否则创建threadLocals变量,同样添加的值为null。public T get(){ //获取当前线程实例 Thread t = Thread.currentThread(); //获取当前实例的ThreadLocalMap ThreadLocalMap map = getMap(t); //判断map是否为空 if(map != null){ //获取map中的value返回 ThreadLocalMap.Entry e = map.getEntry(this); if(e != null){ T result = (T)e.value; return result; } } //如果map为空则调用setInitialValue进行初始化,设置threadLocals变量value设为null return setInitialValue(); } private T setInitialValue(){ //protected T initialValue() { return null; } T value = initialValue(); //获取当前线程 Thread t = Thread.currentThread(); //以当前线程作为key值,查找对应的线程变量,找到对应的map ThreadLocalMap map = getMap(t); //如果map不为null if(map != null) //这块value 的值为null map.set(this,value); else //这块value的值也为null createMap(t,value); return value; } remove方法源码分析remove方法判断该当前线程对应的threadLocals变量是否为null,不为null就直接删除当前线程中指定的threadLocals变量。public void remove(){ //获取当前线程绑定的threadLocals ThreadLocalMap map = getMap(Thread.currentThread()); //如果map不为null,就移除当前线程中指定的ThreadLocal实例的本地变量 if(m != null){ m.remove(this); } } Threadlocal的内部结构图从上面的我们可以看出:每一个Thread线程内部都有一个Map类型为ThreadLocalMap。Map里面存储线程本地对象(key)和线程的变量副本(value)。Thread内部的Map是由ThreadLocal维护的,由ThreadLocal负责向map获取和设置线程的变量值。对于不同的线程,每次获取副本值时,别的线程并不能获取当前西安城的副本值,于是形成了副本的隔离互相不干扰。ThreadLocalMap是ThreadLocal的内部类,没有实现Map接口,用独立的方式实现了Map的功能,Map内部节点对象Entry也独立实现。每个ThreadLocal只能保存一个变量副本,如果想要上线一个线程能够保存多个副本,就要多创建几个ThreadLocal实例。Threadlocal里存储的可以是long,也可以是其他对象,同一个线程里不断set会不断覆盖。不同线程间数据互不影响。ThreadLocalMap源码中的set方法 每个线程内部都会有一个名为threadLocals的成员变量,该变量的类型为ThreadLocal.ThreadLocalMap类型,其中的key为当前定义的ThreadLocal变量的this引用,value为我们使用set方法设置的值,每个线程本地变量存放在自己的本地内存变量threadLocals中,如果当前线程一直不消亡,那么这些线程一直存在(所以会导致内存溢出),因此使用完毕后要将其remove掉。4.ThreadLocal不支持继承性同一个ThreadLocal变量在父线程中被设置值后,在子线程中是获取不到的。(threadLocals中为当前调用线程对应的本地变量,所以二者自然是不能共享的public class ThreadLocalDemo { private static ThreadLocal<String> threadLocal = new ThreadLocal<>(); public static void main(String[] args) { threadLocal.set("mainValue"); new Thread(()->{ System.out.println("子线程拿到threadLocal中的值:"+threadLocal.get()); }).start(); System.out.println("main线程拿到threadLocal中的值:"+threadLocal.get()); } } 5.InheritableThreadLocal支持继承性在上面说到的ThreadLocal类是不能提供子线程访问父线程的本地变量的,而InheritableThreadLocal类则可以做到这个功能.同样是上面的代码,将ThreadLocal改成InheritableThreadLocal即可6.从ThreadLocalMap看ThreadLocal使用不当的内存泄漏问题首先我们先来看一下Java的四种引用用类型。强引用:Java中默认的引用类型,一个对象如果具有强引用那么只要这种引用还存在就不会被GC。软引用:如果一个对象持有软引用,那么在JVM发生OOM之前,是不会GC这个对象的,只有到JVM内存内存不足的时候才会GC这个对象,软引用和一个引用队列联合使用,如果软引用所引用的对象被回收之后,该引用就会加入到与之关联的引用队列中。弱引用:弱引用对象指挥生存到下一次GC之前,只要发生GC无论内存够不够,都会把弱引用对象GC掉。虚引用:虚引用是所有引用中最弱的一个,其存在就是将关联虚引用的对象在被GC掉之后收到一个通知。分析ThreadLocalMap内部实现ThreadLocalMap内部实现其实就是一个Entry[]数组,分析Entry内部类。/** * 是继承自WeakReference的一个类,该类中实际存放的key是 * 指向ThreadLocal的弱引用和与之对应的value值(该value值 * 就是通过ThreadLocal的set方法传递过来的值) * 由于是弱引用,当get方法返回null的时候意味着坑能引用 */ static class Entry extends WeakReference<ThreadLocal<?>> { /** value就是和ThreadLocal绑定的 */ Object value; //k:ThreadLocal的引用,被传递给WeakReference的构造方法 Entry(ThreadLocal<?> k, Object v) { super(k); value = v; } } //WeakReference构造方法(public class WeakReference<T> extends Reference<T> ) public WeakReference(T referent) { super(referent); //referent:ThreadLocal的引用 } //Reference构造方法 Reference(T referent) { this(referent, null);//referent:ThreadLocal的引用 } Reference(T referent, ReferenceQueue<? super T> queue) { this.referent = referent; this.queue = (queue == null) ? ReferenceQueue.NULL : queue; } Entry的源码中我们可以看出Entry继承WeakReference<ThreadLocal<?>>类,当前的ThreadLocal的应用key被传递到WeakReference<ThreadLocal<?>>的构造函数中,所以ThreadLocalMap中的key为ThreadLocal的弱引用,value就是通过ThreadLocal设置的值,,如果当前线程一直没由调用该ThreadLocal 的remove方法,这时还有别的地方对ThreadLocal的引用,那么当前线程中的ThreadLocalMap中会存在对ThreadLocal变量的引用和value对象的引用,由于ThreadLocalMap中key为ThreadlLocal的弱引用,当下一次GC的时候,key会被释放,但是value不会,这时就会造成key为空,但是value不为空的情况,造成内存泄露,所以实际开发中应尽量漏掉remove方法。
1.什么是线程安全性当多个线程访问某个类,不管运行时环境采用何种调度方式或者这些线程如何交替执行,并且在主调代码中不需要任何额外的同步或协同,这个类都能表现出正确的行为,那么就称这个类为线程安全的。线程不安全的原因线程是抢先执行的原子性操作,当CPU执行一个线程过程时,调度器可能调走CPU,去执行另一个线程,此线程的操作可能还没有结束。多个线程尝试修改同一个变量。内存可变性指令重排2.原子性操作一个操作或这多个操作,要么全部执行并且执行过程中不被任何因素打断,要么就都不执行。如何把非原子性操作变成原子性volatile关键字仅仅保证可见性,并不保证原子性,synchronized关键字使得操作具有原子性。3.深入理解synchronized(1)内置锁每个java对象都可以做一个实现同步的锁,这些锁成为内置锁。线程进入同步代码块或者方法的时候会自动获得锁,在退出同步代码块或者方法的时候释放该锁。获得内置锁的唯一途径就是进入这个锁的保护的同步代码块或者方法。(2)互斥锁内置锁是一个互斥锁,这就意味着最多只有一个线程能够获得该锁,当线程A尝试去获得线程B持有的内置锁时,线程A必须等待或者阻塞,知道线程B释放这个锁,如果B线程不释放这个锁,那么A线程将永远等待下去。(3)synchronized修饰普通方法synchronzied修饰普通方法锁住的是当前调用的对象,假如开两个线程两个实例去掉方法,那么两个实例各自持有一个锁,互相不干扰,但是如果是两个线程用同一实例去调用,那么就持有一个锁,第一个线程释放锁后,第二个才会拿到锁。public class SyncDemo { public synchronized void exampleOut(){ try { Thread.sleep(5000L); RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); long uptime = runtimeMXBean.getUptime(); System.out.println(Thread.currentThread().getName()+"-线程运行时长:"+uptime+"ms"); } catch (InterruptedException e) { e.printStackTrace(); } } } 两个线程调用两个实例案例public static void main(String[] args) { SyncDemo syncDemo1 = new SyncDemo(); SyncDemo syncDemo2 = new SyncDemo(); new Thread(() -> { syncDemo1.exampleOut(); }).start(); new Thread(() -> { syncDemo2.exampleOut(); }).start(); } 两个线程调用一个实例案例public static void main(String[] args) { SyncDemo syncDemo1 = new SyncDemo(); SyncDemo syncDemo2 = new SyncDemo(); new Thread(() -> { syncDemo1.exampleOut(); }).start(); new Thread(() -> { syncDemo1.exampleOut(); }).start(); } (4)synchronized修饰静态方法synchronized修饰静态方法锁住的是整个类的对象,无论有多少个实例,只要是当前类的实例,都持有一个锁,一般生产不建议用静态同步方法,可能会导致程序运行阻塞。public class SyncDemo { public static synchronized void staticOut(){ try { Thread.sleep(5000L); RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); long uptime = runtimeMXBean.getUptime(); System.out.println(Thread.currentThread().getName()+"-线程运行时长:"+uptime+"ms"); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { SyncDemo syncDemo1 = new SyncDemo(); SyncDemo syncDemo2 = new SyncDemo(); new Thread(() -> { syncDemo1.staticOut(); }).start(); new Thread(() -> { syncDemo2.staticOut(); }).start(); } (5)synchronized修饰代码块synchronized修饰代码块锁住的是当前对象,用法和synchronized修饰普通方法一样,但是更细粒度确定锁的位置,比synchronized修饰普通方法的效率要高。public class SyncDemo { private Object lock = new Object(); public void blockOut(){ try { Thread.sleep(5000L); RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean(); long uptime = runtimeMXBean.getUptime(); System.out.println(Thread.currentThread().getName()+"-线程运行时长:"+uptime+"ms"); } catch (InterruptedException e) { e.printStackTrace(); } } } 两个线程调用两个实例案例public static void main(String[] args) { SyncDemo syncDemo1 = new SyncDemo(); SyncDemo syncDemo2 = new SyncDemo(); new Thread(() -> { syncDemo1.blockOut(); }).start(); new Thread(() -> { syncDemo2.blockOut(); }).start(); } 两个线程调用一个实例案例public static void main(String[] args) { SyncDemo syncDemo1 = new SyncDemo(); SyncDemo syncDemo2 = new SyncDemo(); new Thread(() -> { syncDemo1.blockOut(); }).start(); new Thread(() -> { syncDemo1.blockOut(); }).start(); } 3.4.volatile关键字(1)volatile关键字的作用保证了变量的可见性(visibility)。被volatile关键字修饰的变量,如果值发生了变更,其他线程立马可见,避免出现脏读的现象。(2)为什么会出现脏读Java内存模型规定所有的变量都是存在主存当中,每个线程都有自己的工作内存。线程对变量的所有操作都必须在工作内存中进行,而不能直接对主存进行操作。并且每个线程不能访问其他线程的工作内存。变量的值何时从线程的工作内存写回主存,无法确定。(3)volatile案例实战public class VolatileDemo { private volatile boolean flag = false; public void work(){ while (!flag) { System.out.println("线程开始工作"); } } public void down(){ flag = true; System.out.println("线程停止工作"); } public static void main(String[] args) { VolatileDemo work = new VolatileDemo(); new Thread(work::work).start(); new Thread(work::work).start(); new Thread(work::down).start(); new Thread(work::work).start(); new Thread(work::work).start(); } } 不加volatile关键字的运行结果加volatile关键字的运行结果(4)volatile只能保证变量的可见性,不能保证对volatile变量操作的原子性案例实战,分别对num进行+1000,+2000,+3000的操作public class VolatileDemo { private volatile int num = 0; //执行方法加上synchronized public synchronized void addNum(){ num++; } //执行方法加上synchronized public synchronized int getNum(){ return num; } public static void main(String[] args) { VolatileDemo volatileDemo = new VolatileDemo(); new Thread(()->{ for (int i = 0; i < 1000; i++) { volatileDemo.addNum(); } System.out.println(volatileDemo.getNum()); }).start(); new Thread(()->{ for (int i = 0; i < 2000; i++) { volatileDemo.addNum(); } System.out.println(volatileDemo.getNum()); }).start(); new Thread(()->{ for (int i = 0; i < 3000; i++) { volatileDemo.addNum(); } System.out.println(volatileDemo.getNum()); }).start(); } } 加上synchronized之后3.5.happens-before规则(1)理解happens-before如果一个操作happens-before另一个操作,那么第一个操作的执行结果将对第二个操作可见,而且第一个操作执行顺序排在第二个操作之前。两个操作之间存在happens-before关系,并不意味着java平台的具体实现必须按照happens-before关系指定的顺序来执行。如果重排序之后的执行结果,与按happens-before关系来执行的结果一致,那么JMM也允许这样的重排序。如果操作A happens-before操作B,那么操作A在内存上所做的所有操作对于操作B都是可见的,不管它们在不在同一个线程。happens-before关系保证正确同步的多线程程序执行的结果不被重排序改变。(2)happens-before六大规则前三个规则用这个例子来看class VolatileExample{ int a=0; volatile boolean flag=false; public void writer(){ a=1; // 操作1 flag=true; // 操作2 } public void reader(){ if(flag){ // 操作3 int i=a; // 操作4 //这里i会是多少呢? } } } 程序顺序规则一个线程中的每一个操作,happens-before于该线程中的任意后续操作可见。程序前面对某个变量的修改一定是对后续操作可见的。例如上面代码块,按照程序顺序执行规则,1 happens-before 2,3 happens-before 4。volatile变量规则对一个volatile域的写,happens-before于任意后续对这个volatile域的读。例如上面代码块,按照volatile变量规则,2 happens-before 3。传递性规则如果A appens-before B,B happens-before C,那么A happens-before C。例如上面代码块,按照传递性规则,1 happens-before 4。管程中锁的规则对一个锁的解锁,happens-before于随后对这个锁的加锁。管程是一种通用的同步原语,在java中指的就是synchronized,synchronized是java里对管程的实现。synchronized(this){ if(this.x < 12){ this.x = 12; } } //根据管程中锁的规则,线程A执行完成后x会变成12,执行完释放锁,线程B进入代码块的时候,能够看到线程A对x的操作,也就睡说B看到的x值为12。 线程start规则父线程A启动后,启动子线程B,子线程B能够看到主线程在启动B前的所有操作(指共享变量的操作)。public class StartDemo { private static int num = 0; public static void main(String[] args) { Thread A = new Thread(()->{ Thread B = new Thread(()->{ System.out.println("B线程中读取num:"+num); //操作2 }); num = 1; //操作1 B.start(); }); A.start(); } } //根据线程start规则,1 happens-before 2,线程A对共享变量a=1的操作对于线程B是可见的。 线程join规则父线程A等待子线程B完成, 当子线程B完成后 ,父线程A能够看到子线程B的操作(指的是对共享变量的操作)。public class JoinDemo { private static int num = 0; public static void main(String[] args) { Thread A = new Thread(()->{ Thread B = new Thread(()->{ num = 2; }); num = 1; B.start(); try { B.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("A线程中读取num:"+num); }); A.start(); } } 3.6.如何避免线程安全性问题(1)线程安全性问题成因多线程环境多个线程操作同意共享资源对该共享资源进行了非原子性操作(2)如何避免线程安全性问题多线程环境–将多线程改为单线程(加锁)多个线程操作同一共享资源–让其资源不进行共享(ThreadLocal、资源不可变、操作无状态化)对该共享资源进行了非原子性操作–将非原子性的操作改成原子性的操作(加锁)
1.Git简介1.1.什么是GitGit 是一个开源的分布式版本控制系统,用于敏捷高效地处理任何或小或大的项目。Git 是 Linus Torvalds 为了帮助管理 Linux 内核开发而开发的一个开放源码的版本控制软件。Git 与常用的版本控制工具 CVS, Subversion 等不同,它采用了分布式版本库的方式,不必服务器端软件支持。1.2.Git与SVN区别Git是分布式的,SVN不是:这是Git和其他非分布式版本控制系统,如SVN、CVS最核心的区别。Git把内容按元数据方式存储,而SVN是按文件:所有的资源控制系统都是把文件的辕信息隐藏在一个类似SVN、CVS等文件夹里。Git分支和SVN的分支不同:分支在SVN中一点也不特别,其实他是版本库的另一个目录。Git没有一个全局的版本号,SVN有:目前为止跟SVN相比Git最缺少的一个特征。Git的内容完整性要大于SVN:Git 的内容存储使用的是 SHA-1 哈希算法。这能确保代码内容的完整性,确保在遇到磁盘故障和网络问题时降低对版本库的破坏。2.Git工作流程2.1.Git的工程流程克隆Git资源作为工作目录在克隆的资源上修改添加文件如果别人更新了代码文件,你可以拉取最新的资源在提交前查看修改提交修改在修改完成后,如果发现错误,可以撤回并且再次修改并提交3.Git工作区、暂存区和版本库3.1.Git工作区、暂存区和版本库工作区:就是自己电脑中能看到的目录,工作空间。暂存区:英文叫stage和index,一般存在.git目录下的index文件中,所以把暂存区有时也叫索引区。版本库:工作区有一个隐藏目录 .git,这个不算工作区,而是 Git 的版本库。图中左侧为工作区,右侧为版本库。在版本库中标记为 “index” 的区域是暂存区(stage/index),标记为 “master” 的是 master 分支所代表的目录树。图中我们可以看出此时 “HEAD” 实际是指向 master 分支的一个"游标"。所以图示的命令中出现 HEAD 的地方可以用 master 来替换。图中的 objects 标识的区域为 Git 的对象库,实际位于 “.git/objects” 目录下,里面包含了创建的各种对象及内容。当对工作区修改(或新增)的文件执行 git add 命令时,暂存区的目录树被更新,同时工作区修改(或新增)的文件内容被写入到对象库中的一个新的对象中,而该对象的ID被记录在暂存区的文件索引中。当执行提交操作(git commit)时,暂存区的目录树写到版本库(对象库)中,master 分支会做相应的更新。即 master 指向的目录树就是提交时暂存区的目录树。当执行 git reset HEAD 命令时,暂存区的目录树会被重写,被 master 分支指向的目录树所替换,但是工作区不受影响。当执行 git rm --cached 命令时,会直接从暂存区删除文件,工作区则不做出改变。当执行 git checkout . 或者 git checkout – 命令时,会用暂存区全部或指定的文件替换工作区的文件。这个操作很危险,会清除工作区中未添加到暂存区中的改动。当执行 git checkout HEAD . 或者 git checkout HEAD 命令时,会用 HEAD 指向的 master 分支中的全部或者部分文件替换暂存区和以及工作区中的文件。这个命令也是极具危险性的,因为不但会清除工作区中未提交的改动,也会清除暂存区中未提交的改动。4.Git创建仓库4.1.git initGit 使用git init命令来初始化一个 Git 仓库,Git 的很多命令都需要在 Git 的仓库中运行,所以git init是使用 Git 的第一个命令。 在执行完成git init命令后,Git 仓库会生成一个 .git 目录,该目录包含了资源的所有元数据,其他的项目目录保持不变。 使用当前目录作为Git仓库git init使用我们指定目录作为Git仓库git init newrepo将文件添加到暂存区git add ./*将暂存区里的文件提交到本地仓库git commit -m "提交说明"4.2.git clone我们使用 git clone 从现有 Git 仓库中拷贝项目(类似 svn checkout)克隆仓库的命令格式为git clone <repo>如果我们需要克隆到指定的目录git clone <repo> <directory>参数说明**repo:**Git 仓库。**directory:**本地目录。4.3.git configgit 的设置使用 git config 命令显示当前的git信息git config --list编辑 git 配置文件git config -e # 针对当前仓库 git config -e --global # 针对系统上所有仓库设置提交代码时的用户信息git config --global user.name "runoob" git config --global user.email test@runoob.com5.Git基本操作(1)提交与修改命令说明git add添加文件到仓库git status查看当前仓库状态,显示有变更的文件git diff比较文件的不同,即暂存区和工作区的差异git commit提交暂存区到本地仓库git reset回退版本git rm删除工作区中的文件git mv移动或者重命名工作区文件(2)提交日志命令说明git log查看历史提交记录git blame以表格的形式插卡你指定文件的历史修改记录(3)远程操作命令说明git remote远程仓库操作git fetch从远程获取代码库git pull下载远程代码并合并git push上传远程代码并合并(4)创建分支命令git branch 分支名(5)切换分支git checkout 分支名 git checkout -b 分支名 #创建新分支并且切换到此分支(6)合并分支get merge(7)删除分支git branch -d 分支名(8)git文件的删除git rm 文件名如果只是 git rm --cache 仅删除暂存区里的文件 如果不加–cache 会删除工作区里的文件 并提交到暂存区(9)git log命令git log -数字 #表示查看最近几次提交的不同点git log -p -数字 #表示最近几次提交的不同点 git log --graph #以一个简单的线串联起整个提交历史
6.Kafka数据存储流程和原理概述6.1.Partitiontopic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列是以文件夹的形式存储在具体Broker本机上6.2.LEO(LogEndOffset)表示每个partition的log最后一条Message的位置6.3.HW(HighWatermark)表示partition各个replicas数据键同步且一致的offset位置,即表示allreplicas已经commit的位置HW之前的数据才是commit后的,对消费者才可见ISR集合里面最小leo6.4.offset每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中partition中的每个消息都有一个连续的序号叫做offset,用于partition唯一标识一条信息可以认为offset是partition中Message的id6.5.Segment每个partition又由多个segment file组成segment file 由2部分组成,分别为index file 和 data file(log file)两个文件一一对应,后缀“.index”和".log"分别标识索引文件和数据文件命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+16.6.Kafka高效文件存储设计特点kafka把topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用。通过索引信息可以快速定位Messageproducer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明,同样的磁盘,顺序写能到600M/S,而随机写只是100K/S7.SpringBoot2.x项目整合Kafka7.1.引入kafka-clients依赖<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency> 7.2.配置客户端AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"ip:端口" AdminClient.create(配置信息); /** * 设置admin客户端 * @return */ public static AdminClient initAdminClient(){ Properties properties = new Properties(); properties.setProperties(AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"112.74.55.160:9092"); AdminClient adminClient = AdminClient.create(properties); return adminClient; } 7.3.创建topicNewTopic newTopic = new NewTopic(topic名称,分区数量,副本数量); adminClient.createTopics(Arrays.asList(newTopic)); //返回一个CreateTopicsResult createTopicsResult.all().get(); //异常处理 @Test public void createTopic(){ AdminClient adminClient = initAdminClient(); //2个分区,1个副本 NewTopic newTopic = new NewTopic(TOPIC_NAME,2,(short)1); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); try { //future等待创建,成功不会有任何报错,失败或者超时会报错 createTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("ڠୌෛጱtopic"); } 7.4.删除topicadminClient.deleteTopics(topic名称的list集合); //返回一个DeleteTopicsResult deleteTopicsResult.all().get(); @Test public void delTopicTest(){ AdminClient adminClient = initAdminClient(); DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-test-topic")); try { deleteTopicsResult.all().get(); } catch (Exception e) { e.printStackTrace(); } } 7.5.列举topic-listadminClient.listTopics(); adminClient.listTopics(options); //返回一个ListTopicsResult Set<String> topics = listTopics.names().get(); //得到一个set集合遍历 //是否查看内部topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); @Test public void listTopic(){ AdminClient adminClient = initAdminClient(); //是否查看内部topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopics = adminClient.listTopics(options); Set<String> topics = listTopics.names().get(); for (String topic : topics) { System.err.println(topic); } } 7.6.增加分区数量NewPartitions.increateTo(5); infoMap.put(TOPIC_NAME, newPartitions); adminClient.createPartitions(infoMap); @Test public void incrPartitionsTest() throws Exception{ Map<String, NewPartitions> infoMap = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(5); AdminClient adminClient = initAdminClient(); infoMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap); createPartitionsResult.all().get(); } 7.7.查看topic详情adminClient.describeTopics(Arrays.asList(TOPIC_NAME)) @Test public void getTopicInfo() throws Exception { AdminClient adminClient = initAdminClient(); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc:"+ entry.getValue())); } 8.producer发送到Broker分区策略8.1.生产者发送消息到broker的策略如果指定Partition ID,则PR(ProducerRecord)被发送到指定的Partition。如果未指定Partition ID,但是指定了Key,PR就会按照Key的哈希取模发送到对应的Partition如果未指定Partition ID,也未指定Key,PR会按照默认round-robin轮询模式发送到每个Partition如果即制定了Partition ID,也指定了Key,PR会被发送到指定的Partition注意:Partition有多个副本,但只有一个replicationLeader复制该Partition和生产者消费者交互,消费者默认的消费Partition是range模式。8.2.生产者到broker发送流程Kafka的客户端发送数据到服务器,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲区里,然后把很多消息收集到Batch里面,然后在通过Sender线程发送到Broker上面,这样才尽可能的提高性能。8.3.生产者常见配置#Kafka地址,即broker的ip地址 bootstrap.servers #当producer向leader发送数据时,可以通过request.required.acks来设置数据可靠性的级别,分别是0,1,all acks #请求失败,生产者会自动重试,默认是0次,如果启动重试,则会有消息重复消费的可能性 retries #每个分区未发送消息的总字节大小,超过的化就会提交到服务端broker,默认是16kb batch.size #默认是0,指消息立即发送,即便是batch.size缓冲区还没满,到达linger.ms设置的秒数,也会提交消息到服务器 linger.ms #buffer.memory用来约束KafkaProducer能够使用的缓冲区大小,默认是32MB #注意:buffer.memory不能设置的太小,否则一旦写满,就会阻塞用户线程,不能在向kafka里写消息了 #buffer.momery一定要比batch.size设置的大,否则会报申请内存不足的错误。 buffer.memory #key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组。 key.serializer value.serializer 9.producer API讲解9.1.封装配置属性public static Properties getProperties(){ Properties props = new Properties(); //kafka服务器地址 props.put("bootstrap.servers", "112.74.55.160:9092"); //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092"); //当producer向leader发下哦那个数据时,可以通过request.required.acks参数来设置数据可靠性级别,0,1,all props.put("acks", "all"); //props.put(ProducerConfig.ACKS_CONFIG, "all"); //表示请求失败时,生产者会尝试自动重连,0表示不重连(默认),如果开启重连的话,可能会导致消息重复消费 props.put("retries", 0); //props.put(ProducerConfig.RETRIES_CONFIG, 0); //设置batch缓冲区的大小,表示当到达缓冲区的大小时,sender线程将拿取batch中的消息,默认是16kb props.put("batch.size", 16384); //表示缓冲区消息停留的时间,默认是0,立即发送,配置之后即使batch中的数据没有达到设定的值,到达时间后也会发送消息 props.put("linger.ms", 1); //用来约束kafka Producer能够使用的内存缓冲区的大小,默认是32MB props.put("buffer.memory", 33554432); //序列化机制 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); return props; } 9.2.生产者投递消息同步发送send()方法是异步的,添加消息到缓冲区等待发送,并立即返回 生产者将单个消息批量在一起发送提高效率,即batch.size和linger.ms结合 实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回ack 发送消息后返回一个Future对象,调用get即可 消息发送主要是两个线程:Main主线程,Sender线程 main线程发送消息到RecordAccumulator即返回 sender线程从RecordAccumulator拉取信息发送到broker batch.size和linger.ms两个参数可以影响 sender 线程发送次数 @Test public void testSend(){ Properites props = getProperties(); Producer<String,String> producer = new KafkaProducer<>(props); for(int i = 0;i < 3;i++){ Future<RecordMetadata> future = producer.send(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i); try{ RecordMetadata recordMetadata = future.get(); //不关心结果的话,可以不用这部 System.out.println("发送状态:"+recordMetadata.toString()); }catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } //记得要关必 producer.close(); } 9.3.异步发送配置回调函数发送消息配置回调函数即可,该回调方法会在Producer收到ack时被调用,为异步调用 回调函数有两个参数RecordMetadata和Exception,如果Exception是null,则发送消息成功,否则失败 @Test public void testSendCallback(){ Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e == null){ System.out.println("发送状态:"+recordMetadata.toString()); }else{ e.printStackTrace(); } } }); } producer.close(); } 9.4.producer发送消息到指定分区发送到指定topic的第五个分区@Test public void testSendCallbackAndPartition(){ Properties properties = getProperties(); Producer<String, String> producer = new KafkaProducer<>(properties); for (int i = 0; i < 3; i++) { //在ProducerRecord中配置,第二个参数 producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"xdclass-key" + i, "xdclass-value" + i), new Callback() { @Override public void onCompletion(RecordMetadata recordMetadata, Exception e) { if(e == null){ System.out.println("发送状态:"+recordMetadata.toString()); }else{ e.printStackTrace(); } } }); } producer.close(); } 10.ProducerRecord介绍10.1.ProducerRecord(简称PR)发送给Kafka Broker的key/value键值对,封装基础数据信息--Topic(topic名称) --Partition ID(可选,分区的ID) --Key(可选,指定的key) --value(发送的消息value) key默认是null,大多数应用程序会用到key如果key为空,kafka会使用默认的partitioner,使用RoundRobin算法将消息均衡的分布在各个partition上如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息改写到Topic的那个partition,拥有相同的key的消息会被写道同一个partition上,实现顺序消息11.生产者自定义partition分区规则11.1.源码解读默认分区器org.apache.kafka.clients.producer.internals.DefaultPartitioner11.2.自定义Partitioner类实现Partitioner接口public class MyPartitioner implements Partitioner { //在partition方法里实现自定义的配置规则 @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if("xdclass".equals(key)) { return 0; } //使用hash值取模,确定分区(默认的也是这个方式) return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } } 11.3.配置自定义的Partitioner生效在配置的对象里加上配置props.put(“partitioner.class”,“com.lixiang.config.MyPartitioner”); //自定义的配置路径props.put("partitioner.class", "com.lixiang.config.MyPartitioner");
9.SpringCache+MyBatisPlus整合9.1.SpringCache+MyBatisPlus整合(1)SpringCache简介文档:https://spring.io/guides/gs/caching/自Spring3.1起,提供了类似于@Transactional注解事务的注解Cache支持,且提供了Cache抽象提供基本的Cache抽象,方便切换各种底层Cache只需要更少的代码就可以完成业务数据的缓存提供事务回滚时也自动回滚缓存,支持比较复杂的缓存逻辑核心一个是Cache接口,缓存操作的API一个是CacheManager管理各类缓存,有多个缓存框架实现(2)项目中引入cache的starter <!--springCache依赖包--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-cache</artifactId> </dependency> (3)配置文件指定缓存类型spring: cache: type: redis(4)启动类开启缓存注解@EnableCaching(5)添加数据库依赖 <!--mybatis plus和springboot整合--> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.0</version> </dependency> <!--数据库驱动--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.15</version> </dependency> (6)增加数据库配置以及mybatisplus日志打印的配置#配置plus打印sql⽇志 mybatis-plus: configuration: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl spring: #数据库连接配置 datasource: url: jdbc:mysql://127.0.0.1:3306/redis_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai username: root password: 123456 driver-class-name: com.mysql.cj.jdbc.Driver (7)数据库表建立CREATE TABLE `product` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `title` varchar(128) DEFAULT NULL COMMENT '标 题', `cover_img` varchar(128) DEFAULT NULL COMMENT '封⾯图', `detail` varchar(256) DEFAULT '' COMMENT '详 情', `amount` int(10) DEFAULT NULL COMMENT '新价 格', `stock` int(11) DEFAULT NULL COMMENT '库存', `create_time` datetime DEFAULT NULL COMMENT '创建时间', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4; ==================================================== INSERT INTO `product` (`id`, `title`, `cover_img`, `detail`, `amount`, `stock`, `create_time`) VALUES (1, 'AlibabaCloud', 'https://file.xdclass.net/video/2020/alibabaclo ud/zt-alibabacloud.png', 'https://file.xdclass.net/video/2021/60- MLS/summary.jpeg', 213, 100, '2021-09-12 00:00:00'), (2, 'Linux', 'https://file.xdclass.net/video/2020/alibabaclo ud/zt-alibabacloud.png', 'https://file.xdclass.net/video/2021/59- Postman/summary.jpeg', 42, 100, '2021-03-12 00:00:00'), (3, 'Docker', 'https://file.xdclass.net/video/2020/alibabaclo ud/zt-alibabacloud.png', 'https://file.xdclass.net/video/2021/60- MLS/summary.jpeg', 12, 20, '2022-09-22 00:00:00'), (4, 'Nginx', 'https://file.xdclass.net/video/2020/alibabaclo ud/zt-alibabacloud.png', 'https://file.xdclass.net/video/2021/60- MLS/summary.jpeg', 14, 20, '2022-11-12 00:00:00'); (8)数据库表对应的实体类编写@Data @TableName("product") public class ProductDO { @TableId(value = "id", type = IdType.AUTO) private Long id; /** * 标题 */ private String title; /** * 封⾯图 */ private String coverImg; /** * 详情 */ private String detail; /** * 新价格 */ private Integer amount; /** * 库存 */ private Integer stock; /** * 创建时间 */ private Date createTime; } (9)开发商品的CRUD和分页查询主类开启对Mapper的支持//mapper所在的路径 @MapperScan("xxx.xxx.xxx")编写Mapperpublic interface ProductMapper extends BaseMapper<ProductDO> { }编写service@Service public class ProductServiceImpl implements ProductService { @Autowired private ProductMapper productMapper; @Override public int save(ProductDO productDO) { int insert = productMapper.insert(productDO); return insert; } @Override public int del(int id) { int i = productMapper.deleteById(id); return i; } @Override public int update(ProductDO productDO) { int i = productMapper.updateById(productDO); return i; } @Override public ProductDO findById(int id) { return productMapper.selectById(id); } @Override public Map<String, Object> page(int page, int size) { Page pageInfo = new Page<>(page,size); IPage<ProductDO> iPage = productMapper.selectPage(pageInfo, null); Map<String,Object> pageMap = new HashMap<>(3); pageMap.put("total_record",iPage.getTotal()); pageMap.put("total_page",iPage.getPages()); pageMap.put("current_total",iPage.getRecords()); return pageMap; } } 编写controller@RestController @RequestMapping("/api/v1/product") public class ProductController { @Autowired private ProductService productService; @PostMapping("/add") public JsonData add(@RequestBody ProductDO productDO){ int save = productService.save(productDO); return JsonData.buildSuccess(save); } @PostMapping("/update") public JsonData update(@RequestBody ProductDO productDO){ int save = productService.update(productDO); return JsonData.buildSuccess(save); } @GetMapping("/findById") public JsonData findById(@RequestParam("product_id") int id){ ProductDO productDO = productService.findById(id); return JsonData.buildSuccess(productDO); } @DeleteMapping("/del") public JsonData del(@RequestParam("product_id") int id){ int i = productService.del(id); return JsonData.buildSuccess(i); } @GetMapping("/page") public JsonData page(@RequestParam("page") int page,@RequestParam("size") int size){ Map<String, Object> map = productService.page(page, size); return JsonData.buildSuccess(map); } } 9.2.Cacheable注解(1)Cacheable注解标记在一个方法上,也可以标记在一个类上缓存标注对象的返回结果,标注咋i方法上缓存该方法的返回值,标注在类上缓存所有方法的返回值value缓存名称,可以有多个key缓存的key规则,可以用springEL表达式,默认是方法参数组合condition缓存条件,使用springEL编写,返回true才缓存(2)用法案例//对象 @Cacheable(value = {"product"},key="#root.methodName") //分⻚ @Cacheable(value ={"product_page"},key="#root.methodName +#page+'_'+#size") (3)spEL表达式methodName当前被调用的方法名root.methodnameargs当前被调用的方法的参数列表root.args[0]result方法执行后的返回值result9.3.自定义CacheManager在springBoot的configuration类中添加修改redis缓存序列化器和配置manager过期时间/** * 修改Redis缓存序列化器和配置manager过期时间 */ @Primary @Bean public RedisCacheManager cacheManager1Hour(RedisConnectionFactory connectionFactory){ RedisCacheConfiguration config = instanceConfig(3600L); return RedisCacheManager.builder(connectionFactory).cacheDefaults(config).transactionAware().build(); } @Bean public RedisCacheManager cacheManager1Day(RedisConnectionFactory connectionFactory){ RedisCacheConfiguration config = instanceConfig(3600 * 24L); return RedisCacheManager.builder(connectionFactory).cacheDefaults(config).transactionAware().build(); } @Bean public RedisCacheManager cacheManager10Min(RedisConnectionFactory connectionFactory){ RedisCacheConfiguration config = instanceConfig(600L); return RedisCacheManager.builder(connectionFactory).cacheDefaults(config).transactionAware().build(); } /** * 序列化机制 * @param ttl * @return */ private RedisCacheConfiguration instanceConfig(Long ttl) { Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); objectMapper.registerModule(new JavaTimeModule()); // 去掉各种@JsonSerialize注解的解析 objectMapper.configure(MapperFeature.USE_ANNOTATIONS, false); // 只针对⾮空的值进⾏序列化 objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); // 将类型序列化到属性json字符串中 objectMapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); return RedisCacheConfiguration.defaultCacheConfig() .entryTtl(Duration.ofSeconds(ttl)) .disableCachingNullValues() .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer)); } 9.4.自定义缓存KeyGeneratorkey规则定义麻烦,支持自定义的规则,同样在springBoot的configuration中进行配置 /** * keyGenerator自定义key的规则 */ @Bean public KeyGenerator springCacheDefaultKeyGenerator(){ return new KeyGenerator() { @Override public Object generate(Object o, Method method, Object... objects) { return o.getClass().getSimpleName()+":"+method.getName()+":"+ StringUtils.arrayToDelimitedString(objects,":"); } }; } java代码实现key 属性和keyGenerator属性只能⼆选⼀@Cacheable(value = {"product"},keyGenerator ="springCacheCustomKeyGenerator", cacheManager ="cacheManager1Minute") 9.5.CachePut注解CachePut注解根据方法的请求参数对其结果进行缓存,每次都会触发真实方法的调用value缓存名称,可以有多个key缓存的key规则,可以用springEL表达式,默认是方法参数组合condition缓存条件,使用springEL编写,返回true才缓存@CachePut(value = {"product"},key = "#productDO.id")//常用于修改的方法上,修改数据库,然后修改对应的缓存 9.6.CacheEvict注解CacheEvict注解从缓存中移除相应数据,触发缓存删除的操作value缓存名称,可以有多个key 缓存的key规则,可以⽤springEL表达式,默认是⽅法参数组合beforeInvocation = false缓存的清除是否在方法之前执行,默认代表缓存清除操作是在方法执行之后执行如果出现异常缓存就不会清除beforeInvocation = true代表清除缓存操作实在方法执行之前,无论方法是否出现异常,缓存都清除@CacheEvict(value = {"product"},key = "#root.args[0]") 9.7.Caching注解Caching注解组合多个Cache注解使用允许在同一方法上使用多个@Cacheable、@CachePut、@CacheEvict注释@Caching( cacheable = { @Cacheable = (value = "product",key = "#id"), }, put = { @CachePut(value ="product",key = "#id"), @CachePut(value ="product",key = "'stock:'+#id") } ) 10.Redis6持久化配置-RDB和AOF10.1.Redis6.x持久化操作-RDB(1)Redis持久化介绍Redis时一个内存数据库,如果没有配置持久化,redis重启后数据就会全部丢失。因此开启redis的持久化功能,将数据保存到磁盘上,当redis重启后,可以从磁盘中恢复数据。(2)两种持久化方式RDB(Redis DataBase)AOF(append only file)(3)RDB持久化介绍在指定的时间间隔内将内存中的数据集快照写入磁盘默认的文件名为dump.rdb产生快照的情况save会阻塞当前Redis服务器,执行save命令期间,Redis不能处理其他命令,直到RDB过程完成为止bgsavefork创建子进程,RDB持久化过程由子进程负责,会在后台异步进行快照操作,快照同时还可以响应客户端请求自动化配置文件来完成,配置redis触发Redis的RDB持久化条件,比如“save m n”。表示m秒内数据集存在n次修改时,会自动触发bgsave主从架构从服务器同步数据的时候,会发送sync执行同步操作,master主服务器就会执行bgsave(4)优点和缺点优点RDB文件紧凑,全量备份,适合用于进行备份和灾难恢复在恢复大数据集时的速度比AOF的恢复速度要快生成的是一个紧凑的文件缺点每次快照时一次全量的备份,fork子进程进行后台操作,子进程存在开销在快照持久化期间修改的数据不会被保存,可能丢失数据(5)核心配置dir 持久化文件的路径dbfilename 文件名#任何ip可以访问 bind 0.0.0.0 #守护进程 daemonize yes #密码 requirepass 123456 #日志文件 logfile "/user/local/redis/log/redis.log" #持久化文件名 dbfilename xdclass.rdb #持久化文件路径 dir /usr/local/redis/data #关闭rdb #save "" #持久化策略,10s内有1个key改动,执行快照 save 10 1 #导出rdb数据库文件压缩字符串和对象,默认时yes,会浪费CPU但是节省空间 rdbcompression yes #导入时是否检查 rdbchecksum yes (6)配置文件触发#关闭RDB save "" #10秒2个key变动触发RDB save 10 2 #100秒5个key变动触发RDB save 100 5 (7)Linux内存分配策略0 表示内核将检查是否有足够的可用内存供应用进程使用,如果有足够的可用内存,内存申请允许,否则,内存申请失败,并把错误返回给前台 1 表示内核允许分配所有的物理内存,而不管当前内存状态如何 2 表示内核允许分配超过所有物理内存和交换空间总和的内存 解决方式 echo > /proc/sys/vm/overcommit_memory 持久化配置 vim /etc/sysctl.conf 改为 vm.overcommit_memory=1 修改sysctl.conf后,需要执⾏ sysctl -p 以使⽣效 10.2.Redis6.x持久化操作-AOF(1)AOF持久化介绍append only file ,追加文件的方式,文件容易被人读懂以独立日志的方式记录每次写命令,重启时在重新执行AOF中的命令达到恢复数据的目的写入过程中宕机,也不会影响之前的数据,可以通过redis-check-aof检查修复问题(2)配置appendonly yes,默认不开启,开启aof持久化AOF文件名通过appendfilename配置设置,默认文件名是appendonly.aof存储路径同RDB持久化方式一致,使用dir配置(3)核心原理Redis每次写入命令会追加到aof_buf(缓冲区)AOF缓冲区根据对应的策略向磁盘做同步操作高频的AOF会带来影响,特别是每次刷盘(4)提供了3种同步方式,在性能和安全方面做出平衡appendfsync always:每次有数据修改发生时都会写入AOF文件,消耗性能多appendfsync everysec:每秒同步一次,该策略为AOF的缺省策略appendfsync no:不主从同步,有草祖宗系统自动调度刷磁盘,性能是最好的,但是最不安全appendonly yes appendfilename "xdclass.aof" appendfsync everysec(5)rewrite重写介绍AOF文件越来越大,需要定期对AOF文件进行重写达到压缩旧的AOF文件含有无效命令会被忽略,保留最新的数据命令多条写命令可以合并为一个AOF重写降低了文件占用空间更小的AOF文件可以更快的被Redis加载(6)重写触发配置手动触发直接调用bgrewriteaof命令自动触发auto-aof-rewrite-min-size:表示运行AOF重写时文件最小体积,默认64mbauto-aof-rewrite-percentage:代表当前AOF文件空间和上一次重写后AOF文件空间(aof_base_size)的比值。(7)aof常用配置# 是否开启aof appendonly yes # ⽂件名称 appendfilename "appendonly.aof" # 同步⽅式 appendfsync everysec # aof重写期间是否同步 no-appendfsync-on-rewrite no # 重写触发配置 #AOF文件最小重写大小,只有AOF文件大小大于该值的时候才可以重写,默认64mb auto-aof-rewrite-min-size 64mb #当前AOF文件大小和最后一次重写后的大小之间的比率等于指定的增长率重写,100 表示当最后一次压缩为150 ,那么就300的时候进行压缩 auto-aof-rewrite-percentage 100 # 加载aof时如果有错如何处理 # yes表示如果aof尾部⽂件出问题,写log记录并继续执⾏。 #no表示提示写⼊等待修复后写⼊ aof-load-truncated yes (8)重写前后对比10.3.AOF和RDB的选择和混合模式(1)Redis提供了不同的持久化选项RDB持久化以指定的时间间隔执行数据集的时间点快照。AOF持久化记录服务器接收的每个写入操作,将在服务器启动时再次读取,重建原始数据集。使与Redis本身相同的格式以仅追加的方式记录命令,当文件太大时,Redis能够重写。(2)RDB的优缺点优点:RDB最⼤限度地提⾼了Redis的性能,⽗进程不需要参与磁盘I/ORDB⽂件紧凑,全量备份,适合⽤于进⾏备份和灾难恢复在恢复⼤数据集时的速度⽐ AOF 的恢复速度要快⽣成的是⼀个紧凑压缩的⼆进制⽂件缺点:如果您需要在Redis停⽌⼯作时(例如断电后)将数据丢失的可能性降⾄最低,则RDB并不好RDB经常需要fork才能使⽤⼦进程持久存储在磁盘上。如果数据集很⼤,Fork可能会⾮常耗时(3)AOF的优缺点优点:数据更加安全当Redis AOF⽂件太⼤时,Redis能够在后台⾃动重写AOFAOF以易于理解和解析的格式,⼀个接⼀个地包含所有操作的⽇志缺点:AOF⽂件通常⽐同⼀数据集的等效RDB⽂件⼤根据确切的fsync策略,恢复的时候AOF可能⽐RDB慢(4)线上系统怎末处理RDB持久化与AOF持久化⼀起使⽤如果Redis中的数据并不是特别敏感或者可以通过其它⽅式重写⽣成数据集群中可以关闭AOF持久化,靠集群的备份⽅式保证可⽤性⾃⼰制定策略定期检查Redis的情况,然后可以⼿动触发备份、重写数据采⽤集群和主从同步(5)Redis4.0后开始的rewrite支持混合模式就是rdb和aof⼀起⽤直接将rdb持久化的⽅式来操作将⼆进制内容覆盖到aof⽂件中,rdb是⼆进制,所以很⼩有写⼊的话还是继续append追加到⽂件原始命令,等下次⽂件过⼤的时候再次rewrite默认是开启状态好处:混合持久化结合了RDB持久化和AOF持续化的优点,采取了rdb的文件小易于灾难恢复同时结合AOF,增量的数据以AOF⽅式保存了,数据更少的丢失坏处:前部分是RDB格式,是⼆进制,所以阅读性较差数据恢复:先看是否存在aof⽂件,若存在则先按照aof⽂件恢复,aof⽐rdb全,且aof⽂件也rewrite成rdb⼆进制格式若aof不存在,则才会查找rdb是否存在11.Redis6服务端配置info+config命令11.1.info命令介绍服务器的各种信息和统计数值Server: #有关redis服务器的常规信息 redis_mode:standalone #运行模式,单机或者集群 multiplexing_api:epoll #redis所使用的事件处理机制 run_id:3abd26c33dfd059e87a0279defc4c96c13962e #redis服务器的随机标识符(用于sentinel和集群) config_file:/usr/local/redis/conf/redis/conf #配置文件路径 Clinets: #客户端连接部分 connected_clients:10 #已连接客户端的数量(不包括slave连接的客户端) Memory: #内存消耗相关信息 userd_memory:874152 #使用内存 used_memory_human:853.66K #以⼈类可读的格式返回 Redis 分配的内存总量 user_memory_rss:2834432 #系统给redis分配的内存即常驻内存,和top 、 ps 等命令的输出⼀致 used_memory_rss_human:2.70M # 以⼈类可读的格式返回系统redis分配的常驻内存top、ps等命令的输出⼀致 used_memory_peak:934040 #系统使用的峰值大小 used_memory_peak_human:912.15K total_system_memory:1039048704 #操作系统的总字节 total_system_memory_human:990.91M used_memory_lua:37888 # lua引擎使⽤的内存 used_memory_lua_human:37.00K maxmemory:0 #最大内存的配置值,0表示不限制 maxmemory_human:0B maxmemory_policy:noeviction #达到最⼤内存配置值后的策略 Persistence: #rdb和aof相关信息 rdb_bgsave_in_progress:0 #标识rdb save是否进⾏中 rdb_last_bgsave_status:ok # 上次的save操作状态 rdb_last_bgsave_time_sec:-1 # 上次rdb save操作使⽤的时间(单位s) rdb_current_bgsave_time_sec:-1 #如果rdbsave操作正在进⾏,则是所使⽤的时间 aof_enabled:1 #是否开启aof,默认没开启 aof_rewrite_in_progress:0 # 标识aof的rewrite操作是否在进⾏中 aof_last_rewrite_time_sec:-1 #上次rewrite操作使⽤的时间(单位s) aof_current_rewrite_time_sec:-1 #如果rewrite操作正在进⾏,则记录所使⽤的时间 aof_last_bgrewrite_status:ok #上次rewrite操作的状态 aof_current_size:0 # aof当前⼤⼩ Stats: #一版统计 evicted_keys:0 #因为内存⼤⼩限制,⽽被驱逐出去的键的个数 Replication: #主从同步信息 role:master #角色 connected_slaves:1 #连接的从库数 master_sync_in_progress:0 #标识主redis正在同步到从redis Cluster: #集群部分 cluster_enabled:0 # 实例是否启⽤集群模式 Keyspace: #数据库相关统计 db0:keys=4,expires=0,avg_ttl=0 # db0的key的数量,带有⽣存期的key的数,平均存活时间 11.2.config命令介绍可以动态的调整Redis服务器的配置(configuration)而无需重启config get xxx ,config set key “xxxx”timeout #客户端连接时的超时时间,单位为秒。当客户端在这段时间内没有发出任何指令,那么关闭该连接 databases #设置数据库的个数,可以使⽤ SELECT 命令来切换数据库。默认使⽤的数据库是 0 save #设置 Redis 进⾏rdb持久化数据库镜像的频率。 rdbcompression #在进⾏镜像备份时,是否进⾏压缩 slaveof #设置该数据库为其他数据库的从数据库 masterauth #当主数据库连接需要密码验证时,在这⾥配置 maxclients #限制同时连接的客户数量,当连接数超过这个值时,redis 将不再接收其他连接请求,返回error maxmemory #设置 redis 能够使⽤的最⼤内存, maxmemory #设置redis能够使用的最大内存备注防止所有内存超过服务器物理内存,maxmemory限制的时Redis实际使用的内存量,也就是used_memory统计项对应的内存由于内存碎片率的存在,实际消耗的内存可能会比maxmemory设置的更⼤,实际使⽤时要小心这部分内存溢出默认⽆限使⽤服务器内存, 为防⽌极端情况下导致系统内存耗尽, 建议所有的Redis进程都要配置maxmemory在64bit系统下,maxmemory设置为0表示不限制Redis内存使⽤,在32bit系统下,maxmemory不能超过3GB注意:redis在占用的内存超过指定的maxmemory之后,通过maxmemory_policy确定redis是否释放内存以及如何释放内存12.Redis6的key过期时间删除策略(1)背景redis的key配置了过期时间,这个是怎么被删除的redis数据明明过期了,怎末还占用内存redis只能用10G,往里面写20G会发生什么(2)Redis key过期策略定期删除+惰性删除(3)Redis如何淘汰过期的key定期删除:隔一段时间,就会随机抽取一些设置了过期时间的key,检查是否过期,如果过期了就删除定期删除可能会导致很多过期的key到了时间但是并没有被删除,这块就用到惰性删除惰性删除:当一些用户尝试访问它时,key会被发现并主动的过期,这会惰性删除算法会删除key放任键过期不管,但是每次从键空间中获取键时,都检查取得的键是否过期,如果过期了就删除Redis服务器实际使⽤的是惰性删除和定期删除两种策略:通过配合使⽤这两种删除策略,服务器可以很好地在合理使⽤CPU时间和避免浪费内存空间之间取得平衡。问题:如果定期删除漏掉了很多过期key,然后你也没有及时去查,也就没有走惰性删除,这回大量的过期key就会堆积在内存中,导致redis内存消耗尽了,就需要走内存淘汰机制。 注意:设计缓存中间件,可以参考redis的key过期淘汰方式和内存不足淘汰方式 (4)Redis key内存淘汰策略redis在占⽤的内存超过指定的maxmemory之后,通过maxmemory_policy确定redis是否释放内存以及如何释放内存 策略volatile-lru(least recently used) 最近最少使⽤算法,从设置了过期时间的键中选择空转时间最⻓的键值对清除掉; volatile-lfu(least frequently used) 最近最不经常使⽤算法,从设置了过期时间的键中选择某段时间之内使⽤频次最⼩的键值对清除掉; volatile-ttl 从设置了过期时间的键中选择过期时间最早的键值对清除 (删除即将过期的) volatile-random 从设置了过期时间的键中,随机选择键进⾏清除; allkeys-lru 最近最少使⽤算法,从所有的键中选择空转时间最⻓的键值对清除; allkeys-lfu 最近最不经常使⽤算法,从所有的键中选择某段时间之内使⽤频次最少的键值对清除; allkeys-random 所有的键中,随机选择键进⾏删除; noeviction 不做任何的清理⼯作,在redis的内存超过限制之后,所有的写⼊操作都会返回错误;但是读操作都能正常的进⾏; 注意:config配置的时候 下划线_的key需要⽤中横线- 127.0.0.1:6379> config set maxmemory_policy volatile-lru (error) ERR Unsupported CONFIG parameter:maxmemory_policy 127.0.0.1:6379> config set maxmemory-policy volatile-lru OK 8种 (1)不做处理(默认的) (2)从所有key中随机删除 (3)从所有key中找出访问次数少的,不怎么使用的 (4)从所有key中找出最近空转时间最长的 (5)从设置过期的key中随机删除 (6)从设置过期key中最近空转时间最长的 (7)从过期key中,快要到期的key中删除 (8)从过期key中删除最近不常用的 13.Redis高可用之主从复制13.1.Redis6主从复制+读写分离架构:一主二从架构搭建(1)背景单机部署简单,但是可靠性低,其不能很好的利用CPU多核处理生产环境-必须要保证高可用-一般不可能单机部署读写分离时可用性要求不高、性能要求不高、数据规模小的情况(2)目标读写分离,扩展主节点的读能力,分担主节点读压力容灾恢复,一旦主节点宕机,从节点作为主节点的备份可以随时顶上来(3)主从复制架构环境搭建准备配置#创建三个存放redis.conf的配置文件 mkdir -p /data/redis/master/data mkdir -p /data/redis/slave1/data mkdir -p /data/redis/slave2/data #从节点设置只读(默认) replica-read-only yes #从节点访问主节点的密码,和requirepass一样,注意主节点也要配下这个,因为主节点宕机后会重新选取主节点 masterauth 123456 #哪个主节点进行复制 replicaof 8.129.113.233 6379 创建配置文件主节点的redis.confbind 0.0.0.0 port 6379 daemonize yes requirepass "123456" logfile "/usr/local/redis/log/redis_master.log" dbfilename "xdclass_master.rdb" dir "/usr/local/redis/data" appendonly yes appendfilename "appendonly_master.aof" masterauth "123456" 创建两个从节点配置⽂件redis.confbind 0.0.0.0 port 6380 daemonize yes requirepass "123456" logfile "/usr/local/redis/log/redis_slave1.log" dbfilename "xdclass_slave1.rdb" dir "/usr/local/redis/data" appendonly yes appendfilename "appendonly_slave1.aof" replicaof 8.129.113.233 6379 masterauth "123456" bind 0.0.0.0 port 6381 daemonize yes requirepass "123456" logfile "/usr/local/redis/log/redis_slave2.log" dbfilename "xdclass_slave2.rdb" dir "/usr/local/redis/data" appendonly yes appendfilename "appendonly_slave2.aof" replicaof 8.129.113.233 6379 masterauth "123456" (4)启动主节点和从节点#启动主 ./redis-server /data/redis/master/data/redis.conf #启动从 ./redis-server /data/redis/slave1/data/redis.conf ./redis-server /data/redis/slave2/data/redis.conf 13.2.主从复制读写分离原理解析(1)主从复制分两种(主从刚连接时,进行全量同步,全量同步结束后,进行增量同步)全量复制刚连接时,从节点会向主节点发送一个sync指令,master服务器会开启一个后台进程用于redis的数据生成一个rdb文件主服务器会缓存所有接受到的来自客户端的写命令,当后台保存进程处理完毕后,会将该rdb文件传递给slaveslave服务器会将rdb文件保存在磁盘并通过读取该文件将数据加载到内存在此之后master服务器会将此期间缓存的命令通过redis传输协议发送给slave服务器然后slave服务器将这些命令依次作用在自己的服务器上,保证主从数据的一致性增量复制slave初始化后开始正常工作时主服务器发生的写操作同步到从服务器的过程服务器每执行一个写命令就会向从服务器发送相同的写命令,从服务器接受并且执行(2)特点主从复制对于主/从redis服务器来说是非阻塞的,所以同步期间都可以正常处理外界请求一个主redis可以包含多个从redis,每个从redis可以接受来其他从redis服务器的连接从节点不会让key过期,而是主节点的key过期删除后,成为del命令传到从节点进行删除(3)加速复制完全重新同步需要在磁盘上创建⼀个RDB⽂件,然后加载这个⽂件以便为从服务器发送数据在⽐较低速的磁盘,这种操作会给主服务器带来较⼤的压⼒新版支持无磁盘的复制,子进程直接将RDB通过网络发送给从服务器,不使用磁盘作为中间存储repl-diskless-sync yes(默认是no)(4)主从断开连接如果遭遇连接断开,重新连接之后可以从中断处继续进⾏复制,⽽不必重新同步2.8版本后 部分重新同步这个新特性内部使⽤PSYNC命令,旧的实现中使⽤SYNC命令14.Redis6节点高可用监控之Sentinel(1)背景Redis主从复制,当主机宕机后,需要手动将从服务器切换成主服务器,人工干预费事费力,还会造成一段时间内服务不可用(2)哨兵模式介绍Redis提供了哨兵的命令,是一个独立的进程原理:哨兵通过发送命令给多个节点,等待Redis服务器响应,从⽽监控运⾏的多个Redis实例的运⾏情况当哨兵监测到master宕机,会⾃动将slave切换成master,通过通知其他的从服务器,修改配置⽂件切换主机(3)Sentinel三大工作任务**监控:**Sentinel会不断的检查你的主服务器和从服务器是否运行正常**提醒:**当被监控的某个redis服务器出现问题时,Sentinel可以通过API向管理员或者其他应用程序发送通知**自动故障迁移:**当一个主服务器不能正常工作时,Sentinel会开始一次自动故障迁移操作,它会将失效主服务器的其中一个从服务器升级为新的主服务器,并让失效主服务器的其他从服务器改为连接新的主服务器,当客户端试图连接失效的主服务器时,集群也会向客户端返回新的服务器地址。注意:一般使用多个哨兵进行监控,各个哨兵之间还会进行监控,形成哨兵模式(4)多哨兵模式下线名称介绍主观下线(Subjectively Down, 简称 SDOWN)是单个Sentinel 实例对服务器做出的下线判断,⽐如⽹络问题接收不到通知等⼀个服务器没有在 down-after-milliseconds 选项所指定的时间内, 对向它发送 PING 命令的 Sentinel返回⼀个有效回复(valid reply), 那么 Sentinel就会将这个服务器标记为主观下线客观下线(Objectively Down, 简称 ODOWN)指的是多个 Sentinel 实例在对同⼀个服务器做出SDOWN 判断, 并且通过 SENTINEL is-master-down-by-addr 命令互相交流之后, 得出的服务器下线判断⼀个 Sentinel 可以通过向另⼀个 Sentinel 发送SENTINEL is-master-down-by-addr 命令来询问对⽅是否认为给定的服务器已下线客观下线条件只适⽤于主服务器仲裁 qurumSentinel 在给定的时间范围内, 从其他 Sentinel 那⾥接收到了【⾜够数量】的主服务器下线报告, 那么 Sentinel 就会将主服务器的状态从主观下线改变为客观下线这个【⾜够数量】就是配置⽂件⾥⾯的值,⼀般是Sentinel个数的⼀半加1,⽐如3个Sentinel则就设置为2down-after-milliseconds 是⼀个哨兵在超过规定时间依旧没有得到响应后,会⾃⼰认为主机不可⽤当拥有认为主观下线的哨兵达到sentinel monitor所配置的数量时,就会发起⼀次投票,进⾏failover(5)核心流程每秒ping,超过时间不响应则任务主管下线满足多个,则认为是客观下线投票选择主节点如果没有足够的节点同意master下线,则状态会被移除(6)环境准备配置三个哨兵,每个哨兵的配置都是一样的启动顺序,先启动主节点在启动从节点,最后启动三个哨兵哨兵端口是【26379】记得开发#不限制ip bind 0.0.0.0 # 让sentinel服务后台运⾏ daemonize yes # 配置监听的主服务器,mymaster代表服务器的名称,⾃定义,172.18.172.109 代表监控的主服务器,6379代表端⼝,2代表只有两个或两个以上的哨兵认为主服务器不可⽤的时候,才会进⾏failover操作。 sentinel monitor mymaster 172.18.172.109 6379 2 # sentinel auth-pass定义服务的密码,mymaster是服务名称,123456是Redis服务器密码 sentinel auth-pass mymaster 123456 #超过5秒master还没有连接上,则认为master已经停⽌ sentinel down-after-milliseconds mymaster 5000 #如果该时间内没完成failover操作,则认为本次failover失败 sentinel failover-timeout mymaster 30000 在/usr/local/redis/conf创建三个文件sentinel-1.conf、sentinel-2.conf、sentinel-3.confport 26379 bind 0.0.0.0 daemonize yes pidfile "/var/run/redis-sentinel-1.pid" logfile "/var/log/redis/sentinel_26379.log" dir "/tmp" sentinel monitor mymaster 8.129.113.233 6379 2 sentinel down-after-milliseconds mymaster 5000 sentinel auth-pass mymaster 123456 sentinel failover-timeout mymaster 30000 port 26380 bind 0.0.0.0 daemonize yes pidfile "/var/run/redis-sentinel-2.pid" logfile "/var/log/redis/sentinel_26380.log" dir "/tmp" sentinel monitor mymaster 8.129.113.233 6379 2 sentinel down-after-milliseconds mymaster 5000 sentinel auth-pass mymaster 123456 sentinel failover-timeout mymaster 30000 port 26381 bind 0.0.0.0 daemonize yes pidfile "/var/run/redis-sentinel-3.pid" logfile "/var/log/redis/sentinel_26381.log" dir "/tmp" sentinel monitor mymaster 8.129.113.233 6379 2 sentinel down-after-milliseconds mymaster 5000 sentinel auth-pass mymaster 123456 sentinel failover-timeout mymaster 30000 (7)启动哨兵集群./redis-server /usr/local/redis/conf/sentinel-1.conf --sentinel ./redis-server /usr/local/redis/conf/sentinel-2.conf --sentinel ./redis-server /usr/local/redis/conf/sentinel-3.conf --sentinel (8)SpringBoot/微服务cloud整合Redis主从+Sentinel哨兵注释掉host和port新增配置redis: #host: 8.140.116.67 #port: 6379 sentinel: master: mymaster nodes: 8.140.116.67:26379,8.140.116.67:26380,8.140.116.67:26381 password: 123456 client-type: jedis 15.Redis6节点高可用之Cluster集群(1)背景Sentinel解决了主从架构故障自动迁移的问题但是Master主节点的写能力和存储能力依旧受限使用Redis的集群Cluster就是为了解决单机Redis容量有限的问题,将数据一定的规划分配到多台机器(2)什么是集群Cluster是一组项目独立的、通过高速网络互联的计算机,它们构成了⼀个组,并以单⼀系统的模式加以管理(3)Redis集群模式介绍Cluster模式是Redis3.0开始推出采用务无中心结构,每个节点保存数据和整个集群状态,每个节点都和其他所有节点连接官⽅要求:⾄少6个节点才可以保证⾼可⽤,即3主3从;扩展性强、更好做到⾼可⽤各个节点会互相通信,采⽤gossip协议交换节点元数据信息数据分散存储到各个节点上(4)Cluster数据分片和虚拟哈希槽介绍常见的数据分区算法哈希取模对选择的partitioning key计算其哈希值,得到的哈希值就是对应的分区范围分片通过确定分区键是否在某个范围内来选择分区一致性Hash分区redis cluster集群没有采用一致性哈希方案,而是采用【数据分片】中的哈希槽来进行数据存储与读取的(5)什么是Redis的哈希槽slotRedis集群预分好16384个槽,当需要在Redis集群中放置一个key-value时,根据CRC16(key)mod16384的值,决定key放在哪个桶中(6)大体流程假设主节点的数量为3,将16384个曹魏按照【用户自己的规则】取分配这三个节点,每个节点复制一部分槽位节点1的槽位区间范围为0-5460节点2的槽位区间范围为5461-10922节点3的槽位区间范围为10923-16383注意:从节点是没有槽位的,只有主节点才有(7)存储查找对要存储查找的键进行crc16哈希运算,得到一个值,并取模16384,判断这个值在哪个节点的范围区间,假设crc16(“test_key”)%16384=3000,就是节点1,crc16算法不是简单的hash算法,是一种校验算法使⽤哈希槽的好处就在于可以⽅便的添加或移除节点。当需要增加节点时,只需要把其他节点的某些哈希槽挪到新节点就可以了当需要移除节点时,只需要把移除节点上的哈希槽挪到其他节点就⾏了(8)Cluster集群环境准备旧版本需要使用ruby语言进行构建,新版5之后直接redis-cli即可6个节点,三主双从,主从节点会自动分配,不是人工指定主节点故障后,从节点会替换主节点节点:6381、6382 6383、6384 6385、6386 配置bind 0.0.0.0 port 6381 daemonize yes requirepass "123456" dbfilename "xdclass_6381.rdb" logfile "/usr/local/redis/log/redis_6381.log" dir "/usr/local/redis/data" appendonly yes appendfilename "appendonly_6381.aof" masterauth "123456" #是否开启集群 cluster-enabled yes #生成node文件,记录集群节点信息,默认为nodes.conf,防止冲突,改为nodes-6381.conf cluster-config-file nodes-6381.conf #节点连接超时时间 cluster-node-timeout 20000 #集群节点的ip,当前节点ip cluster-announce-ip 172.18.172.109 #集群节点映射端口 cluster-announce-bus-port 16381 #集群节点总线端口,节点之间互相通信,常规端口+1万 cluster-announce-bus-port 16381 注意:阿里云开放网络安全组(9)Cluster集群三主三从搭建实战启动六个节点./redis-server ../conf/cluster/redis_6381.conf ./redis-server ../conf/cluster/redis_6382.conf ./redis-server ../conf/cluster/redis_6383.conf ./redis-server ../conf/cluster/redis_6384.conf ./redis-server ../conf/cluster/redis_6385.conf ./redis-server ../conf/cluster/redis_6386.conf 现在启动还没开启集群加入集群--cluster #构建集群中全部节点信息 --cluster-replicas 1 #主从节点的比例,1表示1主1从的方式 ./redis-cli -a 123456 --cluster create 8.140.116.67:6381 8.140.116.67:6382 8.140.116.67:6383 8.140.116.67:6384 8.140.116.67:6385 8.140.116.67:6386 --cluster-replicas 1 检查状态信息(其中一个节点执行即可)./redis-cli -a 123456 --cluster check 8.140.116.67:6381(10)Cluster集群读写命令./redis-cli -c -a 123456 -p 6379 #集群信息 cluster info #节点信息 cluster nodes 测试集群读写命令set/getkey哈希运算计算槽位置 槽在当前节点的话直接插入/读取。否则自动转向对应的节点 操作都是主节点,从节点只是备份流程解析主节点从节点启动应用->加入集群->从节点请求复制主节点(主从复制一样)(11)Cluster集群整合SpringBoot2.X修改配置文件redis: #host: 8.140.116.67 #port: 6379 # sentinel: # master: mymaster # nodes: 8.140.116.67:26379,8.140.116.67:26380,8.140.116.67:26381 cluster: #命名的最多转发次数 max-redirects: 3 nodes: 8.140.116.67:6381,8.140.116.67:6382,8.140.116.67:6383,8.140.116.67:6384,8.140.116.67:6385,8.140.116.67:6386 注意:一定要在同一个网段当kill掉主节点从节点会尝试连接,知道连不上,把自己置成主节点当原本的主节点挂掉之后,重新恢复,成为从节点,复制主节点的数据16.新版Redis6核心特性16.1.Redis6新特性-多线程(1)支持多线程redis6多线程只是用来处理网络数据的读写和协议解析上,底层数据操作还是单线程执行命令依旧是单线程,之所以这么设计是不想因为多线程而变的复杂,需要去控制key,lua,事务等等并发问题(2)默认不开启io-threads-do-reads yes io-threads 线程数 4核的机器建议设置为 2 或 3 个线程8核的建议设置为 4或6个线程注意:开启多线程后,是否会存在线程并发安全问题?不会有安全问题,Redis的多线程部分只是用来吃力网络数据的读写和协议解析,执行命令仍然是单线程顺序执行 16.2.Redis6新特性-acl权限控制(1)引入ACL(Access Control List)之前的redis没有⽤户的概念,redis6引⼊了acl 可以给每个⽤户分配不同的权限来控制权限 通过限制对命令和密钥的访问来提⾼安全性,以使不受信任的客户端⽆法访问 提⾼操作安全性,以防⽌由于软件错误或⼈为错误⽽导致进程或⼈员访问 Redis,从⽽损坏数据或配置 (2)常用命令acl list #当前启用的ACL规则 acl cat #支持的权限分类列表 acl cat hash #返回指定类别中的命令 acl setuser #创建和修改用户命令 acl deluser #删除用户命令 +<command>:将命令添加到⽤户可以调⽤的命令列表中,如+@hash -<command>:将命令从⽤户可以调⽤的命令列表中移除 #切换默认用户 auth default 123456 #例⼦ 密码 123 ,全部key,全部权限 acl setuser jack on >123 ~* +@all #例⼦ 密码 123 ,全部key,get权限 acl setuser jack on >123 ~* +get 参数说明user用户default示默认⽤户名,或则⾃⼰定义的⽤户名on表示是否启⽤该⽤户,默认为off(禁⽤)~*表示可以访问的Key(正则匹配)+@表示⽤户的权限,“+”表示授权权限,有权限操作或访问,“-”表示还是没有权限; @为权限分类,可以通过 ACL CAT 查询⽀持的分类。+@all表示所有权限,nocommands 表示不给与任何命令的操作权限。16.3.Redis6新特性-客户端缓存client side caching客户端缓存
4.String数据结构实战4.1.图形验证码存入Redis实战背景:注册-登录-修改密码一版需要发送验证码,但是容易被攻击而已调用。如何避免自己的网站被刷呢增加图形验证码单IP请求次数限制限制号码发送(1)Kaptcha框架介绍验证码的字体/大小/颜色验证码内容的范围(数字、字母、中文汉字)验证码图片的大小,边框,边框粗细,边框颜色验证码的干扰线,验证码的样式(2)添加Kaptcha依赖 <!--kaptcha依赖包 (图形验证码)--> <dependency> <groupId>com.baomidou</groupId> <artifactId>kaptcha-spring-boot-starter</artifactId> <version>1.0.0</version> </dependency> (3)代码配置,编写CaptchaConfig类@Configuration public class CaptchaConfig { /** * 验证码配置 * Kaptcha配置类名 * * @return */ @Bean @Qualifier("captchaProducer") public DefaultKaptcha kaptcha() { DefaultKaptcha kaptcha = new DefaultKaptcha(); Properties properties = new Properties(); //验证码个数 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_LENGTH, "4"); //字体间隔 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_SPACE,"8"); //干扰线颜色 //干扰实现类 properties.setProperty(Constants.KAPTCHA_NOISE_IMPL, "com.google.code.kaptcha.impl.NoNoise"); //图片样式 properties.setProperty(Constants.KAPTCHA_OBSCURIFICATOR_IMPL, "com.google.code.kaptcha.impl.WaterRipple"); //文字来源 properties.setProperty(Constants.KAPTCHA_TEXTPRODUCER_CHAR_STRING, "0123456789"); Config config = new Config(properties); kaptcha.setConfig(config); return kaptcha; } } (4)编写统一返回工具类public class JsonData { /** * 状态码 0 表示成功 */ private Integer code; /** * 数据 */ private Object data; /** * 描述 */ private String msg; public JsonData(int code,Object data,String msg){ this.data = data; this.msg = msg; this.code = code; } /** * 成功,不传入数据 * @return */ public static JsonData buildSuccess() { return new JsonData(0, null, null); } /** * 成功,传入数据 * @param data * @return */ public static JsonData buildSuccess(Object data) { return new JsonData(0, data, null); } /** * 失败,传入描述信息 * @param msg * @return */ public static JsonData buildError(String msg) { return new JsonData(-1, null, msg); } public Integer getCode() { return code; } public void setCode(Integer code) { this.code = code; } public Object getData() { return data; } public void setData(Object data) { this.data = data; } public String getMsg() { return msg; } public void setMsg(String msg) { this.msg = msg; } } z (5)编写CommonUtil工具类(获取前台请求ip和md5方法)public class CommonUtil { /** * 获取ip * @param request * @return */ public static String getIpAddr(HttpServletRequest request) { String ipAddress = null; try { ipAddress = request.getHeader("x-forwarded-for"); if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("WL-Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getRemoteAddr(); if (ipAddress.equals("127.0.0.1")) { // 根据网卡取本机配置的IP InetAddress inet = null; try { inet = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } ipAddress = inet.getHostAddress(); } } // 对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割 if (ipAddress != null && ipAddress.length() > 15) { // "***.***.***.***".length() // = 15 if (ipAddress.indexOf(",") > 0) { ipAddress = ipAddress.substring(0, ipAddress.indexOf(",")); } } } catch (Exception e) { ipAddress=""; } return ipAddress; } public static String MD5(String data) { try { java.security.MessageDigest md = MessageDigest.getInstance("MD5"); byte[] array = md.digest(data.getBytes("UTF-8")); StringBuilder sb = new StringBuilder(); for (byte item : array) { sb.append(Integer.toHexString((item & 0xFF) | 0x100).substring(1, 3)); } return sb.toString().toUpperCase(); } catch (Exception exception) { } return null; } } (6)编写生成验证码存入Redis的逻辑@RestController @RequestMapping("/api/v1/captcha") public class CaptchaController{ @Autowired private StringRedisTemplate redisTemplate; @Autowired private Producer captchaProduct; @GetMapping("/get_captcha") public void getCaptcha(HttpServletRequest request,HttpServletResponse response){ /** * 获取随机的验证码 */ String captchaProducerText = captchaProducer.createText(); String key = getCaptchaKey(request); //放在Redis10分钟过期 redisTemplate.opsForValue().set(key,captchaProducerText,10, TimeUnit.MINUTES); BufferedImage image = captchaProducer.createImage(captchaProducerText); ServletOutputStream outputStream = null; try{ outputStream = response.getOutputStream(); ImageIO.write(image,"jpg",outputStream); outputStream.flush(); outputStream.close(); }catch (Exception e){ e.printStackTrace(); } } @GetMapping("/send_code") public JsonData sendCode(@RequestParam(value = "to",required = true)String to, @RequestParam(value = "captcha",required = true) String captcha, HttpServletRequest request){ String key = getCaptchaKey(request); String cacheCaptcha = redisTemplate.opsForValue().get(key); if(cacheCaptcha != null && captcha != null && cacheCaptcha.equalsIgnoreCase(captcha)){ //匹配通过一定要删除当前key redisTemplate.delete(key); //TODO 发送验证码逻辑 return JsonData.buildSuccess(); }else{ return JsonData.buildError("图形验证码不正确"); } } /* * * 获取存在缓存中的key用请求的ip和请求头,md5加密 */ private String getCaptchaKey(HttpServletRequest request){ String ip = CommonUtil.getIpAddr(request); String userAgent = request.getHeader("User-Agent"); String key = "user-service:captcha:"+CommonUtil.MD5(ip+userAgent); return key; } } 4.2.高并发商品首页热点数据开发实战(1)热点数据经常会被查询,但是不经产被修改或者删除的数据首页-详情页(2)链路逻辑检查缓存是否存在缓存不存在则查询数据库查询数据库的结果放到缓存中,设置过期时间下次访问则命中缓存(3)接口开发实体类编写,商品项,商品卡片//商品卡片实体类,里面有多个商品 public class VideoCardDO { private String title; private int id; private int weight; List<VideoDO> videoDOList; public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getWeight() { return weight; } public void setWeight(int weight) { this.weight = weight; } public List<VideoDO> getVideoDOList() { return videoDOList; } public void setVideoDOList(List<VideoDO> videoDOList) { this.videoDOList = videoDOList; } } //商品实体类 public class VideoDO { private int id; private String title; private String img; private int price; public VideoDO() { } public VideoDO(String title, String img, int price) { this.title = title; this.img = img; this.price = price; } public VideoDO(int id, String title, String img, int price) { this.id = id; this.title = title; this.img = img; this.price = price; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getTitle() { return title; } public void setTitle(String title) { this.title = title; } public String getImg() { return img; } public void setImg(String img) { this.img = img; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } } cDao层编写,这块采用模拟数据库查询@Repository public class VideoCardDao { public List<VideoCardDO> list(){ try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } List<VideoCardDO> cardDOList = new ArrayList<>(); VideoCardDO videoCardDO1 = new VideoCardDO(); VideoDO videoDO1 = new VideoDO(1,"SpringCloud","xxxxxxxxxxxxx",1000); VideoDO videoDO2 = new VideoDO(2,"Netty","xxxxxxxxxxxxx",234); VideoDO videoDO3 = new VideoDO(3,"面试专题视频","xxxxxxxxxxxxx",3564); VideoDO videoDO4 = new VideoDO(4,"AlibabaCloud","xxxxxxxxxxxxx",123); VideoDO videoDO5 = new VideoDO(5,"Dubbo","xxxxxxxxxxxxx",445); videoCardDO1.setId(1); videoCardDO1.setTitle("热门视频"); List<VideoDO> videoDOS = new ArrayList<>(); videoDOS.add(videoDO1); videoDOS.add(videoDO2); videoDOS.add(videoDO3); videoDOS.add(videoDO4); videoDOS.add(videoDO5); videoCardDO1.setVideoDOList(videoDOS); cardDOList.add(videoCardDO1); VideoCardDO videoCardDO2 = new VideoCardDO(); VideoDO videoDO6 = new VideoDO(1,"SpringCloud","xxxxxxxxxxxxx",1000); VideoDO videoDO7 = new VideoDO(2,"Netty","xxxxxxxxxxxxx",234); VideoDO videoDO8 = new VideoDO(3,"面试专题视频","xxxxxxxxxxxxx",3564); VideoDO videoDO9 = new VideoDO(4,"AlibabaCloud","xxxxxxxxxxxxx",123); VideoDO videoDO10 = new VideoDO(5,"Dubbo","xxxxxxxxxxxxx",445); videoCardDO1.setId(1); videoCardDO1.setTitle("项目实战"); List<VideoDO> videoDOS2 = new ArrayList<>(); videoDOS2.add(videoDO6); videoDOS2.add(videoDO7); videoDOS2.add(videoDO8); videoDOS2.add(videoDO9); videoDOS2.add(videoDO10); videoCardDO2.setVideoDOList(videoDOS2); cardDOList.add(videoCardDO2); return cardDOList; } } service层编写public interface VideoCardService { List<VideoCardDO> list(); } @Service public class VideoCardServiceImpl implements VideoCardService { @Autowired private VideoCardDao videoCardDao; @Autowired private RedisTemplate redisTemplate; private static final String VIDEO_CARD_CACHE_KEY = "video:card:key"; @Override public List<VideoCardDO> list() { Object cacheObj = redisTemplate.opsForValue().get(VIDEO_CARD_CACHE_KEY); if(cacheObj != null){ return (List<VideoCardDO>)cacheObj; }else{ List<VideoCardDO> list = videoCardDao.list(); redisTemplate.opsForValue().set(VIDEO_CARD_CACHE_KEY,list,10,TimeUtil.MINUTES); return list; } } } Controller层 @Autowired private VideoCardService videoCardService; /** * 缓存查找热点卡片 * @return */ @RequestMapping("/list_cache") public JsonData listCache(){ List<VideoCardDO> list = videoCardService.list(); return JsonData.buildSuccess(list); } 4.3.Redis6+Lua脚本实现原生分布式锁(1)分布式锁简介简介:分布试锁核心知识介绍和注意事项背景:保证同一时间只有一个客户端可以对共享资源进行操作案例:优惠劵领券限制次数、商品库存超卖核心:为了防止分布式系统中的多个线程之间进行相互干扰,我们需要一种分布式协调技术来对这些进程进行调度利用互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题避免共享资源并发操作导致数据问题加锁:本地锁:synchronize、lock等,锁在当前进程内,集群部署下依旧存在问题分布式锁:redis、zookeeper等实现,虽然还是锁,但是多个进程公用锁标记,可以用Redis、Zookeeper、MySql等都可以设计分布式锁应该考虑的东西排他性:在分布式应用集群中,同一个方法在同一时间只能被一台机器的一个线程执行容错性:分布式锁一定能得到释放,比如客户端崩溃或者网络中断满足可重入、高性能、高可用注意分布式锁的开销、锁粒度(2)基于Redis实现分布式锁的几种坑实现分布式锁可以用Redis、Zookeeper、MySql数据库这几种,性能最好的是Redis且最容易理解的分布式锁离不开key -value 设置key是锁的唯一标识,一版按业务来决定命名,比如想要给一种优惠劵活动加锁,key命名为"coupon:id",value可以用固定值,比如设置成1基于redis实现分布式锁,文档:http://www.redis.cn/commands.html#string加锁setnx key valuesetnx 的含义就是SET if Not Exists,有两个参数setnx(key,value),该方法是原子性操作。 如果key不存在,则设置当前key成功,返回1。 如果当前key已经存在,则设置当前key失败,返回0。 解锁del(key)得到锁的线程执行完任务,需要释放锁,以便其他线程可以进入,调用del(key)配置锁超时expire(key,30s)客户端崩溃或者网络中断,资源将永会被锁住,即死锁,因此需要给key配置过期时间,以保证即使没有被显示释放,这把锁也要在一定时间后自动释放。 综合伪代码methodA(){ String key = "coupon_66" if(setnx(key,1) == 1){ //注意设置时间和设置key不是原子性 expire(key,30,TimeUnit.MILLISECONDS) try{ //做对应的业务逻辑 //查询用户是否已经领卷 }finally{ del(key) } }else{ //睡眠100毫秒,然后自旋调用本方法 methodA() } } 存在什么问题多个命令之间不是原子性操作,如setnx和expire之间,如果setnx成功,但是expire失败,且死机,则就是个死锁。使用原子性命令:设置和配置过期时间 setnx|setex 如:set key 1 ex 30 nx redisTemplate.opsFosValue().setIfAbsent("seckill_1",success,30,TimeUnit.MILLISECONDS) 业务超时,存在其他线程误删,key30秒过期,假如线程A执行很慢超过30s,则key就被释放了,其他线程B就得到了锁,这个时候线程A执行完成,而B还没有执行完成,结果A把B加的锁给删掉了。进一步细化误删可以在del释放锁之前做一个判断,验证当前的锁是不是自己加的锁,那value应该是存当前线程的标识或者uuid methodA(){ String key = "coupon_66" if(setnx(key,1) == 1){ //注意设置时间和设置key不是原子性 expire(key,30,TimeUnit.MILLISECONDS) try{ //做对应的业务逻辑 //查询用户是否已经领卷 }finally{ //删除锁操作判断是否为当前线程加的 if(redisTemplate.get(key).equals(value)){ //还在当前时间规定内 del(key) } } }else{ //睡眠100毫秒,然后自旋调用本方法 methodA() } } 核心还是判断和删除命令不是原子性操作导致的总结加锁+配置过期时间:保证原子性操作解锁:防止误删除、也要保证原子性操作采用Lua脚本+redis,保证多个命令的原子性(3)Lua脚本+Redis实现分布式锁的编码实现//获取lock的值和传递的值⼀样,调⽤删除操作返回1,否则返回0 String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; //Arrays.asList(lockKey)是key列表,uuid是参数 Integer result = redisTemplate.execute(new DefaultRedisScript<>(script, Integer.class),Arrays.asList(lockKey), uuid); @Slf4j @RestController @RequestMapping("/api/v1/coupon") public class CouponController { @Autowired private RedisTemplate redisTemplate; @GetMapping("/add") public JsonData save(@RequestParam(value = "coupon_id",required = true) int couponId){ //防止其他线程误删 String uuid = UUID.randomUUID().toString(); String lockKey = "lock:coupon:"+couponId; lock(couponId,uuid,lockKey); return JsonData.buildSuccess(); } private void lock(int couponId,String uuid,String lockKey) { Boolean nativeLock = redisTemplate.opsForValue().setIfAbsent(lockKey, uuid, Duration.ofSeconds(30)); log.info(uuid+"---加锁状态:"+nativeLock); //定义Lua脚本 String script = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end"; if(nativeLock){ //加锁成功,做相应的业务逻辑 try{ //核心业务逻辑 TimeUnit.SECONDS.sleep(3L); } catch (InterruptedException e) { e.printStackTrace(); } finally { //解锁操作 Object result = redisTemplate.execute(new DefaultRedisScript(script, Long.class), Arrays.asList(lockKey), uuid); log.info("解锁结果:"+result); } }else{ //加锁失败进入睡眠5s,然后在自旋调用 try { log.info("加锁失败,睡眠5s,进入自旋"); TimeUnit.MILLISECONDS.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } lock(couponId,uuid,lockKey); } } } setIfAbsent(): execute(): 5.List数据结构实战5.1.昨日热销榜单实战(1)需求:天猫每天的热销榜单,每天更新一次需要支持人工运营替换榜单的位置(2)企业中流程:定时任务计算昨天那些商品出售的数量最多晚上12点到1点更新到榜单上预留一个接口,支持人工运营(3)类似场景:京东:热销手机榜单、电脑榜单等百度:搜索热榜(4)编码实战开发接口@RequestMapping("rank") public JsonData videoRank(){ List<VideoDO> list = redisTemplate.opsForValue().range(RANK_KEY,0,-1); return JsonData.buildSuccess(list); } 测试数据@Test public void rankTest(){ String RANK_KEY = "rank:video"; VideoDO video1 = new VideoDO(3,"PaaS⼯业级微服务⼤课","xdclass.net",1099); VideoDO video2 = new VideoDO(5,"AlibabaCloud全家桶实战","xdclass.net",59); VideoDO video3 = new VideoDO(53,"SpringBoot2.X+Vue3综合实战","xdclass.net",49); VideoDO video4 = new VideoDO(15,"玩转23种设计模式+最近实战","xdclass.net",49); VideoDO video5 = new VideoDO(45,"Nginx⽹关+LVS+KeepAlive","xdclass.net",89); //leftPushAll向左边插入,所以放在最后一位的才是首个 redisTemplate.opsForList().leftPush(RANK_KEY,video6,video5,video4,video3,video2,video1); //rightPushAll向右边插入,所以首个就是第一个 //sTemplate.opsForList().leftPush(RANK_KEY,video1,video2,video3,video4,video5); } 6.Hash数据结构实战6.1.购物车实现案例实战(1)背景:电商购物车实现,支持买多见商品,每个商品不同数量支持高性能处理(2)购物车常见的实现方式:实现方式一:存储到数据库性能存在瓶颈实现方式二:前端本地存储-localstorage,sessionstoragelocalstorage在浏览器中存储key/value对,没有过期时间sessionstorage在浏览器中存储key/value对,在关闭会话窗口后将会删除这些数据实现方式三:后端存储到缓存redis可以开启AOF持久化防止重启丢失(推荐)(2)购物车数据结构介绍一个购物车里面,存在多个购物项所以购物车是一个双层的MapMap<String,Map<String,String>>第一层Map,key是用户id第二层Map,key是购物车商品的id,值是购物项的数据(3)对应redis里面的存储(4)编码实战实体类VideoDO、CartItemVO、CartVOpublic class CartItemVO { /** * 商品id */ private Integer productId; /** * 购买数量 */ private Integer buyNum; /** * 商品标题 */ private String productTitle; /** * 图片 */ private String productImg; /** * 商品单价 */ private int price; /** * 总价格 */ private int totalPrice; public Integer getProductId() { return productId; } public void setProductId(Integer productId) { this.productId = productId; } public Integer getBuyNum() { return buyNum; } public void setBuyNum(Integer buyNum) { this.buyNum = buyNum; } public String getProductTitle() { return productTitle; } public void setProductTitle(String productTitle) { this.productTitle = productTitle; } public String getProductImg() { return productImg; } public void setProductImg(String productImg) { this.productImg = productImg; } public int getPrice() { return price; } public void setPrice(int price) { this.price = price; } public int getTotalPrice() { return totalPrice*buyNum; } public void setTotalPrice(int totalPrice) { this.totalPrice = totalPrice; } } public class CartVO { private List<CartItemVO> cartItemVOS; private Integer totalAmount; public List<CartItemVO> getCartItemVOS() { return cartItemVOS; } public void setCartItemVOS(List<CartItemVO> cartItemVOS) { this.cartItemVOS = cartItemVOS; } /** * 返回购物车总价格 * @return */ public Integer getTotalAmount() { //jdk8新语法 return cartItemVOS.stream().mapToInt(CartItemVO::getTotalPrice).sum(); } public void setTotalAmount(Integer totalAmount) { this.totalAmount = totalAmount; } } 模拟dao层,数据库根据id返回数据@Repository public class VideoDao { private static Map<Integer, VideoDO> map = new HashMap<>(); static { map.put(1,new VideoDO(1,"工业级PaaS云平台SpringCloudAlibaba综合项⽬实战(完结)","https://xdclass.net",1099)); map.put(2,new VideoDO(2,"玩转新版⾼性能RabbitMQ容器化分布式集群实战","https://xdclass.net",79)); map.put(3,new VideoDO(3,"新版后端提效神器MybatisPlus+SwaggerUI3.X+Lombok","https://xdclass.net",49)); map.put(4,new VideoDO(4,"玩转Nginx分布式架构实战教程 零基础到⾼级","https://xdclass.net",49)); map.put(5,new VideoDO(5,"ssm新版SpringBoot2.3/spring5/mybatis3","https://xdclass.net",49)); map.put(6,new VideoDO(6,"新⼀代微服务全家桶AlibabaCloud+SpringCloud实 战","https://xdclass.net",59)); } /** * 模拟返回数据库资源 * @param videoId * @return */ public VideoDO findByVideoId(int videoId){ return map.get(videoId); } } JsonUtil工具类开发public class JsonUtil { private static final ObjectMapper MAPPER = new ObjectMapper(); /** * 对象转json字符串的方法 * @param data * @return */ public static String objectToJson(Object data){ try{ return MAPPER.writeValueAsString(data); } catch (JsonProcessingException e) { e.printStackTrace(); } return null; } /** * json字符串转对象的方法 * @param jsonData * @param beanType * @param <T> * @return */ public static <T> T jsonToObject(String jsonData,Class<T> beanType){ try { T t = MAPPER.readValue(jsonData, beanType); return t; } catch (JsonProcessingException e) { e.printStackTrace(); } return null; } } 开发VideoCardController,购物车控制层@RestController @RequestMapping("/api/v1/cart") @Slf4j public class VideoCardController{ @Autowired private VideoDao videoDao; @Autowired private RedisTemplate redisTemplate; /** * 添加到购物车 * @param videoId * @param buyNum * @return */ @RequestMapping("/add") public JsonData addCart(int videoId,int buyNum){ /** * 获取购物车 */ BoundHashOperations<String, Object, Object> myCartOps = getMyCartOps(); Object cacheObj = myCartOps.get(videoId + ""); String result = ""; //当购物车有这个商品,转化成字符串 if(cacheObj != null){ result = (String) cacheObj; } if(cacheObj == null){ //购物车没这个商品,从数据库里拿出来,在放到缓存中 CartItemVO cartItemVO = new CartItemVO(); VideoDO videoDO = videoDao.findByVideoId(videoId); cartItemVO.setBuyNum(buyNum); cartItemVO.setPrice(videoDO.getPrice()); cartItemVO.setProductId(videoDO.getId()); cartItemVO.setProductImg(videoDO.getImg()); cartItemVO.setProductTitle(videoDO.getTitle()); cartItemVO.setTotalPrice(videoDO.getPrice()*buyNum); myCartOps.put(videoId+"", JsonUtil.objectToJson(cartItemVO)); }else{ //不为空就将字符串转成对象,增加商品购买数量,在转成字符串放到redis里 CartItemVO cartItemVO = JsonUtil.jsonToObject(result, CartItemVO.class); cartItemVO.setBuyNum(cartItemVO.getBuyNum()+buyNum); myCartOps.put(videoId+"",JsonUtil.objectToJson(cartItemVO)); } return JsonData.buildSuccess(); } /** * 查看我的购物车 * @return */ @RequestMapping("/my-cart") public JsonData getMyCart(){ //获取购物车 BoundHashOperations<String, Object, Object> myCartOps = getMyCartOps(); List<CartItemVO> cartItemVOS = new ArrayList<>(); List<Object> itemList = myCartOps.values(); for (Object item : itemList) { CartItemVO cartItemVO = JsonUtil.jsonToObject((String) item, CartItemVO.class); cartItemVOS.add(cartItemVO); } CartVO cartVO = new CartVO(); cartVO.setCartItemVOS(cartItemVOS); return JsonData.buildSuccess(cartVO); } /** * 清空我的购物车 * @return */ @RequestMapping("/clear") public JsonData clear(){ String cartKey = getCartKey(); redisTemplate.delete(cartKey); return JsonData.buildSuccess(); } /*******************通用的方法,获取购物购物车数据,获取当前key*****************/ /** * 获取我的购物车通用方法 * @return */ private BoundHashOperations<String,Object,Object> getMyCartOps(){ //获取定义在Hash里的key,指定方法拼接 String key = getCartKey(); //返回当前key的集合,没有则新建返回 return redisTemplate.boundHashOps(key); } /** * 获取购物车的key,用前缀加上用户的id * @return */ private String getCartKey(){ //用户id,获取用户id,JWT解密后获取 int userId = 88; String cartKey = String.format("video:cart:%s", userId); return cartKey; } } 7.Set数据结构实战7.1.大数据下的用户画像标签去重(1)简介用户画像 英文User Profile,是根据用户基本属性、社会属性、行为属性、心理属性等真实信息抽象出的一个标签化的、虚拟的用户模型。“用户画像”的实质是对“人”的数字化。应用场景很多,比如个性化推荐、精准营销、金融风控、精细化运营等等,举个例子来理解用户画像的实际实用价值,我们经常用手机网购,淘宝里面的千人千面,通过“标签tag”来对用户的多维度特征进行提炼和标识,那灭个人的用户画像就需要存储,set集合就适合去重。用户画像不止针对某个人,也可以某一人群或行业的画像。(2)案例 /** * 用户画像去重 */ @Test public void userProfile(){ BoundSetOperations operations = redisTemplate.boundSetOps("user:tags:1"); operations.add("car","student","rich","dog","guangdong","rich"); Set<String> set1 = operations.members(); System.out.println(set1); operations.remove("dog"); Set<String> set2 = operations.members(); System.out.println(set2); } 7.2.关注、粉丝、共同好友(1)背景社交应用里面的关注、粉丝、共同好友案例(2)案例public void testSet(){ BoundSetOperations operationsLW = redisTemplate.boundSetOps("user:lw"); operationsLW.add("A","B","C","D","E"); System.out.println("LW的粉丝:"+operationsLW.members()); BoundSetOperations operationsLX = redisTemplate.boundSetOps("user:lx"); operationsLX.add("A","B","F","Z","H"); System.out.println("LX的粉丝:"+operationsLX.members()); //差集 Set lwSet = operationsLW.diff("user:lx"); System.out.println("lw的专属用户:"+lwSet); Set lxSet = operationsLX.diff("user:lw"); System.out.println("lx的专属用户:"+lxSet); //交集 Set intersectSet = operationsLW.intersect("user:lx"); System.out.println("同时关注的用户:"+intersectSet); Set union = operationsLW.union("user:lx"); //并集 System.out.println("两个人的并集:"+union); Boolean a = operationsLW.isMember("A"); System.out.println("用户A是否为lw的粉丝:"+a); } 8.SortedSet数据结构实战8.1.用户积分实时榜单(1)背景用户玩游戏-积分实时榜单IT视频热销实时榜单电商商品热销实时榜单一般的排行榜读多写少,可以对master进行写入操作做,然后多个slave进行读操作(2)对象准备public class UserPointVO { private String username; private String phone; public UserPointVO(String username, String phone) { this.username = username; this.phone = phone; } public UserPointVO() { } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; UserPointVO that = (UserPointVO) o; return Objects.equals(phone, that.phone); } @Override public int hashCode() { return Objects.hash(phone); } } @Test public void testData(){ UserPointVO p1 = new UserPointVO("老王","13113"); UserPointVO p2 = new UserPointVO("老A","324"); UserPointVO p3 = new UserPointVO("老B","242"); UserPointVO p4 = new UserPointVO("老C","542345"); UserPointVO p5 = new UserPointVO("老D","235"); UserPointVO p6 = new UserPointVO("老E","1245"); UserPointVO p7 = new UserPointVO("老F","2356432"); UserPointVO p8 = new UserPointVO("老G","532332"); BoundZSetOperations boundZSetOperations = redisTemplate.boundZSetOps("point:rank:real"); boundZSetOperations.add(p1,348); boundZSetOperations.add(p2,18); boundZSetOperations.add(p3,328); boundZSetOperations.add(p4,848); boundZSetOperations.add(p5,98); boundZSetOperations.add(p6,188); boundZSetOperations.add(p7,838); boundZSetOperations.add(p8,8828); } (3)接口开发返回榜单-从大到小排序 /** * 返回全部榜单从大到小 * @return */ @RequestMapping("/real-rank2") public JsonData rankList2(){ Set set = redisTemplate.boundZSetOps("point:rank:real").reverseRange(0, -1); return JsonData.buildSuccess(set); } 返回榜单-从小到大排序 /** * 返回全部榜单从小到大 * @return */ @RequestMapping("/real-rank1") public JsonData rankList1(){ Set range = redisTemplate.boundZSetOps("point:rank:real").range(0, -1); return JsonData.buildSuccess(range); } 查询个人用户排名 /** * 查询个人用户排名 * @param username * @param phone * @return */ @RequestMapping("find_my_rank") public JsonData find(String username,String phone){ UserPointVO userPointVO = new UserPointVO(username,phone); Long rank = redisTemplate.boundZSetOps("point:rank:real").reverseRank(userPointVO); return JsonData.buildSuccess(++rank); } 查看个人积分 /** * 查看个人积分 * @param username * @param phone * @return */ @RequestMapping("find_my_score") public JsonData findMyScore(String username,String phone){ UserPointVO userPointVO = new UserPointVO(username,phone); Double score = redisTemplate.boundZSetOps("point:rank:real").score(userPointVO); return JsonData.buildSuccess(score); } 个人加积分 /** * 加积分 * @param username * @param phone * @return */ @RequestMapping("add_score") public JsonData addScore(String username,String phone){ UserPointVO userPointVO = new UserPointVO(username,phone); redisTemplate.boundZSetOps("point:rank:real").incrementScore(userPointVO,1000000); return JsonData.buildSuccess(redisTemplate.boundZSetOps("point:rank:real").reverseRange(0,-1)); }
1.分布式缓存Redis6安装1.1.缓存和队列简介高并发必备两大“核心技术”(1)什么是队列(MQ消息中间件)全称:MessageQueue,主要用于程序与程序之间的通信(异步+解耦)。 核心应用: (1)解耦:订单系统->物流系统 (2)异步:用户注册同时发送优惠劵,和初始化操作 (3)削峰:秒杀、日志处理 (2)什么是缓存程序需要经常调用的数据放在内存中,因为内存中的响应非常快,使其快速调用,避免去数据库持久层去查。 主要就是提高性能 DNS缓存、前端缓存、代理缓存服务器Nginx、应用程序缓存、数据库缓存 1.2.本地缓存和分布式缓存介绍(1)分布式缓存与应用隔离的缓存组件或服务,与本地服务隔离的一个独立的缓存服务,多个服务可共享这一个缓存,多个节点共享缓存,需要考虑带宽。 常见的分布式缓存:Redis、Memcached (2)本地缓存和业务程序一起的缓存,例如mybatis的一二级缓存,只能由服务本身调用,不能多节点共享,不需要考虑带宽。 常见的本地缓存:guava、redis也可以做本地缓存、SpringCache (3)本地缓存和分布式缓存的选择结合业务去选择缓存,高并发的项目一般分布式缓存和本地缓存都存在。(4)热点key的解决方案热点key一般都放在本地缓存中,因为不需要带宽,效率很高,先去本地缓存中去查找,没有的话再去分布式缓存中查找。 应用:热点新闻、热卖商品、大V明星结婚1.3.Nosql和Redis简介什么是Redis其两者最重要的区别是NoSQL不使用SQL作为查询语言。NoSQL数据存储可以不需要固定的表格模式键 - 值对存储,列存储,文档存储,图形数据库NoSql:redis、memcached、mongodb、Hbase一个开源的使用 ANSI C 语言编写、遵守 BSD 协议、支持网络、可基于内存、分布式、可选持久性的键值对(Key-Value)存储数据库,并提供多种语言的 API 高性能:Redis能读的速度是110000次/s,写的速度是81000次/s 内存中的存储结构,它可以做为消息中间件、缓存、数据库。如:lists(列表)、hashs(散列)、sorted sets(有序集合)、sets(集合)、strings(字符串) 1.4.Linux源码安装Redis6(1)源码安装Redis上传到linux服务器(先安装升级gcc新版才能编译)#安装gcc yum install -y gcc-c++ autoconf automake #centos7默认的gcc是4.8.5版本,版本小于5.3无法编译,要先安装gcc新版才能编译 gcc -v(查看gcc当前版本) #升级新版gcc,配置永久生效 yum -y install centos-release-scl yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils scl enable devtoolset-9 bash #从gcc4.8.5切换到gcc9编译器 echo "source /opt/rh/devtoolset-9/enable" >>/etc/profile #解压redis安装包 tar -xvf redis.6.2.1.tar.gz mv redis.6.2.1 redis6 #编译redis cd redis6 make #安装到指定的目录 mkdir -p /usr/local/redis make PREFIX = /usr/local/redis install 注意:安装编译redis6需要升级gcc,默认自带的gcc版本比较老redis-server:redis启动文件redis-cli:redis客户端redis.conf:redis配置文件1.5.Docker容器化部署Redis6云计算+容器化是当下的主流,也是未来的趋势, docker就是可 以快速部署启动应⽤ 实现虚拟化,完整资源隔离,⼀次编写,四处运⾏ 但有⼀定的限制,⽐如Docker是基于Linux 64bit的,⽆法在 32bit的linux/Windows/unix环境下使⽤ (1)Docker安装#安装并运行Docker yum install -y docker-io #启动docker systemctl start docker #检查安装结果 docker info #启动使用docker systemctl start docker #运行Docker守护进程 systemctl stop docker #停止Docker守护进程 systemctl restart docker#重启Docker守护进程 docker ps #查看容器 docker stop 容器id #停掉某个容器 #修改镜像文件 vim /etc/docker/daemon.json { "debug":true,"experimental":true, "registry-mirrors":["https://pb5bklzr.mirror.aliyuncs.com","https://hubmirror.c.163.com","https://docker.mirrors.ustc.edu.cn"] } (2)Docker部署redis并配置密码docker run -itd --name xdclass-redis -p 6379:6379 redis --requirepass 123456 -i:以交互模式运行容器,通常与-t同事使用。 -d:后台运行容器,并返回容器ID。 1.6.分布式缓存Redis6核心配置(1)redis.conf配置文件的核心配置daemonize yes #配置偶后台运行,默认是no bind ip号 #绑定指定ip访问,0.0.0.0是不限制,配置多个ip用空格隔开,bind 192.168.10.1 192.168.10.2 port 端口号 #端口号,默认6379 requirepass #密码配置 dbfilename #配置redis持久化文件名称 dir #配置redis持久化文件存储地址 save #配置redis持久化机制 (2)在redis安装目录创建log、data、conf目录日志:/usr/local/redis/log 数据:/usr/local/redis/data 配置文件:/usr/local/redis/conf(3)在/usr/local/redis/conf中创建自定义的配置文件touch /usr/local/redis/conf/redis.confvi /usr/local/redis/conf/redis.conf #任何ip都可以访问 bind 0.0.0.0 #守护进程 daemonize yes #密码 requirepass 123456 #日志文件 logfile "/usr/local/redis/log/redis.log" #持久化文件名称 dbfilename xdclass.rdb #持久化文件路径 dir "/usr/local/redis/data" #持久化策略,10s内有一个key改动,执行快照 save 10 1 (4)指定配置文件启动redis/usr/local/redis/bin/./redis-server /usr/local/redis/conf/redis.conf查看日志确定是否启动:tail -f /usr/local/redis/log/redis.log2.分布式缓存Redis6数据结构2.1.Redis6常见数据结构(1)exists 判断key是否存在exists name #判断name这个key是否存在(2)del 删除keydel name #删除name这个key(3)type 判断key的类型type name #判断name是什么类型(4)ttl 查看key的存活时间ttl name #判断name还有多长时间过期 ttl age #判断age还有多长时间过期 2.2.Redis6数据结构之String类型简介:存储字符串类型的key-value应用场景:验证码、计数器、订单重复提交、用户登录信息、商品详情常用命令:(1)set/get 设置和获取key-value设置key-value:set user:name lixiang 获取key:get user:name (2)incr 对指定key的value进行自增1incr user:age(3)incrby 对指定key的value进行+n操作incrby user:age 10(4)mget/mset 一次获取多个key值,一次设置多个key-value设置:mset user:addr tianjin user:phone 1333333333 获取:mget user:addr user:phone(5)setex 设置一个key-value带有过期时间的setex user:code 30 236589(6)setnx 当key不存在时,才设置key-value,key存在时,不做操作setnx user:name xxxxx2.3.Redis6数据结构之List类型简介:字符串列表,按照插入顺序排序,双向链表,插入删除时间复杂度为O(1)快,查找为O(n)慢。应用场景:简单队列、最新商品列表、评论列表、非实时排行榜常用命令:(1)lpush 将一个或者多个值插入到列表头部,从左边开始插入lpush phone:rank:daily iphone(2)rpush将一个或者多个值插入到列表尾部,从右边开始插入rpush phone:rank:daily xiaomi(3)lrange获取指定key下边的范围元素,0代表第一个,1代表第二个,-1代表最后一个lrange phone:rank:daily 0 -1(4)llen获取当前key的元素个数llen phone:rank:daily(5)lindex获取当前索引元素的值lindex phone:rank:daily 2(6)lpop从顶部弹出一个元素,从左边弹出lpop phone:rank:daily 1(7)rpop从底部弹出一个元素,从右边弹出rpop phone:rank:daily 1(8)lrem 删除一个元素,可以指定移除个数lrem word 2 a(9)brpop移除并且获取列表的最后一个元素,如果列表没有元素会阻塞设置的时长或者在规定的时间内弹出元素为止brpop word 102.4.Redis6数据结构之Hash类型简介:Hash是一个string类型的field和value的映射表,hash特别适用于存储对象。应用场景:购物车存储、用户个人信息存储、商品详情存储注意:每个hash可以存储2[^32] -1 键值对(40多亿)常用命令:(1)hset/hget 设置和获取key中指定字段的值设置key-value:hset product:daily:1 title iphone 获取key:hset product:daily:1 title iphone (2)hdel 删除指定key的指定字段hdel product:daily:1 title(3)hmset/hmget 一次设置和获取多个key中指定字段的值hmget product:daily:1 title color(4)hgetall 获取指定key的全部字段的值hgetall product:daily:1(5)hincrby 对指定key的指定字段进行+n操作(n可以为正数也可以为负数)hincrby product:daily:1 price 100(6)hexists 判断指定key的指定字段是否存在hexits product:daily:1 color2.5.Redis6数据结构之Set类型简介:将一个或者多个成员元素加入到集合中,已经存在的成员元素将被忽略应用场景:去重、社交应用关注、粉丝、共同好友、大数据里面用户画像标签常用命令:(1)sadd添加一个或者多个指定的member元素到集合中,若集合中已存在元素,将被忽略sadd user:tags:1 woman bwn 18-25 beijing(2)smembers 获取当前集合中的所有元素smembers user:tags:1(3)srem 删除集合中某个元素srem user:tags:1 bwn(4)scard 返回集合中所有元素的个数scard user:tags:1(5)sismember 返回集合中是否存在当前元素slsmember user:tags:1 bwn(6)sdiff 返回两个集合中的差集sdiff user:tags:2 user:tags:1(7)sinter返回两个集合中的交集siner user:tags:2 user:tags:1(8)sunion 返回两个集合的并集sunion user:tags:2 user:tags:12.6.Redis6数据结构之SortedSet类型简介:用于将一个或者多个成员元素及分数值加入到有序集合中应用场景:实时排行榜:商品热销榜、体育类应用热门球队、积分榜、优先级任务、朋友圈 文章点赞-取消注意:底层采用Ziplist压缩列表和“跳跃表”两种存储结构,如果重复添加相同的元素数据,score值将被覆盖,保留最后一次修改的结果。常用命令:(1)zadd向有序集合中添加一个或者多个成员,或者更新已经存在的成员的分数zadd video:rank 90 springcloud 80 springBoot 70 nginx 60 html 50 javascript 40 linux (2)zcard获取有序集合中成员数zcard video:rank(3)zcount计算指定分数区间的成员的个数zcount video:rank 0 50(4)zincrby 有序集合中对指定的成员的分数+n(n可以为正数也可以为负数)zincrby video:rank 5 linux(5)zrange 返回索引区间的所有元素,从小到大zrange video:rank 0 -1(6)zrevrange返回索引区间的所有元素,从大到小zrevrange video:rank 0 -1(7)zrank 返回有序集合中指定成员的索引,从小到大返回zrank video:rank 1(8)zrevrank返回有序集合中指定成员的索引,从大到小排序zrevrank video:rank 1(9)zrem移除有序集合中的一个或者多个成员zrem video:rank linux(10)zscore返回有序集合中,成员的分数值zscore video:rank springBoot3.SpringBoot2.x整合Redis6客户端3.1.分布式缓存Redis客户端讲解自带客户端:redis-clijava语言客户端jedisJedis是直连模式,在多个线程间共享一个Jedis时是线程不安全的,需要使用连接池 其API提供了比较全面的Redis命令支持,相比其他Redis封装框架更加原生 Jedis中的方法调用是比较底层的暴漏的Redis的API,Java方法基本和Redis的API保持这一致 使用阻塞的I/O,方法调用同步,程序流需要等到socket处理完I/O才能执行,不支持异步操作 lettuce高级Redis客户端,用于线程安全同步,异步响应 基于Netty的的事件驱动,可以在多个线程间并发访问, 通过异步的方式可以更好的利用系统资源 3.2.新版SpringBoot2.x项目创建(1)相关软件环境JDK1.8+以上Maven3.5+(2)加入依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> 注意:springBoot2后默认使用Lettuce作为redis的客户端旧版本的Lettuce踩在堆外内存溢出的bug,5.3版本修复了 这个bug。解决:升级版本或者换jedis(3)SpringDataRedis的RedisTemplate介绍RedisTemplate介绍ValueOperations:简单K-V操作SetOperations:set类型数据操作ZSetOperations:zset类型数据操作HashOperations:针对map类型的数据操作ListOperations:针对List类型的数据操作RedisTemplate和StringRedisTemplate的区别StringRedisTemplate继承RedisTemplate两者的数据是不互通的(默认的序列化机制导致key不一样)RedisTemplate默认采用的是JDK的序列化策略,会将数据先序列化成字节数组在存入Redis中总结:当Redis数据库里面操作的都是字符串数据的时候,那使用StringRedisTemplate即可数据是复杂的对象类型,那么使用RedisTemplate是更好的选择(4)Redis序列化和反序列化机制同个key为啥获取不到值呢,核心就是序列化机制导致key值不一致什么是序列化把对象转化为字节序列的过程就称为对象的序列化把字节序列恢复成对象的过程就是反序列化对象字节序列化主要有两种用途把对象的字节序列永久的保存在硬盘上,通常放在一个文件上在网络上传输对象的字节序列(5)Redis为什么要序列化性能可以提高,不同的序列化方式性能不一样可视化工具更好查看采用默认的jdk方式会乱码(POJO类需要实现Serializable接口)采用JSON方式则不用,且可视化工具更好的查看(6)自定义Redis序列化方式,可以选择多种选择策略JdkSerializationRedisSerializerPOJO对象的存存储场景,使用JDK本身序列化机制默认机制ObjectInputStream/ObjectOutputStream进行序列化操作StringRedisSerializerkey或者value为字符串Jackson2JsonRedisSerializer利用jackson-json工具,将POJO实例序列化成json格式存储GenericFastJsonRedisSerializer另一种javabean与json之间的转化,同时也需要指定Class类型3.3.RedisTemplate序列化机制配置@Configuration public class RedisTemplateConfiguration { /** * 自定义redisTemplate配置 * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<Object,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){ //默认使用JDK的序列化方式 RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); redisTemplate.setConnectionFactory(redisConnectionFactory); //使用Jackson2JsonRedisSerialize替换默认序列化 Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); //指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); //设置Key和Value的序列化规则 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); //设置hashKey和hashValue的的序列化规则 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); //设置支持事务 //redisTemplate.setEnableTransactionSupport(true); //初始化RedisTemplate redisTemplate.afterPropertiesSet(); return redisTemplate; } } 3.4.Jedis+Lettuce客户端连接池配置(1)lettuce连接池 <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> (2)配置application.properties#连接池最大连接数(使用负数表示没有限制) spring.redis.lettuce.pool.max-active = 10 #连接池中的最大空闲连接 spring.redis.lettuce.pool.max-idle = 10 #连接池中的最小空闲连接 spring.redis.lettuce.pool.min-idle = 0 #连接池最大阻塞等待时间(使用负数表示没有限制) spring.redis.lettuce.pool.max-wait = -1ms #指定客户端 spring.redis.client-type = lettuce (3)配置application.ymlserver: port: 8080 spring: redis: host: 8.140.116.67 port: 6379 password: 123456 client-type: jedis lettuce: pool: #连接池的最大连接数(负数表示没有限制) max-active: 10 #连接池中的最大空闲连接 max-idle: 10 #连接池的最小空闲连接 min-idle: 0 #连接池最大阻塞的等待时间(负数表示没有限制) max-wait: -1ms jedis: pool: #连接池的最大连接数(负数表示没有限制) max-active: 10 #连接池中的最大空闲连接 max-idle: 10 #连接池的最小空闲连接 min-idle: 0 #连接池最大阻塞的等待时间(负数表示没有限制) max-wait: -1ms (4)Jedis连接池介绍(可以不排除lettuce依赖包) <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <!-- 可以不排除lettuce依赖包 --> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> <!--不用指定版本号,本身spring-data-redis里面有--> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency>
2023年05月