RabbitMq七种工作模式,java实战案例分享,别再说你不会!

简介: 最简单的一个消费者和一个生产者模式,生产者生成消息,消费者监听消息,若是消费者监听到它所需要的消息,就会消费该消息,这种消息是次性的,被消费了就没有了。

一、Maven依赖添加

 <!-- rabbitmq相关依赖 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
        </dependency>

二、七种工作模式的java实例

1、简单模式

最简单的一个消费者和一个生产者模式,生产者生成消息,消费者监听消息,若是消费者监听到它所需要的消息,就会消费该消息,这种消息是次性的,被消费了就没有了。

1.1.1、EasyRecv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

public class EasyRecv {
    //队列名称
    private final static String QUEUE_NAME = "hello world";
    public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {
        //打开连接和创建频道,与发送端一样
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
               /**
         * 队列名
         * 是否持久化
         *  是否排外  即只允许该channel访问该队列   一般等于true的话用于一个队列只能有一个消费者来消费的场景
         *  是否自动删除  消费完删除
         *  其他属性
         *
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages. To exit press CTRL+C");

        //创建队列消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //指定消费队列
         /**
         * 队列名
         * 其他属性  路由
         * 消息body
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
        while (true)
        {
            //nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("Received '" + message + "'");
        }

    }
}

1.1.2、EasySend.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class EasySend {

    //队列名称
    private final static String QUEUE_NAME = "hello world";

    public static void main(String[] argv) throws java.io.IOException
    {
        /**
         * 创建连接连接到MabbitMQ
         */
        ConnectionFactory factory = new ConnectionFactory();
        //设置MabbitMQ所在主机ip或者主机名
        factory.setHost("127.0.0.1");


        while (true){
            //创建一个连接
            Connection connection = factory.newConnection();
            //创建一个频道
            Channel channel = connection.createChannel();
            //指定一个队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //发送的消息
            Scanner scanner = new Scanner(System.in);
            String ms = scanner.nextLine();
            //String message = "hello world!";
            //往队列中发出一条消息
            channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());
            System.out.println("Sent '" + ms + "'");
            //关闭频道和连接
            channel.close();
            connection.close();
        }
    }

以上两个已经可以进行通信了,下面同样是简单的实例,但是我们可以看到在代码层面上,连接的代码都是一样的,所以我们可以创建一个连接的工具类。

1.2.1、RabbitmqConnectionUtil .java

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;


public  class RabbitmqConnectionUtil {

    public static Connection getConnection() throws IOException {
        //连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //连接5672端口  注意15672为工具界面端口  25672为集群端口
        factory.setPort(5672);
        //factory.setVirtualHost("/xxxxx");
       // factory.setUsername("xxxxxx");
       // factory.setPassword("123456");
        //获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

1.2.2、UtilSend.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilSend {
    private final static String QUEUE_NAME = "UtilConn";
    public static void main(String[] args) throws IOException {

        Connection connection = RabbitmqConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        //消息内容
  
        String message = "这里是lbw广场";
        channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
        System.out.println("[x]Sent '"+message + "'");
        //最后关闭通关和连接
        channel.close();
        connection.close();
    }
}

1.2.3、UtilRecv.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilRecv {
    private final static String QUEUE_NAME = "UtilConn";
    public static void main(String[] args) throws IOException, InterruptedException {

        Connection connection = null;
        connection = RabbitmqConnectionUtil.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //声明队列
       
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(QUEUE_NAME,true,queueingConsumer);

        while(true){
            //该方法会阻塞
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received '"+message+"'");
        }

    }
}

2、工作队列

工作队列也就是简单模式的强化版,一个队列是可以多个生产者,也可以有多个消费者来竞争消费消息,但是我们仍需保证队列的幂等性,队列存在就不能再创建同名队列,关注公众号:麒麟改bug,还可以获取Java核心知识点思维导图以及Java核心学习笔记。

下面的每个进程都控制其主线程休眠,让我们可以更好的看到结果。

2.1.1、Sender1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender1 {
    private final  static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for(int i = 0; i < 100; i++){
            String message = "lbw" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message + "'");
            Thread.sleep(i*10);
        }

        channel.close();
        connection.close();
    }
}

2.1.2、Sender2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender2 {
    private final  static String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        for(int i = 0; i < 100; i++){
            String message = "nb" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '"+message + "'");
            Thread.sleep(i*10);
        }
        channel.close();
        connection.close();
    }
}

2.1.3、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver1 {
    private final static  String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false, false,null);
        //同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        //关于手工确认 待之后有时间研究下
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 '"+message+"'");
            Thread.sleep(10);
            //返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}

2.1.4、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver2 {
    private final static  String QUEUE_NAME = "queue_work";

    public static void main(String[] args) throws IOException, InterruptedException {

        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false, false,null);
        //同一时刻服务器只会发送一条消息给消费者
        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        
        
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 '"+message+"'");
            Thread.sleep(1000);
            //返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }

    }
}

2.1.5、结果

上面的四个程序都运行起来,结果可以看到如下,依据结果分析,可知,同一个消息队列,是可以有多个生产者和消费者的。

3、发布/订阅(fanout)

3.1.1、Sender.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

public class Sender {
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] args)
    {
        try
        {
            //获取连接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //从连接中获取一个通道
            Channel channel = connection.createChannel();
            //声明交换机(分发:发布/订阅模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //发送消息
            for (int i = 0; i < 5; i++)
            {
                String message = "卢本伟广场" + i;
                System.out.println("[send]:" + message);
                //发送消息
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
                Thread.sleep(5 * i);
            }
            channel.close();
            connection.close();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
    }

}

3.1.2、Receiver1.java

import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {
    //交换机名称
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    //队列名称
    private static final String QUEUE_NAME    = "test_queue_email";

    public static void main(String[] args)
    {
        try
        {
            //获取连接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //从连接中获取一个通道
            final Channel channel = connection.createChannel();
            //声明交换机(分发:发布/订阅模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将队列绑定到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保证一次只分发一个
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //当消息到达时执行回调方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[email] Receive message:" + message);
                    try
                    {
                        //消费者休息2s处理业务
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[1] done");
                        //手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //设置手动应答
            boolean autoAck = false;
            //监听队列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

3.1.3、Receiver2.java

import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

    //交换机名称
    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    //队列名称
    private static final String QUEUE_NAME    = "test_queue_phone";

    public static void main(String[] args)
    {
        try
        {

            //获取连接
            Connection connection = RabbitmqConnectionUtil.getConnection();
            //从连接中获取一个通道
            final Channel channel = connection.createChannel();
            //声明交换机(分发:发布/订阅模式)
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            //声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //将队列绑定到交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            //保证一次只分发一个
            int prefetchCount = 1;
            channel.basicQos(prefetchCount);
            //定义消费者
            DefaultConsumer consumer = new DefaultConsumer(channel)
            {
                //当消息到达时执行回调方法
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                                           byte[] body) throws IOException
                {
                    String message = new String(body, "utf-8");
                    System.out.println("[phone] Receive message:" + message);
                    try
                    {
                        //消费者休息1s处理业务
                        Thread.sleep(1000);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    finally
                    {
                        System.out.println("[2] done");
                        //手动应答
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            //设置手动应答
            boolean autoAck = false;
            //监听队列
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
    }
}

3.1.4、结果

从程序运行结果和RabbitMq的后台看出,这样的消息属于广播型,两个不同名的队列的都能收到该消息,只需它们都将自己绑定到同一个交换机,而且,该消息是持久的,只要交换机还在,消费者啥时候上线都能消费它所绑定的交换机,而且只会一个消费者只会消费一

4、路由(direct)

  1. 在前面的示例中,我们已经在创建绑定。您可能会想起类似的代码:
channel.queueBind(queueName,EXCHANGE_NAME,“”);

绑定是交换和队列之间的关系。可以简单地理解为:队列对来自此交换的消息感兴趣。

  1. 绑定可以采用额外的routingKey参数。为了避免与basic\_publish参数混淆,我们将其称为 绑定键。这是我们可以创建带有键的绑定的方法:
channel.queueBind(queueName,EXCHANGE_NAME,“ black”);
  • 直接绑定(密钥直接绑定到单个队列)

  • 多重绑定(相同的绑定密钥绑定多个队列)

  • 不同密钥绑定不同的队列,可以发挥出不同日志级别发送到不同的队列的效果。

4.1.1、Sender

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;


public class Sender {
    private final static String EXCHANGE_NAME = "exchange_direct";
    private final static String EXCHANGE_TYPE = "direct";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);

        String message = "那一定是蓝色";
        channel.basicPublish(EXCHANGE_NAME,"key2", null, message.getBytes());
        System.out.println("[x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

4.1.2、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */
public class Receiver1 {
    private final  static  String QUEUE_NAME = "queue_routing";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, InterruptedException {
        // 获取到连接以及mq通道
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 "+message);
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }


    }
}

4.1.3、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;

/**
 * Created by san
 */
public class Receiver2 {
    private final  static  String QUEUE_NAME = "queue_routing2";
    private final static String EXCHANGE_NAME = "exchange_direct";

    public static void main(String[] args) throws IOException, InterruptedException {
        // 获取到连接以及mq通道
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false,false,false,null);
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 "+message);
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

4.1.4、结果-总结

有一点要注意是:在direct下,必须是Exchange(交换机)已经存在,消费端的队列才能绑定到Exchange,否则会报错。也就说上面的程序第一次运行时,需先启Sender,才能成功启动Reciver。

5、话题(topic)

话题也是一个持久的消息,只要交换机还在,每个上线的消费者都可以消费一次自己感兴趣的topic。

  • *(星号)可以代替一个单词。
  • #(哈希)可以替代零个或多个单词。

关注公众号:麒麟改bug,还可以获取Java核心知识点思维导图以及Java核心学习笔记。

5.1.1、Sender.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender {
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

        //消息内容
        String message = "这里是卢本伟广场";
        //第二个参数是topic匹配值
        channel.basicPublish(EXCHANGE_NAME,"lbw.nb",null,message.getBytes());
        System.out.println("[x] Sent '"+message+"'");

        //关通道 关连接
        channel.close();
        connection.close();
    }
}

5.1.2、Receiver1.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {
    private final static String QUEUE_NAME = "queue_topic";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false,false, null);
        //第二参数就是去匹配我兴趣的topic
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received1 '"+message + "'");
            Thread.sleep(10);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

        }
    }
}

5.1.3、Receiver2.java

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

    private final static String QUEUE_NAME = "queue_topic2";
    private final static String EXCHANGE_NAME = "exchange_topic";
    private final static String EXCHANGE_TYPE = "topic";

    public static void main(String[] args) throws IOException, InterruptedException {
        Connection connection = RabbitmqConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false,false, null);

        //第二参数就是去匹配我兴趣的topic
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");

        channel.basicQos(1);

        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(QUEUE_NAME, false, consumer);

        while(true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("[x] Received2 '"+message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

5.1.4、结果-分析

话题的特色就是队列可以获取自己感兴趣的话题消息,可以通过通配符*或#来表示匹配所有的感兴趣的字符串。

6、RPC(远程过程调用)

给张图自己体会吧(官网没给示例代码,我也就不写了),就是通过两个交换机实现一个可回调的过程吧。

三、RabbitMq的交换机

RabbitMq是有一个交换机的概念的, 消息(Message)由Client发送,RabbitMQ接收到消息之后通过交换机转发到对应的队列上面。Worker会从队列中获取未被读取的数据处理。这样就可以实现消息的发送者无需知道消息使用者的存在,反之亦然,关注公众号:麒麟改bug,还可以获取Java核心知识点思维导图以及Java核心学习笔记。

Direct exchange:直连(路由)交换机,转发消息到routigKey指定的队列

Fanout exchange:扇形交换机,转发消息到所有绑定队列(相当于广播)

Topic exchange:主题交换机,按规则转发消息(很灵活)

Headers exchange:首部交换机

前面的简单类型我们都是忽略了交换机的参数的,如该方法:channel.basicPublish("", QUEUE\_NAME, null, message.getBytes());就是这个方法的第一个参数,置空说明使用了默认的交换机。

有几种交换类型可用:direct,topic,headers 和fanout。

小编分享的文章到这里就结束了,喜欢小编的分享可以点赞分享关注哦,感谢你们的支持!

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
存储 Java 开发者
Java Map实战:用HashMap和TreeMap轻松解决复杂数据结构问题!
【10月更文挑战第17天】本文深入探讨了Java中HashMap和TreeMap两种Map类型的特性和应用场景。HashMap基于哈希表实现,支持高效的数据操作且允许键值为null;TreeMap基于红黑树实现,支持自然排序或自定义排序,确保元素有序。文章通过具体示例展示了两者的实战应用,帮助开发者根据实际需求选择合适的数据结构,提高开发效率。
74 2
|
3天前
|
Java
Java基础却常被忽略:全面讲解this的实战技巧!
本次分享来自于一道Java基础的面试试题,对this的各种妙用进行了深度讲解,并分析了一些关于this的常见面试陷阱,主要包括以下几方面内容: 1.什么是this 2.this的场景化使用案例 3.关于this的误区 4.总结与练习
|
19天前
|
Java 程序员
Java基础却常被忽略:全面讲解this的实战技巧!
小米,29岁程序员,分享Java中`this`关键字的用法。`this`代表当前对象引用,用于区分成员变量与局部变量、构造方法间调用、支持链式调用及作为参数传递。文章还探讨了`this`在静态方法和匿名内部类中的使用误区,并提供了练习题。
21 1
|
1月前
|
安全 Java 开发者
Java 多线程并发控制:深入理解与实战应用
《Java多线程并发控制:深入理解与实战应用》一书详细解析了Java多线程编程的核心概念、并发控制技术及其实战技巧,适合Java开发者深入学习和实践参考。
52 6
|
1月前
|
存储 安全 Java
Java多线程编程中的并发容器:深入解析与实战应用####
在本文中,我们将探讨Java多线程编程中的一个核心话题——并发容器。不同于传统单一线程环境下的数据结构,并发容器专为多线程场景设计,确保数据访问的线程安全性和高效性。我们将从基础概念出发,逐步深入到`java.util.concurrent`包下的核心并发容器实现,如`ConcurrentHashMap`、`CopyOnWriteArrayList`以及`BlockingQueue`等,通过实例代码演示其使用方法,并分析它们背后的设计原理与适用场景。无论你是Java并发编程的初学者还是希望深化理解的开发者,本文都将为你提供有价值的见解与实践指导。 --- ####
|
1月前
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
41 3
|
2月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
1月前
|
jenkins Java 测试技术
如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例详细说明
本文介绍了如何使用 Jenkins 自动发布 Java 代码,通过一个电商公司后端服务的实际案例,详细说明了从 Jenkins 安装配置到自动构建、测试和部署的全流程。文中还提供了一个 Jenkinsfile 示例,并分享了实践经验,强调了版本控制、自动化测试等关键点的重要性。
71 3
|
1月前
|
存储 Java 关系型数据库
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践
在Java开发中,数据库连接是应用与数据交互的关键环节。本文通过案例分析,深入探讨Java连接池的原理与最佳实践,包括连接创建、分配、复用和释放等操作,并通过电商应用实例展示了如何选择合适的连接池库(如HikariCP)和配置参数,实现高效、稳定的数据库连接管理。
65 2
|
1月前
|
Java 关系型数据库 数据库
面向对象设计原则在Java中的实现与案例分析
【10月更文挑战第25天】本文通过Java语言的具体实现和案例分析,详细介绍了面向对象设计的五大核心原则:单一职责原则、开闭原则、里氏替换原则、接口隔离原则和依赖倒置原则。这些原则帮助开发者构建更加灵活、可维护和可扩展的系统,不仅适用于Java,也适用于其他面向对象编程语言。
37 2