RabbitMQ-Java版本生产与消费

简介: JAVA语言对RabbitMQ的入门操作(生产与消费)。后续将推出RabbitMQ四种ExChange具体用法
---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生产---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
         //如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //将消息保存起来,重启rabbit后待消费的消息不会被删除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重启rabbit后待消费的消息都将丢失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("发送完成:"+(System.currentTimeMillis() - start));
        //关闭频道和连接
        channel.close();
        connection.close();
     }
}

---------消息消费 ---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT =  5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
9天前
|
Java
java中如何确保一个集合不能被修改? - 源码解读详细--JavaPub版本
java中如何确保一个集合不能被修改? - 源码解读详细--JavaPub版本
11 2
|
1月前
|
移动开发 监控 供应链
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
一开始接触MES系统,很多人会和博主一样,对MES细节的应用不了解,这样很正常,因为MES系统相对于其他系统来讲应用比较多!
41 1
JAVA智慧工厂制造生产管理MES系统,全套源码,多端展示(app、小程序、H5、台后管理端)
|
17天前
|
算法 Java 大数据
Java不同版本的多线程开发:进化之路
Java不同版本的多线程开发:进化之路
23 1
|
24天前
|
存储 Java
java使用pdfbox 3.0版本删除pdf文件中指定字符所在行,生成新的pdf文件
【5月更文挑战第25天】java使用pdfbox 3.0版本删除pdf文件中指定字符所在行,生成新的pdf文件
47 1
|
4天前
|
Oracle Java 关系型数据库
Java官网下载JDK21版本详细教程(下载、安装、环境变量配置)
Java官网下载JDK21版本详细教程(下载、安装、环境变量配置)
|
29天前
|
Java
JAVA生产随机生成验证码
Java代码实现随机生成指定位数的数字、大小写字母混合的验证码。使用`Random`类生成类型(0-数字,1-大写字母,2-小写字母),然后根据类型转换为相应字符。示例代码展示了一个4位验证码的生成,并附有实现结果图和过程分析。
48 2
|
1月前
|
IDE Java 编译器
Java 错误: 无效的目标发行版 5 Java: 错误: 无效的目标发行版8 Java: 错误: 不支持发行版本17Java:无效的目标发行版17
Java 错误: 无效的目标发行版 5 Java: 错误: 无效的目标发行版8 Java: 错误: 不支持发行版本17Java:无效的目标发行版17
|
1月前
|
监控 NoSQL Java
java云MES 系统源码Java+ springboot+ mysql 一款基于云计算技术的企业级生产管理系统
MES系统是生产企业对制造执行系统实施的重点在智能制造执行管理领域,而MES系统特点中的可伸缩、信息精确、开放、承接、安全等也传递出:MES在此管理领域中无可替代的“王者之尊”。MES制造执行系统特点集可伸缩性、精确性、开放性、承接性、经济性与安全性于一体,帮助企业解决生产中遇到的实际问题,降低运营成本,快速适应企业不断的制造执行管理需求,使得企业已有基础设施与一切可用资源实现高度集成,提升企业投资的有效性。
69 5
|
8天前
|
前端开发 JavaScript Java
计算Java项目|基于SpringBoot的协力服装厂服装生产管理系统的设计与实现
计算Java项目|基于SpringBoot的协力服装厂服装生产管理系统的设计与实现
|
9天前
|
Java
快速排序-Java版本
快速排序-Java版本
5 0