开发者社区 > 云原生 > 云消息队列 > 正文

RocketMQ SimpleConsumer异步订阅消息demo

RocketMQ SimpleConsumer异步订阅消息demo

展开
收起
嘟嘟嘟嘟嘟嘟 2024-08-28 08:12:43 52 0
3 条回答
写回答
取消 提交回答
  • 根据您的问题,您希望了解如何使用RocketMQ的SimpleConsumer实现异步订阅消息的示例代码。虽然提供的知识内容没有直接包含异步订阅的Demo代码,但我们可以基于RocketMQ的API文档和常规实践来构建一个简单的异步订阅消息的示例。下面是一个简化的Java示例代码,展示如何使用SimpleConsumer(更准确地说,应使用org.apache.rocketmq.client.consumer.DefaultMQPushConsumer,因为SimpleConsumer更多指的是低级API,而异步订阅通常推荐使用PushConsumer模式)实现异步消费消息的功能。
    异步订阅消息的示例代码
    首先,确保您已添加了Apache RocketMQ客户端依赖到您的项目中。如果您使用Maven,可以在pom.xml中添加如下依赖:

    org.apache.rocketmq
    rocketmq-client
    4.9.3 
    

    接下来是异步消费消息的Java代码示例:
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import java.util.List;
    public class AsyncConsumerExample {
    public static void main(String[] args) throws Exception {
    // 实例化消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("your_consumer_group");
    // 设置NameServer地址
    consumer.setNamesrvAddr("your_nameserver_addr");
    // 订阅主题和Tag
    consumer.subscribe("your_topic", "*");
    // 设置消费模式,如从第一条开始消费
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 注册消息监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
    // 打印每个消息的属性和内容
    for (MessageExt msg : msgs) {
    System.out.printf("Received message: %s %n", new String(msg.getBody()));
    }
    // 返回消费状态,这里简单示例为成功消费
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    });
    // 启动消费者
    consumer.start();
    System.out.println("Async Consumer Started.");
    }
    }
    此回答整理自钉群“群2-Apache RocketMQ 中国开发者钉钉群”

    2024-08-28 10:45:05
    赞同 22 展开评论 打赏
  • import com.aliyun.openservices.ons.api.*;
    import com.aliyun.openservices.ons.api.consumer.MessageListener;
    
    public class SimpleConsumerDemo {
        public static void main(String[] args) {
         //消费示例:使用SimpleConsumer消费普通消息,主动获取消息处理并提交。
            ClientServiceProvider provider = ClientServiceProvider.loadService();
            String topic = "Your Topic";
            FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
    
            SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                    //设置消费者分组。
                    .setConsumerGroup("Your ConsumerGroup")
                    //设置接入点。
                    .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                    //设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                    .build();
            List<MessageView> messageViewList = null;
            try {
                //SimpleConsumer需要主动获取消息,并处理。
                messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
                messageViewList.forEach(messageView -> {
                    System.out.println(messageView);
                    //消费处理完成后,需要主动调用ACK提交消费结果。
                    try {
                        simpleConsumer.ack(messageView);
                    } catch (ClientException e) {
                        e.printStackTrace();
                    }
                });
            } catch (ClientException e) {
                //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
                e.printStackTrace();
            }     
    }
    }
    

    image.png
    参考链接
    https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-types
    回答不易请采纳

    2024-08-28 10:14:34
    赞同 32 展开评论 打赏
  • SimpleConsumer
    SimpleConsumer是一种接口原子型的消费者类型,消息的获取、消费状态提交以及消费重试都是通过消费者业务逻辑主动发起调用完成。

    使用方式
    SimpleConsumer的使用涉及多个接口调用,由业务逻辑按需调用接口获取消息,然后分发给业务线程处理消息,最后按照处理的结果调用提交接口,返回服务端当前消息的处理结果。示例如下:

            ClientServiceProvider provider = ClientServiceProvider.loadService();
            String topic = "Your Topic";
            FilterExpression filterExpression = new FilterExpression("Your Filter Tag", FilterExpressionType.TAG);
    
            SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
                    //设置消费者分组。
                    .setConsumerGroup("Your ConsumerGroup")
                    //设置接入点。
                    .setClientConfiguration(ClientConfiguration.newBuilder().setEndpoints("Your Endpoint").build())
                    //设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
                    .build();
            List<MessageView> messageViewList = null;
            try {
                //SimpleConsumer需要主动获取消息,并处理。
                messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
                messageViewList.forEach(messageView -> {
                    System.out.println(messageView);
                    //消费处理完成后,需要主动调用ACK提交消费结果。
                    try {
                        simpleConsumer.ack(messageView);
                    } catch (ClientException e) {
                        e.printStackTrace();
                    }
                });
            } catch (ClientException e) {
                //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
                e.printStackTrace();
            }
    

    image.png
    参考文档https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-5-x-series/developer-reference/consumer-types?spm=a2c4g.11186623.0.i28

    2024-08-28 08:47:15
    赞同 26 展开评论 打赏

涵盖 RocketMQ、Kafka、RabbitMQ、MQTT、轻量消息队列(原MNS) 的消息队列产品体系,全系产品 Serverless 化。RocketMQ 一站式学习:https://rocketmq.io/

相关产品

  • 云消息队列 MQ
  • 相关电子书

    更多
    RocketMQ Client-GO 介绍 立即下载
    RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载
    基于 RocketMQ Prometheus Exporter 打造定制化 DevOps 平台 立即下载