RocketMQ入门到入土(一)新手也能看懂的原理和实战!(下)

简介: RocketMQ入门到入土(一)新手也能看懂的原理和实战!(下)

1.2、特点


  • 每条消息只需要被处理一次,broker只会把消息发送给消费集群中的一个消费者
  • 在消息重投时,不能保证路由到同一台机器上
  • 消费状态由broker维护


2、广播模式(Broadcasting)


2.1、图解


image.png


2.2、特点


  • 消费进度由consumer维护
  • 保证每个消费者都消费一次消息
  • 消费失败的消息不会重投


八、Java API


说明:


  • RocketMQ服务端版本为目前最新版:4.7.0


  • Java客户端版本采取的目前最新版:4.7.0



pom如下


<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.7.0</version>
</dependency>


1、Producer


发消息肯定要必备如下几个条件:


  • 指定生产组名(不能用默认的,会报错)
  • 配置namesrv地址(必须)
  • 指定topic name(必须)
  • 指定tag/key(可选)


验证消息是否发送成功:消息发送完后可以启动消费者进行消费,也可以去管控台上看消息是否存在。


1.1、send(同步)


public class Producer {
    public static void main(String[] args) throws Exception {
        // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world
        Message msg = new Message("myTopic001", "hello world".getBytes());
        // 发送消息到mq,同步的
        SendResult result = producer.send(msg);
        System.out.println("发送消息成功!result is : " + result);
        // 关闭Producer
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:


发送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854140F418B4AAC26F7973910000, offsetMsgId=7B39B49D00002A9F00000000000589BE, messageQueue=MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=0], queueOffset=7]
生产者 shutdown!


1.2、send(批量)


public class ProducerMultiMsg {
    public static void main(String[] args) throws Exception {
        // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        String topic = "myTopic001";
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world1/2/3
        Message msg1 = new Message(topic, "hello world1".getBytes());
        Message msg2 = new Message(topic, "hello world2".getBytes());
        Message msg3 = new Message(topic, "hello world3".getBytes());
        // 创建消息对象的集合,用于批量发送
        List<Message> msgs = new ArrayList<>();
        msgs.add(msg1);
        msgs.add(msg2);
        msgs.add(msg3);
        // 批量发送的api的也是send(),只是他的重载方法支持List<Message>,同样是同步发送。
        SendResult result = producer.send(msgs);
        System.out.println("发送消息成功!result is : " + result);
        // 关闭Producer
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:


发送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854139C418B4AAC26F7D13770000,A9FE854139C418B4AAC26F7D13770001,A9FE854139C418B4AAC26F7D13770002, offsetMsgId=7B39B49D00002A9F0000000000058A62,7B39B49D00002A9F0000000000058B07,7B39B49D00002A9F0000000000058BAC, messageQueue=MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=0], queueOffset=8]
生产者 shutdown!


从结果中可以看到只有一个msgId,所以可以发现虽然是三条消息对象,但是却只发送了一次,大大节省了client与server的开销。


错误情况:


批量发送的topic必须是同一个,如果message对象指定不同的topic,那么批量发送的时候会报错:


Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: Failed to initiate the MessageBatch
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
    at org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:950)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:898)
    at com.chentongwei.mq.rocketmq.ProducerMultiMsg.main(ProducerMultiMsg.java:29)
Caused by: java.lang.UnsupportedOperationException: The topic of the messages in one batch should be the same
    at org.apache.rocketmq.common.message.MessageBatch.generateFromList(MessageBatch.java:58)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.batch(DefaultMQProducer.java:942)
    ... 2 more


1.3、sendCallBack(异步)


public class ProducerASync {
    public static void main(String[] args) throws Exception {
       // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world async
        Message msg = new Message("myTopic001", "hello world async".getBytes());
        // 进行异步发送,通过SendCallback接口来得知发送的结果
        producer.send(msg, new SendCallback() {
            // 发送成功的回调接口
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送消息成功!result is : " + sendResult);
            }
            // 发送失败的回调接口
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
                System.out.println("发送消息失败!result is : " + throwable.getMessage());
            }
        });
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:


生产者 shutdown!
java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876] failed
    at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:681)
    at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:511)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.tryToFindTopicPublishInfo(DefaultMQProducerImpl.java:692)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:556)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.access$300(DefaultMQProducerImpl.java:97)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl$4.run(DefaultMQProducerImpl.java:510)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876] failed
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateNameserverChannel(NettyRemotingClient.java:441)
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.getAndCreateChannel(NettyRemotingClient.java:396)
    at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:365)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1371)
    at org.apache.rocketmq.client.impl.MQClientAPIImpl.getTopicRouteInfoFromNameServer(MQClientAPIImpl.java:1361)
    at org.apache.rocketmq.client.impl.factory.MQClientInstance.updateTopicRouteInfoFromNameServer(MQClientInstance.java:624)
    ... 10 more
发送消息失败!result is : org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [124.57.180.156:9876] failed


为啥报错了?很简单,他是异步的,从结果就能看出来,由于是异步的,我还没发送到mq呢,你就先给我shutdown了。肯定不行,所以我们在shutdown前面sleep 1s在看效果


public class ProducerASync {
    public static void main(String[] args) throws Exception {
       // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world async
        Message msg = new Message("myTopic001", "hello world async".getBytes());
        // 进行异步发送,通过SendCallback接口来得知发送的结果
        producer.send(msg, new SendCallback() {
            // 发送成功的回调接口
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送消息成功!result is : " + sendResult);
            }
            // 发送失败的回调接口
            @Override
            public void onException(Throwable throwable) {
                throwable.printStackTrace();
                System.out.println("发送消息失败!result is : " + throwable.getMessage());
            }
        });
        Thread.sleep(1000);
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:


发送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854106E418B4AAC26F8719B20000, offsetMsgId=7B39B49D00002A9F0000000000058CFC, messageQueue=MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=1], queueOffset=2]
生产者 shutdown!


1.4、sendOneway


public class ProducerOneWay {
    public static void main(String[] args) throws Exception {
        // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world oneway
        Message msg = new Message("myTopic001", "hello world oneway".getBytes());
        // 效率最高,因为oneway不关心是否发送成功,我就投递一下我就不管了。所以返回是void
        producer.sendOneway(msg);
        System.out.println("投递消息成功!,注意这里是投递成功,而不是发送消息成功哦!因为我sendOneway也不知道到底成没成功,我没返回值的。");
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:


投递消息成功!,注意这里是投递成功,而不是发送消息成功哦!因为我sendOneway也不知道到底成没成功,我没返回值的。
生产者 shutdown!


1.5、效率对比


sendOneway > sendCallBack > send批量 > send单条

很容易理解,sendOneway不求结果,我就负责投递,我不管你失败还是成功,相当于中转站,来了我就扔出去,我不进行任何其他处理。所以最快。


而sendCallBack是异步发送肯定比同步的效率高。


send批量和send单条的效率也是分情况的,如果只有1条msg要发,那还搞毛批量,直接send单条完事。


2、Consumer


每个consumer只能关注一个topic。


发消息肯定要必备如下几个条件:


  • 指定消费组名(不能用默认的,会报错)
  • 配置namesrv地址(必须)
  • 指定topic name(必须)
  • 指定tag/key(可选)


2.1、CLUSTERING


集群模式,默认。


比如启动五个Consumer,Producer生产一条消息后,Broker会选择五个Consumer中的其中一个进行消费这条消息,所以他属于点对点消费模式。


public class Consumer {
    public static void main(String[] args) throws Exception {
        // 指定消费组名为my-consumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer");
        // 配置namesrv地址
        consumer.setNamesrvAddr("124.57.180.156:9876");
        // 订阅topic:myTopic001 下的全部消息(因为是*,*指定的是tag标签,代表全部消息,不进行任何过滤)
        consumer.subscribe("myTopic001", "*");
        // 注册监听器,进行消息消息。
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg : msgs) {
                    String str = new String(msg.getBody());
                    // 输出消息内容
                    System.out.println(str);
                }
                // 默认情况下,这条消息只会被一个consumer消费,这叫点对点消费模式。也就是集群模式。
                // ack确认
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
        System.out.println("Consumer start");
    }
}


2.2、BROADCASTING


广播模式。


比如启动五个Consumer,Producer生产一条消息后,Broker会把这条消息广播到五个Consumer中,这五个Consumer分别消费一次,每个都消费一次。


// 代码里只需要添加如下这句话即可:
consumer.setMessageModel(MessageModel.BROADCASTING);


2.3、两种模式对比


  • 集群默认是默认的,广播模式是需要手动配置。
  • 一条消息:集群模式下的多个Consumer只会有一个Consumer消费。广播模式下的每一个Consumer都会消费这条消息。
  • 广播模式下,发送一条消息后,会被当前被广播的所有Consumer消费,但是后面新加入的Consumer不会消费这条消息,很好理解:村里面大喇叭喊了全村来领鸡蛋,第二天你们村新来个人,那个人肯定听不到昨天大喇叭喊的消息呀。


3、TAG&&KEY


发送/消费 消息的时候可以指定tag/key来进行过滤消息,支持通配符。*代表消费此topic下的全部消息,不进行过滤。


看下org.apache.rocketmq.common.message.Message源码可以发现发消息的时候可以指定tag和keys:


public Message(String topic, String tags, String keys, byte[] body) {
    this(topic, tags, keys, 0, body, true);
}


比如:


public class ProducerTagsKeys {
    public static void main(String[] args) throws Exception {
        // 指定生产组名为my-producer
        DefaultMQProducer producer = new DefaultMQProducer("my-producer");
        // 配置namesrv地址
        producer.setNamesrvAddr("124.57.180.156:9876");
        // 启动Producer
        producer.start();
        // 创建消息对象,topic为:myTopic001,消息内容为:hello world,且tags为:test-tags,keys为test-keys
        Message msg = new Message("myTopic001", "test-tags", "test-keys", "hello world".getBytes());
        // 发送消息到mq,同步的
        SendResult result = producer.send(msg);
        System.out.println("发送消息成功!result is : " + result);
        // 关闭Producer
        producer.shutdown();
        System.out.println("生产者 shutdown!");
    }
}


输出结果:



发送消息成功!result is : SendResult [sendStatus=SEND_OK, msgId=A9FE854149DC18B4AAC26FA4B7200000, offsetMsgId=7B39B49D00002A9F0000000000058DA6, messageQueue=MessageQueue [topic=myTopic001, brokerName=broker-a, queueId=3], queueOffset=3]
生产者 shutdown!


查看管控台,可以发现tags和keys已经生效了:


image.png


消费的时候如果指定*那就是此topic下的全部消息,我们可以指定前缀通配符,比如:


// 这样就只会消费myTopic001下的tag为test-*开头的消息。
consumer.subscribe("myTopic001", "test-*");
// 代表订阅Topic为myTopic001下的tag为TagA或TagB的所有消息
consumer.subscribe("myTopic001", "TagA||TagB");


还支持SQL表达式过滤,不是很常用。不BB了。


4、常见错误


4.1、sendDefaultImpl call timeout


4.1.1、异常


Exception in thread "main" org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:666)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
    at com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)


4.1.2、解决


1.如果你是云服务器,首先检查安全组是否允许9876这个端口访问,是否开启了防火墙,如果开启了的话是否将9876映射了出去。



2.修改配置文件broker.conf,加上:


brokerIP1=我用的是阿里云服务器,这里是我的公网IP


启动namesrv和broker的时候加上本机IP(我用的是阿里云服务器,这里是我的公网IP):


./bin/mqnamesrv -n IP:9876
./bin/mqbroker -n IP:9876 -c conf/broker.conf


4.2、No route info of this topic


4.2.1、异常


Exception in thread "main" org.apache.rocketmq.client.exception.MQClientException: No route info of this topic: myTopic001
See http://rocketmq.apache.org/docs/faq/ for further details.
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:684)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1342)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1288)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:324)
    at com.chentongwei.mq.rocketmq.Producer.main(Producer.java:18)


4.2.2、解决


很明显发送成功了,不再是刚才的超时了,但是告诉我们没有这个topic。那不能每次都手动创建呀,所以启动broker的时候可以指定参数让broker为我们自动创建。如下


./bin/mqbroker -n IP:9876 -c conf/broker.conf autoCreateTopicEnable=true


END

相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
2月前
|
消息中间件 Java RocketMQ
RocketMQ实战教程之RocketMQ安装
这是一篇关于RocketMQ安装的实战教程,主要介绍了在CentOS系统上使用传统安装和Docker两种方式安装RocketMQ。首先,系统需要是64位,并且已经安装了JDK 1.8。传统安装包括下载安装包,解压并启动NameServer和Broker。Docker安装则涉及安装docker和docker-compose,然后通过docker-compose.yaml文件配置并启动服务。教程还提供了启动命令和解决问题的提示。
|
2月前
|
消息中间件 前端开发 数据库
RocketMQ实战教程之MQ简介与应用场景
RocketMQ实战教程介绍了MQ的基本概念和应用场景。MQ(消息队列)是生产者和消费者模型,用于异步传输数据,实现系统解耦。消息中间件在生产者发送消息和消费者接收消息之间起到邮箱作用,简化通信。主要应用场景包括:1)应用解耦,如订单系统与库存系统的非直接交互;2)异步处理,如用户注册后的邮件和短信发送延迟处理,提高响应速度;3)流量削峰,如秒杀活动限制并发流量,防止系统崩溃。
|
25天前
|
消息中间件 存储 Java
RocketMQ技术详解:从基础知识到内部设计原理
RocketMQ技术详解:从基础知识到内部设计原理
36 2
|
6天前
|
消息中间件 存储 Kafka
01.RabbitMQ入门
01.RabbitMQ入门
25 0
|
2月前
|
消息中间件 存储 Apache
RocketMQ实战教程之常见概念和模型
Apache RocketMQ 实战教程介绍了其核心概念和模型。消息是基本的数据传输单元,主题是消息的分类容器,支持字节、数字和短划线命名,最长64个字符。消息类型包括普通、顺序、事务和定时/延时消息。消息队列是实际存储和传输消息的容器,是主题的分区。消费者分组是一组行为一致的消费者的逻辑集合,也有命名限制。此外,文档还提到了一些使用约束和建议,如主题和消费者组名的命名规则,消息大小限制,请求超时时间等。RocketMQ 提供了多种消息模型,包括发布/订阅模型,有助于理解和优化消息处理。
|
2月前
|
消息中间件 存储 Java
RocketMQ实战教程之NameServer与BrokerServer
这是一个关于RocketMQ实战教程的概要,主要讨论NameServer和BrokerServer的角色。NameServer负责管理所有BrokerServer,而BrokerServer存储和传输消息。生产者和消费者通过NameServer找到合适的Broker进行交互,不需要直接知道Broker的具体信息。工作流程包括生产者向NameServer查询后发送消息到Broker,以及消费者同样通过NameServer获取消息进行消费。这种设计类似于服务注册中心的概念,便于系统扩展和集群管理。
|
2月前
|
消息中间件 Cloud Native 自动驾驶
RocketMQ实战教程之MQ简介
Apache RocketMQ 是一个云原生的消息流平台,支持消息、事件和流处理,适用于云边端一体化场景。官网提供详细文档和下载资源:[RocketMQ官网](https://rocketmq.apache.org/zh/)。示例中提到了RocketMQ在物联网(如小米台灯)和自动驾驶等领域的应用。要开始使用,可从[下载页面](https://rocketmq.apache.org/zh/download)获取软件。
|
2月前
|
消息中间件 中间件 Java
RocketMQ实战教程之几种MQ优缺点以及选型
该文介绍了几种主流消息中间件,包括ActiveMQ、RabbitMQ、RocketMQ和Kafka。ActiveMQ和RabbitMQ是较老牌的选择,前者在中小企业中常见,后者因强大的并发能力和活跃社区而流行。RocketMQ是阿里巴巴的开源产品,适用于大规模分布式系统,尤其在数据可靠性方面进行了优化。Kafka最初设计用于大数据日志处理,强调高吞吐量。在选择MQ时,考虑因素包括性能、功能、开发语言、社区支持、学习难度、稳定性和集群功能。小型公司推荐使用RabbitMQ,而大型公司则可在RocketMQ和Kafka之间根据具体需求抉择。
|
20天前
|
消息中间件 存储 前端开发
RabbitMQ在Java中的完美实现:从入门到精通
本文由木头左介绍如何在Java项目中使用RabbitMQ。RabbitMQ是开源的AMQP实现,支持多种客户端,适合分布式系统中的消息传递。首先需安装Erlang和RabbitMQ,接着在Java项目中添加RabbitMQ客户端库依赖。通过创建连接工厂和连接,建立与RabbitMQ的通信,并展示了创建连接和通道的代码示例。
|
2月前
|
消息中间件 Docker 微服务
RabbitMQ入门指南(十一):延迟消息-延迟消息插件
RabbitMQ是一个高效、可靠的开源消息队列系统,广泛用于软件开发、数据传输、微服务等领域。本文主要介绍了DelayExchange插件、延迟消息插件实现延迟消息等内容。
263 0

热门文章

最新文章