docker-compose.yml
注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存
version: '3.5' services: rmqnamesrv: image: rocketmqinc/rocketmq container_name: rmqnamesrv restart: always ports: - 9876:9876 environment: #内存分配 JAVA_OPT_EXT: "-server -Xms1g -Xmx1g" volumes: - ./logs:/root/logs command: sh mqnamesrv networks: rmq: aliases: - rmqnamesrv rmqbroker: image: rocketmqinc/rocketmq container_name: rmqbroker restart: always depends_on: - rmqnamesrv ports: - 10909:10909 - 10911:10911 volumes: - ./logs:/root/logs - ./store:/root/store - ./conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf command: sh mqbroker -c /opt/rocketmq-4.4.0/conf/broker.conf environment: NAMESRV_ADDR: "rmqnamesrv:9876" JAVA_OPT_EXT: "-server -Xms1g -Xmx1g -Xmn1g" networks: rmq: aliases: - rmqbroker rmqconsole: image: styletang/rocketmq-console-ng container_name: rocketmq-console restart: always ports: - 8080:8080 depends_on: - rmqnamesrv volumes: - /etc/localtime:/etc/localtime:ro - /home/rocketmq/console/logs:/root/logs environment: JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false" networks: rmq: aliases: - rmqconsole networks: rmq: name: rmq driver: bridge
broker.conf
RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在 ./conf
目录下创建一个名为 broker.conf
的配置文件,内容如下:
brokerName = broker-a brokerId = 0 deleteWhen = 04 fileReservedTime = 48 brokerRole = ASYNC_MASTER flushDiskType = ASYNC_FLUSH # 如果是本地程序调用云主机 mq,这个需要设置成 云主机 IP brokerIP1=192.168.72.78
启动rocketmq
在编写的docker-compose.yml下执行
docker-compose up -d
Maven 依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.3.0</version> </dependency>
Gradle
compile 'org.apache.rocketmq:rocketmq-client:4.3.0'
同步消费者
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
异步生产者
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); int messageCount = 100; final CountDownLatch countDownLatch = new CountDownLatch(messageCount); for (int i = 0; i < messageCount; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { countDownLatch.countDown(); System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { countDownLatch.countDown(); System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } countDownLatch.await(5, TimeUnit.SECONDS); producer.shutdown(); } }
单向生产者
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); // Specify name server addresses. producer.setNamesrvAddr("localhost:9876"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Wait for sending to complete Thread.sleep(5000); producer.shutdown(); } }
消费者
public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { // Instantiate with specified consumer group name. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name"); // Specify name server addresses. consumer.setNamesrvAddr("localhost:9876"); // Subscribe one more more topics to consume. consumer.subscribe("TopicTest", "*"); // Register callback to execute on arrival of messages fetched from brokers. consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //Launch the consumer instance. consumer.start(); System.out.printf("Consumer Started.%n"); } }
订阅模式
- 生产者:
public class OrderedProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. MQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"}; for (int i = 0; i < 100; i++) { int orderId = i % 10; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId); System.out.printf("%s%n", sendResult); } //server shutdown producer.shutdown(); } }
- 消费者:
public class OrderedConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(false); System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.printf("Consumer Started.%n"); } }
广播
- 生产者:
public class BroadcastProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.start(); for (int i = 0; i < 100; i++){ Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } producer.shutdown(); } }
- 消费者:
public class BroadcastConsumer { public static void main(String[] args) throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //set to broadcast mode consumer.setMessageModel(MessageModel.BROADCASTING); consumer.subscribe("TopicTest", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.printf("Broadcast Consumer Started.%n"); } }
SpringBoot整合RocketMQ
- maven:
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
- controll:
@RestController public class ProducerController { @Resource private RocketMQTemplate rocketMQTemplate; @GetMapping("/send") public String send(String msg) { for (int i = 1; i < 100; i++) { rocketMQTemplate.convertAndSend("test-topic888",msg+i); } return "success"; } }