阿里云Rocket MQ Java Http SDK发送消费消息示例Demo

简介: 消息队列 RocketMQ 版支持 RESTful 风格的 HTTP 协议通信,并提供了以下 7 种语言的 SDK,下面以最新的Java Http SDK为范例介绍RocketMQ消息的发送和接收。

Step By Step

1、创建实例,登陆阿里云控制台
图片.png
图片.png

2、实例下面分别创建Topic和Http
Group
图片.png
图片.png

3、pom.xml

        <dependency>
            <groupId>com.aliyun.mq</groupId>
            <artifactId>mq-http-sdk</artifactId>
            <version>1.0.2</version>
        </dependency>

4、Producer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;

public class ProducerDemo {
   

    public static void main(String[] args) {
   
        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        // 所属的 Topic
        final String topic = "****";
        // Topic所属实例ID,默认实例为空
        final String instanceId = "MQ_INST_********";

        // 获取Topic的生产者
        MQProducer producer;
        if (instanceId != null && instanceId != "") {
   
            producer = mqClient.getProducer(instanceId, topic);
        } else {
   
            producer = mqClient.getProducer(topic);
        }

        try {
   
            // 循环发送40条消息
            for (int i = 0; i < 40; i++) {
   
                TopicMessage pubMsg;
                if (i % 2 == 0) {
   
                    // 普通消息
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello common mq!".getBytes(),
                            // 消息标签
                            "A"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("a", String.valueOf(i));
                    // 设置KEY
                    pubMsg.setMessageKey("MessageKey");
                } else {
   
                    pubMsg = new TopicMessage(
                            // 消息内容
                            "hello delay mq!".getBytes(),
                            // 消息标签
                            "B"
                    );
                    // 设置属性
                    pubMsg.getProperties().put("b", String.valueOf(i));
                    // 定时消息, 定时时间为10s后
                    pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
                }
                // 同步发送消息,只要不抛异常就是成功
                TopicMessage pubResultMsg = producer.publishMessage(pubMsg);

                // 同步发送消息,只要不抛异常就是成功
                System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
                        + ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
            }
        } catch (Throwable e) {
   
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
            e.printStackTrace();
        }

        mqClient.close();
    }
}

5、Consumer Code Sample

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;

public class ConsumerDemo {
   

    public static void main(String[] args) {
   

        MQClient mqClient = new MQClient(
                // 设置HTTP接入域名(此处以公共云生产环境为例)
                "http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
                // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "LTAIOZZg********",
                // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
                "v7CjUJCMk7j9aK****************"
        );

        String topicName = "****";
        String consumer = "GID_****"; //Http Consumer Group Name
        String messageTag =""; // Tag,为空表示订阅全部Tag
        String instanceId = "MQ_INST_******";

        MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);

        while(true) {
   
            try {
   

                // 消费消息,轮训时间设置为3秒,一次至多拉去三条消息
                List<Message> listMessage = mqConsumer.consumeMessage(3, 3);

                if (listMessage == null || listMessage.size() == 0) {
   
                    System.out.println("Message is not exist!");
                } else {
   
                    List<String> receiptHandles = new ArrayList<String>();
                    for (Message message : listMessage
                    ) {
   
                        System.out.println("MessageBody" + message.getMessageBodyString());
                        receiptHandles.add(message.getReceiptHandle());
                    }
                    // 回调删除
                    mqConsumer.ackMessage(receiptHandles);
                }
            }catch (Exception ex)
            {
   
                System.out.println("error:" + ex.getMessage());
                mqClient.close();
            }
        }
    }
}
相关实践学习
消息队列RocketMQ版:基础消息收发功能体验
本实验场景介绍消息队列RocketMQ版的基础消息收发功能,涵盖实例创建、Topic、Group资源创建以及消息收发体验等基础功能模块。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
1月前
|
存储 消息中间件 安全
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
【10月更文挑战第9天】本文介绍了如何利用JUC组件实现Java服务与硬件通过MQTT的同步通信(RRPC)。通过模拟MQTT通信流程,使用`LinkedBlockingQueue`作为消息队列,详细讲解了消息发送、接收及响应的同步处理机制,包括任务超时处理和内存泄漏的预防措施。文中还提供了具体的类设计和方法实现,帮助理解同步通信的内部工作原理。
JUC组件实战:实现RRPC(Java与硬件通过MQTT的同步通信)
|
1月前
|
消息中间件 存储 JSON
rabbitmq基础教程(ui,java,springamqp)
本文提供了RabbitMQ的基础教程,包括如何使用UI创建队列和交换机、Java代码操作RabbitMQ、Spring AMQP进行消息发送和接收,以及如何使用不同的交换机类型(fanout、direct、topic)进行消息路由。
25 0
rabbitmq基础教程(ui,java,springamqp)
|
1月前
|
消息中间件 前端开发 Java
java高并发场景RabbitMQ的使用
java高并发场景RabbitMQ的使用
98 0
|
2月前
|
消息中间件 缓存 Java
RocketMQ的JAVA落地实战
RocketMQ作为一款高性能、高可靠、高实时、分布式特点的消息中间件,其核心作用主要体现在异步处理、削峰填谷以及系统解耦三个方面。
175 0
|
3月前
|
Java
MQTT(EMQX) - Java 调用 MQTT Demo 代码
MQTT(EMQX) - Java 调用 MQTT Demo 代码
157 0
MQTT(EMQX) - Java 调用 MQTT Demo 代码
|
4月前
|
JSON 运维 Serverless
Serverless 应用引擎使用问题之ThinkPHP框架是否有基于SDK的demo
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
3月前
|
缓存 JavaScript 前端开发
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)
微信 JS-SDK Demo “分享信息设置” API 及数字签名生成方法(NodeJS版本)更新时间(2020-10-29)
|
4月前
|
消息中间件 Java Maven
如何在Java中使用RabbitMQ
如何在Java中使用RabbitMQ
|
5月前
|
消息中间件 负载均衡 网络性能优化
消息队列 MQ产品使用合集之 终端sdk和云端sdk的区别是什么
消息队列(MQ)是一种用于异步通信和解耦的应用程序间消息传递的服务,广泛应用于分布式系统中。针对不同的MQ产品,如阿里云的RocketMQ、RabbitMQ等,它们在实现上述场景时可能会有不同的特性和优势,比如RocketMQ强调高吞吐量、低延迟和高可用性,适合大规模分布式系统;而RabbitMQ则以其灵活的路由规则和丰富的协议支持受到青睐。下面是一些常见的消息队列MQ产品的使用场景合集,这些场景涵盖了多种行业和业务需求。
|
5月前
|
消息中间件 Java
Java一分钟之-RabbitMQ:AMQP协议实现
【6月更文挑战第11天】RabbitMQ是基于AMQP协议的开源消息队列服务,支持多种消息模式。本文介绍了RabbitMQ的核心概念:生产者、消费者、交换器、队列和绑定,以及常见问题和解决方案。例如,通过设置消息持久化和确认机制防止消息丢失,配置死信队列处理不可消费消息,以及妥善管理资源防止泄漏。还提供了Java代码示例,帮助读者理解和使用RabbitMQ。通过理解这些基础和最佳实践,可以提升RabbitMQ在分布式系统中的可靠性和效率。
129 0
Java一分钟之-RabbitMQ:AMQP协议实现

热门文章

最新文章