最近工作室的一个业务跟另一个业务合并 自然要用到MQ(消息队列Message Queue)那么很显然 就要部署个RabbitMQ到服务器上了
我们用的是云托管的的服务 那自然是部署中间件到云服务上去了 服务是一路开通 结果到了需要调试的时候 怎么也连不上 (说是内网直连,但关键是 同事们都在线下做本地测试的呀)
直接无语了 面对这一场景 怎么办?业务还要继续 等着交货的 于是我想起了之前学过的技术栈
Redis 也能作为消息队列的(不过用的比较少所以不大容易记起来 或者也没啥人知道) 于是一顿卡卡操作 步骤还比MQ简单 下面就来看是如何实现的
Redis作为消息队列的优缺点:
使用Redis作为消息队列的选择相对于使用专门的消息队列系统(如RabbitMQ、Kafka等)有以下优点:
- 简单轻量:Redis是一个内存中的数据存储系统,具有轻量级和简单的特点。相比较专门的消息队列系统,使用Redis作为消息队列不需要引入额外的组件和依赖,可以减少系统的复杂性。
- 速度快:由于Redis存储在内存中,它具有非常高的读写性能。这对于需要低延迟的应用程序非常有优势。
- 多种数据结构支持:Redis提供了丰富的数据结构,如列表、发布/订阅、有序集合等。这使得Redis在处理不同类型的消息和任务时更加灵活。
- 数据持久化:Redis可以通过将数据持久化到磁盘来提供数据的持久性。这意味着即使Redis重启,之前的消息也不会丢失。
- 广泛的应用场景:Redis不仅可以用作消息队列,还可以用作缓存、数据库、分布式锁等多种用途。如果你的应用程序已经使用了Redis,那么使用Redis作为消息队列可以减少技术栈的复杂性。
缺点也很明显:
- 缺少一些高级特性:相对于专门的消息队列系统,Redis在消息队列方面的功能可能相对简单。例如,它可能缺乏一些高级消息传递功能,如消息重试、消息路由、持久化消息等。
- 可靠性和一致性:Redis的主要设计目标是提供高性能和低延迟,而不是强一致性和高可靠性。在某些情况下,Redis可能会丢失消息,或者在出现故障时可能无法提供持久性保证。
应用场景:
适用于简单的中小型项目 如果功能简单,访问量并不大可以考虑
如果你的应用程序对可靠性和高级功能有严格要求,并且需要处理大量的消息和复杂的消息路由,那么使用专门的消息队列系统可能更合适。
Redis实现消息队列系统 实现步骤:
配置Redis:
- 首先,确保你已经正确地配置了Redis和Lettuce依赖项,并创建了LettuceConnectionFactory对象。
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
redis: host: port: 6379 password: lettuce: pool: max-active: 1000 max-idle: 1000 min-idle: 0 time-between-eviction-runs: 10s max-wait: 10000
- 创建一个RedisTemplate对象,并将LettuceConnectionFactory设置为其连接工厂:
@Bean public RedisTemplate<String, String> redisTemplate(LettuceConnectionFactory connectionFactory) { RedisTemplate<String, String> template = new RedisTemplate<>(); template.setConnectionFactory(connectionFactory); template.setDefaultSerializer(new StringRedisSerializer()); return template; }
设置RedisTemplate的序列化器。在消息队列中,你可以使用默认的序列化器,即StringRedisSerializer,它会将消息以字符串的形式进行存储和传输。可以通过以下代码设置默认的序列化器:
redisTemplate.setDefaultSerializer(new StringRedisSerializer());
实现消息的发布和订阅功能。
- 发布消息:
redisTemplate.convertAndSend("channel_name", "message_payload");
- 在上述代码中,"channel_name"是消息的通道名称,"message_payload"是要发布的消息内容。
- 订阅消息:
- 首先,创建一个MessageListener实现类来处理接收到的消息:
public class MessageListenerImpl implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { // 处理接收到的消息 String channel = new String(message.getChannel()); String payload = new String(message.getBody()); // 执行自定义的逻辑 } }
创建一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类:
LettuceMessageListenerAdapter listenerAdapter = new LettuceMessageListenerAdapter(new MessageListenerImpl()); listenerAdapter.afterPropertiesSet();
创建一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器:
RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer(); listenerContainer.setConnectionFactory(lettuceConnectionFactory); listenerContainer.addMessageListener(listenerAdapter, new ChannelTopic("通道名称")); listenerContainer.start();
通过以上步骤,我们创建了一个LettuceConnectionFactory对象来与Redis服务器建立连接。然后,我们创建了一个MessageListener实现类来处理接收到的消息。接下来,我们创建了一个LettuceMessageListenerAdapter对象,并提供MessageListener实现类。最后,我们创建了一个RedisMessageListenerContainer对象,并配置它的LettuceConnectionFactory和监听适配器,然后启动容器以开始监听指定通道上的消息。
以上的方案 好处就是 可以很明显的知道监听者在哪个部分 监听对应通道的信息 然而 业务当中 如果每一个对应模块的业务和通道都建立一个监听者来进行监听(我们假设每一个就业务所要得到消息以后所执行的逻辑都不相同) 那这个工作量就会暴增
于是就有了第二种写法 :
实战与改良
/*** * @title MessageManager * @author SUZE * @Date 2-17 **/ @Component public class ReservedMessageManager { private String ListenerId; private String UserId; private String message; private final RedisTemplate<String, String> redisTemplate; @Autowired public ReservedMessageManager(RedisTemplate<String, String> redisTemplate) { this.redisTemplate = redisTemplate; subscribeToChannel("reserved"); } @Resource private SmsServer smsServer; public void publishMessage(String channel, reserveMessage message) { String Message=serialize(message); redisTemplate.convertAndSend("channel_name", "message_payload"); redisTemplate.convertAndSend(channel, Message); } // 接收到消息时触发的事件 private void handleReserveMessage(String channel, reserveMessage reserveMessage) { if (reserveMessage != null) { String userId = reserveMessage.getUserId(); String ListenerId=reserveMessage.getListenerId(); String message = reserveMessage.getMessage(); //TODO 处理接收到的消息逻辑 这里后续要对Message进行一个检测他有wait agree和refused和over四种状态 思种状态就是不一样的发送内容 switch (message){ //TODO 消息要给两边都发 所以要发两份 发信息的文案 case "wait": smsServer.sendSms(userId,ListenerId,message); break; case "agree": smsServer.sendSms(userId,ListenerId,message); break; case "refuse": smsServer.sendSms(userId,ListenerId,message); break; case "over": //这里要操作文档系统了 //拒绝的话 那就要监听一下 smsServer.sendSms(userId,ListenerId,message); break; } //smsServer.sendSms(userId,ListenerId,message); // 其他处理逻辑... } } public void subscribeToChannel(String channel) { redisTemplate.execute((RedisCallback<Object>) (connection) -> { connection.subscribe((message, pattern) -> { String channelName = new String(message.getChannel()); byte[] body = message.getBody(); // 解析接收到的消息 switch (channelName){ case "reserved": reserveMessage reserveMessage = deserializeMessage(new String(body)); handleReserveMessage(channelName, reserveMessage); break; //还有其他的通道 例如refuse就是一个 拒绝通道 专门监听拒绝的理由 } }, channel.getBytes()); return null; }); } // 反序列化 private reserveMessage deserializeMessage(String body) { ObjectMapper objectMapper = new ObjectMapper(); try { return objectMapper.readValue(body, reserveMessage.class); } catch (IOException e) { // 处理反序列化异常 e.printStackTrace(); return null; } } // 序列化 public String serialize(reserveMessage reserveMessage) throws SerializationException { if (reserveMessage == null) { return null; } try { ObjectMapper objectMapper = new ObjectMapper(); return objectMapper.writeValueAsString(reserveMessage); } catch (JsonProcessingException e) { throw new SerializationException("Error serializing object", e); } } }
代码解释
- subscribeToChannel方法接受一个channel参数,用于指定要订阅的通道名称。
- redisTemplate.execute方法用于执行Redis操作,并传入一个RedisCallback回调函数。
- 回调函数使用lambda表达式的形式实现,接受一个connection参数,表示与Redis的连接。
- 在回调函数中,调用connection.subscribe方法来订阅通道。该方法接受一个回调函数作为参数,用于处理接收到的消息。
- 在消息回调函数中,首先从message对象中获取通道名称和消息体。
- 使用new String(message.getChannel())将通道名称转换为字符串类型,并存储在channelName变量中。
- 使用message.getBody()获取消息体的字节数组表示,并存储在body变量中。
- 在switch语句中,根据通道名称进行不同的处理。在这个例子中,仅处理"reserved"通道。
- 对于"reserved"通道的处理,调用deserializeMessage方法将消息体反序列化为reserveMessage对象,并将其存储在名为reserveMessage的局部变量中。
- 调用handleReserveMessage方法,将通道名称和反序列化后的reserveMessage对象作为参数进行处理。
- handleReserveMessage方法用于处理接收到的保留消息的逻辑。它检查消息类型,并根据类型执行不同的操作。根据消息类型,它调用smsServer.sendSms方法向指定的userId和listenerId发送短信。
我把消息处理的系统中心化处理,也就是说是这个监听系统他可以监听reserved通道的所有业务类型,我这里列举了四种wait,agree,refuse,over四种 但如果是更大的业务体系 同一个通道可能面临着更多可能性分支 那如果按照第一套的方案 需要对每一个具体业务实现一个监听者 工作量就很大(可能这样耦合会低一些吧)
但是我这样把消息集中处理 然后短信发送系统就专门只做短信发送的事情 xx系统就只做对应的工作 就能把工作上的耦合度大大降低
那么大家应该也注意到我的两个负责序列化和反序列化的方法了吧 这是因为业务需求 要把对象封装成一个类 所以这里的方案就是在信息中心处理器上 自定义一个序列化方案(如果再做得好一点其实可以把这个序列器抽理出来封装为一个抽象方法 用泛型来定义返回结果和参数 这样就能序列化所有引用的类型了)
遇到的问题:
对了 中途遇到了这样一个错误
错误信息:com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `TopOne.MessageSystem.entity.reserveMessage` (no Creators, like default constructor, exist): cannot deserialize from Object value (no delegate- or property-based Creator)
原因与分析:
reserveMessage类缺少默认构造函数,这导致Jackson库无法构造该类的实例。错误消息中提到了以下内容:"Cannot construct instance of TopOne.MessageSystem.entity.reserveMessage (no Creators, like default constructor, exist)"。
为了使Jackson能够正确地反序列化对象,需要在reserveMessage类中添加一个默认构造函数。默认构造函数是一个无参数的构造函数,它不需要任何参数来创建对象。
在你的reserveMessage类中
这个是改好的封装类:
@Data public class reserveMessage { private String UserId; private String ListenerId; private String message; public reserveMessage() { // 默认构造函数 } public reserveMessage(String userId, String ListenerId,String message) { this.UserId = userId; this.ListenerId = ListenerId; this.message=message; } }
实际业务中的测试
发布服务
订阅服务(监听服务)
测试结果
成功
这里面的打印是代替了原本业务中的短信发送 也算是成了