跨平台 AMQP 客户端开发指南

本文涉及的产品
实时数仓Hologres,5000CU*H 100GB 3个月
实时计算 Flink 版,5000CU*H 3个月
智能开放搜索 OpenSearch行业算法版,1GB 20LCU 1个月
简介: 【8月更文第28天】高级消息队列协议 (AMQP) 是一种开放标准的应用层协议,用于中间件。它定义了消息如何在消息代理(通常是消息队列服务器)与客户端应用程序之间传递。本文将指导您如何为不同的编程语言构建跨平台的 AMQP 客户端,并提供一些具体的代码示例。

摘要

高级消息队列协议 (AMQP) 是一种开放标准的应用层协议,用于中间件。它定义了消息如何在消息代理(通常是消息队列服务器)与客户端应用程序之间传递。本文将指导您如何为不同的编程语言构建跨平台的 AMQP 客户端,并提供一些具体的代码示例。

1. 引言

AMQP 协议支持多种消息传递模式,包括发布/订阅、点对点等。由于其开放性和标准化的特点,AMQP 成为了许多分布式系统中消息传递的首选方案。本指南旨在帮助开发者了解如何使用不同的编程语言创建 AMQP 客户端。

2. 工具选择

对于跨平台的 AMQP 客户端开发,我们推荐使用以下工具和技术栈:

  • RabbitMQ:一个开源的消息代理,支持 AMQP 协议。
  • Libraries
    • Python: pika
    • Java: RabbitMQ Java Client
    • Node.js: amqplib
    • C#: RabbitMQ.Client

3. 示例代码

3.1 Python (使用 Pika)

Python 中可以使用 pika 库来实现 AMQP 客户端。下面是一个简单的 Python 发送者和接收者的例子。

发送者:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
接收者:
import pika

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
3.2 Java (使用 RabbitMQ Java Client)

Java 开发者通常会使用官方提供的 RabbitMQ Java Client 库。

发送者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {
   
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
   
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
接收者:
import com.rabbitmq.client.*;

public class Receive {
   
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
   
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
   
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
   
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
   });
        }
    }
}
3.3 Node.js (使用 amqplib)

Node.js 社区通常会选择 amqplib 这个库。

发送者:
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (err, conn) => {
   
  conn.createChannel((err, ch) => {
   
    const q = 'hello';
    ch.assertQueue(q, {
    durable: false });
    console.log(` [x] Sending 'Hello World!'`);
    ch.sendToQueue(q, Buffer.from('Hello World!'));
    setTimeout(() => {
    conn.close(); process.exit(0); }, 500);
  });
});
接收者:
const amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost', (err, conn) => {
   
  conn.createChannel((err, ch) => {
   
    const q = 'hello';
    ch.assertQueue(q, {
    durable: false });
    console.log(` [*] Waiting for messages in ${
     q}. To exit press CTRL+C`);

    ch.consume(q, (msg) => {
   
      console.log(` [x] Received '${
     msg.content.toString()}'`);
    }, {
    noAck: true });
  });
});

4. 总结

本文提供了几种常见编程语言下的 AMQP 客户端实现示例。通过这些示例,您可以快速上手并构建自己的 AMQP 应用程序。无论是进行消息的发布还是订阅,AMQP 都能提供稳定且高效的服务。

5. 参考资料

通过这些工具和示例代码,您可以开始构建您的跨平台 AMQP 客户端应用。

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
5月前
|
消息中间件 监控 物联网
MQTT的奇妙之旅:探索RabbitMQ Web MQTT插件的威力【RabbitMQ 十一】
MQTT的奇妙之旅:探索RabbitMQ Web MQTT插件的威力【RabbitMQ 十一】
195 0
|
5月前
|
网络性能优化 Windows
|
11月前
|
传感器 负载均衡 网络协议
01 MQTT简介
01 MQTT简介
67 0
|
Web App开发 JavaScript 算法
开源轻量级 IM 框架 MobileIMSDK 的Uniapp客户端库已发布
开源轻量级 IM 框架 MobileIMSDK 的Uniapp客户端库已发布
245 0
|
网络协议 Android开发 数据安全/隐私保护
HarmonyOS系统中内核实现MQTT协议开发的方法
大家好,今天主要来聊一聊,如何使用鸿蒙开始实现MQTT协议开发的方法
314 1
HarmonyOS系统中内核实现MQTT协议开发的方法
|
网络协议 安全 物联网
MQTT- 基于 mosquitto 开源 SDK 实现发布订阅 | 学习笔记
快速学习 MQTT- 基于 mosquitto 开源 SDK 实现发布订阅
600 0
MQTT- 基于 mosquitto 开源 SDK 实现发布订阅 | 学习笔记
EMQ
|
存储 缓存 网络协议
MQTT X Web:在线的 MQTT 5.0 客户端工具
MQTT X Web无需繁杂的下载安装步骤,只需在浏览器内打开页面,即可在线快速连接和测试MQTT服务与应用,了解和探索MQTT协议。
EMQ
1211 0
MQTT X Web:在线的 MQTT 5.0 客户端工具
|
Web App开发 网络协议 Java
WebSocket 开发指南
春节假期看了一下 websocket, 做了一篇笔记, 原文链接: https://oolap.com/websocket     WebSocket 由来已久, 常用于 "服务器推" 场景。最近开始学习 WebSocket (从 tomcat examples 开始), 本文的主要目的是做学习笔记, 同时记录一份开发指南。  &nbs
11465 0
|
安全 物联网 编译器
QT应用编程: 编写MQTT客户端登录OnetNet服务器完成主题订阅与发布
QT应用编程: 编写MQTT客户端登录OnetNet服务器完成主题订阅与发布
686 0
QT应用编程: 编写MQTT客户端登录OnetNet服务器完成主题订阅与发布