【消息队列开发】实现客户端

简介: 【消息队列开发】实现客户端

🍃前言

本次开发任务

  • 实现客户端代码部分

🌳实现思路

客户端三个核心类创建如下:

ConnectionFactory:表示一个连接工厂,这个类,持有服务器的地址,主要的功能,是创建出连接 Connection 对象

Connecttion:表示一个 TCP 连接,这个类,持有 Socket 对象,会进行写入请求与读取响应,管理者多个 Channel 对象

Channel:表示一个逻辑上的连接,还需要提供一系列方法,来和我们前面开发的服务器提供的核心 API 对应

这里的三个核心类都是博主仿照着 RabbitMQ 进行设定的。

这里博主再给一个通俗的理解吧。Connecttion 比作一个高速公路,那么 Channel 就是上面的行驶的小汽车,ConnectionFactory 就相当于选择那一条公路

🎍实现 ConnectionFactory 类

这里呢,本来仿照 RabbitMQ 我们是应该需要支持多个虚拟机的,本且每个虚拟机我都需要进行登录进行使用。

但是博主这里为了简单,此处先不进行实现,但是呢,会预留相应的字段,以便后续的功能扩展与实现

由于此处需要进行建立连接,所以必要的地址与端口号不能少,并且创建相应的方法创建

Connection 对象

代码实现如下:

public class ConnectionFactory {
    // broker server 的 ip 地址
    private String host;
    // broker server 的端口号
    private int port;
    // 访问 broker server 的哪个虚拟主机.
    // 下列几个属性暂时先都不搞了.
//    private String virtualHostName;
//    private String username;
//    private String password;
    public Connection newConnection() throws IOException {
        Connection connection = new Connection(host, port);
        return connection;
    }
    public String getHost() {
        return host;
    }
    public void setHost(String host) {
        this.host = host;
    }
    public int getPort() {
        return port;
    }
    public void setPort(int port) {
        this.port = port;
    }
}

🍀实现 Connection 类

该类我们需要的属性有以下几个通信必备的

  1. Socket
  2. InputStream
  3. OutputStream
  4. DataInputStream
  5. DataOutputStream

因为我们需要管理多个 Channel ,且在多前程下进行管理,所以我们这里使用了一个

ConcurrentHashMap 来进行管理

除此之外我们还创建一个线程池来执行我们 Channel 里面的回调方法

🚩读取相应

有了基础属性后,我们先来进行一个初始化,我们选择使用构造方法进行初始化,当然了,该构造方法里,除了初始化,还创建了一个扫描线程,线程负责不停的从 socket 中读取响应数据.,把这个响应数据再交给对应的 channel 负责处理

既然要读取相应,那我们就先将读取相应与写入请求两个方法先行写出来,根据我们前面所约定的协议格式进行读取就好,代码实现如下:

// 发送请求
public void writeRequest(Request request) throws IOException {
    dataOutputStream.writeInt(request.getType());
    dataOutputStream.writeInt(request.getLength());
    dataOutputStream.write(request.getPayload());
    dataOutputStream.flush();
    System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength());
}
// 读取响应
public Response readResponse() throws IOException {
    Response response = new Response();
    response.setType(dataInputStream.readInt());
    response.setLength(dataInputStream.readInt());
    byte[] payload = new byte[response.getLength()];
    int n = dataInputStream.read(payload);
    if (n != response.getLength()) {
        throw new IOException("读取的响应数据不完整!");
    }
    response.setPayload(payload);
    System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength());
    return response;
}

然后我们构造方法的实现如下:

public Connection(String host, int port) throws IOException {
    socket = new Socket(host, port);
    inputStream = socket.getInputStream();
    outputStream = socket.getOutputStream();
    dataInputStream = new DataInputStream(inputStream);
    dataOutputStream = new DataOutputStream(outputStream);
    callbackPool = Executors.newFixedThreadPool(4);
    // 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理.
    Thread t = new Thread(() -> {
        try {
            while (!socket.isClosed()) {
                Response response = readResponse();
                dispatchResponse(response);
            }
        } catch (SocketException e) {
            // 连接正常断开的. 此时这个异常直接忽略.
            System.out.println("[Connection] 连接正常断开!");
        } catch (IOException | ClassNotFoundException | MqException e) {
            System.out.println("[Connection] 连接异常断开!");
            e.printStackTrace();
        }
    });
    t.start();
}

🚩处理相应

在处理相应这一块,我们需要进行判断,当前相应是响应是一个针对控制请求的响应,还是服务器推送的消息

我们获取相应数据的 type 字段,若为 0xc,说明这是服务器推送来的消息数据、

此时我们就需要根据响应数据里面的 channelId 找到相应的 Channel 对象,然后实现该对象相应的回调方法

若不为 0xc,说明是一个针对控制请求的响应,此时我们只需要放入 Channel 相应的哈希表里面即可(该表后续会进行介绍)

// 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息.
private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException {
    if (response.getType() == 0xc) {
        // 服务器推送来的消息数据
        SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload());
        // 根据 channelId 找到对应的 channel 对象
        Channel channel = channelMap.get(subScribeReturns.getChannelId());
        if (channel == null) {
            throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
        }
        // 执行该 channel 对象内部的回调.
        callbackPool.submit(() -> {
            try {
                channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(),
                        subScribeReturns.getBody());
            } catch (MqException | IOException e) {
                e.printStackTrace();
            }
        });
    } else {
        // 当前响应是针对刚才的控制请求的响应
        BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload());
        // 把这个结果放到对应的 channel 的 hash 表中.
        Channel channel = channelMap.get(basicReturns.getChannelId());
        if (channel == null) {
            throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId());
        }
        channel.putReturns(basicReturns);
    }
}

🚩创建 Channel 对象

通过这个方法,在 Connection 中能够创建出一个 Channel

// 通过这个方法, 在 Connection 中能够创建出一个 Channel
public Channel createChannel() throws IOException {
    String channelId = "C-" + UUID.randomUUID().toString();
    Channel channel = new Channel(channelId, this);
    // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中.
    channelMap.put(channelId, channel);
    // 同时也需要把 "创建 channel" 的这个消息也告诉服务器.
    boolean ok = channel.createChannel();
    if (!ok) {
        // 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!!
        // 把刚才已经加入 hash 表的键值对, 再删了.
        channelMap.remove(channelId);
        return null;
    }
    return channel;
}

🎋实现 Channel 类

关于 Channel 我们希望具有以下属性

  1. ChannelId:当前这个 channel 属于哪个连接
  2. Connection:用来存储后续客户端收到的服务器的响应
  3. ConcurrentHashMap<String, BasicReturns>:如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥,当该队列的消息返回回来的时候, 调用回调。此处约定一个 Channel 中只能有一个回调
  4. Consumer:调用回调方法

提供相应的构造方法,代码如下:

public Channel(String channelId, Connection connection) {
    this.channelId = channelId;
    this.connection = connection;
}

🚩等待与唤醒

我们希望,在我们发送消息后,可以等到一个回应,并且呢?

在每一次收到响应消息后,我们都可以唤醒一下。

如果响应等待收到了响应数据,则进行删除相应的hash表里的数据

代码实现如下:

// 期望使用这个方法来阻塞等待服务器的响应.
private BasicReturns waitResult(String rid) {
    BasicReturns basicReturns = null;
    while ((basicReturns = basicReturnsMap.get(rid)) == null) {
        // 如果查询结果为 null, 说明包裹还没回来.
        // 此时就需要阻塞等待.
        synchronized (this) {
            try {
                wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    // 读取成功之后, 还需要把这个消息从哈希表中删除掉.
    basicReturnsMap.remove(rid);
    return basicReturns;
}
public void putReturns(BasicReturns basicReturns) {
    basicReturnsMap.put(basicReturns.getRid(), basicReturns);
    synchronized (this) {
        // 当前也不知道有多少个线程在等待上述的这个响应.
        // 把所有的等待的线程都唤醒.
        notifyAll();
    }
}

🚩相关API的实现

接下来我们实现一下客户端与服务器相关API 的书写。

此处实现步骤都是类似的,步骤如下:

  1. 根据自定义的网络协议构建相应的请求信息
  2. 发送请求
  3. 等待读取响应
  4. 返回一个响应结果
// 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了.
public boolean createChannel() throws IOException {
    // 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象
    BasicArguments basicArguments = new BasicArguments();
    basicArguments.setChannelId(channelId);
    basicArguments.setRid(generateRid());
    byte[] payload = BinaryTool.toBytes(basicArguments);
    Request request = new Request();
    request.setType(0x1);
    request.setLength(payload.length);
    request.setPayload(payload);
    // 构造出完整请求之后, 就可以发送这个请求了.
    connection.writeRequest(request);
    // 等待服务器的响应
    BasicReturns basicReturns = waitResult(basicArguments.getRid());
    return basicReturns.isOk();
}
private String generateRid() {
    return "R-" + UUID.randomUUID().toString();
}
// 关闭 channel, 给服务器发送一个 type = 0x2 的请求
public boolean close() throws IOException {
    BasicArguments basicArguments = new BasicArguments();
    basicArguments.setRid(generateRid());
    basicArguments.setChannelId(channelId);
    byte[] payload = BinaryTool.toBytes(basicArguments);
    Request request = new Request();
    request.setType(0x2);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(basicArguments.getRid());
    return basicReturns.isOk();
}
// 创建交换机
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete,
                               Map<String, Object> arguments) throws IOException {
    ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
    exchangeDeclareArguments.setRid(generateRid());
    exchangeDeclareArguments.setChannelId(channelId);
    exchangeDeclareArguments.setExchangeName(exchangeName);
    exchangeDeclareArguments.setExchangeType(exchangeType);
    exchangeDeclareArguments.setDurable(durable);
    exchangeDeclareArguments.setAutoDelete(autoDelete);
    exchangeDeclareArguments.setArguments(arguments);
    byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);
    Request request = new Request();
    request.setType(0x3);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
    return basicReturns.isOk();
}
// 删除交换机
public boolean exchangeDelete(String exchangeName) throws IOException {
    ExchangeDeleteArguments arguments = new ExchangeDeleteArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setExchangeName(exchangeName);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0x4);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 创建队列
public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                            Map<String, Object> arguments) throws IOException {
    QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
    queueDeclareArguments.setRid(generateRid());
    queueDeclareArguments.setChannelId(channelId);
    queueDeclareArguments.setQueueName(queueName);
    queueDeclareArguments.setDurable(durable);
    queueDeclareArguments.setExclusive(exclusive);
    queueDeclareArguments.setAutoDelete(autoDelete);
    queueDeclareArguments.setArguments(arguments);
    byte[] payload = BinaryTool.toBytes(queueDeclareArguments);
    Request request = new Request();
    request.setType(0x5);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
    return basicReturns.isOk();
}
// 删除队列
public boolean queueDelete(String queueName) throws IOException {
    QueueDeleteArguments arguments = new QueueDeleteArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setQueueName(queueName);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0x6);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 创建绑定
public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
    QueueBindArguments arguments = new QueueBindArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setQueueName(queueName);
    arguments.setExchangeName(exchangeName);
    arguments.setBindingKey(bindingKey);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0x7);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 解除绑定
public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
    QueueUnbindArguments arguments = new QueueUnbindArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setQueueName(queueName);
    arguments.setExchangeName(exchangeName);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0x8);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 发送消息
public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
    BasicPublishArguments arguments = new BasicPublishArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setExchangeName(exchangeName);
    arguments.setRoutingKey(routingKey);
    arguments.setBasicProperties(basicProperties);
    arguments.setBody(body);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0x9);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 订阅消息
public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
    // 先设置回调.
    if (this.consumer != null) {
        throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!");
    }
    this.consumer = consumer;
    BasicConsumeArguments arguments = new BasicConsumeArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setConsumerTag(channelId);  // 此处 consumerTag 也使用 channelId 来表示了.
    arguments.setQueueName(queueName);
    arguments.setAutoAck(autoAck);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0xa);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}
// 确认消息
public boolean basicAck(String queueName, String messageId) throws IOException {
    BasicAckArguments arguments = new BasicAckArguments();
    arguments.setRid(generateRid());
    arguments.setChannelId(channelId);
    arguments.setQueueName(queueName);
    arguments.setMessageId(messageId);
    byte[] payload = BinaryTool.toBytes(arguments);
    Request request = new Request();
    request.setType(0xb);
    request.setLength(payload.length);
    request.setPayload(payload);
    connection.writeRequest(request);
    BasicReturns basicReturns = waitResult(arguments.getRid());
    return basicReturns.isOk();
}

⭕总结

关于《【消息队列开发】实现客户端》就讲解到这儿,感谢大家的支持,欢迎各位留言交流以及批评指正,如果文章对您有帮助或者觉得作者写的还不错可以点一下关注,点赞,收藏支持一下

相关文章
|
7月前
|
消息中间件 开发工具 RocketMQ
消息队列 MQ产品使用合集之如何关闭客户端的日志记录
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 Java C语言
消息队列 MQ使用问题之在使用C++客户端和GBase的ESQL进行编译时出现core dump,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 监控
消息队列 MQ使用问题之客户端重启后仍然出现broker接收消息不均匀,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 安全 PHP
消息队列 MQ使用问题之如何获取PHP客户端代码
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 JavaScript Linux
消息队列 MQ操作报错合集之客户端在启动时遇到了连接错误,是什么原因
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
6月前
|
消息中间件 存储 负载均衡
消息队列 MQ使用问题之如何在grpc客户端中设置负载均衡器
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Java 测试技术
消息队列 MQ操作报错合集之设置了setKeepAliveInterval(1)但仍然出现客户端未连接,该怎么解决
在使用消息队列MQ时,可能会遇到各种报错情况。以下是一些常见的错误场景、可能的原因以及解决建议的汇总:1.连接错误、2.消息发送失败、3.消息消费报错、4.消息重试与死信处理、5.资源与权限问题、6.配置错误、7.系统资源限制、8.版本兼容性问题。
109 2
|
7月前
|
消息中间件 Serverless 网络性能优化
消息队列 MQ产品使用合集之客户端和服务器之间的保活心跳检测间隔是怎么设置的
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 小程序 JavaScript
消息队列 MQ产品使用合集之如何限制部分客户端连接
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
7月前
|
消息中间件 Java Spring
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
JavaWeb后端开发Spring框架之消息 消息队列案例--订单短信通知
66 0