RabbitMQ-Java版本生产与消费

简介: JAVA语言对RabbitMQ的入门操作(生产与消费)。后续将推出RabbitMQ四种ExChange具体用法
---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生产---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
         //如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //将消息保存起来,重启rabbit后待消费的消息不会被删除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重启rabbit后待消费的消息都将丢失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("发送完成:"+(System.currentTimeMillis() - start));
        //关闭频道和连接
        channel.close();
        connection.close();
     }
}

---------消息消费 ---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT =  5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
160 96
接替此文【下篇-服务端+后台管理】优雅草蜻蜓z系统JAVA版暗影版为例-【蜻蜓z系列通用】-2025年全新项目整合搭建方式-这是独立吃透代码以后首次改变-独立PC版本vue版搭建教程-优雅草卓伊凡
|
2月前
|
Java—多线程实现生产消费者
本文介绍了多线程实现生产消费者模式的三个版本。Version1包含四个类:`Producer`(生产者)、`Consumer`(消费者)、`Resource`(公共资源)和`TestMain`(测试类)。通过`synchronized`和`wait/notify`机制控制线程同步,但存在多个生产者或消费者时可能出现多次生产和消费的问题。 Version2将`if`改为`while`,解决了多次生产和消费的问题,但仍可能因`notify()`随机唤醒线程而导致死锁。因此,引入了`notifyAll()`来唤醒所有等待线程,但这会带来性能问题。
Java—多线程实现生产消费者
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
java依赖冲突解决问题之jar包版本冲突无法通过升降级解决时如何解决
java依赖冲突解决问题之jar包版本冲突无法通过升降级解决时如何解决
【应用服务 App Service】App Service 中部署Java项目,查看Tomcat配置及上传自定义版本
【应用服务 App Service】App Service 中部署Java项目,查看Tomcat配置及上传自定义版本
如何查看已安装的 Java 版本
要查看已安装的 Java 版本,打开命令提示符或终端,输入 `java -version`,回车后即可显示当前系统中 Java 的版本信息。
1400 1
如何检查 Java 版本是否兼容
要检查Java版本是否兼容,可在命令行输入“java -version”查看当前安装的Java版本,然后对比目标应用所需的Java版本,确保其满足要求。
148 1
java: 警告: 源发行版 11 需要目标发行版 11 无效的目标发行版: 11 jdk版本不符,项目jdk版本为其他版本
如何解决Java项目中因JDK版本不匹配导致的编译错误,包括修改`pom.xml`文件、调整项目结构、设置Maven和JDK版本,以及清理缓存和重启IDEA。
113 1
java: 警告: 源发行版 11 需要目标发行版 11 无效的目标发行版: 11 jdk版本不符,项目jdk版本为其他版本
|
4月前
|
java版本学习网站又添加了一个libgdx模块
java版本学习网站之前添加了docker,想了想还是再把libgdx添加进去吧。
52 3

热门文章

最新文章