♨️本篇文章记录的为RabbitMQ知识中
企业级项目
中秒杀相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习
上期我们使用阻塞队列和分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 而且还存在很多问题, 比如说这时候突然停电了,我们队列里的消息没来得及被消费就消失了.这就要出大问题.
现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是
RabbitMQ
.@[TOC]
🐇思路分析
- 我们先把MYSQL数据库中的订单数量在redis中保存一份.
- 在秒杀的业务中我们还是先执行Lua脚本, 判断我们是否具有购买资格, 如果没有购买资格我们直接返回结果;
- 如果有有购买资格,Lua脚本会直接进行redis中的订单扣减操作,进行秒杀
- 同时在redis的扣减操作完成后, 把下单信息保存到阻塞队列进行异步执行, 实现MYSQL数据库中订单扣减的同步操作,完美.
🐇代码操作
思路听起来很简单,下面我们来进行具体操作.
先引入RabbitMQ的核心依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在yml文件中进行相关配置
spring:
rabbitmq:
host: localhost
port: 5672
virtual-host: /
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
retry:
#发布重试,默认false
enabled: true
#重试时间 默认1000ms
initial-interval: 1000
#重试最大次数 最大3
max-attempts: 3
#重试最大间隔时间
max-interval: 10000
#重试的时间隔乘数,比如配2,0 第一次等于10s,第二次等于20s,第三次等于40s
multiplier: 1
listener:
# 默认配置是simple
type: simple
simple:
# 手动ack Acknowledge mode of container. auto none
acknowledge-mode: manual
#消费者调用程序线程的最小数量
concurrency: 10
#消费者最大数量
max-concurrency: 10
#限制消费者每次只处理一条信息,处理完在继续下一条
prefetch: 1
#启动时是否默认启动容器
auto-startup: true
#被拒绝时重新进入队列
default-requeue-rejected: true
配置RabbitMQConfig文件
@Slf4j
@Configuration
public class RabbitMQConfig {
@Bean("RabbitTemplate")
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//设置Json转换器
rabbitTemplate.setMessageConverter(jsonMessageConverter());
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("ConfirmCallback: "+"相关数据:"+correlationData);
log.info("ConfirmCallback: "+"确认情况:"+ack);
log.info("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.info("return 执行了....");
log.info("ReturnCallback: "+"消息:"+returnedMessage.getMessage().toString());
log.info("ReturnCallback: "+"回应码:"+returnedMessage.getReplyCode());
log.info("ReturnCallback: "+"回应信息:"+returnedMessage.getReplyText());
log.info("ReturnCallback: "+"交换机:"+returnedMessage.getExchange());
log.info("ReturnCallback: "+"路由键:"+returnedMessage.getRoutingKey());
}
});
return rabbitTemplate;
}
/**
* Json转换器
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
//交换机名称
public static final String SECKILL_EXCHANGE ="seckill_exchange";
//队列名称
public static final String ORDER_QUEUE ="order_queue";
@Bean("ORDER_QUEUE")
Queue confirmQueue(){
return QueueBuilder.durable(ORDER_QUEUE).build();
}
/**
* 创建一个交换机
* @return
*/
@Bean("SECKILL_EXCHANGE")
Exchange confirmExchange(){
return ExchangeBuilder.topicExchange(SECKILL_EXCHANGE).durable(true).build();
}
@Bean
Binding confirmExchange(@Qualifier("SECKILL_EXCHANGE") Exchange exchange,
@Qualifier("ORDER_QUEUE") Queue queue){
//bind(queue) 绑定队列 to(exchange) 绑定交换机 with("") routingKey这里绑定的是空白可以按照自己的要求绑定 noargs() 没有参数
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
}
}
补充 : RabbitMQ的Json转换器尤为重要, 因为rabbitMQ中发送和接收的都是字符串/字节数组类型的消息, 所以我们要把java对象转成json串
进行发送.
Lua脚本
-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId
-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
-- 3.2.库存不足,返回1
return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
-- 3.3.存在,说明是重复下单,返回2
return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0
下面是秒杀业务代码
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Resource
private ISeckillVoucherService seckillVoucherService;
@Resource
private RedisIdWorker redisIdWorker;
@Resource
private StringRedisTemplate stringRedisTemplate;
@Resource
private RedissonClient redissonClient;
@Resource(description = "RabbitTemplate")
private RabbitTemplate rabbitTemplate;
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
private IVoucherOrderService proxy;
public void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户
Long userId = voucherOrder.getUserId();
// 2.创建锁对象
RLock redisLock = redissonClient.getLock("lock:order:" + userId);
// 3.尝试获取锁
boolean isLock = redisLock.tryLock();
// 4.判断是否获得锁成功
if (!isLock) {
// 获取锁失败,直接返回失败或者重试
log.error("不允许重复下单!");
return;
}
try {
//注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
createVoucherOrder(voucherOrder);
} finally {
// 释放锁
redisLock.unlock();
}
}
@Override
public Result seckillVoucher(Long voucherId) {
//获取用户
Long userId = UserHolder.getUser().getId();
// 1.执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
int r = result.intValue();
// 2.判断结果是否为0
if (r != 0) {
// 2.1.不为0 ,代表没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
// 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
VoucherOrder voucherOrder = new VoucherOrder();
// 2.3.订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
// 2.4.用户id
voucherOrder.setUserId(userId);
// 2.5.代金券id
voucherOrder.setVoucherId(voucherId);
// 2.6.放入mq
rabbitTemplate.convertAndSend("seckill_exchange", "order.voucher", voucherOrder);
// 3.获取代理对象
proxy = (IVoucherOrderService) AopContext.currentProxy();
// 4.返回订单id
return Result.ok(orderId);
}
@Transactional
@Override
public void createVoucherOrder(VoucherOrder voucherOrder) {
Long userId = voucherOrder.getUserId();
// 5.1.查询订单
int count = Math.toIntExact(query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count());
// 5.2.判断是否存在
if (count > 0) {
// 用户已经购买过了
log.error("不允许重复下单!");
return;
}
// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") // set stock = stock - 1
.eq("voucher_id", voucherOrder.getVoucherId())
.gt("stock", 0) // where id = ? and stock > 0
.update();
if (!success) {
// 扣减失败
log.error("库存不足!");
return;
}
// 7.创建订单
save(voucherOrder);
}
}
总得有个消费者来监听我们的队列
消费者代码
@Slf4j
@Component
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
@Autowired
VoucherOrderServiceImpl voucherOrderService;
@RabbitHandler(isDefault=true)
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收转换消息
ObjectMapper mapper = new ObjectMapper();
VoucherOrder voucherOrder = mapper.readValue(message.getBody(), VoucherOrder.class);
//2. 处理业务逻辑
try {
// 2.创建订单
voucherOrderService.handleVoucherOrder(voucherOrder);
System.out.println("订单已执行");
} catch (Exception e) {
log.error("处理订单异常", e);
}
//3. 手动签收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
代码部分就这么些, 思路并不算复杂
🐇展示结果
这里显示抢购成功了
我们再看看redis和mysql中情况如何
显而易见, 表中对应的订单数量也都扣减了, 完美.
🐇补充
RabbitMQ中JSON格式转换为实体对象
//将JSON格式数据转换为实体对象
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), VoucherOrder.class);
RabbitMQ中JSON格式转换为Map对象
@Test
public void sender() throws AmqpException
{
//创建用户信息Map
Map<String, Object> userMap = new HashMap<>();
userMap.put("userId", "1");
userMap.put("userName", "阿千弟的博客");
userMap.put("blogUrl", "https://blog.csdn.net/qq_51033936");
userMap.put("userRemark", "您好,欢迎访问 阿千弟的博客");
/**
* 发送消息,参数说明:
* String exchange:交换器名称。
* String routingKey:路由键。
* Object object:发送内容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
//将JSON格式数据转换为Map对象
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {
@Autowired
VoucherOrderServiceImpl voucherOrderService;
@RabbitHandler(isDefault=true)
@Override
public void onMessage(Message message, Channel channel) throws Exception {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//将JSON格式数据转换为Map对象
ObjectMapper mapper = new ObjectMapper();
JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);
System.out.println("接收者收到Map格式消息:");
System.out.println("用户编号:" + resultMap.get("userId"));
System.out.println("用户名称:" + resultMap.get("userName"));
System.out.println("博客地址:" + resultMap.get("blogUrl"));
System.out.println("博客信息:" + resultMap.get("userRemark"));
//确认消息
channel.basicAck(deliveryTag, true)
} catch (Exception e) {
//4.拒绝签收
/*
第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。
如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习