RocketMQ 5.0 是一款分布式消息中间件,由阿里巴巴开源,提供了高性能、高可靠、高可扩展性的消息传递服务。它采用发布/订阅模式,支持多种消息协议,如 JMS、MQTT 等,可用于构建企业级应用的异步消息处理、系统解耦、流量削峰等场景。
- 下载并安装 RocketMQ
在 RocketMQ 官网下载对应版本的二进制包,然后解压并启动 RocketMQ 服务。 - 生产者发送消息
创建一个生产者实例,设置生产者组名、NameServer 地址等信息,然后通过 API 或客户端发送消息到 RocketMQ。 - 消费者接收消息
创建一个消费者实例,设置消费者组名、NameServer 地址等信息,然后通过 API 或客户端接收消息。 - 监控 RocketMQ
通过 RocketMQ 的监控系统,如 NameServer 的 Web 界面,查看 RocketMQ 的运行状态、消息队列等。
推荐使用以下 demo 进行 RocketMQ 的入门学习: - 创建一个简单的生产者
public class ProducerDemo {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ " + System.currentTimeMillis());
SendResult sendResult = producer.send(msg);
System.out.println("SendResult: " + sendResult);
producer.shutdown();
}
}
public class ConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.println("Received messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer started. Wait for messages...");
}
}