消息中间件介绍&RabitMQ环境搭建(Linux)(下)

本文涉及的产品
注册配置 MSE Nacos/ZooKeeper,118元/月
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
性能测试 PTS,5000VUM额度
简介: 消息中间件介绍&RabitMQ环境搭建(Linux)

正文


五、RabitMQ消息类型


222.png


第一种:点对点 生产者将消息发送到队列,然后消费者从队列中取消息依次消费,消费之后,消息出队列,本次消费结束


第二种:工作队列。又称任务队列。主要思想就是避免执行资源密集型任务时,必须等待它执行完成。我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将获取任务并最终执行作业。消息在多个消费者共享,但是一个消息只能被一个消费者消费。

总之:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消

费,就会消失,因此任务是不会被重复执行的 。


第三种:发布订阅、Routing(路由键)、Topics(主题)


1、1个生产者,多个消费者


2、每一个消费者都有自己的一个队列


3、生产者没有将消息直接发送到队列,而是发送到了交换机


4、每个队列都要绑定到交换机


5、生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。


X(Exchanges):交换机一方面:接收生产者发送的消息。另一方面:知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型direct(直连交换机,把消息交给符合指定routing key 的队列)、topic(通配符,把消息交给符合routing pattern(路由模式) 的队列)、headers 和fanout(扇形交换机或者广播,将消息交给所有绑定到交换机的队列)。Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。


(1)订阅模型-Fanout


Fanout,也称为广播。

在广播模式下,消息发送流程是这样的:


1) 可以有多个消费者

2) 每个消费者有自己的queue(队列)

3) 每个队列都要绑定到Exchange(交换机)

4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。

5) 交换机把消息发送给绑定过的所有队列

6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费


(2)订阅模型-Direct:


555.jpg


在Direct模型下,队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key),消息的发送方在向Exchange发送消息时,也必须指定消息的routing key。


P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。


X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列


C1:消费者,其所在队列指定了需要routing key 为 error 的消息


C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息


(3)订阅模型-Topic


555.jpg


Topic 类型的 Exchange 与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符


通配符规则:#:匹配一个或多个词


*:匹配不多不少恰好 1 个词


六、代码


连接工具

package com.xiaojie.rabbitmq;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 创建mq连接
 * @date 2021/9/24 22:54
 */
public class MyConnection {
    public static Connection getConnection() throws  IOException, TimeoutException {
        // 1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置连接地址
        connectionFactory.setHost("192.168.139.154");
        // 3.设置端口号
        connectionFactory.setPort(5672);
        // 4.设置账号和密码
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        // 5.设置VirtualHost
        connectionFactory.setVirtualHost("/xiaojie");
        return connectionFactory.newConnection();
    }
}


点对点和工作队列模式


package com.xiaojie.rabbitmq.p2p;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 点对点生产者
 * 生产者生产消息时如果没有对应的队列,则直接遗弃消息,并不会报错。
 * @date 2021/9/24 22:53
 */
public class PProvider {
    //定义队列
    private static final String QUEUE_NAME = "myqueue";
    public static void main(String[] args) throws IOException, TimeoutException {
        System.out.println("生产者启动成功..");
        // 1.创建连接
        Connection connection = MyConnection.getConnection();
        // 2.创建通道
        Channel channel = connection.createChannel();
        //创建队列,如果队列存在则使用这个队列,不存在则创建
        //第一个参数,对列名称  myqueue
        //第二个参数,是否持久话,false表示不持久化数据,MQ停掉后数据就会丢失
        //第三个参数,是否队列私有化,false表示所有的消费者都可以访问,true表示只有第一次拥有它的消费者才可以一直使用,其他消费者不能访问
        //第四个参数,是否自动删除,false连接停掉后不自动删除掉这个队列
        //第五个参数,其他额外的参数
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for (int i = 0; i < 10; i++) {
            String msg = "测试点对点消息" + i;
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息成功:" + msg);
        }
        channel.close();
        connection.close();
    }
}


package com.xiaojie.rabbitmq.p2p;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: TODO
 * @date 2021/9/24 23:15
 */
public class PConsumer {
    private static final String QUEUE_NAME = "myqueue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = MyConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费消息msg:" + msg);
            }
        };
        // 3.创建我们的监听的消息
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}


package com.xiaojie.rabbitmq.p2p;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 模拟工作队列,即多个消费者消费消息,
 * 结果是消息均等消费,就是在工作队列模式下,默认情况下消息是均摊到每个消费者的。
 * @date 2021/9/24 23:15
 */
public class PConsumer2 {
    private static final String QUEUE_NAME = "myqueue";
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.创建我们的连接
        Connection connection = MyConnection.getConnection();
        // 2.创建我们通道
        Channel channel = connection.createChannel();
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String msg = new String(body, "UTF-8");
                System.out.println("消费消息msg:" + msg);
            }
        };
        // 3.创建我们的监听的消息
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    }
}


Fanout模式


package com.xiaojie.rabbitmq.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: fanout模式生产者
 * @date 2021/9/24 23:45
 */
public class Provider {
    public static final  String EXCHANGE="my_fanout_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型(扇形交换机)
        channel.exchangeDeclare(EXCHANGE, "fanout");
        //创建消息
        String msg="fanout交换机消息。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
        System.out.println("消息发送成功。。。。。。");
        //关闭通道,关闭连接
        channel.close();
        connection.close();
    }
}


package com.xiaojie.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 短信消费者
 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者
 * @date 2021/9/24 23:47
 */
public class SmsConsumer {
    //交换机
    private static final  String EXCHANGE="my_fanout_exchange";
    //队列
    private  static final String SMS_QUEUE="sms_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MyConnection.getConnection();
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(SMS_QUEUE, false, false, false, null);
        //绑定队列到交换机
        channel.queueBind(SMS_QUEUE, EXCHANGE, "");
        System.out.println("短信消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的短信消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(SMS_QUEUE,true, consumer);
    }
}


package com.xiaojie.rabbitmq.fanout;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 微信消费者
 * 扇形交换机是通过同一个交换机,将消息处理到不同的队列,不同的队列对应不同的消费者
 * @date 2021/9/24 23:47
 */
public class WxConsumer {
    //交换机
    private static final  String EXCHANGE="my_fanout_exchange";
    //队列
    private  static final String WX_QUEUE="wx_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = MyConnection.getConnection();
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(WX_QUEUE, false, false, false, null);
        //绑定队列到交换机
        channel.queueBind(WX_QUEUE, EXCHANGE, "");
        System.out.println("微信消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的短信消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(WX_QUEUE,true, consumer);
    }
}


Direct模式


package com.xiaojie.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: direct生产者
 * @date 2021/9/24 23:39
 */
public class Provider {
    public static final  String EXCHANGE="my_direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型
        channel.exchangeDeclare(EXCHANGE, "direct");
        //路由键
        String routingKey="email";
        //创建消息
        String msg="direct---交换机的消息。。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes());
        System.out.println("生产者启动成功。。。。。");
        //关闭连接,管道
        channel.close();
        connection.close();
    }
}


package com.xiaojie.rabbitmq.direct;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 路由键交换机
 * @date 2021/9/25 0:01
 */
public class Consumer {
    public static String EMAIL_QUEUE_FANOUT="email_queue";
    public static final  String EXCHANGE="my_direct_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建mq连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null);
        //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey
        channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email");
        System.out.println("邮件消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer);
    }
}


Topic模式


package com.xiaojie.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: topic生产者 通配符模式
 * @date 2021/9/24 23:39
 */
public class Provider {
    public static final  String EXCHANGE="my_topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //生产者绑定交换机 参数1 交换机的名称。2,交换机的类型
        channel.exchangeDeclare(EXCHANGE, "topic");
        //路由键
        String routingKey="email.send";
        //创建消息
        String msg="topic---交换机的消息。。。。。";
        //发送消息
        channel.basicPublish(EXCHANGE, routingKey, null, msg.getBytes());
        System.out.println("生产者启动成功。。。。。");
        //关闭连接,管道
        channel.close();
        connection.close();
    }
}


package com.xiaojie.rabbitmq.topic;
import com.rabbitmq.client.*;
import com.xiaojie.rabbitmq.MyConnection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * @author xiaojie
 * @version 1.0
 * @description: 路由键交换机
 * 通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词
 * @date 2021/9/25 0:01
 */
public class Consumer {
    public static String EMAIL_QUEUE_FANOUT="email_queue";
    public static final  String EXCHANGE="my_topic_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        //创建mq连接
        Connection connection = MyConnection.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(EMAIL_QUEUE_FANOUT, false, false, false, null);
        //消费者队列绑定交换机 参数分别是 队列、交换机、routingkey
        //通配符 #:匹配一个或多个词;*:匹配不多不少恰好 1 个词
        channel.queueBind(EMAIL_QUEUE_FANOUT, EXCHANGE, "email.#");
        System.out.println("邮件消费者开启。。。。");
        //开启生产者监听
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String msg= new String(body,"utf-8");
                System.out.println("接收到的消息时msg:"+msg);
            }
        };
        //设置应答模式
        channel.basicConsume(EMAIL_QUEUE_FANOUT,true, consumer);
    }
}


完整代码:spring-boot: Springboot整合redis、消息中间件等相关代码


参考:docker部署RabbitMQ集群 - Alan6 - 博客园


RabbitMQ之五种消息模型 - 早上起床喝酸_奶 - 博客园

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
4月前
|
存储 安全 Linux
离线Linux服务器环境搭建
【9月更文挑战第3天】在离线环境下搭建Linux服务器需按以下步骤进行:首先确定服务器用途及需求,准备安装介质与所需软件包;接着安装Linux系统并配置网络;然后设置系统基础参数,如主机名与时区;安装必要软件并配置服务;最后进行安全设置,包括关闭非必要服务、配置防火墙、强化用户认证及定期备份数据。整个过程需确保软件包的完整性和兼容性。
|
5月前
|
消息中间件 Java 中间件
在Linux中,什么是中间件?什么是jdk?
在Linux中,什么是中间件?什么是jdk?
|
6月前
|
消息中间件 Java Kafka
kafka Linux环境搭建安装及命令创建队列生产消费消息
kafka Linux环境搭建安装及命令创建队列生产消费消息
124 4
|
7月前
|
Linux 测试技术 开发者
【Docker项目实战】使用Docker部署instantbox临时Linux系统环境搭建工具
【6月更文挑战第13天】使用Docker部署instantbox临时Linux系统环境搭建工具
84 3
|
7月前
|
Linux Windows 虚拟化
【Linux环境搭建实战手册】:打造高效开发空间的秘籍
【Linux环境搭建实战手册】:打造高效开发空间的秘籍
|
6月前
|
负载均衡 Java Linux
黑马头条01,环境搭建,今日头条的介绍,今日头条的功能架构图,技术栈的说明,服务层,nacos(奶靠丝)安装,安装在Linux服务器上环境准备,
黑马头条01,环境搭建,今日头条的介绍,今日头条的功能架构图,技术栈的说明,服务层,nacos(奶靠丝)安装,安装在Linux服务器上环境准备,
|
6月前
|
监控 网络协议 物联网
一款轻量级的通信协议---MQTT (内含Linux环境搭建)
**MQTT协议摘要** MQTT是一种轻量级的发布/订阅型网络协议,适用于低带宽、高延迟或不可靠的网络环境,尤其适合物联网(IoT)设备。其主要特点包括: 1. **发布/订阅模型**:设备通过主题进行通信,发布者无需知道订阅者,订阅者也不需知道消息来源。 2. **轻量级**:协议头部小,减少网络负载,适合资源受限的设备。 3. **断线重连**:支持客户端在失去连接后重新连接,保持通信。 4. **服务质量级别(QoS)**:提供0(最多一次)、1(至少一次)和2(恰好一次)三种级别,保证消息传递的可靠性。
112 0
|
8月前
|
消息中间件 运维 Linux
运维最全Linux 命令大全之scp命令_linux scp 指令(1),2024年最新从消息中间件看分布式系统的多种套路
运维最全Linux 命令大全之scp命令_linux scp 指令(1),2024年最新从消息中间件看分布式系统的多种套路
|
8月前
|
存储 Linux 开发工具
Linux 基础(从环境搭建到基础命令)
Linux 基础(从环境搭建到基础命令)
|
8月前
|
运维 Linux Shell
day02-Linux运维-系统介绍与环境搭建_硬件 系统核心 解释器shell 外围操作系统
day02-Linux运维-系统介绍与环境搭建_硬件 系统核心 解释器shell 外围操作系统