RabbitMQ创建生产者和消费者

简介: RabbitMQ创建生产者和消费者

1.创建项目(此处忽略)

2.引入依赖

在pom.xml文件中引入如下依赖

<dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
 </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

3.编写生产者

package com.tjrac.rabbitmq.TestDemo;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Map;
public class Producer {
    public static void main(String[] args) throws Exception {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //主机地址
        connectionFactory.setHost("192.168.100.110");
        //连接端口;默认为 5672
        connectionFactory.setPort(5672);
        //虚拟主机名称;默认为 /
        connectionFactory.setVirtualHost("/");
        //连接用户名;默认为guest
        connectionFactory.setUsername("admin");
        //连接密码;默认为guest
        connectionFactory.setPassword("123456");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        /**
         * queue      参数1:队列名称
         * durable    参数2:是否定义持久化队列,当mq重启之后,还在
         * exclusive  参数3:是否独占本次连接
         *            ① 是否独占,只能有一个消费者监听这个队列
         *            ② 当connection关闭时,是否删除队列
         * autoDelete 参数4:是否在不使用的时候自动删除队列,当没有consumer时,自动删除
         * arguments  参数5:队列其它参数
         */
        channel.queueDeclare("simple_queue", true, false, false, null);
        // 要发送的信息
        String message = "Hello RabbitMQ";
        /**
         * 参数1:交换机名称,如果没有指定则使用默认Default Exchage
         * 参数2:路由key,简单模式可以传递队列名称
         * 参数3:配置信息
         * 参数4:消息内容
         */
        channel.basicPublish("", "simple_queue", null, message.getBytes());
        System.out.println("已发送消息:" + message);
        // 关闭资源
        channel.close();
        connection.close();
    }
}

4.编写消费者

package com.tjrac.rabbitmq.TestDemo;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //2. 设置参数
        factory.setHost("192.168.110.110");//ip
        factory.setPort(5672); //端口  默认值 5672
        factory.setVirtualHost("/");//虚拟机 默认值/
        factory.setUsername("admin");//用户名
        factory.setPassword("123456");//密码
        //3. 创建连接 Connection
        Connection connection = factory.newConnection();
        //4. 创建Channel
        Channel channel = connection.createChannel();
        //5. 创建队列Queue
        /*
        queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
        参数:
            1. queue:队列名称
            2. durable:是否持久化,当mq重启之后,还在
            3. exclusive:
                * 是否独占。只能有一个消费者监听这队列
                * 当Connection关闭时,是否删除队列
            4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
            5. arguments:参数。
         */
        //如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
        channel.queueDeclare("simple_queue",true,false,false,null);
        // 接收消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            /*
               回调方法,当收到消息后,会自动执行该方法
               1. consumerTag:标识
               2. envelope:获取一些信息,交换机,路由key...
               3. properties:配置信息
               4. body:数据
            */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:"+consumerTag);
                System.out.println("Exchange:"+envelope.getExchange());
                System.out.println("RoutingKey:"+envelope.getRoutingKey());
                System.out.println("properties:"+properties);
                System.out.println("body:"+new String(body));
            }
        };
        /*
        basicConsume(String queue, boolean autoAck, Consumer callback)
        参数:
            1. queue:队列名称
            2. autoAck:是否自动确认 ,类似咱们发短信,发送成功会收到一个确认消息
            3. callback:回调对象
         */
        // 消费者类似一个监听程序,主要是用来监听消息
        channel.basicConsume("simple_queue",true,consumer);
   }
}


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 存储 Java
RabbitMQ重复消费
RabbitMQ重复消费
31 3
|
9月前
|
消息中间件 存储 缓存
RabbitMq如何防止消息被重复消费
RabbitMq如何防止消息被重复消费
970 0
|
14天前
|
消息中间件 RocketMQ
RocketMq消费者/生产者配置
RocketMq消费者/生产者配置
|
4月前
|
消息中间件 Java
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
RabbitMQ中的消息确认机制是什么?为什么需要消息确认?
33 0
|
4月前
|
消息中间件 缓存 Java
RabbitMQ在项目中做什么用?怎么消费消息?具体怎么使用的?
RabbitMQ在项目中做什么用?怎么消费消息?具体怎么使用的?
|
6月前
|
消息中间件 缓存 网络协议
RabbitMQ消息确认
RabbitMQ消息确认
|
9月前
|
消息中间件 存储
消息队列之RabbitMQ之主题(Topics)模式
RabbitMQ是一个消息中间件,它接受并转发消息。它有6中工作模式,而主题模式是它的核心。在主题模式中,主要是通配符的添加与使用。
175 0
|
9月前
|
消息中间件 存储
RabbitMQ如何保证消息发送成功
RabbitMQ如何保证消息发送成功
93 0
RabbitMQ如何保证消息发送成功
|
10月前
|
消息中间件 应用服务中间件 nginx
【RabbitMQ六】——RabbitMQ主题模式(Topic)
【RabbitMQ六】——RabbitMQ主题模式(Topic)
165 1
|
10月前
|
消息中间件 缓存 中间件
RabbitMQ重复消费的原因
关于RabbitMQ重复消费的原因
801 1