阿里云Rocket MQ Java Http SDK发送消费消息示例Demo

简介: 消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。

Step By Step

1、创建实例,登陆阿里云控制台
图片.png
图片.png

2、实例下面分别创建Topic和Http
Group
图片.png
图片.png

3、pom.xml

        <dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {
   

    public static void main(String[] args) {
   
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        // 所属的 Topic
        final String topic = "****";
        // Topic所属实例ID,默认实例为空
        final String instanceId = "MQ_INST_********";

        // 获取Topic的生产者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
   
            producer = mqClient.getProducer(instanceId, topic);
        } else {
   
            producer = mqClient.getProducer(topic);
        }

        try {
   
            // 循环发送40条消息
            for (int i = 0; i < 40; i++) {
   
                TopicMessage pubMsg;
                if (i % 2 == 0) {
   
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello common mq!".getBytes(),
                            // 消息标签
                            "A"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 设置KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
   
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello delay mq!".getBytes(),
                            // 消息标签
                            "B"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定时消息, 定时时间为10s后
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步发送消息,只要不抛异常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
   
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {
   

    public static void main(String[] args) {
   

        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,为空表示订阅全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
   
            try {
   

                // 消费消息,轮训时间设置为3秒,一次至多拉去三条消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
   
                    System.out.println("Message is not exist!");
                } else {
   
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
   
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回调删除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
   
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
6月前
|
存储 Java API
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
【Azure 存储服务】Java Storage SDK 调用 uploadWithResponse 代码示例(询问ChatGTP得代码原型后人力验证)
|
2月前
|
弹性计算 安全 开发工具
灵码评测-阿里云提供的ECS python3 sdk做安全组管理
批量变更阿里云ECS安全组策略(批量变更)
|
4月前
|
程序员 开发工具 Android开发
Android|使用阿里云推流 SDK 实现双路推流不同画面
本文记录了一种使用没有原生支持多路推流的阿里云推流 Android SDK,实现同时推送两路不同画面的流的方法。
93 7
|
6月前
|
Java 开发工具
通过Java SDK调用阿里云模型服务
在阿里云平台上,可以通过创建应用并使用模型服务完成特定任务,如生成文章内容。本示例展示了一段简化的Java代码,演示了如何调用阿里云模型服务生成关于“春秋战国经济与文化”的简短文章。示例代码通过设置系统角色为历史学家,并提出文章生成需求,最终处理并输出生成的文章内容。在实际部署前,请确保正确配置环境变量中的密钥和ID,并根据需要调整SDK导入语句及类名。更多详情和示例,请参考相关链接。
|
6月前
|
Java 开发工具
【Azure Developer】示例: 在中国区调用MSGraph SDK通过User principal name获取到User信息,如Object ID
【Azure Developer】示例: 在中国区调用MSGraph SDK通过User principal name获取到User信息,如Object ID
|
6月前
|
JSON Java API
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
【Azure API 管理】通过Java APIM SDK创建一个新的API,如何为Reqeust的Representation设置一个内容示例(Sample)?
|
6月前
|
存储 API 开发工具
【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例
【Azure Developer】使用 Python SDK连接Azure Storage Account, 计算Blob大小代码示例
|
Web App开发 前端开发
|
Web App开发 前端开发 数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
数据仓库建模:定义事实表的粒度Posted on 2015-08-25 09:03 xuzhengzhu 阅读(28) 评论(0) 编辑 收藏 维度建模中一个非常重要的步骤是定义事实表的粒度。
709 0
|
Web App开发 前端开发 数据库
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd"> <html><head><meta http-equiv="Cont
数据仓库建设步骤Posted on 2015-03-04 10:18 xuzhengzhu 阅读(1164) 评论(0) 编辑 收藏 1.系统分析,确定主题 确定一下几个因素:    ·操作出现的频率,即业务部门每隔多长时间做一次查询分析。
880 0

热门文章

最新文章