🍃前言
今天我们来创建项目并实现服务器模块的核心类,如下图所示
🎍项目创建
这里我们创建一个sprig Boot 的项目,但是由于我们后面可能涉及与客户端交互的接口,所以我们这里也会添加spring MVC的依赖
🚩自定义包
这里我们将三个大的模块分别创建三个包
- common:公共模块
- mqclient:客户端模块
- mqserver:服务器模块
而在我们服务器模块我我们又会分为很多模块,今天我们先来完成内存管理模块,在mqserver路劲下创建一个新的包为core,接下来,将在这个core包里创建核心类
🍀Exchange(交换机)
作为一个交换机,我们希望它有以下属性
- 我们得拥有一个ID来标识该交换机,这里博主选择使用String类型的name属性来进行标识
- 对于交换机而言,因为有三个种类,所以我们希望有一个字段能够标识该交换机是何种类型,方法有很多种,这里博主选择另创建一个枚举类来标识。
- 枚举类名:ExchangeType,里面分别用
DIRECT(0),FANOUT(1),TOPIC(2);
来表示三种状态 - 我们使用一个Boolean的字段durable来表示该交换机是否持久化
以下两个属性,后面博主可能开发不完全,但是由于博主是仿照RabbitMQ实现的,所以RabbitMQ里面有的属性,博主这里也进行了添加
- 我们希望,如果该交换机没有人使用,我们就对它进行删除,这里博主选用boolean类型的autoDelete属性来标识,false-不删除,true-删除
- 我们再创建一个Map<String, Object>类型的属性arguments表示的是创建交换机时指定的一些额外的参数选项.
- 最后我们统一添加相应的get、set方法即可
代码实现如下:
/* * 这个类表示一个交换机 */ public class Exchange { // 此处使用 name 来作为交换机的身份标识. (唯一的) private String name; // 交换机类型, DIRECT(专属红包), FANOUT(群发红包), TOPIC(指令红包) private ExchangeType type = ExchangeType.DIRECT; private boolean durable = false; // 如果当前交换机, 没人使用了, 就会自动被删除. // 这个属性暂时先列在这里, 后续的代码中并没有真的实现这个自动删除功能~~ (RabbitMQ 是有的) private boolean autoDelete = false; // arguments 表示的是创建交换机时指定的一些额外的参数选项. 后续代码中并没有真的实现对应的功能, 先列出来. (RabbitMQ 也是有的) // 为了把这个 arguments 存到数据库中, 就需要把 Map 转成 json 格式的字符串. private Map<String, Object> arguments = new HashMap<>(); public String getName() { return name;} public void setName(String name) {this.name = name;} public ExchangeType getType() { return type; } public void setType(ExchangeType type) { this.type = type; } public boolean isDurable() { return durable; } public void setDurable(boolean durable) { this.durable = durable; } public boolean isAutoDelete() { return autoDelete; } public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; } public Map<String, Object> getArguments() { return arguments; } public void setArguments(Map<String, Object> arguments) { this.arguments = arguments; }
/* 这是一个枚举类,里面表示的是交换机的类型 */ public enum ExchangeType { DIRECT(0), FANOUT(1), TOPIC(2); private final int type; //private封装,不想让外界访问 private ExchangeType(int type) { this.type = type; } public int getType() { return type; } }
🎄MSGQueue(队列)
作为一个队列,我们希望它有以下属性
- 表示队列的身份标识,使用String类型的name属性
- 表示该队列是否持久化,使用Boolean类型的durable属性,true 表示持久化保存, false 表示不持久化.
- 对于队列,我们希望通过一个属性事项独占功能,也就是这个队列只能被一个消费者使用(别人用不了),这里我们使用Boolean类型的exclusive属性进行表示,true -独占,false -公用
是否删除与额外参数的属性与交换机里面属性一样,这里不做赘述了
代码实现如下:
/* 这表示一个队列 */ public class MSGQueue { // 表示队列的身份标识. private String name; // 表示队列是否持久化, true 表示持久化保存, false 表示不持久化. private boolean durable = false; // 这个属性为 true, 表示这个队列只能被一个消费者使用(别人用不了). 如果为 false 则是大家都能使用 // 这个 独占 功能, 也是先把字段列在这里, 具体的独占功能暂时先不实现. private boolean exclusive = false; // 为 true 表示没有人使用之后, 就自动删除. false 则是不会自动删除. // 这个 自动删除 功能, 也是先把字段列在这里, 具体的自动删除功能暂时先不实现. private boolean autoDelete = false; // 也是表示扩展参数. 当前也是先列在这里, 先暂时不实现 private Map<String, Object> arguments = new HashMap<>(); public String getName() { return name; } public void setName(String name) { this.name = name; } public boolean isDurable() { return durable; } public void setDurable(boolean durable) { this.durable = durable; } public boolean isExclusive() { return exclusive; } public void setExclusive(boolean exclusive) { this.exclusive = exclusive; } public boolean isAutoDelete() { return autoDelete; } public void setAutoDelete(boolean autoDelete) { this.autoDelete = autoDelete; } public Map<String, Object> getArguments() { return arguments; } public void setArguments(Map<String, Object> arguments) { this.arguments = arguments; } }
🎋Binding(板顶管理,交换机与队列的关系)
该类我们主要有以下三个属性:
- 交换机id
- 队列id
- bindingKey对应的为交换机的类型
这里我们不设置是否持久化的原因
- Binding 这个东西, 是依附于 Exchange 和 Queue 的!!!
- 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化,
- 此时你针对 Binding 持久化是没有任何意义的
代码实现如下:
* 表示队列和交换机之间的关系 */ public class Binding { //交换机名字 private String exchangeName; //队列名字 private String queueName; // bindingKey对应的交换机类型为Topic,主题匹配 // 相当于就是在出题, 要求领红包的人要画个 "桌子" 出来~~ private String bindingKey; //此处没有设置是否持久化存储的原因 // Binding 这个东西, 是依附于 Exchange 和 Queue 的!!! // 比如, 对于持久化来说, 如果 Exchange 和 Queue 任何一个都没有持久化, // 此时你针对 Binding 持久化是没有任何意义的 public String getExchangeName() { return exchangeName; } public void setExchangeName(String exchangeName) { this.exchangeName = exchangeName; } public String getQueueName() { return queueName; } public void setQueueName(String queueName) { this.queueName = queueName; } public String getBindingKey() { return bindingKey; } public void setBindingKey(String bindingKey) { this.bindingKey = bindingKey; } }
🌴Message(传递的消息)
对于Message我们需要明确它的两个核心属性
- 消息的属性
- 消息的正文
关于消息的属性,我们单独使用一个类来对它进行表示,类名为:BasicProperties
关于一个消息,我们应该包含以下属性:
- 唯一标识的id,由于消息比较多,此处为了保证 id 的唯一性, 使用 UUID 来作为 message id
- 还应该包含routingKey,与bindingKey 做匹配
- 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名.
- 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用).
- 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列.
- 对于消息我们也添加上是否可持久化的选项,参考RabbitMQ ,使用int类型的deliverMode属性,1 表示不持久化, 2 表示持久化
除去核心属性,我们还有辅助用的属性
如果Message 后续存储到文件中(如果持久化的话).
一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢?
我们则使用使用下列的两个偏移量来进行表示.
- [offsetBeg, offsetEnd)
我们还希望添加一个属性isValid表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式,0x1 表示有效. 0x0 表示无效.)
对于消息,我们需要提供一个如何构造消息对象的方法,方法有很多,这里我们选择创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程.
除此之外,由于消息是需要能够在网络上传输, 并且也需要能写入到文件中.
此时就需要针对 Message 进行序列化和反序列化.
此处使用 标准库 自带的 序列化/反序列化 操作.
只用实现Serializable接口就好,不用重写里面的方法,这里需要注意几点的是:
- Message与BasicProperties都要进行序列化
- 关于偏移量的俩属性并不需要被序列化保存到文件中,因为此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储.这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置.所以不用序列化
- java提供了关键字transient可以使相应属性不被序列化
代码实现如下:
/* * 表示要传递的消息 * 注意!!! 此处的 Message 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中. * 此时就需要针对 Message 进行序列化和反序列化. * 此处使用 标准库 自带的 序列化/反序列化 操作. */ public class Message implements Serializable { //消息的属性 private BasicProperties basicProperties = new BasicProperties(); //消息的正文 private byte[] body; // 下面的属性则是辅助用的属性. // Message 后续会存储到文件中(如果持久化的话). // 一个文件中会存储很多的消息. 如何找到某个消息, 在文件中的具体位置呢? // 使用下列的两个偏移量来进行表示. [offsetBeg, offsetEnd) // 这俩属性并不需要被序列化保存到文件中~~ 此时消息一旦被写入文件之后, 所在的位置就固定了. 并不需要单独存储. // 这俩属性存在的目的, 主要就是为了让内存中的 Message 对象, 能够快速找到对应的硬盘上的 Message 的位置. private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节) private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置偏移(字节) // 使用这个属性表示该消息在文件中是否是有效消息. (针对文件中的消息, 如果删除, 使用逻辑删除的方式) // 0x1 表示有效. 0x0 表示无效. private byte isValid = 0x1; // 创建一个工厂方法, 让工厂方法帮我们封装一下创建 Message 对象的过程. // 这个方法中创建的 Message 对象, 会自动生成唯一的 MessageId // 万一 routingKey 和 basicProperties 里的 routingKey 冲突, 以外面的为主. public Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body) { Message message = new Message(); if(basicProperties != null) { message.setBasicProperties(basicProperties); } //此处生成的 MessageId 以 M- 作为前缀,与其他id进行区分 message.setMessageId("M-" + UUID.randomUUID()); message.setRoutingKey(routingKey); message.body = body; // 此处是把 body 和 basicProperties 先设置出来. 他俩是 Message 的核心内容. // 而 offsetBeg, offsetEnd, isValid, 则是消息持久化的时候才会用到. 在把消息写入文件之前再进行设定. // 此处只是在内存中创建一个 Message 对象,所以不设置. return message; } public String getMessageId() { return basicProperties.getMessageId(); } public void setMessageId(String messageId) { basicProperties.setMessageId(messageId); } public String getRoutingKey() { return basicProperties.getRoutingKey(); } public void setRoutingKey(String routingKey) { basicProperties.setRoutingKey(routingKey); } public int getDeliverMode() { return basicProperties.getDeliverMode(); } public void setDeliverMode(int mode) { basicProperties.setDeliverMode(mode); } public BasicProperties getBasicProperties() { return basicProperties; } public void setBasicProperties(BasicProperties basicProperties) { this.basicProperties = basicProperties; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; } public long getOffsetBeg() { return offsetBeg; } public void setOffsetBeg(long offsetBeg) { this.offsetBeg = offsetBeg; } public long getOffsetEnd() { return offsetEnd; } public void setOffsetEnd(long offsetEnd) { this.offsetEnd = offsetEnd; } public byte getIsValid() { return isValid; } public void setIsValid(byte isValid) { this.isValid = isValid; } }
/* * 此处代码为 Message 对象相应的属性 * 注意!!! 此处的 BasicProperties 对象, 是需要能够在网络上传输, 并且也需要能写入到文件中. * 此时就需要针对 BasicProperties 进行序列化和反序列化. * 此处使用 标准库 自带的 序列化/反序列化 操作. */ public class BasicProperties implements Serializable { // 消息的唯一身份标识. 此处为了保证 id 的唯一性, 使用 UUID 来作为 message id private String messageId; // 是一个消息上带有的内容, 和 bindingKey 做匹配. // 如果当前的交换机类型是 DIRECT, 此时 routingKey 就表示要转发的队列名. // 如果当前的交换机类型是 FANOUT, 此时 routingKey 无意义(不使用). // 如果当前的交换机类型是 TOPIC, 此时 routingKey 就要和 bindingKey 做匹配. 符合要求的才能转发给对应队列. private String routingKey; // 这个属性表示消息是否要持久化. 1 表示不持久化, 2 表示持久化. (RabbitMQ 就是这样搞的....) private int deliverMode = 1; //为了后续代码更加方便 public String getMessageId() { return messageId; } public void setMessageId(String messageId) { this.messageId = messageId; } public String getRoutingKey() { return routingKey; } public void setRoutingKey(String routingKey) { this.routingKey = routingKey; } public int getDeliverMode() { return deliverMode; } public void setDeliverMode(int deliverMode) { this.deliverMode = deliverMode; } }
⭕总结
关于《【消息队列开发】 创建核心类》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下!