实现高效消息传递:使用RabbitMQ构建可复用的企业级消息系统

简介: RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

前言


RabbitMQ是一个在 AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,由于erlang 语言的高并发特性,性能较好,本质是个队列,FIFO 先入先出,里面存放的内容是message,下面介绍通过在ubuntu+cpolar+rabbitMQ环境下,实现mq服务端远程访问。


1.安装erlang 语言


由于rabbitMQ是erlang语言实现的,所以我们需要安装erlang

sudo apt-get install erlang-nox


2.安装rabbitMQ


安装最新版rabbitMQ

sudo apt-get install rabbitmq-server


查看rabbitMQ状态,active(running)表示在线

sudo systemctl status rabbitmq-server




设置访问MQ用户名账号和密码,admin表示账号(可自定义),123456表示密码(可自定义)

sudo rabbitmqctl add_user admin 123456




设置上面admin用户的角色,administrator表示是最高管理员

sudo rabbitmqctl set_user_tags admin administrator


设置admin角色权限

sudo rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"


以上信息设置好后,我们往下走。


3. 内网穿透


接着我们使用[cpolar](cpolar - 安全的内网穿透工具)穿透本地MQ服务,使得远程可以进行访问连接,cpolar支持http/https/tcp协议,不限制流量,操作简单,无需公网IP,也无需路由器。


cpolar官网:https://www.cpolar.com/


3.1 安装cpolar内网穿透(支持一键自动安装脚本)

cpolar 安装(国内使用)

curl -L https://www.cpolar.com/static/downloads/install-release-cpolar.sh | sudo bash


或 cpolar短链接安装方式:(国外使用)

curl -sL https://git.io/cpolar | sudo bash


查看版本号

cpolar version


token认证

登录cpolar官网后台,点击左侧的验证,查看自己的认证token,之后将token贴在命令行里


cpolar authtoken xxxxxxx

向系统添加服务

sudo systemctl enable cpolar


启动cpolar服务

sudo systemctl start cpolar


正常显示为active则表示服务为正常在线启动状态


3.2 创建HTTP隧道


在ubuntu系统本地安装cpolar内网穿透之后,在ubuntu浏览器上访问本地9200端口,打开cpolar web ui界面:http://127.0.0.1:9200。


点击左侧仪表盘的隧道管理——创建隧道,由于rabbitMQ中默认的是5672端口,因此我们要来创建一条http隧道,指向5672端口:


隧道名称:可自定义,注意不要重复

协议:tcp

本地地址:5672

域名类型:选择随机域名

地区:选择China VIP

点击创建



打开在线隧道列表,查看随机公网tcp地址,使用下面随机的tcp公网地址,即可远程连接MQ



4. 公网远程连接


maven坐标


<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
  </dependency>


这里使用java 测试使用上面公网地址进行连接,编写发布者


 

ConnectionFactory factory = new ConnectionFactory();
        //cpolar公网地址
        factory.setHost("1.tcp.cpolar.cn");
        //公网地址对于的端口号
        factory.setPort(24889);
        //用户名和密码
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = null;
        Channel channel = null;
        try {
            // 1.创建连接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 2.为通道声明exchange和exchange的类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
            String msg = " hello world";
            // 3.发送消息到指定的exchange,队列指定为空,由exchange根据情况判断需要发送到哪些队列
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());
            System.out.println("product send a msg: " + msg);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 4.关闭连接
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }

 

编写消费者      

ConnectionFactory factory = new ConnectionFactory();
        //cpolar公网地址
        factory.setHost("1.tcp.cpolar.cn");
        //公网地址对于的端口号
        factory.setPort(24889);
        //用户名和密码
        factory.setUsername("admin");
        factory.setPassword("123456");
        Connection connection = null;
        Channel channel = null;
        try {
            // 1.创建连接和通道
            connection = factory.newConnection();
            channel = connection.createChannel();
            // 2.为通道声明exchange以及exchange类型
            channel.exchangeDeclare("exchange", BuiltinExchangeType.FANOUT);
            // 3.创建随机名字的队列
            String queueName = channel.queueDeclare().getQueue();
            // 4.建立exchange和队列的绑定关系
            channel.queueBind(queueName, "exchange", "");
            System.out.println(" **** Consumer1 keep alive ,waiting for messages, and then deal them");
            // 5.通过回调生成消费者并进行监听
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 获取消息内容然后处理
                    String msg = new String(body, "UTF-8");
                    System.out.println("*********** Consumer1" + " get message :[" + msg + "]");
                }
            };
            // 6.消费消息
            channel.basicConsume(queueName, true, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }


先启动消费者,然后启动发布者,然后消费者控制台输出消费者发送的消息表示成功.我们实现了远程访问MQ。



5.固定公网TCP地址


由于以上创建的隧道使用的是随机地址隧道,地址会在24小时内变化,为了使连接更加稳定,我们还需要固定tcp地址。


5.1 保留一个固定的公网TCP端口地址


登录cpolar官网后台,点击左侧的预留,选择保留的TCP地址。


地区:选择China VIP

描述:即备注,可自定义填写

点击保留



地址保留成功后,系统会生成相应的固定公网地址,将其复制下来



5.2 配置固定公网TCP端口地址


在浏览器上访问9200端口,登录cpolar web ui管理界面,点击左侧仪表盘的隧道管理——隧道列表,找到上面创建的隧道,点击右侧的编辑



修改隧道信息,将保留成功的固定tcp地址配置到隧道中


端口类型:修改为固定tcp端口

预留的tcp地址:填写保留成功的地址

点击更新



隧道更新成功后,点击左侧仪表盘的状态在线隧道列表,找到需要编辑的隧道,可以看到公网地址已经更新成为了固定TCP地址。



更新好后,我们修改代码中的两个参数

//cpolar公网地址,改为我们固定的地址
        factory.setHost("5.tcp.vip.cpolar.cn");
        //固定地址对应的端口号
        factory.setPort(13630);


然后我们重新启动消费者,再启动生产者,正常发布和消费消息表示成功



转载自cpolar内网穿透的文章:无公网IP,在外公网远程访问RabbitMQ服务「内网穿透」


相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
消息中间件 Java Kafka
消息传递新纪元:探索RabbitMQ、RocketMQ和Kafka的魅力所在
【8月更文挑战第29天】这段内容介绍了在分布式系统中起到异步通信与解耦作用的消息队列,并详细探讨了三种流行的消息队列产品:RabbitMQ、RocketMQ 和 Kafka。其中,RabbitMQ 是一个基于 AMQP 协议的开源消息队列系统,支持多种消息模型;RocketMQ 则是由阿里巴巴开源的具备高性能、高可用性和高可靠性的分布式消息队列,支持事务消息等多种特性;而 Kafka 作为一个由 LinkedIn 开源的分布式流处理平台,以高吞吐量和良好的可扩展性著称。此外,还提供了使用这三种消息队列发送和接收消息的代码示例。总之,这三种消息队列各有优势,适用于不同的业务场景。
90 3
|
6月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
381 2
|
6月前
|
消息中间件 安全 Java
构建基于RabbitMQ的安全消息传输管道
【8月更文第28天】在分布式系统中,消息队列如RabbitMQ为应用间的数据交换提供了可靠的支持。然而,随着数据的敏感性增加,确保这些消息的安全传输变得至关重要。本文将探讨如何在RabbitMQ中实施一系列安全措施,包括加密通信、认证和授权机制,以保护敏感信息。
154 1
|
6月前
|
消息中间件 存储 负载均衡
"RabbitMQ集群大揭秘!让你的消息传递系统秒变超级英雄,轻松应对亿级并发挑战!"
【8月更文挑战第24天】RabbitMQ是一款基于AMQP的开源消息中间件,以其高可靠性、扩展性和易用性闻名。面对高并发和大数据挑战时,可通过构建集群提升性能。本文深入探讨RabbitMQ集群配置、工作原理,并提供示例代码。集群由多个通过网络连接的节点组成,共享消息队列,确保高可用性和负载均衡。搭建集群需准备多台服务器,安装Erlang和RabbitMQ,并确保节点间通信顺畅。核心步骤包括配置.erlang.cookie文件、使用rabbitmqctl命令加入集群。消息发布至任一节点时,通过集群机制同步至其他节点;消费者可从任一节点获取消息。
74 2
|
6月前
|
存储 C# 关系型数据库
“云端融合:WPF应用无缝对接Azure与AWS——从Blob存储到RDS数据库,全面解析跨平台云服务集成的最佳实践”
【8月更文挑战第31天】本文探讨了如何将Windows Presentation Foundation(WPF)应用与Microsoft Azure和Amazon Web Services(AWS)两大主流云平台无缝集成。通过具体示例代码展示了如何利用Azure Blob Storage存储非结构化数据、Azure Cosmos DB进行分布式数据库操作;同时介绍了如何借助Amazon S3实现大规模数据存储及通过Amazon RDS简化数据库管理。这不仅提升了WPF应用的可扩展性和可用性,还降低了基础设施成本。
135 0
|
6月前
|
消息中间件 Java RocketMQ
微服务架构师的福音:深度解析Spring Cloud RocketMQ,打造高可靠消息驱动系统的不二之选!
【8月更文挑战第29天】Spring Cloud RocketMQ结合了Spring Cloud生态与RocketMQ消息中间件的优势,简化了RocketMQ在微服务中的集成,使开发者能更专注业务逻辑。通过配置依赖和连接信息,可轻松搭建消息生产和消费流程,支持消息过滤、转换及分布式事务等功能,确保微服务间解耦的同时,提升了系统的稳定性和效率。掌握其应用,有助于构建复杂分布式系统。
95 0
|
7月前
|
消息中间件 存储 缓存
架构设计篇问题之消息队列(MQ)在微服务系统中问题如何解决
架构设计篇问题之消息队列(MQ)在微服务系统中问题如何解决
|
8月前
|
消息中间件 监控 调度
构建Python中的分布式系统结合Celery与RabbitMQ
在当今的软件开发中,构建高效的分布式系统是至关重要的。Python作为一种流行的编程语言,提供了许多工具和库来帮助开发人员构建分布式系统。其中,Celery和RabbitMQ是两个强大的工具,它们结合在一起可以为你的Python应用程序提供可靠的异步任务队列和消息传递机制。
|
7月前
|
消息中间件 存储 Java
使用RabbitMQ实现可靠的消息传递机制
使用RabbitMQ实现可靠的消息传递机制
|
8月前
|
消息中间件 Serverless Windows
消息队列 MQ产品使用合集之MQTT协议是否可以应用于社交软件的系统通知场景
阿里云消息队列MQ(Message Queue)是一种高可用、高性能的消息中间件服务,它允许您在分布式应用的不同组件之间异步传递消息,从而实现系统解耦、流量削峰填谷以及提高系统的可扩展性和灵活性。以下是使用阿里云消息队列MQ产品的关键点和最佳实践合集。