pmq学习一-一些典型的使用和套路

简介: pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。首先安装好maven、mysql,对下载下拉的包进行打包:如果遇到时区问题,则可以调整时区问题。1.MqBootstrapListener 观察者模式的使用

pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。

pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。

首先安装好maven、mysql,对下载下拉的包进行打包:

如果遇到时区问题,则可以调整时区问题。

1.MqBootstrapListener 观察者模式的使用

/*** mqBootstrap监听 实现 上下文监听 观察者模式*/@ComponentpublicclassMqBootstrapListenerimplementsApplicationListener<ContextRefreshedEvent>, Ordered {
privatestaticfinalLoggerlog=LoggerFactory.getLogger(MqBootstrapListener.class);
privatestaticbooleanisInit=false;
@AutowiredprivateReportServicereportService;
@OverridepublicintgetOrder() {
// TODO Auto-generated method stubreturnOrdered.LOWEST_PRECEDENCE;
   }
//使用观察者模式重写onApplicationEvent方法@OverridepublicvoidonApplicationEvent(ContextRefreshedEventevent) {
//判断是否初始化了,如果初始化,则启动定时任务if (!isInit) {
try {
startTimer();
startPortalTimer();
//注册reportreportService.registerReport();
//将其进行初始化isInit=true;
log.info("mq初始化成功!");
         } catch (Exceptione) {
log.error("mq初始化异常", e);
throwe;
         }
      }
   }
//启动portal定时任务privatevoidstartPortalTimer() {
Map<String, PortalTimerService>startedServices=SpringUtil.getBeans(PortalTimerService.class);
if (startedServices!=null) {
startedServices.entrySet().forEach(t1-> {
try {
t1.getValue().startPortal();
log.info(t1.getKey() +"启动完成!");
            } catch (Exceptione) {
log.error(t1.getKey() +"启动异常!", e);
            }
         });
      }
   }
//启动定时任务privatevoidstartTimer() {
Map<String, TimerService>startedServices=SpringUtil.getBeans(TimerService.class);
if (startedServices!=null) {
startedServices.entrySet().forEach(t1-> {
try {
t1.getValue().start();
log.info(t1.getKey() +"启动完成!");
            } catch (Exceptione) {
log.error(t1.getKey() +"启动异常!", e);
            }
         });
      }
   }
}

观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。

2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。

/*** 数据report服务*/@ComponentpublicclassDbReportService {
//消息服务@AutowiredprivateMessage01Servicemessage01Service;
//数据节点服务@AutowiredprivateDbNodeServicedbNodeService;
//原子对象 mapprivateAtomicReference<Map<String, Integer>>conMapRef=newAtomicReference<Map<String, Integer>>(
newHashMap<>());
//线程池privateExecutorServiceexecutorService=Executors         .newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true));
//运行为trueprivatevolatilebooleanisRunning=true;
//度量指标privatevolatileMap<String, Boolean>metricMap=newConcurrentHashMap<>();
//启动标识privateAtomicBooleanstartFlag=newAtomicBoolean(false);
//后置构造注解@PostConstructpublicvoidreport() {
//使用cas进行比较if (startFlag.compareAndSet(false, true)) {
//线程池服务提交服务executorService.submit(newRunnable() {
@Overridepublicvoidrun() {
while (isRunning) {
Map<Long, DbNodeEntity>dbNodeMap=dbNodeService.getCache();
Map<String, DbNodeEntity>dataSourceMap=newHashMap<>();
Map<String, Integer>conMap=newHashMap<>();
try {
for (longdbId : dbNodeMap.keySet()) {
if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) {
dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId));
                        }
                     }
for (Stringip : dataSourceMap.keySet()) {
message01Service.setDbId(dataSourceMap.get(ip).getId());
//连接计数intconCount=message01Service.getConnectionsCount();
conMap.put(ip, conCount);
if (!metricMap.containsKey(ip)) {
//System.out.println(ip);metricMap.put(ip, true);
//度量单例 获取度量注册MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip="+ip,
newGauge<Integer>() {
@OverridepublicIntegergetValue() {
if (conMapRef.get().containsKey(ip)) {
returnconMapRef.get().get(ip);
                                       } else {
return0;
                                       }
                                    }
                                 });
                        }
                     }
conMapRef.set(conMap);
                  } catch (Exceptione) {
// TODO: handle exception                  }
Util.sleep(10000);
               }
            }
         });
      }
   }
//销毁前将运行状态变成false,同时将线程池关闭@PreDestroyprivatevoidclose() {
isRunning=false;
executorService.shutdown();
   }
}

3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。

/*** 权限过滤器*/@Order(1)
@WebFilter(filterName="WebAuthFilter", urlPatterns="/*")
publicclassAuthFilterimplementsFilter {
Loggerlog=LoggerFactory.getLogger(this.getClass().getName());
@AutowiredprivateMessage01Servicemessage01Service;
@AutowiredUserInfoHolderuserInfoHolder;
@Overridepublicvoidinit(FilterConfigfilterConfig) throwsServletException {
   }
@OverridepublicvoiddoFilter(ServletRequestreq, ServletResponseresp, FilterChainchain)
throwsIOException, ServletException {
HttpServletRequestrequest= (HttpServletRequest) req;
HttpServletResponseresponse= (HttpServletResponse) resp;
Stringuri=request.getRequestURI();
if (skipUri(uri)) {
chain.doFilter(request, response);
      } else {
try {
Cookiecookie=CookieUtil.getCookie(request, "userSessionId");
if (cookie==null) {
response.sendRedirect("/login");
            } else {
StringuserId=DesUtil.decrypt(cookie.getValue());
userInfoHolder.setUserId(userId);
chain.doFilter(request, response);
            }
         } catch (Exceptione) {
log.error("login fail", e);
response.sendRedirect("/login");
         } finally {
message01Service.clearDbId();
userInfoHolder.clear();
         }
      }
   }
privateList<String>skipUrlLst=newArrayList<>();
publicAuthFilter() {     
skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic");
   }
privatebooleanskipUri(Stringuri) {
for(Stringt : skipUrlLst){
if(uri.indexOf(t)!=-1){
returntrue;
         }
      }
returnfalse;
   }
@Overridepublicvoiddestroy() {
// TODO Auto-generated method stub   }
}

4.获取用户信息,使用threadLocal

/*** 默认用户信息holder*/@ServicepublicclassDefaultUserInfoHolderimplementsUserInfoHolder {
@AutowiredprivateUserProviderServiceuserProviderService;
//使用threadLocal,用户idLocalprivateThreadLocal<String>userIdLocal=newThreadLocal<>();
//获取用户信息@OverridepublicUserInfogetUser() {
StringuserId=userIdLocal.get();
Map<String, UserInfo>mapUser=userProviderService.getUsers();
if (mapUser.containsKey(userId)) {
returnmapUser.get(userId);
      }
returnnull;
   }
//获取用户id@OverridepublicStringgetUserId() {
returnuserIdLocal.get();
   }
//设置用户id@OverridepublicvoidsetUserId(StringuserId) {
userIdLocal.set(userId);
   }
//执行清理操作Thread@Overridepublicvoidclear() {
userIdLocal.remove();
   }
}

这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。

5.cookie工具类:

/*** cookie工具类*/publicclassCookieUtil {
//获取用户名称publicstaticStringgetUserName(HttpServletRequestrequest){
//获取一个cookie数组Cookie[] cookies=request.getCookies();
StringuserName="";
if (cookies!=null) {
//获取cookieCookiecookie=getCookie(request, "userSessionId");
if (cookie==null) {
return"";
            }
try {
//返回对数据进行DES加密的数据returnDesUtil.decrypt(cookie.getValue());
            } catch (Exceptione) {
return"";
            }
        }
returnuserName;
    }
//获取cookiepublicstaticCookiegetCookie(HttpServletRequestrequest, Stringkey) {
Cookie[] cks=request.getCookies();
if (cks!=null) {
for (Cookietemp : cks) {
if (temp.getName().equals(key))
returntemp;
            }
        }
returnnull;
    }
}

常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!

相关实践学习
快速体验阿里云云消息队列RocketMQ版
本实验将带您快速体验使用云消息队列RocketMQ版Serverless系列实例进行获取接入点、创建Topic、创建订阅组、收发消息、查看消息轨迹和仪表盘。
消息队列 MNS 入门课程
1、消息队列MNS简介 本节课介绍消息队列的MNS的基础概念 2、消息队列MNS特性 本节课介绍消息队列的MNS的主要特性 3、MNS的最佳实践及场景应用 本节课介绍消息队列的MNS的最佳实践及场景应用案例 4、手把手系列:消息队列MNS实操讲 本节课介绍消息队列的MNS的实际操作演示 5、动手实验:基于MNS,0基础轻松构建 Web Client 本节课带您一起基于MNS,0基础轻松构建 Web Client
目录
相关文章
|
安全 编译器 C++
【C/C++ 类型转换规则】一文了解C/C++ 中的类型转换规则,帮助你更好的编程
【C/C++ 类型转换规则】一文了解C/C++ 中的类型转换规则,帮助你更好的编程
392 0
|
机器人 jenkins Java
jenkins pipeline流水线集成jacoco,sonar,robot framework,jmeter,fortify
jenkins pipeline流水线集成jacoco,sonar,robot framework,jmeter,fortify
1212 0
jenkins pipeline流水线集成jacoco,sonar,robot framework,jmeter,fortify
|
搜索推荐 Java jenkins
sonar整合阿里java规范开发历程
sonar整合阿里java规范开发历程
|
3月前
|
缓存 编译器 Shell
【实战指南】 CMake搭建编译环境总结
本文总结了使用CMake搭建编译环境的技巧,涵盖单个及多个源文件的编译、CMakeLists嵌套管理、变量设置、交叉编译配置、常用编译选项及警告处理等内容。通过实例说明了如何高效组织工程结构,并利用CMake灵活控制编译流程,适用于嵌入式开发场景。
495 39
|
9月前
|
存储 监控 算法
Java JVM 面试题
Java JVM(虚拟机)相关基础面试题
|
jenkins Java 持续交付
jenkins学习笔记之十四:SonarSQube项目管理
jenkins学习笔记之十四:SonarSQube项目管理
|
消息中间件 存储 缓存
阿里一面,说说你知道消息中间件的应用场景有哪些?
阿里一面,说说你知道消息中间件的应用场景有哪些?
350 90
阿里一面,说说你知道消息中间件的应用场景有哪些?
|
机器学习/深度学习 数据可视化 前端开发
【Python机器学习专栏】机器学习模型评估的实用方法
【4月更文挑战第30天】本文介绍了机器学习模型评估的关键方法,包括评估指标(如准确率、精确率、召回率、F1分数、MSE、RMSE、MAE及ROC曲线)和交叉验证技术(如K折交叉验证、留一交叉验证、自助法)。混淆矩阵提供了一种可视化分类模型性能的方式,而Python的scikit-learn库则方便实现这些评估。选择适合的指标和验证方法能有效优化模型性能。
553 0
|
Java 测试技术 Maven
《手把手教你》系列基础篇之(二)-java+ selenium自动化测试-环境搭建(下)基于Maven(详细教程)
【2月更文挑战第11天】《手把手教你》系列基础篇之(二)-java+ selenium自动化测试-环境搭建(下)基于Maven(详细教程) 是一个软件项目管理和综合工具。基于项目对象模型(POM)的概念,Maven可以从一个中心资料片管理项目构建,报告和文件。由于现在企业和公司中Java的大部分项目都是基于Maven, 因此宏哥为了照顾到企业或者公司用的java项目中用到maven的童鞋或者小伙伴们,这里也简单的介绍和分享一下。在Maven项目中使用Selenium. 非常简单。
546 3
|
存储 运维 Oracle
选择Percona Server、MariaDB还是MYSQL|学习笔记
快速学习选择Percona Server、MariaDB还是MYSQL