分布式消息队列中间件系列研究之阿堂教程(进阶篇)

本文涉及的产品
性能测试 PTS,5000VUM额度
Serverless 应用引擎免费试用套餐包,4320000 CU,有效期3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
简介:

 上篇文章,阿堂和大家分享了《分布式消息队列中间件系列研究之阿堂教程(基础篇-Local模式)》,后面由于时间关系,就一直没有接着写了。忙里偷闲,昨天晚上在家写了大部分,今天抽点时间阿堂将继续为大家奉献完成《分布式消息队列中间件系列研究之阿堂教程(进阶篇)》。这里阿堂结合发送邮件的一个相对具体的案例,应该说是有一定的代表性的,相对比较深入的剖析开源框架metq分布式消息队列的使用。相信通过阿堂的分享后,大家就基本明白分布式消息队列是怎么回事和大致知道如何使用了。当然,网友们如果想非常深入的学习和使用metq,建议网友们可以直接到metq的官网去学习和了解。

metq使用的大致流程如下所示

分布式消息队列中间件系列研究之阿堂教程(进阶篇)

分布式消息队列中间件系列研究之阿堂教程(进阶篇)

分布式消息队列中间件系列研究之阿堂教程(进阶篇)


public class Productor {

private static Log log = LogFactory.getLog(Productor.class);
public static void main(String[] args) throws Exception {

//由消息工厂产生消息生产者
MessageProducer producer = MessageSessionFactoryManager.getSessionFactory(true).createProducer();
//设置topic,必须要在server.ini文件中进行配置
final String topic = "email";
//发布topic
producer.publish(topic);
//发布内容(根据实际业务来组装内容)
String line = "网络时空(阿堂)恭喜大家2015年心想事成!";
try{
//模拟发邮件(根据实际业务定义,可以是发邮件,发短信,httpPost提交等均可)
EmailRequest request = new EmailRequest("heyitang@qq.com", topic,line);

//序列化发送内容
String jsonString = JSON.toJSONString(request, SerializerFeature.WriteClassName);

log.info("------------------------------------------------------------------------");
log.info("发布者发送的EmailAddress = "+request.getEmailAddress());
log.info("发布者发送的 Topic = "+request.getEmailTopic());
log.info("发布者发送的邮件的内容= "+request.getContent());
log.info("发布者发送的邮件发送的时间点= "+request.getCreateTime());
log.info("------------------------------------------------------------------------");

//发布订消息,这里老何定义了重写回调SendMessageCallbackImpl实现方法
producer.sendMessage(new Message(topic, jsonString.getBytes()),new SendMessageCallbackImpl(producer,topic,jsonString));

}catch(Exception ex){
log.error(ex);
}

}

}

------------------------------------


public class SendMessageCallbackImpl implements SendMessageCallback {

private static Log log = LogFactory.getLog(SendMessageCallbackImpl.class);

private MessageProducer messageProducer;
private String topic;
private String content;


public SendMessageCallbackImpl( MessageProducer messageProducer, String topic, String content) {

super();
this.messageProducer = messageProducer;
this.topic = topic;
this.content = content;

}


public void onException(Throwable e) {

log.fatal("metaq server exception , error message:" + e.getMessage());
log.info("--------------------------------------------------------------");
//出现异常时写入日志文件,进行补偿机制
logToFile();
log.info("--------------------------------------------------------------");
}


public void onMessageSent(SendResult result) {

log.info("result = "+result);
if (!result.isSuccess()) {

log.info("--------------------------------------------------------------");
log.warn("Send " + topic + " message failed,error message:" + result.getErrorMessage());
//没有收到broker服务器的正常应答时写入日志文件,进行补偿机制
logToFile();

}
else {
log.info("--------------------------------------------------------------");
log.info("Send "+topic+" successfully,sent to " + result.getPartition()+" "+result.getOffset());
log.info("--------------------------------------------------------------");
}

}


private void logToFile() {

//定义日志文件
String fileName = MessageSessionFactoryManager.getMessagedir()+UUID.randomUUID().toString();
File file = new File(fileName);
FileWriter fw = null;

try {

fw = new FileWriter(file);
fw.write(topic+"\r\n");
fw.write(content);
fw.flush();

} catch (IOException e) {

log.error(e);

} finally {

if(fw != null) {
try {
fw.close();
} catch (IOException e) {
}
}

}

}

}

------------------------------------


public class Consumer {


private static Log log = LogFactory.getLog(Consumer.class);

public static void main(String[] args) throws Exception {

final String topic = "email";
final String group = "meta-example";

try{

//复用SessionFacotory(单例模式) 发布者和订阅者共用sessionFactory
MessageConsumer consumer = MessageSessionFactoryManager.getSessionFactory(false).createConsumer(new ConsumerConfig(group));

//每次订阅300k的字节流内容,将订阅信息保存到本地
consumer.subscribe(topic, 1024 * 300, new EmailMessageListener());

//completeSubscribe一次性将所有的订阅生效,并处理zk和metaq服务器的所有交互过程
consumer.completeSubscribe();

}catch(Exception ex){
log.error(ex);
}

}

}

------------------------------------


public class EmailMessageListener implements MessageListener {

private static Log log = LogFactory.getLog(EmailMessageListener.class);

public void recieveMessages(Message message) throws InterruptedException {

try {

log.info("------------------------------------------------------------------------");
log.info("Receive Email message, BrokerId-Partition:"
+ message.getPartition().getBrokerId() + ","
+ message.getPartition().getPartition()+", "+message.getTopic()+" ,"+message.getId());

//反序列化接收内容
EmailRequest emailRequest = JSON.parseObject(new String(message.getData()), EmailRequest.class);
log.info("------------------------------------------------------------------------");
log.info("订阅者接收到的EmailAddress = "+emailRequest.getEmailAddress());
log.info("订阅者接收到的 Topic = "+emailRequest.getEmailTopic());
log.info("订阅者接收到的邮件的内容= "+emailRequest.getContent());
log.info("订阅者接收到的邮件发送的时间点= "+emailRequest.getCreateTime());

//这里目前网友可以根据当前系统时间 - emailRequest.getCreateTime() 比较,超过多长时间,可以丢弃此次订阅信息,不消费,直接return
log.info("------------------------------------------------------------------------");
log.info("订阅者开始订阅消息啦!");
log.info("开始发送邮件啦!");

//发邮件的代码逻辑(过程略),与本文介绍的内容没有太大联系!
//执行发送邮件的代码逻辑

log.info("发送邮件成功啦!");


log.info("订阅者结束订阅消息啦!");
log.info("结束发送邮件啦!");
log.info("End Send Email:" + emailRequest.getEmailAddress());

log.info("------------------------------------------------------------------------");

} catch(Exception ex) {

log.error("EmailMessageListener exception", ex);

}
}

public Executor getExecutor() {
// TODO Auto-generated method stub
return null;
}

}

----------------------------------


public class MessageSessionFactoryManager {

private static Log log = LogFactory.getLog(MessageSessionFactoryManager.class);

private static MessageSessionFactory sessionFactory = null;
private static String messagedir;



public static String getMessagedir() {
return messagedir;
}


private MessageSessionFactoryManager() {
}


public synchronized static MessageSessionFactory getSessionFactory() {
if(sessionFactory == null) {
init(true);
}
return sessionFactory;
}


public synchronized static MessageSessionFactory getSessionFactory(boolean isProducer) {
if(sessionFactory == null) {
init(isProducer);
}
return sessionFactory;
}


private static void init(boolean isProducer) {

try {

String confFile = "/metaq.ini";
InputStream in = EmailMessageProducerManager.class
.getResourceAsStream(confFile);

Properties conf = new Properties();
conf.load(in);

String zookeeper = conf.getProperty("zookeeper");
messagedir = conf.getProperty("messagedir");

final MetaClientConfig metaClientConfig = new MetaClientConfig();
final ZKConfig zkConfig = new ZKConfig();
zkConfig.zkConnect = zookeeper;
metaClientConfig.setZkConfig(zkConfig);
sessionFactory = new MetaMessageSessionFactory(metaClientConfig);

//如果是生产者,则传入值为真;如果是消费者,则传入值为假。可以共用同一个sessionFactory
if(isProducer) {

//补偿机制,这里主要是针对如下两种情况
//发布者在onException中产生了异常,或者 result.isSuccess()返回值不成功,写会入到meaq.ini文件中对应的messagedir目录下
//然后由定时器每间隔30分钟扫描一次,扫到文件后,然后进行补偿机制进行重新由producer重新发布

Timer timer = new Timer();
//在2秒后执行此任务,每次间隔半小时扫描 D:\metaq\mmp\logs 目录下的异常文件,进行补偿机制发布消息 MesaageExceptionHandleTask()实现方法很简单(由于字数限制不贴上去了)

timer.schedule(new MesaageExceptionHandleTask(), 2000, 1000*30*60);
}

} catch (Exception e) {

log.error(e);
throw new RuntimeException(e.getCause());

}
}


public static MessageProducer getMessageProducer() {
return getSessionFactory(true).createProducer();
}

}



本文转自 www19 51CTO博客,原文链接:http://blog.51cto.com/doujh/1715275,如需转载请自行联系原作者

相关文章
|
3月前
|
消息中间件 监控 中间件
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
常用的消息队列中间件都有什么?优缺点是什么?如何选择?
128 5
|
5月前
|
存储 缓存 监控
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
分布式链路监控系统问题之kywalking在后期维护过程中可能会遇到中间件版本升级的问题如何解决
|
3月前
|
消息中间件 中间件 Kafka
解锁Kafka等消息队列中间件的测试之道
在这个数字化时代,分布式系统和消息队列中间件(如Kafka、RabbitMQ)已成为日常工作的核心组件。本次公开课由前字节跳动资深专家KK老师主讲,深入解析消息队列的基本原理、架构及测试要点,涵盖功能、性能、可靠性、安全性和兼容性测试,并探讨其主要应用场景,如应用解耦、异步处理和限流削峰。课程最后设有互动答疑环节,助你全面掌握消息队列的测试方法。
|
3月前
|
消息中间件 中间件 UED
为什么需要消息队列中间件?
为什么需要消息队列中间件?
56 4
|
5月前
|
消息中间件 存储 Java
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
本文介绍了Apache Pulsar消息队列系统的核心特性及其与其它消息队列的区别,通过Docker安装Pulsar及Pulsar Manager,并结合电商业务场景,对比了串行执行与使用Pulsar实现异步解耦的优势,最后通过Java代码示例展示了如何利用Pulsar解决实际业务问题。
320 4
【干货】看看我司消息队列用啥,全网最接地气pulsar教程(含业务解耦demo源码)
|
5月前
|
运维 安全 Cloud Native
核心系统转型问题之分布式数据库和数据访问中间件协作如何解决
核心系统转型问题之分布式数据库和数据访问中间件协作如何解决
|
6月前
|
消息中间件 缓存 IDE
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
MetaQ/RocketMQ 原理问题之消息队列中间件的问题如何解决
|
6月前
|
消息中间件 存储 负载均衡
中间件消息队列与发布/订阅模型
【7月更文挑战第15天】
206 6
|
6月前
|
消息中间件 存储 负载均衡
中间件消息队列模型
【7月更文挑战第9天】
50 1
|
7月前
|
消息中间件 存储 中间件
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
【消息中间件】详解三大MQ:RabbitMQ、RocketMQ、Kafka
1829 0