pmq是信也科技开源的一款消息中间件,虽然没有RocketMQ和Kafka出名,但是里面的代码还是有值得我们学习的地方的。
pmq的源码里面很多套路还是值得学习的,说实话,这些都是可以用到项目里面的。下面的代码来源于pmq。
首先安装好maven、mysql,对下载下拉的包进行打包:
如果遇到时区问题,则可以调整时区问题。
1.MqBootstrapListener 观察者模式的使用
/*** mqBootstrap监听 实现 上下文监听 观察者模式*/publicclassMqBootstrapListenerimplementsApplicationListener<ContextRefreshedEvent>, Ordered { privatestaticfinalLoggerlog=LoggerFactory.getLogger(MqBootstrapListener.class); privatestaticbooleanisInit=false; privateReportServicereportService; publicintgetOrder() { // TODO Auto-generated method stubreturnOrdered.LOWEST_PRECEDENCE; } //使用观察者模式重写onApplicationEvent方法publicvoidonApplicationEvent(ContextRefreshedEventevent) { //判断是否初始化了,如果初始化,则启动定时任务if (!isInit) { try { startTimer(); startPortalTimer(); //注册reportreportService.registerReport(); //将其进行初始化isInit=true; log.info("mq初始化成功!"); } catch (Exceptione) { log.error("mq初始化异常", e); throwe; } } } //启动portal定时任务privatevoidstartPortalTimer() { Map<String, PortalTimerService>startedServices=SpringUtil.getBeans(PortalTimerService.class); if (startedServices!=null) { startedServices.entrySet().forEach(t1-> { try { t1.getValue().startPortal(); log.info(t1.getKey() +"启动完成!"); } catch (Exceptione) { log.error(t1.getKey() +"启动异常!", e); } }); } } //启动定时任务privatevoidstartTimer() { Map<String, TimerService>startedServices=SpringUtil.getBeans(TimerService.class); if (startedServices!=null) { startedServices.entrySet().forEach(t1-> { try { t1.getValue().start(); log.info(t1.getKey() +"启动完成!"); } catch (Exceptione) { log.error(t1.getKey() +"启动异常!", e); } }); } } }
观察者模式,通常我们进行配置或者将token放入到配置里面的时候,可以使用,而这样可以实时更新。
2.CAS的使用和volitale的使用,使用volitale的时候会考虑到读写屏障。使用@PostConstruct后置构造,这个类似于实现实现InitializingBean,重写AfterPropertiesSet。当然也可以基于ApplicationEvent实现,观察者模式。或者使用后置处理器。
/*** 数据report服务*/publicclassDbReportService { //消息服务privateMessage01Servicemessage01Service; //数据节点服务privateDbNodeServicedbNodeService; //原子对象 mapprivateAtomicReference<Map<String, Integer>>conMapRef=newAtomicReference<Map<String, Integer>>( newHashMap<>()); //线程池privateExecutorServiceexecutorService=Executors .newSingleThreadExecutor(SoaThreadFactory.create("DbReportService", true)); //运行为trueprivatevolatilebooleanisRunning=true; //度量指标privatevolatileMap<String, Boolean>metricMap=newConcurrentHashMap<>(); //启动标识privateAtomicBooleanstartFlag=newAtomicBoolean(false); //后置构造注解publicvoidreport() { //使用cas进行比较if (startFlag.compareAndSet(false, true)) { //线程池服务提交服务executorService.submit(newRunnable() { publicvoidrun() { while (isRunning) { Map<Long, DbNodeEntity>dbNodeMap=dbNodeService.getCache(); Map<String, DbNodeEntity>dataSourceMap=newHashMap<>(); Map<String, Integer>conMap=newHashMap<>(); try { for (longdbId : dbNodeMap.keySet()) { if (!dataSourceMap.containsKey(dbNodeMap.get(dbId).getIp())) { dataSourceMap.put(dbNodeMap.get(dbId).getIp(), dbNodeMap.get(dbId)); } } for (Stringip : dataSourceMap.keySet()) { message01Service.setDbId(dataSourceMap.get(ip).getId()); //连接计数intconCount=message01Service.getConnectionsCount(); conMap.put(ip, conCount); if (!metricMap.containsKey(ip)) { //System.out.println(ip);metricMap.put(ip, true); //度量单例 获取度量注册MetricSingleton.getMetricRegistry().register("mq.ip.con.count?ip="+ip, newGauge<Integer>() { publicIntegergetValue() { if (conMapRef.get().containsKey(ip)) { returnconMapRef.get().get(ip); } else { return0; } } }); } } conMapRef.set(conMap); } catch (Exceptione) { // TODO: handle exception } Util.sleep(10000); } } }); } } //销毁前将运行状态变成false,同时将线程池关闭privatevoidclose() { isRunning=false; executorService.shutdown(); } }
3.使用过滤器:实现filer接口,同时重写init方法、doFilter方法、destroy。通常会在doFilter进行逻辑处理。以前我写过一个url的拼接,基于CAS的重定向url拼接,当时也是基于filter实现的,当时也是在doFilter中实现的。
/*** 权限过滤器*/1) (filterName="WebAuthFilter", urlPatterns="/*") (publicclassAuthFilterimplementsFilter { Loggerlog=LoggerFactory.getLogger(this.getClass().getName()); privateMessage01Servicemessage01Service; UserInfoHolderuserInfoHolder; publicvoidinit(FilterConfigfilterConfig) throwsServletException { } publicvoiddoFilter(ServletRequestreq, ServletResponseresp, FilterChainchain) throwsIOException, ServletException { HttpServletRequestrequest= (HttpServletRequest) req; HttpServletResponseresponse= (HttpServletResponse) resp; Stringuri=request.getRequestURI(); if (skipUri(uri)) { chain.doFilter(request, response); } else { try { Cookiecookie=CookieUtil.getCookie(request, "userSessionId"); if (cookie==null) { response.sendRedirect("/login"); } else { StringuserId=DesUtil.decrypt(cookie.getValue()); userInfoHolder.setUserId(userId); chain.doFilter(request, response); } } catch (Exceptione) { log.error("login fail", e); response.sendRedirect("/login"); } finally { message01Service.clearDbId(); userInfoHolder.clear(); } } } privateList<String>skipUrlLst=newArrayList<>(); publicAuthFilter() { skipUrlLst=Arrays.asList("/login", ".js", ".css", ".jpg", ".woff", ".png", "/auth" ,"/cat","/hs","/message/getByTopic"); } privatebooleanskipUri(Stringuri) { for(Stringt : skipUrlLst){ if(uri.indexOf(t)!=-1){ returntrue; } } returnfalse; } publicvoiddestroy() { // TODO Auto-generated method stub } }
4.获取用户信息,使用threadLocal
/*** 默认用户信息holder*/publicclassDefaultUserInfoHolderimplementsUserInfoHolder { privateUserProviderServiceuserProviderService; //使用threadLocal,用户idLocalprivateThreadLocal<String>userIdLocal=newThreadLocal<>(); //获取用户信息publicUserInfogetUser() { StringuserId=userIdLocal.get(); Map<String, UserInfo>mapUser=userProviderService.getUsers(); if (mapUser.containsKey(userId)) { returnmapUser.get(userId); } returnnull; } //获取用户idpublicStringgetUserId() { returnuserIdLocal.get(); } //设置用户idpublicvoidsetUserId(StringuserId) { userIdLocal.set(userId); } //执行清理操作Threadpublicvoidclear() { userIdLocal.remove(); } }
这个在我们项目里面也是这样使用的,这里可以进行改进,可以将token放入到用户信息中去,使用观察者模式。
5.cookie工具类:
/*** cookie工具类*/publicclassCookieUtil { //获取用户名称publicstaticStringgetUserName(HttpServletRequestrequest){ //获取一个cookie数组Cookie[] cookies=request.getCookies(); StringuserName=""; if (cookies!=null) { //获取cookieCookiecookie=getCookie(request, "userSessionId"); if (cookie==null) { return""; } try { //返回对数据进行DES加密的数据returnDesUtil.decrypt(cookie.getValue()); } catch (Exceptione) { return""; } } returnuserName; } //获取cookiepublicstaticCookiegetCookie(HttpServletRequestrequest, Stringkey) { Cookie[] cks=request.getCookies(); if (cks!=null) { for (Cookietemp : cks) { if (temp.getName().equals(key)) returntemp; } } returnnull; } }
常用的获取cooike的工具类,够实用。当然里面还有很多值得学习的地方,慢慢学习吧!