服务端采用Spring Boot、JPA、MySQL为基本框架,同时接入了EMQ、JWT、微信认证、Lombok等组件。
一、框架搭建
使用IDEA创建项目,选择spring initializr 初始化SpringBoot项目,然后勾选JPA、Lombok、springweb的插件,一路next这样一个基本的springboot项目就搭建起来了。
二、EMQ接入
1. 引入JAR包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> </dependency>
2.application.yml添加配置文件
将host-url替换为我们部署的EMQ的地址,端口号默认为1883。
spring: mqtt: username: admin mqpassword: admin host-url: tcp://127.0.0.1:1883 client-id: server_client_${random.value} default-topic: $SYS/brokers/+/clients/# completionTimeout: 3000 keepAlive: 6
3.代码配置
3.1 MqttConfiguration类
用来处理订阅和发布消息的工厂类。
package com.eric.etcloud.common.configs; import com.eric.etcloud.common.beans.MqttProperties; import com.eric.etcloud.common.mqtt.MqttEvent; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationEventPublisher; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.channel.DirectChannel; import org.springframework.integration.core.MessageProducer; import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory; import org.springframework.integration.mqtt.core.MqttPahoClientFactory; import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter; import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler; import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter; import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandler; import org.springframework.messaging.MessagingException; /** * @author yangrui * @date 2020年5月14日 */ @Configuration @Slf4j public class MqttConfiguration { @Autowired private MqttProperties mqttProperties; /** * 事件触发 */ @Autowired private ApplicationEventPublisher eventPublisher; @Bean public MqttConnectOptions getMqttConnectOptions(){ MqttConnectOptions mqttConnectOptions=new MqttConnectOptions(); mqttConnectOptions.setUserName(mqttProperties.getUsername()); mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray()); mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()}); mqttConnectOptions.setKeepAliveInterval(2); mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive()); return mqttConnectOptions; } @Bean public 
MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); factory.setConnectionOptions(getMqttConnectOptions()); return factory; } @Bean public MessageChannel mqttInputChannel() { return new DirectChannel(); } /** * 配置client,监听的topic */ @Bean public MessageProducer inbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(), mqttProperties.getDefaultTopic().split(",")); adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout())); adapter.setConverter(new DefaultPahoMessageConverter()); //默认添加TopicName中所有tipic adapter.addTopic("+/+/client"); adapter.addTopic("+/+/web"); adapter.setQos(2); adapter.setOutputChannel(mqttInputChannel()); return adapter; } @Bean @ServiceActivator(inputChannel = "mqttInputChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message<?> message) throws MessagingException { System.out.println("收到了消息"); String topic = message.getHeaders().get("mqtt_receivedTopic").toString(); String qos = message.getHeaders().get("mqtt_receivedQos").toString(); //触发事件 这里不再做业务处理,包 listener中做处理 System.out.println(topic); eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString())); log.info("topic:"+topic+" Qos:"+qos+" message:"+message.getPayload()); } }; } /** * 发送消息和消费消息Channel可以使用相同MqttPahoClientFactory * * @return */ @Bean @ServiceActivator(inputChannel = "mqttOutboundChannel") public MessageHandler mqttOutbound() { // 在这里进行mqttOutboundChannel的相关设置 MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory()); // 如果设置成true,发送消息时将不会阻塞。 messageHandler.setAsync(true); messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic()); return messageHandler; } @Bean public MessageChannel mqttOutboundChannel() { return new DirectChannel(); } }
3.2 JobListener
监听MqttConfiguration中发布的topic事件(MqttEvent),并按消息通道(topic)分别处理。
package com.eric.etcloud.common.mqtt;

import com.eric.etcloud.common.utils.CommonData;
import com.eric.etcloud.entity.*;
import com.eric.etcloud.service.*;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Listens for {@link MqttEvent}s and dispatches on the event's topic through the
 * SpEL conditions on each {@code @EventListener} method.
 *
 * @author yangrui
 * @date 2020-05-23
 */
@Slf4j
@Component
public class JobListener {
    @Autowired
    DeviceService deviceService;
    @Autowired
    ProductService productService;
    @Autowired
    MqttGateway gateway;
    @Autowired
    ClienthisService clienthisService;
    @Autowired
    NodehisService nodehisService;
    @Autowired
    TriggerService triggerService;
    @Autowired
    TriggerNodeService triggerNodeService;
    @Autowired
    WarnRecordService warnRecordService;
    @Autowired
    WarnInfoService warnInfoService;
    // Prefix prepended to the refresh topic published in onEmqttCall2.
    @Value("${platform.env}")
    String env;

    /**
     * Handles events whose topic ends with {@code client}. The topic is split on "/";
     * segment 0 is passed to the trigger lookup as a product id and segment 1 to the
     * device lookup as a serial number.
     *
     * @param mqttEvent the received MQTT message wrapped as an application event
     */
    @EventListener(condition = "#mqttEvent.topic.endsWith('client')")
    public void onEmqttCall1(MqttEvent mqttEvent){
        String topic = mqttEvent.getTopic();
        // NOTE(review): the text between the statement above and "eturn; }" was lost when this
        // source was published (presumably an "if (...) { return; }" guard). The fragment is
        // reproduced as found and does not compile — restore it from the original project.
        eturn; }
        String[] arr = topic.split("/");
        List<ModelDevice> modelDevices = deviceService.findBySn(arr[1]);
        if(modelDevices.size()!=1){
            System.out.println("设备不存在");
            return;
        }
        if(modelDevices.get(0).getIsNodeDb() == 1){
            // Persist the raw message as a node-history record.
            nodehisService.save(new ModelNodehis("",arr[1],topic,mqttEvent.getMessage(),new Timestamp(System.currentTimeMillis())));
        }
        Gson gson = new Gson();
        Map<String,Object> content = gson.fromJson(mqttEvent.getMessage(),Map.class);
        // Trigger / alarm handling: only the lookup is present here; "content" and "triggers"
        // are not used further in the published code.
        List<ModelTrigger> triggers = triggerService.findByProductId(arr[0]);
    }

    /**
     * Handles broker system events (topics starting with {@code $SYS/brokers/}). The payload
     * is parsed as {@code EmqClient.ClientInfo}. On a topic ending in {@code /connected} an
     * unknown client id is registered as a new device and a refresh message is published;
     * a connection-history record is saved when the device has {@code isConnDb == 1}.
     *
     * @param mqttEvent the received MQTT message wrapped as an application event
     */
    @EventListener(condition = "#mqttEvent.topic.startsWith('$SYS/brokers/')")
    public void onEmqttCall2(MqttEvent mqttEvent){
        String topic = mqttEvent.getTopic();
        Gson gson = new Gson();
        EmqClient.ClientInfo clientInfo = gson.fromJson(mqttEvent.getMessage(), EmqClient.ClientInfo.class);
        List<ModelDevice> modelDevices = deviceService.findBySn(clientInfo.getClientid());
        if(topic.endsWith("/connected")){
            // Auto-registration: the MQTT user name is used as the product id and the client id
            // as the device serial number.
            if(modelDevices.size() == 0){
                // NOTE(review): product is dereferenced without a null check — confirm findById
                // cannot return null for an unknown user name.
                ModelProduct product = productService.findById(clientInfo.getUsername());
                deviceService.save(new ModelDevice(clientInfo.getUsername(),clientInfo.getClientid(),product.getEid(),product.getUserId(),new Timestamp(System.currentTimeMillis())));
                // Tell the web front end to refresh its device tree.
                Map<String,Object> map = new HashMap<>();
                map.put("productId",clientInfo.getUsername());
                map.put("sn",clientInfo.getClientid());
                gateway.sendToMqtt(env + "server_to_web/refresh",gson.toJson(map));
            }
            // Save a connection-history record (last argument 1 — presumably "connected"; verify against ModelClienthis).
            if(modelDevices.size() == 1 && modelDevices.get(0).getIsConnDb() == 1){
                clienthisService.save(new ModelClienthis(clientInfo.getClientid(),clientInfo.getUsername(),new Timestamp(System.currentTimeMillis()),1));
            }
        }else{
            // Save a connection-history record (last argument 0 — presumably "disconnected"; verify against ModelClienthis).
            if(modelDevices.size() == 1 && modelDevices.get(0).getIsConnDb() == 1){
                clienthisService.save(new ModelClienthis(clientInfo.getClientid(),clientInfo.getUsername(),new Timestamp(System.currentTimeMillis()),0));
            }
        }
    }

    /**
     * Handles events whose topic is exactly {@code device}; only logs the message.
     *
     * @param mqttEvent the received MQTT message wrapped as an application event
     */
    @EventListener(condition = "#mqttEvent.topic.equals('device')")
    public void onEmqttCallT(MqttEvent mqttEvent){
        log.info("接收到消11111111111:"+mqttEvent.getMessage());
    }
}
3.3 MqttEvent
topic事件类
package com.eric.etcloud.common.mqtt;

import lombok.Getter;
import lombok.Setter;
import org.springframework.context.ApplicationEvent;

/**
 * Application event carrying one MQTT message: the topic it arrived on and its payload.
 * Instances are immutable; both values are fixed at construction.
 *
 * @author yangrui
 * @date 2020-05-23
 */
@Getter
public class MqttEvent extends ApplicationEvent {

    /** Topic associated with the message. */
    private final String topic;

    /** Message payload as text. */
    private final String message;

    /**
     * Creates an event for one MQTT message.
     *
     * @param source  the object publishing the event
     * @param topic   topic associated with the message
     * @param message message payload as text
     */
    public MqttEvent(Object source, String topic, String message) {
        super(source);
        this.topic = topic;
        this.message = message;
    }
}
3.4 MqttGateway
消息发送接口
package com.eric.etcloud.common.mqtt;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutboundChannel} channel.
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Sends a payload without setting a topic header.
     * NOTE(review): presumably the outbound handler's default topic is used — confirm.
     *
     * @param data message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the given value as the {@link MqttHeaders#TOPIC} header.
     *
     * @param topic   value of the topic header
     * @param payload message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Sends a payload with explicit {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS} headers.
     *
     * @param topic   value of the topic header
     * @param qos     value of the QoS header
     * @param payload message payload
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
三、微信接入
1.引入JAR包
<dependency> <groupId>com.github.binarywang</groupId> <artifactId>weixin-java-miniapp</artifactId> <version>3.8.0</version> </dependency>
2.配置文件yml
wx: miniapp: configs: - appid: yourappid secret: yoursecret token: yourtoken aesKey: youraeskey msgDataFormat: JSON
3.代码集成
3.1 WxMaProperties
读取微信yml中的配置文件
package com.eric.etcloud.common.configs;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import java.util.List;

/**
 * Binds the {@code wx.miniapp} configuration section: a list of per-mini-program settings.
 *
 * @author <a href="https://github.com/binarywang">Binary Wang</a>
 */
@Data
@ConfigurationProperties(prefix = "wx.miniapp")
public class WxMaProperties {

    /** One entry per configured mini program ({@code wx.miniapp.configs}). */
    private List<Config> configs;

    /** Settings of a single WeChat mini program. */
    @Data
    public static class Config {
        /**
         * The mini program's appid.
         */
        private String appid;

        /**
         * The mini program's secret.
         */
        private String secret;

        /**
         * Token configured for the mini program's message server.
         */
        private String token;

        /**
         * EncodingAESKey configured for the mini program's message server.
         */
        private String aesKey;

        /**
         * Message format, XML or JSON.
         */
        private String msgDataFormat;
    }
}
3.2 WxMaConfiguration
微信配置中心
package com.eric.etcloud.common.configs; import cn.binarywang.wx.miniapp.api.WxMaService; import cn.binarywang.wx.miniapp.api.impl.WxMaServiceImpl; import cn.binarywang.wx.miniapp.bean.WxMaKefuMessage; import cn.binarywang.wx.miniapp.bean.WxMaTemplateData; import cn.binarywang.wx.miniapp.bean.WxMaTemplateMessage; import cn.binarywang.wx.miniapp.config.impl.WxMaDefaultConfigImpl; import cn.binarywang.wx.miniapp.message.WxMaMessageHandler; import cn.binarywang.wx.miniapp.message.WxMaMessageRouter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import me.chanjar.weixin.common.bean.result.WxMediaUploadResult; import me.chanjar.weixin.common.error.WxErrorException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; import java.io.File; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @author <a href="https://github.com/binarywang">Binary Wang</a> */ @Configuration @EnableConfigurationProperties(WxMaProperties.class) public class WxMaConfiguration { private WxMaProperties properties; private static Map<String, WxMaMessageRouter> routers = Maps.newHashMap(); private static Map<String, WxMaService> maServices = Maps.newHashMap(); @Autowired public WxMaConfiguration(WxMaProperties properties) { this.properties = properties; } public static WxMaService getMaService(String appid) { WxMaService wxService = maServices.get(appid); if (wxService == null) { throw new IllegalArgumentException(String.format("未找到对应appid=[%s]的配置,请核实!", appid)); } return wxService; } public static WxMaMessageRouter getRouter(String appid) { return routers.get(appid); } @PostConstruct public void init() { List<WxMaProperties.Config> configs = this.properties.getConfigs(); if (configs == null) { throw new 
RuntimeException("大哥,拜托先看下项目首页的说明(readme文件),添加下相关配置,注意别配错了!"); } maServices = configs.stream() .map(a -> { WxMaDefaultConfigImpl config = new WxMaDefaultConfigImpl(); config.setAppid(a.getAppid()); config.setSecret(a.getSecret()); config.setToken(a.getToken()); config.setAesKey(a.getAesKey()); config.setMsgDataFormat(a.getMsgDataFormat()); WxMaService service = new WxMaServiceImpl(); service.setWxMaConfig(config); routers.put(a.getAppid(), this.newRouter(service)); return service; }).collect(Collectors.toMap(s -> s.getWxMaConfig().getAppid(), a -> a)); } private WxMaMessageRouter newRouter(WxMaService service) { final WxMaMessageRouter router = new WxMaMessageRouter(service); router .rule().handler(logHandler).next() .rule().async(false).content("模板").handler(templateMsgHandler).end() .rule().async(false).content("文本").handler(textHandler).end() .rule().async(false).content("图片").handler(picHandler).end() .rule().async(false).content("二维码").handler(qrcodeHandler).end(); return router; } private final WxMaMessageHandler templateMsgHandler = (wxMessage, context, service, sessionManager) -> { service.getMsgService().sendTemplateMsg(WxMaTemplateMessage.builder() .templateId("此处更换为自己的模板id") .formId("自己替换可用的formid") .data(Lists.newArrayList( new WxMaTemplateData("keyword1", "339208499", "#173177"))) .toUser(wxMessage.getFromUser()) .build()); return null; }; private final WxMaMessageHandler logHandler = (wxMessage, context, service, sessionManager) -> { System.out.println("收到消息:" + wxMessage.toString()); service.getMsgService().sendKefuMsg(WxMaKefuMessage.newTextBuilder().content("收到信息为:" + wxMessage.toJson()) .toUser(wxMessage.getFromUser()).build()); return null; }; private final WxMaMessageHandler textHandler = (wxMessage, context, service, sessionManager) -> { service.getMsgService().sendKefuMsg(WxMaKefuMessage.newTextBuilder().content("回复文本消息") .toUser(wxMessage.getFromUser()).build()); return null; }; private final WxMaMessageHandler picHandler = (wxMessage, 
context, service, sessionManager) -> { try { WxMediaUploadResult uploadResult = service.getMediaService() .uploadMedia("image", "png", ClassLoader.getSystemResourceAsStream("tmp.png")); service.getMsgService().sendKefuMsg( WxMaKefuMessage .newImageBuilder() .mediaId(uploadResult.getMediaId()) .toUser(wxMessage.getFromUser()) .build()); } catch (WxErrorException e) { e.printStackTrace(); } return null; }; private final WxMaMessageHandler qrcodeHandler = (wxMessage, context, service, sessionManager) -> { try { final File file = service.getQrcodeService().createQrcode("123", 430); WxMediaUploadResult uploadResult = service.getMediaService().uploadMedia("image", file); service.getMsgService().sendKefuMsg( WxMaKefuMessage .newImageBuilder() .mediaId(uploadResult.getMediaId()) .toUser(wxMessage.getFromUser()) .build()); } catch (WxErrorException e) { e.printStackTrace(); } return null; }; }
3.3 WxPortalController
接收微信认证服务器数据,此处注意需要在微信小程序平台【开发】【开发设置】【消息推送】中配置一下,关于小程序的详细配置,会在后面的章节单独详细说明,此处仅配置认证接口。
package com.eric.etcloud.controller;

import cn.binarywang.wx.miniapp.api.WxMaService;
import cn.binarywang.wx.miniapp.bean.WxMaMessage;
import cn.binarywang.wx.miniapp.constant.WxMaConstants;
import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.configs.WxMaConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.*;
import java.util.Objects;

/**
 * Endpoint called by the WeChat server for one mini program ({@code /wx/portal/{appid}}):
 * GET performs the server verification handshake, POST receives pushed messages.
 * Both handlers are excluded from JWT validation via {@link JwtIgnore}.
 *
 * @author <a href="https://github.com/binarywang">Binary Wang</a>
 */
@RestController
@RequestMapping("/wx/portal/{appid}")
public class WxPortalController {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * Verification handshake: returns {@code echostr} when the signature is valid.
     *
     * @param appid     mini program appid taken from the path
     * @param signature signature sent by WeChat
     * @param timestamp timestamp sent by WeChat
     * @param nonce     nonce sent by WeChat
     * @param echostr   string to echo back on success
     * @return {@code echostr} if the signature checks out, otherwise {@code "非法请求"}
     * @throws IllegalArgumentException if any parameter is blank or the appid is not configured
     */
    @GetMapping(produces = "text/plain;charset=utf-8")
    @JwtIgnore
    public String authGet(@PathVariable String appid,
                          @RequestParam(name = "signature", required = false) String signature,
                          @RequestParam(name = "timestamp", required = false) String timestamp,
                          @RequestParam(name = "nonce", required = false) String nonce,
                          @RequestParam(name = "echostr", required = false) String echostr) {
        this.logger.info("\n接收到来自微信服务器的认证消息:signature = [{}], timestamp = [{}], nonce = [{}], echostr = [{}]",
            signature, timestamp, nonce, echostr);

        if (StringUtils.isAnyBlank(signature, timestamp, nonce, echostr)) {
            throw new IllegalArgumentException("请求参数非法,请核实!");
        }

        final WxMaService wxService = WxMaConfiguration.getMaService(appid);
        if (wxService.checkSignature(timestamp, nonce, signature)) {
            return echostr;
        }
        return "非法请求";
    }

    /**
     * Receives a pushed message, decodes it (plain or AES-encrypted, JSON or XML depending on
     * the mini program's configured format) and hands it to the router.
     *
     * @param appid        mini program appid taken from the path
     * @param requestBody  raw request body
     * @param msgSignature message signature, used for encrypted XML
     * @param encryptType  blank for plain text, {@code "aes"} for encrypted messages
     * @param signature    request signature (only logged)
     * @param timestamp    timestamp sent by WeChat
     * @param nonce        nonce sent by WeChat
     * @return {@code "success"} once the message has been routed
     * @throws RuntimeException if {@code encryptType} is neither blank nor {@code "aes"}
     */
    @PostMapping(produces = "application/xml; charset=UTF-8")
    @JwtIgnore
    public String post(@PathVariable String appid,
                       @RequestBody String requestBody,
                       @RequestParam(name = "msg_signature", required = false) String msgSignature,
                       @RequestParam(name = "encrypt_type", required = false) String encryptType,
                       @RequestParam(name = "signature", required = false) String signature,
                       @RequestParam("timestamp") String timestamp,
                       @RequestParam("nonce") String nonce) {
        this.logger.info("\n接收微信请求:[msg_signature=[{}], encrypt_type=[{}], signature=[{}],"
                + " timestamp=[{}], nonce=[{}], requestBody=[\n{}\n] ",
            msgSignature, encryptType, signature, timestamp, nonce, requestBody);

        final WxMaService wxService = WxMaConfiguration.getMaService(appid);
        final boolean isJson = Objects.equals(wxService.getWxMaConfig().getMsgDataFormat(),
            WxMaConstants.MsgDataFormat.JSON);

        final WxMaMessage inMessage;
        if (StringUtils.isBlank(encryptType)) {
            // Plain-text message.
            inMessage = isJson
                ? WxMaMessage.fromJson(requestBody)
                : WxMaMessage.fromXml(requestBody);
        } else if ("aes".equals(encryptType)) {
            // AES-encrypted message.
            inMessage = isJson
                ? WxMaMessage.fromEncryptedJson(requestBody, wxService.getWxMaConfig())
                : WxMaMessage.fromEncryptedXml(requestBody, wxService.getWxMaConfig(),
                    timestamp, nonce, msgSignature);
        } else {
            throw new RuntimeException("不可识别的加密类型:" + encryptType);
        }

        this.route(inMessage, appid);
        return "success";
    }

    /**
     * Routes a decoded message through the router registered for {@code appid}. Failures are
     * logged and not propagated, so the caller still answers WeChat with "success".
     */
    private void route(WxMaMessage message, String appid) {
        try {
            if (WxMaConfiguration.getRouter(appid) == null) {
                // Report a missing router explicitly instead of via a message-less NullPointerException.
                this.logger.error("No message router registered for appid [{}]", appid);
                return;
            }
            WxMaConfiguration.getRouter(appid).route(message);
        } catch (Exception e) {
            this.logger.error(e.getMessage(), e);
        }
    }
}
3.4 WxApiController
自定义接口,用来自定义用户登录的操作
package com.eric.etcloud.controller; import cn.binarywang.wx.miniapp.api.WxMaService; import cn.binarywang.wx.miniapp.bean.WxMaJscode2SessionResult; import com.eric.etcloud.common.CommonController; import com.eric.etcloud.common.annotation.JwtIgnore; import com.eric.etcloud.common.configs.Audience; import com.eric.etcloud.common.configs.WxMaConfiguration; import com.eric.etcloud.common.configs.WxMaProperties; import com.eric.etcloud.common.response.Result; import com.eric.etcloud.common.utils.JwtTokenUtil; import com.eric.etcloud.entity.ModelUser; import com.eric.etcloud.service.UserService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiImplicitParam; import io.swagger.annotations.ApiImplicitParams; import io.swagger.annotations.ApiOperation; import me.chanjar.weixin.common.error.WxErrorException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import javax.servlet.http.HttpServletResponse; import java.util.HashMap; import java.util.List; import java.util.Map; @RestController @RequestMapping("/wxapi") @Api(value="微信接口",tags={"微信接口"}) public class WxApiController extends CommonController { private WxMaProperties properties; private static Logger logger = LoggerFactory.getLogger(WxApiController.class); @Autowired private Audience audience; @Autowired UserService userService; @ApiOperation(value = "微信登录接口" ,notes = "微信登录接口" ) @ApiImplicitParams({ @ApiImplicitParam(name = "code" ,value = "微信小程序code" , required = true, dataType = "String") }) @RequestMapping(value = "/login", method = { RequestMethod.GET }) @JwtIgnore public Result login(HttpServletResponse response, String code){ System.out.println("登录接口"); System.out.println(code); final WxMaService wxService = 
WxMaConfiguration.getMaService("wx42ac48883a975c48"); try { WxMaJscode2SessionResult session = wxService.getUserService().getSessionInfo(code); Map<String,Object> result = new HashMap<>(); String openid = session.getOpenid(); result.put("openid",openid); result.put("sessionid",session.getSessionKey()); List<ModelUser> users = userService.findByOpenid(openid); if(users.size()==1){ ModelUser user = users.get(0); //业务代码删除了 } result.put("isBind",false); return Result.SUCCESS(result); } catch (WxErrorException e) { logger.error(e.getMessage(), e); return Result.FAIL("服务器异常"); } } @ApiOperation(value = "获得TOKEN的状态" ,notes = "获得TOKEN的状态" ) @ApiImplicitParams({ @ApiImplicitParam(name = "token" ,value = "微信小程序token" , required = true, dataType = "String") }) @RequestMapping(value = "/checkToken", method = { RequestMethod.GET }) @JwtIgnore public Result checkToken(String token){ boolean isExpiration = JwtTokenUtil.isExpiration(token); return Result.SUCCESS(isExpiration); } }
四、集成JWT
1.引入JAR包
<dependency> <groupId>io.jsonwebtoken</groupId> <artifactId>jjwt</artifactId> <version>0.9.0</version> </dependency>
2.添加配置文件yml
##jwt配置 audience: # 代表这个JWT的接收对象,存入audience clientId: 098f6bcd4621d373cade4e832627b4f6 # 密钥, 经过Base64加密, 可自行替换 base64Secret: MDk4ZjZiY2Q0NjIxZDM3DSDzMjYyN2I0ZjY= # JWT的签发主体,存入issuer name: superuser # 过期时间,时间戳 一天 expiresSecond: 86400000
3.代码集成
3.1 Audience
读取配置文件信息
package com.eric.etcloud.common.configs;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * Binds the {@code audience} configuration section that holds the JWT settings.
 */
@Data
@ConfigurationProperties(prefix = "audience")
@Component
public class Audience {
    // Written into the token's issuer claim by JwtTokenUtil.createJWT.
    private String clientId;
    // Base64-encoded signing secret.
    private String base64Secret;
    // Written into the token's audience claim by JwtTokenUtil.createJWT.
    private String name;
    // Token lifetime. Despite the name, JwtTokenUtil.createJWT adds this value directly to
    // System.currentTimeMillis(), so it is interpreted as milliseconds.
    private int expiresSecond;
}
3.2 JwtTokenUtil
生成JWT和校验JWT
package com.eric.etcloud.common.utils;

import com.eric.etcloud.common.configs.Audience;
import com.eric.etcloud.common.exception.CustomException;
import com.eric.etcloud.common.response.ResultCode;
import com.eric.etcloud.entity.ModelUser;
import io.jsonwebtoken.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.spec.SecretKeySpec;
import javax.servlet.http.HttpServletRequest;
import javax.xml.bind.DatatypeConverter;
import java.security.Key;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

/**
 * Static helpers for creating and validating JWTs.
 *
 * <p>Created with IntelliJ IDEA. User: eric, Date: 2020/6/17 17:24, Version: v1.0
 */
public class JwtTokenUtil {

    private static final Logger log = LoggerFactory.getLogger(JwtTokenUtil.class);

    /** Name of the HTTP header that carries the token. */
    public static final String AUTH_HEADER_KEY = "Authorization";

    /** Prefix that precedes the token inside the header value. */
    public static final String TOKEN_PREFIX = "Bearer ";

    /**
     * Secret used by {@link #getUserByWebToken} and {@link #isExpiration}.
     * NOTE(review): {@link #createJWT} signs with {@code audience.getBase64Secret()} instead;
     * if the two values differ, tokens created here cannot be verified by those two methods —
     * confirm that the configured secret matches this constant.
     */
    public static final String base64Security = "MDk4ZjZiY2Q0NjIxZDDSADANhZGU0ZTgzMjYyN2I0ZjY=";

    /**
     * Verifies a token's signature and returns its claims.
     *
     * @param jsonWebToken   the compact JWT
     * @param base64Security Base64-encoded signing secret
     * @return the token's claims
     * @throws CustomException with {@code PERMISSION_TOKEN_EXPIRED} if the token has expired,
     *                         or {@code PERMISSION_TOKEN_INVALID} for any other parsing failure
     */
    public static Claims parseJWT(String jsonWebToken, String base64Security) {
        try {
            return Jwts.parser()
                    .setSigningKey(DatatypeConverter.parseBase64Binary(base64Security))
                    .parseClaimsJws(jsonWebToken)
                    .getBody();
        } catch (ExpiredJwtException eje) {
            log.error("===== Token过期 =====", eje);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_EXPIRED);
        } catch (Exception e) {
            log.error("===== token解析异常 =====", e);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_INVALID);
        }
    }

    /**
     * Builds an HS256-signed JWT holding {@code object} under the claim {@code key}.
     *
     * @param key      claim name
     * @param object   claim value
     * @param audience supplies the signing secret, the issuer ({@code clientId}), the audience
     *                 ({@code name}) and the lifetime ({@code expiresSecond}, added to the
     *                 current time in milliseconds)
     * @return the compact token
     * @throws CustomException with {@code PERMISSION_SIGNATURE_ERROR} if building or signing fails
     */
    public static String createJWT(String key, Object object, Audience audience) {
        try {
            SignatureAlgorithm signatureAlgorithm = SignatureAlgorithm.HS256;

            long nowMillis = System.currentTimeMillis();
            Date now = new Date(nowMillis);

            // Build the signing key from the configured Base64 secret.
            byte[] apiKeySecretBytes = DatatypeConverter.parseBase64Binary(audience.getBase64Secret());
            Key signingKey = new SecretKeySpec(apiKeySecretBytes, signatureAlgorithm.getJcaName());

            JwtBuilder builder = Jwts.builder().setHeaderParam("typ", "JWT")
                    // Non-sensitive object data goes into a custom claim.
                    .claim(key, object)
                    .setIssuer(audience.getClientId())
                    // Reuse "now" so issued-at, not-before and expiration share one clock reading.
                    .setIssuedAt(now)
                    .setAudience(audience.getName())
                    .signWith(signatureAlgorithm, signingKey);

            // A negative lifetime means the token carries neither expiration nor not-before.
            long ttlMillis = audience.getExpiresSecond();
            if (ttlMillis >= 0) {
                builder.setExpiration(new Date(nowMillis + ttlMillis))
                        .setNotBefore(now);
            }

            return builder.compact();
        } catch (Exception e) {
            log.error("签名失败", e);
            throw new CustomException(ResultCode.PERMISSION_SIGNATURE_ERROR);
        }
    }

    /**
     * Reads the current user from the request's bearer token.
     *
     * @param request the current HTTP request
     * @return a user populated with id, eid and usertype from the {@code tUser} claim,
     *         or {@code null} if the header is missing or the token cannot be read
     * @author eric
     * @date 2020-06-15
     */
    public static ModelUser getUserByWebToken(HttpServletRequest request) {
        final String authHeader = request.getHeader(AUTH_HEADER_KEY);
        // The header value is a credential, so it is deliberately not written to the log.
        if (authHeader == null || !authHeader.startsWith(TOKEN_PREFIX)) {
            log.warn("Missing or malformed {} header", AUTH_HEADER_KEY);
            return null;
        }
        try {
            final String jsonWebToken = authHeader.substring(TOKEN_PREFIX.length());
            Claims claims = parseJWT(jsonWebToken, base64Security);
            @SuppressWarnings("unchecked")
            Map<String, Object> map = claims.get("tUser", HashMap.class);
            ModelUser user = new ModelUser();
            user.setId(map.get("id").toString());
            user.setEid(map.get("eid").toString());
            user.setUsertype(Integer.parseInt(map.get("usertype").toString()));
            return user;
        } catch (Exception e) {
            // Best effort: callers receive null for any unreadable token.
            log.error("Failed to read user from token", e);
            return null;
        }
    }

    /**
     * Tells whether a token has expired.
     *
     * <p>The token is parsed directly rather than through {@link #parseJWT}: the parser rejects
     * expired tokens with {@link ExpiredJwtException}, which {@code parseJWT} converts into a
     * {@code CustomException}, so going through it this method could never return {@code true}.
     *
     * @param token the compact JWT
     * @return {@code true} if the token has expired; {@code false} if it is still valid or has
     *         no expiration
     * @throws CustomException with {@code PERMISSION_TOKEN_INVALID} if the token cannot be parsed
     */
    public static boolean isExpiration(String token) {
        final Claims claims;
        try {
            claims = Jwts.parser()
                    .setSigningKey(DatatypeConverter.parseBase64Binary(base64Security))
                    .parseClaimsJws(token)
                    .getBody();
        } catch (ExpiredJwtException eje) {
            return true;
        } catch (Exception e) {
            log.error("===== token解析异常 =====", e);
            throw new CustomException(ResultCode.PERMISSION_TOKEN_INVALID);
        }
        Date expiration = claims.getExpiration();
        return expiration != null && expiration.before(new Date());
    }
}
3.3LoginConfiguration
配置拦截器
package com.eric.etcloud.common.configs;

import com.eric.etcloud.common.filters.LoginInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
import org.springframework.web.servlet.handler.MappedInterceptor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Registers the login interceptor and the CORS rules.
 *
 * @ProjectName: login interceptor
 * @ClassName: LoginConfiguration
 * @Description: registers and activates the custom interceptor configuration
 * @Author: eric
 * @Version: 1.0
 */
@Configuration
public class LoginConfiguration implements WebMvcConfigurer {

    /**
     * Maps a new {@link LoginInterceptor} onto every path except the excluded documentation
     * and static paths listed below.
     *
     * @return the mapped interceptor bean
     */
    @Bean
    public MappedInterceptor getMappedInterceptor() {
        // "/**" intercepts every request.
        final String[] intercepted = {"/**"};
        // Excluded paths: API documentation first, then the project's own static paths.
        final String[] excluded = {
            "/swagger-resources/**",
            "/webjars/**",
            "/v2/**",
            "/swagger-ui.html/**",
            "/api",
            "/api-docs",
            "/api-docs/**",
            "/netgate-server/dist/*",
            "/netgate-server/"
        };
        // NOTE(review): the interceptor is created with "new", so Spring does not inject its fields.
        return new MappedInterceptor(intercepted, excluded, new LoginInterceptor());
    }

    /**
     * Allows cross-origin requests on every path, from any origin, with credentials, for the
     * listed HTTP methods; the max age is 24 hours.
     *
     * @param registry the CORS registry to configure
     */
    @Override
    public void addCorsMappings(CorsRegistry registry) {
        // NOTE(review): a wildcard origin combined with allowCredentials(true) is presumably
        // rejected by newer Spring versions — confirm against the version in use.
        registry.addMapping("/**")
                .allowedOrigins("*")
                .allowCredentials(true)
                .allowedMethods("GET", "POST", "DELETE", "PUT", "PATCH", "OPTIONS", "HEAD")
                .maxAge(3600 * 24);
    }
}
3.4LoginInterceptor
实现拦截器
package com.eric.etcloud.common.filters; import com.eric.etcloud.common.annotation.JwtIgnore; import com.eric.etcloud.common.configs.Audience; import com.eric.etcloud.common.exception.CustomException; import com.eric.etcloud.common.response.ResultCode; import com.eric.etcloud.common.utils.JwtTokenUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.BeanFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.context.support.WebApplicationContextUtils; import org.springframework.web.method.HandlerMethod; import org.springframework.web.servlet.HandlerInterceptor; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; /** * @ProjectName: demo * @Package: com.demo.common.interceptor * @ClassName: LoginInterceptor * @Description: 登录请求拦截器 * @Author: * @Date: * @Version: 1.0 */ @Slf4j public class LoginInterceptor implements HandlerInterceptor { @Autowired private Audience audience; /** * 在请求被处理之前调用 * @param request * @param response * @param handler * @return * @throws Exception */ @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) { // 忽略带JwtIgnore注解的请求, 不做后续token认证校验 if (handler instanceof HandlerMethod) { HandlerMethod handlerMethod = (HandlerMethod) handler; JwtIgnore jwtIgnore = handlerMethod.getMethodAnnotation(JwtIgnore.class); if (jwtIgnore != null) { return true; } } String requestMethod = request.getMethod(); if (requestMethod.contains("OPTIONS") || requestMethod.contains("options")) { return true; } // 获取请求头信息authorization信息 final String authHeader = request.getHeader(JwtTokenUtil.AUTH_HEADER_KEY); log.info("## authHeader= {}", authHeader); if (StringUtils.isBlank(authHeader) || !authHeader.startsWith(JwtTokenUtil.TOKEN_PREFIX)) { log.info("### 用户未登录,请先登录 ###"); throw new 
CustomException(ResultCode.USER_NOT_LOGGED_IN); } // 获取token final String token = authHeader.substring(7); if(audience == null){ BeanFactory factory = WebApplicationContextUtils.getRequiredWebApplicationContext(request.getServletContext()); audience = (Audience) factory.getBean("audience"); } // 验证token是否有效--无效已做异常抛出,由全局异常处理后返回对应信息 JwtTokenUtil.parseJWT(token, audience.getBase64Secret()); return true; } }
3.5 JwtIgnore
忽略JWT校验注解,比如上边的微信认证的接口我就使用了这个注解忽略jwt校验。
package com.eric.etcloud.common.annotation;

import java.lang.annotation.*;

/**
 * Marks a method as exempt from JWT validation.
 *
 * <p>Applicable to methods only and retained at runtime, so it can be read reflectively.
 *
 * <p>Created with IntelliJ IDEA. User: eric, Version: v1.0
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JwtIgnore {
}
3.5 UserController
登录创建JWT
@RestController @RequestMapping("/user") @Api(value="用户接口",tags={"user(用户)-增删改查;导入导出"}) public class UserController extends CommonController { @Autowired UserService userService; @Autowired private Audience audience; @ApiOperation(value = "用户登录" ,notes = "用户登录") @ApiImplicitParams({ @ApiImplicitParam(name = "email" ,value = "登录邮箱" , required = true, dataType = "String"), @ApiImplicitParam(name = "password" ,value = "密码" , required = true, dataType = "String") }) @RequestMapping(value = "/login", method = { RequestMethod.POST }) @JwtIgnore public Result login( HttpServletResponse response, @RequestParam(value="email", required=true) String email, @RequestParam(value="password", required=true) String password){ ModelUser user = null; try { user = userService.findByEmail(email).get(0); //user.setLasttime(new Timestamp(System.currentTimeMillis())); //userService.save(user);记录最近一次登陆时间 暂不记录 } catch (Exception e) { e.printStackTrace(); } if(user!=null){ //验证密码 String md5_password = DigestUtils.md5DigestAsHex(password.getBytes()); if(md5_password.equals(user.getPassword())){ HttpUserInfoRes userRes = gson.fromJson(gson.toJson(user),HttpUserInfoRes.class); // 封装返回对象 String power = userService.getPowerByUserid(user.getId()); ModelEid modelEid = eidService.findById(user.getEid()); userRes.setEidName(modelEid.getName()); userRes.setPowers(power); String accessToken = JwtTokenUtil.createJWT("tUser", userRes, audience); // 将token放在响应头 response.setHeader(JwtTokenUtil.AUTH_HEADER_KEY, JwtTokenUtil.TOKEN_PREFIX + accessToken); userRes.setLogintime(new Timestamp(System.currentTimeMillis())); userRes.setWebtoken(accessToken); logService.save(new ModelServicelog(CommonData.modeluser,NetNotes.info.toInteger(),user.getId(),user.getEid(),user.getUsername()+"登陆了系统")); return Result.SUCCESS(userRes); }else{ return Result.FAIL("密码错误"); } }else{ return Result.FAIL("用户不存在"); } } }
3.6 普通controller获取jwt中的用户数据
在CommonController中调用JwtTokenUtil解析token获得用户数据
在普通Controller中就可以直接super.getUserid(request)获取用户信息
五、集成定时任务
1.引入JAR包
<dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.2.2</version> </dependency>
2.代码集成
2.1 QuartzJobConfig
配置类
package com.eric.etcloud.common.configs; import com.eric.etcloud.common.beans.ConfigNet; import com.eric.etcloud.common.job.JobTiming; import com.eric.etcloud.common.job.QuartzManager; import org.quartz.Scheduler; import org.quartz.SchedulerException; import org.quartz.SchedulerFactory; import org.quartz.impl.StdSchedulerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationListener; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.event.ContextRefreshedEvent; import javax.annotation.Resource; @Configuration public class QuartzJobConfig implements ApplicationListener<ContextRefreshedEvent>{ @Autowired private QuartzManager quartzManager; @Resource ConfigNet confignet; @Override public void onApplicationEvent(ContextRefreshedEvent event) { // TODO Auto-generated method stub 0 0 0 * * ? //0 0 */1 * * ? //默认开启的定时任务,此处可以替换为从数据库查询数据表,然后执行定时任务 try { quartzManager.addJob(confignet.getTaskTimingName(),confignet.getTaskTimingGroup(), confignet.getTriggerTimingName(),confignet.getTriggerTimingGroup(), JobTiming.class, "0 0 */1 * * ?", null); // quartzManager.addJob("heartjobname", "heartgroupname", "hearttriggername", "hearttriggerrgroup", // JobHeart.class, "*/55 * * * * ?", null); quartzManager.startJob(); System.out.println("定时任务已经启动..."); } catch (SchedulerException e) { e.printStackTrace(); } } /** * 初始注入scheduler * @return * @throws SchedulerException */ @Bean public Scheduler scheduler() throws SchedulerException{ SchedulerFactory schedulerFactoryBean = new StdSchedulerFactory(); return schedulerFactoryBean.getScheduler(); } }
2.2 JobFactory
定时任务工厂
package com.eric.etcloud.common.job; import org.quartz.spi.TriggerFiredBundle; import org.springframework.beans.factory.config.AutowireCapableBeanFactory; import org.springframework.scheduling.quartz.AdaptableJobFactory; import org.springframework.stereotype.Component; //解决SpringBoot不能再Quartz中注入Bean的问题 @Component public class JobFactory extends AdaptableJobFactory { /** * AutowireCapableBeanFactory接口是BeanFactory的子类 * 可以连接和填充那些生命周期不被Spring管理的已存在的bean实例 */ private AutowireCapableBeanFactory factory; public JobFactory(AutowireCapableBeanFactory factory) { this.factory = factory; } /** * 创建Job实例 */ @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { // 实例化对象 Object job = super.createJobInstance(bundle); // 进行注入(Spring管理该Bean) factory.autowireBean(job); //返回对象 return job; } }
2.3 JobHeart
一个模拟的定时任务,这是一个心跳定时任务
package com.eric.etcloud.common.job; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.quartz.QuartzJobBean; public class JobHeart extends QuartzJobBean{ // @Autowired // private NetgateHandler netgateHandler; @Override protected void executeInternal(JobExecutionContext arg0) throws JobExecutionException { // TODO Auto-generated method stub System.out.println("beat"); try { // netgateHandler.sendHeartBeatToAllManager(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }
2.4 QuartzConfig
配置类
package com.eric.etcloud.common.job; import org.quartz.Scheduler; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.quartz.SchedulerFactoryBean; @Configuration public class QuartzConfig { private JobFactory jobFactory; public QuartzConfig(JobFactory jobFactory){ this.jobFactory = jobFactory; } /** * 配置SchedulerFactoryBean * * 将一个方法产生为Bean并交给Spring容器管理 */ @Bean public SchedulerFactoryBean schedulerFactoryBean() { // Spring提供SchedulerFactoryBean为Scheduler提供配置信息,并被Spring容器管理其生命周期 SchedulerFactoryBean factory = new SchedulerFactoryBean(); // 设置自定义Job Factory,用于Spring管理Job bean factory.setJobFactory(jobFactory); return factory; } // @Bean(name = "scheduler") // public Scheduler scheduler() { // return schedulerFactoryBean().getScheduler(); // } }
2.5 QuartzManager
定时任务Handle可以创建、删除定时任务
package com.eric.etcloud.common.job;

import java.util.Map;

import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.TriggerKey;
import org.springframework.stereotype.Service;

/**
 * Thin facade over a Quartz {@link Scheduler} for adding, rescheduling, pausing,
 * resuming and removing cron-triggered jobs.
 */
@Service
public class QuartzManager {

    private Scheduler scheduler;

    public QuartzManager(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Starts the scheduler so that registered triggers begin firing.
     *
     * @throws SchedulerException if the scheduler cannot be started
     */
    public void startJob() throws SchedulerException {
        scheduler.start();
    }

    /**
     * Registers a cron-triggered job and makes sure the scheduler is running.
     *
     * @param jobName          job name
     * @param jobGroupName     job group name
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @param jobClass         job implementation class
     * @param cron             Quartz cron expression
     * @param params           optional entries copied into the job's data map; may be {@code null}
     * @throws RuntimeException wrapping any failure (invalid cron, duplicate key, scheduler error)
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public void addJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName,
            Class jobClass, String cron, Map<String, Object> params) {
        try {
            JobDetail job = JobBuilder.newJob(jobClass).withIdentity(jobName, jobGroupName).build();
            // Previously the params argument was accepted but silently ignored.
            if (params != null && !params.isEmpty()) {
                job.getJobDataMap().putAll(params);
            }
            CronTrigger trigger = buildCronTrigger(triggerName, triggerGroupName, cron);
            scheduler.scheduleJob(job, trigger);
            if (!scheduler.isShutdown()) {
                scheduler.start();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Changes the cron expression of an existing trigger.
     *
     * <p>Does nothing when the trigger does not exist or already uses {@code cron}
     * (compared case-insensitively).
     *
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @param cron             new Quartz cron expression
     * @throws RuntimeException wrapping any failure
     */
    public void modifyJobTime(String triggerName, String triggerGroupName, String cron) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            CronTrigger trigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            if (trigger == null) {
                return;
            }
            if (!trigger.getCronExpression().equalsIgnoreCase(cron)) {
                scheduler.rescheduleJob(triggerKey, buildCronTrigger(triggerName, triggerGroupName, cron));
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes a job together with its trigger.
     *
     * @param jobName          job name
     * @param jobGroupName     job group name
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @throws RuntimeException wrapping any failure
     */
    public void removeJob(String jobName, String jobGroupName, String triggerName, String triggerGroupName) {
        try {
            TriggerKey triggerKey = TriggerKey.triggerKey(triggerName, triggerGroupName);
            scheduler.pauseTrigger(triggerKey);   // stop firing
            scheduler.unscheduleJob(triggerKey);  // detach the trigger
            scheduler.deleteJob(JobKey.jobKey(jobName, jobGroupName));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Tells whether no trigger is registered under the given key.
     *
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @return {@code true} when the trigger state is {@link Trigger.TriggerState#NONE}
     * @throws RuntimeException wrapping any failure
     */
    public Boolean notExists(String triggerName, String triggerGroupName) {
        try {
            return scheduler.getTriggerState(TriggerKey.triggerKey(triggerName, triggerGroupName))
                    == Trigger.TriggerState.NONE;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Describes a cron trigger as {@code time:<cron>,state:<state>}.
     *
     * @param triggerName      trigger name
     * @param triggerGroupName trigger group name
     * @return the description, or an empty string when the trigger does not exist
     * @throws SchedulerException if the scheduler lookup fails
     */
    public String getJobInfo(String triggerName, String triggerGroupName) throws SchedulerException {
        TriggerKey triggerKey = new TriggerKey(triggerName, triggerGroupName);
        CronTrigger cronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
        if (cronTrigger == null) {
            return "";
        }
        return String.format("time:%s,state:%s", cronTrigger.getCronExpression(),
                scheduler.getTriggerState(triggerKey).name());
    }

    /**
     * Pauses all jobs.
     *
     * @throws SchedulerException if the scheduler call fails
     */
    public void pauseAllJob() throws SchedulerException {
        scheduler.pauseAll();
    }

    /**
     * Pauses a single job; does nothing when the job does not exist.
     *
     * <p>Despite their names, the two arguments are used as the <em>job</em> key
     * (name and group), not as a trigger key.
     *
     * @param triggerName      name used to build the job key
     * @param triggerGroupName group used to build the job key
     * @throws SchedulerException if the scheduler call fails
     */
    public void pauseJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.pauseJob(jobKey);
    }

    /**
     * Resumes all jobs.
     *
     * @throws SchedulerException if the scheduler call fails
     */
    public void resumeAllJob() throws SchedulerException {
        scheduler.resumeAll();
    }

    /**
     * Resumes a single job; does nothing when the job does not exist.
     *
     * <p>The arguments are used as the <em>job</em> key (name and group).
     *
     * @param triggerName      name used to build the job key
     * @param triggerGroupName group used to build the job key
     * @throws SchedulerException if the scheduler call fails
     */
    public void resumeJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.resumeJob(jobKey);
    }

    /**
     * Deletes a single job; does nothing when the job does not exist.
     *
     * <p>The arguments are used as the <em>job</em> key (name and group).
     *
     * @param triggerName      name used to build the job key
     * @param triggerGroupName group used to build the job key
     * @throws SchedulerException if the scheduler call fails
     */
    public void deleteJob(String triggerName, String triggerGroupName) throws SchedulerException {
        JobKey jobKey = new JobKey(triggerName, triggerGroupName);
        if (scheduler.getJobDetail(jobKey) == null) {
            return;
        }
        scheduler.deleteJob(jobKey);
    }

    /** Builds a start-now cron trigger with the given identity and expression. */
    private static CronTrigger buildCronTrigger(String triggerName, String triggerGroupName, String cron) {
        return TriggerBuilder.newTrigger()
                .withIdentity(triggerName, triggerGroupName)
                .startNow()
                .withSchedule(CronScheduleBuilder.cronSchedule(cron))
                .build();
    }
}
2.6 JobController
提供可调用的定时任务接口
package com.eric.etcloud.controller;

import com.eric.etcloud.common.CommonController;
import com.eric.etcloud.common.beans.ConfigNet;
import com.eric.etcloud.common.job.JobHeart;
import com.eric.etcloud.common.job.JobVersion;
import com.eric.etcloud.common.job.QuartzManager;
import com.eric.etcloud.common.response.Result;
import com.eric.etcloud.common.utils.CommonData;
import com.eric.etcloud.common.utils.CronDateUtil;
import com.eric.etcloud.common.utils.NetNotes;
import com.eric.etcloud.entity.ModelServicelog;
import com.eric.etcloud.entity.ModelTask;
import com.eric.etcloud.service.ServicelogService;
import com.eric.etcloud.service.TaskService;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import java.util.Date;

/**
 * HTTP endpoints for managing the scheduled jobs (heartbeat and version-upgrade task).
 */
@RestController
@RequestMapping("/job")
public class JobController extends CommonController {

    private QuartzManager quartzManager;

    @Autowired
    ServicelogService logService;

    @Autowired
    TaskService taskService;

    @Resource
    ConfigNet confignet;

    public JobController(QuartzManager quartzManager) {
        this.quartzManager = quartzManager;
    }

    /**
     * Schedules the sample heartbeat job (every 10 seconds).
     *
     * <p>Calling the endpoint again while the trigger is already registered is a no-op
     * instead of a duplicate-job failure.
     */
    @RequestMapping("/heart")
    public void startHeart() {
        if (quartzManager.notExists("hearttriggername", "hearttriggerrgroup")) {
            quartzManager.addJob("heartjobname", "heartgroupname", "hearttriggername", "hearttriggerrgroup",
                    JobHeart.class, "*/10 * * * * ?", null);
        }
    }

    /**
     * Replaces the version-upgrade job with a new schedule.
     *
     * <p>The cron expression is derived from the task type first; only then is the
     * previous job removed, the task persisted and the new job scheduled. Earlier
     * versions returned before the save/schedule steps, so the old job was removed
     * and nothing replaced it.
     *
     * @param request  current request, used to resolve the acting user and enterprise
     * @param tasktype schedule kind ({@code NetNotes.everyday} or {@code NetNotes.oneday})
     * @param taskdata date of a one-off task; only used for {@code NetNotes.oneday}
     * @param tasktime time of day at which the task runs
     * @param taskid   identifier stored with the task record
     * @return success with a description of the scheduled task, or a failure result
     */
    @ApiOperation(value = "定时任务自动升级", notes = "定时任务自动升级")
    @ApiImplicitParams({
            @ApiImplicitParam(name = "tasktype",value = "tasktype",required = true,dataType = "Integer"),
            @ApiImplicitParam(name = "taskdata",value = "taskdata",required = false,dataType = "Date"),
            @ApiImplicitParam(name = "tasktime",value = "tasktime",required = true,dataType = "Date"),
            @ApiImplicitParam(name = "taskid",value = "taskid",required = true,dataType = "String")
    })
    @RequestMapping("/upgradeauto")
    public Result startQuartzJob(HttpServletRequest request, int tasktype, Date taskdata, Date tasktime,
            String taskid) {
        try {
            String cron;
            String message;
            if (tasktype == NetNotes.everyday.toInteger()) {
                cron = CronDateUtil.getTaskCron(tasktime);
                message = "开启了一个每天执行的定时升级版本任务" + tasktime;
            } else if (tasktype == NetNotes.oneday.toInteger()) {
                cron = CronDateUtil.getTaskCron(taskdata, tasktime);
                message = "开启了一个指定日期执行的定时升级版本任务" + tasktime;
            } else {
                // Unknown type: reject before touching the currently scheduled job.
                return Result.FAIL("指定定时任务失败" + tasktime);
            }

            // Drop the previous upgrade job, then persist and schedule the new one.
            quartzManager.removeJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(),
                    confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
            taskService.save(new ModelTask(taskid, confignet.getTaskGradeGroup(), confignet.getTaskGradeName(),
                    cron, tasktype, super.getUserid(request), taskdata, tasktime));
            quartzManager.addJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(),
                    confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup(),
                    JobVersion.class, cron, null);

            logService.save(new ModelServicelog(CommonData.modelversion, NetNotes.info.toInteger(),
                    super.getUserid(request), super.getEid(request), message));
            System.out.println(message);
            return Result.SUCCESS(message);
        } catch (Exception e) {
            e.printStackTrace();
            return Result.FAIL("指定定时任务失败" + tasktime);
        }
    }

    /**
     * Returns the most recent task record, serialized as JSON.
     *
     * @return success result carrying the JSON string
     */
    @GetMapping("/latesttask")
    public Result getLatestTask() {
        return Result.SUCCESS(gson.toJson(taskService.getLatestTask()));
    }

    /**
     * Removes the version-upgrade job and records the outcome in the service log.
     *
     * @return success or failure result with a human-readable message
     */
    @RequestMapping("/taskdelete")
    public Result deleteJob() {
        try {
            quartzManager.removeJob(confignet.getTaskGradeName(), confignet.getTaskGradeGroup(),
                    confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
            logService.save(new ModelServicelog(CommonData.modelversion, NetNotes.info.toInteger(),
                    "", "", "删除了升级版本的定时任务"));
            System.out.println("删除了升级版本的定时任务");
        } catch (Exception e) {
            e.printStackTrace();
            logService.save(new ModelServicelog(CommonData.modelversion, NetNotes.error.toInteger(),
                    "", "", "删除了升级版本的定时任务失败"));
            return Result.FAIL("删除了升级版本的定时任务失败");
        }
        return Result.SUCCESS("删除了升级版本的定时任务");
    }

    /**
     * Probes the scheduler for the version-upgrade trigger.
     *
     * @return success when the scheduler could be queried, failure otherwise
     */
    @RequestMapping("/taskinfo")
    public Result getQuartzJob() {
        try {
            // NOTE(review): the existence flag is computed but not included in the
            // response; confirm whether clients expect it as the payload.
            quartzManager.notExists(confignet.getTriggerGradeName(), confignet.getTriggerGradeGroup());
        } catch (Exception e) {
            e.printStackTrace();
            // The failure result used to be created and discarded, so errors were reported as success.
            return Result.FAIL();
        }
        return Result.SUCCESS();
    }
}
2.7 CronDateUtil
非常有用的工具类,可以将时间类型转为cron表达式,也可以将cron表达式转化为时间类型
package com.eric.etcloud.common.utils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

/**
 * Converts between {@link Date} values and Quartz cron expressions.
 *
 * <p>All conversions use the JVM's default time zone. Formatters are created per call
 * because {@link SimpleDateFormat} is not thread-safe.
 */
public class CronDateUtil {

    private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
    private static final String TASK_DATE = " dd MM ? yyyy";
    private static final String TASK_TIME = "ss mm HH";

    /**
     * Builds a one-off cron expression from a calendar date and a time of day.
     *
     * @param date supplies day, month and year
     * @param time supplies seconds, minutes and hours
     * @return expression of the form {@code "ss mm HH dd MM ? yyyy"}
     * @throws IllegalArgumentException if either argument is {@code null}; a partial
     *                                  result would not be a valid cron expression
     */
    public static String getTaskCron(Date date, Date time) {
        if (date == null) {
            throw new IllegalArgumentException("date must not be null");
        }
        if (time == null) {
            throw new IllegalArgumentException("time must not be null");
        }
        return newFormat(TASK_TIME).format(time) + newFormat(TASK_DATE).format(date);
    }

    /**
     * Builds a daily cron expression that fires at the given time of day.
     *
     * @param time supplies seconds, minutes and hours
     * @return expression of the form {@code "ss mm HH * * ?"}
     * @throws IllegalArgumentException if {@code time} is {@code null}
     */
    public static String getTaskCron(Date time) {
        if (time == null) {
            throw new IllegalArgumentException("time must not be null");
        }
        return newFormat(TASK_TIME).format(time) + " * * ?";
    }

    /**
     * Formats a full timestamp as a one-off cron expression.
     *
     * @param date the instant to convert; may be {@code null}
     * @return expression of the form {@code "ss mm HH dd MM ? yyyy"}, or an empty string for {@code null}
     */
    public static String getCron(final Date date) {
        if (date == null) {
            return "";
        }
        return newFormat(CRON_DATE_FORMAT).format(date);
    }

    /**
     * Parses a one-off cron expression produced by {@link #getCron(Date)}.
     *
     * @param cron expression of the form {@code "ss mm HH dd MM ? yyyy"}; may be {@code null}
     * @return the parsed date, or {@code null} when the input is {@code null} or does not match the format
     */
    public static Date getDate(final String cron) {
        if (cron == null) {
            return null;
        }
        try {
            return newFormat(CRON_DATE_FORMAT).parse(cron);
        } catch (ParseException ignored) {
            // Unparseable input is reported as "no date", matching the null-input contract.
            return null;
        }
    }

    /** Creates a formatter with a fixed locale so output digits never depend on the host locale. */
    private static SimpleDateFormat newFormat(String pattern) {
        return new SimpleDateFormat(pattern, Locale.ROOT);
    }

    public static void main(String[] args) {
        Date now = new Date();
        System.out.println(now);
        System.out.println(CronDateUtil.getCron(now));
        String cron = "20 28 17 02 08 ? 2016";
        Date cronDate = CronDateUtil.getDate(cron);
        System.out.println("===================");
        System.out.println(cronDate.toString());
    }
}
六、集成HTTPClient
1.引入JAR包
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> </dependency>
2.RestTemplateConfig
配置类
package com.eric.etcloud.common.configs;

import org.apache.http.client.HttpClient;
import org.apache.http.config.Registry;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.conn.socket.ConnectionSocketFactory;
import org.apache.http.conn.socket.PlainConnectionSocketFactory;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
import org.apache.http.conn.ssl.SSLContextBuilder;
import org.apache.http.conn.ssl.TrustStrategy;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.DefaultResponseErrorHandler;
import org.springframework.web.client.RestTemplate;

import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.SSLContext;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;

/**
 * Java-config class that publishes a {@link RestTemplate} backed by a pooled
 * Apache HttpClient.
 *
 * @author yangrui
 * @date 2020年5月14日
 */
@Configuration
public class RestTemplateConfig {

    /**
     * Creates the application's {@link RestTemplate}.
     *
     * @return a template using {@link #clientHttpRequestFactory()} and the default error handler
     */
    @Bean
    public RestTemplate restTemplate() {
        RestTemplate restTemplate = new RestTemplate();
        restTemplate.setRequestFactory(clientHttpRequestFactory());
        restTemplate.setErrorHandler(new DefaultResponseErrorHandler());
        return restTemplate;
    }

    /**
     * Builds the pooled HTTP/HTTPS request factory.
     *
     * <p>SECURITY NOTE(review): the SSL context trusts every certificate chain and host
     * name verification is disabled, so HTTPS connections made through this factory are
     * not authenticated. Kept as-is for compatibility with existing endpoints; replace
     * with a real trust store before using it against untrusted networks.
     *
     * @return the configured request factory, never {@code null}
     * @throws IllegalStateException if the SSL context cannot be initialised
     */
    @Bean
    public HttpComponentsClientHttpRequestFactory clientHttpRequestFactory() {
        try {
            HttpClientBuilder httpClientBuilder = HttpClientBuilder.create();

            @SuppressWarnings("deprecation")
            SSLContext sslContext = new SSLContextBuilder().loadTrustMaterial(null, new TrustStrategy() {
                @Override
                public boolean isTrusted(X509Certificate[] arg0, String arg1) throws CertificateException {
                    return true;
                }
            }).build();
            httpClientBuilder.setSSLContext(sslContext);

            HostnameVerifier hostnameVerifier = NoopHostnameVerifier.INSTANCE;
            SSLConnectionSocketFactory sslConnectionSocketFactory =
                    new SSLConnectionSocketFactory(sslContext, hostnameVerifier);

            // Register socket factories for both plain HTTP and HTTPS.
            Registry<ConnectionSocketFactory> socketFactoryRegistry =
                    RegistryBuilder.<ConnectionSocketFactory>create()
                            .register("http", PlainConnectionSocketFactory.getSocketFactory())
                            .register("https", sslConnectionSocketFactory)
                            .build();

            // Connection pool: 2700 connections in total, 100 per route.
            PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
                    new PoolingHttpClientConnectionManager(socketFactoryRegistry);
            poolingHttpClientConnectionManager.setMaxTotal(2700);
            poolingHttpClientConnectionManager.setDefaultMaxPerRoute(100);
            httpClientBuilder.setConnectionManager(poolingHttpClientConnectionManager);

            HttpClient httpClient = httpClientBuilder.build();
            HttpComponentsClientHttpRequestFactory clientHttpRequestFactory =
                    new HttpComponentsClientHttpRequestFactory(httpClient);
            clientHttpRequestFactory.setConnectTimeout(20000);           // connect timeout (ms)
            clientHttpRequestFactory.setReadTimeout(30000);              // read timeout (ms)
            clientHttpRequestFactory.setConnectionRequestTimeout(20000); // wait for a pooled connection (ms)
            return clientHttpRequestFactory;
        } catch (KeyManagementException | NoSuchAlgorithmException | KeyStoreException e) {
            // Fail fast with the cause attached; returning null only deferred the failure
            // to RestTemplate#setRequestFactory with no hint of what went wrong.
            throw new IllegalStateException("初始化HTTP连接池出错", e);
        }
    }
}
3. 调用
@Override public void postAllData(String rts) { // TODO Auto-generated method stub System.out.println("进入推送,将要推送的数据大小为"+rts.length()); List<ModelDatapush> datapush = datapushRepository.findAll(); for(int i=0;i<datapush.size();i++) { try { String uri = "http://"+datapush.get(i).getServicehost()+":"+datapush.get(i).getServiceport(); String uridt = uri + datapush.get(i).getDturl(); String urisj = uri + datapush.get(i).getSjurl(); String uridb = uri + datapush.get(i).getDburl(); HttpHeaders headers = new HttpHeaders(); MediaType type = MediaType.parseMediaType("application/json; charset=UTF-8"); headers.setContentType(type); headers.add("Accept", MediaType.APPLICATION_JSON.toString()); System.out.println("web推送"+urisj+"将要推送一条数据"); HttpEntity<String> formEntity = new HttpEntity<String>(rts, headers); String resulsj = restTemplate.postForObject(urisj, formEntity, String.class); System.out.println("web推送"+urisj+"推送了一条数据成功"); } catch (Exception e) { e.printStackTrace(); } } }
七、EMQ-HTTP鉴权接口及API调用
1. EmqApiServiceImpl(EMQAPI调用接口)
package com.eric.etcloud.service.impl;

import com.eric.etcloud.service.EmqApiService;
import org.springframework.stereotype.Service;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/**
 * Client for the EMQ management HTTP API using HTTP Basic authentication.
 */
@Service
public class EmqApiServiceImpl implements EmqApiService {

    // NOTE(review): credentials and server address are hard-coded; consider moving
    // them to externalised configuration.
    /** API user name. */
    private static String username = "admin";
    /** API password. */
    private static String password = "public";
    /** Base address of the EMQ management API. */
    private static String serverPath = "http://127.0.0.1:18083";
    /** Default page index; not referenced by {@link #query}, whose parameters shadow it. */
    private static int pageIndex = 1;
    /** Default page size; not referenced by {@link #query}, whose parameters shadow it. */
    private static int pageSize = 100;

    /**
     * Performs a GET against the EMQ API and returns the response body.
     *
     * @param queryPathUrl path below the server base address, e.g. {@code /api/v4/clients}
     * @param pageIndex    1-based page number; paging parameters are only appended when both
     *                     {@code pageIndex} and {@code pageSize} are positive
     * @param pageSize     page size
     * @return the UTF-8 decoded body when the server answers 200, otherwise {@code null}
     * @throws Exception on connection or I/O failure
     */
    @Override
    public String query(String queryPathUrl, int pageIndex, int pageSize) throws Exception {
        if (pageIndex > 0 && pageSize > 0) {
            queryPathUrl = queryPathUrl + "?" + "_page=" + pageIndex + "&" + "_limit=" + pageSize;
        }
        HttpURLConnection conn = (HttpURLConnection) new URL(serverPath + queryPathUrl).openConnection();
        try {
            // Basic auth is only base64-encoded, not encrypted; use HTTPS on untrusted networks.
            conn.setRequestProperty("authorization", "Basic " + getBase64(username, password));
            conn.setRequestMethod("GET");
            conn.connect();
            if (conn.getResponseCode() != 200) {
                return null;
            }
            try (InputStream inputStream = conn.getInputStream()) {
                // Read until end-of-stream. available() only reports bytes readable without
                // blocking, so looping on it could return a truncated (or empty) body.
                ByteArrayOutputStream body = new ByteArrayOutputStream();
                byte[] buffer = new byte[1024];
                int read;
                while ((read = inputStream.read(buffer)) != -1) {
                    body.write(buffer, 0, read);
                }
                return new String(body.toByteArray(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }

    /** Encodes {@code user:password} for an HTTP Basic {@code Authorization} header. */
    private static String getBase64(String admin, String aPublic) {
        final String text = admin + ":" + aPublic;
        // java.util.Base64 replaces the JDK-internal sun.misc.BASE64Encoder and never
        // inserts line breaks into the encoded value.
        return Base64.getEncoder().encodeToString(text.getBytes(StandardCharsets.UTF_8));
    }
}
2. EmqApiController(EMQAPI控制器)
package com.eric.etcloud.controller;

import com.eric.etcloud.common.CommonController;
import com.eric.etcloud.common.annotation.JwtIgnore;
import com.eric.etcloud.common.response.Result;
import com.eric.etcloud.entity.EmqClient;
import com.eric.etcloud.service.EmqApiService;
import com.eric.etcloud.service.ProductService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import javax.servlet.http.HttpServletResponse;
import java.util.List;

/**
 * Endpoints that query the EMQ management API and serve EMQ's HTTP
 * authentication, superuser and ACL callbacks.
 */
@RestController
@RequestMapping("/emqapi")
@Api(value="EMQAPI接口",tags={"EMQAPI接口"})
public class EmqApiController extends CommonController {

    @Autowired
    EmqApiService emqApiService;

    @Autowired
    ProductService productService;

    private static Logger logger = LoggerFactory.getLogger(EmqApiController.class);

    /**
     * Lists connected clients, or a single client when {@code clientid} is given.
     *
     * @param clientid optional client id to look up
     * @return success with the client list, or a failure result when the broker cannot
     *         be queried or returns no usable body
     */
    @ApiOperation(value = "获取所有客户端" ,notes = "获取所有客户端" )
    @ApiImplicitParams({
            @ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String")
    })
    @RequestMapping(value = "/getAllClient", method = { RequestMethod.GET })
    public Result getAllClient(String clientid) {
        try {
            String url = "/api/v4/clients";
            if (clientid != null && !"".equals(clientid)) {
                url = url + "/" + clientid;
            }
            String json = emqApiService.query(url, 1, 1000);
            EmqClient queryResule = gson.fromJson(json, EmqClient.class);
            if (queryResule == null) {
                // A null or empty body deserialises to null; report it instead of
                // failing with a NullPointerException.
                return Result.FAIL("获取数据失败");
            }
            List<EmqClient.ClientInfo> data = queryResule.getData();
            return Result.SUCCESS(data);
        } catch (Exception e) {
            e.printStackTrace();
            return Result.FAIL("获取数据失败");
        }
    }

    /**
     * EMQ HTTP-auth callback: decides whether a client may connect.
     *
     * <p>Responds 402 when the product identified by {@code username} has no remaining
     * device quota, 200 when a non-empty client id is supplied and the product
     * id/token pair matches, and 401 otherwise. The password is never logged.
     *
     * @param clientid MQTT client id
     * @param username MQTT user name (used as product id)
     * @param password MQTT password (used as product token)
     * @param response response whose status code carries the decision
     */
    @ApiOperation(value = "客户端连接授权" ,notes = "客户端连接授权" )
    @ApiImplicitParams({
            @ApiImplicitParam(name = "clientid" ,value = "客户端clientId" , required = false, dataType = "String"),
            @ApiImplicitParam(name = "username" ,value = "客户端username" , required = false, dataType = "String"),
            @ApiImplicitParam(name = "password" ,value = "客户端password" , required = false, dataType = "String")
    })
    @RequestMapping(value = "/auth", method = RequestMethod.POST)
    @JwtIgnore
    public void checkUser(String clientid, String username, String password, HttpServletResponse response) {
        // Credentials must not reach the logs: only the identifiers are recorded.
        logger.info("普通用户;clientid:{};username:{}", clientid, username);

        // Reject when the product has used up its device quota.
        if (productService.countDevNumByPid(username) <= 0) {
            logger.warn("clientid:{},username:{}超过产品最大允许数,禁止通过", clientid, username);
            response.setStatus(402);
            return;
        }

        boolean hasClientId = clientid != null && !clientid.isEmpty();
        if (hasClientId && productService.findByIdAndToken(username, password).size() > 0) {
            logger.info("clientid:{},username:{}允许通过", clientid, username);
            response.setStatus(200);
        } else {
            logger.warn("clientid:{},username:{}产品信息错误,禁止通过", clientid, username);
            response.setStatus(401);
        }
    }

    /**
     * EMQ superuser callback (request params: {@code clientid=%c,username=%u}).
     *
     * <p>NOTE(review): every request with a client id is answered with 200, i.e. every
     * client is treated as a superuser; only the logging differs between the internal
     * prefixes and other clients. Confirm whether other clients should be rejected.
     *
     * @param clientid MQTT client id
     * @param username MQTT user name
     * @param response response whose status code carries the decision
     */
    @RequestMapping("/superuser")
    @JwtIgnore
    public void mqttSuperuser(String clientid, String username, HttpServletResponse response) {
        if (clientid == null) {
            // A missing parameter previously caused a NullPointerException (HTTP 500).
            response.setStatus(400);
            return;
        }
        if (clientid.startsWith("server_client_") || clientid.startsWith("web_client_")
                || clientid.startsWith("wxapp_client_")) {
            response.setStatus(200);
            return;
        }
        logger.info("超级用户;clientid:" + clientid + ";username:" + username);
        System.out.println("超级用户;clientid:" + clientid + ";username:" + username);
        response.setStatus(200);
    }

    /**
     * EMQ ACL callback (request params:
     * {@code access=%A,username=%u,clientid=%c,ipaddr=%a,topic=%t}).
     *
     * <p>Logs the request and allows it unconditionally.
     *
     * @param access   access type as sent by EMQ
     * @param username MQTT user name
     * @param clientid MQTT client id
     * @param ipaddr   client IP address
     * @param topic    topic being accessed
     * @param response response whose status code carries the decision
     */
    @RequestMapping("/acl")
    @JwtIgnore
    public void mqttAcl(String access, String username, String clientid, String ipaddr, String topic,
            HttpServletResponse response) {
        logger.info("access: " + access + ";username: " + username + ";clientid: " + clientid
                + "; ipaddr: " + ipaddr + ";topic: " + topic);
        System.out.println("access: " + access + ";username: " + username + ";clientid: " + clientid
                + "; ipaddr: " + ipaddr + ";topic: " + topic);
        response.setStatus(200);
    }
}