在这个项目里我用的是 springboot的2版本,ORM选用 JPA快速开发,JSON工具使用阿里的 fastjson,当然,mq用的是 rabbitMQ。导入的是 springboot集成的依赖。
1. 配置部分
1.1 pom.xml
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.16.18</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>RELEASE</version><scope>compile</scope></dependency></dependencies>
1.2 application.properties
server.port=10000spring.datasource.url=jdbc:mysql://xxxxx/xxxxx?characterEncoding=utf-8spring.datasource.username=xxxspring.datasource.password=xxxxspring.datasource.driver-class-name=com.mysql.jdbc.Driverspring.jpa.properties.hibernate.hbm2ddl.auto=updatespring.jpa.show-sql=truespring.rabbitmq.host=localhostspring.rabbitmq.username=rootspring.rabbitmq.password=rootspring.rabbitmq.port=5672
我只是很有针对性的对 mq和 datasource进行了配置。
1.3 数据表
create table ifnot result(id int auto_increment primary key,ticket_id intnull,user_id intnull);create table ifnot exists ticket(id int auto_increment primary key,name varchar(255)null,content varchar(255)null,user_name varchar(20)null,count intdefault'6666'notnull);
根据数据表可以Generate出JavaBean,不贴JavaBean了。 ##### 1.4 项目架构
├── src│ ├── main│ │ ├── java│ │ │ └── com│ │ │ └── fantj│ │ │ └── springbootjpa│ │ │ ├── AMQP.java│ │ │ ├── controller│ │ │ │ └──TicketController.java│ │ │ ├── mq│ │ │ │ ├──Message.java│ │ │ │ ├──MQConstants.java│ │ │ │ ├──MQReceiver.java│ │ │ │ └──MQSender.java│ │ │ ├── pojo│ │ │ │ ├──Result.java│ │ │ │ └──Ticket.java│ │ │ ├── repostory│ │ │ │ ├──ResultRepository.java│ │ │ │ └──TicketRepository.java│ │ │ └── service│ │ │ ├──ResultServiceImpl.java│ │ │ ├──ResultService.java│ │ │ ├──TicketServiceImpl.java│ │ │ └──TicketService.java│ │ └── resources│ │ ├── application.properties│ │ └── rebel.xml
2. 启动类
@SpringBootApplication@EntityScan("com.fantj.springbootjpa.pojo")@EnableRabbitpublicclass AMQP {publicstaticvoid main(String[] args){SpringApplication.run(AMQP.class, args);}}
注意这个 @EnableRabbit注解,它会开启对rabbit注解的支持。
3. controller
很简单的一个controller类,实现查询和抢票功能。
@RestController@RequestMapping("/ticket")publicclassTicketController{@AutowiredprivateTicketService ticketService;@AutowiredprivateMQSender mqSender;@RequestMapping("/get/{id}")publicTicket getByid(@PathVariableInteger id){return ticketService.findById(id);}@RequestMapping("/reduce/{id}/{userId}")publicString reduceCount(@PathVariableInteger id,@PathVariableInteger userId){Message message =newMessage(id,userId);ticketService.reduceCount(id);mqSender.sendMessage(newMessage(message.getTicketId(),message.getUserId()));return"抢票成功!";}}
注意 privateMQSendermqSender;这是我的 rabbit发送消息的类。
4. Service
接口我就不再这里贴出,直接贴实现类。
4.1 ResultServiceImpl.java
@ServicepublicclassResultServiceImplimplementsResultService{@AutowiredprivateResultRepository resultRepository;@Overridepublicvoid add(Result result){resultRepository.add(result.getTicketId(), result.getUserId());}@OverridepublicResult findOneByUserId(Integer userId){return resultRepository.findByUserId(userId);}}
4.2 TicketServiceImpl.java
@ServicepublicclassTicketServiceImplimplementsTicketService{@AutowiredprivateTicketRepository repository;@OverridepublicTicket findById(Integer id){return repository.findTicketById(id);}@OverridepublicTicket reduceCount(Integer id){repository.reduceCount(id);return findById(id);}}
这两个都是很普通的service实现类,没有新加入的东西。
5. Dao
5.1 ResultRepository.java
@RepositorypublicinterfaceResultRepositoryextendsJpaRepository<Result,Integer>{@Transactional@Modifying@Query(value ="insert into result(ticket_id,user_id) values(?1,?2) ",nativeQuery =true)void add(@Param("ticketId")Integer ticketId,@Param("userId")Integer userId);Result findByUserId(Integer userId);}
5.2 TicketRepository.java
@RepositorypublicinterfaceTicketRepositoryextendsJpaRepository<Ticket,Integer>{/*** 减少库存*/@Transactional@Modifying@Query(value ="update ticket t set t.count=t.count+(-1) where id=?1",nativeQuery =true)int reduceCount(Integer id);/*** 查询信息*/Ticket findTicketById(Integer id);}
到了这里,你会发现,md哪里有用mq的痕迹...
6. MQ
剩下的全是mq的处理。
6.1 Message.java
这个类用来封装mq传输的消息对象,我们使用它来对传输的byte进行编解码,得到我们想要的数据。
@DatapublicclassMessageimplementsSerializable{privateInteger ticketId;privateInteger userId;publicMessage(){}publicMessage(Integer ticketId,Integer userId){this.ticketId = ticketId;this.userId = userId;}}
6.2 MQConstants.java
这是一个常量类,用来定义和保存
queue的名字,虽然里面只有一个常量,好习惯要从小事做起。
publicclassMQConstants{publicstaticfinalString QUEUE="qiangpiao";}
6.3 MQSender.java
这是消息发送类,用来给queue发送数据。
@Service@Slf4jpublicclassMQSender{@AutowiredprivateAmqpTemplate amqpTemplate;publicvoid sendMessage(Message message){String msg =JSONObject.toJSONString(message);log.info("send message : "+msg);amqpTemplate.convertAndSend(MQConstants.QUEUE,msg);}}
AmqpTemplate是springboot框架提供给我们使用的amqp操作模板,利用它我们能更方便的调用和处理业务。 我们在Controller层调用它,来完成消息入队的操作,完成削峰和异步处理,大大增加了系统并发和强健性。
6.4 MQReceiver.java
这是消息接收类,用来从queue里获取数据。
@Service@Slf4jpublicclassMQReceiver{@AutowiredprivateTicketService ticketService;@AutowiredprivateResultService resultService;@RabbitListener(queues =MQConstants.QUEUE)publicvoid receive(String message){log.info("receive msg : "+message);JSONObject jsonObject =JSONObject.parseObject(message);System.out.println(jsonObject);Message msg =JSONObject.toJavaObject(jsonObject,Message.class);Integer ticketId = msg.getTicketId();Integer userId = msg.getUserId();// 减库存Ticket ticket = ticketService.reduceCount(ticketId);if(ticket.getCount()<=0){return;}// 判断是否已经抢过Result oneByUserId = resultService.findOneByUserId(userId);if(oneByUserId !=null){return;}resultService.add(newResult(ticketId,userId));}}
在这个类中, @RabbitListener(queues=MQConstants.QUEUE)标记的是监听方法,该方法会从queue中获取到String数据。
之后我们需要将其复原为JavaBean,取出我们该要的属性,继续处理业务: 查询票剩余量-> 判断是否已抢到过-> 减库存 -> 增加抢票数据。 (我这里写的有点草率,应该先查余量...,不过不重要,本章重点在过一遍springboot与rabbitmq的整合)。
运行效果
我对该抢票功能做了一个9999请求,我本来做3k并发,电脑没那么多句柄,实现不了,最后做了1k并发的压测。
这是rabbitMQ 自带Managerment模板上的截图:
压测报告:
ServerSoftware:ServerHostname: 127.0.0.1ServerPort: 10000DocumentPath: /ticket/reduce/1/10DocumentLength: 13 bytesConcurrencyLevel: 1000Time taken for tests: 423.101 secondsComplete requests: 9999Failed requests: 0Total transferred: 1459854 bytesHTML transferred: 129987 bytesRequests per second: 23.63[#/sec] (mean)Time per request: 42314.334[ms](mean)Time per request: 42.314[ms](mean, across all concurrent requests)Transfer rate: 3.37[Kbytes/sec] receivedConnectionTimes(ms)min mean[+/-sd] median maxConnect: 0 2 6.8 0 29Processing: 217401977390.7 41984 58488Waiting: 217401977390.8 41984 58488Total: 246401997384.8 41985 58488Percentage of the requests served within a certain time (ms)50% 4198466% 4267075% 4274480% 4275890% 4280195% 4282898% 4285099% 42868100% 58488(longest request)
注意
- 本项目没有考虑线程安全的问题,事实上线程是不安全的,线程安全问题后面会说。
- 本项目只是为了mq的削峰和异步处理,最直观的就是数据库可以称住高并发,一般来讲,数据库连接这块是称不住的。
- mq在分布式下的问题后面会说。
