Lambda
这个表达式 其实就是一个新的语法糖,这里Java8主要是对语法做了简化,让我们java的代码更加的简洁
Lambda可以总在哪里呢?
函数式接口
只实现了一个方法的接口,我们就叫函数式接口,这个时候可能会有java的警报
@FunctionalInterface
有这个注解,java就会知道哦 你这个是函数式接口,就不会有警报了
简单的Lambda实战
我们就拿多线程中的 Runnable
接口来做例子
@Test public void test() { new Thread(new Runnable() { @Override public void run() { System.out.println("你好我是传统线程"); } }).start(); new Thread(() -> { System.out.println("你好我是Lambda的第一个线程"); }).start(); }
我们可以根据上边语句的变化来看出 语法的简洁
() = 代表的是我们的参数列表,Lambda表达式的参数和我们调用方法参数必须一致 -> 尖头标识符 代表我们要使用Lambda {} 方法体,这里是我们使用表达式的具体操作,也可以用方法引用的方式,用其他包装好点类的方法来做处理
编写一个自己的函数式接口,并且练习
@FunctionalInterface public interface MyinterFace { void method(); } class test { void dosth(MyinterFace myinterFace) { System.out.print("Function A "); myinterFace.method(); } public static void main(String[] args) { //这里我们使用自己的函数式接口 输出语句 test test = new test(); test.dosth(() -> { System.out.print(" do sth"); }); } }
可以看到我们用自己的函数式接口作为参数 调用函数方法的 dosth,这个时候我们可以用Lambda表达式来实现我们这个接口里的步骤,这里我们以输出 do sth 为操作。
问题处理
这里时候我们有两个方法,一个使用了 myinterfaceA 一个使用率 myinterFace B 这个时候我们 Lambda表达式没办法去识别,需要我们显示的声明用谁的
@FunctionalInterface interface MyinterFaceA { void method(); } @FunctionalInterface interface MyinterFaceB { void method(); } class test { void dosth(MyinterFaceA myinterFace) { System.out.print("Function A "); myinterFace.method(); } void dosth(MyinterFaceB myinterFace) { System.out.print("Function A "); myinterFace.method(); } public static void main(String[] args) { //这里我们使用自己的函数式接口 输出语句 test test = new test(); test.dosth((MyinterFaceA) () -> { System.out.print(" do sth"); }); } }
常用的java函数
提供者接口 : Supplier 没有输入只有输出
消费者接口 : Consumer 没有出只有输入
函数接口 : Function 放入一个对象返回一个新对象
- UnaryOperator 对于 放入和输出类型一致时候的函数借口
- BiFunction接口: 输入两个对象,返回一个新对象
Coding
/** * @projectName: Webflux_demo * @package: Lambda * @className: JdkFunctionDmo * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/13 19:04 * @version: 1.0 */ public class JdkFunctionDmo { public static void main(String[] args) { // Supplier 没有输入 只有输出 Supplier<String> supplier = () -> "我是一个 Supplier 方法"; System.out.println(supplier.get()); //Consummer 只有输入 没有输出 Consumer<String> con = i -> System.out.println("我是一个 Conusmer Demo" + i); con.accept(" hello Consumer i am 冷环渊"); //function 放入一个对象生成一个新的对象 Function<Integer, Integer> func = i -> i * i; Integer apply = func.apply(9); System.out.println("Function demo out:" + apply); //对于 放入和输出类型一致的哦我们 Function接口里有一个实现 UnaryOperator UnaryOperator<Integer> unaryOperator = i -> i * i - i; System.out.println("Function demo out:" + unaryOperator.apply(9)); //输入两个对象 返回一个新的对象 BiFunction BiFunction<Integer, Integer, String> biFunction = (i, e) -> i * e + "元"; System.out.println("我一共该交给你多少钱:" + biFunction.apply(40, 80)); } }
到这里我们 Lambda表达式的快速认识就结束了,接下来是Java8的另一个特性,流式编程
Stream
我们通过演示的代码来带入 Stream api 的变成 以及我们做一个小练习
coding
/** * @projectName: Webflux_demo * @package: Stream * @className: StreamAPITest * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/13 20:09 * @version: 1.0 */ public class StreamAPITest { public static void main(String[] args) { String[] strarr = {"bo_le", "", "webfulx", "redis", "spring", "mirc_Sercice"}; // 数组 arr 创建 Stream //Arrays.stream(strarr).forEach(System.out::println); //2.list //Arrays.asList(strarr).stream().forEach(System.out::println); //3.stream.of() //Stream.of(strarr).forEach(System.out::println); // 4.迭代器 打印 1-10 元素 //Stream.iterate(1, i -> i + 1).limit(10).forEach(System.out::println); // 5. generate 打印随机数 10以内的随机数 //Stream.generate(() -> new Random().nextInt(10)).limit(10).forEach(System.out::println); /* * 现实中的流 变成 完整案例 * 元素的中间操作,元素的终止操作 * 结果依次 输出 abceo * * 结果 一次输出 belo * bo_le --> bole ->字符转换成一个新的流(b o l e)-> sorted->(belo); * * PS: 注意事项 在流编程中 终止操作只能有一个,中间操作可以有 0-n个 * */ String[] arr = {"react", "", "spring", "bo_le", "bo_le"}; Stream.of(arr) .filter(i -> !i.isEmpty()) .distinct() .sorted() .limit(1) .map(i -> i.replace("_", "")) .flatMap(i -> Stream.of(i.split(""))) .sorted() .forEach(System.out::println); } }
Reactor Project
官网地址 : https://spring.io/reactive
简介
Reacive 异步非阻塞响应式框架 特点: 低延迟,高吞吐
,以下简介均来自spring官方文档。
反应式系统具有一些特性,使其成为低延迟、高吞吐量工作负载的理想选择。Project Reactor 和 Spring 产品组合协同工作,使开发人员能够构建具有响应性、弹性、弹性和消息驱动的企业级反应式系统。
响应式系统和传统的同步阻塞调用模型
- 传统的模型 ,client 不管有多少信息都会一次性发给server,这个时候如果Server性能够,可以能会造成大量的客户端请求无法响应,之后就会拒绝请求和请求失败
- 而响应式的模型有一个东西叫做 背压,需要数据,可以通过背压去控制数量,这样就不会让大量的数据冲垮我们的服务器
什么是响应式?
响应式处理是一种范例,它使开发人员能够构建可以处理背压(流控制)的非阻塞、异步应用程序。
为什么需要响应式
反应式系统更好地利用现代处理器。此外,在反应式编程中包含背压可确保解耦组件之间具有更好的弹性。
有关响应式系统特质的论文
论文地址:https://www.reactivemanifesto.org/zh-CN
Reactor 核心库
Project Reactor 是一个完全无阻塞的基础,包括背压支持。它是 Spring 生态系统中响应式堆栈的基础,并在 Spring WebFlux、Spring Data 和 Spring Cloud Gateway 等项目中具有特色。
与springBoot整合
Spring 产品组合提供了两个并行堆栈。一种是基于带有 Spring MVC 和 Spring Data 构造的 Servlet API。另一个是利用 Spring WebFlux 和 Spring Data 的反应式存储库的完全反应式堆栈。在这两种情况下,Spring Security 都为您提供了对这两个堆栈的本机支持。
可以看到,响应式的技术栈,和我们熟悉的MVC那一套不一样,这里我们的技术基本是换了一套,还没有很好的第三方框架的兼容性
响应式技术组建的关系
我们之后的demo Coding也会跟着从里到外的API 来学习
- ReativeStream
我们来看一下,响应式的流程
订阅者来决定可以接受多少数据,生产者根据背压的规则来传递,这样就不会出现像传统架构一样的问题
下图:就是我们的响应流的运行模型
ReactiveStream(JDK9)编程
coding
ReactiveStream helloworld
- 我们需要 发布者,订阅者,两者绑定,发送消息,关闭流
/** * @projectName: Webflux_demo * @package: reactiveStream * @className: ReactiveStreamDemo * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/14 0:24 * @version: 1.0 */ public class ReactiveStreamDemo { public static void main(String[] args) { // 1.创建一个 发布者 SubmissionPublisher publisher = new SubmissionPublisher(); // 2.创建一个订阅者 Flow.Subscriber subscriber = new Flow.Subscriber() { Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; System.out.println("创建订阅关系 "); subscription.request(1); //第一次需要发送一个 之后的都不需要了 } @Override public void onNext(Object item) { System.out.println("接收数据:" + item); //接收数据 业务处理 subscription.request(10); } @Override public void onError(Throwable throwable) { System.out.println("发生错误了"); } @Override public void onComplete() { System.out.println("数据发送完成了"); } }; // 3 建立订阅者 publisher.subscribe(subscriber); for (int i = 0; i < 100; i++) { // 4 发送数据 publisher.submit("第" + i + "条hello reactive stream"); } publisher.close(); try { Thread.currentThread().join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里我们需要编写一个 Processor 来当做中间处理数据的
我们的发布者先发给Processor之后由Processor发给订阅者,
/** * @projectName: Webflux_demo * @package: reactiveStream * @className: ReactiveProcessor * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/14 0:25 * @version: 1.0 */ public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> { private Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; System.out.println("Processor建立订阅关系"); subscription.request(1); } @Override public void onNext(String item) { System.out.println("Processor接收数据:" + item); //中间处理 //数据发给最终订阅者 this.submit(item.toUpperCase()); //背压的实现核心 this.subscription.request(1); } @Override public void onError(Throwable throwable) { System.out.println("出现错误了"); } @Override public void onComplete() { System.out.println("数据传输成功"); } }
编写有中间处理器 Processor的demo
/** * @projectName: Webflux_demo * @package: reactiveStream * @className: ReactiveStreamDemo2 * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/14 0:25 * @version: 1.0 */ public class ReactiveStreamDemo2 { public static void main(String[] args) { // 1.创建一个 发布者 SubmissionPublisher publisher = new SubmissionPublisher(); // 2.创建一个 Processor ReactiveProcessor processor = new ReactiveProcessor(); // 3 发布者将消息给processor来做处理之后转发到最终订阅者 publisher.subscribe(processor); // 4.创建一个最终订阅者 Flow.Subscriber subscriber = new Flow.Subscriber() { Flow.Subscription subscription; @Override public void onSubscribe(Flow.Subscription subscription) { this.subscription = subscription; System.out.println("创建订阅关系 "); subscription.request(1); //第一次需要发送一个 之后的都不需要了 } @Override public void onNext(Object item) { System.out.println("接收数据:" + item); //接收数据 业务处理 subscription.request(10); } @Override public void onError(Throwable throwable) { System.out.println("发生错误了"); } @Override public void onComplete() { System.out.println("数据发送完成了"); } }; processor.subscribe(subscriber); for (int i = 0; i < 100; i++) { System.out.println("发布数据" + i); // 4 发送数据 publisher.submit("第" + i + "条hello reactive stream"); } publisher.close(); try { Thread.currentThread().join(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
到这里我们基于ReactiveStream的小练习demo就到这里了
Reactor Project(spring)
Flux And Mono 他们都是 Publisher
Flux 0-N 项的异步序列 代表0-多个
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QcWlgvn2-1648923751095)(https://gitee.com/cold-abyss_admin/my-image-host/raw/master/ img /flux.svg)]
AFlux<T>
是一个标准Publisher<T>
,表示 0 到 N 个发出的项目的异步序列,可选地由完成信号或错误终止。如无流规范,这三种类型的信号转换为呼叫到下游用户的onNext
,onComplete
和onError
方法。
具有这种大范围的可能信号,Flux
是通用的反应型。请注意,所有事件,即使是终止事件,都是可选的:没有onNext
事件但 onComplete
事件代表一个空的有限序列,但是删除onComplete
并且您有一个无限的空序列(不是特别有用,除了围绕取消的测试)。同样,无限序列也不一定是空的。
Mono
, 异步 0-1 结果 要么有一个 要么没有
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-h5VSKfJy-1648923751096)(https://gitee.com/cold-abyss_admin/my-image-host/raw/master/ img /mono.svg)]
AMono<T>
是一种特殊的Publisher<T>
,它通过onNext
信号最多发出一个项目, 然后以一个onComplete
信号(成功Mono
,有或没有值)终止,或者只发出一个onError
信号(失败Mono
)。
可以使用 aMono
来表示只有完成概念的无值异步进程(类似于 a Runnable
)一个空的 Mono<Void>
.
Reactor Coding
Coding之前 我们先把Reactor 需要的Mavern依赖 导入到maven 环境里
<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.4.6</version> </dependency>
Mono
/** * @author 冷环渊 Doomwatcher * @context: 这里是 Mono 创建 0-1个元素序列的测试方法 * @date: 2021/12/14 15:01 * @param * @return: void */ @Test public void MonoTset() { // 1. Mono 的创建方式 /* *创建 空的 Mono 对象 输出 “” * public final Disposable subscribe(Consumer<? super T> consumer) { * Objects.requireNonNull(consumer, "consumer"); * return this.subscribe(consumer, (Consumer)null, (Runnable)null); * } * 从源码看出 我们的 subsrcibe参数是 Consumer,也就是说只进 不出 * */ Mono.empty().subscribe(System.out::println); /* *创建一个 Mono 输出内容就是我们just()参数的内容 * public static <T> Mono<T> just(T data) { * return onAssembly(new MonoJust(data)); * } * */ Mono.just("我的今天就结束 webflux 的学习了 hello Mono").subscribe(System.out::println); }
Flux
/** * @author 冷环渊 Doomwatcher * @context: 这里是 flux 创建多个 0-n个元素序列 测试方法 * @date: 2021/12/14 15:01 * @param * @return: void */ @Test public void FluxTset() { // 创建一个Flux Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print); System.out.println(); //创建多个 集合的形式 Flux.fromIterable(Arrays.asList("a1", "b1", "c1", "d1")).subscribe(System.out::print); System.out.println(); //创建多个 数组的形式 Flux.fromArray(new String[]{"a1", "b1", "c1", "d1", "e1"}).subscribe(System.out::print); System.out.println(); //基于流创建 Flux.fromStream(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).subscribe(System.out::print); //通过饭未创建 System.out.println(); Flux.range(1, 100).subscribe(System.out::println); /* * 小案例 * Flux.generate这里我们以两个参数为例子 * 2的乘法口诀 * 2*0 = 0 * 2*1=1 * 2*2 = 4 * */ Flux.generate(() -> 0, (i, sink) -> { sink.next("2*" + i + "=" + 2 * i); if (i == 9) { sink.complete(); } return i + 1; }).subscribe(System.out::println); }
响应式编程需求实战
需求 我们这个需求的案例
给一定随机英文字符串,要求以26个字母的顺序输出排列
- 不能用循循环
- 不要以暴力的方式
解题思路
这里我们写了两种 解题目的方法,一个是基于StreamAPI 一个是基于ReactorAPI
- 思路是这个样子的,创建出一个去掉空格获得的字符数组,之后去重排序即可
/** * @author 冷环渊 Doomwatcher * @context: 响应式变成小练习 * 给一定随机英文字符串,要求以26个字母的顺序输出排列 * 小冷没看视频 用Stream流api 编写的 * @date: 2021/12/14 16:44 * @param * @return: void */ @Test public void StreamDemoTest() { String[] arr = new String[]{"hello", "guys", "i", "prizev", "abc"}; List<String> list = Arrays.asList(arr); list.stream() .filter(i -> !i.isEmpty()) .flatMap(i -> Stream.of(i.split(""))) .distinct() .sorted() .forEach(System.out::print); } /** * @author 冷环渊 Doomwatcher * @context: 这个是根据视频 用 reactor flux api 编写的 * @date: 2021/12/14 16:54 * @param * @return: void */ @Test public void VedioReactorTest() { String str = "hello guys i am bole welcome to normal school jdk quick fox prizev "; Flux.fromArray(str.split(" ")) .flatMap(i -> Flux.fromArray(i.split(""))) .distinct() .sort() .subscribe(System.out::print); }
WebFlux 响应式框架
Spring WebFlux
Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。响应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版本中添加的。它是完全非阻塞的,支持 Reactive Streams背压,并在 Netty、Undertow 和 Servlet 3.1+ 容器等服务器上运行。
这两个 Web 框架都反映了它们的源模块(spring-webmvc和 spring-webflux)的名称,并在 Spring 框架中并排共存。每个模块都是可选的。应用程序可以使用一个或另一个模块,或者在某些情况下,两者都使用——例如,带有响应式WebClient
.
为什么我们需要Webflux
1.我们需要少量的线程来支持更多的处理。Servlet 3.1 确实为非阻塞 I/O 提供了 API。然而,使用它会远离 Servlet API 的其余部分,其中契约是同步 ( Filter
, Servlet
) 或阻塞 ( getParameter
, getPart
)。这就是将新的通用 API 用作任何非阻塞运行时的基础的动机。这很重要,因为服务器(例如 Netty)在异步、非阻塞空间中建立良好。
2 是函数式编程。就像 Java 5 中添加注释创造了机会(例如带注释的 REST 控制器或单元测试)一样,Java 8 中添加的 lambda 表达式为 Java 中的函数式 API 创造了机会。这对于允许异步逻辑的声明式组合的非阻塞应用程序和延续式 API(由CompletableFuture
和ReactiveX推广)是一个福音。在编程模型级别,Java 8 使 Spring WebFlux 能够提供功能性 Web 端点以及带注释的控制器。
Spring MVC和spring webflux 的技术场景使用图
Webflux的核心库就是我们的 Reactor API 与MVC区别所在
- 接收但是 Publisher 返回的是 Mono/Flux
- 同时支持注解和函数式编程两种模式
spring-web
模块包含以下对反应式 Web 应用程序的基础支持:
- 对于服务器请求处理,有两个级别的支持。
- HttpHandler:HTTP 请求处理的基本契约,具有非阻塞 I/O 和反应流背压,以及用于 Reactor Netty、Undertow、Tomcat、Jetty 和任何 Servlet 3.1+ 容器的适配器。
WebHandler
API:用于请求处理的稍高级别的通用 Web API,在其之上构建了具体的编程模型,例如带注释的控制器和功能端点。
- 对于客户端,有一个基本
ClientHttpConnector
合同来执行带有非阻塞 I/O 和响应式流背压的 HTTP 请求,以及用于Reactor Netty、响应式 Jetty HttpClient 和Apache HttpComponents 的适配器 。应用程序中使用的更高级别的WebClient建立在这个基本契约之上。 - 对于客户端和服务器,用于 HTTP 请求和响应内容的序列化和反序列化的编解码器。
理论就到这里,我们来上手实操吧!
WebFlux Coding
编写controller 注解 hello world
/** * @projectName: webflux * @package: com.hyc.webflux.Controller * @className: ReactorController * @author: 冷环渊 doomwatcher * @description: TODO * @date: 2021/12/14 19:27 * @version: 1.0 */ @RestController @RequestMapping("/annotated") public class ReactorController { @GetMapping("/greeting") public Mono<String> greeting() { return Mono.just(" hello webflux by annotated"); } }
. ____ _ __ _ _ /\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \ ( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \ \\/ ___)| |_)| | | | | || (_| | ) ) ) ) ' |____| .__|_| |_|_| |_\__, | / / / / =========|_|==============|___/=/_/_/_/ :: Spring Boot :: (v2.4.2) 2021-12-14 19:35:53.017 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : Starting WebfluxApplication using Java 11.0.2 on DESKTOP-OG41IMR with PID 15172 (D:\JavaEngineer\Spirng5Webflux\webflux\target\classes started by doomwstcher in D:\JavaEngineer\Spirng5Webflux\webflux) 2021-12-14 19:35:53.022 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : No active profile set, falling back to default profiles: default 2021-12-14 19:35:54.094 INFO 15172 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port 8080 2021-12-14 19:35:54.104 INFO 15172 --- [ main] com.hyc.webflux.WebfluxApplication : Started WebfluxApplication in 1.501 seconds (JVM running for 2.712)
这里我们查看
这就是我们注解版本的helloworld
函数式 hello world
//函数式 @Bean public RouterFunction<ServerResponse> routers() { return RouterFunctions.route().GET("/func/greeting", serverRequest -> ok().bodyValue("hello webflux by function")).build(); }
结语
这篇文章主要是帮助 想要了解 spring 最新技术特性的小伙伴进行一个 简单的入门,
想要了解更多,可以通过文档,视频等继续深入学习,工程师的路上 学无止境