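Original work. Reposting is permitted, provided that the repost links to the article's original source and includes the author information and this notice. Otherwise legal action will be pursued.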
http://dgd2010.blog.51cto.com/1539422/1749983
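ActiveMQ Overview
ActiveMQ High-Availability Principles
1. ActiveMQ master-slave
2. ActiveMQ networkConnectors
3. ActiveMQ failover client connection protocol
ActiveMQ Cluster Environments
1. ActiveMQ high-availability cluster scheme for private cloud environments
2. ActiveMQ high-availability cluster scheme for public cloud environments
ActiveMQ Cluster Configuration: Private Cloud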
<broker xmlns="http://activemq.apache.org/schema/core"
        dataDirectory="${activemq.data}"
        brokerName="192.168.1.241"
        useJmx="true"
        advisorySupport="false"
        persistent="true"
        deleteAllMessagesOnStartup="false"
        useShutdownHook="false"
        schedulerSupport="true">

    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>

    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default"/>
    </transportConnectors>

    <persistenceAdapter>
        <kahaDB directory="/data/ActivemqSharedBrokerData"/>
    </persistenceAdapter>
<broker xmlns="http://activemq.apache.org/schema/core"
        dataDirectory="${activemq.data}"
        brokerName="192.168.1.242"
        useJmx="true"
        advisorySupport="false"
        persistent="true"
        deleteAllMessagesOnStartup="false"
        useShutdownHook="false"
        schedulerSupport="true">

    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>

    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default"/>
    </transportConnectors>

    <persistenceAdapter>
        <kahaDB directory="/data/ActivemqSharedBrokerData"/>
    </persistenceAdapter>
<broker xmlns="http://activemq.apache.org/schema/core"
        dataDirectory="${activemq.data}"
        brokerName="192.168.1.243"
        useJmx="true"
        advisorySupport="false"
        persistent="true"
        deleteAllMessagesOnStartup="false"
        useShutdownHook="false"
        schedulerSupport="true">

    <networkConnectors>
        <networkConnector uri="multicast://default"/>
    </networkConnectors>

    <transportConnectors>
        <transportConnector name="openwire" uri="tcp://0.0.0.0:61618" discoveryUri="multicast://default"/>
    </transportConnectors>
ActiveMQ Cluster Configuration: Public Cloud
<networkConnectors>
    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>
</networkConnectors>
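The static: connector above names every peer broker explicitly instead of relying on multicast discovery. As a minimal sketch, such a URI could be assembled from a list of host names. The class StaticNetworkUri and its build method are hypothetical helpers, not part of ActiveMQ, and host1/host2 are the placeholders from the snippet above.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not an ActiveMQ class): builds the value of the
// networkConnector "uri" attribute from a list of peer broker hosts.
final class StaticNetworkUri {

    /** Joins hosts into static:(tcp://h1:port,tcp://h2:port,...). */
    static String build(List<String> hosts, int port) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one peer broker is required");
        }
        return hosts.stream()
                .map(host -> "tcp://" + host + ":" + port)
                .collect(Collectors.joining(",", "static:(", ")"));
    }

    public static void main(String[] args) {
        // prints static:(tcp://host1:61616,tcp://host2:61616)
        System.out.println(build(List.of("host1", "host2"), 61616));
    }
}
```

Generating the list this way keeps the peer set in one place when the same value has to be written into several brokers' configuration files.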
ActiveMQ Cluster Configuration: Public Cloud + Docker
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>
    <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery" lazy-init="false" scope="singleton" init-method="start" destroy-method="stop"></bean>
    <broker xmlns="http://activemq.apache.org/schema/core" dataDirectory="${activemq.data}" brokerName="localhost" useJmx="true" advisorySupport="false" persistent="true" deleteAllMessagesOnStartup="false" useShutdownHook="false">
        <networkConnectors>
            <networkConnector uri="static:(tcp://server1-activemq-01-master:61616,tcp://server2-activemq-02-master:61616)"/>
        </networkConnectors>
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">">
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>
        <persistenceAdapter>
            <kahaDB directory="/data/activemq/kahadb"/>
        </persistenceAdapter>
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70"/>
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>
        <transportConnectors>
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
        </transportConnectors>
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook"/>
        </shutdownHooks>
    </broker>
    <import resource="jetty.xml"/>
</beans>
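Each transportConnector URI in the configuration above carries its tuning options as query parameters. The following sketch uses only java.net.URI, not ActiveMQ code, to split such a URI and convert wireFormat.maxFrameSize from bytes to MiB. ConnectorUriParams and its methods are hypothetical names chosen for illustration. In Java source the parameter separator is a plain ampersand, whereas inside an XML attribute it has to be escaped.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not an ActiveMQ class): inspects the query
// parameters of a transport connector URI.
final class ConnectorUriParams {

    /** Splits the query part of the URI into name/value pairs, keeping their order. */
    static Map<String, String> parse(String connectorUri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(connectorUri).getQuery();
        if (query == null) {
            return params; // URI without options
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    /** Returns wireFormat.maxFrameSize in MiB; throws NumberFormatException if it is absent. */
    static long maxFrameSizeMiB(String connectorUri) {
        String bytes = parse(connectorUri).get("wireFormat.maxFrameSize");
        return Long.parseLong(bytes) / (1024L * 1024L);
    }

    public static void main(String[] args) {
        String uri = "tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600";
        System.out.println(parse(uri).get("maximumConnections")); // prints 1000
        System.out.println(maxFrameSizeMiB(uri) + " MiB");        // prints 100 MiB
    }
}
```

The arithmetic shows that 104857600 bytes is exactly 100 MiB (100 × 1024 × 1024), the same value set on all five connectors.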
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "your_password", "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");
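The failover: URI above lists two brokers and sets randomize=false. With that option the failover transport tries the listed brokers in order rather than picking one at random. The sketch below only models that ordering with plain Java collections. It is not how ActiveMQ implements the transport, and FailoverOrderSketch, attemptOrder, pick and the reachability predicate are all hypothetical stand-ins.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Simplified model of broker selection (assumption: not ActiveMQ's actual code).
final class FailoverOrderSketch {

    /** Order in which candidate brokers would be tried. */
    static List<String> attemptOrder(List<String> brokers, boolean randomize) {
        List<String> order = new ArrayList<>(brokers);
        if (randomize) {
            Collections.shuffle(order); // random starting broker
        }
        return order; // randomize=false keeps the listed order
    }

    /** First broker in attempt order that the hypothetical probe reports as reachable. */
    static Optional<String> pick(List<String> brokers, boolean randomize, Predicate<String> reachable) {
        return attemptOrder(brokers, randomize).stream().filter(reachable).findFirst();
    }

    public static void main(String[] args) {
        List<String> brokers = List.of("tcp://192.168.1.241:61616", "tcp://192.168.1.242:61616");
        // Both brokers up: the first listed broker is chosen.
        System.out.println(pick(brokers, false, b -> true).get());
        // First broker down: the client falls over to the second one.
        System.out.println(pick(brokers, false, b -> !b.contains("192.168.1.241")).get());
    }
}
```

Under this model, a fixed order means clients prefer the first listed broker whenever it is reachable and use the second only as a fallback.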
import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * Hello world!
 */
public class activemq5Failover {

    public static void main(String[] args) throws Exception {
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldProducer(), false);
        Thread.sleep(1000);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldConsumer(), false);
        thread(new HelloWorldProducer(), false);
    }

    public static void thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                // Refer: http://activemq.apache.org/maven/apidocs/org/apache/activemq/ActiveMQConnectionFactory.html
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "your_password", "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");

                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                // Create a message
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "your_password", "failover:(tcp://192.168.1.241:61616,tcp://192.168.1.242:61616)?randomize=false");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("TEST.FOO");

                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);

                // Wait up to one second for a message
                Message message = consumer.receive(1000);

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred. Shutting down client.");
        }
    }
}