Step By Step
1、创建实例,登陆阿里云控制台
2、实例下面分别创建Topic和Http
Group
3、pom.xml
<dependency>
<groupId>com.aliyun.mq</groupId>
<artifactId>mq-http-sdk</artifactId>
<version>1.0.2</version>
</dependency>
4、Producer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQProducer;
import com.aliyun.mq.http.model.TopicMessage;
import java.util.Date;
public class ProducerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"LTAIOZZg********",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"v7CjUJCMk7j9aK****************"
);
// 所属的 Topic
final String topic = "****";
// Topic所属实例ID,默认实例为空
final String instanceId = "MQ_INST_********";
// 获取Topic的生产者
MQProducer producer;
if (instanceId != null && instanceId != "") {
producer = mqClient.getProducer(instanceId, topic);
} else {
producer = mqClient.getProducer(topic);
}
try {
// 循环发送40条消息
for (int i = 0; i < 40; i++) {
TopicMessage pubMsg;
if (i % 2 == 0) {
// 普通消息
pubMsg = new TopicMessage(
// 消息内容
"hello common mq!".getBytes(),
// 消息标签
"A"
);
// 设置属性
pubMsg.getProperties().put("a", String.valueOf(i));
// 设置KEY
pubMsg.setMessageKey("MessageKey");
} else {
pubMsg = new TopicMessage(
// 消息内容
"hello delay mq!".getBytes(),
// 消息标签
"B"
);
// 设置属性
pubMsg.getProperties().put("b", String.valueOf(i));
// 定时消息, 定时时间为10s后
pubMsg.setStartDeliverTime(System.currentTimeMillis() + 10 * 1000);
}
// 同步发送消息,只要不抛异常就是成功
TopicMessage pubResultMsg = producer.publishMessage(pubMsg);
// 同步发送消息,只要不抛异常就是成功
System.out.println(new Date() + " Send mq message success. Topic is:" + topic + ", msgId is: " + pubResultMsg.getMessageId()
+ ", bodyMD5 is: " + pubResultMsg.getMessageBodyMD5());
}
} catch (Throwable e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + topic);
e.printStackTrace();
}
mqClient.close();
}
}
5、Consumer Code Sample
import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.MQConsumer;
import com.aliyun.mq.http.model.Message;
import java.util.*;
public class ConsumerDemo {
public static void main(String[] args) {
MQClient mqClient = new MQClient(
// 设置HTTP接入域名(此处以公共云生产环境为例)
"http://18482178********.mqrest.cn-shanghai.aliyuncs.com",
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"LTAIOZZg********",
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"v7CjUJCMk7j9aK****************"
);
String topicName = "****";
String consumer = "GID_****"; //Http Consumer Group Name
String messageTag =""; // Tag,为空表示订阅全部Tag
String instanceId = "MQ_INST_******";
MQConsumer mqConsumer = mqClient.getConsumer(instanceId,topicName, consumer,messageTag);
while(true) {
try {
// 消费消息,轮训时间设置为3秒,一次至多拉去三条消息
List<Message> listMessage = mqConsumer.consumeMessage(3, 3);
if (listMessage == null || listMessage.size() == 0) {
System.out.println("Message is not exist!");
} else {
List<String> receiptHandles = new ArrayList<String>();
for (Message message : listMessage
) {
System.out.println("MessageBody" + message.getMessageBodyString());
receiptHandles.add(message.getReceiptHandle());
}
// 回调删除
mqConsumer.ackMessage(receiptHandles);
}
}catch (Exception ex)
{
System.out.println("error:" + ex.getMessage());
mqClient.close();
}
}
}
}