主页:写程序的小王叔叔的博客欢迎来访👀
支持:点赞收藏关注
一、效果
二、RMQ可以实现的功能
【介绍】:集合了网上各种大佬的教学一起整理
2.1消息中间件:
2.2rmq安装
2.3含义
2.4原理
三、SpringBoot + RMQ集成项目消息队列及聊天功能
【实现】:根据各位大佬的整理的原理,我们自己实现下如何使用吧
3.1RMQ配置
在rmq安装成功之后,浏览器输入http://localhost:15672,账号密码:guest/guest登录之后,给这个guest账号设置初始交换机(代码中默认设置的交换机,我的是:EXCHANGE_Member)的权限,这个问题注意下,要不一直提示:
to exchange 'RabbitMQ_Exchange_Member' in vhost '/' refused for user 'guest', (这个错误找了半天并且找我们领导了,才知道的)
如下图:
3.1.1pom.xml(公共文件配置)
<!--RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
3.1.2spring.xml(公共文件配置)
##############RabbitMQ配置#######spring.rabbitmq.host=localhostspring.rabbitmq.port=5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000#开启confirms回调P->Exchangespring.rabbitmq.publisher-confirms=true#开启returnedMessage回调Exchange->Queuespring.rabbitmq.publisher-returns=true#设置手动确认(ack) Queue->Cspring.rabbitmq.listener.simple.acknowledge-mode=manualspring.rabbitmq.listener.simple.prefetch=100
3.2Java RMQ类代码
3.2.1 使用交换机DirectExchange : 按照routingkey分发到指定队列-(直连)
rmqpConfig.java rmqp基本配置
publicclassRabbitConfig { privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class); "${spring.rabbitmq.host}") (privateStringhost; "${spring.rabbitmq.port}") (privateintport; "${spring.rabbitmq.username}") (privateStringusername; "${spring.rabbitmq.password}") (privateStringpassword; "${spring.rabbitmq.virtual-host}") (privateStringvhost; publicstaticfinalStringEXCHANGE_Member="RabbitMQ_Exchange_Member";//邮件:注册+登录 publicstaticfinalStringQUEUE_Member="RabbitMQ_Queue_Member";//邮件:注册+登录publicstaticfinalStringROUTINGKEY_Member="RabbitMQ_RoutingKey_Member";//邮件:注册+登录//建立一个连接容器,类型数据库的连接池publicConnectionFactoryconnectionFactory() { CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); returnconnectionFactory; } ConfigurableBeanFactory.SCOPE_PROTOTYPE) (publicRabbitTemplaterabbitTemplate() { RabbitTemplatetemplate=newRabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费"); } else { LOGGER.info("消息消费失败:"+cause); } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { StringcorrelationId=message.getMessageProperties().getCorrelationIdString(); LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); returntemplate; } /*** 交换机针对消费者配置* FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念* DirectExchange:按照routingkey分发到指定队列,多关键字匹配*/publicDirectExchangedirectExchange() { returnnewDirectExchange(EXCHANGE_Member, true,false); } /*** 队列** @return*/publicQueuedirectQueue() { returnnewQueue(QUEUE_Member, true); } /*** 绑定** @return*/publicBindingdirectBinding() { returnBindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTINGKEY_Member); } publicMessageConverterjsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } }
RMQProducer.java rmqp消息提供端/消息发送端
publicclassRMQProducer { privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class); privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss"); privateRabbitTemplaterabbitTemplate; /**** 延迟消息队列信息* @param routingKeyName* @param msg*/publicvoidsendMsg(StringroutingKeyName,Stringmsg) { LOGGER.info("消息发送成功,routingKeyName: {},msg:{},时间:{}", routingKeyName,msg,sdf.format(newDate())); rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_Member, routingKeyName, msg, newMessagePostProcessor() { publicMessagepostProcessMessage(Messagemessage) throwsAmqpException { message.getMessageProperties().setContentEncoding("utf-8"); message.getMessageProperties().setExpiration("120000"); //设置消息存活时间returnmessage; } }); //rabbitTemplate.convertAndSend(routingKeyName, msg); } }
RMQReceiver.java rmqp消费端
/**** 消费者* @author Administrator**/publicclassRMQReceiverimplementsChannelAwareMessageListener{ privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass()); queues=RabbitConfig.QUEUE_Member ) (publicvoidhandler(Messagemessage, Channelchannel) throwsIOException { logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) ); longdeliveryTag=message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } }
3.2.2使用交换机 FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念-(广播)
rmqpConfig.java rmqp配置类
publicclassRabbitConfig { privatestaticfinalLoggerLOGGER=LogManager.getLogger(RabbitConfig.class); "${spring.rabbitmq.host}") (privateStringhost; "${spring.rabbitmq.port}") (privateintport; "${spring.rabbitmq.username}") (privateStringusername; "${spring.rabbitmq.password}") (privateStringpassword; "${spring.rabbitmq.virtual-host}") (privateStringvhost; /**** 交换机*/publicstaticfinalStringEXCHANGE_Order="RabbitMQ_Exchange_Order";// 下单/*** 队列*/publicstaticfinalStringQUEUE_Order="RabbitMQ_Queue_Order";// 下单publicstaticfinalStringQUEUE_Pay="RabbitMQ_Queue_Pay";// 支付/**** 路由*/publicStringROUTINGKEY_Order="RabbitMQ_RoutingKey_Order";// 下单//建立一个连接容器,类型数据库的连接池publicConnectionFactoryconnectionFactory() { CachingConnectionFactoryconnectionFactory=newCachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost(vhost); connectionFactory.setPublisherConfirms(true); returnconnectionFactory; } ConfigurableBeanFactory.SCOPE_PROTOTYPE) (publicRabbitTemplaterabbitTemplate() { RabbitTemplatetemplate=newRabbitTemplate(connectionFactory()); template.setMandatory(true); template.setEncoding("UTF-8"); // 消息发送失败返回到队列中, yml需要配置 publisher-returns: truetemplate.setMandatory(true); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { LOGGER.info("消息成功消费"); } else { LOGGER.info("消息消费失败:"+cause); } }); template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { StringcorrelationId=message.getMessageProperties().getCorrelationIdString(); LOGGER.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); returntemplate; }
/////////Fanout广播配置////////////////////**** 交换机配置* @return*/publicFanoutExchangefanoutExchange() { returnnewFanoutExchange(EXCHANGE_Order); } /*** 队列配置* @return*/publicQueuefanoutQueueOrder() { returnnewQueue(QUEUE_Order); } publicQueuefanoutQueuePay() { returnnewQueue(QUEUE_Pay); } /**** 绑定交换机和队列* @return*/publicBindingbindFanoutExchangeOrder() { returnBindingBuilder.bind(fanoutQueueOrder()).to(fanoutExchange()); } publicBindingbindFanoutExchangePay() { returnBindingBuilder.bind(fanoutQueuePay()).to(fanoutExchange()); } publicMessageConverterjsonMessageConverter() { returnnewJackson2JsonMessageConverter(); } }
RMQProducer.java rmqp消息提供端/消息发送端
/**** 提供者* @author Administrator**/publicclassRMQProducer { privateLoggerLOGGER=LoggerFactory.getLogger(RMQProducer.class); privateSimpleDateFormatsdf=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss"); privateRabbitTemplaterabbitTemplate; /**** 延迟消息队列信息* @param routingKeyName* @param msg* @param string */publicvoidsendMsgtoFound(StringexchangeName , Stringmsg) { LOGGER.info("消息发送成功,msg:{},时间:{}",msg,sdf.format(newDate())); rabbitTemplate.convertAndSend(exchangeName , msg); } }
RMQReceiver.java rmqp消费端
/**** 消费者* @author Administrator**/publicclassRMQReceiverimplementsChannelAwareMessageListener{ privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass()); queues=RabbitConfig.QUEUE_Order ) (publicvoidhandlerOrder(Messagemessage, Channelchannel) throwsIOException { logger.info("接收处理队列QUEUE_Order当中的消息: "+newString(message.getBody()) ); longdeliveryTag=message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } queues=RabbitConfig.QUEUE_Pay ) (publicvoidhandlerPay(Messagemessage, Channelchannel) throwsIOException { logger.info("接收处理队列QUEUE_Pay当中的消息: "+newString(message.getBody()) ); longdeliveryTag=message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } /*** @param trim* @return*/privateMap<String, String>mapStringToMap(Stringstr) { str=str.substring(1, str.length() -1); String[] strs=str.split(","); Map<String, String>map=newHashMap<String, String>(); for (Stringstring : strs) { Stringkey=string.split("=")[0].trim(); Stringvalue=string.split("=")[1]; map.put(key, value); } returnmap; } }
3.2.3使用交换机
topicExchange: 通配符方式分发消息-(订阅)
四、解决
4.1 工具安装
guest/guest登录失败如何解决:
解决办法:执行如下命令
命令1:rabbitmqctl set_user_tags guest administrator
命令2:rabbitmqctl set_permissions -p / guest '.*' '.*' '.*'
重启rabbitmq即可。
重启服务:
------------------------ 我是愉快的分割线 -----------------------------
停止:service rabbitmq-server stop 启动:service rabbitmq-server start 查看状态:service rabbitmq-server status
4.2什么时间让它消费
什么时间手动消费(手动消费:不消费永远都在rmqp中保留)
操作:目前的得到的方案是:将消费端的监听事件关闭,不用监听,则这样的消息会永远停留在rmqp的交换机-队列-路由中
/**** 消费者* @author Administrator**/publicclassRMQReceiverimplementsChannelAwareMessageListener{ privatefinalLoggerlogger=LoggerFactory.getLogger(this.getClass()); //@RabbitListener(queues = RabbitConfig.QUEUE_Member )//@RabbitHandlerpublicvoidhandler(Messagemessage, Channelchannel) throwsIOException { logger.info("接收处理队列Member当中的消息: "+newString(message.getBody()) ); longdeliveryTag=message.getMessageProperties().getDeliveryTag(); // channel.basicAck(deliveryTag, false); // 采用手动应答模式, 手动确认应答更为安全稳定 } }
如下图:
五、邮件发送
5.1QQ邮箱授权码获取
5.2邮箱配置
pom.xml<!--Mail--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-mail</artifactId></dependency>
spring.xml--》配置邮箱########Mail配置########################QQsmtp.qq.com##sinasmtp.sina.cn##aliyunsmtp.aliyun.com##163smtp.163.com#126邮箱SMTP服务器地址:smtp.126.com,端口号:465或者994#163邮箱SMTP服务器地址:smtp.163.com,端口号:465或者994#yeah邮箱SMTP服务器地址:smtp.yeah.net,端口号:465或者994##发送方spring.mail.host=smtp.qq.com##邮件地址spring.mail.from=1247622527 .com#用户名spring.mail.username=1247622527 .com##客户端授权码(不是邮箱密码,这个在qq邮箱设置里面自动生成的)spring.mail.password=----------------》》》邮箱的授权码#端口号465或587spring.properties.mail.smtp.port: 25##编码格式spring.mail.default-encoding=UTF-8spring.mail.properties.mail.smtp.auth=truespring.mail.properties.mail.smtp.starttls.enable=truespring.mail.properties.mail.smtp.starttls.required=truespring.mail.properties.mail.smtp.ssl.enable=true
sendMailUtil.java
publicclassSendMailUtil { privatestaticfinalLoggerLOGGER=LogManager.getLogger(SendMailUtil.class); privateJavaMailSendermailSender; //发送方邮件的发送地址"${spring.mail.host}") (publicstaticStringsendMailHost; //发送方发送邮件的账号"${spring.mail.username}") (publicstaticStringsendMailUsername; //发送方发送邮件的客户端授权码"${pring.mail.password}") (publicstaticStringsendMailPassword; //发送方发送邮件的端口"${spring.properties.mail.smtp.port}") (publicstaticStringsendMailPort; "${spring.mail.from") (publicstaticStringsendMailFrom; publicstaticvoidsendSimpleMail(Stringto, Stringsubject, Stringcontent) throwsException{ //创建连接对象 连接到邮件服务器Propertiesproperties=newProperties(); //设置发送邮件的基本参数//发送邮件服务器properties.put("mail.smtp.host", sendMailHost); //发送端口properties.put("mail.smtp.port", sendMailPort); properties.put("mail.smtp.auth", "true"); //设置发送邮件的账号和密码Sessionsession=Session.getInstance(properties, newAuthenticator() { protectedPasswordAuthenticationgetPasswordAuthentication() { //两个参数分别是发送邮件的账户和密码returnnewPasswordAuthentication(sendMailUsername,sendMailPassword); } }); //创建邮件对象Messagemessage=newMimeMessage(session); //设置发件人message.setFrom(newInternetAddress(sendMailUsername)); //设置收件人message.setRecipient(Message.RecipientType.TO,newInternetAddress(to)); //设置主题message.setSubject(subject); // 设置邮件内容message.setContent(content,"text/html;charset=UTF-8"); //发送一封邮件Transport.send(message); } publicvoidsendHtmlMail(Stringto, Stringsubject, Stringcontent) { //获取MimeMessage对象MimeMessagemessage=mailSender.createMimeMessage(); MimeMessageHelpermessageHelper; try { messageHelper=newMimeMessageHelper(message, true); //邮件发送人messageHelper.setFrom(sendMailFrom); //邮件接收人messageHelper.setTo(subject); //邮件主题message.setSubject(subject); //邮件内容,html格式messageHelper.setText(content, true); //发送mailSender.send(message); } catch (MessagingExceptione) { } } publicvoidsendAttachmentsMail(Stringto, Stringsubject, Stringcontent, StringfilePath) { MimeMessagemessage=mailSender.createMimeMessage(); try { MimeMessageHelperhelper=newMimeMessageHelper(message, true); helper.setFrom(sendMailFrom); helper.setTo(to); helper.setSubject(subject); helper.setText(content, true); FileSystemResourcefile=newFileSystemResource(newFile(filePath)); StringfileName=filePath.substring(filePath.lastIndexOf(File.separator)); helper.addAttachment(fileName, file); mailSender.send(message); } catch (MessagingExceptione) { //日志信息 } } publicstaticvoidmain(String[] args) throwsException { //sendSimpleMail("1901660505@qq.com","主题:邮箱注册","内容:这是一个邮件注册码,请输入:"+ IdGenerate.random2FiveId()) ;//System.exit(0); } }
业务代码.Java
publicObjectuserSendNewMailCode(StringmemberId , StringnewMemberEmail) { Map<String , Object>mapMemberInfo=newHashMap<String, Object>(); if(!"".equals(memberId) &&!"".equals(newMemberEmail)) { Membermember=memberService.getOnlyOneMemberInfoByMemberId(memberId); StringvalidEmail=IdGenerate.random2FiveId(); //邮箱登录注册后,可绑定更改新邮箱if (member!=null&&newMemberEmail.equals(member.getMemberEmail())) { try { SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:新邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!"); } catch (Exceptione) { e.printStackTrace(); mapMemberInfo.put("exception", e.getMessage()); } //邮箱未登录注册后,任意邮箱可绑定 }elseif(member!=null&&!newMemberEmail.equals(member.getMemberEmail())){ try { SendMailUtils.sendSimpleMail(newMemberEmail,"主题:邮箱注册","内容:邮箱账号:[ "+newMemberEmail+" ]未注册,可放心使用! \n 请将数字验证码:[ "+validEmail+" ] 填入邮箱注册码中!"); } catch (Exceptione) { e.printStackTrace(); mapMemberInfo.put("exception", e.getMessage()); } } List<Object>memberInfo=newArrayList<Object>(); mapMemberInfo.put("newMemberEmail", newMemberEmail); mapMemberInfo.put("validEmail", validEmail); memberInfo.add(mapMemberInfo); returnmapMemberInfo; } returnmapMemberInfo; }
转载声明:本文为博主原创文章,未经博主允许不得转载
⚠️注意 ~
💯本期内容就结束了,如果内容有误,麻烦大家评论区指出!
如有疑问❓可以在评论区留言💬或私信留言💬,尽我最大能力🏃♀️帮大家解决👨🏫!
如果我的文章有帮助到您,欢迎点赞+关注✔️鼓励博主🏃,您的鼓励是我分享的动力🏃🏃🏃~