Java整合RocketMQ实现生产消费

简介: Java整合RocketMQ实现生产消费

参考文档

RocketMQ作为阿里系开源项目,有非常成熟的中文文档可以快速了解并上手。

环境搭建

  1. 创建Maven项目。
  2. pom.xml文件中引入RocketMQ依赖。
<dependencies>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.8.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
</dependencies>

生产者

普通消息

RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

同步发送

在这里插入图片描述

    private final static String nameServer = "127.0.0.1:9876";

    private final static String producerGroup = "my_group";

    private final static String topic = "topic-test";
@Test
    public void syncSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

异步发送

在这里插入图片描述

@Test
    public void asyncSend() throws IOException {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.send(msg, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("OK %s %n",
                            sendResult.getMsgId());
                }

                public void onException(Throwable e) {
                    System.out.printf("Exception %s %n", e);
                    e.printStackTrace();
                }
            },10000);
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.in.read();
    }

单向传输

在这里插入图片描述

@Test
    public void onewaySend()  {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagB", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 异步发送消息, 发送结果通过callback返回给客户端
            producer.sendOneway(msg);
            // 一旦producer不再使用,关闭producer
            //producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

顺序消息

RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者: 消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的分区键,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。
@Test
    public void orderSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            String[] tags = new String[]{"TagA", "TagB", "TagC"};
            for (int i = 0; i < 10; i++) {
                int orderId = i % 10;
                Message msg = new Message(topic, tags[i % tags.length], "KEY" + i, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.printf("%s%n", sendResult);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

延迟消息

延迟消息发送是指消息发送到RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。
使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
RocketMQ 一共支持18个等级的延迟投递,具体时间如下:

投递等级 延迟时间 投递等级 延迟时间
1 1s 10 6min
2 5s 11 7min
3 10s 12 8min
4 30s 13 9min
5 1min 14 10min
6 2min 15 20min
7 3min 16 30min
8 4min 17 1h
9 5min 18 2h
@Test
    public void scheduledSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            // 创建一条消息,并指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 消息延迟等级
            msg.setDelayTimeLevel(2);
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(msg, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

 @Test
    public void batchSend() {
        try {
            // 初始化一个producer并设置Producer group name
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 启动producer
            producer.start();
            List<Message> messages = new ArrayList<Message>();
            messages.add(new Message(topic, "Tag", "Order001", "Hello world 0".getBytes()));
            messages.add(new Message(topic, "Tag", "Order002", "Hello world 1".getBytes()));
            messages.add(new Message(topic, "Tag", "Order003", "Hello world 2".getBytes()));
            // 利用producer进行发送,并同步等待发送结果
            SendResult sendResult = producer.send(messages, 10000);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

事务消息

在一些对数据一致性有强需求的场景,可以用RocketMQ 事务消息来解决,从而保证上下游数据的一致性。

基于 RocketMQ 的分布式事务消息功能,在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。

第一阶段会发送一个半事务消息,半事务消息是指暂不能投递的消息。
如果发送成功则执行本地事务,并根据本地事务执行成功与否,向 Broker 半事务消息状态(commit或者rollback)。
半事务消息只有 commit 状态才会真正向下游投递。
如果由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,Broker 端会通过扫描发现某条消息长期处于“半事务消息”时,需要主动向消息生产者询问该消息的最终状态(Commit或是Rollback)。这样最终保证了本地事务执行成功,下游就能收到消息,本地事务执行失败,下游就收不到消息。

事务消息的详细交互流程如下图所示:
在这里插入图片描述

 @Test
    public void transactionSend() {
        try {
            // 事务消息的发送不再使用 DefaultMQProducer,而是使用 TransactionMQProducer 进行发送
            TransactionMQProducer producer = new TransactionMQProducer(producerGroup);
            // 设置NameServer地址
            producer.setNamesrvAddr(nameServer);
            // 事务回查的线程池
            ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setName("client-transaction-msg-check-thread");
                    return thread;
                }
            });

            producer.setExecutorService(executorService);
            producer.setTransactionListener(new TransactionListener() {
                //半事务消息发送成功后,执行本地事务的方法
                public LocalTransactionState executeLocalTransaction(Message msg, Object o) {
                    System.out.printf("执行本地事务 %n");
                    /*
                    二次确认
                    LocalTransactionState.COMMIT_MESSAGE:提交事务,允许消费者消费该消息
                    LocalTransactionState.ROLLBACK_MESSAGE:回滚事务,消息将被丢弃不允许消费。
                    LocalTransactionState.UNKNOW:暂时无法判断状态,等待固定时间以后Broker端根据回查规则向生产者进行消息回查。
                    */
                    return LocalTransactionState.UNKNOW;
                }

                // 二次确认消息没有收到,Broker端回查事务状态的方法,默认60s
                public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                    System.out.printf("二次确认失败,broker事务回查  %n");
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
            });
            producer.setSendMsgTimeout(10000);
            // 启动producer
            producer.start();
            Message msg = new Message(topic, "tagA", "Hello RocketMQ".getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 利用producer进行发送事务消息,并同步等待发送结果
            SendResult sendResult = producer.sendMessageInTransaction(msg, null);
            System.out.printf("%s%n", sendResult);
            // 一旦producer不再使用,关闭producer
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

消费者

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

    private final static String nameServer = "127.0.0.1:9876";

    private final static String consumerGroup = "my_group";

    private final static String topic = "topic-test";


    @Test
    public void consumerPush() throws MQClientException, IOException {
        // 初始化consumer,并设置consumer group name
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        // 设置NameServer地址
        consumer.setNamesrvAddr(nameServer);
        // 订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
        consumer.subscribe(topic, "*");
        //设置采用广播模式,广播模式下,消费组内的每一个消费者都会消费全量消息。
        //consumer.setMessageModel(MessageModel.BROADCASTING);
        //注册回调接口来处理从Broker中收到的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 顺序消费
//        consumer.registerMessageListener(new MessageListenerOrderly() {
//            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
//                return ConsumeOrderlyStatus.SUCCESS;
//            }
//        });
        // 启动Consumer
        consumer.start();
        System.out.printf("Consumer Started.%n");
        System.in.read();
    }

Pull 消费

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

@Test
    public void consumerPull() {
        try {
            DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(consumerGroup);
            consumer.setNamesrvAddr(nameServer);
            //关闭自动提交
            consumer.setAutoCommit(false);
            consumer.subscribe(topic, "*");
            consumer.setPullBatchSize(20);
            consumer.start();
            while (true) {
                List<MessageExt> messageExts = consumer.poll();
                System.out.printf("%s%n", messageExts);
                consumer.commitSync();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
21天前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
5月前
|
消息中间件 网络协议 RocketMQ
消息队列 MQ产品使用合集之broker开启proxy,启动之后producer生产消息始终都只到一个broker,该怎么办
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
22 0
rabbitmq基础教程(ui,java,springamqp)
|
29天前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
71 0
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
148 0
|
3月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
139 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
4月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ
|
4月前
|
存储 设计模式 监控
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
Java面试题:如何在不牺牲性能的前提下,实现一个线程安全的单例模式?如何在生产者-消费者模式中平衡生产和消费的速度?Java内存模型规定了变量在内存中的存储和线程间的交互规则
47 0
|
4月前
|
消息中间件 负载均衡 Java
JAVA面试之MQ
JAVA面试之MQ
67 0
|
4月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ