ps:本文只是简单一个整合介绍,属于抛砖引玉,具体实现还需大家深入研究哈..
1.首先是生产者配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
<?
xml
version
=
"1.0"
encoding
=
"UTF-8"
?>
<
beans
xmlns
=
"http://www.springframework.org/schema/beans"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:context
=
"http://www.springframework.org/schema/context"
xmlns:rabbit
=
"http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<
rabbit:connection-factory
id
=
"connectionFactory"
host
=
"localhost"
username
=
"guest"
password
=
"guest"
port
=
"5672"
/>
<
rabbit:admin
connection-factory
=
"connectionFactory"
/>
<!-- queue 队列声明-->
<
rabbit:queue
id
=
"queue_one"
durable
=
"true"
auto-delete
=
"false"
exclusive
=
"false"
name
=
"queue_one"
/>
<!-- exchange queue binging key 绑定 -->
<
rabbit:direct-exchange
name
=
"my-mq-exchange"
durable
=
"true"
auto-delete
=
"false"
id
=
"my-mq-exchange"
>
<
rabbit:bindings
>
<
rabbit:binding
queue
=
"queue_one"
key
=
"queue_one_key"
/>
</
rabbit:bindings
>
</
rabbit:direct-exchange
>
<-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->
<
bean
id
=
"jsonMessageConverter"
class
=
"mq.convert.FastJsonMessageConverter"
></
bean
>
<-- spring template声明-->
<
rabbit:template
exchange
=
"my-mq-exchange"
id
=
"amqpTemplate"
connection-factory
=
"connectionFactory"
message-converter
=
"jsonMessageConverter"
/>
</
beans
>
|
2.fastjson messageconver插件实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
import
org.apache.commons.logging.Log;
import
org.apache.commons.logging.LogFactory;
import
org.springframework.amqp.core.Message;
import
org.springframework.amqp.core.MessageProperties;
import
org.springframework.amqp.support.converter.AbstractMessageConverter;
import
org.springframework.amqp.support.converter.MessageConversionException;
import
fe.json.FastJson;
public
class
FastJsonMessageConverter
extends
AbstractMessageConverter {
private
static
Log log = LogFactory.getLog(FastJsonMessageConverter.
class
);
public
static
final
String DEFAULT_CHARSET =
"UTF-8"
;
private
volatile
String defaultCharset = DEFAULT_CHARSET;
public
FastJsonMessageConverter() {
super
();
//init();
}
public
void
setDefaultCharset(String defaultCharset) {
this
.defaultCharset = (defaultCharset !=
null
) ? defaultCharset
: DEFAULT_CHARSET;
}
public
Object fromMessage(Message message)
throws
MessageConversionException {
return
null
;
}
public
<T> T fromMessage(Message message,T t) {
String json =
""
;
try
{
json =
new
String(message.getBody(),defaultCharset);
}
catch
(UnsupportedEncodingException e) {
e.printStackTrace();
}
return
(T) FastJson.fromJson(json, t.getClass());
}
protected
Message createMessage(Object objectToConvert,
MessageProperties messageProperties)
throws
MessageConversionException {
byte
[] bytes =
null
;
try
{
String jsonString = FastJson.toJson(objectToConvert);
bytes = jsonString.getBytes(
this
.defaultCharset);
}
catch
(UnsupportedEncodingException e) {
throw
new
MessageConversionException(
"Failed to convert Message content"
, e);
}
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setContentEncoding(
this
.defaultCharset);
if
(bytes !=
null
) {
messageProperties.setContentLength(bytes.length);
}
return
new
Message(bytes, messageProperties);
}
}
|
3.生产者端调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
import
java.util.List;
import
org.springframework.amqp.core.AmqpTemplate;
public
class
MyMqGatway {
@Autowired
private
AmqpTemplate amqpTemplate;
public
void
sendDataToCrQueue(Object obj) {
amqpTemplate.convertAndSend(
"queue_one_key"
, obj);
}
}
|
4.消费者端配置(与生产者端大同小异)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
<?
xml
version
=
"1.0"
encoding
=
"UTF-8"
?>
<
beans
xmlns
=
"http://www.springframework.org/schema/beans"
xmlns:xsi
=
"http://www.w3.org/2001/XMLSchema-instance"
xmlns:context
=
"http://www.springframework.org/schema/context"
xmlns:rabbit
=
"http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!-- 连接服务配置 -->
<
rabbit:connection-factory
id
=
"connectionFactory"
host
=
"localhost"
username
=
"guest"
password
=
"guest"
port
=
"5672"
/>
<
rabbit:admin
connection-factory
=
"connectionFactory"
/>
<!-- queue 队列声明-->
<
rabbit:queue
id
=
"queue_one"
durable
=
"true"
auto-delete
=
"false"
exclusive
=
"false"
name
=
"queue_one"
/>
<!-- exchange queue binging key 绑定 -->
<
rabbit:direct-exchange
name
=
"my-mq-exchange"
durable
=
"true"
auto-delete
=
"false"
id
=
"my-mq-exchange"
>
<
rabbit:bindings
>
<
rabbit:binding
queue
=
"queue_one"
key
=
"queue_one_key"
/>
</
rabbit:bindings
>
</
rabbit:direct-exchange
>
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->
<
rabbit:listener-container
connection-factory
=
"connectionFactory"
acknowledge
=
"auto"
task-executor
=
"taskExecutor"
>
<
rabbit:listener
queues
=
"queue_one"
ref
=
"queueOneLitener"
/>
</
rabbit:listener-container
>
</
beans
>
|
5.消费者端调用
1
2
3
4
5
6
7
8
9
|
import
org.springframework.amqp.core.Message;
import
org.springframework.amqp.core.MessageListener;
public
class
QueueOneLitener
implements
MessageListener{
@Override
public
void
onMessage(Message message) {
System.out.println(
" data :"
+ message.getBody());
}
}
|
6.由于消费端当队列有数据到达时,对应监听的对象就会被通知到,无法做到批量获取,批量入库,因此可以在消费端缓存一个临时队列,将mq取出来的数据存入本地队列,后台线程定时批量处理即可
转自:http://blog.csdn.net/l192168134/article/details/51210188