由于项目场景的实际情况(可能不需要那么所谓的专业消息中间件),以及开发团队对技术选型的讨论,为了不引入过多的中间件让系统的部署变得复杂,我们使用Redis实现了系统的消息发布订阅功能。实践证明Redis的消息发布订阅功能还是靠谱的。话不多说,直接上代码。
首先引入下面的依赖,让系统支持redis的功能。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
其次定义一个配置类,配置消息订阅的主题key,以及该key的消息被哪个消费类消费,比如下面的
MsgInfoConsumer,消费的方法是该类的onMessage方法。这就是消息发布订阅的消费方。
publicclassRedisListener { RedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory, MessageListenerAdaptermsgIngoListenerAdapter) { RedisMessageListenerContainercontainer=newRedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多个 messageListener,配置不同的交换机container.addMessageListener(msgIngoListenerAdapter, newPatternTopic("channel:MSGINFO")); returncontainer; } MessageListenerAdaptermsgIngoListenerAdapter(MsgInfoConsumerreceiver) { returnnewMessageListenerAdapter(receiver, "onMessage"); } }
publicclassMsgInfoConsumer { privatestaticfinalLoggerLOGGER=LoggerFactory.getLogger(MsgInfoConsumer.class); publicvoidonMessage(Stringmessage){ try { JSONObjectjsonObject=JSONObject.parseObject(message, JSONObject.class); processMsg(jsonObject); }catch (Exceptione){ e.printStackTrace(); } } privatevoidprocessMsg(JSONObjectjsonObject){ //do something } }
定义完上述消费方,就是消息的生产方,生产方的代码很简单,主要是一行语句执行消息的发布
stringRedisTemplate.convertAndSend("channel:MSGINFO", JSON.toJSONString(obj));
至此就完成了消息的发布订阅功能。当然完整的功能还有很多,比如处理消息异常的数据,处理历史数据的消息的工作,这块大家可以通过分析数据以及写一些符合业务的代码去实现。