SpringBoot RabbitMQ实现消息队列 邮箱

简介: SpringBoot RabbitMQ实现消息队列 邮箱

主页:写程序的小王叔叔的博客欢迎来访👀

支持:点赞收藏关注


一、效果

image.png

二、RMQ可以实现的功能

【介绍】:集合了网上各种大佬的教学一起整理

2.1消息中间件:

image.png

image.png

image.pngimage.png

image.png

2.2rmq安装

2.3含义

image.png

image.png

2.4原理

image.png

image.png

image.pngimage.pngimage.pngimage.png


三、SpringBoot + RMQ集成项目消息队列及聊天功能

【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧

3.1RMQ配置

在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:

to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)

如下图:

image.png

3.1.1pom.xml(公共文件配置)

<!--RabbitMQ-->

       <dependency>

           <groupId>org.springframework.boot</groupId>

           <artifactId>spring-boot-starter-amqp</artifactId>

       </dependency>

3.1.2spring.xml(公共文件配置)

##############RabbitMQ配置#######spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000#开启confirms回调P->Exchangespring.rabbitmq.publisher-confirms=true#开启returnedMessage回调Exchange->Queuespring.rabbitmq.publisher-returns=true#设置手动确认(ack) Queue->Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100

3.2Java RMQ类代码

3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)

image.png

rmqpConfig.java  rmqp基本配置

@ConfigurationpublicclassRabbitConfig {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
privateStringhost;
@Value("${spring.rabbitmq.port}")
privateintport;
@Value("${spring.rabbitmq.username}")
privateStringusername;
@Value("${spring.rabbitmq.password}")
privateStringpassword;
@Value("${spring.rabbitmq.virtual-host}")
privateStringvhost;
publicstaticfinalStringEXCHANGE_Member="RabbitMQ_Exchange_Member";//邮件:注册+登录   publicstaticfinalStringQUEUE_Member="RabbitMQ_Queue_Member";//邮件:注册+登录publicstaticfinalStringROUTINGKEY_Member="RabbitMQ_RoutingKey_Member";//邮件:注册+登录//建立一个连接容器,类型数据库的连接池@BeanpublicConnectionFactoryconnectionFactory() {
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
returnconnectionFactory;
    }
@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
publicRabbitTemplaterabbitTemplate() {
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
            } else {
LOGGER.info("消息消费失败:"+cause);
            }
        });
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
StringcorrelationId=message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
          });
returntemplate;
    }
/*** 交换机针对消费者配置* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念* DirectExchange:按照routingkey分发到指定队列,多关键字匹配*/@BeanpublicDirectExchangedirectExchange() {
returnnewDirectExchange(EXCHANGE_Member, true,false);
    }
/*** 队列** @return*/@BeanpublicQueuedirectQueue() {
returnnewQueue(QUEUE_Member, true); 
    }
/*** 绑定** @return*/@BeanpublicBindingdirectBinding() {
returnBindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member);
    }
@BeanpublicMessageConverterjsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
}

RMQProducer.java   rmqp消息提供端/消息发送端

@ComponentpublicclassRMQProducer {
privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class);
privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@AutowiredprivateRabbitTemplaterabbitTemplate;
/**** 延迟消息队列信息* @param routingKeyName* @param msg*/publicvoidsendMsg(StringroutingKeyName,Stringmsg) {
LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(newDate()));
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, newMessagePostProcessor() {
@OverridepublicMessagepostProcessMessage(Messagemessage) throwsAmqpException {
message.getMessageProperties().setContentEncoding("utf-8");
message.getMessageProperties().setExpiration("120000"); //设置消息存活时间returnmessage;
             }
         });
//rabbitTemplate.convertAndSend(routingKeyName, msg);     }
}

RMQReceiver.java   rmqp消费端

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues=RabbitConfig.QUEUE_Member )
@RabbitHandlerpublicvoidhandler(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
}

3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)

image.pngimage.png


rmqpConfig.java  rmqp配置类

@ConfigurationpublicclassRabbitConfig {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class);
@Value("${spring.rabbitmq.host}")
privateStringhost;
@Value("${spring.rabbitmq.port}")
privateintport;
@Value("${spring.rabbitmq.username}")
privateStringusername;
@Value("${spring.rabbitmq.password}")
privateStringpassword;
@Value("${spring.rabbitmq.virtual-host}")
privateStringvhost;
/**** 交换机*/publicstaticfinalStringEXCHANGE_Order="RabbitMQ_Exchange_Order";// 下单/*** 队列*/publicstaticfinalStringQUEUE_Order="RabbitMQ_Queue_Order";// 下单publicstaticfinalStringQUEUE_Pay="RabbitMQ_Queue_Pay";// 支付/**** 路由*/publicStringROUTINGKEY_Order="RabbitMQ_RoutingKey_Order";// 下单//建立一个连接容器,类型数据库的连接池@BeanpublicConnectionFactoryconnectionFactory() {
CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
connectionFactory.setPublisherConfirms(true);
returnconnectionFactory;
    }
@Bean@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
publicRabbitTemplaterabbitTemplate() {
RabbitTemplatetemplate=newRabbitTemplate(connectionFactory());
template.setMandatory(true);
template.setEncoding("UTF-8");
// 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
LOGGER.info("消息成功消费");
            } else {
LOGGER.info("消息消费失败:"+cause);
            }
        });
template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
StringcorrelationId=message.getMessageProperties().getCorrelationIdString();
LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
returntemplate;
    }

 

/////////Fanout广播配置////////////////////**** 交换机配置* @return*/@BeanpublicFanoutExchangefanoutExchange() {
returnnewFanoutExchange(EXCHANGE_Order);
    }
/*** 队列配置* @return*/@BeanpublicQueuefanoutQueueOrder() {
returnnewQueue(QUEUE_Order);
    }
@BeanpublicQueuefanoutQueuePay() {
returnnewQueue(QUEUE_Pay);
    }
/**** 绑定交换机和队列* @return*/@BeanpublicBindingbindFanoutExchangeOrder() {   
returnBindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange());
    }
@BeanpublicBindingbindFanoutExchangePay() {  
returnBindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange());
    }
@BeanpublicMessageConverterjsonMessageConverter() {
returnnewJackson2JsonMessageConverter();
    }
}

RMQProducer.java     rmqp消息提供端/消息发送端

/**** 提供者* @author Administrator**/@ComponentpublicclassRMQProducer {
privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class);
privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@AutowiredprivateRabbitTemplaterabbitTemplate;
/**** 延迟消息队列信息* @param routingKeyName* @param msg* @param string */publicvoidsendMsgtoFound(StringexchangeName , Stringmsg) {
LOGGER.info("消息发送成功,msg:{},时间:{}",msg,sdf.format(newDate()));
rabbitTemplate.convertAndSend(exchangeName , msg);
    }
}

RMQReceiver.java  rmqp消费端

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
@RabbitListener(queues=RabbitConfig.QUEUE_Order )
@RabbitHandlerpublicvoidhandlerOrder(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列QUEUE_Order当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
@RabbitListener(queues=RabbitConfig.QUEUE_Pay )
@RabbitHandlerpublicvoidhandlerPay(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列QUEUE_Pay当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定   }
/*** @param trim* @return*/privateMap<String, String>mapStringToMap(Stringstr) {
str=str.substring(1, str.length() -1);
String[] strs=str.split(",");
Map<String, String>map=newHashMap<String, String>();
for (Stringstring : strs) {
Stringkey=string.split("=")[0].trim();
Stringvalue=string.split("=")[1];
map.put(key, value);
        }
returnmap;
   }
}

3.2.3使用交换机

topicExchange: 通配符方式分发消息-(订阅)

四、解决

4.1 工具安装

guest/guest登录失败如何解决:

image.png

解决办法:执行如下命令

命令1:rabbitmqctl set_user_tags guest administrator

命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'

重启rabbitmq即可。

image.png

重启服务:

image.png

------------------------ 我是愉快的分割线 -----------------------------

停止:service rabbitmq-server stop
启动:service rabbitmq-server start
查看状态:service rabbitmq-server status

4.2什么时间让它消费

什么时间手动消费(手动消费:不消费永远都在rmqp中保留)

操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp的交换机-队列-路由中

/**** 消费者* @author Administrator**/@ComponentpublicclassRMQReceiverimplementsChannelAwareMessageListener{
privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass());
//@RabbitListener(queues = RabbitConfig.QUEUE_Member )//@RabbitHandlerpublicvoidhandler(Messagemessage, Channelchannel) throwsIOException  {
logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) );
longdeliveryTag=message.getMessageProperties().getDeliveryTag();
// channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定    }
}

如下图:

image.png

五、邮件发送

5.1QQ邮箱授权码获取

image.png

5.2邮箱配置

pom.xml<!--Mail--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency>
spring.xml--》配置邮箱########Mail配置########################QQsmtp.qq.com##sinasmtp.sina.cn##aliyunsmtp.aliyun.com##163smtp.163.com#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994##发送方spring.mail.host=smtp.qq.com##邮件地址spring.mail.from=1247622527@qq.com#用户名spring.mail.username=1247622527@qq.com##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)spring.mail.password=----------------》》》邮箱的授权码#端口号465或587spring.properties.mail.smtp.port: 25##编码格式spring.mail.default-encoding=UTF-8spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.ssl.enable=true

sendMailUtil.java

@ComponentpublicclassSendMailUtil {
privatestaticfinalLoggerLOGGER=LogManager.getLogger(SendMailUtil.class);
@AutowiredprivateJavaMailSendermailSender;
//发送方邮件的发送地址@Value("${spring.mail.host}")
publicstaticStringsendMailHost;
//发送方发送邮件的账号@Value("${spring.mail.username}")
publicstaticStringsendMailUsername;
//发送方发送邮件的客户端授权码@Value("${pring.mail.password}")
publicstaticStringsendMailPassword;
//发送方发送邮件的端口@Value("${spring.properties.mail.smtp.port}")
publicstaticStringsendMailPort;
@Value("${spring.mail.from")
publicstaticStringsendMailFrom;
publicstaticvoidsendSimpleMail(Stringto, Stringsubject, Stringcontent) throwsException{
//创建连接对象 连接到邮件服务器Propertiesproperties=newProperties();
//设置发送邮件的基本参数//发送邮件服务器properties.put("mail.smtp.host", sendMailHost);
//发送端口properties.put("mail.smtp.port", sendMailPort);
properties.put("mail.smtp.auth", "true");
//设置发送邮件的账号和密码Sessionsession=Session.getInstance(properties, newAuthenticator() {
@OverrideprotectedPasswordAuthenticationgetPasswordAuthentication() {
//两个参数分别是发送邮件的账户和密码returnnewPasswordAuthentication(sendMailUsername,sendMailPassword);
            }
        });
//创建邮件对象Messagemessage=newMimeMessage(session);
//设置发件人message.setFrom(newInternetAddress(sendMailUsername));
//设置收件人message.setRecipient(Message.RecipientType.TO,newInternetAddress(to));
//设置主题message.setSubject(subject);
// 设置邮件内容message.setContent(content,"text/html;charset=UTF-8"); 
//发送一封邮件Transport.send(message);
   }
publicvoidsendHtmlMail(Stringto, Stringsubject, Stringcontent) {
//获取MimeMessage对象MimeMessagemessage=mailSender.createMimeMessage();
MimeMessageHelpermessageHelper;
try {
messageHelper=newMimeMessageHelper(message, true);
//邮件发送人messageHelper.setFrom(sendMailFrom);
//邮件接收人messageHelper.setTo(subject);
//邮件主题message.setSubject(subject);
//邮件内容,html格式messageHelper.setText(content, true);
//发送mailSender.send(message);
        } catch (MessagingExceptione) {
        }
   }
publicvoidsendAttachmentsMail(Stringto, Stringsubject, Stringcontent, StringfilePath) {
MimeMessagemessage=mailSender.createMimeMessage();
try {
MimeMessageHelperhelper=newMimeMessageHelper(message, true);
helper.setFrom(sendMailFrom);
helper.setTo(to);
helper.setSubject(subject);
helper.setText(content, true);
FileSystemResourcefile=newFileSystemResource(newFile(filePath));
StringfileName=filePath.substring(filePath.lastIndexOf(File.separator));
helper.addAttachment(fileName, file);
mailSender.send(message);
           } catch (MessagingExceptione) {
//日志信息             }
   }
publicstaticvoidmain(String[] args) throwsException {
//sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;//System.exit(0);   }
}

业务代码.Java

@OverridepublicObjectuserSendNewMailCode(StringmemberId , StringnewMemberEmail) {
Map<String , Object>mapMemberInfo=newHashMap<String, Object>(); 
if(!"".equals(memberId) &&!"".equals(newMemberEmail)) {
Membermember=memberService.getOnlyOneMemberInfoByMemberId(memberId);
StringvalidEmail=IdGenerate.random2FiveId();
//邮箱登录注册后,可绑定更改新邮箱if (member!=null&&newMemberEmail.equals(member.getMemberEmail())) {
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!");
        } catch (Exceptione) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
        }
//邮箱未登录注册后,任意邮箱可绑定      }elseif(member!=null&&!newMemberEmail.equals(member.getMemberEmail())){
try {
SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!");
        } catch (Exceptione) {
e.printStackTrace();
mapMemberInfo.put("exception", e.getMessage());
        }
      }
List<Object>memberInfo=newArrayList<Object>();
mapMemberInfo.put("newMemberEmail", newMemberEmail);
mapMemberInfo.put("validEmail", validEmail);
memberInfo.add(mapMemberInfo);
returnmapMemberInfo;
    }
returnmapMemberInfo;
  }

转载声明:本文为博主原创文章,未经博主允许不得转载


⚠️注意 ~

💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!

如有疑问❓可以在评论区留言💬或私信留言💬,尽我最大能力🏃‍♀️帮大家解决👨‍🏫!

如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~


相关实践学习
RocketMQ一站式入门使用
从源码编译、部署broker、部署namesrv,使用java客户端首发消息等一站式入门RocketMQ。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
相关文章
|
30天前
|
消息中间件 NoSQL Java
springboot redis 实现消息队列
springboot redis 实现消息队列
34 1
|
1月前
|
消息中间件 存储 监控
RabbitMQ:分布式系统中的高效消息队列
RabbitMQ:分布式系统中的高效消息队列
|
2月前
|
消息中间件 存储 监控
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
搭建消息时光机:深入探究RabbitMQ_recent_history_exchange在Spring Boot中的应用【RabbitMQ实战 二】
32 1
|
2月前
|
消息中间件 监控 Java
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
Spring Boot中的RabbitMQ死信队列魔法:从异常到延迟,一网打尽【RabbitMQ实战 一】
58 0
|
2月前
|
消息中间件 NoSQL Java
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
Redis Streams在Spring Boot中的应用:构建可靠的消息队列解决方案【redis实战 二】
187 1
|
1月前
|
消息中间件 Java
springboot整合消息队列——RabbitMQ
springboot整合消息队列——RabbitMQ
74 0
|
12天前
|
消息中间件 存储 负载均衡
消息队列学习之RabbitMQ
【4月更文挑战第3天】消息队列学习之RabbitMQ,一种基于erlang语言开发的流行的开源消息中间件。
15 0
|
1月前
|
消息中间件 存储 中间件
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
【SpringCloud Stream消息驱动、设计思想以及整合rabbitmq消息队列案例--学习笔记】
46 0
|
1月前
|
消息中间件 存储 Cloud Native
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
【Spring云原生系列】Spring RabbitMQ:异步处理机制的基础--消息队列 原理讲解+使用教程
|
3月前
|
消息中间件 Java Spring
RabbitMQ各种模式的含义与Spring Boot实例详解
RabbitMQ各种模式的含义与Spring Boot实例详解
34 0

相关产品

  • 云消息队列 MQ