java B2B2C 仿淘宝电子商城系统-基于Rabbitmq实现延迟消息

简介: 预备知识1.1 消息传递首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。
  1. 预备知识

1.1 消息传递

首先我们知道消费者是从队列中获取消息的,那么消息是如何到达队列的?

当我们发送一条消息时,首先会发给交换器(exchange),交换器根据规则(路由键:routing key)将会确定消息投递到那个队列(queue)。

带着这几个关键字:交换器、路由键和队列。

1.2 交换器类型

如之前所说,交换器根据规则决定消息的路由方向。因此,rabbitmq的消息投递分类便是从交换器开始的,不同的交换器实现不同的路由算法便实现了不同的消息投递方式。

direct交换器

direct -> routingKey -> queue,相当一种点对点的消息投递,如果路由键匹配,就直接投递到相应的队列

fanout交换器

fanout交换器相当于实现了一(交换器)对多(队列)的广播投递方式

topic交换器

提供一种模式匹配的投递方式,我们可以根据主题来决定消息投递到哪个队列。

1.3 消息延迟

本文想要实现一个可延迟发送的消息机制。消息如何延迟?

ttl (time to live) 消息存活时间

ttl是指一个消息的存活时间。

Per-Queue Message TTL in Queues

引用官方的一句话:

TTL can be set for a given queue by setting the x-message-ttl argument to queue.declare, or by setting the message-ttl policy. A message that has been in the queue for longer than the configured TTL is said to be dead.
我们可以通过x-message-ttl设置一个队列中消息的过期时间,消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

Per-Message TTL in Publishers

引用官方的一句话:

A TTL can be specified on a per-message basis, by setting the expiration field in the basic AMQP class when sending a basic.publish.

The value of the expiration field describes the TTL period in milliseconds. The same constraints as for x-message-ttl apply. Since the expiration field must be a string, the broker will (only) accept the string representation of the number.

我们可以通过设置每一条消息的属性expiration,指定单条消息有效期。消息一旦过期,将会变成死信(dead-letter),可以选择重新路由。

重新路由-死信交换机(Dead Letter Exchanges)
引用官方一句话:

Dead Letter Exchanges

Messages from a queue can be ‘dead-lettered’; that is, republished to
another exchange when any of the following events occur:

The message is rejected (basic.reject or basic.nack) with
requeue=false, The TTL for the message expires; or The queue length
limit is exceeded. Dead letter exchanges (DLXs) are normal exchanges.
They can be any of the usual types and are declared as usual.
To set the dead letter exchange for a queue, set the x-dead-letter-exchange argument to the name of the exchange.

我们可以通过设置死信交换器(x-dead-letter-exchange)来重新发送消息到另外一个队列,而这个队列将是最终的消费队列。

  1. 具体实现

rabbitmq配置

属性文件-rabbitmq.properties

交换、路由等配置按照以上策略,其中,添加了prefetch参数来根据服务器能力控制消费数量。

连接用户名

mq.user =sms_user

密码

mq.password =123456

主机

mq.host =192.168.99.100

端口

mq.port =5672

默认virtual-host

mq.vhost =/

the default cache size for channels is 25

mq.channelCacheSize =50

发送消息路由

sms.route.key =sms_route_key

延迟消息队列

sms.delay.queue =sms_delay_queue

延迟消息交换器

sms.delay.exchange =sms_delay_exchange

消息的消费队列

sms.queue =sms_queue

消息交换器

sms.exchange =sms_exchange

每秒消费消息数量

sms.prefetch =30

配置rabbitmq.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/beans
     http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/rabbit
     http://www.springframework.org/schema/rabbit/spring-rabbit-1.7.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <context:property-placeholder location="rabbitmq.properties"/>
    <!--配置connection-factory,指定连接rabbit server参数 -->
    <rabbit:connection-factory id="connectionFactory"
                       username="${mq.user}" password="${mq.password}"
                       host="${mq.host}" port="${mq.port}" virtual-host="${mq.vhost}" />

    <!--定义rabbit template用于数据的接收和发送 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

    <!--通过指定下面的admin信息,当前producer中的exchange和queue会在rabbitmq服务器上自动生成 -->
    <rabbit:admin connection-factory="connectionFactory" />

    <!--定义queue -->
    <rabbit:queue name="${sms.queue}" durable="true" auto-delete="false" exclusive="false" />
    <!-- 创建延迟,有消息有效期的队列 -->
    <rabbit:queue name="${sms.delay.queue}" durable="true" auto-delete="false">
        <rabbit:queue-arguments>
            <entry key="x-message-ttl">
                <!-- 队列默认消息过期时间 -->
                <value type="java.lang.Long">3600000</value>
            </entry>
            <!-- 消息过期根据重新路由 -->
            <entry key="x-dead-letter-exchange" value="${sms.exchange}"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 定义direct exchange,sms_queue -->
    <rabbit:direct-exchange name="${sms.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>
    <!-- 延迟消息配置,durable=true 持久化生效 -->
    <rabbit:direct-exchange name="${sms.delay.exchange}" durable="true" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="${sms.delay.queue}" key="${sms.route.key}"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 消息接收者 -->
    <bean id="messageReceiver" class="git.yampery.consumer.MsgConsumer"/>
    <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
    <rabbit:listener-container connection-factory="connectionFactory" prefetch="${sms.prefetch}">
        <rabbit:listener queues="${sms.queue}" ref="messageReceiver"/>
    </rabbit:listener-container>
</beans>

消息发布者

package git.yampery.producer;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @decription MsgProducer
* <p>生产者</p>
* @author Yampery
* @date 2018/2/11 11:44
*/
@Component
public class MsgProducer {

   @Resource
   private AmqpTemplate amqpTemplate;
   @Value("${sms.delay.exchange}") private String SMS_DELAY_EXCHANGE;
   @Value("${sms.exchange}") private String SMS_EXCHANGE;
   @Value("${sms.route.key}") private String SMS_ROUTE_KEY;

   /**
    * 延迟消息放入延迟队列中
    * @param msg
    * @param expiration
    */
   public void publish(String msg, String expiration) {
       amqpTemplate.convertAndSend(SMS_DELAY_EXCHANGE, SMS_ROUTE_KEY, msg, message -> {
           // 设置消息属性-过期时间
           message.getMessageProperties().setExpiration(expiration);
           return message;
       });
   }

   /**
    * 非延迟消息放入待消费队列
    * @param msg
    */
   public void publish(String msg) {
       amqpTemplate.convertAndSend(SMS_EXCHANGE, SMS_ROUTE_KEY, msg);
   }
}

消费者

package git.yampery.consumer;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

/**
* @decription MsgConsumer
* <p>消费者</p>
* @author Yampery
* @date 2018/2/11 11:43
*/
public class MsgConsumer implements MessageListener {
   @Override
   public void onMessage(Message message) {
       String msg;
       try {
           // 线程每秒消费一次
           Thread.sleep(1000);
           msg = new String(message.getBody(), "utf-8");
           System.out.println(msg);

       } catch (Exception e) {
           // 这里并没有对服务异常等失败的消息做处理,直接丢弃了
           // 防止因业务异常导致消息失败造成unack阻塞再队列里
           // 可以选择路由到另外一个专门处理消费失败的队列
           return;
       }
   }
}

测试

package git.yampery.mq;
 //需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码
 //  一零三八七七四六二六
import com.alibaba.fastjson.JSONObject;
import git.yampery.producer.MsgProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import javax.annotation.Resource;

/**
 * @decription TestMq
 * <p>测试</p>
 * @author Yampery
 * @date 2018/2/11 15:03
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class TestMq {

    @Resource
    private MsgProducer producer;

    @Test
    public void testMq() {
        JSONObject jObj = new JSONObject();
        jObj.put("msg", "这是一条短信");
        producer.publish(jObj.toJSONString(), String.valueOf(10 * 1000));
    }
}
相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
16天前
|
运维 Java
Java版HIS系统 云HIS系统 云HIS源码 结构简洁、代码规范易阅读
云HIS系统分为两个大的系统,一个是基层卫生健康云综合管理系统,另一个是基层卫生健康云业务系统。基层卫生健康云综合管理系统由运营商、开发商和监管机构使用,用来进行运营管理、运维管理和综合监管。基层卫生健康云业务系统由基层医院使用,用来支撑医院各类业务运转。
41 5
|
16天前
|
JavaScript Java 测试技术
基于Java的代驾应用系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的代驾应用系统的设计与实现(源码+lw+部署文档+讲解等)
28 0
|
16天前
|
JavaScript Java 测试技术
基于Java的穿戴搭配系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的穿戴搭配系统的设计与实现(源码+lw+部署文档+讲解等)
27 0
|
2天前
|
消息中间件 缓存 Java
java基于云部署的SaaS医院云HIS系统源码 心理CT、B超 lis、电子病历
云HIS系统是一款满足基层医院各类业务需要的健康云产品。该产品能帮助基层医院完成日常各类业务,提供病患预约挂号支持、病患问诊、电子病历、开药发药、会员管理、统计查询、医生工作站和护士工作站等一系列常规功能,还能与公卫、PACS等各类外部系统融合,实现多层机构之间的融合管理。
33 12
|
3天前
|
存储 缓存 前端开发
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
Java串口通信技术探究3:RXTX库线程 优化系统性能的SerialPortEventListener类
15 3
|
7天前
|
监控 Java BI
java基于云计算的SaaS医院his信息系统源码 HIS云平台源码
基于云计算技术的B/S架构的HIS系统源码,SaaS模式Java版云HIS系统,融合B/S版电子病历系统,支持电子病历四级,HIS与电子病历系统均拥有自主知识产权。
29 5
|
11天前
|
Java 关系型数据库 MySQL
基于swing的java物业管理系统
基于swing的java物业管理系统
19 5
|
11天前
|
设计模式 JavaScript Java
[设计模式Java实现附plantuml源码~结构型] 扩展系统功能——装饰模式
[设计模式Java实现附plantuml源码~结构型] 扩展系统功能——装饰模式
|
16天前
|
JavaScript Java 测试技术
基于Java的珠宝购物网站系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的珠宝购物网站系统的设计与实现(源码+lw+部署文档+讲解等)
26 0
|
16天前
|
JavaScript Java 测试技术
基于Java的电影评论系统的设计与实现(源码+lw+部署文档+讲解等)
基于Java的电影评论系统的设计与实现(源码+lw+部署文档+讲解等)
32 0