🍃前言
本次开发任务
- 实现 BrokerServer 类,也就是咱们消息队列的本体服务器。
其实本质上就是一个 TCP 的服务器。
🎋创建 BrokerServer 类
创建 BrokerServer 类如下:
public class BrokerServer { // 当前程序只考虑⼀个虚拟主机的情况. private VirtualHost virtualHost = new VirtualHost("default-VirtualHost"); // key 为 channelId, value 为 channel 对应的 socket 对象. private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<> private ServerSocket serverSocket; private ExecutorService executorService; private volatile boolean runnable = true; }
- virtualHost表示服务器持有的虚拟主机.队列,交换机,绑定,消息都是通过虚拟主机管理.
- sessions ⽤来管理所有的客⼾端的连接. 记录每个客户端的 socket.
- serverSocket 是服务器自身的 socket
- executorService 这个线程池用来处理响应
- runnable 这个标志位用来控制服务器的运行停⽌
🎍启动与停止服务器
代码实现如下:
public BrokerServer(int port) throws IOException { serverSocket = new ServerSocket(port); } public void start() throws IOException { System.out.println("[BrokerServer] 启动!"); executorService = Executors.newCachedThreadPool(); try { while (runnable) { Socket clientSocket = serverSocket.accept(); // 把处理连接的逻辑丢给这个线程池. executorService.submit(() -> { processConnection(clientSocket); }); } } catch (SocketException e) { System.out.println("[BrokerServer] 服务器停止运行!"); // e.printStackTrace(); } } // 一般来说停止服务器, 就是直接 kill 掉对应进程就行了. // 此处还是搞一个单独的停止方法. 主要是用于后续的单元测试. public void stop() throws IOException { runnable = false; // 把线程池中的任务都放弃了. 让线程都销毁. executorService.shutdownNow(); serverSocket.close(); }
🍀实现处理连接
通过这个方法, 来处理一个客户端的连接.
我们使用 InputStream
与 OutputStream
,由于后面要按照特定格式来读取并解析.
此时就需要用到 DataInputStream
和 DataOutputStream
在这一个连接中, 可能会涉及到多个请求和响应,我们使用一个while(true)
来进行实现
在此循环我们要做的事情有三件:
- 读取请求并解析
- 根据请求计算响应
- 把响应写回客户端
具体处理逻辑,我们后面再仔细实现,
那么我们怎么结束这个循环呢?
注意我们上面使用的是 DataInputStream
和 DataOutputStream
,当没有数据进行读取的时候,就会进行抛出异常而结束循环
最后当连接处理完了, 就需要记得关闭 socket, 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉.
代码实现如下:
// 通过这个方法, 来处理一个客户端的连接. // 在这一个连接中, 可能会涉及到多个请求和响应. private void processConnection(Socket clientSocket) { try (InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()) { // 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream try (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { while (true) { // 1. 读取请求并解析. Request request = readRequest(dataInputStream); // 2. 根据请求计算响应 Response response = process(request, clientSocket); // 3. 把响应写回给客户端 writeResponse(dataOutputStream, response); } } } catch (EOFException | SocketException e) { // 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常. // 需要借助这个异常来结束循环 System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString() + ":" + clientSocket.getPort()); } catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[BrokerServer] connection 出现异常!"); e.printStackTrace(); } finally { try { // 当连接处理完了, 就需要记得关闭 socket clientSocket.close(); // 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉. clearClosedSession(clientSocket); } catch (IOException e) { e.printStackTrace(); } } }
🎄实现 readRequest 与 writeResponse
关于读取请求,我们前面定义了一个类为 Request ,此时我们构造相应的对象,并对该对象相应属性进行填充即可。
代码实现如下:
private Request readRequest(DataInputStream dataInputStream) throws IOException { Request request = new Request(); request.setType(dataInputStream.readInt()); request.setLength(dataInputStream.readInt()); byte[] payload = new byte[request.getLength()]; int n = dataInputStream.read(payload); if (n != request.getLength()) { throw new IOException("读取请求格式出错!"); } request.setPayload(payload); return request; }
关于响应,实现相反,传入的 响应对象 相应的属性返回即可。
最后不要忘了刷新缓冲区
代码实现如下:
private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException { dataOutputStream.writeInt(response.getType()); dataOutputStream.writeInt(response.getLength()); dataOutputStream.write(response.getPayload()); // 这个刷新缓冲区也是重要的操作!! dataOutputStream.flush(); }
🌴实现处理请求
先把请求转换成 BaseArguments , 获取到其中的 channelId 和 rid
再根据不同的 type, 分别处理不同的逻辑. (主要是调用virtualHost中不同的方法).
针对消息订阅操作,则需要在存在消息的时候通过回调,把响应结果写回给对应的客⼾端.
最后构造成统⼀的响应.
代码实现如下:
private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException { // 1. 把 request 中的 payload 做一个初步的解析. BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload()); System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId() + ", type=" + request.getType() + ", length=" + request.getLength()); // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥. boolean ok = true; if (request.getType() == 0x1) { // 创建 channel sessions.put(basicArguments.getChannelId(), clientSocket); System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId()); } else if (request.getType() == 0x2) { // 销毁 channel sessions.remove(basicArguments.getChannelId()); System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId()); } else if (request.getType() == 0x3) { // 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了. ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments; ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(), arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments()); } else if (request.getType() == 0x4) { ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments; ok = virtualHost.exchangeDelete(arguments.getExchangeName()); } else if (request.getType() == 0x5) { QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments; ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments()); } else if (request.getType() == 0x6) { QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments; ok = virtualHost.queueDelete((arguments.getQueueName())); } else if (request.getType() == 0x7) { QueueBindArguments arguments = (QueueBindArguments) basicArguments; ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey()); } else if (request.getType() == 0x8) { QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments; ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName()); } else if (request.getType() == 0x9) { BasicPublishArguments arguments = (BasicPublishArguments) basicArguments; ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getBasicProperties(), arguments.getBody()); } else if (request.getType() == 0xa) { BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments; ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() { // 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端 @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { // 先知道当前这个收到的消息, 要发给哪个客户端. // 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, 就可以得到对应的 // socket 对象了, 从而可以往里面发送数据了 // 1. 根据 channelId 找到 socket 对象 Socket clientSocket = sessions.get(consumerTag); if (clientSocket == null || clientSocket.isClosed()) { throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!"); } // 2. 构造响应数据 SubScribeReturns subScribeReturns = new SubScribeReturns(); subScribeReturns.setChannelId(consumerTag); subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要. subScribeReturns.setOk(true); subScribeReturns.setConsumerTag(consumerTag); subScribeReturns.setBasicProperties(basicProperties); subScribeReturns.setBody(body); byte[] payload = BinaryTool.toBytes(subScribeReturns); Response response = new Response(); // 0xc 表示服务器给消费者客户端推送的消息数据. response.setType(0xc); // response 的 payload 就是一个 SubScribeReturns response.setLength(payload.length); response.setPayload(payload); // 3. 把数据写回给客户端. // 注意! 此处的 dataOutputStream 这个对象不能 close !!! // 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了. // 此时就无法继续往 socket 中写入后续数据了. DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream()); writeResponse(dataOutputStream, response); } }); } else if (request.getType() == 0xb) { // 调用 basicAck 确认消息. BasicAckArguments arguments = (BasicAckArguments) basicArguments; ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId()); } else { // 当前的 type 是非法的. throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType()); } // 3. 构造响应 BasicReturns basicReturns = new BasicReturns(); basicReturns.setChannelId(basicArguments.getChannelId()); basicReturns.setRid(basicArguments.getRid()); basicReturns.setOk(ok); byte[] payload = BinaryTool.toBytes(basicReturns); Response response = new Response(); response.setType(request.getType()); response.setLength(payload.length); response.setPayload(payload); System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId() + ", type=" + response.getType() + ", length=" + response.getLength()); return response; }
🌲实现 clearClosedSession
这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉
需要注意的是:
- 我们在进行迭代的时候,不要直接删除,这样会影响集合类的结构
代码实现如下:
private void clearClosedSession(Socket clientSocket) { // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉. List<String> toDeleteChannelId = new ArrayList<>(); for (Map.Entry<String, Socket> entry : sessions.entrySet()) { if (entry.getValue() == clientSocket) { // 不能在这里直接删除!!! // 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!! // sessions.remove(entry.getKey()); toDeleteChannelId.add(entry.getKey()); } } for (String channelId : toDeleteChannelId) { sessions.remove(channelId); } System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId); }
⭕总结
关于《【消息队列开发】 实现BrokerServer类——本体服务器》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下