🍃前言
本次开发任务
- 实现客户端代码部分
🌳实现思路
客户端三个核心类创建如下:
ConnectionFactory:表示一个连接工厂,这个类,持有服务器的地址,主要的功能,是创建出连接 Connection 对象
Connecttion:表示一个 TCP 连接,这个类,持有 Socket 对象,会进行写入请求与读取响应,管理者多个 Channel 对象
Channel:表示一个逻辑上的连接,还需要提供一系列方法,来和我们前面开发的服务器提供的核心 API 对应
这里的三个核心类都是博主仿照着 RabbitMQ 进行设定的。
这里博主再给一个通俗的理解吧。Connecttion 比作一个高速公路,那么 Channel 就是上面的行驶的小汽车,ConnectionFactory 就相当于选择那一条公路
🎍实现 ConnectionFactory 类
这里呢,本来仿照 RabbitMQ 我们是应该需要支持多个虚拟机的,本且每个虚拟机我都需要进行登录进行使用。
但是博主这里为了简单,此处先不进行实现,但是呢,会预留相应的字段,以便后续的功能扩展与实现
由于此处需要进行建立连接,所以必要的地址与端口号不能少,并且创建相应的方法创建
Connection 对象
代码实现如下:
public class ConnectionFactory { // broker server 的 ip 地址 private String host; // broker server 的端口号 private int port; // 访问 broker server 的哪个虚拟主机. // 下列几个属性暂时先都不搞了. // private String virtualHostName; // private String username; // private String password; public Connection newConnection() throws IOException { Connection connection = new Connection(host, port); return connection; } public String getHost() { return host; } public void setHost(String host) { this.host = host; } public int getPort() { return port; } public void setPort(int port) { this.port = port; } }
🍀实现 Connection 类
该类我们需要的属性有以下几个通信必备的
- Socket
- InputStream
- OutputStream
- DataInputStream
- DataOutputStream
因为我们需要管理多个 Channel ,且在多前程下进行管理,所以我们这里使用了一个
ConcurrentHashMap 来进行管理
除此之外我们还创建一个线程池来执行我们 Channel 里面的回调方法
🚩读取相应
有了基础属性后,我们先来进行一个初始化,我们选择使用构造方法进行初始化,当然了,该构造方法里,除了初始化,还创建了一个扫描线程,线程负责不停的从 socket 中读取响应数据.,把这个响应数据再交给对应的 channel 负责处理
既然要读取相应,那我们就先将读取相应与写入请求两个方法先行写出来,根据我们前面所约定的协议格式进行读取就好,代码实现如下:
// 发送请求 public void writeRequest(Request request) throws IOException { dataOutputStream.writeInt(request.getType()); dataOutputStream.writeInt(request.getLength()); dataOutputStream.write(request.getPayload()); dataOutputStream.flush(); System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength()); } // 读取响应 public Response readResponse() throws IOException { Response response = new Response(); response.setType(dataInputStream.readInt()); response.setLength(dataInputStream.readInt()); byte[] payload = new byte[response.getLength()]; int n = dataInputStream.read(payload); if (n != response.getLength()) { throw new IOException("读取的响应数据不完整!"); } response.setPayload(payload); System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength()); return response; }
然后我们构造方法的实现如下:
public Connection(String host, int port) throws IOException { socket = new Socket(host, port); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); dataInputStream = new DataInputStream(inputStream); dataOutputStream = new DataOutputStream(outputStream); callbackPool = Executors.newFixedThreadPool(4); // 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理. Thread t = new Thread(() -> { try { while (!socket.isClosed()) { Response response = readResponse(); dispatchResponse(response); } } catch (SocketException e) { // 连接正常断开的. 此时这个异常直接忽略. System.out.println("[Connection] 连接正常断开!"); } catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[Connection] 连接异常断开!"); e.printStackTrace(); } }); t.start(); }
🚩处理相应
在处理相应这一块,我们需要进行判断,当前相应是响应是一个针对控制请求的响应,还是服务器推送的消息
我们获取相应数据的 type 字段,若为 0xc,说明这是服务器推送来的消息数据、
此时我们就需要根据响应数据里面的 channelId 找到相应的 Channel 对象,然后实现该对象相应的回调方法
若不为 0xc,说明是一个针对控制请求的响应,此时我们只需要放入 Channel 相应的哈希表里面即可(该表后续会进行介绍)
// 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息. private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException { if (response.getType() == 0xc) { // 服务器推送来的消息数据 SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload()); // 根据 channelId 找到对应的 channel 对象 Channel channel = channelMap.get(subScribeReturns.getChannelId()); if (channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId()); } // 执行该 channel 对象内部的回调. callbackPool.submit(() -> { try { channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(), subScribeReturns.getBody()); } catch (MqException | IOException e) { e.printStackTrace(); } }); } else { // 当前响应是针对刚才的控制请求的响应 BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload()); // 把这个结果放到对应的 channel 的 hash 表中. Channel channel = channelMap.get(basicReturns.getChannelId()); if (channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId()); } channel.putReturns(basicReturns); } }
🚩创建 Channel 对象
通过这个方法,在 Connection 中能够创建出一个 Channel
// 通过这个方法, 在 Connection 中能够创建出一个 Channel public Channel createChannel() throws IOException { String channelId = "C-" + UUID.randomUUID().toString(); Channel channel = new Channel(channelId, this); // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中. channelMap.put(channelId, channel); // 同时也需要把 "创建 channel" 的这个消息也告诉服务器. boolean ok = channel.createChannel(); if (!ok) { // 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!! // 把刚才已经加入 hash 表的键值对, 再删了. channelMap.remove(channelId); return null; } return channel; }
🎋实现 Channel 类
关于 Channel 我们希望具有以下属性
- ChannelId:当前这个 channel 属于哪个连接
- Connection:用来存储后续客户端收到的服务器的响应
- ConcurrentHashMap<String, BasicReturns>:如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候, 调用回调。此处约定一个 Channel 中只能有一个回调
- Consumer:调用回调方法
提供相应的构造方法,代码如下:
public Channel(String channelId, Connection connection) { this.channelId = channelId; this.connection = connection; }
🚩等待与唤醒
我们希望,在我们发送消息后,可以等到一个回应,并且呢?
在每一次收到响应消息后,我们都可以唤醒一下。
如果响应等待收到了响应数据,则进行删除相应的hash表里的数据
代码实现如下:
// 期望使用这个方法来阻塞等待服务器的响应. private BasicReturns waitResult(String rid) { BasicReturns basicReturns = null; while ((basicReturns = basicReturnsMap.get(rid)) == null) { // 如果查询结果为 null, 说明包裹还没回来. // 此时就需要阻塞等待. synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } // 读取成功之后, 还需要把这个消息从哈希表中删除掉. basicReturnsMap.remove(rid); return basicReturns; } public void putReturns(BasicReturns basicReturns) { basicReturnsMap.put(basicReturns.getRid(), basicReturns); synchronized (this) { // 当前也不知道有多少个线程在等待上述的这个响应. // 把所有的等待的线程都唤醒. notifyAll(); } }
🚩相关API的实现
接下来我们实现一下客户端与服务器相关API 的书写。
此处实现步骤都是类似的,步骤如下:
- 根据自定义的网络协议构建相应的请求信息
- 发送请求
- 等待读取响应
- 返回一个响应结果
// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了. public boolean createChannel() throws IOException { // 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象 BasicArguments basicArguments = new BasicArguments(); basicArguments.setChannelId(channelId); basicArguments.setRid(generateRid()); byte[] payload = BinaryTool.toBytes(basicArguments); Request request = new Request(); request.setType(0x1); request.setLength(payload.length); request.setPayload(payload); // 构造出完整请求之后, 就可以发送这个请求了. connection.writeRequest(request); // 等待服务器的响应 BasicReturns basicReturns = waitResult(basicArguments.getRid()); return basicReturns.isOk(); } private String generateRid() { return "R-" + UUID.randomUUID().toString(); } // 关闭 channel, 给服务器发送一个 type = 0x2 的请求 public boolean close() throws IOException { BasicArguments basicArguments = new BasicArguments(); basicArguments.setRid(generateRid()); basicArguments.setChannelId(channelId); byte[] payload = BinaryTool.toBytes(basicArguments); Request request = new Request(); request.setType(0x2); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(basicArguments.getRid()); return basicReturns.isOk(); } // 创建交换机 public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException { ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments(); exchangeDeclareArguments.setRid(generateRid()); exchangeDeclareArguments.setChannelId(channelId); exchangeDeclareArguments.setExchangeName(exchangeName); exchangeDeclareArguments.setExchangeType(exchangeType); exchangeDeclareArguments.setDurable(durable); exchangeDeclareArguments.setAutoDelete(autoDelete); exchangeDeclareArguments.setArguments(arguments); byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments); Request request = new Request(); request.setType(0x3); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid()); return basicReturns.isOk(); } // 删除交换机 public boolean exchangeDelete(String exchangeName) throws IOException { ExchangeDeleteArguments arguments = new ExchangeDeleteArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setExchangeName(exchangeName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x4); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 创建队列 public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException { QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments(); queueDeclareArguments.setRid(generateRid()); queueDeclareArguments.setChannelId(channelId); queueDeclareArguments.setQueueName(queueName); queueDeclareArguments.setDurable(durable); queueDeclareArguments.setExclusive(exclusive); queueDeclareArguments.setAutoDelete(autoDelete); queueDeclareArguments.setArguments(arguments); byte[] payload = BinaryTool.toBytes(queueDeclareArguments); Request request = new Request(); request.setType(0x5); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid()); return basicReturns.isOk(); } // 删除队列 public boolean queueDelete(String queueName) throws IOException { QueueDeleteArguments arguments = new QueueDeleteArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x6); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 创建绑定 public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException { QueueBindArguments arguments = new QueueBindArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setExchangeName(exchangeName); arguments.setBindingKey(bindingKey); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x7); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 解除绑定 public boolean queueUnbind(String queueName, String exchangeName) throws IOException { QueueUnbindArguments arguments = new QueueUnbindArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setExchangeName(exchangeName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x8); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 发送消息 public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException { BasicPublishArguments arguments = new BasicPublishArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setExchangeName(exchangeName); arguments.setRoutingKey(routingKey); arguments.setBasicProperties(basicProperties); arguments.setBody(body); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x9); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 订阅消息 public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException { // 先设置回调. if (this.consumer != null) { throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!"); } this.consumer = consumer; BasicConsumeArguments arguments = new BasicConsumeArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来表示了. arguments.setQueueName(queueName); arguments.setAutoAck(autoAck); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0xa); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 确认消息 public boolean basicAck(String queueName, String messageId) throws IOException { BasicAckArguments arguments = new BasicAckArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setMessageId(messageId); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0xb); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); }
⭕总结
关于《【消息队列开发】实现客户端》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下