读数据
使用 OpenResty(Lua + Nginx)+ Guava + Redis 做多级缓存
- 开辟一块内存空间
在 Nginx 配置文件 conf/nginx.conf 的 http 块中开辟一块大小为 128M 的共享内存字典,用来存储缓存数据,例如:lua_shared_dict ngx_cache 128m;(字典名 ngx_cache 必须与脚本中使用的 ngx.shared.ngx_cache 一致);
- 定义内存字典业务实现lua脚本memory_shared_dic_java.lua(添加缓存,获取缓存)
--- Multi-level cache lookup for seckill goods, built on OpenResty.
--
-- Lookup order for the key "seckill_goods_<id>":
--   1. the nginx shared dict ngx.shared.ngx_cache
--   2. Redis
--   3. the backend location /seckill/goods/detail/<id>
-- A value found in step 2 or 3 is copied into the shared dict for
-- LOCAL_TTL_SECONDS so that later requests are served from nginx memory.
--
-- All diagnostics go to the nginx error log; the response body contains only
-- the goods payload (or a short error message for a bad request).

local resty_redis = require "resty.redis"

-- Redis connection settings.
local REDIS_HOST = "172.17.61.90"
local REDIS_PORT = 6379
local REDIS_PASSWORD = "123456"
local REDIS_DB = 0
local REDIS_TIMEOUT_MS = 10000
-- Keepalive pool settings used when a connection is handed back.
local REDIS_IDLE_MS = 10000
local REDIS_POOL_SIZE = 100
-- Lifetime of an entry in the nginx shared dict, in seconds.
local LOCAL_TTL_SECONDS = 60

--- Open an authenticated Redis connection with database REDIS_DB selected.
-- AUTH/SELECT must be sent after connect(); they are skipped for connections
-- taken from the keepalive pool, which were already initialised.
-- @return client on success, or nil plus an error message
local function open_redis()
    local client, new_err = resty_redis:new()
    if not client then
        return nil, "failed to create redis object: " .. tostring(new_err)
    end
    client:set_timeout(REDIS_TIMEOUT_MS)

    local ok, err = client:connect(REDIS_HOST, REDIS_PORT)
    if not ok then
        return nil, "failed to connect: " .. tostring(err)
    end

    local reused = client:get_reused_times()
    if not reused or reused == 0 then
        ok, err = client:auth(REDIS_PASSWORD)
        if not ok then
            client:close()
            return nil, "failed to auth: " .. tostring(err)
        end
        ok, err = client:select(REDIS_DB)
        if not ok then
            client:close()
            return nil, "failed to select db: " .. tostring(err)
        end
    end
    return client
end

--- Hand a healthy connection back to the keepalive pool.
local function release_redis(client)
    local ok, err = client:set_keepalive(REDIS_IDLE_MS, REDIS_POOL_SIZE)
    if not ok then
        ngx.log(ngx.WARN, "failed to set redis keepalive: ", err)
    end
end

--- 1. Store a value in the nginx shared dict.
-- @param key   cache key
-- @param value value to store
-- @param expr  time to live in seconds; nil or 0 means "never expires"
-- @return the success flag returned by ngx.shared.DICT:set
local function set_to_cache(key, value, expr)
    local ttl = expr or 0
    local dict = ngx.shared.ngx_cache
    local succ, err = dict:set(key, value, ttl)
    if not succ then
        ngx.log(ngx.ERR, "failed to set shared dict key ", key, ": ", err)
    end
    return succ
end

--- Write a key/value pair to Redis.
-- Not called by the request flow below; kept as the write-side counterpart of
-- get_from_redis.
-- @return the Redis reply on success, or nil plus an error message
local function set_to_redis(key, value)
    local client, err = open_redis()
    if not client then
        ngx.log(ngx.ERR, err)
        return nil, err
    end

    local succ, set_err = client:set(key, value)
    if not succ then
        ngx.log(ngx.ERR, "failed set redis: ", set_err)
        client:close()
        return nil, set_err
    end

    release_redis(client)
    return succ
end

--- Read a key from Redis.
-- @return the stored value, or nil plus an error message when the key does
--         not exist or Redis could not be reached
local function get_from_redis(key)
    local client, err = open_redis()
    if not client then
        ngx.log(ngx.ERR, err)
        return nil, err
    end

    local value, get_err = client:get(key)
    if not value then
        ngx.log(ngx.ERR, "failed get redis: ", get_err)
        client:close()
        return nil, get_err
    end

    release_redis(client)

    -- lua-resty-redis reports a missing key as ngx.null, which is truthy.
    if value == ngx.null then
        return nil, "key not found"
    end

    ngx.log(ngx.INFO, "get cache from redis: ", key)
    return value
end

--- 2. Read a value from the shared dict, falling back to Redis.
-- A Redis hit is copied into the shared dict and returned to the caller.
-- @return the cached value, or nil when neither cache holds the key
local function get_from_cache(key)
    local dict = ngx.shared.ngx_cache
    local cached = dict:get(key)
    if cached then
        return cached
    end

    local from_redis, err = get_from_redis(key)
    if not from_redis then
        ngx.log(ngx.INFO, "redis cache not exists: ", err)
        return nil
    end

    set_to_cache(key, from_redis, LOCAL_TTL_SECONDS)
    return from_redis
end

--- 3. Request handling.
local params = ngx.req.get_uri_args()
local id = params.id

-- A repeated argument (?id=1&id=2) arrives as a table; use the first value.
if type(id) == "table" then
    id = id[1]
end

-- The id is concatenated into a cache key and an internal URI, so accept
-- digits only. This also rejects a missing id and a bare "?id" (boolean true).
if type(id) ~= "string" or not string.find(id, "^%d+$") then
    ngx.status = ngx.HTTP_BAD_REQUEST
    ngx.say("invalid parameter: id")
    return ngx.exit(ngx.HTTP_BAD_REQUEST)
end

local cache_key = "seckill_goods_" .. id
local goods = get_from_cache(cache_key)

if goods == nil then
    -- Neither cache has the entry: ask the backend service.
    local result = ngx.location.capture("/seckill/goods/detail/" .. id)

    if result.status ~= ngx.HTTP_OK then
        -- Pass the backend failure through and do not cache it.
        ngx.log(ngx.ERR, "backend returned status ", result.status, " for goods ", id)
        ngx.status = result.status
        ngx.print(result.body)
        return ngx.exit(result.status)
    end

    goods = result.body
    if goods and goods ~= "" then
        set_to_cache(cache_key, goods, LOCAL_TTL_SECONDS)
    end
end

-- Send the goods payload to the client.
ngx.say(goods)
这个 Lua 脚本写好后,上传到与 Nginx 的 conf 目录同级的 lua 目录下,并在 nginx.conf 的对应 location 中引用该脚本(例如使用 content_by_lua_file 指令),然后执行 sbin/nginx -s reload 使配置生效。
3. Java 端代码
添加guava依赖
<!-- Guava: supplies the Cache type that the Java code in this document injects as its in-JVM cache. -->
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>18.0</version>
</dependency>
@Autowired
private SeckillGoodsMapper seckillGoodsMapper;

// Redis client, used as the distributed cache.
@Autowired
private RedisTemplate redisTemplate;

// Guava cache, used as the in-JVM cache.
@Autowired
private Cache<String,Object> guavaCahce;

/**
 * Looks up a seckill goods record through the cache hierarchy:
 * Guava (in-JVM) first, then Redis, then the database.
 *
 * <p>A record loaded from the database is written to Redis for one hour, but
 * only when its status equals 1. Any record that was found (in Redis or in the
 * database) is also stored in the Guava cache.
 *
 * @param id primary key of the seckill goods record
 * @return the record, or {@code null} when it exists in no cache and not in
 *         the database
 */
public TbSeckillGoods findOneByCache(Integer id){
    String cacheKey = "seckill_goods_" + id;

    // 1. In-JVM cache.
    TbSeckillGoods seckillGoods = (TbSeckillGoods) guavaCahce.getIfPresent(cacheKey);
    if(seckillGoods != null){
        return seckillGoods;
    }

    // 2. Distributed cache.
    seckillGoods = (TbSeckillGoods) redisTemplate.opsForValue().get(cacheKey);
    if(seckillGoods == null){
        // 3. Database.
        seckillGoods = seckillGoodsMapper.selectByPrimaryKey(id);
        if(seckillGoods != null && seckillGoods.getStatus() == 1){
            redisTemplate.opsForValue().set(cacheKey,seckillGoods,1,TimeUnit.HOURS);
        }
    }

    // Guava's Cache rejects null values with a NullPointerException, so a
    // record that does not exist anywhere must not be put into it.
    if(seckillGoods != null){
        guavaCahce.put(cacheKey,seckillGoods);
    }
    return seckillGoods;
}
写数据
完整代码地址:https://gitee.com/zhushangling/miaosha
controller
@Controller("order") @RequestMapping("/order") @CrossOrigin(origins = {"*"},allowCredentials = "true") public class OrderController extends BaseController { @Autowired private OrderService orderService; @Autowired private HttpServletRequest httpServletRequest; @Autowired private RedisTemplate redisTemplate; @Autowired private MqProducer mqProducer; @Autowired private ItemService itemService; @Autowired private PromoService promoService; private ExecutorService executorService; private RateLimiter orderCreateRateLimiter; @PostConstruct public void init(){ // executorService = Executors.newFixedThreadPool(20); executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 100, 100,TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("client-transaction-msg-check-thread"); return thread; } }); orderCreateRateLimiter = RateLimiter.create(300.0); } //生成验证码 @RequestMapping(value = "/generateverifycode",method = {RequestMethod.GET,RequestMethod.POST}) @ResponseBody public void generateverifycode(HttpServletResponse response) throws BusinessException, IOException { String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能生成验证码"); } UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能生成验证码"); } Map<String,Object> map = CodeUtil.generateCodeAndPic(); redisTemplate.opsForValue().set("verify_code_"+userModel.getId(),map.get("code")); redisTemplate.expire("verify_code_"+userModel.getId(),10,TimeUnit.MINUTES); ImageIO.write((RenderedImage) map.get("codePic"), "jpeg", response.getOutputStream()); } //生成秒杀令牌 @RequestMapping(value = "/generatetoken",method = {RequestMethod.POST},consumes={CONTENT_TYPE_FORMED}) 
@ResponseBody public CommonReturnType generatetoken(@RequestParam(name="itemId")Integer itemId, @RequestParam(name="promoId")Integer promoId, @RequestParam(name="verifyCode")String verifyCode) throws BusinessException { //根据token获取用户信息 String token = httpServletRequest.getParameterMap().get("token")[0]; if(StringUtils.isEmpty(token)){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //获取用户的登陆信息 UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //通过verifycode验证验证码的有效性 String redisVerifyCode = (String) redisTemplate.opsForValue().get("verify_code_"+userModel.getId()); if(StringUtils.isEmpty(redisVerifyCode)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"请求非法"); } if(!redisVerifyCode.equalsIgnoreCase(verifyCode)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"请求非法,验证码错误"); } //获取秒杀访问令牌 String promoToken = promoService.generateSecondKillToken(promoId,itemId,userModel.getId()); if(promoToken == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"生成令牌失败"); } //返回对应的结果 return CommonReturnType.create(promoToken); } //封装下单请求 @RequestMapping(value = "/createorder",method = {RequestMethod.POST},consumes={CONTENT_TYPE_FORMED}) @ResponseBody public CommonReturnType createOrder(@RequestParam(name="itemId")Integer itemId, @RequestParam(name="amount")Integer amount, @RequestParam(name="promoId",required = false)Integer promoId, @RequestParam(name="promoToken",required = false)String promoToken) throws BusinessException { // 限流 boolean tryAcquire = orderCreateRateLimiter.tryAcquire(); if(!tryAcquire){ throw new BusinessException(EmBusinessError.RATELIMIT); } String token = httpServletRequest.getParameterMap().get("token")[0]; Map map = httpServletRequest.getParameterMap(); if(StringUtils.isEmpty(token)){ throw new 
BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //获取用户的登陆信息 UserModel userModel = (UserModel) redisTemplate.opsForValue().get(token); if(userModel == null){ throw new BusinessException(EmBusinessError.USER_NOT_LOGIN,"用户还未登陆,不能下单"); } //校验秒杀令牌是否正确 if(promoId != null){ String inRedisPromoToken = (String) redisTemplate.opsForValue().get("promo_token_"+promoId+"_userid_"+userModel.getId()+"_itemid_"+itemId); if(inRedisPromoToken == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } if(!org.apache.commons.lang3.StringUtils.equals(promoToken,inRedisPromoToken)){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"秒杀令牌校验失败"); } } //同步调用线程池的submit方法 //拥塞窗口为20的等待队列,用来队列化泄洪 Future<Object> future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { //每次秒杀,生成一条流水,加入库存流水init状态,这是为了MqProducer类中checkLocalTransaction方法 查看扣减库存成功与否 String stockLogId = itemService.initStockLog(itemId,amount); //再去完成对应的下单事务型消息机制 if(!mqProducer.transactionAsyncReduceStock(userModel.getId(),itemId,promoId,amount,stockLogId)){ throw new BusinessException(EmBusinessError.UNKNOWN_ERROR,"下单失败"); } return null; } }); try { future.get(); } catch (InterruptedException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } catch (ExecutionException e) { throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } return CommonReturnType.create(null); } }
MqProducer
@Component public class MqProducer { private DefaultMQProducer producer; // 默认生产者 private TransactionMQProducer transactionMQProducer; // 事务型生产者 @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private OrderService orderService; @Autowired private StockLogDOMapper stockLogDOMapper; @PostConstruct public void init() throws MQClientException { //做mq producer的初始化 producer = new DefaultMQProducer("producer_group"); producer.setNamesrvAddr(nameAddr); producer.start(); transactionMQProducer = new TransactionMQProducer("transaction_producer_group"); transactionMQProducer.setNamesrvAddr(nameAddr); transactionMQProducer.start(); transactionMQProducer.setTransactionListener(new TransactionListener() { // 第2步执行 @Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { //真正要做的事 创建订单 //arg的值来自 transactionAsyncReduceStock方法中的argsMap Integer itemId = (Integer) ((Map)arg).get("itemId"); Integer promoId = (Integer) ((Map)arg).get("promoId"); Integer userId = (Integer) ((Map)arg).get("userId"); Integer amount = (Integer) ((Map)arg).get("amount"); String stockLogId = (String) ((Map)arg).get("stockLogId"); try { orderService.createOrder(userId,itemId,promoId,amount,stockLogId); } catch (Exception e) { e.printStackTrace(); //设置对应的stockLog为回滚状态3,表示下单失败 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); stockLogDO.setStatus(3); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } // 第2步执行出错后,没有明确返回COMMIT_MESSAGE或ROLLBACK_MESSAGE的时候 执行 @Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { //根据是否扣减库存成功,来判断要返回COMMIT,ROLLBACK还是继续UNKNOWN String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); String stockLogId = (String) map.get("stockLogId"); // 查询操作流水,看这笔交易是否成功 StockLogDO stockLogDO = 
stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null){ return LocalTransactionState.UNKNOW; } if(stockLogDO.getStatus().intValue() == 2){ return LocalTransactionState.COMMIT_MESSAGE; }else if(stockLogDO.getStatus().intValue() == 1){ return LocalTransactionState.UNKNOW; } return LocalTransactionState.ROLLBACK_MESSAGE; } }); } //事务型同步库存扣减消息 第1步执行 public boolean transactionAsyncReduceStock(Integer userId,Integer itemId,Integer promoId,Integer amount,String stockLogId){ Map<String,Object> bodyMap = new HashMap<>(); bodyMap.put("itemId",itemId); bodyMap.put("amount",amount); bodyMap.put("stockLogId",stockLogId); Map<String,Object> argsMap = new HashMap<>(); argsMap.put("itemId",itemId); argsMap.put("amount",amount); argsMap.put("userId",userId); argsMap.put("promoId",promoId); argsMap.put("stockLogId",stockLogId); Message message = new Message(topicName,"increase", JSON.toJSON(bodyMap).toString().getBytes(Charset.forName("UTF-8"))); TransactionSendResult sendResult = null; try { //这里的artsMap,会传到 executeLocalTransaction(Message msg, Object arg)中的arg sendResult = transactionMQProducer.sendMessageInTransaction(message,argsMap); } catch (MQClientException e) { e.printStackTrace(); return false; } if(sendResult.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE){ return false; }else if(sendResult.getLocalTransactionState() == LocalTransactionState.COMMIT_MESSAGE){ return true; }else{ return false; } } }
MqConsumer
@Component public class MqConsumer { private DefaultMQPushConsumer consumer; @Value("${mq.nameserver.addr}") private String nameAddr; @Value("${mq.topicname}") private String topicName; @Autowired private ItemStockDOMapper itemStockDOMapper; @PostConstruct public void init() throws MQClientException { consumer = new DefaultMQPushConsumer("stock_consumer_group"); consumer.setNamesrvAddr(nameAddr); consumer.subscribe(topicName,"*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { //实现库存真正到数据库内扣减的逻辑 Message msg = msgs.get(0); String jsonString = new String(msg.getBody()); Map<String,Object>map = JSON.parseObject(jsonString, Map.class); Integer itemId = (Integer) map.get("itemId"); Integer amount = (Integer) map.get("amount"); itemStockDOMapper.decreaseStock(itemId,amount); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); } }
OrderServiceImpl
@Service public class OrderServiceImpl implements OrderService { @Autowired private SequenceDOMapper sequenceDOMapper; @Autowired private ItemService itemService; @Autowired private UserService userService; @Autowired private OrderDOMapper orderDOMapper; @Autowired private StockLogDOMapper stockLogDOMapper; @Override @Transactional public OrderModel createOrder(Integer userId, Integer itemId, Integer promoId, Integer amount,String stockLogId) throws Exception { //1.校验下单状态,下单的商品是否存在,用户是否合法,购买数量是否正确 //ItemModel itemModel = itemService.getItemById(itemId); ItemModel itemModel = itemService.getItemByIdInCache(itemId); if(itemModel == null){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"商品信息不存在"); } if(amount <= 0 || amount > 99){ throw new BusinessException(EmBusinessError.PARAMETER_VALIDATION_ERROR,"数量信息不正确"); } //2.落单减库存 boolean result = itemService.decreaseStock(itemId,amount); if(!result){ throw new BusinessException(EmBusinessError.STOCK_NOT_ENOUGH); } //3.订单入库 OrderModel orderModel = new OrderModel(); orderModel.setUserId(userId); orderModel.setItemId(itemId); orderModel.setAmount(amount); if(promoId != null){ orderModel.setItemPrice(itemModel.getPromoModel().getPromoItemPrice()); }else{ orderModel.setItemPrice(itemModel.getPrice()); } orderModel.setPromoId(promoId); orderModel.setOrderPrice(orderModel.getItemPrice().multiply(new BigDecimal(amount))); //生成交易流水号,订单号 orderModel.setId(generateOrderNo()); OrderDO orderDO = convertFromOrderModel(orderModel); orderDOMapper.insertSelective(orderDO); //加上商品的销量 itemService.increaseSales(itemId,amount); //设置库存流水状态为成功 StockLogDO stockLogDO = stockLogDOMapper.selectByPrimaryKey(stockLogId); if(stockLogDO == null){ throw new BusinessException(EmBusinessError.UNKNOWN_ERROR); } stockLogDO.setStatus(2); stockLogDOMapper.updateByPrimaryKeySelective(stockLogDO); // int i = 10/0; //4.返回前端 return orderModel; } // @Transactional(propagation = Propagation.REQUIRES_NEW) private String generateOrderNo(){ 
//订单号有16位 StringBuilder stringBuilder = new StringBuilder(); //前8位为时间信息,年月日 LocalDateTime now = LocalDateTime.now(); String nowDate = now.format(DateTimeFormatter.ISO_DATE).replace("-",""); stringBuilder.append(nowDate); //中间6位为自增序列 //获取当前sequence int sequence = 0; SequenceDO sequenceDO = sequenceDOMapper.getSequenceByName("order_info"); sequence = sequenceDO.getCurrentValue(); sequenceDO.setCurrentValue(sequenceDO.getCurrentValue() + sequenceDO.getStep()); sequenceDOMapper.updateByPrimaryKeySelective(sequenceDO); String sequenceStr = String.valueOf(sequence); for(int i = 0; i < 6-sequenceStr.length();i++){ stringBuilder.append(0); } stringBuilder.append(sequenceStr); //最后2位为分库分表位,暂时写死 stringBuilder.append("00"); return stringBuilder.toString(); } private OrderDO convertFromOrderModel(OrderModel orderModel){ if(orderModel == null){ return null; } OrderDO orderDO = new OrderDO(); BeanUtils.copyProperties(orderModel,orderDO); orderDO.setItemPrice(orderModel.getItemPrice().doubleValue()); orderDO.setOrderPrice(orderModel.getOrderPrice().doubleValue()); return orderDO; } }
StockLogDOMapper
/**
 * MyBatis mapper for the {@code stock_log} table (columns: stock_log_id,
 * item_id, amount, status). Generated by MyBatis Generator; the
 * {@code @mbg.generated} markers are kept on every method.
 */
public interface StockLogDOMapper {
    /**
     * Deletes the row whose stock_log_id equals the given id.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param stockLogId primary key of the row to delete
     * @return the row count reported by MyBatis for the statement
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    int deleteByPrimaryKey(String stockLogId);

    /**
     * Inserts a row, writing all four columns from the record.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param record values to insert
     * @return the row count reported by MyBatis for the statement
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    int insert(StockLogDO record);

    /**
     * Inserts a row, writing only the columns whose property is non-null.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param record values to insert
     * @return the row count reported by MyBatis for the statement
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    int insertSelective(StockLogDO record);

    /**
     * Loads the row whose stock_log_id equals the given id.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param stockLogId primary key to look up
     * @return the matching row; callers in this code base treat a missing row
     *         as {@code null}
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    StockLogDO selectByPrimaryKey(String stockLogId);

    /**
     * Updates the row identified by the record's stockLogId, changing only
     * item_id, amount and status values whose property is non-null.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param record primary key plus the values to change
     * @return the row count reported by MyBatis for the statement
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    int updateByPrimaryKeySelective(StockLogDO record);

    /**
     * Updates the row identified by the record's stockLogId, overwriting
     * item_id, amount and status unconditionally.
     * This method was generated by MyBatis Generator.
     * This method corresponds to the database table stock_log
     *
     * @param record primary key plus the new column values
     * @return the row count reported by MyBatis for the statement
     *
     * @mbg.generated Mon Feb 25 23:42:11 CST 2019
     */
    int updateByPrimaryKey(StockLogDO record);
}
xml
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<!-- SQL statements for the stock_log table, bound to the StockLogDOMapper interface via the namespace. -->
<mapper namespace="com.imooc.miaoshaproject.dao.StockLogDOMapper">
  <!-- Column-to-property mapping for StockLogDO; stock_log_id is the primary key. -->
  <resultMap id="BaseResultMap" type="com.imooc.miaoshaproject.dataobject.StockLogDO">
    <id column="stock_log_id" jdbcType="VARCHAR" property="stockLogId" />
    <result column="item_id" jdbcType="INTEGER" property="itemId" />
    <result column="amount" jdbcType="INTEGER" property="amount" />
    <result column="status" jdbcType="INTEGER" property="status" />
  </resultMap>
  <!-- Shared column list, included by selectByPrimaryKey. -->
  <sql id="Base_Column_List">
    stock_log_id, item_id, amount, status
  </sql>
  <!-- Load one row by primary key. -->
  <select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
    select
    <include refid="Base_Column_List" />
    from stock_log
    where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
  </select>
  <!-- Delete one row by primary key. -->
  <delete id="deleteByPrimaryKey" parameterType="java.lang.String">
    delete from stock_log
    where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
  </delete>
  <!-- Insert a row, writing all four columns. -->
  <insert id="insert" parameterType="com.imooc.miaoshaproject.dataobject.StockLogDO">
    insert into stock_log (stock_log_id, item_id, amount, status)
    values (#{stockLogId,jdbcType=VARCHAR}, #{itemId,jdbcType=INTEGER}, #{amount,jdbcType=INTEGER}, #{status,jdbcType=INTEGER})
  </insert>
  <!-- Insert a row, writing only the columns whose property is non-null. -->
  <insert id="insertSelective" parameterType="com.imooc.miaoshaproject.dataobject.StockLogDO">
    insert into stock_log
    <trim prefix="(" suffix=")" suffixOverrides=",">
      <if test="stockLogId != null">
        stock_log_id,
      </if>
      <if test="itemId != null">
        item_id,
      </if>
      <if test="amount != null">
        amount,
      </if>
      <if test="status != null">
        status,
      </if>
    </trim>
    <trim prefix="values (" suffix=")" suffixOverrides=",">
      <if test="stockLogId != null">
        #{stockLogId,jdbcType=VARCHAR},
      </if>
      <if test="itemId != null">
        #{itemId,jdbcType=INTEGER},
      </if>
      <if test="amount != null">
        #{amount,jdbcType=INTEGER},
      </if>
      <if test="status != null">
        #{status,jdbcType=INTEGER},
      </if>
    </trim>
  </insert>
  <!-- Update by primary key, setting only the columns whose property is non-null. -->
  <update id="updateByPrimaryKeySelective" parameterType="com.imooc.miaoshaproject.dataobject.StockLogDO">
    update stock_log
    <set>
      <if test="itemId != null">
        item_id = #{itemId,jdbcType=INTEGER},
      </if>
      <if test="amount != null">
        amount = #{amount,jdbcType=INTEGER},
      </if>
      <if test="status != null">
        status = #{status,jdbcType=INTEGER},
      </if>
    </set>
    where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
  </update>
  <!-- Update by primary key, overwriting item_id, amount and status unconditionally. -->
  <update id="updateByPrimaryKey" parameterType="com.imooc.miaoshaproject.dataobject.StockLogDO">
    update stock_log
    set item_id = #{itemId,jdbcType=INTEGER},
      amount = #{amount,jdbcType=INTEGER},
      status = #{status,jdbcType=INTEGER}
    where stock_log_id = #{stockLogId,jdbcType=VARCHAR}
  </update>
</mapper>