前面我们已经了解了在做好基础数据的准备工作后,启动测试的时候,会做一个注册消费组的工作,完成后,我们就可以执行生产者发消息操作了。发消息的操作是:
"/test1") (publicvoidtest1(StringtopicName, intcount) { if (Util.isEmpty(topicName)) return; Executors.newSingleThreadExecutor().submit(newRunnable() { publicvoidrun() { for (inti=1; i<count; i++) { try { MqClient.publish(topicName, "", newProducerDataDto(String.valueOf(i))); } catch (MqNotInitExceptione) { // TODO Auto-generated catch blocke.printStackTrace(); } catch (ContentExceed65535Exceptione) { // TODO Auto-generated catch blocke.printStackTrace(); } Util.sleep(10); } } }); }
发送消息完成后,由于其采用的拉模式,我们可以看到消息在经过发送,存储到数据库之后,会做一个通知拉取数据操作,然后执行拉取。拉取完成后,进行响应。此时会进行消费操作,而这个过程的处理关键是handleData操作,从代码中,我们可以看到其是执行的线程操作是一个batchExcute批量执行操作,可以看到其里面有一个重要方法:threadExcute方法,从而进一步看到我们想看到的方法doMessageReceived,这个方法会调用我们自定义的方法实现消费。
// 执行消费操作
publicList<Long>onMessageReceived(List<MessageDto>messages) { try { // 执行消息消费TransactioncatTransaction=null; System.out.println("开始接收生产者发送过来的消息"); for (MessageDtomessageDto : messages) { // 执行消息消费System.out.println("当前接收到的消息是messageDto消费组:"+messageDto.getConsumerGroupName() +" "+"当前接收的消费主题:"+messageDto.getTopicName() \+" "+"当前接收到的消息:"+messageDto.getBody()); } System.out.println("接收成功"); } catch (Exceptione) { // TODO Auto-generated catch blocke.printStackTrace(); } returnnull; }
而从消息的结果可以看到其打印出来的消费消息。