spring boot 集成 RabbitMq还是很方便的。现在来一个简单的例子来集成rabbitmq。入门demo。
主要概念:
其中比较重要的概念有 4 个,分别为:虚拟主机,交换机,队列,和绑定。
虚拟主机:一个虚拟主机持有一组交换机、队列和绑定。为什么需要多个虚拟主机呢?很简单,RabbitMQ当中,用户只能在虚拟主机的粒度进行权限控制。 因此,如果需要禁止A组访问B组的交换机/队列/绑定,必须为A和B分别创建一个虚拟主机。每一个RabbitMQ服务器都有一个默认的虚拟主机“/”。
交换机:Exchange 用于转发消息,但是它不会做存储 ,如果没有 Queue bind 到 Exchange 的话,它会直接丢弃掉 Producer 发送过来的消息。 这里有一个比较重要的概念:路由键 。消息到交换机的时候,交互机会转发到对应的队列中,那么究竟转发到哪个队列,就要根据该路由键。
绑定:也就是交换机需要和队列相绑定,这其中如上图所示,是多对多的关系。
首先是配制文件。
1
2
3
4
5
|
#spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=
127.0
.
0.1
spring.rabbitmq.port=
5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
|
发送者:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package
com.basic.rabbitmq.send;
import
com.basic.rabbitmq.configuration.RabbitMqConfig2;
import
org.springframework.amqp.core.AmqpTemplate;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.stereotype.Component;
import
org.springframework.stereotype.Service;
import
java.util.Date;
/**
* Created by sdc on 2017/6/17.
*/
@Service
(
"helloSender"
)
public
class
HelloSender {
@Autowired
private
AmqpTemplate amqpTemplate;
// private Rabbitt
public
void
send() {
String contenxt =
"order_queue_message"
;
this
.amqpTemplate.convertAndSend(RabbitMqConfig2.QUEUE_EXCHANGE_NAME,
"order_queue_routing"
,contenxt);
// this.amqpTemplate.conver
}
}
|
配制信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
|
package
com.basic.rabbitmq.configuration;
import
com.basic.rabbitmq.receiver.Receiver;
import
com.rabbitmq.client.Channel;
import
com.rabbitmq.client.ConfirmListener;
import
org.springframework.amqp.core.*;
import
org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import
org.springframework.amqp.rabbit.connection.ConnectionFactory;
import
org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import
org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import
org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import
org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import
org.springframework.context.annotation.Bean;
import
org.springframework.context.annotation.Configuration;
import
java.io.IOException;
/**
* Created by sdc on 2017/6/17.
*/
@Configuration
public
class
RabbitMqConfig2 {
public
static
final
String QUEUE_NAME =
"order_queue"
;
public
static
final
String QUEUE_EXCHANGE_NAME =
"topic_exchange_new"
;
public
static
final
String routing_key =
"order_queue_routing"
;
@Bean
public
Queue queue() {
//是否持久化
boolean
durable =
false
;
//仅创建者可以使用该队列,断开后自动删除
boolean
exclusive =
false
;
//当所有消费者都断开连接后,是否删除队列
boolean
autoDelete =
false
;
return
new
Queue(QUEUE_NAME, durable, exclusive, autoDelete);
}
@Bean
public
TopicExchange exchange() {
//是否持久化
boolean
durable =
false
;
//当所有消费者都断开连接后,是否删除队列
boolean
autoDelete =
false
;
return
new
TopicExchange(QUEUE_EXCHANGE_NAME, durable, autoDelete);
}
@Bean
public
Binding binding() {
return
BindingBuilder.bind(queue()).to(exchange()).with(routing_key);
}
@Bean
public
ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new
CachingConnectionFactory(
"127.0.0.1"
,
5672
);
connectionFactory.setUsername(
"admin"
);
connectionFactory.setPassword(
"admin"
);
connectionFactory.setVirtualHost(
"/"
);
/** 如果要进行消息回调,则这里必须要设置为true */
connectionFactory.setPublisherConfirms(
true
);
// 必须要设置
// connectionFactory.setPublisherReturns();
return
connectionFactory;
}
@Bean
SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container =
new
SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
// container.setQueueNames(QUEUE_NAME);
container.setQueues(queue());
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置确认模式手工确认
container.setMessageListener(
new
ChannelAwareMessageListener() {
@Override
public
void
onMessage(Message message, Channel channel)
throws
Exception {
byte
[] body = message.getBody();
System.out.println(
"收到消息 : "
+
new
String(body));
channel.queueDeclare(QUEUE_NAME,
true
,
false
,
false
,
null
);
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
// channel.basicAck(); //应答
// channel.basicReject();//拒绝
// channel.basicRecover(); //恢复
// channel.basicQos();
// channel.addConfirmListener(new ConfirmListener() {
// @Override
// public void handleAck(long deliveryTag, boolean multiple) throws IOException {
// //失败重发
// }
//
// @Override
// public void handleNack(long deliveryTag, boolean multiple) throws IOException {
// //确认ok
// }
// });
}
});
return
container;
}
// @Bean
// MessageListenerAdapter listenerAdapter(Receiver receiver) {
// return new MessageListenerAdapter(receiver, "receiveMessage");
// }
}
|
测试类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package
com.rabbit.test;
import
com.basic.rabbitmq.send.HelloSender;
import
com.basic.system.Application;
import
org.junit.Test;
import
org.junit.runner.RunWith;
import
org.springframework.beans.factory.annotation.Autowired;
import
org.springframework.boot.test.context.SpringBootTest;
import
org.springframework.test.context.junit4.SpringRunner;
import
javax.annotation.Resource;
/**
* Created by sdc on 2017/6/17.
*/
@RunWith(SpringRunner.
class
)
@SpringBootTest(classes = Application.
class
)
public
class
RabbitMqTest {
@Autowired
public
HelloSender helloSender;
@Test
public
void helloword() throws Exception {
helloSender.send();
}
}
|
这只是一个demo,学习的时候会测试各种的事情,在这基础上更改就可以了,心中的疑虑测试没了就可以写一些项目了。
本文转自 豆芽菜橙 51CTO博客,原文链接:http://blog.51cto.com/shangdc/1944218