暂时未有相关云产品技术能力~
其原理是在Spring容器启动好了之后,监听Spring容器内部发出的ApplicationReadyEvent事件,监听该事件,并且开启两个后台线程用于处理redis内部的stream数据。封装相关的消息发布功能消息的发送部分比较简单,直接通过redis往stream里面写入数据即可package org.idea.mq.redis.framework.producer; /** * @Author linhao * @Date created in 12:23 下午 2022/2/10 */ public interface IStreamProducer { /** * 指定streamName发布消息 * @param streamName * @param json */ void sendMsg(String streamName, String json); }消息的传输格式采用json字符串的方式写入到redis内部的stream当中。package org.idea.mq.redis.framework.producer; import org.idea.mq.redis.framework.redis.IRedisService; import javax.annotation.Resource; import java.util.HashMap; import java.util.Map; /** * @Author linhao * @Date created in 12:19 下午 2022/2/10 */ public class StreamProducer implements IStreamProducer{ @Resource private IRedisService iRedisService; @Override public void sendMsg(String streamName,String json){ Map<String,String> map = new HashMap<>(); map.put("json",json); iRedisService.xAdd(streamName,map); } }注意,写入底层的时候,我使用的是Redis内部自动生成的ID序号,代码如下:@Override public boolean xAdd(String streamName, Map<String, String> stringMap) { try (Jedis jedis = iRedisFactory.getConnection()) { jedis.xadd(streamName, StreamEntryID.NEW_ENTRY, stringMap); return true; } catch (Exception e) { throw new RuntimeException(e); } }为了方便将其作为一个SpringBoot的starter组件供外界团队人员使用,我们可以将其封装为一个starter组件:组件的测试点对点发送测试建立两套微服务工程,user-service 和 order-service,其中user-service部署两个服务节点,同属user-service-group。order-service也要部署两个服务节点,同属order-service-group。最后两个微服务集群之间互相发布对方订阅的消息,查看是否能够正常接受,且同一个组内一次只有一个节点接收消息。广播发送测试使用之前搭建好的user-service模块,部署四个节点,订阅同一个stream队列,但是将其groupName设置为不同的属性,最后发布消息,查看四个节点都能正常接收。具体细节在现有工程内部已经建立了测试模版,感兴趣的朋友可以去阅读下mq-redis-test模块的部分。问题思考为何同一个StreamName需要采用双线程消费?一个线程用于接受Stream内部正常数据,如果业务正常处理则对其返回为ack信号,确认该消息已经消费成功。如果处理过程中出现异常,则不反回ACK信号,此时Redis内部会将该消息放入到Pending队列中,而第二个线程专门用于处理Pending队列内部的数据。如果处于Pending状态的消息第二次消费依然失败,则会进行定时轮询状况。是否支持延迟重试目前的设计其实一直都存在不足点,例如当消息消费异常后会进入轮询,严重情况下可能会导致消息消费出现死循环,并且一直堵塞。暂时还未实现类似于RocketMQ的那种间隔1,3,5...分钟定时投递消费失败消息都功能。感兴趣的小伙伴可以基于现有代码进行简单改造。本文完整代码案例地址https://gitee.com/IdeaHome_admin/mq-framework推荐主流Java进阶技术(学习资料分享)Java面试题宝典加入Spring技术开发社区
需求提出:公司内部已有一套oneid用户中心,需要支持登录gitlab。实现GitLab支持配置第三方登录, 修改配置文件gitlab.rb:vi /etc/gitlab/gitlab.rb #OAuth2.0 gitlab_rails['omniauth_enabled'] = true gitlab_rails['omniauth_allow_single_sign_on'] = ['OneID'] gitlab_rails['omniauth_block_auto_created_users'] = false gitlab_rails['omniauth_providers'] = [ { 'name' => 'OneID', 'app_id' => '123', 'app_secret' => '1111', 'args' => { client_options: { 'site' => 'http://10.30.75.85:31900', 'authorize_url' => '/auth', 'user_info_url' => '/userInfo' }, user_response_structure: { root_path: [], id_path: 'userAccountID', attributes: { name: 'realName', nickname: 'nickname', email: 'email', username:'username' } }, name: 'OneID', strategy_class: "OmniAuth::Strategies::OAuth2Generic" } } ]http://10.30.75.85:31900 :本人服务的地址以上数据仅供参考,请根据实际情况修改,不清楚配置请百度,有详细案例我服务实现方式为java web项目(Spring boot),配置:<dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.11.3</version> </dependency> <dependency> <groupId>com.konghq</groupId> <artifactId>unirest-java</artifactId> <version>3.5.00</version> </dependency> <!-- 需要作为独立jar文件引用时(包含隐式依赖) --> <dependency> <groupId>com.konghq</groupId> <artifactId>unirest-java</artifactId> <version>3.5.00</version> <classifier>standalone</classifier> </dependency>定义OAuthController.java@Controller @RefreshScope public class OAuthController extends BaseController { @Value("${dossen.gitlab.url}") private String gitLabUrl; /** * 获得通过oneid登录得重定向地址 * @return */ @RequestMapping(value = "/login", method = RequestMethod.GET) public String getGitLabStateVal(HttpServletRequest request, HttpServletResponse response){ //所有cookie-我就看看,没什么用 Cookie[] cookies = request.getCookies(); //获得通过oneid登录得重定向地址 String location = ImitativeLoginGitLabUtil.getLocation(gitLabUrl); String[] urlAndCookie = location.split("&&"); //设置cookie Cookie cookie = new Cookie("_gitlab_session",urlAndCookie[1].replaceAll("_gitlab_session=","")); cookie.setPath("/"); response.addCookie(cookie); return "redirect:"+urlAndCookie[0]; } @RequestMapping(value = "/auth", method = RequestMethod.GET) public String auth(OAuthRequest request) { //需要自己写实现逻辑鉴权返回给gitlab return "redirect:""; } /** * 获取用户信息 * * @return */ @ResponseBody @RequestMapping(value = "/userInfo") public Object userInfo(HttpServletRequest request) { //gitlab请求参数查询用户信息,返回给gitlab UserGetResponse userGetResponse = null; Map<String, Object> resultMap = new HashMap<String, Object>(); resultMap.put("userAccountID", userGetResponse.getUserAccountID()); resultMap.put("realName", userGetResponse.getRealName()); resultMap.put("nickname", userGetResponse.getRealName()); resultMap.put("username", userGetResponse.getEmail().split("@")[0]); resultMap.put("email", userGetResponse.getEmail()); ResponseEntity<Object> responseEntity = new ResponseEntity<Object>(resultMap, HttpStatus.valueOf(200)); return responseEntity; } }定义ImitativeLoginGitLabUtil.javapackage com.dossen.gitlab.adapter.util; import kong.unirest.HttpResponse; import kong.unirest.Unirest; import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.springframework.beans.factory.annotation.Value; /** * 模拟登录gitlab请求获取重定向值 * @Author wenfl * @Date 2021-10-14 */ public class ImitativeLoginGitLabUtil { public static String getLocation(String gitLabUrl){ HttpResponse<String> response = null; try { //打开登录页面 response =Unirest.get(gitLabUrl).asString(); //得到document对象 Document doc = Jsoup.parse(response.getBody()); String authenticity_token = doc.select("meta[name=csrf-token]").get(0).attr("content"); String cookeiValue = response.getHeaders().getFirst("Set-Cookie"); response = Unirest.post(gitLabUrl+"/users/auth/OneID") .header("Cookie", cookeiValue) .header("Content-Type", "application/x-www-form-urlencoded") .field("authenticity_token", authenticity_token) .asString(); //获得重定向地址 String location = response.getHeaders().getFirst("Location")+"&&"+cookeiValue.split(";")[0]; return location; } catch (Exception e) { e.printStackTrace(); } return ""; } }经过上面的操作就已完成常规的登录了,界面如下后续因公司已有一套用户中心,需要实现直接在用户中心点击就完成登录的过程跳转到首页。结合OAuthController中getGitLabStateVal方法完成模拟gitlab页面点击第三方登录按钮操作,主要还是设置cookie的动作,需要在gitlab的域中设置才能生效 :修改gitlab的nginx配置/var/opt/gitlab/nginx/conf/gitlab-http.conf# 以下操作是为了能让用户中心点击图标实现登录的过程 location /oneid/login{ proxy_set_header Host $http_host; proxy_set_header X-Real-IP $remote_addr; proxy_set_header REMOTE-HOST $remote_addr; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_pass http://10.30.75.85:31900/login; }修改proxy_pass为java web项目地址执行:gitlab-ctl restart nginx注:不要执行gitlab-ctl reconfigure,否则配置会被覆盖这样就可以在用户中心配置地址为:http://gitlaburl.com/oneid/login,就可以完成登录的动作了。
Spring Boot 2.6.0已经正式发布,快看看作了哪些升级。1、禁止了循环依赖循环依赖问题一直困扰大家,也是面试常问题目之一,对循环依赖不了解的可以看这篇:Spring高频面试题:如何解决循环依赖问题Spring Boot 2.6.0之后,如果程序中存在循环依赖问题,启动上就会失败,报错:┌─────┐ | a (field private com.zhiyin.TestB com.zhiyin.TestA.b) ↑ ↓ | b (field private com.zhiyin.TestA com.zhiyin.TestB.a) └─────┘ Action: Relying upon circular references is discouraged and they are prohibited by default. Update your application to remove the dependency cycle between beans. As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.如果程序设计非常合理,完全避免了循环依赖,是最完美的。如果实在不能,Spring Boot也提供了折中解决办法,在报错信息中已经明示:As a last resort, it may be possible to break the cycle automatically by setting spring.main.allow-circular-references to true.需要我们在配置文件application.properties里加上这个属性:spring.main.allow-circular-references = true程序依然可以正常启动。2、新增自定义脱敏规则Spring Boot 2.6.0 支持/env端点和configprops配置属性的自定义脱敏,自定义SanitizingFunction类型的Bean即可实现,如下:@Bean public SanitizingFunction pwdSanitizingFunction() { return data -> { org.springframework.core.env.PropertySource<?> propertySource = data.getPropertySource(); String key = data.getKey(); // 仅对redis.properties里面的某些key做脱敏 if (propertySource.getName().contains("redis.properties")) { if (key.equals("redis.pwd")) { return data.withValue(SANITIZED_VALUE); } } return data; }; }对于部分数据脱敏,这个改进非常灵活,很有用。3、Redis自动开启连接池在2.6.0之前的版本,配置Redis时是否启用连接池是由使用者指定,2.6.0之后是默认开启,如果需要关闭,则需要配置:spring.redis.jedis.pool.enabled = false或者spring.redis.lettuce.pool.enabled = false说明Spring Boot支持大家使用Redis连接池。4、支持使用WebTestClient来测试Spring MVC开发人员可以使用 WebTestClient 在模拟环境中测试程序,只需要在Mock环境中使用 @AutoConfigureMockMvc 注释,就可以轻松注入 WebTestClient。,省去编写测试程序。5、默认使用全新匹配策略请求路径与 Spring MVC 处理映射匹配的默认策略已从AntPathMatcher更改为PathPatternParser。你可以设置spring.mvc.pathmatch.matching-strategy为ant-path-matcher来改变它。2.6.0之前:public static class Pathmatch { private MatchingStrategy matchingStrategy = MatchingStrategy.ANT_PATH_MATCHER; }2.6.0之后:public static class Pathmatch { private MatchingStrategy matchingStrategy = MatchingStrategy.PATH_PATTERN_PARSER; }两者差异上:PathPattern去掉了Ant字样,但保持了很好的向下兼容性:除了不支持将**写在path中间之外,其它的匹配规则从行为上均保持和AntPathMatcher一致,并且还新增了强大的{*pathVariable}的支持。6、增强了/info管理端点,加上了Java运行时信息如下:{ "java": { "vendor": "BellSoft", "version": "17", "runtime": { "name": "OpenJDK Runtime Environment", "version": "17+35-LTS" }, "jvm": { "name": "OpenJDK 64-Bit Server VM", "vendor": "BellSoft", "version": "17+35-LTS" } } }7、其他变化Servlet应用现在支持在Cookie中添加SameSite。支持在主端口或管理端口上配置健康组。在 Spring Boot 2.4 中弃用的类、方法和属性已在此版本中删除。支持 Log4j2 复合配置支持构建信息属性排除另外需要注意的是,Spring Boot每年会在5月份和11月份发布两个中型版本,每个中型版本提供1年的免费支持,那也就意味着2.4.x已经停止了版本停止(免费)支持。不过对本次版本更新点有所了解即可,等待下次大版本更新再去学,一更新马上用新的实在学不动~~参考:https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-2.6.0-Configuration-Changelog
现在比较多的互联网公司都在尝试将微服务迁到云上,这样的能够通过一些成熟的云容器管理平台更为方便地管理微服务集群,从而提高微服务的稳定性,同时也能较好地提升团队开发效率。但是迁云存在一定的技术难点,今天这篇文章主要介绍如何从0开始搭建一套基于K8s部署的SpringBoot案例教程。基础环境准备:mac操作系统SpringBoot的简单Web工程minikube的环境搭建安装一个适合我们初级入门的k8s环境,比较好的推荐是使用minikube工具,同时使用该工具可以更好地降低我们对k8s的学习门槛。首先我们需要下载minikube文件:curl -Lo minikube https://github.com/kubernetes/minikube/releases/download/v1.5.0/minikube-linux-amd64 && chmod +x minikube && sudo mv minikube /usr/local/bin/在安装minikube的时候,尝试下载镜像的时候可能会卡住,例如出现下边的这类异常:【idea @ Mac】>>>>>>minikube start --registry-mirror=https://w4i0ckag.mirror.aliyuncs.com 😄 Darwin 10.15.3 上的 minikube v1.16.0 ✨ 根据现有的配置文件使用 docker 驱动程序 👍 Starting control plane node minikube in cluster minikube 🚜 Pulling base image ... E0126 17:03:30.131026 34416 cache.go:180] Error downloading kic artifacts: failed to download kic base image or any fallback image 🔥 Creating docker container (CPUs=2, Memory=1988MB) ... 🤦 StartHost failed, but will try again: creating host: create: creating: setting up container node: preparing volume for minikube container: docker run --rm --entrypoint /usr/bin/test -v minikube:/var gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16 -d /var/lib: exit status 125 stdout: stderr: Unable to find image 'gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16' locally docker: Error response from daemon: Get https://gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers). See 'docker run --help'. 🤷 docker "minikube" container is missing, will recreate. 🔥 Creating docker container (CPUs=2, Memory=1988MB) ... 😿 Failed to start docker container. Running "minikube delete" may fix it: recreate: creating host: create: creating: setting up container node: preparing volume for minikube container: docker run --rm --entrypoint /usr/bin/test -v minikube:/var gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16 -d /var/lib: exit status 125 stdout: stderr: Unable to find image 'gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16' locally docker: Error response from daemon: Get https://gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers). See 'docker run --help'. ❌ Exiting due to GUEST_PROVISION: Failed to start host: recreate: creating host: create: creating: setting up container node: preparing volume for minikube container: docker run --rm --entrypoint /usr/bin/test -v minikube:/var gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16 -d /var/lib: exit status 125 stdout: stderr: Unable to find image 'gcr.io/k8s-minikube/kicbase:v0.0.15-snapshot4@sha256:ef1f485b5a1cfa4c989bc05e153f0a8525968ec999e242efff871cbb31649c16' locally docker: Error response from daemon: Get https://gcr.io/v2/: net/http: request canceled while waiting for connection (Client.Timeout exceeded while awaiting headers). See 'docker run --help'. 😿 If the above advice does not help, please let us know: 👉 https://github.com/kubernetes/minikube/issues/new/choose此时可以尝试先在宿主机上安装好对应的镜像文件:docker pull anjone/kicbase然后minikube在启动的时候使用本地的镜像,这样可以减少minikube start过程的耗时。minikube下载了之后便到了启动环节:minikube start --vm-driver=docker --base-image="anjo ne/kicbase"如果启动失败,不妨试试更换指定的镜像仓库,例如下边这段:minikube start --registry-mirror=https://bmtb46e4.mirror.aliyuncs.com --vm-driver=docker --base-image="anjone/kicbase" --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers这里头我大概介绍一下启动参数的含义:--registry-mirror 这里的地址会和启动的minikube内部的docker.daemon文件中所指向的镜像仓库地址一致。--vm-driver 虚拟机引擎 这里是指minikube的内部通过docker来作为核心--base-image 声明好基础镜像,如果宿主机内部有对应镜像,就不需要额外拉取--image-repository 拉取镜像的仓库当minikube启动成功之后,大致如下所示:【idea @ Mac】>>>>>>minikube start --vm-driver=docker --base-image="anjone/kicbase" 😄 Darwin 10.15.3 上的 minikube v1.16.0 ✨ 根据现有的配置文件使用 docker 驱动程序 👍 Starting control plane node minikube in cluster minikube 🤷 docker "minikube" container is missing, will recreate. 🔥 Creating docker container (CPUs=2, Memory=1988MB) ... ❗ This container is having trouble accessing https://k8s.gcr.io 💡 To pull new external images, you may need to configure a proxy: https://minikube.sigs.k8s.io/docs/reference/networking/proxy/ 🐳 正在 Docker 19.03.2 中准备 Kubernetes v1.20.0… ▪ Generating certificates and keys ... ▪ Booting up control plane ...\ ▪ Configuring RBAC rules ... 🔎 Verifying Kubernetes components... 🌟 Enabled addons: default-storageclass 🏄 Done! kubectl is now configured to use "minikube" cluster and "default" namespace by default 【idea @ Mac】>>>>>>好了,接下来便到了部署SpringBoot应用的部分了。基于SpringBoot部署到k8s中首先我们需要搭建一个简单的SpringBoot应用:引入dependency依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies>打包docker镜像的配置:<build> <finalName>打包出来的镜像名称</finalName> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <version>2.2.5.RELEASE</version> </plugin> <!-- Docker maven plugin --> <plugin> <groupId>com.spotify</groupId> <artifactId>docker-maven-plugin</artifactId> <version>1.0.0</version> <configuration> <imageName>${project.artifactId}</imageName> <imageTags> <tag>1.0.1</tag> </imageTags> <dockerDirectory>src/main/docker</dockerDirectory> <resources> <resource> <targetPath>/</targetPath> <directory>${project.build.directory}</directory> <include>${project.build.finalName}.jar</include> </resource> </resources> </configuration> </plugin> <!-- Docker maven plugin --> </plugins> </build>接着是简单的controller和启动类:@RestController @RequestMapping(value = "/test") public class TestController { @GetMapping(value = "/do-test") public String doTest(){ System.out.println("this is a test"); return "success"; } } @SpringBootApplication public class WebApplication { public static void main(String[] args) { SpringApplication.run(WebApplication.class); } }编写Dockerfile的脚本:FROM openjdk:8-jdk-alpine VOLUME /tmp #将springboot-k8s-template.jar复制到容器内部 并且别名叫springboot-k8s-template-v1.jar ADD springboot-k8s-template.jar springboot-k8s-template-v1.jar #相当于在容器中用cmd命令执行jar包 指定外部配置文件 ENTRYPOINT ["java","-Djava.security.egd=file:/dev/./urandom","-jar","/springboot-k8s-template-v1.jar"]然后进入到Dockerfile的目录底下,进行镜像的构建:【idea @ Mac】>>>>>>docker build -t springboot-k8s-template:1.0 . [+] Building 0.5s (7/7) FINISHED => [internal] load build definition from Dockerfile 0.0s => => transferring dockerfile: 419B 0.0s => [internal] load .dockerignore 0.0s => => transferring context: 2B 0.0s => [internal] load metadata for docker.io/library/openjdk:8-jdk-alpine 0.0s => [internal] load build context 0.3s => => transferring context: 17.60MB 0.3s => CACHED [1/2] FROM docker.io/library/openjdk:8-jdk-alpine 0.0s => [2/2] ADD springboot-k8s-template.jar springboot-k8s-template-v1.jar 0.1s => exporting to image 0.1s => => exporting layers 0.1s => => writing image sha256:86d02961c4fa5bb576c91e3ebf031a3d8b140ddbb451b9613a2c4d601ac4d853 0.0s => => naming to docker.io/library/springboot-k8s-template:1.0 0.0s Use 'docker scan' to run Snyk tests against images to find vulnerabilities and learn how to fix them 【idea @ Mac】>>>>>>docker images | grep template springboot-k8s-template 1.0 86d02961c4fa 48 seconds ago 122MB构建完成之后,将本地镜像打包发布到镜像仓库中,这里我是通过推送到阿里云镜像仓库的方式来处理的。推送本地镜像到阿里云首先是登录到docker仓库,然后记录对应的tag信息,最终推送镜像。$ docker login --username=[阿里云账户名称] registry.cn-qingdao.aliyuncs.com $ docker tag [ImageId] registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:[镜像版本号] $ docker push registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:[镜像版本号]例如:【idea @ Mac】>>>>>>docker images | grep config qiyu-framework-k8s-config 1.0 6168639757e9 2 minutes ago 122MB 【idea @ Mac】>>>>>>docker tag 6168639757e9 registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:qiyu-framework-k8s-config-1.0 【idea @ Mac】>>>>>>docker push registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:qiyu-framework-k8s-config-1.0 The push refers to repository [registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp] 1ace00556b41: Pushed ceaf9e1ebef5: Layer already exists 9b9b7f3d56a0: Layer already exists f1b5933fe4b5: Layer already exists qiyu-framework-k8s-config-1.0: digest: sha256:50c1a87484f6cbec699d65321fa5bbe70f5ad6da5a237e95ea87c7953a1c80da size: 1159 【idea @ Mac】>>>>>>请根据实际镜像信息替换示例中的[ImageId]和[镜像版本号]参数。将镜像文件打包并且推送到镜像仓库之后,可以通过在yaml文件中编写对应的镜像地址,这样就能保证在镜像下载的时候能从仓库拉取出对应的镜像文件。通常项目中我们会采用统一的yaml文件来进行pod节点的部署与构建。yaml配置文件:apiVersion: apps/v1 #kubectl api-versions 可以通过这条指令去看版本信息 kind: Deployment # 指定资源类别 metadata: #资源的一些元数据 name: springboot-k8s-template-deployment #deloyment的名称 labels: app: springboot-k8s-template-deployment #标签 spec: replicas: 2 #创建pod的个数 selector: matchLabels: app: springboot-k8s-template-deployment #满足标签为这个的时候相关的pod才能被调度到 template: metadata: labels: app: springboot-k8s-template-v1 spec: containers: - name: springboot-k8s-template-v1 image: registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:1.0 imagePullPolicy: IfNotPresent ports: - containerPort: 8080由于阿里云的镜像仓库需要用户账号密码权限访问,所以这里我们可以尝试简单一些的策略,登录minikube的内部,提前下载好对应的阿里云镜像。通过 minikube ssh 指令即可登录到minikube的内部:采用docker pull指令即可下载对应资源:docker@minikube:~$ docker pull registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:springboot-k8s-template-1.0 springboot-k8s-template-1.0: Pulling from idea_hub/idea_resp e7c96db7181b: Already exists f910a506b6cb: Already exists c2274a1a0e27: Already exists d2fe98fe1e4e: Pull complete Digest: sha256:dc1c9caa101df74159c1224ec4d7dcb01932aa8f4a117bba603ffcf35e91c60c Status: Downloaded newer image for registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:springboot-k8s-template-1.0 registry.cn-qingdao.aliyuncs.com/idea_hub/idea_resp:springboot-k8s-template-1.0 docker@minikube:~$查看对应的镜像文件镜像拉取策略可以对照官网的介绍来系统认识:https://kubernetes.io/docs/concepts/containers/images/在yaml文件里我选用了IfNotPresent策略,这条策略能够保证当本地有镜像的时候优先选用本地,没有才选择网络拉取。最后是找到相关的yaml文件进行pod的部署启动。kubectl create -f ./k8s-springboot-template.yaml此时通过 kubectl get pod 命令可以看到对应的pod节点:最终需要暴露deployment服务:【idea @ 拍了拍我的iterm2 说】>>>>>> kubectl expose deployment springboot-k8s-template-deployment --type=NodePort service/springboot-k8s-template-deployment exposed 【idea @ 拍了拍我的iterm2 说】>>>>>> kubectl get pods NAME READY STATUS RESTARTS AGE springboot-k8s-template-deployment-687f8bf86d-gqxcp 1/1 Running 0 7m50s springboot-k8s-template-deployment-687f8bf86d-lcq5p 1/1 Running 0 7m50s 【idea @ 拍了拍我的iterm2 说】>>>>>> minikube service springboot-k8s-template-deployment |-----------|------------------------------------|-------------|---------------------------| | NAMESPACE | NAME | TARGET PORT | URL | |-----------|------------------------------------|-------------|---------------------------| | default | springboot-k8s-template-deployment | 8080 | http://192.168.49.2:31179 | |-----------|------------------------------------|-------------|---------------------------| 🏃 Starting tunnel for service springboot-k8s-template-deployment. |-----------|------------------------------------|-------------|------------------------| | NAMESPACE | NAME | TARGET PORT | URL | |-----------|------------------------------------|-------------|------------------------| | default | springboot-k8s-template-deployment | | http://127.0.0.1:57109 | |-----------|------------------------------------|-------------|------------------------| 🎉 正通过默认浏览器打开服务 default/springboot-k8s-template-deployment... ❗ Because you are using a Docker driver on darwin, the terminal needs to be open to run it.暴露之后访问:http://127.0.0.1:57109/test/do-test验证接口正常。minikube日志查看:kubectl logs -f springboot-k8s-template-deployment-687f8bf86d-lcq5p
项目背景和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。这里分享一下相关的技术设计方案。这款系统的核心点设计在于如何能让一个用户发出的语音通知到其他用户上边。语音数据在客户端同事的处理下最终变成了io数据流请求到了后端,后端只需要将这些数据流传达给各个不同的终端即可达到广播通知的效果。单机版架构最初期上线的时候,为了赶速度,快速试错,所以简单地采用了单机版架构去设计。结合技术栈为 SpringBoot,WebSocket,MySQL技术。线上一间语音房间的同时在线人数并不会特别多,大概在15-50人的区间段内,系统核心代码是通过SpringBoot内部的WebSocket技术去进行数据的主动推送。设计思路整体的设计图比较简单,基本就是一台服务器存储WebSocket连接,如下图所示:用户进行WebSocket初始化连接的时候需要一个连接分配和存储的过程:早期的存储是存放在了服务器本地的一个Map集合中。当WebSocket进行连接的时候就会往内存中写入一条数据信息,当链接断开的时候,就将内存中的数据移除。然后进行语音广播的时候需要结合WebSocket内部的广播发送功能进行通知看似设计比较简单,但是在后期业务变得庞大的时候出现了瓶颈。因为随着参加语音活动用户的增加,越来越多的WebSocketSession对象需要被存储到内存当中,这种有状态性的存储对于单机扩容不灵活。设计缺陷1.假设原先的服务器扩容到了A,B两台机器,A用户在A机器上边建立了WebSocketSession,B用户在B机器上边建立的WebSocketSession连接。此时如果A想要和B进行对话发送,需要先查找到具体WebSocketSession存放在哪台机器上边。2.当用户出现了网络异常,临时断开连接进行重连的时候,也可能会出现1所说的问题。集群架构设计思路一旦出现需要发送语音通知的时候,发送一条广播的mq消息,每个机器都接收到消息之后,触发自己的广播操作即可。RocketMq的接入系统设计里面mq采用的是广播模式,这和我们通常使用的集群模式有一定的区别。消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。集群消费模式适用场景 适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。注意事项集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。广播消费模式适用场景 适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。注意事项广播消费模式下不支持顺序消息。广播消费模式下不支持重置消费位点。每条消息都需要被相同订阅逻辑的多台机器处理。消费进度在客户端维护,出现重复消费的概率稍大于集群模式。广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。这里面的应用场景需要对集群内部对每个消费者都对服务器内存中的socket连接进行session是否存在对判断,因此需要采用mq的广播模式。关于mq部分的接入代码Consumer模块的配置:package org.idea.web.socket.config; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @Author linhao * @Date created in 10:30 上午 2021/5/10 */ @ConfigurationProperties(prefix = "rocketmq.consumer") public class MqConsumerConfig { private boolean isOn; private String groupName; private String nameSrvAddr; private String topics; private Integer consumeThreadMin; private Integer consumeThreadMax; private Integer consumeMessageBatchMaxSize; /** getter 和 setter部分省略 **/ }Producer模块的配置展示:package org.idea.web.socket.config; import org.springframework.boot.context.properties.ConfigurationProperties; /** * @Author linhao * @Date created in 10:26 上午 2021/5/10 */ @ConfigurationProperties(prefix = "rocketmq.producer") public class MqProducerConfig { private boolean isOn; private String groupName; private String nameSrvAddr; private Integer maxMessageSize; private Integer sendMsgTimeout; private Integer retryTimesWhenSendFailed; /** getter 和 setter部分省略 **/ }RocketMq内部的消费端Bean配置package org.idea.web.socket.mq; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.protocol.heartbeat.MessageModel; import org.idea.web.socket.config.MqConsumerConfig; import org.idea.web.socket.config.MqProducerConfig; import org.springframework.boot.autoconfigure.AutoConfigureAfter; import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import javax.annotation.Resource; /** * @Author linhao * @Date created in 10:34 上午 2021/5/10 */ @Configuration @Slf4j @EnableConfigurationProperties({MqConsumerConfig.class}) public class MqConsumerAutoConfig { @Resource private MqConsumerConfig mqConsumerConfig; @Resource //这个接口需要手动实现顺序消费的逻辑 每次获取到消息队列的第一条数据 private MessageListenerHandler messageListenerConcurrently; @Bean @ConditionalOnMissingBean public DefaultMQPushConsumer defaultMQPushConsumer() { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(); consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr()); consumer.setConsumerGroup(mqConsumerConfig.getGroupName()); consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin()); consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax()); consumer.registerMessageListener(messageListenerConcurrently); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //消费模型是什么? consumer.setMessageModel(MessageModel.BROADCASTING); //默认一次拉取一条消费 consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize()); //*表示订阅所有的tag try { consumer.subscribe(mqConsumerConfig.getTopics(), "*"); consumer.start(); log.info("【 MqConsumerAutoConfig 】mq consumer is started!"); } catch (Exception e) { log.error("mq start fail,e is ", e); } return consumer; } }
Spring cloud gateway是替代zuul的网关产品,基于Spring 5、Spring boot 2.0以上、Reactor, 提供任意的路由匹配和断言、过滤功能。上一篇文章谈了一下Gateway网关使用不规范,同事加班泪两行~,这篇文章将会侧重于其他的几个需要注意的地方。网关实现这里介绍编码方式实现HystrixObservableCommand.Setter getSetter() { HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("group-accept"); HystrixObservableCommand.Setter setter = HystrixObservableCommand.Setter.withGroupKey(groupKey); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("command-accept"); setter.andCommandKey(commandKey); HystrixCommandProperties.Setter proertiesSetter = HystrixCommandProperties.Setter(); proertiesSetter /* * * 线程策略配置 */ //设置线程模式 缺省 1000ms .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD) //执行是否启用超时时间 缺省 true .withExecutionTimeoutEnabled(true) //使用线程隔离时,是否对命令执行超时的线程调用中断 缺省false .withExecutionIsolationThreadInterruptOnFutureCancel(false) //执行超时的时候是否要它中断 缺省 true .withExecutionIsolationThreadInterruptOnTimeout(true) //执行的超时时间 缺省 1000ms .withExecutionTimeoutInMilliseconds(2000) /* * * 熔断策略 */ //是否开启溶断 缺省 true .withCircuitBreakerEnabled(true) // 是否允许熔断器忽略错误,默认false, 不开启 ; // true,断路器强制进入“关闭”状态,它会接收所有请求。 // 如果forceOpen属性为true,该属性不生效 .withCircuitBreakerForceClosed(false) // 是否强制开启熔断器阻断所有请求, 默认为false // 为true时,所有请求都将被拒绝,直接到fallback. // 如果该属性设置为true,断路器将强制进入“打开”状态, // 它会拒绝所有请求。该属性优于forceClosed属性 .withCircuitBreakerForceOpen(false) // 用来设置当断路器打开之后的休眠时间窗。 // 休眠时间窗结束之后,会将断路器设置为“半开”状态,尝试熔断的请求命令, // 如果依然请求错误就将断路器继续设置为“打开”状态,如果成功,就设置为“关闭”状态 // 熔断器默认工作时间,默认:5000豪秒. // 熔断器中断请求10秒后会进入半打开状态,放部分流量过去重试. .withCircuitBreakerSleepWindowInMilliseconds(5000) // 熔断器在整个统计时间内是否开启的阀值. // 在metricsRollingStatisticalWindowInMilliseconds(默认10s)内默认至少请求10次, // 熔断器才发挥起作用,9次熔断器都不起作用。 .withCircuitBreakerRequestVolumeThreshold(100) // 该属性用来设置断路器打开的错误百分比条件。默认值为50. // 表示在滚动时间窗中,在请求值超过requestVolumeThreshold阈值的前提下, // 如果错误请求数百分比超过50,就把断路器设置为“打开”状态,否则就设置为“关闭”状态 .withCircuitBreakerErrorThresholdPercentage(50); setter.andCommandPropertiesDefaults(proertiesSetter); return setter; }@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888") ); return serviceProvider.build(); }在上面的代码中,主要做了3件事情:限流、熔断策略及降级方法配置限流配置redisspring: redis: database: 0 host: 127.0.0.1 port: 6379 password: timeout: 1500 lettuce: pool: max-active: 300 #连接池最大连接数(使用负值表示没有限制) max-idle: 10 #连接池中的最大空闲连接 min-idle: 5 #连接池中的最小空闲连接 max-wait: -1 #连接池最大阻塞等待时间(使用负值表示没有限制)自定义解析/** * @description: 按照访问地址进行限流(也可以安装其他条件进行限流),具体可以看exchange.getRequest()的方法和属性 **/ public class GenericAccessResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getPath().value()); } }自定义限流配置RedisRateLimiter redisRateLimiter() { //1000,1500对应replenishRate、burstCapacity return new RedisRateLimiter(1000, 1500); }网关使用自定义限流器(网关使用代码实现)@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") //.and() //.readBody(String.class, readBody -> true) .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter(config -> config.setKeyResolver(new GenericAccessResolver()).setRateLimiter(redisRateLimiter())); return f; }) .uri("http://localhost:8888") ); return serviceProvider.build(); }测试jmeter配置结果其他如果有多个路由,使用不同的限流策略,可以自定义KeyResolver和RedisRateLimiter, 在路由定义时加入//基于ip限流 public class OtherAccessResolver implements KeyResolver { @Override public Mono<String> resolve(ServerWebExchange exchange) { return Mono.just(exchange.getRequest().getRemoteAddress().getHostName()); } }RedisRateLimiter otherRedisRateLimiter() { //1000,1500对应replenishRate、burstCapacity return new RedisRateLimiter(100, 500); }@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888")) .route("sign", r -> r.method(HttpMethod.POST) .and() .path("/gateway-sign/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-sign/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new OtherAccessResolver()) .setRateLimiter(otherRedisRateLimiter())); f.hystrix(config -> config.setName("sign") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:7777") ); return serviceProvider.build(); }熔断策略熔断策略主要是线程配置和熔断配置,上面已经说明很清楚了。在上篇文章中,为了解决网关调用后台服务Connection prematurely closed BEFORE response的问题,要设置后台服务线程的空闲时间和网关线程池线程的空闲时间,并让网关线程池线程的空闲时间小于后台服务的空闲时间配置方法spring: cloud: gateway: httpclient: pool: max-connections: 500 max-idle-time: 10000编码实现翻阅Spring Cloud Gateway英文资料,知道路由提供一个metadata方法,可以设置路由的元数据(https://docs.spring.io/spring-cloud-gateway/docs/2.2.6.RELEASE/reference/html/#route-metadata-configuration),这些元数据在RouteMetadataUtils中定义:package org.springframework.cloud.gateway.support; public final class RouteMetadataUtils { public static final String RESPONSE_TIMEOUT_ATTR = "response-timeout"; public static final String CONNECT_TIMEOUT_ATTR = "connect-timeout"; private RouteMetadataUtils() { throw new AssertionError("Must not instantiate utility class."); } }其中没有我要的线程数量(max-connection)和空闲时间(max-idle-time)的设置,没有关系,自己加上去:@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r.method(HttpMethod.GET) .and() .path("/gateway-accept/**") .and() .header(HttpHeaders.CONTENT_TYPE, "application/json;charset=UTF-8") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); f.requestRateLimiter( config -> config.setKeyResolver(new GenericAccessResolver()) .setRateLimiter(redisRateLimiter())); f.hystrix(config -> config.setName("accept") .setFallbackUri("forward:/gateway-fallback") .setSetter(getSetter())); return f; }) .uri("http://localhost:8888") .metadata("max-idle-time", 10000) //网关调用后台线程空闲时间设置 .metadata("max-connections", 200) //网关调用后台服务线程数量设置 ); return serviceProvider.build(); }测试果然和yml配置一样有效果。降级方法降级方法本身没有什么特别,有一个问题需要注意,调用降级方法也是使用线程池的,缺省在HystrixThreadPoolProperties中定义:public abstract class HystrixThreadPoolProperties { /* defaults */ static int default_coreSize = 10; // core size of thread pool static int default_maximumSize = 10; // maximum size of thread pool static int default_keepAliveTimeMinutes = 1; // minutes to keep a thread alive static int default_maxQueueSize = -1; // size of queue (this can't be dynamically changed so we use 'queueSizeRejectionThreshold' to artificially limit and reject) // -1 turns it off and makes us use SynchronousQueue错误如果上面的限流设置比较大,比如1000,最大突发2000,网关调用后台服务发生熔断降级, 熔断后降级的方法调用太频繁,10个线程不够用,会导致以下500错误:2021-02-01 14:29:45.076 ERROR 64868 --- [ioEventLoop-5-1] a.w.r.e.AbstractErrorWebExceptionHandler : [a0ed6911-18982] 500 Server Error for HTTP GET "/gateway-accept/test" com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected. at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain] |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler] com.netflix.hystrix.exception.HystrixRuntimeException: command-accept fallback execution rejected. at com.netflix.hystrix.AbstractCommand.handleFallbackRejectionByEmittingError(AbstractCommand.java:1043) ~[hystrix-core-1.5.18.jar:1.5.18] Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: Error has been observed at the following site(s): |_ checkpoint ⇢ org.springframework.cloud.gateway.filter.WeightCalculatorWebFilter [DefaultWebFilterChain] |_ checkpoint ⇢ HTTP GET "/gateway-accept/test" [ExceptionHandlingWebHandler]配置方法所以要在yml中设置合适的调用降级方法的线程池, 合理的配置能够杜绝网关500错误的发生。hystrix: threadpool: group-accept: #代码里面设置的HystrixCommandGroupKey.Factory.asKey("group-accept") coreSize: 50 #并发执行的最大线程数,默认10 maxQueueSize: 1500 #BlockingQueue的最大队列数 #即使maxQueueSize没有达到,达到queueSizeRejectionThreshold该值后,请求也会被拒绝 queueSizeRejectionThreshold: 1400网关异常截获上面的异常后,没有捕获异常直接返回前端500错误,一般情况下需要返回一个统一接口,比如:@Data @ToString @EqualsAndHashCode @Accessors(chain = true) public class Result<T> implements Serializable { private Integer code; private String message; private T data; private String sign; public static final String SUCCESS = "成功"; public static final String FAILURE = "失败"; public Result(int code, String message) { this.code = code; this.message = message; } public Result(int code, String message, T data) { this.code = code; this.message = message; this.data = data; } public Result(int code, String message, T data, String sign) { this.code = code; this.message = message; this.data = data; this.sign = sign; } public static Result<Object> success() { return new Result<Object>(200, SUCCESS); } public static Result<Object> success(Object data) { return new Result<Object>(200, SUCCESS, data); } public static Result<Object> success(Object data, String sign) { return new Result<Object>(200, SUCCESS, data, sign); } public static Result<Object> failure() { return new Result<Object>(400, FAILURE); } public static Result<Object> failure(Object data) { return new Result<Object>(400, FAILURE, data); } public static Result<Object> failure(Object data, String sign) { return new Result<Object>(400, FAILURE, data, sign); } }创建GlobalExceptionConfiguration 实现ErrorWebExceptionHandler(这一段是来者网友提供的)@Slf4j @Order(-1) @Component @RequiredArgsConstructor public class GlobalExceptionConfiguration implements ErrorWebExceptionHandler { private final ObjectMapper objectMapper; @Override public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) { ServerHttpResponse response = exchange.getResponse(); if (response.isCommitted()) { return Mono.error(ex); } response.getHeaders().setContentType(MediaType.APPLICATION_JSON_UTF8); if (ex instanceof ResponseStatusException) { response.setStatusCode(((ResponseStatusException) ex).getStatus()); } return response .writeWith(Mono.fromSupplier(() -> { DataBufferFactory bufferFactory = response.bufferFactory(); try { return bufferFactory.wrap(objectMapper.writeValueAsBytes(Result.failure(ex.getMessage()))); } catch (JsonProcessingException e) { log.warn("Error writing response", ex); return bufferFactory.wrap(new byte[0]); } })); } }这样,就会把网关异常统一包装在接口中返回:如:后台日志已经没有之前的错误日志了。编码实现,没找到由于Spring Cloud Gateway 中的 Hystrix采用的是HystrixObservableCommand.Setter, 没有采用 HystrixCommand.Setter, 在 HystrixCommand.Setter中是可以编码实现线程池配置的, 但是在HystrixObservableCommand.Setter没有提供:final public static class Setter { protected final HystrixCommandGroupKey groupKey; protected HystrixCommandKey commandKey; protected HystrixThreadPoolKey threadPoolKey; //有属性但是没有set方法 protected HystrixCommandProperties.Setter commandPropertiesDefaults; protected HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults; //有属性没有set方法 protected Setter(HystrixCommandGroupKey groupKey) { this.groupKey = groupKey; // default to using SEMAPHORE for ObservableCommand commandPropertiesDefaults = setDefaults(HystrixCommandProperties.Setter()); } public static Setter withGroupKey(HystrixCommandGroupKey groupKey) { return new Setter(groupKey); } public Setter andCommandKey(HystrixCommandKey commandKey) { this.commandKey = commandKey; return this; } public Setter andCommandPropertiesDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) { this.commandPropertiesDefaults = setDefaults(commandPropertiesDefaults); return this; } private HystrixCommandProperties.Setter setDefaults(HystrixCommandProperties.Setter commandPropertiesDefaults) { if (commandPropertiesDefaults.getExecutionIsolationStrategy() == null) { // default to using SEMAPHORE for ObservableCommand if the user didn't set it commandPropertiesDefaults.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE); } return commandPropertiesDefaults; } }由于本人水平有限,没有找到Setter中设置HystrixThreadPoolKey和HystrixThreadPoolProperties.Setter的方法,所以只能在yml中配置。有知道的同学告诉我一声,不胜感激。总结所以在Spring Cloud Gateway网关的配置中,需要综合考虑限流大小、网关调用后台连接池设置大小、后台服务的连接池以及空闲时间,包括网关调用降级方法的线程池配置,都需要在压测中调整到一个合理的配置,才能发挥最大的功效。本人水平有限,跟深入的研究还在继续,如果文章有表达错误或者不周,请大家指正,谢谢!END
问题Spring cloud gateway是替代zuul的网关产品,基于Spring 5、Spring boot 2.0以上、Reactor, 提供任意的路由匹配和断言、过滤功能。笔者公司之前有系统也使用了Spring cloud gateway做为后台应用访问的网关,采用的版本信息为:组件版本其他spring boot2.1.7.RELEASEspring cloudGreenwich.SR2spring cloud gateway2.1.2.RELEASE其中的一个路由的代码如下:@Bean public RouteLocator customRouteLocator(RouteLocatorBuilder builder) { RouteLocatorBuilder.Builder routes = builder.routes(); RouteLocatorBuilder.Builder serviceProvider = routes .route("accept", r -> r .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_UTF8_VALUE) .and() .method(HttpMethod.POST) .and() .readBody(String.class, readBody -> true) .and() .path("/gateway-accept/**") .filters(f -> { f.rewritePath("/gateway-accept/(?<path>.*)", "/${path}"); return f; }) .uri("lb://ACCEPT-MICROSERVICE")); return serviceProvider.build(); }后台采用的是版本也是spring boot 2.1.2.RELEASE, 内置Tomcat的一个服务。一开始系统运行良好,最近业务慢慢繁忙,前端调用说偶尔返回这样的问题:{"timestamp":"2021-01-06T01:50:00.468+0000","path":"/gateway-accept/policy","status":500,"error":"Internal Server Error","message":"Connection prematurely closed BEFORE response"}字面意思是“响应前过早关闭连接”, 查看后台服务的日志,根本没有调用的信息,再次调用也没有问题,服务和网关都没有任何问题,到底怎么回事?原因由于这是Spring cloud gateway的问题,肯定有人碰上过,先去gateway的github上的issues去碰碰运气。果然,在issues中查找“Connection prematurely closed BEFORE response”,列出了十几条,相关的有七八条,一个一个翻阅,终于一个issue提到了相同的的问题:https://github.com/spring-cloud/spring-cloud-gateway/issues/1148总结如下:gateway调用后台服务,会使用httpclient连接池里面的连接gateway使用的httpclient连接池的连接有个参数:max-idle-time,大意指的是多长时间连接不使用就关闭。如果设置为null, 连接不会关闭。后台服务也有相应的连接对应连接池的连接,参数keepAliveTimeout,大意指后台服务的连接空闲多长时间就会自动关闭,缺省的值就是connection-timeout参数的值。如果为-1就不会有时间限制,缺省值为60s ,但是一般的再server.xml里面设置为20s.重要:如果网关的连接池的连接max-idle-time不设置(null),后台服务的connection-timeout为20s假设网络连接池有个连接(gateway-connect-1)对应后台服务的连接(server-connect-1)前端请求过来,gateway的分配给这个请求的连接正好是(gateway-connect-1), 向后端发起请求调用同时,服务端连接(server-connect-1)已经等待空闲20秒,自动关闭;可想而知,服务端没有和(gateway-connect-1)对应的连接,于是发生了异常。需要在网关层设置spring.cloud.gateway.httpclient.pool.max-idle-time需要服务端设置server.connection-timeout, 这个值要适当的大于网关层的max-idle-time, 意思就是,网关层对后端连接的空闲时间要小于后端服务的连接空闲时间,这样就不会取到无效的网关层的连接。解决根据上面的描述,我在yml里面加入:spring: cloud: gateway: httpclient: pool: max-idle-time: 5000 max-connections: 30在idea里面发现max-idle-time黄色标底,找不到这个配置,点击没有问题的max-connections, 定位到原来,我用的这个gateway版本2.1.2的连接池根本没有提供max-idle-time这个参数,那个版本可以提供?我新建了一个gateway服务,用的版本如下:组件版本其他spring boot2.3.4.RELEASEspring cloudHoxton.SR1spring cloud gateway2.2.1.RELEASE在网关服务层设置:spring: cloud: gateway: httpclient: pool: max-idle-time: 10000i点击max-idle-time, 可以看到已经提供这个参数了:后端服务设置(后端用的内嵌tomcat):server: tomcat: connection-timeout: 20000服务调用接口:@GetMapping(value = "/test") public String get() throws InterruptedException { Thread.sleep(10); return "laza"; }第一种设置网关不设置max-idle-time后端服务我设置connection-time: 100使用jmeter测试,配置如下:点击开始,后台出现错误:第二种设置网关设置max-idle-time:10000后端服务我设置connection-time: 20000jmeter设置和上面一样,测试一切正常。和版本也有点关系,我生产使用的版本目前不支持max-idle-time这个参数的设置,所以要升级一下gateway使用的版本了。后续在issues最后,发现这个:点进去以后,发现是蚂蚁金服的同学Lovnx, 详细的阐述了这个问题,有兴趣可以去看一下。他在文章中提到:reactor.netty.pool.leasingStrategy=lifo获取连接策略由默认的FIFO变更为LIFO,因为LIFO能够确保获取的连接最大概率是最近刚被用过的,也就是热点连接始终是热点连接,而始终用不到的连接就可以被回收掉,LRU的思想(文中原话)Reactor-Netty 版本问题 我查了一下,spring cloud gateway 2.2.1.release提供的Reactor-Netty版本是0.9.2.RELEASE<dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> <version>0.9.2.RELEASE</version> <scope>compile</scope> </dependency>END
在使用Netty开发Websocket服务时,通常需要解析来自客户端请求的URL、Headers等等相关内容,并做相关检查或处理。本文将讨论两种实现方法。方法一:基于HandshakeComplete自定义事件特点:使用简单、校验在握手成功之后、失败信息可以通过Websocket发送回客户端。1.1 从netty源码出发一般地,我们将netty内置的WebSocketServerProtocolHandler作为Websocket协议的主要处理器。通过研究其代码我们了解到在本处理器被添加到Pipline后handlerAdded方法将会被调用。此方法经过简单的检查后将WebSocketHandshakeHandler添加到了本处理器之前,用于处理握手相关业务。我们都知道Websocket协议在握手时是通过HTTP(S)协议进行的,那么这个WebSocketHandshakeHandler应该就是处理HTTP相关的数据的吧?下方代码经过精简,放心阅读😄package io.netty.handler.codec.http.websocketx; public class WebSocketServerProtocolHandler extends WebSocketProtocolHandler { @Override public void handlerAdded(ChannelHandlerContext ctx) { ChannelPipeline cp = ctx.pipeline(); if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) { // Add the WebSocketHandshakeHandler before this one. cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(), new WebSocketServerProtocolHandshakeHandler(serverConfig)); } //... } }我们来看看WebSocketServerProtocolHandshakeHandler都做了什么操作。channelRead方法会尝试接收一个FullHttpRequest对象,表示来自客户端的HTTP请求,随后服务器将会进行握手相关操作,此处省略了握手大部分代码,感兴趣的同学可以自行阅读。可以注意到,在确认握手成功后,channelRead将会调用两次fireUserEventTriggered,此方法将会触发自定义事件。其他(在此处理器之后)的处理器会触发userEventTriggered方法。其中一个方法传入了WebSocketServerProtocolHandler对象,此对象保存了HTTP请求相关信息。那么解决方案逐渐浮出水面,通过监听自定义事件即可实现检查握手的HTTP请求。package io.netty.handler.codec.http.websocketx; /** * Handles the HTTP handshake (the HTTP Upgrade request) for {@link WebSocketServerProtocolHandler}. */ class WebSocketServerProtocolHandshakeHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { final FullHttpRequest req = (FullHttpRequest) msg; if (isNotWebSocketPath(req)) { ctx.fireChannelRead(msg); return; } try { //... if (!future.isSuccess()) { } else { localHandshakePromise.trySuccess(); // Kept for compatibility ctx.fireUserEventTriggered( WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE); ctx.fireUserEventTriggered( new WebSocketServerProtocolHandler.HandshakeComplete( req.uri(), req.headers(), handshaker.selectedSubprotocol())); } } finally { req.release(); } } }1.2 解决方案下面的代码展示了如何监听自定义事件。通过抛出异常可以终止链接,同时可以利用ctx向客户端以Websocket协议返回错误信息。因为此时握手已经完成,所以虽然这种方案简单的过分,但是效率并不高,耗费服务端资源(都握手了又给人家踢了QAQ)。private final class ServerHandler extends SimpleChannelInboundHandler<DeviceDataPacket> { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { // 在此处获取URL、Headers等信息并做校验,通过throw异常来中断链接。 } super.userEventTriggered(ctx, evt); } }1.3 ChannelInitializer实现附上Channel初始化代码作为参考。private final class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast("http-codec", new HttpServerCodec()) .addLast("chunked-write", new ChunkedWriteHandler()) .addLast("http-aggregator", new HttpObjectAggregator(8192)) .addLast("log-handler", new LoggingHandler(LogLevel.WARN)) .addLast("ws-server-handler", new WebSocketServerProtocolHandler(endpointUri.getPath())) .addLast("server-handler", new ServerHandler()); } }方法二:基于新增安全检查处理器特点:使用相对复杂、校验在握手成功之前、失败信息可以通过HTTP返回客户端。2.1 解决方案编写一个入站处理器,接收FullHttpMessage消息,在Websocket处理器之前检测拦截请求信息。下面的例子主要做了四件事情:从HTTP请求中提取关心的数据安全检查将结果和其他数据绑定在Channel触发安全检查完毕自定义事件public class SecurityServerHandler extends ChannelInboundHandlerAdapter { private static final ObjectMapper json = new ObjectMapper(); public static final AttributeKey<SecurityCheckComplete> SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY = AttributeKey.valueOf("SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY"); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof FullHttpMessage){ //extracts device information headers HttpHeaders headers = ((FullHttpMessage) msg).headers(); String uuid = Objects.requireNonNull(headers.get("device-connection-uuid")); String devDescJson = Objects.requireNonNull(headers.get("device-description")); //deserialize device description DeviceDescription devDesc = json.readValue(devDescJson, DeviceDescriptionWithCertificate.class); //check ...... // SecurityCheckComplete complete = new SecurityCheckComplete(uuid, devDesc); ctx.channel().attr(SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).set(complete); ctx.fireUserEventTriggered(complete); } //other protocols super.channelRead(ctx, msg); } @Getter @AllArgsConstructor public static final class SecurityCheckComplete { private String connectionUUID; private DeviceDescription deviceDescription; } }在业务逻辑处理器中,可以通过组合自定义的安全检查事件和Websocket握手完成事件。例如,在安全检查后进行下一步自定义业务检查,在握手完成后发送自定义内容等等,就看各位同学自由发挥了。private final class ServerHandler extends SimpleChannelInboundHandler<DeviceDataPacket> { public final AttributeKey<DeviceConnection> @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof SecurityCheckComplete){ log.info("Security check has passed"); SecurityCheckComplete complete = (SecurityCheckComplete) evt; listener.beforeConnect(complete.getConnectionUUID(), complete.getDeviceDescription()); } else if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) { log.info("Handshake has completed"); SecurityCheckComplete complete = ctx.channel().attr(SecurityServerHandler.SECURITY_CHECK_COMPLETE_ATTRIBUTE_KEY).get(); DeviceDataServer.this.listener.postConnect(complete.getConnectionUUID(), new DeviceConnection(ctx.channel(), complete.getDeviceDescription())); } super.userEventTriggered(ctx, evt); } }2.2 ChannelInitializer实现附上Channel初始化代码作为参考。private final class ServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast("http-codec", new HttpServerCodec()) .addLast("chunked-write", new ChunkedWriteHandler()) .addLast("http-aggregator", new HttpObjectAggregator(8192)) .addLast("log-handler", new LoggingHandler(LogLevel.WARN)) .addLast("security-handler", new SecurityServerHandler()) .addLast("ws-server-handler", new WebSocketServerProtocolHandler(endpointUri.getPath())) .addLast("packet-codec", new DataPacketCodec()) .addLast("server-handler", new ServerHandler()); } }总结上述两种方式分别在握手完成后和握手之前拦截检查;实现复杂度和性能略有不同,可以通过具体业务需求选择合适的方法。Netty增强了责任链模式,使用userEvent传递自定义事件使得各个处理器之间减少耦合,更专注于业务。但是、相比于流动于各个处理器之间的"主线"数据来说,userEvent传递的"支线"数据往往不受关注。通过阅读Netty内置的各种处理器源码,探索其产生的事件,同时在开发过程中加以善用,可以减少冗余代码。另外在开发自定义的业务逻辑时,应该积极利用userEvent传递事件数据,降低各模块之间代码耦合。
一、问题复现1、描述两个一样的Consumer Group的Consumer订阅同一个Topic,但是是不同的tag,Consumer1订阅Topic的tag1,Consumer2订阅Topic的tag2,然后分别启动。这时候往Topic的tag1里发送10条数据,Topic的tag2里发送10条。目测应该是Consumer1和Consumer2分别收到对应的10条消息。结果却是只有Consumer2收到了消息,而且只收到了4-6条消息,不固定。2、代码2.1、Consumerpublic class Consumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer"); consumer.setNamesrvAddr("124.57.180.156:9876"); consumer.subscribe("TopicTest2","tag1"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt msg = msgs.get(0); System.out.println(msg.getTags()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.out.println("ConsumerStarted."); } }启动这个订阅了TopicTest2的tag1标签的Consumer,然后将tag1改为tag2再次启动Consumer。这就相当于启动了两个Consumer进程,一个订阅了TopicTest2的tag1标签,另一个订阅了TopicTest2的tag2标签。2.2、Producerpublic class Producer { public static void main(String[] args) throws MQClientException { final DefaultMQProducer producer = new DefaultMQProducer("test-producer"); producer.setNamesrvAddr("124.57.180.156:9876"); producer.start(); for (int i = 0; i < 10; i++){ try { Message msg = new Message("TopicTest2", "tag1", ("Hello tag1 - "+i).getBytes()); SendResult sendResult = producer.send(msg); System.out.println(sendResult); }catch(Exception e) { e.printStackTrace(); } } } }启动Producer,往TopicTest2的tag1里发10条消息。再次将tag1改为tag2,然后再次启动Producer进行发送,这样就是TopicTest2的tag1下有10条消息,TopicTest2的tag2下也有10条消息。3、结果Consumer和Producer都启动后,发现如下:Producer发送了20条消息正常。Consumer1没有消费到tag1下的数据Consumer2消费了一半(不一定是几条,有时候5条,有时候6条的)消息。二、问题答案首先这是Broker决定的,而不是Consumer端决定的之前看过一篇文章写的有理有据,写的是Consumer端,还贴出了debug的源码,说后者覆盖了前者,但是我想说:你启动了两个独立的Consumer,那是两个独立的进程,根本不存在覆盖不覆盖的问题,那就是独立的。JVM就一个。又不是共享的JVM,何来覆盖?Consumer端发心跳给Broker,Broker收到后存到consumerTable里(就是个Map),key是GroupName,value是ConsumerGroupInfo。ConsumerGroupInfo里面是包含topic等信息的,但是问题就出在上一步骤,key是groupName,你同GroupName的话Broker心跳最后收到的Consumer会覆盖前者的。相当于如下代码:map.put(groupName, ConsumerGroupInfo);这样同key,肯定产生了覆盖。所以Consumer1不会收到任何消息,但是Consumer2为什么只收到了一半(不固定)消息呢?那是因为:你是集群模式消费,它会负载均衡分配到各个节点去消费,所以一半消息(不固定个数)跑到了Consumer1上,结果Consumer1订阅的是tag1,所以不会任何输出。如果换成BROADCASTING,那绝逼后者会收到全部消息,而不是一半,因为广播是广播全部Consumer。三、源码验证1、调用链# 核心在于如下这个方法 org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer() # 关键调用链如下 # 入口是Broker启动的时候 org.apache.rocketmq.broker.BrokerStartup#start() org.apache.rocketmq.broker.BrokerController#start() org.apache.rocketmq.remoting.netty.NettyRemotingServer#start() org.apache.rocketmq.remoting.netty.NettyRemotingServer#prepareSharableHandlers() org.apache.rocketmq.remoting.netty.NettyRemotingServer.NettyServerHandler#channelRead0() org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived() org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand() org.apache.rocketmq.broker.processor.ClientManageProcessor#processRequest() org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat() org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer()2、源码2.1、registerConsumer/** * Consumer信息 */ public class ConsumerGroupInfo { // 组名 private final String groupName; // topic信息,比如topic、tag等 private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>(); // 客户端信息,比如clientId等 private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16); // PULL/PUSH private volatile ConsumeType consumeType; // 消费模式:BROADCASTING/CLUSTERING private volatile MessageModel messageModel; // 消费到哪了 private volatile ConsumeFromWhere consumeFromWhere; } /** * 通过心跳将Consumer信息注册到Broker端。 */ public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) { // consumerTable:维护所有的Consumer ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); // 如果没有Consumer,则put到map里 if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); // put到map里 ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp; } // 更新Consumer信息,客户端信息 boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); // 更新订阅Topic信息 boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList); return r1 || r2; }从这一步可以看出消费者信息是以groupName为key,ConsumerGroupInfo为value存到map(consumerTable)里的,那很明显了,后者肯定会覆盖前者的,因为key是一样的。而后者的tag是tag2,那肯定覆盖了前者的tag1,这部分是存到ConsumerGroupInfo的subscriptionTable里面的private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();SubscriptionData包含了topic等信息public class SubscriptionData implements Comparable<SubscriptionData> { // topic private String topic; private String subString; // tags private Set<String> tagsSet = new HashSet<String>(); private Set<Integer> codeSet = new HashSet<Integer>(); }2.2、两个问题1.topic、tag等信息是怎么覆盖的?boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,consumeFromWhere);/** * 其实很简单,就是以topic为key,SubscriptionData为value。而SubscriptionData里包含了tags信息,所以直接覆盖掉 */ public boolean updateSubscription(final Set<SubscriptionData> subList) { for (SubscriptionData sub : subList) { SubscriptionData old = this.subscriptionTable.get(sub.getTopic()); if (old == null) { SubscriptionData prev = this.subscriptionTable.putIfAbsent(sub.getTopic(), sub); } else if (sub.getSubVersion() > old.getSubVersion()) { this.subscriptionTable.put(sub.getTopic(), sub); } } }等等,这里好像有新发现ConsumerGroupInfo#subscriptionTable// {@link org.apache.rocketmq.broker.client.ConsumerGroupInfo#subscriptionTable} private final ConcurrentMap<String/* Topic */, SubscriptionData> subscriptionTable = new ConcurrentHashMap<String, SubscriptionData>();可以有意外收获就是topic作为map的key,那岂不是一个Consumer可以订阅多个Topic?是的,通过这段源码可以发现是没毛病的,我也测试过。2.这么看的话Consumer端只会存在一个进程,因为同组,注册进去就覆盖了呀?大哥,注意ConsumerGroupInfo里的channelInfoTable// 客户端信息,比如clientId等 private final ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = new ConcurrentHashMap<Channel, ClientChannelInfo>(16);ClientChannelInfo是包含clientId等信息的,代表一个Consumer。注册方法是:boolean r2 = consumerGroupInfo.updateSubscription(subList);/** * 下面是删减后的代码,其实就是以Channel作为key,每个Consumer的Channel是不一样的。所以能存多个Consumer客户端 */ public boolean updateChannel(final ClientChannelInfo infoNew, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere) { ClientChannelInfo infoOld = this.channelInfoTable.get(infoNew.getChannel()); if (null == infoOld) { ClientChannelInfo prev = this.channelInfoTable.put(infoNew.getChannel(), infoNew); } }END
小试牛刀1.构建一个springboot项目,并且引入jasypt依赖<dependency> <groupId>com.github.ulisesbocchio</groupId> <artifactId>jasypt-spring-boot-starter</artifactId> <version>3.0.2</version> </dependency>2.编写一个单元测试,用于获取加密后的账号密码StringEncryptor是jasypt-spring-boot-starter自动配置的加密工具,加密算法我们选择PBEWithHmacSHA512AndAES_128,password为123abcjasypt.encryptor.password=123abc jasypt.encryptor.algorithm=PBEWithHmacSHA512AndAES_128@SpringBootTest class SpringbootPropertiesEncApplicationTests { @Autowired private StringEncryptor stringEncryptor; @Test void contextLoads() { String sunshujie = stringEncryptor.encrypt("sunshujie"); String qwerty1234 = stringEncryptor.encrypt("qwerty1234"); System.out.println(sunshujie); System.out.println(qwerty1234); } }3.在application.properties中配置加密后的账号密码jasypt.encryptor.password=123abc jasypt.encryptor.algorithm=PBEWithHmacSHA512AndAES_128 username=ENC(pXDnpH3GdMDBHdxraKyAt7IKCeX8mVlM9A9PeI9Ow2VUoBHRESQ5m8qhrbp45vH+) password=ENC(qD55H3EKYpxp9cGBqpOfR2pqD/AgqT+IyClWKdW80MkHx5jXEViaJTAx6Es4/ZJt)4.观察在程序中是否能够拿到解密后的账号密码@SpringBootApplication public class SpringbootPropertiesEncApplication implements CommandLineRunner { private static final Logger logger = LoggerFactory.getLogger(SpringbootPropertiesEncApplication.class); public static void main(String[] args) { SpringApplication.run(SpringbootPropertiesEncApplication.class, args); } @Value("${password}") private String password; @Value("${username}") private String username; @Override public void run(String... args) throws Exception { logger.info("username: {} , password: {} ", username, password); } }原理解析加密原理首先看jasypt相关的配置,分别是password和加密算法jasypt.encryptor.password=123abc jasypt.encryptor.algorithm=PBEWithHmacSHA512AndAES_128PBEWithHmacSHA512AndAES_128是此次我们选用的加密算法.123abc是PBEWithHmacSHA512AndAES_128加密过程中用的加密密码.PBE是基于密码的加密算法,密码和秘钥相比有什么好处呢?好处就是好记…PBE加密流程如下密码加盐密码加盐结果做摘要获取秘钥用秘钥对称加密原文,然后和盐拼在一起得到密文PBE解密流程如下从密文获取盐密码+盐摘要获取秘钥密文通过秘钥解密获取原文再来看PBEWithHmacSHA512AndAES_128,名字就是加密过程中用的具体算法PBE是指用的是PBE加密算法HmacSHA512是指摘要算法,用于获取秘钥AES_128是对称加密算法jasypt-spring-boot-starter原理先从spring.factories文件入手查看自动配置类org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.ulisesbocchio.jasyptspringboot.JasyptSpringBootAutoConfigurationJasyptSpringBootAutoConfiguration配置仅仅使用@Import注解引入另一个配置类EnableEncryptablePropertiesConfiguration.@Configuration @Import({EnableEncryptablePropertiesConfiguration.class}) public class JasyptSpringBootAutoConfiguration { public JasyptSpringBootAutoConfiguration() { } }从配置类EnableEncryptablePropertiesConfiguration可以看到有两个操作1.@Import了EncryptablePropertyResolverConfiguration.class, CachingConfiguration.class2.注册了一个BeanFactoryPostProcessor -> EnableEncryptablePropertiesBeanFactoryPostProcessor@Configuration @Import({EncryptablePropertyResolverConfiguration.class, CachingConfiguration.class}) public class EnableEncryptablePropertiesConfiguration { private static final Logger log = LoggerFactory.getLogger(EnableEncryptablePropertiesConfiguration.class); public EnableEncryptablePropertiesConfiguration() { } @Bean public static EnableEncryptablePropertiesBeanFactoryPostProcessor enableEncryptablePropertySourcesPostProcessor(ConfigurableEnvironment environment) { boolean proxyPropertySources = (Boolean)environment.getProperty("jasypt.encryptor.proxy-property-sources", Boolean.TYPE, false); InterceptionMode interceptionMode = proxyPropertySources ? InterceptionMode.PROXY : InterceptionMode.WRAPPER; return new EnableEncryptablePropertiesBeanFactoryPostProcessor(environment, interceptionMode); } }先看EncryptablePropertyResolverConfiguration.classlazyEncryptablePropertyDetector这里有配置文件中ENC()写法的出处.从名称来看是用来找到哪些配置需要解密.从代码来看,不一定非得用ENC()把密文包起来, 也可以通过配置来指定其他前缀和后缀jasypt.encryptor.property.prefix jasypt.encryptor.property.suffix@Bean( name = {"lazyEncryptablePropertyDetector"} ) public EncryptablePropertyDetector encryptablePropertyDetector(EncryptablePropertyResolverConfiguration.EnvCopy envCopy, BeanFactory bf) { String prefix = envCopy.get().resolveRequiredPlaceholders("${jasypt.encryptor.property.prefix:ENC(}"); String suffix = envCopy.get().resolveRequiredPlaceholders("${jasypt.encryptor.property.suffix:)}"); String customDetectorBeanName = envCopy.get().resolveRequiredPlaceholders(DETECTOR_BEAN_PLACEHOLDER); boolean isCustom = envCopy.get().containsProperty("jasypt.encryptor.property.detector-bean"); return new DefaultLazyPropertyDetector(prefix, suffix, customDetectorBeanName, isCustom, bf); }另外还配置了很多bean,先记住这两个重要的bean.带着疑问往后看.lazyEncryptablePropertyResolver 加密属性解析器lazyEncryptablePropertyFilter 加密属性过滤器@Bean( name = {"lazyEncryptablePropertyResolver"} ) public EncryptablePropertyResolver encryptablePropertyResolver(@Qualifier("lazyEncryptablePropertyDetector") EncryptablePropertyDetector propertyDetector, @Qualifier("lazyJasyptStringEncryptor") StringEncryptor encryptor, BeanFactory bf, EncryptablePropertyResolverConfiguration.EnvCopy envCopy, ConfigurableEnvironment environment) { String customResolverBeanName = envCopy.get().resolveRequiredPlaceholders(RESOLVER_BEAN_PLACEHOLDER); boolean isCustom = envCopy.get().containsProperty("jasypt.encryptor.property.resolver-bean"); return new DefaultLazyPropertyResolver(propertyDetector, encryptor, customResolverBeanName, isCustom, bf, environment); } @Bean( name = {"lazyEncryptablePropertyFilter"} ) public EncryptablePropertyFilter encryptablePropertyFilter(EncryptablePropertyResolverConfiguration.EnvCopy envCopy, ConfigurableBeanFactory bf, @Qualifier("configPropsSingleton") Singleton<JasyptEncryptorConfigurationProperties> configProps) { String customFilterBeanName = envCopy.get().resolveRequiredPlaceholders(FILTER_BEAN_PLACEHOLDER); boolean isCustom = envCopy.get().containsProperty("jasypt.encryptor.property.filter-bean"); FilterConfigurationProperties filterConfig = ((JasyptEncryptorConfigurationProperties)configProps.get()).getProperty().getFilter(); return new DefaultLazyPropertyFilter(filterConfig.getIncludeSources(), filterConfig.getExcludeSources(), filterConfig.getIncludeNames(), filterConfig.getExcludeNames(), customFilterBeanName, isCustom, bf); }再看EnableEncryptablePropertiesBeanFactoryPostProcessor这个类是一个BeanFactoryPostProcessor实现了Ordered,是最低优先级,会在其他BeanFactoryPostProcessor执行之后再执行postProcessBeanFactory方法中获取了上面提到的两个重要的bean, lazyEncryptablePropertyResolver lazyEncryptablePropertyFilter从environment中获取了PropertySources调用工具类进行转换PropertySources, 也就是把密文转换为原文public class EnableEncryptablePropertiesBeanFactoryPostProcessor implements BeanFactoryPostProcessor, Ordered { // ignore some code public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { LOG.info("Post-processing PropertySource instances"); EncryptablePropertyResolver propertyResolver = (EncryptablePropertyResolver)beanFactory.getBean("lazyEncryptablePropertyResolver", EncryptablePropertyResolver.class); EncryptablePropertyFilter propertyFilter = (EncryptablePropertyFilter)beanFactory.getBean("lazyEncryptablePropertyFilter", EncryptablePropertyFilter.class); MutablePropertySources propSources = this.environment.getPropertySources(); EncryptablePropertySourceConverter.convertPropertySources(this.interceptionMode, propertyResolver, propertyFilter, propSources); } public int getOrder() { return 2147483547; } }再看工具类EncryptablePropertySourceConverter1.过滤所有已经是EncryptablePropertySource的PropertySource2.转换为EncryptablePropertySource3.用EncryptablePropertySource从PropertySources中替换原PropertySourcepublic static void convertPropertySources(InterceptionMode interceptionMode, EncryptablePropertyResolver propertyResolver, EncryptablePropertyFilter propertyFilter, MutablePropertySources propSources) { ((List)StreamSupport.stream(propSources.spliterator(), false).filter((ps) -> { return !(ps instanceof EncryptablePropertySource); }).map((ps) -> { return makeEncryptable(interceptionMode, propertyResolver, propertyFilter, ps); }).collect(Collectors.toList())).forEach((ps) -> { propSources.replace(ps.getName(), ps); }); }关键方法在makeEncryptable中,调用链路很长, 这里选取一条链路跟一下.ulisesbocchio.jasyptspringboot.EncryptablePropertySourceConverter#makeEncryptablecom.ulisesbocchio.jasyptspringboot.EncryptablePropertySourceConverter#convertPropertySourcecom.ulisesbocchio.jasyptspringboot.EncryptablePropertySourceConverter#proxyPropertySourcecom.ulisesbocchio.jasyptspringboot.aop.EncryptablePropertySourceMethodInterceptor#invokecom.ulisesbocchio.jasyptspringboot.caching.CachingDelegateEncryptablePropertySource#getPropertycom.ulisesbocchio.jasyptspringboot.EncryptablePropertySource#getProperty()看到最后豁然开朗,发现就是用的最开始配置的DefaultLazyPropertyResolver进行密文解析.直接看最终的实现 DefaultPropertyResolver据lazyEncryptablePropertyDetector过滤需要解密的配置用lazyEncryptablePropertyDetector去掉前缀后缀替换占位符解密public String resolvePropertyValue(String value) { Optional var10000 = Optional.ofNullable(value); Environment var10001 = this.environment; var10001.getClass(); var10000 = var10000.map(var10001::resolveRequiredPlaceholders); EncryptablePropertyDetector var2 = this.detector; var2.getClass(); return (String)var10000.filter(var2::isEncrypted).map((resolvedValue) -> { try { String unwrappedProperty = this.detector.unwrapEncryptedValue(resolvedValue.trim()); String resolvedProperty = this.environment.resolveRequiredPlaceholders(unwrappedProperty); return this.encryptor.decrypt(resolvedProperty); } catch (EncryptionOperationNotPossibleException var5) { throw new DecryptionException("Unable to decrypt: " + value + ". Decryption of Properties failed, make sure encryption/decryption passwords match", var5); } }).orElse(value); }解惑1.加密配置文件能否使用摘要算法,例如md5?不能, 配置文件加密是需要解密的,例如数据库连接信息加密,如果不解密,springboot程序无法读取到真正的数据库连接信息,也就无法建立连接.2.加密配置文件能否直接使用对称加密,不用PBE?可以, PBE的好处就是密码好记.3.jasypt.encryptor.password可以泄漏吗?不能, 泄漏了等于没有加密.4.例子中jasypt.encryptor.password配置在配置文件中不就等于泄漏了吗?是这样的,需要在流程上进行控制.springboot打包时千万不要把jasypt.encryptor.password打入jar包内.在公司具体的流程可能是这样的:运维人员持有jasypt.encryptor.password,加密原文获得密文运维人员将密文发给开发人员开发人员在配置文件中只配置密文,不配置jasypt.encryptor.password运维人员启动应用时再配置jasypt.encryptor.password如果有其他疑惑欢迎留言提问, 另外由于作者水平有限难免有疏漏, 欢迎留言纠错。END
1.微服务限流随着微服务的流行,服务和服务之间的稳定性变得越来越重要。缓存、降级和限流是保护微服务系统运行稳定性的三大利器。缓存的目的是提升系统访问速度和增大系统能处理的容量,而降级是当服务出问题或者影响到核心流程的性能则需要暂时屏蔽掉,待高峰或者问题解决后再打开,而有些场景并不能用缓存和降级来解决,比如稀缺资源、数据库的写操作、频繁的复杂查询,因此需有一种手段来限制这些场景的请求量,即限流。比如当我们设计了一个函数,准备上线,这时候这个函数会消耗一些资源,处理上限是1秒服务3000个QPS,但如果实际情况遇到高于3000的QPS该如何解决呢?所以限流的目的应当是通过对并发访问/请求进行限速或者一个时间窗口内的的请求进行限速来保护系统,一旦达到限制速率就可以拒绝服务、等待、降级。学习如何去实现一个分布式限流框架,首先,我们需要去了解最基本的两种限流算法。2.限流算法2.1漏桶算法漏桶算法思路很简单,水(也就是请求)先进入到漏桶里,漏桶以一定的速度出水,当水流入速度过大会直接溢出,然后就拒绝请求,可以看出漏桶算法能强行限制数据的传输速率。示意图(来源网络)如下:2.2令牌桶算法令牌桶算法和漏桶算法效果一样但方向相反的算法,更加容易理解。随着时间流逝,系统会按恒定1/QPS时间间隔(如果QPS=100,则间隔是10ms)往桶里加入令牌(想象和漏洞漏水相反,有个水龙头在不断的加水),如果桶已经满了就不再加了。新请求来临时,会各自拿走一个令牌,如果没有令牌可拿了就阻塞或者拒绝服务。示意图(来源网络)如下:2.3算法选择漏桶算法与令牌桶算法的区别在于,漏桶算法能够强行限制数据的传输速率,令牌桶算法能够在限制数据的平均传输速率的同时还允许某种程度的突发情况。令牌桶还有一个好处是可以方便的改变速度。一旦需要提高速率,则按需提高放入桶中的令牌的速率。所以,限流框架的核心算法还是以令牌桶算法为主。3.本地限流已知上面讲述的令牌桶算法的原理,如何通过代码实现?本地限流的实现可以用Long长整型作为令牌桶,为了达到无锁,建议使用Long的原子类型AtomicLong,使用AtomicLong的好处就是可以非常方便的对其进行CAS加操作与CAS减操作(也就是令牌桶令牌的放入与拿取),以避免线程的上下文切换的开销,核心CAS算法如下:private boolean tryAcquireFailed() { long l = bucket.longValue(); while (l > 0) { if (bucket.compareAndSet(l, l - 1)) { return true; } l = bucket.longValue(); } return false; }根据上述了解的令牌桶算法可以得知,令牌桶需要一个ScheduledThread不断的放入令牌,这部分的代码如下:ScheduledThreadExecutor.scheduleAtFixedRate(() -> bucket.set(rule.getLimit()), rule.getInitialDelay(), rule.getPeriod(), rule.getUnit() );4.分布式限流概述分布式限流需要解决什么问题呢?我想至少有下面几个:1.动态规则:比如限流的QPS我们希望可以动态修改,限流的功能可以随时开启、关闭,限流的规则可以跟随业务进行动态变更等。2.集群限流:比如对Spring Cloud微服务架构中的某服务下的所有实例进行统一限流,以控制后续访问数据库的流量。3.熔断降级:比如在调用链路中某个资源出现不稳定状态时(例如调用超时或异常比例升高),对这个资源的调用进行限制,让请求快速失败,避免影响到其它的资源而导致级联错误。可选的其它几个功能,诸如实时监控数据、网关流控、热点参数限流、系统自适应限流、黑白名单控制、注解支持等,这些功能其实可以非常方便的进行扩展。5.分布式限流方案分布式限流的思想我列举下面三个方案:1.Redis令牌桶这种方案是最简单的一种集群限流思想。在本地限流中,我们使用Long的原子类作令牌桶,当实例数量超过1,我们就考虑将Redis用作公共内存区域,进行读写。涉及到的并发控制,也可以使用Redis实现分布式锁。方案的缺点显而易见,每取一次令牌都会进行一次网络开销,而网络开销起码是毫秒级,所以这种方案支持的并发量是非常有限的。2.QPS统一分配这种方案的思想是将集群限流最大程度的本地化。举个例子,我们有两台服务器实例,对应的是同一个应用程序(Application.name相同),程序中设置的QPS为100,将应用程序与同一个控制台程序进行连接,控制台端依据应用的实例数量将QPS进行均分,动态设置每个实例的QPS为50,若是遇到两个服务器的配置并不相同,在负载均衡层的就已经根据服务器的优劣对流量进行分配,例如一台分配70%流量,另一台分配30%的流量。面对这种情况,控制台也可以对其实行加权分配QPS的策略。客观来说,这是一种集群限流的实现方案,但依旧存在不小的问题。该模式的分配比例是建立在大数据流量下的趋势进行分配,实际情况中可能并不是严格的五五分或三七分,误差不可控,极容易出现用户连续访问某一台服务器遇到请求驳回而另一台服务器此刻空闲流量充足的尴尬情况。3.发票服务器这种方案的思想是建立在Redis令牌桶方案的基础之上的。如何解决每次取令牌都伴随一次网络开销,该方案的解决方法是建立一层控制端,利用该控制端与Redis令牌桶进行交互,只有当客户端的剩余令牌数不足时,客户端才向该控制层取令牌并且每次取一批。这种思想类似于Java集合框架的数组扩容,设置一个阈值,只有当超过该临界值时,才会触发异步调用。其余存取令牌的操作与本地限流无二。虽然该方案依旧存在误差,但误差最大也就一批次令牌数而已。6.开源项目上面说了三种分布式限流方案的实现思路,这里推荐一个基于发票服务器思想实现的分布式限流项目SnowJean(https://github.com/yueshutong/SnowJena)。笔者通过该项目源码观察到该限流项目在解决分布式限流上的有很多巧妙的点,比如,SnowJean内部使用观察者模式实现动态规则配置,使用工厂模式实现限流器的构造,使用建造者模式构建限流规则。在解决如何对客户端实例的健康状况进行检查时,利用的是Redis的过期时间与客户端发送的心跳包(发送心跳时再进行延期)。比较不错的一点是,该项目提供基于前端Echarts图表的QPS视图展示,如下图。
以下是正文:在实际的开发当中,我们经常需要进行磁盘数据的读取和搜索,因此经常会有出现从数据库读取数据的场景出现。但是当数据访问量次数增大的时候,过多的磁盘读取可能会最终成为整个系统的性能瓶颈,甚至是压垮整个数据库,导致系统卡死等严重问题。常规的应用系统中,我们通常会在需要的时候对数据库进行查找,因此系统的大致结构如下所示:当数据量较高的时候,需要减少对于数据库里面的磁盘读写操作,因此通常都会选择在业务系统和MySQL数据库之间加入一层缓存从而减少对数据库方面的访问压力。但是很多时候,缓存在实际项目中的应用并非这么简单。下边我们来通过几个比较经典的缓存应用场景来列举一些问题:1.缓存和数据库之间数据一致性问题常用于缓存处理的机制我总结为了以下几种:Cache AsideRead ThroughWrite ThroughWrite Behind Caching首先来简单说说Cache aside的这种方式:Cache Aside模式这种模式处理缓存通常都是先从数据库缓存查询,如果缓存没有命中则从数据库中进行查找。这里面会发生的三种情况如下:缓存命中:当查询的时候发现缓存存在,那么直接从缓存中提取。缓存失效:当缓存没有数据的时候,则从database里面读取源数据,再加入到cache里面去。缓存更新:当有新的写操作去修改database里面的数据时,需要在写操作完成之后,让cache里面对应的数据失效。这种Cache aside模式通常是我们在实际应用开发中最为常用到的模式。但是并非说这种模式的缓存处理就一定能做到完美。关于这种模式下依然会存在缺陷。比如,一个是读操作,但是没有命中缓存,然后就到数据库中取数据,此时来了一个写操作,写完数据库后,让缓存失效,然后,之前的那个读操作再把老的数据放进去,所以,会造成脏数据。Facebook的大牛们也曾经就缓存处理这个问题发表过相关的论文,链接如下:https://www.usenix.org/system/files/conference/nsdi13/nsdi13-final170_update.pdf分布式环境中要想完全的保证数据一致性是一件极为困难的事情,我们只能够尽可能的减低这种数据不一致性问题产生的情况。Read Through模式Read Through模式是指应用程序始终从缓存中请求数据。 如果缓存没有数据,则它负责使用底层提供程序插件从数据库中检索数据。 检索数据后,缓存会自行更新并将数据返回给调用应用程序。使用Read Through 有一个好处。我们总是使用key从缓存中检索数据, 调用的应用程序不知道数据库, 由存储方来负责自己的缓存处理,这使代码更具可读性, 代码更清晰。但是这也有相应的缺陷,开发人员需要给编写相关的程序插件,增加了开发的难度性。Write Through模式Write Through模式和Read Through模式类似,当数据发生更新的时候,先去Cache里面进行更新,如果命中了,则先更新缓存再由Cache方来更新database。如果没有命中的话,就直接更新Cache里面的数据。Write Behind Caching模式Write Behind Caching 这种模式通常是先将数据写入到缓存里面,然后再异步的写入到database中进行数据同步,这样的设计既可以直接的减少我们对于数据的database里面的直接访问,降低压力,同时对于database的多次修改可以进行合并操作,极大的提升了系统的承载能力。但是这种模式处理缓存数据具有一定的风险性,例如说当cache机器出现宕机的时候,数据会有丢失的可能。2.缓存穿透问题在高并发的场景中,缓存穿透是一个经常都会遇到的问题。什么是缓存穿透?大量的请求在缓存中没有查询到指定的数据,因此需要从数据库中进行查询,造成缓存穿透。会造成什么后果?大量的请求短时间内涌入到database中进行查询会增加database的压力,最终导致database无法承载客户单请求的压力,出现宕机卡死等现象。常用的解决方案通常有以下几类:1.空值缓存在某些特定的业务场景中,对于数据的查询可能会是空的,没有实际的存在,并且这类数据信息在短时间进行多次的反复查询也不会有变化,那么整个过程中,多次的请求数据库操作会显得有些多余。不妨可以将这些空值(没有查询结果的数据)对应的key存储在缓存中,那么第二次查找的时候就不需要再次请求到database那么麻烦,只需要通过内存查询即可。这样的做法能够大大减少对于database的访问压力。2.布隆过滤器通常对于database里面的数据的key值可以预先存储在布隆过滤器里面去,然后先在布隆过滤器里面进行过滤,如果发现布隆过滤器中没有的话,就再去redis里面进行查询,如果redis中也没有数据的话,再去database查询。这样可以避免不存在的数据信息也去往存储库中进行查询情况。关于布隆过滤器的学习可以参考下我的这篇笔记:https://blog.csdn.net/Danny_idea/article/details/889466733.缓存雪崩场景什么是缓存雪崩?当缓存服务器重启或者大量缓存集中在某一个时间段失效,这样在失效的时候,也会给后端系统(比如DB)带来很大压力。如何避免缓存雪崩问题?1.使用加锁队列来应付这种问题。当有多个请求涌入的时候,当缓存失效的时候加入一把分布式锁,只允许抢锁成功的请求去库里面读取数据然后将其存入缓存中,再释放锁,让后续的读请求从缓存中取数据。但是这种做法有一定的弊端,过多的读请求线程堵塞,将机器内存占满,依然没有能够从根本上解决问题。2.在并发场景发生前,先手动触发请求,将缓存都存储起来,以减少后期请求对database的第一次查询的压力。数据过期时间设置尽量分散开来,不要让数据出现同一时间段出现缓存过期的情况。3.从缓存可用性的角度来思考,避免缓存出现单点故障的问题,可以结合使用 主从+哨兵的模式来搭建缓存架构,但是这种模式搭建的缓存架构有个弊端,就是无法进行缓存分片,存储缓存的数据量有限制,因此可以升级为Redis Cluster架构来进行优化处理。(需要结合企业实际的经济实力,毕竟Redis Cluster的搭建需要更多的机器)4.Ehcache本地缓存 + Hystrix限流&降级,避免MySQL被打死。使用 Ehcache本地缓存的目的也是考虑在 Redis Cluster 完全不可用的时候,Ehcache本地缓存还能够支撑一阵。使用 Hystrix进行限流 & 降级 ,比如一秒来了5000个请求,我们可以设置假设只能有一秒 2000个请求能通过这个组件,那么其他剩余的 3000 请求就会走限流逻辑。然后去调用我们自己开发的降级组件(降级),比如设置的一些默认值呀之类的。以此来保护最后的 MySQL 不会被大量的请求给打死。
前言这其实是一道面试题,是我在面试百度的时候被问到的,当时没有答出来(因为自己真的很菜),后来在网上寻找答案,看到也是一头雾水,直到看到了《Spring in action》这本书,书上有对Bean声明周期的大致解释,但是没有代码分析,所以就自己上网寻找资料,一定要把这个Bean生命周期弄明白!网上大部分都是验证的Bean 在面试问的生命周期,其实查阅JDK还有一个完整的Bean生命周期,这同时也验证了书是具有片面性的,最fresh 的资料还是查阅原始JDK!!!一、Bean 的完整生命周期在传统的Java应用中,bean的生命周期很简单,使用Java关键字 new 进行Bean 的实例化,然后该Bean 就能够使用了。一旦bean不再被使用,则由Java自动进行垃圾回收。相比之下,Spring管理Bean的生命周期就复杂多了,正确理解Bean 的生命周期非常重要,因为Spring对Bean的管理可扩展性非常强,下面展示了一个Bean的构造过程Bean 的生命周期如上图所示,Bean 的生命周期还是比较复杂的,下面来对上图每一个步骤做文字描述:Spring启动,查找并加载需要被Spring管理的bean,进行Bean的实例化Bean实例化后对将Bean的引入和值注入到Bean的属性中如果Bean实现了BeanNameAware接口的话,Spring将Bean的Id传递给setBeanName()方法如果Bean实现了BeanFactoryAware接口的话,Spring将调用setBeanFactory()方法,将BeanFactory容器实例传入如果Bean实现了ApplicationContextAware接口的话,Spring将调用Bean的setApplicationContext()方法,将bean所在应用上下文引用传入进来。如果Bean实现了BeanPostProcessor接口,Spring就将调用他们的postProcessBeforeInitialization()方法。如果Bean 实现了InitializingBean接口,Spring将调用他们的afterPropertiesSet()方法。类似的,如果bean使用init-method声明了初始化方法,该方法也会被调用如果Bean 实现了BeanPostProcessor接口,Spring就将调用他们的postProcessAfterInitialization()方法。此时,Bean已经准备就绪,可以被应用程序使用了。他们将一直驻留在应用上下文中,直到应用上下文被销毁。如果bean实现了DisposableBean接口,Spring将调用它的destory()接口方法,同样,如果bean使用了destory-method 声明销毁方法,该方法也会被调用。上面是Spring 中Bean的核心接口和生命周期,面试回答上述过程已经足够了。但是翻阅JavaDoc文档发现除了以上接口外,还有另外的初始化过程涉及的接口:摘自org.springframework.beans.factory.BeanFactory, 全部相关接口如下,上述已有的就不用着重标注,把额外的相关接口着重标注下Bean 完整的生命周期文字解释如下:————————————初始化————————————BeanNameAware.setBeanName() 在创建此bean的bean工厂中设置bean的名称,在普通属性设置之后调用,在InitializinngBean.afterPropertiesSet()方法之前调用BeanClassLoaderAware.setBeanClassLoader(): 在普通属性设置之后,InitializingBean.afterPropertiesSet()之前调用BeanFactoryAware.setBeanFactory() : 回调提供了自己的bean实例工厂,在普通属性设置之后,在InitializingBean.afterPropertiesSet()或者自定义初始化方法之前调用EnvironmentAware.setEnvironment(): 设置environment在组件使用时调用EmbeddedValueResolverAware.setEmbeddedValueResolver(): 设置StringValueResolver 用来解决嵌入式的值域问题ResourceLoaderAware.setResourceLoader(): 在普通bean对象之后调用,在afterPropertiesSet 或者自定义的init-method 之前调用,在 ApplicationContextAware 之前调用。ApplicationEventPublisherAware.setApplicationEventPublisher(): 在普通bean属性之后调用,在初始化调用afterPropertiesSet 或者自定义初始化方法之前调用。在 ApplicationContextAware 之前调用。MessageSourceAware.setMessageSource(): 在普通bean属性之后调用,在初始化调用afterPropertiesSet 或者自定义初始化方法之前调用,在 ApplicationContextAware 之前调用。ApplicationContextAware.setApplicationContext(): 在普通Bean对象生成之后调用,在InitializingBean.afterPropertiesSet之前调用或者用户自定义初始化方法之前。在ResourceLoaderAware.setResourceLoader,ApplicationEventPublisherAware.setApplicationEventPublisher,MessageSourceAware之后调用。ServletContextAware.setServletContext(): 运行时设置ServletContext,在普通bean初始化后调用,在InitializingBean.afterPropertiesSet之前调用,在 ApplicationContextAware 之后调用注:是在WebApplicationContext 运行时BeanPostProcessor.postProcessBeforeInitialization() : 将此BeanPostProcessor 应用于给定的新bean实例 在任何bean初始化回调方法(像是InitializingBean.afterPropertiesSet或者自定义的初始化方法)之前调用。这个bean将要准备填充属性的值。返回的bean示例可能被普通对象包装,默认实现返回是一个bean。BeanPostProcessor.postProcessAfterInitialization() : 将此BeanPostProcessor 应用于给定的新bean实例 在任何bean初始化回调方法(像是InitializingBean.afterPropertiesSet或者自定义的初始化方法)之后调用。这个bean将要准备填充属性的值。返回的bean示例可能被普通对象包装InitializingBean.afterPropertiesSet(): 被BeanFactory在设置所有bean属性之后调用(并且满足BeanFactory 和 ApplicationContextAware)。————————————销毁————————————在BeanFactory 关闭的时候,Bean的生命周期会调用如下方法:DestructionAwareBeanPostProcessor.postProcessBeforeDestruction(): 在销毁之前将此BeanPostProcessor 应用于给定的bean实例。能够调用自定义回调,像是DisposableBean 的销毁和自定义销毁方法,这个回调仅仅适用于工厂中的单例bean(包括内部bean)实现了自定义的destory()方法二、Bean 的生命周期验证为了验证Bean生命周期的过程,有两种形式:一种是为面试而准备的,一种是为了解全过程而准备的,下面来看代码:Book.classpublic class Book implements BeanNameAware,BeanFactoryAware, ApplicationContextAware,InitializingBean,DisposableBean { private String bookName; public Book(){ System.out.println("Book Initializing "); } public void setBeanFactory(BeanFactory beanFactory) throws BeansException { System.out.println("Book.setBeanFactory invoke"); } public void setBeanName(String name) { System.out.println("Book.setBeanName invoke"); } public void destroy() throws Exception { System.out.println("Book.destory invoke"); } public void afterPropertiesSet() throws Exception { System.out.println("Book.afterPropertiesSet invoke"); } public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { System.out.println("Book.setApplicationContext invoke"); } public String getBookName() { return bookName; } public void setBookName(String bookName) { this.bookName = bookName; System.out.println("setBookName: Book name has set."); } public void myPostConstruct(){ System.out.println("Book.myPostConstruct invoke"); } // 自定义初始化方法 @PostConstruct public void springPostConstruct(){ System.out.println("@PostConstruct"); } public void myPreDestory(){ System.out.println("Book.myPreDestory invoke"); System.out.println("---------------destroy-----------------"); } // 自定义销毁方法 @PreDestroy public void springPreDestory(){ System.out.println("@PreDestory"); } @Override protected void finalize() throws Throwable { System.out.println("------inside finalize-----"); } }自定义实现BeanPostProcessor 的MyBeanPostProcessor:public class MyBeanPostProcessor implements BeanPostProcessor { // 容器加载的时候会加载一些其他的bean,会调用初始化前和初始化后方法 // 这次只关注book(bean)的生命周期 public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof Book){ System.out.println("MyBeanPostProcessor.postProcessBeforeInitialization"); } return bean; } public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof Book){ System.out.println("MyBeanPostProcessor.postProcessAfterInitialization"); } return bean; } }在resources 目录下新建Bean-Lifecycle.xml<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 扫描bean --> <context:component-scan base-package="com.bean.lifecycle"/> <!-- 实现了用户自定义初始化和销毁方法 --> <bean id="book" class="com.bean.lifecycle.Book" init-method="myPostConstruct" destroy-method="myPreDestory"> <!-- 注入bean 属性名称 --> <property name="bookName" value="thingking in java" /> </bean> <!--引入自定义的BeanPostProcessor--> <bean class="com.bean.lifecycle.MyBeanPostProcessor"/> </beans>做一个启动类的测试,新建SpringBeanLifecycleApplicationpublic class SpringBeanLifecycleApplication { public static void main(String[] args) throws InterruptedException { // 为面试而准备的Bean生命周期加载过程 ApplicationContext context = new ClassPathXmlApplicationContext("Bean-Lifecycle.xml"); Book book = (Book)context.getBean("book"); System.out.println("Book name = " + book.getBookName()); ((ClassPathXmlApplicationContext) context).destroy(); } }启动测试,输出结果如下:Book Initializing setBookName: Book name has set. Book.setBeanName invoke Book.setBeanFactory invoke Book.setApplicationContext invoke MyBeanPostProcessor.postProcessBeforeInitialization @PostConstruct Book.afterPropertiesSet invoke Book.myPostConstruct invoke MyBeanPostProcessor.postProcessAfterInitialization Book name = thingking in java @PreDestory Book.destory invoke Book.myPreDestory invoke ---------------destroy-----------------为了验证Bean完整的生命周期,需要新建一个SubBookClass 继承Book类public class SubBookClass extends Book implements BeanClassLoaderAware, EnvironmentAware,EmbeddedValueResolverAware,ResourceLoaderAware, ApplicationEventPublisherAware,MessageSourceAware{ private String bookSystem; public String getBookSystem() { return bookSystem; } public void setBookSystem(String bookSystem) { System.out.println("设置BookSystem 的属性值"); this.bookSystem = bookSystem; } public void setBeanClassLoader(ClassLoader classLoader) { System.out.println("SubBookClass.setBeanClassLoader() 方法被调用了"); } public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { System.out.println("SubBookClass.setApplicationEventPublisher() 方法被调用了"); } public void setEmbeddedValueResolver(StringValueResolver resolver) { System.out.println("SubBookClass.setEmbeddedValueResolver() 方法被调用了"); } public void setEnvironment(Environment environment) { System.out.println("SubBookClass.setEnvironment() 方法被调用了"); } public void setMessageSource(MessageSource messageSource) { System.out.println("SubBookClass.setMessageSource() 方法被调用了"); } public void setResourceLoader(ResourceLoader resourceLoader) { System.out.println("SubBookClass.setResourceLoader() 方法被调用了"); } }上述SubBookClass类与Book是互补关系。新建一个SubBean-Lifecycle.xml,注入SubBookClass<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <bean id="bookClass" class="com.bean.lifecycle.SubBookClass" init-method="myPostConstruct" destroy-method="myPreDestory"> <property name="bookSystem" value="Java System" /> </bean> <bean class="com.bean.lifecycle.MyBeanPostProcessor"/> </beans>完整的SpringBeanLifecycleApplication 如下:public class SpringBeanLifecycleApplication { public static void main(String[] args) throws InterruptedException { // 为面试而准备的Bean生命周期加载过程 ApplicationContext context = new ClassPathXmlApplicationContext("Bean-Lifecycle.xml"); Book book = (Book)context.getBean("book"); System.out.println("Book name = " + book.getBookName()); ((ClassPathXmlApplicationContext) context).destroy(); // 完整的加载过程,当然了解的越多越好 ApplicationContext applicationContext = new ClassPathXmlApplicationContext("SubBean-Lifecycle.xml"); SubBookClass subBookClass = (SubBookClass) applicationContext.getBean("bookClass"); System.out.println("BookSystemName = " + subBookClass.getBookSystem()); ((ClassPathXmlApplicationContext) applicationContext).registerShutdownHook(); } }输出完整的结果:Book Initializing setBookName: Book name has set. Book.setBeanName invoke Book.setBeanFactory invoke Book.setApplicationContext invoke MyBeanPostProcessor.postProcessBeforeInitialization @PostConstruct Book.afterPropertiesSet invoke Book.myPostConstruct invoke MyBeanPostProcessor.postProcessAfterInitialization Book name = thingking in java @PreDestory Book.destory invoke Book.myPreDestory invoke ---------------destroy----------------- Book Initializing 设置BookSystem 的属性值 Book.setBeanName invoke SubBookClass.setBeanClassLoader() 方法被调用了 Book.setBeanFactory invoke SubBookClass.setEnvironment() 方法被调用了 SubBookClass.setEmbeddedValueResolver() 方法被调用了 SubBookClass.setResourceLoader() 方法被调用了 SubBookClass.setApplicationEventPublisher() 方法被调用了 SubBookClass.setMessageSource() 方法被调用了 Book.setApplicationContext invoke MyBeanPostProcessor.postProcessBeforeInitialization Book.afterPropertiesSet invoke Book.myPostConstruct invoke MyBeanPostProcessor.postProcessAfterInitialization BookSystemName = Java System Book.destory invoke Book.myPreDestory invoke ---------------destroy-----------------后记:这篇文章是我翻阅各种书籍和从网上查找资料,包括国外一些网站从而得到的结论,记录下来,但是我没有发现Spring Bean的生命周期(非常详细) 这篇文章中InstantiationAwareBeanPostProcessorAdapter 这个类和工厂后置处理器接口方法,知道的朋友欢迎指教,感谢。https://www.cnblogs.com/zrtqsk/p/3735273.html参考:https://www.cnblogs.com/zrtqsk/p/3735273.htmlhttps://www.journaldev.com/2637/spring-bean-life-cyclehttp://www.wideskills.com/spring/spring-bean-lifecyclewww.concretepage.com/spring/spring-bean-life-cycle-tutorial
目录GateWay配置在mysql定义表gateway_define, 表结构如下面的GatewayDefine实体类:定义repository和service,采用JPA实现定义MysqlRouteDefinitionRepository类,实现RouteDefinitionRepository接口的getRouteDefinitions方法,获取从数据库里面装载的路由配置,当然还有save和delete其他方法。在启动类GatewayServiceApplication中添加两个Bean。 添加ApplicationStartup类,在Spring Boot启动时装载路由配置信息, 说明看注释:其他最后后记Spring Cloud Gateway是由spring官方基于Spring5.0,Spring Boot2.0,Project Reactor等技术开发的网关,目的是代替原先版本中的Spring Cloud Netfilx Zuul。目前Netfilx已经开源了Zuul2.0,但Spring 没有考虑集成,而是推出了自己开发的Spring Cloud GateWay。该项目提供了一个构建在Spring Ecosystem之上的API网关,旨在提供一种简单而有效的途径来发送API,并向他们提供交叉关注点:例如:安全性,监控/指标和弹性。在这里废话少说,直接把我实现动态自定义路由的方法托出,共大家参考。由于水平有限,难免有不当或者错误之处,请大家指正,谢谢。GateWay配置一般的,我们如果使用Spring Cloud GateWay进行配置,类似于下面的样子:spring: cloud: gateway: discovery: locator: enabled: true routes: - id: sample-service-a uri: lb://SAMPLE-SERVICE-A-HA predicates: - Path=/customeradd/** filters: - RewritePath=/customeradd,/customer/add当我们要新增或者改变一个网关路由时,我们不得不停止网关服务,修改配置文件,保存再重新启动网关服务,这样才能让我们新的设置生效。设想一样,如果是在生产环境,为了一个小小的路由变更,这样的停止再重启恐怕谁也受不了吧。接下来,看看我们怎么能做到动态配置网关路由,让网关路由配置在服务不需要重启的情况生效。在mysql定义表gateway_define, 表结构如下面的GatewayDefine实体类:@Entity@Table(name = "gateway_define")public class GatewayDefine implements Serializable { @Id @GeneratedValue(strategy = GenerationType.AUTO) private String id; private String uri; private String predicates; private String filters; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getUri() { return uri; } public void setUri(String uri) { this.uri = uri; } public String getPredicates() { return this.predicates; } public void setPredicates(String predicates) { this.predicates = predicates; } public List<PredicateDefinition> getPredicateDefinition() { if (this.predicates != null) { List<PredicateDefinition> predicateDefinitionList = JSON.parseArray(this.predicates, PredicateDefinition.class); return predicateDefinitionList; } else { return null; } } public String getFilters() { return filters; } public List<FilterDefinition> getFilterDefinition() { if (this.filters != null) { List<FilterDefinition> filterDefinitionList = JSON.parseArray(this.filters, FilterDefinition.class); return filterDefinitionList; } else { return null; } } public void setFilters(String filters) { this.filters = filters; } @Override public String toString() { return "GatewayDefine{" + "id='" + id + '\'' + ", uri='" + uri + '\'' + ", predicates='" + predicates + '\'' + ", filters='" + filters + '\'' + '}'; }}其中:id为Eureka注册的服务名; uri、predicates、filters分别应上面配置文件片段中的predicates和filters(这两个保存的都是json)定义repository和service,采用JPA实现@Repositorypublic interface GatewayDefineRepository extends JpaRepository<GatewayDefine, String> { @Override List<GatewayDefine> findAll(); @Override GatewayDefine save(GatewayDefine gatewayDefine); @Override void deleteById(String id); @Override boolean existsById(String id);}public interface GatewayDefineService { List<GatewayDefine> findAll() throws Exception; String loadRouteDefinition() throws Exception; GatewayDefine save(GatewayDefine gatewayDefine) throws Exception; void deleteById(String id) throws Exception; boolean existsById(String id)throws Exception;}@Servicepublic class GatewayDefineServiceImpl implements GatewayDefineService { @Autowired GatewayDefineRepository gatewayDefineRepository; @Autowired private GatewayDefineService gatewayDefineService; @Autowired private RouteDefinitionWriter routeDefinitionWriter; private ApplicationEventPublisher publisher; @Override public List<GatewayDefine> findAll() throws Exception { return gatewayDefineRepository.findAll(); } @Override public String loadRouteDefinition() { try { List<GatewayDefine> gatewayDefineServiceAll = gatewayDefineService.findAll(); if (gatewayDefineServiceAll == null) { return "none route defined"; } for (GatewayDefine gatewayDefine : gatewayDefineServiceAll) { RouteDefinition definition = new RouteDefinition(); definition.setId(gatewayDefine.getId()); definition.setUri(new URI(gatewayDefine.getUri())); List<PredicateDefinition> predicateDefinitions = gatewayDefine.getPredicateDefinition(); if (predicateDefinitions != null) { definition.setPredicates(predicateDefinitions); } List<FilterDefinition> filterDefinitions = gatewayDefine.getFilterDefinition(); if (filterDefinitions != null) { definition.setFilters(filterDefinitions); } routeDefinitionWriter.save(Mono.just(definition)).subscribe(); this.publisher.publishEvent(new RefreshRoutesEvent(this)); } return "success"; } catch (Exception e) { e.printStackTrace(); return "failure"; } } @Override public GatewayDefine save(GatewayDefine gatewayDefine) throws Exception { gatewayDefineRepository.save(gatewayDefine); return gatewayDefine; } @Override public void deleteById(String id) throws Exception { gatewayDefineRepository.deleteById(id); } @Override public boolean existsById(String id) throws Exception { return gatewayDefineRepository.existsById(id); }}注:loadRouteDefinition是重点,它从数据库里获取动态定义的路由,最后封装成RouteDefinition 类实例,调用RouteDefinitionWriter 的save方法保存。RouteDefinitionWriter是个接口,真正实现的是InMemoryRouteDefinitionRepository类,在InMemoryRouteDefinitionRepository定义了一个SynchronizedMap 类,所有的设置都在这儿保存。定义MysqlRouteDefinitionRepository类,实现RouteDefinitionRepository接口的getRouteDefinitions方法,获取从数据库里面装载的路由配置,当然还有save和delete其他方法。public class MysqlRouteDefinitionRepository implements RouteDefinitionRepository { @Autowired private GatewayDefineService gatewayDefineService; @Override public Flux<RouteDefinition> getRouteDefinitions() { try { List<GatewayDefine> gatewayDefineList = gatewayDefineService.findAll(); Map<String, RouteDefinition> routes = new LinkedHashMap<String, RouteDefinition>(); for (GatewayDefine gatewayDefine: gatewayDefineList) { RouteDefinition definition = new RouteDefinition(); definition.setId(gatewayDefine.getId()); definition.setUri(new URI(gatewayDefine.getUri())); List<PredicateDefinition> predicateDefinitions = gatewayDefine.getPredicateDefinition(); if (predicateDefinitions != null) { definition.setPredicates(predicateDefinitions); } List<FilterDefinition> filterDefinitions = gatewayDefine.getFilterDefinition(); if (filterDefinitions != null) { definition.setFilters(filterDefinitions); } routes.put(definition.getId(), definition); } return Flux.fromIterable(routes.values()); } catch (Exception e) { e.printStackTrace(); return Flux.empty(); } } @Override public Mono<Void> save(Mono<RouteDefinition> route) { return route.flatMap(r -> { try { GatewayDefine gatewayDefine = new GatewayDefine(); gatewayDefine.setId(r.getId()); gatewayDefine.setUri(r.getUri().toString()); gatewayDefine.setPredicates(JSON.toJSONString(r.getPredicates())); gatewayDefine.setFilters(JSON.toJSONString(r.getFilters())); gatewayDefineService.save(gatewayDefine); return Mono.empty(); } catch (Exception e) { e.printStackTrace(); return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition save error: "+ r.getId()))); } }); } @Override public Mono<Void> delete(Mono<String> routeId) { return routeId.flatMap(id -> { try { gatewayDefineService.deleteById(id); return Mono.empty(); } catch (Exception e) { e.printStackTrace(); return Mono.defer(() -> Mono.error(new NotFoundException("RouteDefinition delete error: " + routeId))); } }); }}在启动类GatewayServiceApplication中添加两个Bean。@Bean public RouteDefinitionWriter routeDefinitionWriter() { return new InMemoryRouteDefinitionRepository(); } @Bean public MysqlRouteDefinitionRepository mysqlRouteDefinitionRepository() { return new MysqlRouteDefinitionRepository(); } 添加ApplicationStartup类,在Spring Boot启动时装载路由配置信息, 说明看注释: /*** 在Spring Boot程序启动后会检测程序中是否有CommandLineRunner* 和ApplicationRunner接口的实例,* 如果存在,则会执行对应实现类中的run()方法,而且只执行一次*/public class ApplicationStartup implements ApplicationRunner { @Autowired private GatewayDefineService gatewayDefineService; @Override public void run(ApplicationArguments args) throws Exception { gatewayDefineService.loadRouteDefinition(); }}完成。其他我这里面没有界面设置路由,我是在配置文件中配置我要的路由,然后通过 /actuator/gateway/routes 获取所有路由的json, 也可以通过 /actuator/gateway/routes/{id} 获取单独一个路由的json.然后手工往数据库里面插入数据,再把网关服务停止,删除配置文件中的路由设定,再重新启动网关功能,通过 /actuator/gateway/routes 能够获取同样的路由json, 通过curl访问设置的路由同样生效。当然完全可以独立开发一个应用,有界面来读取数据库中的路由配置,可以增加和修改路由信息。再通过Spring Cloud Config的配置来刷新多个网关路由的信息,实现多个网关服务的路由信息实时更新。反正有各种方法可供选择。最后完全是记录自己前一段时间的研究心得。水平有限,有什么不对的地方请大家指正。还有,Spring Cloud GateWay还不支持OAuth2, 所以想统一集成授权、认证等功能的还是使用ZUUL吧。下一次有时间,我会写一下ZUUL的动态路由功能实现以及避免频繁刷新路由信息。反正和GateWay相似,但是还是有区别的。实现参考了网上很多人的源码和文章,在此表示感谢!也阅读了Spring Cloud GateWay 部分源代码,对Spring Cloud GateWay有了一定的认识,聊以自慰。后记最新的Spring Cloud Greenwich.RELEASE中Gateway 过滤器新增支持OAuth2,我觉得可以抛弃ZUUL了。
什么是JPA?一种规范,并非ORM框架,也就是ORM上统一的规范用了之后可以做什么,为什么要用?代码解释:实体类package com.example.springredis.entity;import lombok.Data;import javax.persistence.Entity;import javax.persistence.GeneratedValue;import javax.persistence.GenerationType;import javax.persistence.Id;import java.io.Serializable;@Entity@Datapublic class User implements Serializable { @Id @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String name; private String account; private String pwd;}dao层@Repositorypublic interface UserDao extends JpaRepository<User, Long> {}测试类 @Autowired private UserDao userDao; public void findAllTest() { System.out.println(userDao.findAll().toString()); }上面的操作已经完成了一个查询全部,相信不用在做多余的解释了JPA优点:主要就是简单易用,集成方便,可以不用写SQL语句准备工作这里的环境JDK 1.8 以上IDEA 2018.2Gradle 4+ 或者 Maven 3.2+在https://start.spring.io/ 初始化一个项目这里使用的是Gradle下载之后请在IDEA导入项目build.gradle配置buildscript { ext { springBootVersion = '2.1.0.RELEASE' } repositories { mavenCentral() } dependencies { classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}") }}apply plugin: 'java-library'apply plugin: 'idea'apply plugin: 'org.springframework.boot'apply plugin: 'io.spring.dependency-management'group = 'com.example'version = '0.0.1-SNAPSHOT'sourceCompatibility = 1.8repositories { maven { url 'http://maven.aliyun.com/nexus/content/groups/public/' }}//Gradle3.4新增了Java-library插件,java-library插件使用了新的依赖配置implementation和api。旧的依赖配置compile被废弃dependencies { implementation('org.springframework.boot:spring-boot-starter-data-jpa') implementation('mysql:mysql-connector-java') compileOnly('org.projectlombok:lombok') testImplementation('org.springframework.boot:spring-boot-starter-test')}开始定义一个简单的实体package com.example.springbootjpademo.entity;import lombok.Data;import javax.persistence.Entity;import javax.persistence.GeneratedValue;import javax.persistence.GenerationType;import javax.persistence.Id;@Entity@Datapublic class User { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; private String name; private String ename; protected User() { } public User(String name, String ename) { this.name = name; this.ename = ename; } @Override public String toString() { /* JAVA字符串格式化-String.format() %s 字符串类型 %d 整数类型(十进制) */ return String.format("Customer[id=%d, name='%s', ename='%s']", id, name, ename); }}这里有一个User类,它有三个属性,id,name和ename。你还有两个构造函数。默认构造函数仅为JPA而存在。您不会直接使用它,因此它被指定为 protected 。另一个构造函数是您将用于创建要保存到数据库的user实例的构造函数。在User类上加 @Entity 注解,表示这个是一个 JPA 的实体,如果在 User 类上没有加 @Table 注解,表明该实体将映射到名为user的表,如果要加上 @Table ,可以在其 name 属性里写入表名,如: @Table(name = "t_user")User的id属性使用@Id注释,以便JPA将其识别为对象的ID。id属性也使用@GeneratedValue注释@GeneratedValue(strategy = GenerationType.IDENTITY) 自增长ID策略其他两个属性name和ename未注释。表明它们将映射到与属性本身相同一名称的列,比如,User实体中的name属性映射user表中的name列。toString() 方便将打印出实体的属性创建一个 UserRepository 接口这里很简单,直接继承核心接口JpaRepositorysrc/main/java/com/example/springbootjpademo/repository/UserRepository.java package com.example.springbootjpademo.repository;import com.example.springbootjpademo.entity.User;import org.springframework.data.jpa.repository.JpaRepository;import org.springframework.stereotype.Repository;@Repositorypublic interface UserRepository extends JpaRepository<User, Long> {}配置文件application.yml修改application.properties 为 application.ymlsrc/main/resources/application.ymlspring: # 数据源配置 datasource: driver-class-name: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/test?characterEncoding=utf-8&useSSL=false username: root password: 123456 jpa: # 在 SrpingBoot 2.0 版本中,Hibernate 创建数据表的时候,默认的数据库存储引擎选择的是 MyISAM #(之前好像是 InnoDB,这点比较诡异)。这个参数是在建表的时候,将默认的存储引擎切换为 InnoDB 用的。 database-platform: org.hibernate.dialect.MySQL5InnoDBDialect # spring.jpa.show-sql=true 配置在日志中打印出执行的 SQL 语句信息。 show-sql: true # 配置指明在程序启动的时候要删除并且创建实体类对应的表。 # create 这个参数很危险,因为他会把对应的表删除掉然后重建。所以千万不要在生成环境中使用。只有在测试环境中,一开始初始化数据库结构的时候才能使用一次。 # ddl-auto:create----每次运行该程序,没有表格会新建表格,表内有数据会清空 # ddl-auto:create-drop----每次程序结束的时候会清空表 # ddl-auto:update----每次运行程序,没有表格会新建表格,表内有数据不会清空,只会更新(推荐) # ddl-auto:validate----运行程序会校验数据与数据库的字段类型是否相同,不同会报错 hibernate.ddl-auto: update建立测试类进行查询src/test/java/com/example/springbootjpademo/SpringbootJpaDemoApplicationTests.javapackage com.example.springbootjpademo;import com.example.springbootjpademo.repository.UserRepository;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringbootJpaDemoApplicationTests { @Autowired private UserRepository userRepository; @Test public void contextLoads() { System.out.println(userRepository.findAll().toString()); }}输出注意如果出现下列等错误:Error:(41, 13) java: 找不到符号 符号: 方法 setName(java.lang.String) 位置: 类型为com.example.springbootjpademo.entity.User的变量 user请注意下面的设置是否正确:其他操作src/test/java/com/example/springbootjpademo/SpringbootJpaDemoApplicationTests.javapackage com.example.springbootjpademo;import com.example.springbootjpademo.entity.User;import com.example.springbootjpademo.repository.UserRepository;import org.junit.After;import org.junit.Before;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)@SpringBootTestpublic class SpringbootJpaDemoApplicationTests { @Autowired private UserRepository userRepository; @Test public void contextLoads() { System.out.println(userRepository.findAll().toString()); } @Before public void add() { userRepository.save(new User("英雄联盟", "lol")); } //修改操作 @After public void update() {// ifPresent 如果存在值,则使用值调用指定的使用者,否则不执行任何操作。 userRepository.findById(1L).ifPresent(user -> { user.setName("xiugaihou"); userRepository.save(user); System.out.println(user.toString()); }); } //删除 @After public void del() { userRepository.findById(2L).ifPresent(user -> userRepository.delete(user)); }}最后数据库的值:码云代码地址https://gitee.com/cuifuan/SpringBoot
看到大家对上篇《Java面试中遇到的坑》一文表现出强力的关注度,说明大家确实在面试中遇到了类似的难题。大家在文章留言处积极留言探讨面试中遇到的问题,其中几位同学还提出了自己的见解,我感到非常高兴,还有几位同学强烈要求给出题目答案,那我很乐意跟大家一起探讨分享这些题目。我将题目答案写下来,这个并非标准答案,有不准确的地方请大家辩证补充。1.StringBuilder替代String拼接,面试中经常会问到String,StringBuilder,StringBuffer的区别。解答:String类作为java语言中最常见的字符串类被广泛使用,如果在做大量字符串拼接效率时变得比较低,因为虚拟机需要不断地将对象引用指向新的地址。因此,一般方法内的私有变量推荐使用stringBuilder来完成,如果是多线程需要同步的自然选用stringBuffer。1.对参数未做空验证,就做判断值相等下面的写法将常量放到方法左边,能防止NPE。解答:关于这道题需要查看String类中equals的实现方法,当左边为Null时会出现NPE。因此后续代码中需要保证equals方法左边变量不为null。2.这个坑很多兄弟都踩过,知道为什么打印结果为false,而下面结果为true为啥解答:Integer类型当正整数小于128时是在内存栈中创建值的,并将对象指向这个值,这样当比较两个栈引用时因为是同一地址引用两者则相等。当大于127时将会调用new Integer(),两个整数对象地址引用不相等了。这就是为什么当值为128时不相等,当值为100时相等了。3.将变量作为参数传递,在方法中改变参数值,变量的值改变了么?下图total值到底是几?解答:将一个私有变量作为形参传递赋值并不会改变参数原有的值,但是如果将一个对象作为参数传递改变属性,对象的属性值就会随着改变。因此total的值仍然为0。4.由数组转换的list,只能循环遍历,而不能看长度,增加元素,删除元素,这是为何?下图代码执行竟然出错!解答:因为将数组转换的列表其实不是我们经常使用的arrayList,但只是数组中内部定义的一种数据结构类型,本质还是原数组而并非列表,因此当向列表添加元素就会出现错误,这道题上当的兄弟不少吧。5.将列表中李明的名字移除掉,下图实现有无问题?解答:在列表中移除最后一个元素按说应该没有问题的,但是这个算法还是出现了错误,主要是这种写法的列表循环遵循下表索引查找,当移除某个元素时,上次计算出来的长度超过了当前列表长度,故而会出现越界错误。6.在指定目录下创建文件目录,到底使用哪一种呢,两个方法都没报错,为何第一次没创建目录而第二次创建?解答:mkdirs()可以建立多级文件夹,而mkdir()只会建立一级的文件夹。这个主要依靠java底层调用操作系统的实现,作者愚见无须弄明白底层实现原理,只要使用中能区分不留BUG就好。7.老板从客户那里回来后骂了我一顿,说是客户界面显示金额很奇怪,我的代码那里有问题?解答:这道题如果我是老板我会打你的,因为老板之前也写过代码。老板之所以骂你是因为客户看到的太奇怪以至于看不懂,两个float类型数据相减会丢失精度,尾部带着常常的一串数字。如果实际场景要做计算我给你两个思路:第一可以用bigdecimal来计算,第二先将单位做成整数再做除法。8.面试官:你能说出来java中实现多线程的几种方法么?面试者:继承Thread类和实现runnable接口!面试官:除此以外还有方法么?面试者:就这两种实现啊?!面试官:我们聊点别的,呵呵。解答:这道题最有意思了,一般人都会干脆利索回答前两个,我工作前3年每次都是信心满满这样回答问题的。其实 实现多线程还可以实现Callable接口,利用task来接受异步线程的执行结果。希望后面再被面试官问到这道题可千万别再入坑,并且能回答这第三种方式跟前两种不同的地方(可以获取执行结果)。9.我只是想让这个好好循环三次,有什么问题么?解答:这道题主要是犯了整数符号位的问题了,大家可以了解一下Integer.MAX_VALUE加上1以后的数值这道陷阱题就解决了。10.这个一直没有弄明白,到底返回哪个true还是false?解答:这道题大家需要弄明白finally的使用场景,主要是捕获异常以后必须要执行的代码,大多是关闭流之类的。即使try的代码块已经返回结果但程序仍然会执行finally里面的代码,因此上题返回false。大家可以看看下面这道题返回结果是多少?欢迎留言,答对的同学自己给自己晚饭加鸡腿。
注:这里的支付是沙箱模拟支付蚂蚁金服开放平台注册地址:https://open.alipay.com/platform/home.htm支付宝扫码登陆 -> 注册为自主研发者支付宝提供一键生成工具便于开发者生成一对RSA2密钥:https://docs.open.alipay.com/291/105971该工具使用需要java环境windows安装java环境:https://blog.csdn.net/edison_03/article/details/79757591Mac安装java环境:https://www.cnblogs.com/xqx-qyy/p/7659805.html注意:生成时一定要选择PKCS8+2048将应用网关和回调地址更改为:https://www.alipay.comAES密钥不用管然后往下会有支付宝沙箱安卓端工具,下载,以供后续支付使用进入页面左侧导航栏沙箱账号,沙箱安卓端安装后用买家账号登陆到这里基本配置就完了,下面进入大家喜欢的代码时间:package com.alipay.config; import java.io.FileWriter; import java.io.IOException; /* * *类名:AlipayConfig *作者:有梦想一起实现 */ public class AlipayConfig{ // ↓↓↓↓↓↓↓↓↓↓请在这里配置您的基本信息↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ // 应用ID,您的APPID,收款账号既是您的APPID对应支付宝账号 public static String app_id = "APPID";//例:2016082600317257 // 商户私钥,您的PKCS8格式RSA2私钥 public static String merchant_private_key = "商户私钥!!!!私钥!!!不是公钥!!!"; // 支付宝公钥,查看地址:https://openhome.alipay.com/platform/keyManage.htm // 对应APPID下的支付宝公钥。 public static String alipay_public_key = "支付宝公钥,记得是支付宝公钥!!!!!!!支付宝公钥"; // 服务器异步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问 /** * 返回的时候此页面不会返回到用户页面,只会执行你写到控制器里的地址 */ public static String notify_url = "你的服务器地址/项目名称/notify_url"; // 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问 /** * 此页面是同步返回用户页面,也就是用户支付后看到的页面,上面的notify_url是异步返回商家操作,谢谢 * 要是看不懂就找度娘,或者多读几遍,或者去看支付宝第三方接口API,不看API直接拿去就用,遇坑不怪别人 */ public static String return_url = " 你的服务器地址/项目名称/return_url"; // 签名方式 public static String sign_type = "RSA2"; // 字符编码格式 public static String charset = "gbk"; // 支付宝网关 public static String gatewayUrl = "https://openapi.alipaydev.com/gateway.do"; // 日志地址 public static String log_path = "D:/logs/"; // ↑↑↑↑↑↑↑↑↑↑请在这里配置您的基本信息↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑ /** * 写日志,方便测试(看网站需求,也可以改成把记录存入数据库) * * @param sWord * 要写入日志里的文本内容 */ public static void logResult(String sWord) { FileWriter writer = null; try { writer = new FileWriter(log_path + "alipay_log_" + System.currentTimeMillis() + ".txt"); writer.write(sWord); } catch (Exception e) { e.printStackTrace(); } finally { if (writer != null) { try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } } }如果你是在本地测试,支付完成不会跳转回调页面,那么就需要外网了推荐一个东西,叫内网穿透,只要你电脑tomcat启动,可以连接外网,就可以使用。NATAPP 提供免费的测试足够:https://natapp.cn/ngrok或者frp以及其他免费开源,自行搜索了解<dependency> <groupId>com.pentahohub.nexus</groupId> <artifactId>alipay-sdk-java</artifactId> <version>20150820220052</version> </dependency>如果上面的依赖失效或者无法使用,依赖下载地址:http://central.maven.org/maven2/com/pentahohub/nexus/alipay-sdk-java/20150820220052/alipay-sdk-java-20150820220052.jar/** * 快捷支付调用支付宝支付接口 * @param model,id,payables, * @throws IOException,AlipayApiException * @return Object * @author 有梦想一起实现 */ @RequestMapping("alipaySum") public Object alipayIumpSum(Model model, String payables, String subject, String body, HttpServletResponse response) throws Exception { // 获得初始化的AlipayClient AlipayClient alipayClient = new DefaultAlipayClient(AlipayConfigInfo.gatewayUrl, AlipayConfigInfo.app_id, AlipayConfigInfo.merchant_private_key, "json", AlipayConfigInfo.charset, AlipayConfigInfo.alipay_public_key, AlipayConfigInfo.sign_type); // 设置请求参数 AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest(); alipayRequest.setReturnUrl(AlipayConfigInfo.return_url); alipayRequest.setNotifyUrl(AlipayConfigInfo.notify_url2); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmssSSS"); // 商户订单号,商户网站订单系统中唯一订单号,必填 String out_trade_no = sdf.format(new Date()); // 付款金额,必填 String total_amount = payables.replace(",", ""); alipayRequest.setBizContent("{\"out_trade_no\":\"" + out_trade_no + "\"," + "\"total_amount\":\"" + total_amount + "\"," + "\"subject\":\"" + subject + "\"," + "\"body\":\"" + body + "\"," + "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}"); // 请求 String result = alipayClient.pageExecute(alipayRequest).getBody(); // System.out.println(result); AlipayConfigInfo.logResult(result);// 记录支付日志 response.setContentType("text/html; charset=gbk"); PrintWriter out = response.getWriter(); out.print(result); return null; }参数传入是必须有的,不然会报订单信息有误。如果有其他额外参数,请参考支付宝第三方API文档:https://docs.open.alipay.com/api_1/alipay.trade.create/这里支付完成会回调两个接口,notify_url和return_url,就是在配置类配置的两个接口:1、notify_url接口->异步回调的后台操作/** * 支付完成回调验证操作 * @param response,request * @throws Exception * @return void * @author 有梦想一起实现 */ @RequestMapping("notify_url") public void Notify(HttpServletResponse response, HttpServletRequest request) throws Exception { System.out.println("----------------------------notify_url------------------------"); // 商户订单号 String out_trade_no = new String(request.getParameter("out_trade_no").getBytes("ISO-8859-1"), "GBK"); // 付款金额 String total_amount = new String(request.getParameter("total_amount").getBytes("ISO-8859-1"), "GBK"); // 支付宝交易号 String trade_no = new String(request.getParameter("trade_no").getBytes("ISO-8859-1"), "GBK"); // 交易说明 String cus = new String(request.getParameter("body").getBytes("ISO-8859-1"), "GBK"); // 交易状态 String trade_status = new String(request.getParameter("trade_status").getBytes("ISO-8859-1"), "GBK"); if (trade_status.equals("TRADE_SUCCESS")) {//支付成功商家操作 //下面是我写的一个简单的插入操作,根据你的操作自行编写 /*Map<Object, Object> map = new HashMap<Object, Object>(); map.put("cuId", Integer.valueOf(cus)); RepaymentPlan repaymentPlan = new RepaymentPlan(); Integer id = Integer.valueOf(out_trade_no); double payablesCheck = Double.valueOf(total_amount); RepaymentPlan repayCheck = serviceMain.selectByPrimaryKey(id); double total = repayCheck.getPayables(); if (Double.valueOf(total_amount) < repayCheck.getPayables()) { map.put("ubalance", total - Double.valueOf(total_amount)); serviceMain.updateCusMoney(map); } repaymentPlan.setId(id); repaymentPlan.setActualPayment(total); repaymentPlan.setRepaymentStatus(1); int i = serviceMain.updateByPrimaryKeySelective(repaymentPlan); System.out.println("---------------------还款影响行数----------------------------" + i);*/ } }2、return_url 接口->同步通知返回的是页面/** * 同步通知的页面的Controller * 我这边就简单的返回了一个页面 * @param request,response * @throws InterruptedException */ @RequestMapping("return_url") public String Return_url() throws InterruptedException { return "alipayexit"; }<form name=alipayment action=alipay.trade.page.pay.jsp method=post target="_blank"> <div id="body1" class="show" name="divcontent"> <dl class="content"> <dt>商户订单号 :</dt> <dd> <input id="WIDout_trade_no" name="WIDout_trade_no" /> </dd> <hr class="one_line"> <dt>订单名称 :</dt> <dd> <input id="WIDsubject" name="WIDsubject" /> </dd> <hr class="one_line"> <dt>付款金额 :</dt> <dd> <input id="WIDtotal_amount" name="WIDtotal_amount" /> </dd> <hr class="one_line"> <dt>商品描述:</dt> <dd> <input id="WIDbody" name="WIDbody" /> </dd> <hr class="one_line"> <dt></dt> <dd id="btn-dd"> <span class="new-btn-login-sp"> <button class="new-btn-login" type="submit" style="text-align: center;">付 款</button> </span> <span class="note-help">如果您点击“付款”按钮,即表示您同意该次的执行操作。</span> </dd> </dl> </div> </form> <!--这里的target为_blank是新打开一个窗口-->支付宝接口的SDK&DEMO地址:https://docs.open.alipay.com/270/106291/
备忘录,备份曾经发生过的历史记录,以防忘记,之后便可以轻松回溯过往。想必我们曾经都干过很多蠢事导致糟糕的结果,当后悔莫及的时候已经是覆水难收了,只可惜这世界上没有后悔药,事后我们能做的只能去弥补过失,总结经验。除非穿越时空,时光倒流,利用爱因斯坦狭义相对论,超越光速回到过去,破镜重圆。然而世界是残酷的,人类至今最快的载人交通工具连达到光速的万分之一都显得遥不可及,更别说超越了。光速,宇宙间永远无法打破的时空屏障,它像是上帝定义的常量C,将时间牢牢地套死在坐标轴上,自创世宇宙大爆炸开始就让它不断流逝,如同播放一部不可回退的电影一样,暮去朝来,谁也无法打破。但在计算机世界里,人类便是神一般的存在,各种回滚,倒退,载入历史显得稀松平常,例如数据库恢复、游戏存盘载入、操作系统快照恢复、打开备份文档、手机恢复出厂设置……为了保证极简风格,我们这里以文档操作来举例说明这个设计模式。假设某位作者要写一部科幻小说,当他打开编辑器软件以及创建文档开始创作的时候,我们来思考下这个场景需要哪些类。很简单,首先我们得有一个文档类Doc。1public class Doc { 2 private String title;//文章标题 3 private String body;//文章内容 4 5 public Doc(String title){//新建文档先命名 6 this.title = title; 7 this.body = ""; 8 } 9 10 public void setTitle(String title) { 11 this.title = title; 12 } 13 14 public String getTitle() { 15 return title; 16 } 17 18 public String getBody() { 19 return body; 20 } 21 22 public void setBody(String body) { 23 this.body = body; 24 } 25}没什么好说的,一个简单的Java Bean,包括标题与内容。有了文档那一定要有编辑器去修改它了,看代码。1public class Editor {//编辑器 2 private Doc doc;//文档引用 3 4 public Editor(Doc doc) { 5 System.out.println("<<<打开文档" + doc.getTitle()); 6 this.doc = doc; 7 show(); 8 } 9 10 public void append(String txt) { 11 System.out.println("<<<插入操作"); 12 doc.setBody(doc.getBody() + txt); 13 show(); 14 } 15 16 public void save(){ 17 System.out.println("<<<存盘操作"); 18 } 19 20 public void delete(){ 21 System.out.println("<<<删除操作"); 22 doc.setBody(""); 23 show(); 24 } 25 26 private void show(){//显示当前文本内容 27 System.out.println(doc.getBody()); 28 System.out.println("文章结束>>>\n"); 29 } 30}当编辑器打开一个文档后会持有其引用,这里我们写在编辑器构造方法里。编辑器主要的功能当然是对文档进行更改了,依然保持简单的操作模拟,我们只加入append插入功能、delete清空功能,以及save存盘方法和最后的show方法用于显示文档内容。一切就绪,接下来看看我们的作者怎样写出一部惊世骇俗的科幻小说《AI的觉醒》。1public class Author { 2 public static void main(String[] args) { 3 Editor editor = new Editor(new Doc("《AI的觉醒》")); 4 /* 5 <<<打开文档《AI的觉醒》 6 7 文章结束>>> 8 */ 9 editor.append("第一章 混沌初开"); 10 /* 11 <<<插入操作 12 第一章 混沌初开 13 文章结束>>> 14 */ 15 editor.append("\n 正文2000字……"); 16 /* 17 <<<插入操作 18 第一章 混沌初开 19 正文2000字…… 20 文章结束>>> 21 */ 22 editor.append("\n第二章 荒漠之花\n 正文3000字……"); 23 /* 24 <<<插入操作 25 第一章 混沌初开 26 正文2000字…… 27 第二章 荒漠之花 28 正文3000字…… 29 文章结束>>> 30 */ 31 editor.delete(); 32 /* 33 <<<删除操作 34 35 文章结束>>> 36 */ 37 } 38}鬼才作者开始了创作,一切进行地非常顺利,一气呵成写完了二章内容(第22行操作),于是他离开电脑去倒了杯咖啡,噩耗在此间发生了,他的熊孩子不知怎么就按下了Ctr+A,Delete触发了第31行的操作,导致全文丢失,从内存里被清空,而且离开前作者疏忽大意也没有进行存盘操作,这下彻底完了,5000字的心血付诸东流。此场景该如何是好?大家都想到了Ctr+z的操作吧?它可以瞬间撤销上一步操作并回退到前一个版本,不但让我们有吃后悔药的机会,而且还不需要频繁的去存盘备份。那么这个机制是怎样实现的呢?既然可以回溯历史,那一定得有一个历史备忘类来记录每步操作后的文本状态记录了,它同样是一个简单的Java Bean。1public class History { 2 private String body;//用于备忘文章内容 3 4 public History(String body){ 5 this.body = body; 6 } 7 8 public String getBody() { 9 return body; 10 } 11}有了这个类,我们便可以记录文档的内容快照了,在初始化时把文档内容传进来。那谁来生成这些历史记录呢?我们可以放在文档类里,让文档类具备创建与恢复历史记录的功能,我们对Doc文档类做如下修改。1public class Doc { 2 private String title;//文章名字 3 private String body;//文章内容 4 5 public Doc(String title){//新建文档先命名 6 this.title = title; 7 this.body = ""; 8 } 9 10 public void setTitle(String title) { 11 this.title = title; 12 } 13 14 public String getTitle() { 15 return title; 16 } 17 18 public String getBody() { 19 return body; 20 } 21 22 public void setBody(String body) { 23 this.body = body; 24 } 25 26 public History createHistory() { 27 return new History(body);//创建历史记录 28 } 29 30 public void restoreHistory(History history){ 31 this.body = history.getBody();//恢复历史记录 32 } 33}可以看到自第26行开始我们加入了这两个功能,只要简单的调用,便可以生成当下的历史记录,以及来去自如的恢复内容到任一历史时刻。接下来得有对历史记录的逻辑控制,也就是我们期待已久的撤销功能了,继续对编辑器类做如下修改。1public class Editor { 2 private Doc doc; 3 private List<History> historyRecords;// 历史记录列表 4 private int historyPosition = -1;// 历史记录当前位置 5 6 public Editor(Doc doc) { 7 System.out.println("<<<打开文档" + doc.getTitle()); 8 this.doc = doc; // 注入文档 9 historyRecords = new ArrayList<>();// 初始化历史记录 10 backup();// 保存一份历史记录 11 show();//显示内容 12 } 13 14 public void append(String txt) { 15 System.out.println("<<<插入操作"); 16 doc.setBody(doc.getBody() + txt); 17 backup();//操作完成后保存历史记录 18 show(); 19 } 20 21 public void save(){ 22 System.out.println("<<<存盘操作"); 23 } 24 25 public void delete(){ 26 System.out.println("<<<删除操作"); 27 doc.setBody(""); 28 backup();//操作完成后保存历史记录 29 show(); 30 } 31 32 private void backup() { 33 historyRecords.add(doc.createHistory()); 34 historyPosition++; 35 } 36 37 private void show() {// 显示当前文本内容 38 System.out.println(doc.getBody()); 39 System.out.println("文章结束>>>\n"); 40 } 41 42 public void undo() {// 撤销操作:如按下Ctr+Z,回到过去。 43 System.out.println(">>>撤销操作"); 44 if (historyPosition == 0) { 45 return;//到头了,不能再撤销了。 46 } 47 historyPosition--;//历史记录位置回滚一笔 48 History history = historyRecords.get(historyPosition); 49 doc.restoreHistory(history);//取出历史记录并恢复至文档 50 show(); 51 } 52 53 // public void redo(); 省略实现代码 54}在第3行我们加入了一个历史记录列表,它就像是时间轴一样按顺序地按index记录每个时间点的历史事件,从某种意义上看它更像是一本历史书。接下来加入的第32行backup方法会从文档中拿出快照并插入历史书,并于每个暴露给客户端作者的操作方法内被调用,做好历史的传承。最后我们加入第42行的撤销操作,让时间点回溯一个单位并恢复此处的快照至文档。当编辑器拥有了撤销功能后,我们的鬼才作者将高枕无忧的去倒咖啡了。1public class Author { 2 public static void main(String[] args) { 3 Editor editor = new Editor(new Doc("《AI的觉醒》")); 4 /* 5 <<<打开文档《AI的觉醒》 6 7 文章结束>>> 8 */ 9 editor.append("第一章 混沌初开"); 10 /* 11 <<<插入操作 12 第一章 混沌初开 13 文章结束>>> 14 */ 15 editor.append("\n 正文2000字……"); 16 /* 17 <<<插入操作 18 第一章 混沌初开 19 正文2000字…… 20 文章结束>>> 21 */ 22 editor.append("\n第二章 荒漠之花\n 正文3000字……"); 23 /* 24 <<<插入操作 25 第一章 混沌初开 26 正文2000字…… 27 第二章 荒漠之花 28 正文3000字…… 29 文章结束>>> 30 */ 31 editor.delete(); 32 /* 33 <<<删除操作 34 35 文章结束>>> 36 */ 37 38 //吃下后悔药,我的世界又完整了。 39 editor.undo(); 40 /* 41 >>>撤销操作 42 第一章 混沌初开 43 正文2000字…… 44 第二章 荒漠之花 45 正文3000字…… 46 文章结束>>> 47 */ 48 } 49}可以看到,熊孩子做了delete操作后,作者轻松淡定地按下了Ctr+z,一切恢复如初,世界依旧美好,挽回那逝去的青葱岁月。当然,代码中我们略去了一些功能,比如读者还可以加入重做redo操作,弹指之间,让历史在时间轴上来去自如,我的电脑我做主,时空穿梭,逆天之做。诚然,任何模式都有其优缺点,备忘录虽然看起来完美,但如果历史状态内容过大,会导致内存消耗严重,别忘了那边历史书的list是在内存中的哦,所以我们一定要依场景灵活运用,切不可生搬硬套。
面向对象,是对事物属性与行为的封装,方法,指的就是行为。模板方法,显而易见是说某个方法充当了模板的作用,其充分利用了抽象类虚实结合的特性,虚部抽象预留,实部固定延续,以达到将某种固有行为延续至子类的目的。反观接口,则达不到这种目的。要搞明白模板方法,首先我们从接口与抽象类的区别切入,这也是面试官经常会问到的问题。汽车上的接口最常见的就是这几个了,点烟器,USB,AUX等等,很明显这些都是接口,它们都预留了某种标准,暴露在系统外部,并与外设对接。就拿点烟器接口来说吧,它原本是专门用于给点烟器供电的,后来由于这个接口在汽车上的通用性,于是衍生出了各种外部设备,只要是符合这个标准size的,带正负极簧片的,直流12V的,那就可以使用,比如导航、行车记录仪、吸尘器什么的,以及其他各种车载电子设备。public interface CigarLighterInterface {//点烟器接口 //供电方法,16V直流电 public void electrifyDC16V(); }1public class GPS implements CigarLighterInterface { 2 //导航的实现 3 @Override 4 public void electrifyDC16V() { 5 System.out.println("连接卫星"); 6 System.out.println("定位。。。"); 7 } 8 9}1public class CigarLighter implements CigarLighterInterface { 2 //点烟器的实现 3 @Override 4 public void electrifyDC16V() { 5 int time = 1000; 6 while(--time>0){ 7 System.out.println("加热电炉丝"); 8 } 9 System.out.println("点烟器弹出"); 10 } 11 12}对于点烟器接口来说,它根本不在乎也不知道对接的外设是什么鬼,它只是定义了一种规范,一种标准,只要符合的都可以对接。再比如USB接口的应用更加广泛,外设更是应有尽有,具体例子可以参考文章《设计模式是什么鬼(初探)》。以上我们可以体会到接口的抽象是淋漓尽致的,实现是空无的,也就是说其方法都是无实现的。然而这样在某些场景下会存在一些问题,例如有时候我们在父类中只需抽象出一些方法,并且同时也有一些实体方法,以供子类直接继承,这怎么办?答案就是抽象类。举个例子,哺乳动物类,我们人类就是哺乳动物。什么?鲸鱼是哺乳类?是的,凡是喂奶的都是哺乳类,不要以为会游泳的都是鱼,会飞的都是鸟,蝙蝠同样是哺乳类,只不过是老鼠中的飞行员而已。既然如此这哺乳动物肯定是都能喂奶了,但是到底是跑还是游,或是飞呢还真不好说,但至少可以确认它们都是可以移动的。言归正传,我们开始定义哺乳动物抽象类。1public abstract class Mammal { 2 3 //既然是哺乳动物当然会喂奶了,但这里约束为只能母的喂奶 4 protected final void feedMilk(){ 5 if(female){//如果是母的…… 6 System.out.println("喂奶"); 7 }else{//如果是公的……或者可以抛个异常出去。 8 System.out.println("公的不会"); 9 } 10 } 11 12 //哺乳动物当然可以移动,但具体怎么移动还不知道。 13 public abstract void move(); 14}这里我们省略了female属性,其作用是为了控制喂奶行为,大家可以自行添加。可以看到的是,抽象类不同于接口,其自身是可以有具体实现的,也就是说抽象类是虚实结合的,虚部抽象行为,实部遗传给子类,虚虚实实,飘忽不定。OK,我们看下人、鲸、蝠的子类实现。public class Human extends Mammal { @Override public void move() { System.out.println("两条腿走路……"); } }public class Whale extends Mammal { @Override public void move() { System.out.println("游泳……"); } }public class Bat extends Mammal { @Override public void move() { System.out.println("用翅膀飞……"); } }可以看到子类的各路实现都是自己独有的行为方式,而喂奶那个行为是不需要自己实现的,它是属于抽象哺乳类的共有行为,哺乳子类不能进行任何干涉。这便是接口与抽象的最大区别了,接口是虚的,抽象类可以有虚有实,接口不在乎实现类是什么,抽象类会延续其基因给子类。其实到这里我们已经说了一大半了,理解了以上部分,剩下的部分就非常简单了,利用抽象类的这个特性,便有了“模板方法”。举例说明,我们做软件项目管理,按瀑布式简单来讲分为:需求分析、设计、编码、测试、发布,先不管是用何种方式去实现各个细节,我们就抽象成这五个步骤。public abstract class PM { protected abstract void analyze();//需求分析 protected abstract void design();//设计 protected abstract void develop();//开发 protected abstract boolean test();//测试 protected abstract void release();//发布 }那么问题来了,有个程序员在需求不明确或者设计不完善的情况下,一上来二话不说直接写代码,这样就会造成资源的浪费,后期改动太大还会影响项目进度。那么项目经理这时就应该规范一下这个任务流程,这里我们加入kickoff()方法实现。1public abstract class PM { 2 protected abstract void analyze();//需求分析 3 protected abstract void design();//设计 4 protected abstract void develop();//开发 5 protected abstract boolean test();//测试 6 protected abstract void release();//发布 7 8 protected final void kickoff(){ 9 analyze(); 10 design(); 11 develop(); 12 test(); 13 release(); 14 } 15}这样就限制了整个项目周期的任务流程,注意这里要用final声明此方法子类不可以重写,只能乖乖的继承下去用。至于其他的抽象方法,子类可以自由发挥,比如测试方法test(),子类可以用人工测试,自动化测试,我们不care,我们是站在项目管理的抽象高度,只把控流程进度。这里甚至我们还可以加入一些逻辑如下。1public abstract class PM { 2 protected abstract void analyze();//需求分析 3 protected abstract void design();//设计 4 protected abstract void develop();//开发 5 protected abstract boolean test();//测试 6 protected abstract void release();//发布 7 8 protected final void kickoff(){ 9 analyze(); 10 design(); 11 do { 12 develop(); 13 } while (!test());//如果测试失败,则继续开发改Bug。 14 release(); 15 } 16}以下子类只需实现抽象方法,而不用实现固有的模板方法kickoff(),因为它已经被父类PM实现了,并且子类也不能进行重写。1public class AutoTestPM extends PM{ 2 3 @Override 4 protected void analyze() { 5 System.out.println("进行业务沟通,需求分析"); 6 } 7 8 //design();develop();test();release();实现省略 9}至此,我们的模板方法就完成了,抽象类PM中的实方法kickoff()中,以某种逻辑编排调用了其他各个抽象方法,提供了一种固定模式的行为方式或是指导方针,以此达到虚实结合、柔中带刚、刚柔并济,灵活中不失规范的目的。当然大部分情况我们使用接口会多于抽象类,因为接口灵活啊,抽象类不允许多继承啊等等,其实我们还是要看应用场景,在某种无规矩不成方圆,或者规范比较明确,的情况下抽象类的应用是有必要的,世间万物没有最好的,只有最合适的。
上一篇文章我讲解 Stream 流的基本原理,以及它的基本方法使用,本篇文章我们继续讲解流的其他操作没有看过上篇文章的可以先点击进去学习一下 简洁又快速地处理集合——Java8 Stream(上)值得注意的是:学习 Stream 之前必须先学习 lambda 的相关知识。本文也假设读者已经掌握 lambda 的相关知识。本篇文章主要内容:一种特化形式的流——数值流Optional 类如何构建一个流collect 方法并行流相关问题一. 数值流前面介绍的如int sum = list.stream().map(Person::getAge).reduce(0, Integer::sum);计算元素总和的方法其中暗含了装箱成本,map(Person::getAge) 方法过后流变成了 Stream类型,而每个 Integer 都要拆箱成一个原始类型再进行 sum 方法求和,这样大大影响了效率。针对这个问题 Java 8 有良心地引入了数值流 IntStream, DoubleStream, LongStream,这种流中的元素都是原始数据类型,分别是 int,double,long1. 流与数值流的转换流转换为数值流mapToInt(T -> int) : return IntStreammapToDouble(T -> double) : return DoubleStreammapToLong(T -> long) : return LongStreamIntStream intStream = list.stream().mapToInt(Person::getAge);当然如果是下面这样便会出错LongStream longStream = list.stream().mapToInt(Person::getAge);因为 getAge 方法返回的是 int 类型(返回的如果是 Integer,一样可以转换为 IntStream)数值流转换为流很简单,就一个 boxedStream<Integer> stream = intStream.boxed();2. 数值流方法下面这些方法作用不用多说,看名字就知道:sum()max()min()average() 等...3. 数值范围IntStream 与 LongStream 拥有 range 和 rangeClosed 方法用于数值范围处理IntStream : rangeClosed(int, int) / range(int, int)LongStream : rangeClosed(long, long) / range(long, long)这两个方法的区别在于一个是闭区间,一个是半开半闭区间:rangeClosed(1, 100) :[1, 100]range(1, 100) :[1, 100)我们可以利用 IntStream.rangeClosed(1, 100) 生成 1 到 100 的数值流求 1 到 10 的数值总和: IntStream intStream = IntStream.rangeClosed(1, 10); int sum = intStream.sum();二. Optional 类NullPointerException 可以说是每一个 Java 程序员都非常讨厌看到的一个词,针对这个问题, Java 8 引入了一个新的容器类 Optional,可以代表一个值存在或不存在,这样就不用返回容易出问题的 null。之前文章的代码中就经常出现这个类,也是针对这个问题进行的改进。Optional 类比较常用的几个方法有:isPresent() :值存在时返回 true,反之 flaseget() :返回当前值,若值不存在会抛出异常orElse(T) :值存在时返回该值,否则返回 T 的值Optional 类还有三个特化版本 OptionalInt,OptionalLong,OptionalDouble,刚刚讲到的数值流中的 max 方法返回的类型便是这个Optional 类其中其实还有很多学问,讲解它说不定也要开一篇文章,这里先讲那么多,先知道基本怎么用就可以。三. 构建流之前我们得到一个流是通过一个原始数据源转换而来,其实我们还可以直接构建得到流。1. 值创建流Stream.of(T...) : Stream.of("aa", "bb") 生成流生成一个字符串流 Stream<String> stream = Stream.of("aaa", "bbb", "ccc");Stream.empty() : 生成空流2. 数组创建流根据参数的数组类型创建对应的流:Arrays.stream(T[ ])Arrays.stream(int[ ])Arrays.stream(double[ ])Arrays.stream(long[ ])值得注意的是,还可以规定只取数组的某部分,用到的是Arrays.stream(T[], int, int)只取索引第 1 到第 2 位的: int[] a = {1, 2, 3, 4}; Arrays.stream(a, 1, 3).forEach(System.out :: println); 打印 2 ,33. 文件生成流Stream<String> stream = Files.lines(Paths.get("data.txt"));每个元素是给定文件的其中一行4. 函数生成流两个方法:iterate : 依次对每个新生成的值应用函数generate :接受一个函数,生成一个新的值Stream.iterate(0, n -> n + 2) 生成流,首元素为 0,之后依次加 2 Stream.generate(Math :: random) 生成流,为 0 到 1 的随机双精度数 Stream.generate(() -> 1) 生成流,元素全为 1四. collect 收集数据coollect 方法作为终端操作,接受的是一个 Collector 接口参数,能对数据进行一些收集归总操作1. 收集最常用的方法,把流中所有元素收集到一个 List, Set 或 Collection 中toListtoSettoCollectiontoMapList newlist = list.stream.collect(toList());//如果 Map 的 Key 重复了,可是会报错的哦 Map<Integer, Person> map = list.stream().collect(toMap(Person::getAge, p -> p));2. 汇总(1)counting用于计算总和:long l = list.stream().collect(counting());没错,你应该想到了,下面这样也可以:long l = list.stream().count();推荐第二种(2)summingInt ,summingLong ,summingDoublesumming,没错,也是计算总和,不过这里需要一个函数参数计算 Person 年龄总和:int sum = list.stream().collect(summingInt(Person::getAge));当然,这个可以也简化为:int sum = list.stream().mapToInt(Person::getAge).sum();除了上面两种,其实还可以:int sum = list.stream().map(Person::getAge).reduce(Interger::sum).get();推荐第二种由此可见,函数式编程通常提供了多种方式来完成同一种操作(3)averagingInt,averagingLong,averagingDouble看名字就知道,求平均数Double average = list.stream().collect(averagingInt(Person::getAge));当然也可以这样写OptionalDouble average = list.stream().mapToInt(Person::getAge).average();不过要注意的是,这两种返回的值是不同类型的(4)summarizingInt,summarizingLong,summarizingDouble这三个方法比较特殊,比如 summarizingInt 会返回 IntSummaryStatistics 类型IntSummaryStatistics l = list.stream().collect(summarizingInt(Person::getAge));IntSummaryStatistics 包含了计算出来的平均值,总数,总和,最值,可以通过下面这些方法获得相应的数据3. 取最值maxBy,minBy 两个方法,需要一个 Comparator 接口作为参数Optional<Person> optional = list.stream().collect(maxBy(comparing(Person::getAge)));我们也可以直接使用 max 方法获得同样的结果Optional<Person> optional = list.stream().max(comparing(Person::getAge));4. joining 连接字符串也是一个比较常用的方法,对流里面的字符串元素进行连接,其底层实现用的是专门用于字符串连接的 StringBuilderString s = list.stream().map(Person::getName).collect(joining()); 结果:jackmiketomString s = list.stream().map(Person::getName).collect(joining(",")); 结果:jack,mike,tomjoining 还有一个比较特别的重载方法:String s = list.stream().map(Person::getName).collect(joining(" and ", "Today ", " play games.")); 结果:Today jack and mike and tom play games.即 Today 放开头,play games. 放结尾,and 在中间连接各个字符串5. groupingBy 分组groupingBy 用于将数据分组,最终返回一个 Map 类型Map<Integer, List<Person>> map = list.stream().collect(groupingBy(Person::getAge));例子中我们按照年龄 age 分组,每一个 Person 对象中年龄相同的归为一组另外可以看出,Person::getAge 决定 Map 的键(Integer 类型),list 类型决定 Map 的值(List类型)多级分组groupingBy 可以接受一个第二参数实现多级分组:Map<Integer, Map<T, List<Person>>> map = list.stream().collect(groupingBy(Person::getAge, groupingBy(...)));其中返回的 Map 键为 Integer 类型,值为 Map<t, list按组收集数据Map<Integer, Integer> map = list.stream().collect(groupingBy(Person::getAge, summingInt(Person::getAge)));该例子中,我们通过年龄进行分组,然后 summingInt(Person::getAge)) 分别计算每一组的年龄总和(Integer),最终返回一个 Map根据这个方法,我们可以知道,前面我们写的:groupingBy(Person::getAge)其实等同于:groupingBy(Person::getAge, toList())6. partitioningBy 分区分区与分组的区别在于,分区是按照 true 和 false 来分的,因此partitioningBy 接受的参数的 lambda 也是 T -> boolean根据年龄是否小于等于20来分区 Map<Boolean, List<Person>> map = list.stream() .collect(partitioningBy(p -> p.getAge() <= 20)); 打印输出 { false=[Person{name='mike', age=25}, Person{name='tom', age=30}], true=[Person{name='jack', age=20}] }同样地 partitioningBy 也可以添加一个收集器作为第二参数,进行类似 groupBy 的多重分区等等操作。五. 并行我们通过 list.stream() 将 List 类型转换为流类型,我们还可以通过 list.parallelStream() 转换为并行流。因此你通常可以使用 parallelStream 来代替 stream 方法并行流就是把内容分成多个数据块,使用不同的线程分别处理每个数据块的流。这也是流的一大特点,要知道,在 Java 7 之前,并行处理数据集合是非常麻烦的,你得自己去将数据分割开,自己去分配线程,必要时还要确保同步避免竞争。Stream 让程序员能够比较轻易地实现对数据集合的并行处理,但要注意的是,不是所有情况的适合,有些时候并行甚至比顺序进行效率更低,而有时候因为线程安全问题,还可能导致数据的处理错误,这些我会在下一篇文章中讲解。比方说下面这个例子int i = Stream.iterate(1, a -> a + 1).limit(100).parallel().reduce(0, Integer::sum);我们通过这样一行代码来计算 1 到 100 的所有数的和,我们使用了 parallel 来实现并行。但实际上是,这样的计算,效率是非常低的,比不使用并行还低!一方面是因为装箱问题,这个前面也提到过,就不再赘述,还有一方面就是 iterate 方法很难把这些数分成多个独立块来并行执行,因此无形之中降低了效率。流的可分解性这就说到流的可分解性问题了,使用并行的时候,我们要注意流背后的数据结构是否易于分解。比如众所周知的 ArrayList 和 LinkedList,明显前者在分解方面占优。我们来看看一些数据源的可分解性情况数据源可分解性ArrayList极佳LinkedList差IntStream.range极佳Stream.iterate差HashSet好TreeSet好顺序性。除了可分解性,和刚刚提到的装箱问题,还有一点值得注意的是一些操作本身在并行流上的性能就比顺序流要差,比如:limit,findFirst,因为这两个方法会考虑元素的顺序性,而并行本身就是违背顺序性的,也是因为如此 findAny 一般比 findFirst 的效率要高。六. 效率最后再来谈谈效率问题,很多人可能听说过有关 Stream 效率低下的问题。其实,对于一些简单的操作,比如单纯的遍历,查找最值等等,Stream 的性能的确会低于传统的循环或者迭代器实现,甚至会低很多。但是对于复杂的操作,比如一些复杂的对象归约,Stream 的性能是可以和手动实现的性能匹敌的,在某些情况下使用并行流,效率可能还远超手动实现。好钢用在刀刃上,在适合的场景下使用,才能发挥其最大的用处。函数式接口的出现主要是为了提高编码开发效率以及增强代码可读性;与此同时,在实际的开发中,并非总是要求非常高的性能,因此 Stream 与 lambda 的出现意义还是非常大的。
Java 8 发布至今也已经好几年过去,如今 Java 也已经向 11 迈去,但是 Java 8 作出的改变可以说是革命性的,影响足够深远,学习 Java 8 应该是 Java 开发者的必修课。今天给大家带来 Java 8 Stream 讲解,为什么直接讲这个,是因为只要你学完,立刻就能上手,并能让它在你的代码中大展身手。值得注意的是:学习 Stream 之前必须先学习 lambda 的相关知识。本文也假设读者已经掌握 lambda 的相关知识。本篇文章主要内容:介绍 Stream 以及 Stream 是如何处理集合的介绍 Stream 与集合的关系与区别Stream 的基本方法介绍一. 什么是 StreamStream 中文称为 “流”,通过将集合转换为这么一种叫做 “流” 的元素序列,通过声明性方式,能够对集合中的每个元素进行一系列并行或串行的流水线操作。换句话说,你只需要告诉流你的要求,流便会在背后自行根据要求对元素进行处理,而你只需要 “坐享其成”。二. 流操作整个流操作就是一条流水线,将元素放在流水线上一个个地进行处理。其中数据源便是原始集合,然后将如 List的集合转换为 Stream类型的流,并对流进行一系列的中间操作,比如过滤保留部分元素、对元素进行排序、类型转换等;最后再进行一个终端操作,可以把 Stream 转换回集合类型,也可以直接对其中的各个元素进行处理,比如打印、比如计算总数、计算最大值等等很重要的一点是,很多流操作本身就会返回一个流,所以多个操作可以直接连接起来,我们来看看一条 Stream 操作的代码:如果是以前,进行这么一系列操作,你需要做个迭代器或者 foreach 循环,然后遍历,一步步地亲力亲为地去完成这些操作;但是如果使用流,你便可以直接声明式地下指令,流会帮你完成这些操作。有没有想到什么类似的?是的,就像 SQL 语句一样, select username from user where id = 1,你只要说明:“我需要 id 是 1 (id = 1)的用户(user)的用户名(username )”,那么就可以得到自己想要的数据,而不需要自己亲自去数据库里面循环遍历查找。三. 流与集合什么时候计算Stream 和集合的其中一个差异在于什么时候进行计算。一个集合,它会包含当前数据结构中所有的值,你可以随时增删,但是集合里面的元素毫无疑问地都是已经计算好了的。流则是按需计算,按照使用者的需要计算数据,你可以想象我们通过搜索引擎进行搜索,搜索出来的条目并不是全部呈现出来的,而且先显示最符合的前 10 条或者前 20 条,只有在点击 “下一页” 的时候,才会再输出新的 10 条。再比方在线观看电影和你硬盘里面的电影,也是差不多的道理。外部迭代和内部迭代Stream 和集合的另一个差异在于迭代。我们可以把集合比作一个工厂的仓库,一开始工厂比较落后,要对货物作什么修改,只能工人亲自走进仓库对货物进行处理,有时候还要将处理后的货物放到一个新的仓库里面。在这个时期,我们需要亲自去做迭代,一个个地找到需要的货物,并进行处理,这叫做外部迭代。后来工厂发展了起来,配备了流水线作业,只要根据需求设计出相应的流水线,然后工人只要把货物放到流水线上,就可以等着接收成果了,而且流水线还可以根据要求直接把货物输送到相应的仓库。这就叫做内部迭代,流水线已经帮你把迭代给完成了,你只需要说要干什么就可以了(即设计出合理的流水线)。Java 8 引入 Stream 很大程度是因为,流的内部迭代可以自动选择一种合适你硬件的数据表示和并行实现;而以往程序员自己进行 foreach 之类的时候,则需要自己去管理并行等问题。一次性的流流和迭代器类似,只能迭代一次。Stream<String> stream = list.stream().map(Person::getName).sorted().limit(10); List<String> newList = stream.collect(toList()); List<String> newList2 = stream.collect(toList());上面代码中第三行会报错,因为第二行已经使用过这个流,这个流已经被消费掉了四. 方法介绍,开始实战首先我们先创建一个 Person 泛型的 ListList<Person> list = new ArrayList<>(); list.add(new Person("jack", 20)); list.add(new Person("mike", 25)); list.add(new Person("tom", 30));Person 类包含年龄和姓名两个成员变量private String name; private int age;1. stream() / parallelStream()最常用到的方法,将集合转换为流List list = new ArrayList(); // return Stream<E> list.stream();而 parallelStream() 是并行流方法,能够让数据集执行并行操作,后面会更详细地讲解2. filter(T -> boolean)保留 boolean 为 true 的元素保留年龄为 20 的 person 元素 list = list.stream() .filter(person -> person.getAge() == 20) .collect(toList()); 打印输出 [Person{name='jack', age=20}]collect(toList()) 可以把流转换为 List 类型,这个以后会讲解3. distinct()去除重复元素,这个方法是通过类的 equals 方法来判断两个元素是否相等的如例子中的 Person 类,需要先定义好 equals 方法,不然类似[Person{name='jack', age=20}, Person{name='jack', age=20}] 这样的情况是不会处理的4. sorted() / sorted((T, T) -> int)如果流中的元素的类实现了 Comparable 接口,即有自己的排序规则,那么可以直接调用 sorted() 方法对元素进行排序,如 Stream反之, 需要调用 sorted((T, T) -> int) 实现 Comparator 接口根据年龄大小来比较: list = list.stream() .sorted((p1, p2) -> p1.getAge() - p2.getAge()) .collect(toList());当然这个可以简化为list = list.stream() .sorted(Comparator.comparingInt(Person::getAge)) .collect(toList());5. limit(long n)返回前 n 个元素list = list.stream() .limit(2) .collect(toList()); 打印输出 [Person{name='jack', age=20}, Person{name='mike', age=25}]6. skip(long n)去除前 n 个元素list = list.stream() .skip(2) .collect(toList()); 打印输出 [Person{name='tom', age=30}]tips:用在 limit(n) 前面时,先去除前 m 个元素再返回剩余元素的前 n 个元素limit(n) 用在 skip(m) 前面时,先返回前 n 个元素再在剩余的 n 个元素中去除 m 个元素list = list.stream() .limit(2) .skip(1) .collect(toList()); 打印输出 [Person{name='mike', age=25}]7. map(T -> R)将流中的每一个元素 T 映射为 R(类似类型转换)List<String> newlist = list.stream().map(Person::getName).collect(toList());newlist 里面的元素为 list 中每一个 Person 对象的 name 变量8. flatMap(T -> Stream)将流中的每一个元素 T 映射为一个流,再把每一个流连接成为一个流List<String> list = new ArrayList<>(); list.add("aaa bbb ccc"); list.add("ddd eee fff"); list.add("ggg hhh iii"); list = list.stream().map(s -> s.split(" ")). flatMap(Arrays::stream).collect(toList());上面例子中,我们的目的是把 List 中每个字符串元素以" "分割开,变成一个新的 List。首先 map 方法分割每个字符串元素,但此时流的类型为 Stream,因为 split 方法返回的是 String[ ] 类型;所以我们需要使用 flatMap 方法,先使用Arrays::stream将每个 String[ ] 元素变成一个 Stream流,然后 flatMap 会将每一个流连接成为一个流,最终返回我们需要的 Stream9. anyMatch(T -> boolean)流中是否有一个元素匹配给定的 T -> boolean 条件是否存在一个 person 对象的 age 等于 20: boolean b = list.stream().anyMatch(person -> person.getAge() == 20);10. allMatch(T -> boolean)流中是否所有元素都匹配给定的 T -> boolean 条件11. noneMatch(T -> boolean)流中是否没有元素匹配给定的 T -> boolean 条件12. findAny() 和 findFirst()findAny():找到其中一个元素 (使用 stream() 时找到的是第一个元素;使用 parallelStream() 并行时找到的是其中一个元素)findFirst():找到第一个元素值得注意的是,这两个方法返回的是一个 Optional对象,它是一个容器类,能代表一个值存在或不存在,这个后面会讲到13. reduce((T, T) -> T) 和 reduce(T, (T, T) -> T)用于组合流中的元素,如求和,求积,求最大值等计算年龄总和: int sum = list.stream().map(Person::getAge).reduce(0, (a, b) -> a + b); 与之相同: int sum = list.stream().map(Person::getAge).reduce(0, Integer::sum);其中,reduce 第一个参数 0 代表起始值为 0,lambda (a, b) -> a + b 即将两值相加产生一个新值。同样地:计算年龄总乘积: int sum = list.stream().map(Person::getAge).reduce(1, (a, b) -> a * b);当然也可以Optional<Integer> sum = list.stream().map(Person::getAge).reduce(Integer::sum);即不接受任何起始值,但因为没有初始值,需要考虑结果可能不存在的情况,因此返回的是 Optional 类型。13. count()返回流中元素个数,结果为 long 类型14. collect()收集方法,我们很常用的是 collect(toList()),当然还有 collect(toSet()) 等,参数是一个收集器接口,这个后面会另外讲。15. forEach()返回结果为 void,很明显我们可以通过它来干什么了,比方说:### 16. unordered()还有这个比较不起眼的方法, #返回一个等效的无序流,当然如果流本身就是无序的话,那可能就会直接返回其本身 打印各个元素: list.stream().forEach(System.out::println);再比如说 MyBatis 里面访问数据库的 mapper 方法:向数据库插入新元素: list.stream().forEach(PersonMapper::insertPerson);推荐大而全的【后端技术精选】
2022年04月
2022年02月