安装完成了之后,我们就可以正式使用啦,现在我们来完成一个hello world的消息推送和消费吧
引入链接端
创建一个springboot项目,在pom文件引入RabbitMQ的依赖
<!-- RabbitMQ -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
编写一个main方法,用来推送消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author tang.sl
* @date 2021-01-14 14:35
*/
public class ProductTest {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routing_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("echo");
factory.setPassword("123456");
// 和RabbitMQ建立一个链接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 创建一个 type="direct" 、持久化的、非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 创建一个持久化、非排他的、非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条持久化的消息: hello world !
String message = "Hello World !";
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
// 关闭资源
channel.close();
connection.close();
}
}
启动测试,我们可以看到web界面的如下结果
启动main方法之前要保证我们的用户有足够的权限和端口不被防火墙拦截。
到这里我们就已经成功推送了消息啦。关于代码里面的一些设置,出了注释提示的外,后面我们会不断深入讲解
编写一个消费者案例
import com.rabbitmq.client.*;
import lombok.SneakyThrows;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author tang.sl
* @date 2021-01-14 15:05
*/
public class ConsumerTest {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "192.168.230.131";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Address[] addresses = new Address[]{new Address(IP_ADDRESS, PORT)};
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ的链接参数
factory.setUsername("echo");
factory.setPassword("123456");
// 和RabbitMQ建立一个链接
Connection connection = factory.newConnection(addresses);
final Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
//等待回调函数执行完毕之后 关闭资源
TimeUnit.SECONDS.sleep(5);
channel.close();
connection.close();
}
}
启动测试,我们可以看到控制台接受到了我们的消息了
这里接受到消息之后,我们可以去观察web管理界面,会发现消息不见了