1、什么是 Stream?
1.1、Stream的定义?
- 在 Java8 之前,我们通常是通过 for 循环或者 Iterator 迭代来重新排序合并数据,又或者通过重新定义 Collections.sorts 的 Comparator 方法来实现,这两种方式对于大数据量系统来说,效率并不是很理想。
- Java8 中添加了一个新的接口类 Stream,他和我们之前接触的字节流概念不太一样,Java8 集合中的 Stream 相当于高级版的 Iterator,他通过 Lambda 表达式对集合进行各种非常便利、高效的聚合操作(Aggregate Operation),或者大批量数据操作 (Bulk Data Operation)。
1.2、Stream原理
- 将要处理的元素看做一种流,流在管道中传输,并且可以在管道的节点上处理,包括过滤筛选、去重、排序、聚合等。元素流在管道中进过中间操作的处理,最后由最终操作得到前面处理的结果。
- 例子1:需求是过滤分组一所中学里身高在 160cm 以上的男女同学
- 方法1:使用传统的迭代方式来实现
Map<String, List<Student>> stuMap = new HashMap<String, List<Student>>(); for (Student stu: studentsList) { if (stu.getHeight() > 160) { // 如果身高大于 160 if (stuMap.get(stu.getSex()) == null) { // 该性别还没分类 List<Student> list = new ArrayList<Student>(); // 新建该性别学生的列表 list.add(stu);// 将学生放进去列表 stuMap.put(stu.getSex(), list);// 将列表放到 map 中 } else { // 该性别分类已存在 stuMap.get(stu.getSex()).add(stu);// 该性别分类已存在,则直接放进去即可 } } }
- 方法2:Java8 中的 Stream API 进行实现
- stream() - 为集合创建串行流
Map<String, List<Student>> stuMap = stuList.stream() .filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));
- parallelStream() - 为集合创建并行流
Map<String, List<Student>> stuMap = stuList.parallelStream().filter((Student s) -> s.getHeight() > 160) .collect(Collectors.groupingBy(Student ::getSex));
2、函数式编程跟面向过程编程的区别?
- 函数是无状态的。何为无状态?简单点讲就是,函数内部涉及的变量都是局部变量,不会像面向对象编程那样,共享类成员变量,也不会像面向过程编程那样,共享全局变量。函数的执行结果只与入参有关,跟其他任何外部变量无关。同样的入参,不管怎么执行,得到的结果都是一样的。
// 有状态函数: 执行结果依赖b的值是多少,即便入参相同,多次执行函数,函数的返回值有可能不同,因为b值有可能不同。 int b; int increase(int a) { return a + b; } // 无状态函数:执行结果不依赖任何外部变量值,只要入参相同,不管执行多少次,函数的返回值就相同 int increase(int a, int b) { return a + b; }
不同的编程范式之间并不是截然不同的,总是有一些相同的编程规则。
- 比如,不管是面向过程、面向对象还是函数式编程,它们都有变量、函数的概念,最顶层都要有 main 函数执行入口,来组装编程单元(类、函数等)。只不过,面向对象的编程单元是类或对象,面向过程的编程单元是函数,函数式编程的编程单元是无状态函数。
实际上,Lambda 表达式在 Java 中只是一个语法糖而已,底层是基于函数接口来实现的,
- 函数接口,是指内部只有一个抽象方法的接口
public class FPDemo { public static void main(String[] args) { Optional<Integer> result = Stream.of("f", "ba", "hello") // of返回Stream<String>对象 .map(s -> s.length()) // map返回Stream<Integer>对象 .filter(l -> l <= 3) // filter返回Stream<Integer>对象 .max((o1, o2) -> o1-o2); // max终止操作:返回Optional<Integer> System.out.println(result.get()); // 输出2 } }
// Stream中map函数的定义: @FunctionalInterface public interface Stream<T> extends BaseStream<T, Stream<T>> { <R> Stream<R> map(Function<? super T, ? extends R> mapper); //...省略其他函数... } // Stream中map的使用方法: @FunctionalInterface Stream.of("fo", "bar", "hello").map(new Function<String, Integer>() { @Override public Integer apply(String s) { return s.length(); } }); // 用Lambda表达式简化后的写法: Stream.of("fo", "bar", "hello").map(s -> s.length());
C 语言支持函数指针,它可以把函数直接当变量来使用。但是,Java 没有函数指针这样的语法。所以,它通过函数接口,将函数包裹在接口中,当作变量来使用。实际上,函数接口就是接口。不过,它也有自己特别的地方,那就是要求只包含一个未实现的方法。因为只有这样,Lambda 表达式才能明确知道匹配的是哪个接口。如果有两个未实现的方法,并且接口入参、返回值都一样,那 Java 在翻译 Lambda 表达式的时候,就不知道表达式对应哪个方法了
@FunctionalInterface public interface Function<T, R> { R apply(T t); // 只有这一个未实现的方法 default <V> Function<V, R> compose(Function<? super V, ? extends T> before) { Objects.requireNonNull(before); return (V v) -> apply(before.apply(v)); } default <V> Function<T, V> andThen(Function<? super R, ? extends V> after) { Objects.requireNonNull(after); return (T t) -> after.apply(apply(t)); } static <T> Function<T, T> identity() { return t -> t; } } @FunctionalInterface public interface Predicate<T> { boolean test(T t); // 只有这一个未实现的方法 default Predicate<T> and(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) && other.test(t); } default Predicate<T> negate() { return (t) -> !test(t); } default Predicate<T> or(Predicate<? super T> other) { Objects.requireNonNull(other); return (t) -> test(t) || other.test(t); } static <T> Predicate<T> isEqual(Object targetRef) { return (null == targetRef) ? Objects::isNull : object -> targetRef.equals(object); } } // 自定义函数接口 @FunctionalInterface public interface ConsumerInterface<T>{ void accept(T t); }
上面代码中的@FunctionalInterface是可选的,但加上该标注编译器会帮你检查接口是否符合函数接口规范。就像加入@Override标注会检查是否重载了函数一样。
ConsumerInterface<String> consumer = str -> System.out.println(str);
进一步的,还可以这样使用:
class MyStream<T> { private List<T> list; ... public void myForEach(ConsumerInterface<T> consumer){/ / 1 for(T t : list){ consumer.accept(t); } } } MyStream<String> stream = new MyStream<String>(); stream.myForEach(str -> System.out.println(str));// 使用自定义函数接口书写Lambda表达式
3、Stream 如何优化遍历?
3.1、Stream 操作分类
- Stream 的操作分为两大类:中间操作(Intermediate operations)和终结操作(Terminal operations)。我们通常还会将中间操作称为懒操作,也正是由这种懒操作结合终结操作、数据源构成的处理管道(Pipeline),实现了 Stream 的高效。
操作类型 | 详情 | 备注 |
中间操作 | 只对操作进行了记录,即只会返回一个流,不会进行计算操作 | 可以分为无状态(Stateless)与有状态(Stateful)操作,前者是指元素的处理不受之前元素的影响,后者是指该操作只有拿到所有元素之后才能继续下去。 |
终结操作 | 终结操作是实现了计算操作 | 可以分为短路(Short-circuiting)与非短路(Unshort-circuiting)操作,前者是指遇到某些符合条件的元素就可以得到最终结果,后者是指必须处理完所有元素才能得到最终结果。 |
- 操作分类详情如下图所示
举例说明:
1、新建一个Student类
@Data public class Student { private Long id; private String name; private int age; private String address; public Student() {} public Student(Long id, String name, int age, String address) { this.id = id; this.name = name; this.age = age; this.address = address; } @Override public String toString() { return "Student{" + "id=" + id + ", name='" + name + '\'' + ", age=" + age + ", address='" + address + '\'' + '}'; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; Student student = (Student) o; return age == student.age && Objects.equals(id, student.id) && Objects.equals(name, student.name) && Objects.equals(address, student.address); } @Override public int hashCode() { return Objects.hash(id, name, age, address); } }
2、执行Stream操作
public static void main(String [] args) { Student s1 = new Student(1L, "肖战", 15, "浙江"); Student s2 = new Student(2L, "王一博", 15, "湖北"); Student s3 = new Student(3L, "杨紫", 17, "北京"); Student s4 = new Student(4L, "李现", 17, "浙江"); List<Student> students = new ArrayList<>(); students.add(s1); students.add(s2); students.add(s3); students.add(s4); List<Student> streamStudents = testFilter(students); streamStudents.forEach(System.out::println); } /** * 1、集合的筛选 * @param students * @return */ private static List<Student> testFilter(List<Student> students) { //筛选年龄大于15岁的学生 //return students.stream().filter(s -> s.getAge()>15).collect(Collectors.toList()); //筛选住在浙江省的学生 return students.stream().filter(s ->"浙江".equals(s.getAddress())).collect(Collectors.toList()); } /** * 2、集合转换 * @param students * @return */ private static void testMap(List<Student> students) { //在地址前面加上部分信息,只获取地址输出 List<String> addresses = students.stream().map(s ->"住址:"+s.getAddress()).collect(Collectors.toList()); addresses.forEach(a ->System.out.println(a)); } /** * 3、集合去重(基本类型) */ private static void testDistinct1() { //简单字符串的去重 List<String> list = Arrays.asList("111","222","333","111","222"); list.stream().distinct().forEach(System.out::println); } /** * 3、集合去重(引用对象) * 两个重复的“肖战”同学进行了去重,这不仅因为使用了distinct()方法,而且因为Student对象重写了equals和hashCode()方法,否则去重是无效的 */ private static void testDistinct2() { //引用对象的去重,引用对象要实现hashCode和equal方法,否则去重无效 Student s1 = new Student(1L, "肖战", 15, "浙江"); Student s2 = new Student(2L, "王一博", 15, "湖北"); Student s3 = new Student(3L, "杨紫", 17, "北京"); Student s4 = new Student(4L, "李现", 17, "浙江"); Student s5 = new Student(1L, "肖战", 15, "浙江"); List<Student> students = new ArrayList<>(); students.add(s1); students.add(s2); students.add(s3); students.add(s4); students.add(s5); students.stream().distinct().forEach(System.out::println); } /** * 4、集合排序(默认排序) */ private static void testSort1() { List<String> list = Arrays.asList("333","222","111"); list.stream().sorted().forEach(System.out::println); } /** * 4、集合排序(指定排序规则) * 先按照学生的id进行降序排序,再按照年龄进行降序排序 * */ private static void testSort2() { Student s1 = new Student(1L, "肖战", 15, "浙江"); Student s2 = new Student(2L, "王一博", 15, "湖北"); Student s3 = new Student(3L, "杨紫", 17, "北京"); Student s4 = new Student(4L, "李现", 17, "浙江"); List<Student> students = new ArrayList<>(); students.add(s1); students.add(s2); students.add(s3); students.add(s4); students.stream() .sorted((stu1,stu2) ->Long.compare(stu2.getId(), stu1.getId())) .sorted((stu1,stu2) -> Integer.compare(stu2.getAge(),stu1.getAge())) .forEach(System.out::println); } /** * 5、集合limit,返回前几个元素 */ private static void testLimit() { List<String> list = Arrays.asList("333","222","111"); list.stream().limit(2).forEach(System.out::println); } /** * 6、集合skip,删除前n个元素 */ private static void testSkip() { List<String> list = Arrays.asList("333","222","111"); list.stream().skip(2).forEach(System.out::println); } /** * 7、集合reduce,将集合中每个元素聚合成一条数据 */ private static void testReduce() { List<String> list = Arrays.asList("欢","迎","你"); String appendStr = list.stream().reduce("北京",(a,b) -> a+b); System.out.println(appendStr); } /** * 8、求集合中元素的最小值 * 求所有学生中年龄最小的一个 */ private static void testMin() { Student s1 = new Student(1L, "肖战", 14, "浙江"); Student s2 = new Student(2L, "王一博", 15, "湖北"); Student s3 = new Student(3L, "杨紫", 17, "北京"); Student s4 = new Student(4L, "李现", 17, "浙江"); List<Student> students = new ArrayList<>(); students.add(s1); students.add(s2); students.add(s3); students.add(s4); Student minS = students.stream().min((stu1,stu2) ->Integer.compare(stu1.getAge(),stu2.getAge())).get(); System.out.println(minS.toString()); } /** * 9、anyMatch/allMatch/noneMatch(匹配) * anyMatch:Stream 中任意一个元素符合传入的 predicate,返回 true allMatch:Stream 中全部元素符合传入的 predicate,返回 true noneMatch:Stream 中没有一个元素符合传入的 predicate,返回 true */ private static void testMatch() { Student s1 = new Student(1L, "肖战", 15, "浙江"); Student s2 = new Student(2L, "王一博", 15, "湖北"); Student s3 = new Student(3L, "杨紫", 17, "北京"); Student s4 = new Student(4L, "李现", 17, "浙江"); List<Student> students = new ArrayList<>(); students.add(s1); students.add(s2); students.add(s3); students.add(s4); Boolean anyMatch = students.stream().anyMatch(s ->"湖北".equals(s.getAddress())); if (anyMatch) { System.out.println("有湖北人"); } Boolean allMatch = students.stream().allMatch(s -> s.getAge()>=15); if (allMatch) { System.out.println("所有学生都满15周岁"); } Boolean noneMatch = students.stream().noneMatch(s -> "杨洋".equals(s.getName())); if (noneMatch) { System.out.println("没有叫杨洋的同学"); } } /* * 10、peek操作 * 改变元素的内部状态:流经的每个元素应用一个函数 推荐使用forEach(),而不是peek方法 * 协助调试:正因为 peek() 不是一个最终操作,不会影响“哪些元素会流过”,所以十分适合在调试的时候,用来打印出流经管道的元素 */ private static void testPeek() { // 1、改变元素的内部状态 return objects.stream() .peek(object -> addInfo(object, someParams)) .collect(Collectors.toList()); // 2、协助调试 Stream.of("one", "two", "three", "four") .filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)) .map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)) .collect(Collectors.toList()); } }
3.2、Stream 源码实现
- Stream相关的类
- BaseStream 和 Stream 为最顶端的接口类。BaseStream 主要定义了流的基本接口方法,例如,spliterator、isParallel 等;Stream 则定义了一些流的常用操作方法,例如,map、filter 等
- ReferencePipeline 是一个结构类,他通过定义内部类组装了各种操作流。他定义了 Head、StatelessOp、StatefulOp 三个内部类,实现了 BaseStream 与 Stream 的接口方法
- Sink 接口是定义每个 Stream 操作之间关系的协议,他包含 begin()、end()、cancellationRequested()、accept() 四个方法
- ReferencePipeline 最终会将整个 Stream 流操作组装成一个调用链,而这条调用链上的各个 Stream 操作的上下关系就是通过 Sink 接口协议来定义实现的
3.3、Stream 操作叠加
- 管道结构通常是由 ReferencePipeline 类实现的,前面讲解 Stream 包结构时,我提到过 ReferencePipeline 包含了 Head、StatelessOp、StatefulOp 三种内部类。
名词 | 作用 |
Head | 主要用来定义数据源操作,在我们初次调用 names.stream() 方法时,会初次加载 Head 对象,此时为加载数据源操作 |
StatelessOp | 无状态中间操作 |
StatefulOp | 有状态中间操作 |
Stage | 在 JDK 中每次的中断操作会以使用阶段(Stage)命名 |
AbstractPipeline | 用于生成一个中间操作 Stage 链表 |
中间操作之后,此时的 Stage 并没有执行,而是通过 AbstractPipeline 生成了一个中间操作 Stage 链表;当我们调用终结操作时,会生成一个最终的 Stage,通过这个 Stage 触发之前的中间操作,从最后一个 Stage 开始,递归产生一个 Sink 链。
3.4、Stream执行流程
例子2:需求是查找出一个长度最长,并且以“张”为姓氏的名字
List<String> names = Arrays.asList(" 张三 ", " 李四 ", " 王老五 ", " 李三 ", " 刘老四 ", " 王小二 ", " 张四 ", " 张五六七 "); String maxLenStartWithZ = names.stream() .filter(name -> name.startsWith(" 张 ")) .mapToInt(String::length) .max() .toString();
你以为的Stream执行流程:
- 首先遍历一次集合,得到以“张”开头的所有名字;然后遍历一次 filter 得到的集合,将名字转换成数字长度;最后再从长度集合中找到最长的那个名字并且返回。
实际上的执行流程☆:
- 1、names.stream() 方法将会调用集合类基础接口 Collection 的 Stream 方法;
- 2、Stream 方法就会调用 StreamSupport 类的 Stream 方法,方法中初始化了一个 ReferencePipeline 的 Head 内部类对象;
- 3、 再调用 filter 和 map 方法,这两个方法都是无状态的中间操作,所以执行 filter 和 map 操作时,并没有进行任何的操作,而是分别创建了一个 Stage 来标识用户的每一次操作;
- 4、new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将前后的 Stage 联系起来,生成一个 Stage 链表:
- 5、当执行 max 方法时,会调用 ReferencePipeline 的 max 方法,此时由于 max 方法是终结操作,所以会创建一个 TerminalOp 操作,同时创建一个 ReducingSink,并且将操作封装在 Sink 类中。
- 6、Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max。
names.stream() 方法将会调用集合类基础接口 Collection 的 Stream 方法;
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) { Objects.requireNonNull(spliterator); return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel); }
ReferencePipeline 的 filter 方法和 map 方法:
@Override public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) { Objects.requireNonNull(predicate); return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SIZED) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) { return new Sink.ChainedReference<P_OUT, P_OUT>(sink) { @Override public void begin(long size) { downstream.begin(-1); } @Override public void accept(P_OUT u) { if (predicate.test(u)) downstream.accept(u); } }; } }; }
@Override @SuppressWarnings("unchecked") public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper); return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) { return new Sink.ChainedReference<P_OUT, R>(sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; }
new StatelessOp 将会调用父类 AbstractPipeline 的构造函数,这个构造函数将前后的 Stage 联系起来,生成一个 Stage 链表:
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this;// 将当前的 stage 的 next 指针指向之前的 stage this.previousStage = previousStage;// 赋值当前 stage 当全局变量 previousStage this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }
因为在创建每一个 Stage 时,都会包含一个 opWrapSink() 方法,该方法会把一个操作的具体实现封装在 Sink 类中,Sink 采用(处理 -> 转发)的模式来叠加操作。
当执行 max 方法时,会调用 ReferencePipeline 的 max 方法,此时由于 max 方法是终结操作,所以会创建一个 TerminalOp 操作,同时创建一个 ReducingSink,并且将操作封装在 Sink 类中。
@Override public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) { return reduce(BinaryOperator.maxBy(comparator)); }
最后,调用 AbstractPipeline 的 wrapSink 方法,该方法会调用 opWrapSink 生成一个 Sink 链表,Sink 链表中的每一个 Sink 都封装了一个操作的具体实现。
@Override @SuppressWarnings("unchecked") final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) { Objects.requireNonNull(sink); for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) { sink = p.opWrapSink(p.previousStage.combinedFlags, sink); } return (Sink<P_IN>) sink; }
当 Sink 链表生成完成后,Stream 开始执行,通过 spliterator 迭代集合,执行 Sink 链表中的具体操作。
@Override final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) { Objects.requireNonNull(wrappedSink); if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) { wrappedSink.begin(spliterator.getExactSizeIfKnown()); spliterator.forEachRemaining(wrappedSink); wrappedSink.end(); } else { copyIntoWithCancel(wrappedSink, spliterator); } }
Java8 中的 Spliterator 的 forEachRemaining 会迭代集合,每迭代一次,都会执行一次 filter 操作,如果 filter 操作通过,就会触发 map 操作,然后将结果放入到临时数组 object 中,再进行下一次的迭代。完成中间操作后,就会触发终结操作 max。
3.5、Stream 并行处理
List<String> names = Arrays.asList(" 张三 ", " 李四 ", " 王老五 ", " 李三 ", " 刘老四 ", " 王小二 ", " 张四 ", " 张五六七 "); String maxLenStartWithZ = names.stream().parallel().filter(name -> name.startsWith("张")) .mapToInt(String::length).max().toString();
Stream 的并行处理在执行终结操作之前,跟串行处理的实现是一样的。而在调用终结方法之后,实现的方式就有点不太一样,会调用 TerminalOp 的 evaluateParallel 方法进行并行处理。这里的并行处理指的是,Stream 结合了 ForkJoin 框架,对 Stream 处理进行了分片,Splititerator 中的 estimateSize 方法会估算出分片的数据量。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); }
通过预估的数据量获取最小处理单元的阀值,如果当前分片大小大于最小处理单元的阀值,就继续切分集合。每个分片将会生成一个 Sink 链表,当所有的分片操作完成后,ForkJoin 框架将会合并分片任何结果集
4、合理使用 Stream
4.1、性能测试对比
我们将对常规的迭代、Stream 串行迭代以及 Stream 并行迭代进行性能测试对比,迭代循环中,我们将对数据进行过滤、分组等操作。分别进行以下几组测试:
测试 | 结论(迭代使用时间) |
多核 CPU 服务器配置环境下,对比长度 100 的 int 数组的性能; | 常规的迭代 <Stream 并行迭代 <Stream 串行迭代 |
多核 CPU 服务器配置环境下,对比长度 1.00E+8 的 int 数组的性能; | Stream 并行迭代 < 常规的迭代 <Stream 串行迭代 |
多核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能; | Stream 并行迭代 < 常规的迭代 <Stream 串行迭代 |
单核 CPU 服务器配置环境下,对比长度 1.00E+8 对象数组过滤分组的性能。 | 常规的迭代 <Stream 串行迭代 <Stream 并行迭代 |
结论:在循环迭代次数较少的情况下,常规的迭代方式性能反而更好;在单核 CPU 服务器配置环境中,也是常规迭代方式更有优势;而在大数据循环迭代中,如果服务器是多核 CPU 的情况下,Stream 的并行迭代优势明显。所以在平时处理大数据的集合时,应该尽量考虑将应用部署在多核 CPU 环境下,并且使用 Stream 的并行迭代方式进行处理。
4.2、并行流与串行流 20210128补充
- 流开启默认 串行流, 并行流与串行流都无法重复使用。也无法停止。
- 转换:sequential(), parallel() 方法,这两个方法可以多次调用, 只有最后一个调用决定这个流是顺序的还是并发的。
- 并发流使用的默认线程数等于你机器的处理器核心数。使用公共的forkjoinpool线程池。并行流默认都是用同一个默认的ForkJoinPool,ForkJoinPool会将所有的CPU打满,如果其中还有IO操作或等待操作,这个默认的ForkJoinPool只能消耗一部分CPU,而另外的并行流因为获取不到该ForkJoinPool的使用权,性能将大大降低。默认的ForkJoinPool处理计算密集型的任务比较好。
两种方式解决:
一是将并行流的源数据拆成多个,形成多个并行流在单独的线程中执行,当线程池中达到四个线程执行任务时,新启动的并行流就只能在其本身的线程X中执行,成了串行流,不过执行顺序被打乱。此时X也拥有可被窃取的任务队列,当线程池中有线程空闲时将会窃取X中的任务执行。
int[] arr = IntStream.range(1, 5).toArray(); new Thread(() -> { Arrays.stream(arr).parallel().forEach((v) -> { try { System.out.println("first:" + v); int sum = 0; for(long i = 0; i < (1<<28); i ++) { sum += i % 2; } System.out.println("first:" + v + ":" + sum); } catch (Exception e) { e.printStackTrace(); } }); }).start(); try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { Arrays.stream(arr).parallel().forEach((v) -> { try { System.out.println("first1:" + v); int sum = 0; for(long i = 0; i < (1<<30); i ++) { sum += i % 2; } System.out.println("first1:"+ v + ":" + sum); } catch (Exception e) { e.printStackTrace(); } }); }).start();
二是创建新的forkjoinpool线程池Y,将任务拆分到Y中执行
ForkJoinPool forkJoinPool = new ForkJoinPool(8); forkJoinPool.submit(()->{ tasks.parallelStream().forEach(t->{ try { String gdsstatus=transactionService.GetTransInfo(url, t.getTask_id()); checkStatus(t.getTask_id(),t.getTask_status(),gdsstatus); } catch (Exception e) { System.out.println("EXCEPTION OCCOR IN TASK:"+t.getTask_id()); e.printStackTrace(); } System.out.println("NO:"+count.getAndIncrement()+" is done"); }); });
- 通过这个方法可以修改这个值,这是全局属性。没事别瞎设置。
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12");
4.3、使用并行流的一些建议
- 没事就别开了,普通业务用不着,如果非要用, 自己测试性能 ~
- 数值类型,尽量使用基本类型的流 IntStream, LongStream, and DoubleStream,拆箱装箱也是损耗新能操作,需要转换则使用 box方法
- 有些操作使用并发流的性能会比顺序流的性能更差,比如limit,findFirst , 依赖元素顺序的操作在并发流中是极其消耗性能的 。findAny的性能就会好很多,因为不依赖顺序。
- 数据量不大时使用并发流,性能得不到提升。想想没事为什么不开线程。
- 想要理解机制:fork/join 框架
4.4、使用并行流造成的故障
- 查看代码,使用parallelStream 并行流处理。在并行流中调用dubbo接口
- 其内部实现为全局整个进程共用一个线程池(线程数为CPU核数,web-agg为2核),此处forEach任务会进入该线程池的处理队列,当进行一些io型慢任务时,由于线程池调度不过来,将会导致主线程一直在阻塞等待。此现象并发越高越明显。
压测复现了问题,同时发现stream比parallel表现好
- 根据压测时的Jstack,是能够分析出大部分servlet线程其实都卡在ForkJoinTask#externalAwaitDone方法上,大家在使用线程池时可以在submit时通过 Override 拒绝策略的方式,把任务让当前线程执行,这种方式可以有效的避免因线程池大小设置过小导致任务消费速率跟不上的问题。而默认的ForkJoin的实现,主线程在提交完任务后,会进行join,阻塞直到等待子线程完成后才释放。在我们这个案例中,主线程有1000个(根据web应用的active Thread判断),而子线程只有 2个(cpu核数),此时大部分线程都在等待状态。如果ForkJoin的默认策略也是用当前方式来执行,其实可以避免这个问题,因为主线程的数量限制 远大于 ForkJoin线程池的数量。
- 如下图所示,为优化后的代码,stream+distinct+批量调用一次rpc
4.5、JAVA使用并行流(ParallelStream)时要注意的一些问题
JAVA使用并行流(ParallelStream)时要注意的一些问题
4.6、使用findFirst/findAny方法的注意事项 20211111补
线上问题,报如下错误:
原因:
Optional<String> brandOpt = attrLists.stream() .filter(attrKey -> "品牌".equals(attrKey.getAttrKey())) .map(OtherAttribute::getAttrVal) .findFirst();
/* @return an {@code Optional} describing the first element of this stream, * or an empty {@code Optional} if the stream is empty * @throws NullPointerException if the element selected is null */ Optional<T> findFirst(); /** * @return an {@code Optional} describing some element of this stream, or an * empty {@code Optional} if the stream is empty * @throws NullPointerException if the element selected is null * @see #findFirst() */ Optional<T> findAny();
解决方案:
Optional<String> brandOpt = attrLists.stream() .filter(attrKey -> "品牌".equals(attrKey.getAttrKey())) .map(OtherAttribute::getAttrVal) .filter(Objects::nonNull) .findFirst();
- 在Stream中使用findxxx方法之前,必须确保上次计算的结果是非空的,使用
filter()
操作来确保非空
4.7 使用toMap()方法踩坑,报错npe 202207补
- 背景:线上报错npe,日志如下
- 代码如下:
Map<Long, String> auditMaps = zcyOptimizationItemAudits.stream() .collect(Collectors.toMap(ZcyOptimizationItemAudit::getId, ZcyOptimizationItemAudit::getRejectReason, (a, b) -> a));
- 原因:使用toMap方法,在底层merge操作时,如果value为空,报错npe
public V merge(K key, V value, BiFunction<? super V, ? super V, ? extends V> remappingFunction) { if (value == null) throw new NullPointerException(); if (remappingFunction == null) throw new NullPointerException(); int hash = hash(key); Node<K,V>[] tab; Node<K,V> first; int n, i; ...
- 解决方案:
Map<Long, ZcyOptimizationItemAudit> auditMaps = zcyOptimizationItemAudits.stream() .collect(Collectors.toMap(ZcyOptimizationItemAudit::getId, Function.identity(), (a, b) -> a));
5、使用Stream总结
- 在串行处理操作中,Stream 在执行每一步中间操作时,并不会做实际的数据操作处理,而是将这些中间操作串联起来,最终由终结操作触发,生成一个数据处理链表,通过 Java8 中的 Spliterator 迭代器进行数据处理;此时,每执行一次迭代,就对所有的无状态的中间操作进行数据处理,而对有状态的中间操作,就需要迭代处理完所有的数据,再进行处理操作;最后就是进行终结操作的数据处理。
- 在并行处理操作中,Stream 对中间操作基本跟串行处理方式是一样的,但在终结操作中,Stream 将结合 ForkJoin 框架对集合进行切片处理,ForkJoin 框架将每个切片的处理结果 Join 合并起来。最后就是要注意 Stream 的使用场景。
5.1、从Java开发者角度,如何从代码级别判断应用的性能表现(基准测试)“Lambda能让Java程序慢30倍”,你怎么看?
// 一个大的ArrayLis,内部是随机的整型数据 volatile Lis<Integer> integers= … // 基准测试1 public int forEachLoopMaxInteger(){ int max=Integer.MIN_VALUE; for(Integer n:integers) { max=Integer.max(max, n); } return max; } // 基准测试2 public int lambdaMaxInteger(){ return integers.stream().reduce(Integer.MIN_VALUE,(a,b) -> Integer.max(a,b)); }
前面代码片段本身的逻辑就有瑕疵,更多的开销是源于自动装箱、拆箱(auto-boxing/unboxing),而不是源自Lambda和Stream,所以得出的初始结论是没有说服力的
- 1、什么时候需要开发微基准测试?当需要对一个大型软件的某小部分的性能进行评估时,就可以考虑微基准测试
- 1)你在开发共享类库,为其他模块提供某种服务的API等;
- 2)你的API对于性能,如延迟、吞吐量有着严格的要求(实现了定制的HTTP客户端API,需要明确它对HTTP服务器进行大量GET请求时的吞吐能力),基准测试的框架JMH(支持完整的基准测试过程,包括预热、运行、统计和报告,还支持java和其他JVM语言)。
- 2、如何使用:直接将其依赖加入Maven工程
<dependencies> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-core</artifactId> <version>${jmh.version}</version> <dependency> <dependencies> <dependencies> <dependency> <groupId>org.openjdk.jmh</groupId> <artifactId>jmh-generator-annprocess</artifactId> <version>${jmh.version}</version> <scope>provided</scope> <dependency> <dependencies>
- JMH利用注解(Annotation),定义具体的测试方法,以及基准测试的详细配置。
@Benchmark //加上“@Benchmark”以标识它是个基准测试方法 @BenchmarkMode(Mode.Throughput) //BenchmarkMode则指定了基准测试模式,指定了吞吐量(Throughput)模式,还可以根据需要指定平均时间(AverageTime)等其他模式 public void tesMethod() { //Put your benchmark code here. }
- 利用maven命令构建 mvn clean install
- 运行基准测试 java -jar target/benchmarks.jar
- 3、基准测试的典型问题
- 1)保证代码经过了足够并且合适的预热(在server模式下,JIT会在一段代码执行10000次后,将其编译为本地代码,client模式则是1500次) 使用下面的参数来判断预热工作到底是经过了多久。
-XX:+PrintCompilation - 2)防止JVM进行无效代码消除
我们并没有使用计算结果mul,那么JVM就可能直接判断无效代码,根本就不执行它
public void tesMethod() { int left = 10; int right = 100; int mul = left * right; }//尽量保证方法有返回值,而不是void方法
- 3)防止发生常量折叠 (JVM如果发现计算过程是依赖于常量或者事实上的常量,就可能会直接计算其结果)
6、Stream思考题
- 这里有一个简单的并行处理案例,请你找出其中存在的问题。
// 使用一个容器装载 100 个数字,通过 Stream 并行处理的方式将容器中为单数的数字转移到容器 parallelList List<Integer> integerList= new ArrayList<Integer>(); for (int i = 0; i <100; i++) { integerList.add(i); } List<Integer> parallelList = new ArrayList<Integer>() ; integerList.stream() .parallel() .filter(i->i%2==1) .forEach(i->parallelList.add(i));
- 答案:ArrayList不是线程安全的,在并行操作时,会出现多线程操作问题,例如出现null值,有可能是在扩容时,复制出现问题。同时也会出现值被覆盖的情况。
- 解决方案:可以换成线程安全的ArrayList。
7、Java8的接口默认实现 20210305补
- 背景:Java8中给很多API提供了新的方法,比如常用的java.util.List接口增加了replaceAll、sort、spliterator这三个方法;根据现行的Java语法规范,List的所有子类都需要对这三个方法进行实现,这显然是不能接受的,尤其是众多第三方类库对JDK有依赖,要求他们一同修改也是做不到的。
- 为了解决这个问题,也保证JDK的兼容性,Java8引入了一种新的机制:接口可以声明带有实现的方法。
语法规则
- 1、使用default关键字
- 2、一个接口中可以含有多个带有默认实现的方法
java.util.List中新增的三个方法源码
public interface List<E> extends Collection<E> { default void replaceAll(UnaryOperator<E> operator) { Objects.requireNonNull(operator); final ListIterator<E> li = this.listIterator(); while (li.hasNext()) { li.set(operator.apply(li.next())); } } default void sort(Comparator<? super E> c) { Object[] a = this.toArray(); Arrays.sort(a, (Comparator) c); ListIterator<E> i = this.listIterator(); for (Object e : a) { i.next(); i.set((E) e); } } default Spliterator<E> spliterator() { return Spliterators.spliterator(this, Spliterator.ORDERED); } }
Action:
1、接口默认实现跟抽象类岂不是差不多?
- 是的,这个默认实现的确和抽象类的非抽象方法有相似之处,但是相比于抽象类,接口是可以多继承的。
2、当然接口引入了默认实现后,又会带来另外一个问题,同一个类可能会从不同的接口中继承了具有相同的签名的方法,这又该如何解决呢?
- 为了解决这种冲突,Java8提供下面三种规则:
- 1、类中的方法优先级最高;
- 2、子接口中的方法优先级高于父接口中的方法;
- 3、当前两种方式无法识别时,需要显示的重写有冲突的方法,指明调用期望;
场景1:
public interface A { default String test() { return "A"; } } public class B { public String test() { return "B"; } } public class C extends B implements A { public static void main(String[] args) { System.out.println(new C().test()); } }
- 这个例子中C分别中A和B中都继承了test()方法,但是B是类,而A是接口,所以B中的test()方法优先级更高,因此此次输出结果应该是:B
场景二:
public interface D extends A { default String test() { return "D"; } } public class E implements A, D { public static void main(String[] args) { System.out.println(new E().test()); } }
- 这个例子中E分别中A和D这两个接口中继承了test()方法,但是D是A的子接口,所以D接口中的test()方法优先级更高,因此此处输出应该是:D
场景三:
public class F implements A { } public class G extends F implements A, D { public static void main(String[] args) { System.out.println(new G().test()); } }
- 这种场景下虽然F实现了A接口,但是由于它并没有重写test()方法,所以优先级依然是D接口高,因此此处输出依然是:D
场景四:
public interface H { default String test() { return "H"; } } public class K implements A, H { public static void main(String[] args) { System.out.println(new K().test()); } @Override public String test() { return H.super.test(); } }
- 这个场景中,由于A和H没有继承关系,所以在JVM无法识别出到底要调用哪一个接口中的方法,所以需要在类K中显式的覆盖test()方法,指明具体调用哪个接口中的方法,当然也可以自己重写方法实现;
8、Java8 Map集合中put()与putIfAbsent()的区别
Map集合中put与putIfAbsent的区别
put方法:
V put(K key, V value);
putIfAbsent方法:
V putIfAbsent(K key, V value);
这两种方法都是以key-value键值对的形式存在到map集合中,那么它们两个有什么区别呢?
- 我们可以从map官网注释中看出:
- 1.使用
put
方法添加键值对,如果map集合中没有该key对应的值,则直接添加,并返回null,如果已经存在对应的值,则会覆盖旧值,value为新的值。 - 2.使用
putIfAbsent
方法添加键值对,如果map集合中没有该key对应的值,则直接添加,并返回null,如果已经存在对应的值,则依旧为原来的值。
default V putIfAbsent(K key, V value) { V v = get(key); if (v == null) { v = put(key, value); } return v; }
9、Java8 新特性有哪些了解(5个主要的特性)? 阿里
9.1、五大特性
1、代码更少(增加了新的语法Lambda表达式)
- lambda表达式,允许像对象一样传递匿名函数,一个函数式接口非常有价值的属性就是他们能够用lambda来实例化,语法:左侧是表达式需要的所有参数,右侧是表达式要执行的功能 1、无参数,使用空括号,可以从匿名类转换为lambda表达式,如多线程中,把重写的run方法换为->的lambda表达式;2、有参数,在括号里面加参数,原来使用匿名内部类作为参数传递,现在使用lambda表达式作为参数传递;3、类型推断 编译器可以推断得到数据类型。
2、强大的Stream API(java.util.stream.* )
- stream API,充分利用现代多核cpu,可以写出很简洁的代码,Stream是Java8中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作.(类似于使用sql执行数据库查询)。操作步骤:创建stream,中间操作(对数据源的数据进行处理),终止操作(产生结果),多个中间操作可以连接起来形成一个流水线,除非流水线上触发终止操作,否则中间操作不会执行任何的处理!而在终止操作时一次性全部处理,称为“惰性求值”。
3、新时间日期 API(Data与TimeAPI,稳定、简单的日期和时间库可供使用)
- LocalDate、LocalTime、LocalDateTime类的实例是不可变的对象,分别表示使用ISO-8601日历系统的日期、时间、日期和时间(不包含与时区相关的信息), Instant时间戳 它是以Unix元年开始所经历的描述进行运算
4、扩展方法,接口中可以有静态、默认方法
- 一个接口定义个唯一一个抽象方法,那么这个接口就成为函数式接口
5、重复注解
- 可以将相同的注解在同一类型上使用多次 后续详解
9.2、无参函数的简写
原来:
- 如果需要新建一个线程,一种常见的写法是这样
// JDK7 匿名内部类写法 new Thread(new Runnable(){// 接口名 @Override public void run(){// 方法名 fixStock(itemIds); } }).start();
如多线程中,把重写的run方法换为->的lambda表达式Demo:
EXECUTOR.execute(() -> fixStock(itemIds));
9.3、带参函数的简写
如果要给一个字符串列表通过自定义比较器,按照字符串长度进行排序,Java 7的书写形式如下:
// JDK7 匿名内部类写法 List<String> list = Arrays.asList("I", "love", "you", "too"); Collections.sort(list, new Comparator<String>(){// 接口名 @Override public int compare(String s1, String s2){// 方法名 if(s1 == null) return -1; if(s2 == null) return 1; return s1.length()-s2.length(); } });
上述代码通过内部类重载了Comparator接口的compare()方法,实现比较逻辑
采用Lambda表达式可简写如下:
// JDK8 Lambda表达式写法 List<String> list = Arrays.asList("I", "love", "you", "too"); Collections.sort(list, (s1, s2) ->{// 省略参数表的类型 if(s1 == null) return -1; if(s2 == null) return 1; return s1.length()-s2.length(); });
除了省略了接口名和方法名,代码中把参数表的类型也省略了。这得益于javac的类型推断机制,编译器能够根据上下文信息推断出参数的类型,当然也有推断失败的时候,这时就需要手动指明参数类型
简写的依据
- 能够使用Lambda的依据是必须有相应的函数接口(函数接口,是指内部只有一个抽象方法的接口)
- 另一个依据是类型推断机制
- Lambda表达更多合法的书写形式如下:
// Lambda表达式的书写形式 Runnable run = () -> System.out.println("Hello World");// 1 ActionListener listener = event -> System.out.println("button clicked");// 2 Runnable multiLine = () -> {// 3 代码块 System.out.print("Hello"); System.out.println(" Hoolee"); }; BinaryOperator<Long> add = (Long x, Long y) -> x + y;// 4 BinaryOperator<Long> addImplicit = (x, y) -> x + y;// 5 类型推断
- 上述代码中,1展示了无参函数的简写;2处展示了有参函数的简写,以及类型推断机制;3是代码块的写法;4和5再次展示了类型推断机制。
进一步区分Lambda表达式和匿名内部类在JVM层面的区别
匿名内部类仍然是一个类,只是不需要程序员显示指定类名,编译器会自动为该类取名。因此如果有如下形式的代码,编译之后将会产生两个class文件:
public class MainAnonymousClass { public static void main(String[] args) { new Thread(new Runnable(){ @Override public void run(){ System.out.println("Anonymous Class Thread run()"); } }).start();; } }
进一步分析主类MainAnonymousClass.class的字节码,可发现其创建了匿名内部类的对象:
// javap -c MainAnonymousClass.class public class MainAnonymousClass { ... public static void main(java.lang.String[]); Code: 0: new #2 // class java/lang/Thread 3: dup 4: new #3 // class MainAnonymousClass$1 /*创建内部类对象*/ 7: dup 8: invokespecial #4 // Method MainAnonymousClass$1."<init>":()V 11: invokespecial #5 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V 14: invokevirtual #6 // Method java/lang/Thread.start:()V 17: return }
Lambda表达式通过invokedynamic指令实现,书写Lambda表达式不会产生新的类。如果有如下代码,编译之后只有一个class文件:
public class MainLambda { public static void main(String[] args) { new Thread( () -> System.out.println("Lambda Thread run()") ).start();; } }
通过javap反编译命名,我们更能看出Lambda表达式内部表示的不同:
// javap -c -p MainLambda.class public class MainLambda { ... public static void main(java.lang.String[]); Code: 0: new #2 // class java/lang/Thread 3: dup 4: invokedynamic #3, 0 // InvokeDynamic #0:run:()Ljava/lang/Runnable; /*使用invokedynamic指令调用*/ 9: invokespecial #4 // Method java/lang/Thread."<init>":(Ljava/lang/Runnable;)V 12: invokevirtual #5 // Method java/lang/Thread.start:()V 15: return private static void lambda$main$0(); /*Lambda表达式被封装成主类的私有方法*/ Code: 0: getstatic #6 // Field java/lang/System.out:Ljava/io/PrintStream; 3: ldc #7 // String Lambda Thread run() 5: invokevirtual #8 // Method java/io/PrintStream.println:(Ljava/lang/String;)V 8: return }
反编译之后我们发现Lambda表达式被封装成了主类的一个私有方法,并通过invokedynamic指令进行调用
10、Java8的Stream工具封装
封装工具
Predicate 判断|true|false
Function 转换函数
Supplier 生产
Consumer 消费
BinaryOperator 聚合
工具封装 Demo
public class ListUtils { public static <T> List<T> filter(List<T> list, Predicate<T> predicate) { return list.stream().filter(predicate).collect(Collectors.toList()); } public static <T, R> List<R> map(List<T> list, Function<T, R> function) { return list.stream().map(function).collect(Collectors.toList()); } public static <T> T findFirstOrNull(List<T> list) { return list.stream().findFirst().orElse(null); } public static <T, K, R> Map<K, R> toMap(List<T> list, Function<T, K> keyMapper, Function<T, R> valueMapper) { return list.stream().collect(Collectors.toMap(keyMapper, valueMapper)); } //...还有其他 }
11、Optional详解
1、ofNullable
判空操作
public static String getGender(Student student) { return Optional.ofNullable(student) .map(u -> u.getGender()).orElse("Unkown"); }
2、isPresent()
方法 用于判断包装对象的值是否非空。
public static void printName(Student student) { Optional.ofNullable(student) .ifPresent(u -> System.out.println("The student name is : " + u.getName())); }
- 用于打印学生姓名,由于ifPresent()方法内部做了null值检查,调用前无需担心NPE问题
3、filter()
用于对Optional对象进行过滤,如果符合Predicate的条件,返回Optional对象本身,否则返回一个空的Optional对象
public static void filterAge(Student student) { Optional.ofNullable(student) .filter( u -> u.getAge() > 18) .ifPresent(u -> System.out.println("The student age is more than 18.")); }
4、map()
map()方法的参数为Function(函数式接口)对象,map()方法将Optional中的包装对象用Function函数进行运算,并包装成新的Optional对象(包装对象的类型可能改变)
public static Optional<Integer> getAge(Student student) { return Optional.ofNullable(student) .map(u -> u.getAge()); }
5、 flatMap()
flatMap()能将一个二维的Optional对象映射成一个一维的对象
public static Optional<Integer> getAge(Student student) { return Optional.ofNullable(student) .flatMap(u -> Optional.ofNullable(u.getAge())); }
6、npe检查
public static <T> Optional<T> checkNPE(Supplier<T> resolver) { try { T result = resolver.get(); return Optional.ofNullable(result); } catch (NullPointerException e) { return Optional.empty(); } } // 使用方式1 boolean itemPriceDescOption = OptionalUtil.checkNPE(() -> extDTO.getItemPriceDesc()).isPresent(); // 使用方式2 Optional<Long> protocolOptional = OptionalUtil.checkNPE(() -> item.getAgreementId());
参考资料
1、Java Functional Programming Internals
2、《java性能调优实战》 刘超
3、公众号《java之间》
真正的爱,不是单纯的给予,还包括适当的拒绝、及时的赞美、得体的批评、恰当的争论、必要的鼓励、温柔的安慰、有效的敦促。父母应该成为值得尊敬的领导者、指挥官,告诉孩子该做什么,不该做什么。要依据理性的判断,而不能仅凭直觉,必须经过认真思考和周密计划,甚至是做出令人痛苦的决定。