阿里云Rocket MQ Java Http SDK发送消费消息示例Demo

简介: 消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。

Step By Step

1、创建实例,登陆阿里云控制台
图片.png
图片.png

2、实例下面分别创建Topic和Http
Group
图片.png
图片.png

3、pom.xml

        <dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {
   

    public static void main(String[] args) {
   
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        // 所属的 Topic
        final String topic = "****";
        // Topic所属实例ID,默认实例为空
        final String instanceId = "MQ_INST_********";

        // 获取Topic的生产者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
   
            producer = mqClient.getProducer(instanceId, topic);
        } else {
   
            producer = mqClient.getProducer(topic);
        }

        try {
   
            // 循环发送40条消息
            for (int i = 0; i < 40; i++) {
   
                TopicMessage pubMsg;
                if (i % 2 == 0) {
   
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello common mq!".getBytes(),
                            // 消息标签
                            "A"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 设置KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
   
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello delay mq!".getBytes(),
                            // 消息标签
                            "B"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定时消息, 定时时间为10s后
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步发送消息,只要不抛异常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
   
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {
   

    public static void main(String[] args) {
   

        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,为空表示订阅全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
   
            try {
   

                // 消费消息,轮训时间设置为3秒,一次至多拉去三条消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
   
                    System.out.println("Message is not exist!");
                } else {
   
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
   
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回调删除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
   
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}
相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
JSON Java Apache
非常实用的Http应用框架,杜绝Java Http 接口对接繁琐编程
UniHttp 是一个声明式的 HTTP 接口对接框架,帮助开发者快速对接第三方 HTTP 接口。通过 @HttpApi 注解定义接口,使用 @GetHttpInterface 和 @PostHttpInterface 等注解配置请求方法和参数。支持自定义代理逻辑、全局请求参数、错误处理和连接池配置,提高代码的内聚性和可读性。
602 3
|
Java 开发工具
【Azure Developer】Azure Graph SDK获取用户列表的问题: SDK中GraphServiceClient如何指向中国区的Endpoint:https://microsoftgraph.chinacloudapi.cn/v1.0
【Azure Developer】Azure Graph SDK获取用户列表的问题: SDK中GraphServiceClient如何指向中国区的Endpoint:https://microsoftgraph.chinacloudapi.cn/v1.0
182 0
|
12月前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
程序员 开发工具 Android开发
Android|使用阿里云推流 SDK 实现双路推流不同画面
本文记录了一种使用没有原生支持多路推流的阿里云推流 Android SDK,实现同时推送两路不同画面的流的方法。
255 7
|
JavaScript 安全 Java
谈谈UDP、HTTP、SSL、TLS协议在java中的实际应用
下面我将详细介绍UDP、HTTP、SSL、TLS协议及其工作原理,并提供Java代码示例(由于Deno是一个基于Node.js的运行时,Java代码无法直接在Deno中运行,但可以通过理解Java示例来类比Deno中的实现)。
257 1
|
Java 开发工具
通过Java SDK调用阿里云模型服务
在阿里云平台上,可以通过创建应用并使用模型服务完成特定任务,如生成文章内容。本示例展示了一段简化的Java代码,演示了如何调用阿里云模型服务生成关于“春秋战国经济与文化”的简短文章。示例代码通过设置系统角色为历史学家,并提出文章生成需求,最终处理并输出生成的文章内容。在实际部署前,请确保正确配置环境变量中的密钥和ID,并根据需要调整SDK导入语句及类名。更多详情和示例,请参考相关链接。
|
API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
使用`System.Net.WebClient`类发送HTTP请求来调用阿里云短信API
233 0
|
消息中间件 分布式计算 DataWorks
DataWorks产品使用合集之如何使用Python和阿里云SDK读取OSS中的文件
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
|
安全 网络协议 Java
Java中的网络通信:HTTP详解
Java中的网络通信:HTTP详解
|
API 开发工具 Python
【Azure Developer】使用 Azure Python SDK时,遇见 The resource principal named https://management.azure.com was not found in the tenant China Azure问题的解决办法
【Azure Developer】使用 Azure Python SDK时,遇见 The resource principal named https://management.azure.com was not found in the tenant China Azure问题的解决办法
152 0