阿里云Rocket MQ Java Http SDK发送消费消息示例Demo

简介: 消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。

Step By Step

1、创建实例,登陆阿里云控制台
图片.png
图片.png

2、实例下面分别创建Topic和Http
Group
图片.png
图片.png

3、pom.xml

        <dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {
   

    public static void main(String[] args) {
   
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        // 所属的 Topic
        final String topic = "****";
        // Topic所属实例ID,默认实例为空
        final String instanceId = "MQ_INST_********";

        // 获取Topic的生产者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
   
            producer = mqClient.getProducer(instanceId, topic);
        } else {
   
            producer = mqClient.getProducer(topic);
        }

        try {
   
            // 循环发送40条消息
            for (int i = 0; i < 40; i++) {
   
                TopicMessage pubMsg;
                if (i % 2 == 0) {
   
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello common mq!".getBytes(),
                            // 消息标签
                            "A"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 设置KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
   
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello delay mq!".getBytes(),
                            // 消息标签
                            "B"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定时消息, 定时时间为10s后
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步发送消息,只要不抛异常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
   
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {
   

    public static void main(String[] args) {
   

        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,为空表示订阅全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
   
            try {
   

                // 消费消息,轮训时间设置为3秒,一次至多拉去三条消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
   
                    System.out.println("Message is not exist!");
                } else {
   
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
   
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回调删除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
   
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
11月前
|
监控 物联网 网络性能优化
【杂谈】-MQTT与HTTP在物联网中的比较:为什么MQTT是更好的选择
通过上述分析,可以看出MQTT在物联网应用中的确是更好的选择。其高效的通信模型、低带宽消耗、稳定的连接保持机制以及可靠的消息质量保证,使其在各种物联网场景中都能表现出色。开发者在设计和实现物联网系统时,应优先考虑采用MQTT协议,以充分发挥其在资源受限环境下的优势,提升系统的整体性能和可靠性。
2223 26
|
安全 API 持续交付
阿里云云效产品使用问题之如何从流水线访问内网平台的HTTP接口
云效作为一款全面覆盖研发全生命周期管理的云端效能平台,致力于帮助企业实现高效协同、敏捷研发和持续交付。本合集收集整理了用户在使用云效过程中遇到的常见问题,问题涉及项目创建与管理、需求规划与迭代、代码托管与版本控制、自动化测试、持续集成与发布等方面。
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
消息中间件 存储 Java
RocketMQ文件刷盘机制深度解析与Java模拟实现
【11月更文挑战第22天】在现代分布式系统中,消息队列(Message Queue, MQ)作为一种重要的中间件,扮演着连接不同服务、实现异步通信和消息解耦的关键角色。Apache RocketMQ作为一款高性能的分布式消息中间件,广泛应用于实时数据流处理、日志流处理等场景。为了保证消息的可靠性,RocketMQ引入了一种称为“刷盘”的机制,将消息从内存写入到磁盘中,确保消息持久化。本文将从底层原理、业务场景、概念、功能点等方面深入解析RocketMQ的文件刷盘机制,并使用Java模拟实现类似的功能。
376 3
|
消息中间件 测试技术
通过轻量消息队列(原MNS)主题HTTP订阅+ARMS实现自定义数据多渠道告警
轻量消息队列(原MNS)以其简单队列模型、轻量化协议及按量后付费模式,成为阿里云产品间消息传输首选。本文通过创建主题、订阅、配置告警集成等步骤,展示了该产品在实际应用中的部分功能,确保消息的可靠传输。
294 2
|
网络协议 物联网 网络性能优化
物联网协议比较 MQTT CoAP RESTful/HTTP XMPP
【10月更文挑战第18天】本文介绍了物联网领域中四种主要的通信协议:MQTT、CoAP、RESTful/HTTP和XMPP,分别从其特点、应用场景及优缺点进行了详细对比,并提供了简单的示例代码。适合开发者根据具体需求选择合适的协议。
602 5
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
222 0
rabbitmq基础教程(ui,java,springamqp)
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【9月更文挑战第3天】物联网(IoT)的兴起催生了多种通信协议,如MQTT、CoAP、RESTful/HTTP和XMPP,各自适用于不同场景。本文将对比这些协议的特点、优缺点,并提供示例代码。MQTT轻量级且支持QoS,适合大规模部署;CoAP基于UDP,适用于低功耗网络;RESTful/HTTP易于集成但不适合资源受限设备;XMPP支持双向通信,适合复杂交互应用。通过本文,开发者可更好地选择合适的物联网通信协议。
324 2
|
网络协议 物联网 网络性能优化
物联网江湖风云变幻!MQTT CoAP RESTful/HTTP XMPP四大门派谁主沉浮?
【8月更文挑战第14天】本文概览了MQTT、CoAP、RESTful/HTTP及XMPP四种物联网通信协议。MQTT采用发布/订阅模式,轻量高效;CoAP针对资源受限设备,基于UDP,低延迟;RESTful/HTTP易于集成现有Web基础设施;XMPP支持双向通信,扩展性强。每种协议均附有示例代码,助您根据不同场景和设备特性作出最佳选择。
237 5
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
444 0