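Assignment objectives
- Master installing, deploying, and running RabbitMQ with Docker
- Master introducing, using, and tuning RabbitMQ in real work scenarios
- Master integrating an open API in a real scenario (this exercise uses SMS notification)
- This is the last day of the fundamentals module, so the assignment is somewhat easier than on previous days, leaving some buffer to review the past 4 days of course content
Project overview
Technical architecture diagram
Physical deployment diagram
Environment setup
- Deploy the project from the original assignment: doctor-station (📎doctor-station.zip); its SQL script is 📎doctor_station.sql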
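Task 1
Objective
Master deploying RabbitMQ in Docker
Hints
- Refer to today's course content
Reference answer
- Import the rabbitmq image
- Load the image: docker load -i mq.tar
- Start the container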
docker run \
  -e RABBITMQ_DEFAULT_USER=itcast \
  -e RABBITMQ_DEFAULT_PASS=123321 \
  -v mq-plugins:/plugins \
  --name mq \
  --hostname mq \
  -p 15672:15672 \
  -p 5672:5672 \
  -d \
  rabbitmq:3-management
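Once the container is up, one way to confirm that the two published ports (5672 for AMQP, 15672 for the management console) accept connections is a small TCP probe. The following is a minimal sketch using only the JDK; the class name MqPortCheck and the method isPortOpen are hypothetical helpers, not part of the course project.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper: checks whether a TCP port accepts connections. */
class MqPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    static boolean isPortOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host not resolvable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Docker host is passed as the first argument; defaults to localhost.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("AMQP port 5672 open: " + isPortOpen(host, 5672, 2000));
        System.out.println("Management port 15672 open: " + isPortOpen(host, 15672, 2000));
    }
}
```

An open port only shows that something is listening; logging in to the management console on port 15672 with the user and password from the docker run command is still the real check.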
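Task 2
In the current project, the user must be notified by SMS once a doctor's order has been created successfully. Complete the following:
- Send a message after the doctor's order is created successfully
- Exchange: doctor.topic, which must be created manually (create it as type direct to match the reference answer, despite the name)
- Queue: create.success
- routingKey: success
- Message body (Map structure):
- Mobile number: mobile (String)
- SMS content: content (String)
- notify-service consumes the message and prints its content
- Check the message send and consume status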
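How the exchange, queue, and routingKey listed above fit together can be illustrated with a tiny in-memory model of direct routing: a message reaches a queue only when its routing key exactly matches the key the queue was bound with. This is a sketch for intuition only; DirectExchangeModel is a hypothetical class and does not use or imitate the RabbitMQ client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** In-memory model of direct-exchange routing; illustration only, not the RabbitMQ API. */
class DirectExchangeModel {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    /** Binds a queue to this exchange under the given routing key. */
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    /** Returns the queues that would receive a message sent with this routing key (exact match). */
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptyList());
    }

    public static void main(String[] args) {
        DirectExchangeModel doctorTopic = new DirectExchangeModel();
        doctorTopic.bind("create.success", "success");
        System.out.println(doctorTopic.route("success")); // [create.success]
        System.out.println(doctorTopic.route("failed"));  // []
    }
}
```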
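Objective
Master applying RabbitMQ in code for a real work scenario
Hints
- Add the dependencies to the parent pom (spring-boot-starter-amqp, jackson-databind)
- Add the RabbitMQ configuration; notify-service is a new project, so it also needs a startup class
- Use RabbitTemplate to send the message, using the default exchange type (direct) of the publish-subscribe model
- Use @RabbitListener to receive and consume the message
Reference answer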
- Add the dependencies (doctor-service, notify-service)
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
<!-- AMQP dependency, includes RabbitMQ -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.3.9.RELEASE</version>
</dependency>
- Configuration changes
- Append to doctor-service
spring:
  rabbitmq:
    host: 192.168.206.129 # Host (change the IP to your own)
    port: 5672 # Port
    virtual-host: / # Virtual host
    username: itcast # Username
    password: 123321 # Password
- Add to notify-service
server:
  port: 8089
spring:
  application:
    name: notifyservice
  rabbitmq:
    host: 192.168.206.129 # Host
    port: 5672 # Port
    virtual-host: / # Virtual host
    username: itcast # Username
    password: 123321 # Password
logging:
  level:
    cn.itcast: debug
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
- Sending code
package cn.itcast.service;

import cn.itcast.config.DoctorServerConfig;
import cn.itcast.entity.Doctor;
import cn.itcast.feign.inventory.client.InventoryClient;
import cn.itcast.feign.inventory.response.InventoryUpdateVO;
import cn.itcast.mapper.DoctorMapper;
import cn.itcast.web.request.DoctorCreateParam;
import cn.itcast.web.response.DoctorCreateVO;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * @Description: Core business logic for doctor's orders
 * @Date: 2023/1/30 14:41
 */
@Service
public class DoctorService {

    @Autowired
    private DoctorMapper doctorMapper;

    @Autowired
    private InventoryClient inventoryClient;

    @Autowired
    private DoctorServerConfig doctorServerConfig;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Creates a doctor's order.
     */
    public DoctorCreateVO createDoctorOrder(DoctorCreateParam param) {
        DoctorCreateVO result = new DoctorCreateVO();

        // 0 - Validity check
        validCreateCount(result);
        if (Boolean.FALSE.equals(result.getCreateResult())) {
            return result;
        }

        // 1 - Deduct inventory (calls the inventory service)
        InventoryUpdateVO inventory = inventoryClient.updateInventory(param.getInventoryId(), param.getDoctorInventoryNum());
        // 1.1 - Check that the inventory record exists
        if (Objects.isNull(inventory)) {
            result.setCreateResult(Boolean.FALSE);
            result.setCreateMsg("创建失败,库存不存在");
            return result;
        }
        // 1.2 - Check that the inventory update succeeded
        if (Boolean.FALSE.equals(inventory.getUpdateResult())) {
            result.setCreateResult(Boolean.FALSE);
            result.setCreateMsg(inventory.getUpdateMsg());
            return result;
        }

        // 2 - Insert the doctor's order
        Doctor doctor = new Doctor();
        // Uses the getter like the other fields (assumes DoctorCreateParam exposes getDoctorName())
        doctor.setDoctorName(param.getDoctorName());
        doctor.setInventoryId(param.getInventoryId());
        doctor.setDoctorInventoryNum(param.getDoctorInventoryNum());
        int count = doctorMapper.insert(doctor);

        // 3 - Return early if the insert failed
        if (count == 0) {
            result.setCreateResult(Boolean.FALSE);
            result.setCreateMsg("创建失败,请稍后重试");
            return result;
        }

        // 4 - SMS notification
        sendCreateSuccessMsg();

        result.setCreateResult(Boolean.TRUE);
        result.setCreateMsg("创建成功");
        return result;
    }

    /**
     * Checks today's order count against the configured daily limit.
     *
     * @param result result object that receives the outcome of the check
     */
    private DoctorCreateVO validCreateCount(DoctorCreateVO result) {
        QueryWrapper<Doctor> query = new QueryWrapper<>();
        query.ge("create_time", getStartOfDay());
        query.le("create_time", getEndOfDay());
        Integer todayCount = doctorMapper.selectCount(query);
        int createCount = doctorServerConfig.getCreateCount();
        if (todayCount >= createCount) {
            result.setCreateResult(Boolean.FALSE);
            result.setCreateMsg("今日开单次数已达上限");
            return result;
        }
        result.setCreateResult(Boolean.TRUE);
        result.setCreateMsg("校验通过");
        return result;
    }

    // Latest time of the current day, e.g. 2017-10-15 23:59:59
    public static Date getEndOfDay() {
        LocalDateTime localDateTime = LocalDateTime.now();
        LocalDateTime endOfDay = localDateTime.with(LocalTime.MAX);
        return Date.from(endOfDay.atZone(ZoneId.systemDefault()).toInstant());
    }

    // Earliest time of the current day, e.g. 2017-10-15 00:00:00
    public static Date getStartOfDay() {
        LocalDateTime localDateTime = LocalDateTime.now();
        LocalDateTime startOfDay = localDateTime.with(LocalTime.MIN);
        return Date.from(startOfDay.atZone(ZoneId.systemDefault()).toInstant());
    }

    /**
     * Sends the "order created" message to MQ.
     */
    public void sendCreateSuccessMsg() {
        Map<String, Object> message = new HashMap<>(6);
        message.put("mobile", "17600477102");
        message.put("content", "医嘱创建成功,请及时完成付款");
        rabbitTemplate.convertAndSend("doctor.topic", "success", message);
    }
}
- Consuming code
package it.cast.notify;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Description: Consumes the "order created" message and prints it
 * @Date: 2023/3/1 11:24
 */
@Component
public class Consumer {

    // Binds queue create.success to the direct exchange doctor.topic with routing key "success"
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "create.success"),
            exchange = @Exchange(name = "doctor.topic", type = ExchangeTypes.DIRECT),
            key = {"success"}
    ))
    public void sendCreateSuccessMsg(Map<String, String> message) {
        System.out.println("接收到消息:" + message);
    }
}
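- Start the projects, create a doctor's order with Postman, and check whether the message shows up in MQ; notify-service should print: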
接收到消息:{mobile=17600477102, content=医嘱创建成功,请及时完成付款}
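The reference sender hard-codes the mobile number and content. A possible refinement is to build the message body from parameters and reject blank values before anything is sent. The sketch below uses only the JDK; CreateSuccessMessage and build are hypothetical names, and the returned Map would be passed to rabbitTemplate.convertAndSend in place of the hard-coded one.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that builds the message body described in Task 2. */
class CreateSuccessMessage {

    /** Builds the Map body with keys "mobile" and "content"; rejects blank values. */
    static Map<String, Object> build(String mobile, String content) {
        if (mobile == null || mobile.trim().isEmpty()) {
            throw new IllegalArgumentException("mobile must not be blank");
        }
        if (content == null || content.trim().isEmpty()) {
            throw new IllegalArgumentException("content must not be blank");
        }
        Map<String, Object> message = new HashMap<>();
        message.put("mobile", mobile);
        message.put("content", content);
        return message;
    }

    public static void main(String[] args) {
        Map<String, Object> message = build("17600477102", "医嘱创建成功,请及时完成付款");
        System.out.println(message);
    }
}
```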
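Task 3
Objective
Integrate a third-party message-sending API and try it out in practice
Hints
- Read the API documentation yourself and integrate it in notify-service to see the actual effect. API link (link); you may substitute another suitable API
- API integration guide: https://help.aliyun.com/document_detail/157953.html?spm=5176.product-detail.detail.2.68d86682uVkWmp
Reference answer
- Following the link above, complete the 0-yuan (free) purchase
- Note: the AppCode in the screenshot above is needed later
- The usage guide there also reflects the usual conventions for writing externally facing interfaces and is worth studying
- Click "Interface" to jump to the API integration page, or click (link) directly
- Write your own code to replace the original sout (System.out.println)
- Replace the appcode with your own
- Download HttpUtils as indicated in the comments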
- Add the following dependencies to the pom file
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.15</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.2.1</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-util</artifactId>
    <version>9.3.7.v20160115</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.5</version>
    <scope>test</scope>
</dependency>
- Add HttpUtils manually
package it.cast.notify.tuil;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;

import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.scheme.Scheme;
import org.apache.http.conn.scheme.SchemeRegistry;
import org.apache.http.conn.ssl.SSLSocketFactory;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;

/**
 * @Description: HTTP helper used to call the third-party API
 * @Date: 2023/3/1 11:57
 */
public class HttpUtils {

    /**
     * get
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @return
     * @throws Exception
     */
    public static HttpResponse doGet(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpGet request = new HttpGet(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        return httpClient.execute(request);
    }

    /**
     * post form
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param bodys
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      Map<String, String> bodys) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (bodys != null) {
            List<NameValuePair> nameValuePairList = new ArrayList<NameValuePair>();
            for (String key : bodys.keySet()) {
                nameValuePairList.add(new BasicNameValuePair(key, bodys.get(key)));
            }
            UrlEncodedFormEntity formEntity = new UrlEncodedFormEntity(nameValuePairList, "utf-8");
            formEntity.setContentType("application/x-www-form-urlencoded; charset=UTF-8");
            request.setEntity(formEntity);
        }
        return httpClient.execute(request);
    }

    /**
     * Post String
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      String body) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (StringUtils.isNotBlank(body)) {
            request.setEntity(new StringEntity(body, "utf-8"));
        }
        return httpClient.execute(request);
    }

    /**
     * Post stream
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPost(String host, String path, String method,
                                      Map<String, String> headers,
                                      Map<String, String> querys,
                                      byte[] body) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPost request = new HttpPost(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (body != null) {
            request.setEntity(new ByteArrayEntity(body));
        }
        return httpClient.execute(request);
    }

    /**
     * Put String
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPut(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys,
                                     String body) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPut request = new HttpPut(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (StringUtils.isNotBlank(body)) {
            request.setEntity(new StringEntity(body, "utf-8"));
        }
        return httpClient.execute(request);
    }

    /**
     * Put stream
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @param body
     * @return
     * @throws Exception
     */
    public static HttpResponse doPut(String host, String path, String method,
                                     Map<String, String> headers,
                                     Map<String, String> querys,
                                     byte[] body) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpPut request = new HttpPut(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        if (body != null) {
            request.setEntity(new ByteArrayEntity(body));
        }
        return httpClient.execute(request);
    }

    /**
     * Delete
     *
     * @param host
     * @param path
     * @param method
     * @param headers
     * @param querys
     * @return
     * @throws Exception
     */
    public static HttpResponse doDelete(String host, String path, String method,
                                        Map<String, String> headers,
                                        Map<String, String> querys) throws Exception {
        HttpClient httpClient = wrapClient(host);
        HttpDelete request = new HttpDelete(buildUrl(host, path, querys));
        for (Map.Entry<String, String> e : headers.entrySet()) {
            request.addHeader(e.getKey(), e.getValue());
        }
        return httpClient.execute(request);
    }

    // Builds host + path + "?" + URL-encoded query string
    private static String buildUrl(String host, String path, Map<String, String> querys)
            throws UnsupportedEncodingException {
        StringBuilder sbUrl = new StringBuilder();
        sbUrl.append(host);
        if (!StringUtils.isBlank(path)) {
            sbUrl.append(path);
        }
        if (null != querys) {
            StringBuilder sbQuery = new StringBuilder();
            for (Map.Entry<String, String> query : querys.entrySet()) {
                if (0 < sbQuery.length()) {
                    sbQuery.append("&");
                }
                if (StringUtils.isBlank(query.getKey()) && !StringUtils.isBlank(query.getValue())) {
                    sbQuery.append(query.getValue());
                }
                if (!StringUtils.isBlank(query.getKey())) {
                    sbQuery.append(query.getKey());
                    if (!StringUtils.isBlank(query.getValue())) {
                        sbQuery.append("=");
                        sbQuery.append(URLEncoder.encode(query.getValue(), "utf-8"));
                    }
                }
            }
            if (0 < sbQuery.length()) {
                sbUrl.append("?").append(sbQuery);
            }
        }
        return sbUrl.toString();
    }

    private static HttpClient wrapClient(String host) {
        HttpClient httpClient = new DefaultHttpClient();
        if (host.startsWith("https://")) {
            sslClient(httpClient);
        }
        return httpClient;
    }

    // Note: this trust manager accepts every certificate and hostname, so HTTPS peers are not verified
    private static void sslClient(HttpClient httpClient) {
        try {
            SSLContext ctx = SSLContext.getInstance("TLS");
            X509TrustManager tm = new X509TrustManager() {
                public X509Certificate[] getAcceptedIssuers() {
                    return null;
                }

                public void checkClientTrusted(X509Certificate[] xcs, String str) {
                }

                public void checkServerTrusted(X509Certificate[] xcs, String str) {
                }
            };
            ctx.init(null, new TrustManager[] { tm }, null);
            SSLSocketFactory ssf = new SSLSocketFactory(ctx);
            ssf.setHostnameVerifier(SSLSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
            ClientConnectionManager ccm = httpClient.getConnectionManager();
            SchemeRegistry registry = ccm.getSchemeRegistry();
            registry.register(new Scheme("https", 443, ssf));
        } catch (KeyManagementException ex) {
            throw new RuntimeException(ex);
        } catch (NoSuchAlgorithmException ex) {
            throw new RuntimeException(ex);
        }
    }
}
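- Restart notify-service, create a doctor's order again, and check whether the SMS is received
- If sending fails, you can also try Alibaba Cloud's paid SMS API: https://www.aliyun.com/product/sms?spm=5176.21213303.J_3207526240.38.8e1d53c9cqBbWS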
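For the step of replacing sout with a real API call, the inputs for HttpUtils can be prepared as two plain maps. The sketch below assumes the Alibaba Cloud API marketplace's simple authentication, where the AppCode is sent in an Authorization header of the form "APPCODE " plus the code. The class SmsRequestParts and the query parameter names mobile and content are hypothetical; the real parameter names, host, and path depend on the API you purchased and must be taken from its page.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper that prepares the headers and query parameters for the SMS API call. */
class SmsRequestParts {

    /** Builds the request headers; the AppCode is sent as "APPCODE <code>". */
    static Map<String, String> headers(String appCode) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Authorization", "APPCODE " + appCode);
        return headers;
    }

    /**
     * Copies the fields of the consumed MQ message into query parameters.
     * Assumption: the API expects parameters named "mobile" and "content";
     * rename them to whatever the chosen API documents.
     */
    static Map<String, String> querys(Map<String, String> message) {
        Map<String, String> querys = new HashMap<>();
        querys.put("mobile", message.get("mobile"));
        querys.put("content", message.get("content"));
        return querys;
    }

    public static void main(String[] args) {
        Map<String, String> message = new HashMap<>();
        message.put("mobile", "17600477102");
        message.put("content", "医嘱创建成功,请及时完成付款");
        // "your-appcode" is a placeholder; use the AppCode from your own purchase.
        System.out.println(headers("your-appcode"));
        System.out.println(querys(message));
    }
}
```

The two maps could then be passed as the headers and querys arguments of HttpUtils.doPost inside the @RabbitListener method, together with the host and path given on the API's page.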