想象一下这样一个场景:多名旅行者同时尝试预订热门目的地的最后一个可用房间。如果没有适当的并发控制机制,这种情况很快就会变成竞争状态,导致房间超额预订和客户沮丧。
我们将深入研究用于应对这些挑战的两种关键策略的复杂性:乐观锁定和消息队列。
想象一下您正在使用一个在线酒店预订平台,类似于 Booking.com 或 Expedia 等知名平台。以下是同步和异步流程如何发挥作用:
同步流程:
预订房间(同步):
- 您访问酒店预订网站并选择您的目的地、入住和退房日期以及其他偏好。
- 您点击“立即预订”按钮即可预订房间。
- 该网站使用基于 HTTP 的同步协议(如 REST 或 SOAP)将您的请求发送到酒店的预订系统。
- 酒店的系统会立即同步处理您的请求。它检查房间可用性,为您预订房间,并生成预订号码。
- 预订号码将发送回您的浏览器,并在几秒钟内显示在网站上。
- 您可以立即获得预订号码,然后可以放心地继续您的旅行计划。
创建房间实体
import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.GenerationType; import javax.persistence.Id; @Entity public class Room { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String roomType; private boolean isAvailable; // getters and setters }
创建房间存储库
import org.springframework.data.jpa.repository.JpaRepository; public interface RoomRepository extends JpaRepository<Room, Long> { Room findByRoomType(String roomType); }
创建客房预订请求 DTO
import java.time.LocalDate; public class RoomBookingRequest { private String roomType; private LocalDate checkInDate; private LocalDate checkOutDate; // getters and setters }
创建客房预订响应 DTO
public class RoomBookingResponse { private String reservationNumber; // getters and setters }
创建客房服务
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class RoomService { @Autowired private RoomRepository roomRepository; public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) { String roomType = bookingRequest.getRoomType(); LocalDate checkInDate = bookingRequest.getCheckInDate(); LocalDate checkOutDate = bookingRequest.getCheckOutDate(); Room room = roomRepository.findByRoomType(roomType); if (room != null && room.isAvailable()) { // Add validation to check availability based on check-in and check-out dates here. // For simplicity, we'll assume the room is available. room.setAvailable(false); roomRepository.save(room); // Generate a reservation number (you can implement your logic here). String reservationNumber = generateReservationNumber(); return new RoomBookingResponse(reservationNumber); } else { throw new RoomNotAvailableException(); } } private String generateReservationNumber() { // Generate a unique reservation number (you can implement your logic here). return UUID.randomUUID().toString(); } }
创建房间控制器
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/api/rooms") public class RoomController { @Autowired private RoomService roomService; // Book a room @PostMapping("/book") public RoomBookingResponse bookRoom(@RequestBody RoomBookingRequest bookingRequest) { return roomService.bookRoom(bookingRequest); } }
定义自定义异常
import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ResponseStatus; @ResponseStatus(HttpStatus.BAD_REQUEST) public class RoomNotAvailableException extends RuntimeException { public RoomNotAvailableException() { super("The requested room is not available."); } }
测试API
您可以使用 Postman 或 cURL 等工具来测试您的 API。要预订房间,请http://localhost:8080/api/rooms/book使用包含房间类型、入住日期和退房日期的 JSON 正文发出 POST 请求:
{ "roomType" : "Standard" , "checkInDate" : "2023-10-01" , "checkOutDate" : "2023-10-05" }
如果房间可用,API 将返回带有预订编号的 JSON 响应。您可以根据您的课堂需求自定义预订逻辑和预订号码生成RoomService。
异步流程
当多个用户同时调用Booking API时
当多个并发呼叫在系统中搜索同一房间时,可能存在潜在的缺点和挑战:
竞争条件:当多个请求尝试同时预订同一房间时,可能会出现竞争条件。如果处理不当,这可能会导致超额预订,即系统允许的预订数量超过了可用房间的数量。
如何解决并发问题?
乐观锁定是一种数据库级技术,可防止多个用户同时尝试更新同一资源时发生数据冲突。
另一方面,消息队列是异步通信工具,可确保请求的有序、可靠处理,使其成为分布式系统中处理并发请求的理想选择。
方法一:实现消息队列响应并发请求
消息队列确保请求按照接收顺序进行处理,从而防止竞争条件和超量预订。
- 多个客户端向端点发出 POST 请求/api/rooms/book以同时预订酒店房间。
- 处理RoomController传入的预订请求。
- 该roomService.bookRoom方法接收预订请求。
- 它使用该方法将预订请求发送到名为“room-booking”的 RabbitMQ 消息队列rabbitTemplate.convertAndSend。
- 它向客户端返回初步响应,其中包含一条消息,表明预订请求已发送,客户端应等待确认。
- 预订请求被放入“房间预订”队列中。消息队列系统(在本例中为 RabbitMQ)确保每个预订请求都按照收到的顺序进行处理,以防止竞争情况。
- 监听RoomBookingMessageConsumer“房间预订”队列。
- processBookingRequest当预订请求出队时,将调用消费者的方法。在该方法中,您通常会实现以下逻辑:
- 根据请求的房型、入住日期和退房日期检查客房供应情况。
- 如果房间可用,则生成预订号码。
- 更新数据库中的房间可用性,将其标记为不可用,以防止重复预订。
- 通过RabbitMQ向客户端发送包含预约号的响应消息
8. 在 中RoomBookingMessageConsumer,处理预订请求并生成预订号码后,您可以使用传统的 HTTP 客户端(例如RestTemplate、HttpClient)将确认响应直接发送到客户端的回调 URL 端点(该端点在请求中发送)。
执行:
创建客房预订请求和响应 DTO
import java.time.LocalDate; public class RoomBookingRequest { private String roomType; private LocalDate checkInDate; private LocalDate checkOutDate; private String clientCallbackUrl; // Added to specify the client's callback URL // getters and setters } public class RoomBookingResponse { private String reservationNumber; // getters and setters }
修改控制器
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RestController @RequestMapping("/api/rooms") public class RoomController { @Autowired private RoomService roomService; @PostMapping("/book") public RoomBookingResponse bookRoom(@RequestBody RoomBookingRequest bookingRequest) { return roomService.bookRoom(bookingRequest); } }
创建客房预订服务(生产者)
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestTemplate; @Service public class RoomService { @Autowired private RoomRepository roomRepository; @Autowired private RabbitTemplate rabbitTemplate; private RestTemplate restTemplate = new RestTemplate(); public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) { String roomType = bookingRequest.getRoomType(); // Send the booking request to the message queue rabbitTemplate.convertAndSend("room-booking-exchange", "room-booking", bookingRequest); return new RoomBookingResponse("Booking request sent. Please wait for confirmation."); } // This method sends the response to the client's callback URL public void sendResponseToClient(RoomBookingResponse response, String clientCallbackUrl) { ResponseEntity<Void> result = restTemplate.postForEntity(clientCallbackUrl, response, Void.class); if (result.getStatusCode().is2xxSuccessful()) { // Handle a successful response sent to the client } else { // Handle the case when the response to the client failed } } }
创建消息消费者
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class RoomBookingMessageConsumer { @Autowired private RoomService roomService; @RabbitListener(queues = "room-booking-queue") public void processBookingRequest(RoomBookingRequest bookingRequest) { // Process the booking request RoomBookingResponse response = processBookingLogic(bookingRequest); // Send the confirmation response to the client's callback URL roomService.sendResponseToClient(response, bookingRequest.getClientCallbackUrl()); } private RoomBookingResponse processBookingLogic(RoomBookingRequest bookingRequest) { // Implement your booking logic here, e.g., checking room availability and generating a reservation number // Update room availability in the database // Send a response message to confirm the booking or indicate unavailability // For simplicity, we'll assume the room is available and generate a reservation number. String reservationNumber = generateReservationNumber(); return new RoomBookingResponse(reservationNumber); } private String generateReservationNumber() { // Generate a unique reservation number (you can implement your logic here). return "RES-" + System.currentTimeMillis(); } }
方法二:实现乐观锁来处理并发请求
您可以修改代码以使用同步方法和 JPA 乐观锁定。
步骤1:修改Room实体:@Version向实体添加一个字段Room以启用乐观锁定:
import javax.persistence.*; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Entity public class Room { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String roomType; private boolean isAvailable; @Version private Long version; // getters and setters }
步骤2:修改客房服务对每个房间使用ReentrantLock来同步访问房间预订操作
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @Service public class RoomService { @Autowired private RoomRepository roomRepository; private final ConcurrentHashMap<Long, Lock> roomLocks = new ConcurrentHashMap<>(); public RoomBookingResponse bookRoom(RoomBookingRequest bookingRequest) { String roomType = bookingRequest.getRoomType(); LocalDate checkInDate = bookingRequest.getCheckInDate(); LocalDate checkOutDate = bookingRequest.getCheckOutDate(); Room room = roomRepository.findByRoomType(roomType); if (room != null) { Lock roomLock = roomLocks.computeIfAbsent(room.getId(), id -> new ReentrantLock()); roomLock.lock(); try { if (room.isAvailable()) { // Add validation to check availability based on check-in and check-out dates here. // For simplicity, we'll assume the room is available. room.setAvailable(false); roomRepository.save(room); // Generate a reservation number (you can implement your logic here). String reservationNumber = generateReservationNumber(); return new RoomBookingResponse(reservationNumber); } } finally { roomLock.unlock(); } } throw new RoomNotAvailableException(); } private String generateReservationNumber() { // Generate a unique reservation number (you can implement your logic here). return UUID.randomUUID().toString(); } }
详细工作原理:
并发请求&ConcurrentHashMap:当同一房间收到多个并发预订请求时,它们可能同时到达并可能导致竞争条件。的引入ConcurrentHashMap确保每个房间都有自己的锁。这ConcurrentHashMap是一个线程安全的映射,可以由多个线程同时安全地访问。
通过锁定并发更新房间可用性:如果两个线程同时尝试预订同一个房间,则只有其中一个线程会使用 成功获取锁roomLock.lock(),而另一个线程将暂时阻塞,直到第一个线程释放锁。
释放锁以供其他线程更新:一旦线程获取了锁并成功修改了房间的可用性,它就会使用 释放锁roomLock.unlock(),从而允许其他线程继续预订其他房间。
乐观锁防止数据库级别的竞争条件:在代码中,实体中的字段启用数据库级别的乐观锁。更新房间时,JPA 在允许更新之前会根据实体中的版本字段检查数据库中的版本字段。@VersionRoom
- 如果两个事务同时尝试更新同一个房间,根据版本号的比较,只有其中一个会成功,从而防止数据库级别的数据冲突。
- 因此 2 个不同的事务无法同时更新数据库中的一个房间