RabbitMQ-Java版本生产与消费

简介: JAVA语言对RabbitMQ的入门操作(生产与消费)。后续将推出RabbitMQ四种ExChange具体用法
---------Maven依赖---------
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.1.0</version>
</dependency>


---------消息生产---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
import com.rabbitmq.client.ConnectionFactory;  
import com.rabbitmq.client.MessageProperties;

public class TestSend
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT = 5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException, TimeoutException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        //创建一个连接
        Connection connection = factory.newConnection();
        //创建一个频道
        Channel channel = connection.createChannel();
        //指定一个队列
        //channel.queueDeclare(QUEUE_NAME, false, false, false, null);
         //如QUEUE_NAME是一个transient的queue,第二个参数必须是false;重启rabbit后QUEUE_NAME会被删除掉
         //如QUEUE_NAME是一个durability的queue,第二个参数必须是true;重启rabbit后QUEUE_NAME不会被删除掉
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        //发送的消息
        String message = "hello world!";
        //往队列中发出一条消息
        int j=0;
        Long start = System.currentTimeMillis();
        for(int i=j;i<j+10000;i++)
        {
         //将消息保存起来,重启rabbit后待消费的消息不会被删除
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (message+i).getBytes());

         //不保存消息,重启rabbit后待消费的消息都将丢失
         //channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes());
        }
        System.out.println("发送完成:"+(System.currentTimeMillis() - start));
        //关闭频道和连接
        channel.close();
        connection.close();
     }
}

---------消息消费 ---------
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class TestRead
{
    private final static String QUEUE_NAME = "testdurable";
    private final static String QUEUE_IP = " 你的服务器IP或域名";
    private final static int QUEUE_PORT =  5672;//RabbitMQ对外服务端口
    private final static String QUEUE_USER = "testuser";
    private final static String QUEUE_PWD = "123456";

    public static void main(String[] argv) throws java.io.IOException,
            java.lang.InterruptedException, TimeoutException
    {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(QUEUE_IP);
        factory.setPort(QUEUE_PORT);// MQ端口
        factory.setUsername(QUEUE_USER);// MQ用户名
        factory.setPassword(QUEUE_PWD);// MQ密码
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");
        
        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
        channel.basicConsume(QUEUE_NAME, true, consumer);
        Long start = System.currentTimeMillis();
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'    "+(System.currentTimeMillis() - start));
        }
    }
}

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
19天前
|
消息中间件 Linux API
centos7 安装rabbitmq自定义版本及配置
centos7 安装rabbitmq自定义版本及配置
|
25天前
|
Java 中间件 测试技术
java依赖冲突解决问题之jar包版本冲突无法通过升降级解决时如何解决
java依赖冲突解决问题之jar包版本冲突无法通过升降级解决时如何解决
|
25天前
|
Java 应用服务中间件 Windows
【应用服务 App Service】App Service 中部署Java项目,查看Tomcat配置及上传自定义版本
【应用服务 App Service】App Service 中部署Java项目,查看Tomcat配置及上传自定义版本
|
3月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
12天前
|
Java API 开发工具
Java不同的版本
Java不同的版本Java不同的版本
31 4
|
12天前
|
Java API 开发工具
Java版本
Java版本
22 2
|
1月前
|
Java API Apache
JDK8到JDK24版本升级的新特性问题之在Java中,HttpURLConnection有什么局限性,如何解决
JDK8到JDK24版本升级的新特性问题之在Java中,HttpURLConnection有什么局限性,如何解决
|
1月前
|
Oracle 安全 Java
JDK8到JDK28版本升级的新特性问题之在Java 15及以后的版本中,密封类和密封接口是怎么工作的
JDK8到JDK28版本升级的新特性问题之在Java 15及以后的版本中,密封类和密封接口是怎么工作的
|
1月前
|
Java API 开发者
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么
|
20天前
|
消息中间件 SQL 监控
RocketMQ 5.3.0 版本中 Broker IP 配置为 IPv6 的情况
【8月更文第28天】RocketMQ 是一款分布式消息中间件,支持多种消息发布和订阅模式。在 RocketMQ 5.3.0 版本中,Broker 的配置文件 `broker.conf` 允许配置 IPv6 地址。当 Broker 的 `brokerIP1` 配置为 IPv6 地址时,会对 Broker 的启动、消息推送和状态监控等方面产生影响。本文将探讨如何在 RocketMQ 中配置 IPv6 地址,并检查 Broker 的状态。
57 0