【消息队列开发】 创建核心类

简介: 【消息队列开发】 创建核心类

🍃前言

今天我们来创建项目并实现服务器模块的核心类,如下图所示

🎍项目创建

这里我们创建一个sprig Boot 的项目,但是由于我们后面可能涉及与客户端交互的接口,所以我们这里也会添加spring MVC的依赖

🚩自定义包

这里我们将三个大的模块分别创建三个包

  • common:公共模块
  • mqclient:客户端模块
  • mqserver:服务器模块

    而在我们服务器模块我我们又会分为很多模块,今天我们先来完成内存管理模块,在mqserver路劲下创建一个新的包为core,接下来,将在这个core包里创建核心类

🍀Exchange(交换机)

作为一个交换机,我们希望它有以下属性

  1. 我们得拥有一个ID来标识该交换机,这里博主选择使用String类型的name属性来进行标识
  2. 对于交换机而言,因为有三个种类,所以我们希望有一个字段能够标识该交换机是何种类型,方法有很多种,这里博主选择另创建一个枚举类来标识。
  3. 枚举类名:ExchangeType,里面分别用DIRECT(0),FANOUT(1),TOPIC(2);来表示三种状态
  4. 我们使用一个Boolean的字段durable来表示该交换机是否持久化

以下两个属性,后面博主可能开发不完全,但是由于博主是仿照RabbitMQ实现的,所以RabbitMQ里面有的属性,博主这里也进行了添加

  1. 我们希望,如果该交换机没有人使用,我们就对它进行删除,这里博主选用boolean类型的autoDelete属性来标识,false-不删除,true-删除
  2. 我们再创建一个Map<String, Object>类型的属性arguments表示的是创建交换机时指定的一些额外的参数选项.
  3. 最后我们统一添加相应的get、set方法即可

代码实现如下:

/*
 * 这个类表示一个交换机
 */
public class Exchange {
    // 此处使用 name 来作为交换机的身份标识. (唯一的)
    private String name;
    // 交换机类型, DIRECT(专属红包), FANOUT(群发红包), TOPIC(指令红包)
    private ExchangeType type = ExchangeType.DIRECT;
    private boolean durable = false;
    // 如果当前交换机, 没人使用了, 就会自动被删除.
    // 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的)
    private boolean autoDelete = false;
    // arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的)
    // 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串.
    private Map<String, Object> arguments = new HashMap<>();
    public String getName() { return name;}
    public void setName(String name) {this.name = name;}
    public ExchangeType getType() {
        return type;
    }
    public void setType(ExchangeType type) {
        this.type = type;
    }
    public boolean isDurable() {
        return durable;
    }
    public void setDurable(boolean durable) {
        this.durable = durable;
    }
    public boolean isAutoDelete() {
        return autoDelete;
    }
    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }
    public Map<String, Object> getArguments() {
        return arguments;
    }
    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
/*
这是一个枚举类,里面表示的是交换机的类型
 */
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);
    private final int type;
    //private封装,不想让外界访问
    private ExchangeType(int type) {
        this.type = type;
    }
    public int getType() {
        return type;
    }
}

🎄MSGQueue(队列)

作为一个队列,我们希望它有以下属性

  1. 表示队列的身份标识,使用String类型的name属性
  2. 表示该队列是否持久化,使用Boolean类型的durable属性,true 表示持久化保存, false 表示不持久化.
  3. 对于队列,我们希望通过一个属性事项独占功能,也就是这个队列只能被一个消费者使用(别人用不了),这里我们使用Boolean类型的exclusive属性进行表示,true -独占,false -公用

是否删除与额外参数的属性与交换机里面属性一样,这里不做赘述了

代码实现如下:

/*
这表示一个队列
 */
public class MSGQueue {
    // 表示队列的身份标识.
    private String name;
    // 表示队列是否持久化, true 表示持久化保存, false 表示不持久化.
    private boolean durable = false;
    // 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用
    // 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现.
    private boolean exclusive = false;
    // 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除.
    // 这个 自动删除 功能, 也是先把字段列在这里, 具体的自动删除功能暂时先不实现.
    private boolean autoDelete = false;
    // 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现
    private Map<String, Object> arguments = new HashMap<>();
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public boolean isDurable() {
        return durable;
    }
    public void setDurable(boolean durable) {
        this.durable = durable;
    }
    public boolean isExclusive() {
        return exclusive;
    }
    public void setExclusive(boolean exclusive) {
        this.exclusive = exclusive;
    }
    public boolean isAutoDelete() {
        return autoDelete;
    }
    public void setAutoDelete(boolean autoDelete) {
        this.autoDelete = autoDelete;
    }
    public Map<String, Object> getArguments() {
        return arguments;
    }
    public void setArguments(Map<String, Object> arguments) {
        this.arguments = arguments;
    }
}

🎋Binding(板顶管理,交换机与队列的关系)

该类我们主要有以下三个属性:

  1. 交换机id
  2. 队列id
  3. bindingKey对应的为交换机的类型

这里我们不设置是否持久化的原因

  • Binding 这个东西, 是依附于 Exchange 和 Queue 的!!!
  • 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,
  • 此时你针对 Binding 持久化是没有任何意义的

代码实现如下:

*
表示队列和交换机之间的关系
 */
public class Binding {
    //交换机名字
    private String exchangeName;
    //队列名字
    private String queueName;
    // bindingKey对应的交换机类型为Topic,主题匹配
    // 相当于就是在出题, 要求领红包的人要画个 "桌子" 出来~~
    private String bindingKey;
    //此处没有设置是否持久化存储的原因
    // Binding 这个东西, 是依附于 Exchange 和 Queue 的!!!
    // 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,
    // 此时你针对 Binding 持久化是没有任何意义的
    public String getExchangeName() {
        return exchangeName;
    }
    public void setExchangeName(String exchangeName) {
        this.exchangeName = exchangeName;
    }
    public String getQueueName() {
        return queueName;
    }
    public void setQueueName(String queueName) {
        this.queueName = queueName;
    }
    public String getBindingKey() {
        return bindingKey;
    }
    public void setBindingKey(String bindingKey) {
        this.bindingKey = bindingKey;
    }
}

🌴Message(传递的消息)

对于Message我们需要明确它的两个核心属性

  • 消息的属性
  • 消息的正文

关于消息的属性,我们单独使用一个类来对它进行表示,类名为:BasicProperties

关于一个消息,我们应该包含以下属性:

  • 唯一标识的id,由于消息比较多,此处为了保证 id 的唯一性, 使用 UUID 来作为 message id
  • 还应该包含routingKey,与bindingKey 做匹配
  • 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.
  • 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).
  • 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.
  • 对于消息我们也添加上是否可持久化的选项,参考RabbitMQ ,使用int类型的deliverMode属性,1 表示不持久化, 2 表示持久化

除去核心属性,我们还有辅助用的属性

如果Message 后续存储到文件中(如果持久化的话).

一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?

我们则使用使用下列的两个偏移量来进行表示.

  • [offsetBeg, offsetEnd)

我们还希望添加一个属性isValid表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式,0x1 表示有效. 0x0 表示无效.)

对于消息,我们需要提供一个如何构造消息对象的方法,方法有很多,这里我们选择创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.

除此之外,由于消息是需要能够在网络上传输, 并且也需要能写入到文件中.

此时就需要针对 Message 进行序列化和反序列化.

此处使用 标准库 自带的 序列化/反序列化 操作.

只用实现Serializable接口就好,不用重写里面的方法,这里需要注意几点的是:

  • Message与BasicProperties都要进行序列化
  • 关于偏移量的俩属性并不需要被序列化保存到文件中,因为此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.所以不用序列化
  • java提供了关键字transient可以使相应属性不被序列化

代码实现如下:

/*
 * 表示要传递的消息
 * 注意!!! 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.
 * 此时就需要针对 Message 进行序列化和反序列化.
 * 此处使用 标准库 自带的 序列化/反序列化 操作.
 */
public class Message implements Serializable {
    //消息的属性
    private BasicProperties basicProperties = new BasicProperties();
    //消息的正文
    private byte[] body;
    // 下面的属性则是辅助用的属性.
    // Message 后续会存储到文件中(如果持久化的话).
    // 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?
    // 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd)
    // 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.
    // 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.
    private transient long offsetBeg = 0;  // 消息数据的开头距离文件开头的位置偏移(字节)
    private transient long offsetEnd = 0;  // 消息数据的结尾距离文件开头的位置偏移(字节)
    // 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式)
    // 0x1 表示有效. 0x0 表示无效.
    private byte isValid = 0x1;
    // 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.
    // 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId
    // 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主.
    public Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) {
        Message message = new Message();
        if(basicProperties != null) {
            message.setBasicProperties(basicProperties);
        }
        //此处生成的 MessageId 以 M- 作为前缀,与其他id进行区分
        message.setMessageId("M-" + UUID.randomUUID());
        message.setRoutingKey(routingKey);
        message.body = body;
        // 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容.
        // 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定.
        // 此处只是在内存中创建一个 Message 对象,所以不设置.
        return message;
    }
    public String getMessageId() {
        return basicProperties.getMessageId();
    }
    public void setMessageId(String messageId) {
        basicProperties.setMessageId(messageId);
    }
    public String getRoutingKey() {
        return basicProperties.getRoutingKey();
    }
    public void setRoutingKey(String routingKey) {
        basicProperties.setRoutingKey(routingKey);
    }
    public int getDeliverMode() {
        return basicProperties.getDeliverMode();
    }
    public void setDeliverMode(int mode) {
        basicProperties.setDeliverMode(mode);
    }
    public BasicProperties getBasicProperties() {
        return basicProperties;
    }
    public void setBasicProperties(BasicProperties basicProperties) {
        this.basicProperties = basicProperties;
    }
    public byte[] getBody() {
        return body;
    }
    public void setBody(byte[] body) {
        this.body = body;
    }
    public long getOffsetBeg() {
        return offsetBeg;
    }
    public void setOffsetBeg(long offsetBeg) {
        this.offsetBeg = offsetBeg;
    }
    public long getOffsetEnd() {
        return offsetEnd;
    }
    public void setOffsetEnd(long offsetEnd) {
        this.offsetEnd = offsetEnd;
    }
    public byte getIsValid() {
        return isValid;
    }
    public void setIsValid(byte isValid) {
        this.isValid = isValid;
    }
}
/*
 * 此处代码为 Message 对象相应的属性
 * 注意!!! 此处的 BasicProperties 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中.
 * 此时就需要针对 BasicProperties 进行序列化和反序列化.
 * 此处使用 标准库 自带的 序列化/反序列化 操作.
 */
public class BasicProperties implements Serializable {
    // 消息的唯一身份标识. 此处为了保证 id 的唯一性, 使用 UUID 来作为 message id
    private String messageId;
    // 是一个消息上带有的内容, 和 bindingKey 做匹配.
    // 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.
    // 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).
    // 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.
    private String routingKey;
    // 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样搞的....)
    private int deliverMode = 1;
    //为了后续代码更加方便
    public String getMessageId() {
        return messageId;
    }
    public void setMessageId(String messageId) {
        this.messageId = messageId;
    }
    public String getRoutingKey() {
        return routingKey;
    }
    public void setRoutingKey(String routingKey) {
        this.routingKey = routingKey;
    }
    public int getDeliverMode() {
        return deliverMode;
    }
    public void setDeliverMode(int deliverMode) {
        this.deliverMode = deliverMode;
    }
}

⭕总结

关于《【消息队列开发】 创建核心类》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java 数据库
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
【消息队列开发】 实现 VirtualHostTests 类——测试虚拟主机操作
|
6月前
|
消息中间件 存储 安全
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
【消息队列开发】 实现ConsumerManager类——消费消息的核心逻辑
|
6月前
|
消息中间件 网络协议 Java
【消息队列开发】 实现BrokerServer类——本体服务器
【消息队列开发】 实现BrokerServer类——本体服务器
|
6月前
|
消息中间件 Java
【消息队列开发】 实现消费者订阅消息
【消息队列开发】 实现消费者订阅消息
|
6月前
|
消息中间件 Java 开发工具
消息队列 MQ产品使用合集之topic相同,但是tag不同,这个类不能放入map中,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java Spring
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
51 0
|
6月前
|
消息中间件 API
【消息队列开发】 实现 MqClientTests 类——测试客户端
【消息队列开发】 实现 MqClientTests 类——测试客户端
|
6月前
|
消息中间件 存储 网络协议
【消息队列开发】实现客户端
【消息队列开发】实现客户端
|
6月前
|
消息中间件 网络协议
【消息队列开发】 设计网络通信协议
【消息队列开发】 设计网络通信协议
|
5月前
|
消息中间件 C语言 RocketMQ
消息队列 MQ操作报错合集之出现"Connection reset by peer"的错误,该如何处理
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。