重学JDK8新特性之Stream(上):https://developer.aliyun.com/article/1413212
Stream结果收集
结果收集到集合
List<String> list = Stream.of("aa", "bb", "cc","aa") .collect(Collectors.toList()); System.out.println(list); // 收集到 Set集合中 Set<String> set = Stream.of("aa", "bb", "cc", "aa") .collect(Collectors.toSet()); System.out.println(set); // 如果需要获取的类型为具体的实现,比如:ArrayList HashSet ArrayList<String> arrayList = Stream.of("aa", "bb", "cc", "aa") //.collect(Collectors.toCollection(() -> new ArrayList<>())); .collect(Collectors.toCollection(ArrayList::new)); System.out.println(arrayList); HashSet<String> hashSet = Stream.of("aa", "bb", "cc", "aa") .collect(Collectors.toCollection(HashSet::new)); System.out.println(hashSet);
输出:
[aa, bb, cc, aa] [aa, bb, cc] [aa, bb, cc, aa] [aa, bb, cc]
结果收集到数组中
Object[] objects = Stream.of("aa", "bb", "cc", "aa") .toArray(); // 返回的数组中的元素是 Object类型 System.out.println(Arrays.toString(objects)); // 如果我们需要指定返回的数组中的元素类型 String[] strings = Stream.of("aa", "bb", "cc", "aa") .toArray(String[]::new); System.out.println(Arrays.toString(strings));
对流中数据做聚合计算
当我们使用Stream流处理数据后,可以像数据库的聚合函数一样对某个字段进行操作,比如获得最大
值,最小值,求和,平均值,统计数量。
Optional<Person> maxAge = Stream.of( new Person("张三", 18) , new Person("李四", 22) , new Person("张三", 13) , new Person("王五", 15) , new Person("张三", 19) ).collect(Collectors.maxBy((p1, p2) -> p1.getAge() - p2.getAge())); System.out.println("最大年龄:" + maxAge.get()); // 获取年龄的最小值 Optional<Person> minAge = Stream.of( new Person("张三", 18) , new Person("李四", 22) , new Person("张三", 13) , new Person("王五", 15) , new Person("张三", 19) ).collect(Collectors.minBy((p1, p2) -> p1.getAge() - p2.getAge())); System.out.println("最新年龄:" + minAge.get()); // 求所有人的年龄之和 Integer sumAge = Stream.of( new Person("张三", 18) , new Person("李四", 22) , new Person("张三", 13) , new Person("王五", 15) , new Person("张三", 19) ) //.collect(Collectors.summingInt(s -> s.getAge())) .collect(Collectors.summingInt(Person::getAge)) ; System.out.println("年龄总和:" + sumAge); // 年龄的平均值 Double avgAge = Stream.of( new Person("张三", 18) , new Person("李四", 22) , new Person("张三", 13) , new Person("王五", 15) , new Person("张三", 19) ).collect(Collectors.averagingInt(Person::getAge)); System.out.println("年龄的平均值:" + avgAge); // 统计数量 Long count = Stream.of( new Person("张三", 18) , new Person("李四", 22) , new Person("张三", 13) , new Person("王五", 15) , new Person("张三", 19) ).filter(p->p.getAge() > 18) .collect(Collectors.counting()); System.out.println("满足条件的记录数:" + count);
对流中数据做分组操作
// 根据账号对数据进行分组 Map<String, List<Person>> map1 = Stream.of( new Person("张三", 18, 175) , new Person("李四", 22, 177) , new Person("张三", 14, 165) , new Person("李四", 15, 166) , new Person("张三", 19, 182) ).collect(Collectors.groupingBy(Person::getName)); map1.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v)); System.out.println("-----------"); // 根据年龄分组 如果大于等于18 成年否则未成年 Map<String, List<Person>> map2 = Stream.of( new Person("张三", 18, 175) , new Person("李四", 22, 177) , new Person("张三", 14, 165) , new Person("李四", 15, 166) , new Person("张三", 19, 182) ).collect(Collectors.groupingBy(p -> p.getAge() >= 18 ? "成年" : "未成 年")); map2.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v));
结果:
k=李四 v=[Person{name='李四', age=22, height=177}, Person{name='李四', age=15, height=166}] k=张三 v=[Person{name='张三', age=18, height=175}, Person{name='张三', age=14, height=165}, Person{name='张三', age=19, height=182}] ----------- k=未成年 v=[Person{name='张三', age=14, height=165}, Person{name='李四', age=15, height=166}] k=成年 v=[Person{name='张三', age=18, height=175}, Person{name='李四', age=22, height=177}, Person{name='张三', age=19, height=182}]
对流中的数据做分区操作
Map<Boolean, List<Person>> map = Stream.of( new Person("张三", 18, 175) , new Person("李四", 22, 177) , new Person("张三", 14, 165) , new Person("李四", 15, 166) , new Person("张三", 19, 182) ).collect(Collectors.partitioningBy(p -> p.getAge() > 18)); map.forEach((k,v)-> System.out.println(k+"\t" + v));
结果:
false [Person{name='张三', age=18, height=175}, Person{name='张三', age=14, height=165}, Person{name='李四', age=15, height=166}] true [Person{name='李四', age=22, height=177}, Person{name='张三', age=19, height=182}]
对流中的数据做拼接
String s1 = Stream.of( new Person("张三", 18, 175) , new Person("李四", 22, 177) , new Person("张三", 14, 165) , new Person("李四", 15, 166) , new Person("张三", 19, 182) ).map(Person::getName) .collect(Collectors.joining()); // 张三李四张三李四张三 System.out.println(s1); String s2 = Stream.of( new Person("张三", 18, 175) , new Person("李四", 22, 177) , new Person("张三", 14, 165) , new Person("李四", 15, 166) , new Person("张三", 19, 182) ).map(Person::getName) .collect(Collectors.joining("_")); // 张三_李四_张三_李四_张三 System.out.println(s2);
并行的Stream流
串行Stream流
前面使用的Stream流都是串行,也就是在一个线程上面执行。
Stream.of(5,6,8,3,1,6) .filter(s->{ System.out.println(Thread.currentThread() + "" + s); return s > 3; }).count();
输出:
Thread[main,5,main]5 Thread[main,5,main]6 Thread[main,5,main]8 Thread[main,5,main]3 Thread[main,5,main]1 Thread[main,5,main]6
并行流
parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可以提高多线程任务的速度。
我们可以通过两种方式来获取并行流。
- 通过List接口中的parallelStream方法来获取
- 通过已有的串行流转换为并行流(parallel)
List<Integer> list = new ArrayList<>(); // 通过List 接口 直接获取并行流 Stream<Integer> integerStream = list.parallelStream(); // 将已有的串行流转换为并行流 Stream<Integer> parallel = Stream.of(1, 2, 3).parallel();
并行操作
Stream.of(1,4,2,6,1,5,9) .parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理 .filter(s->{ System.out.println(Thread.currentThread() + " s=" +s); return s > 2; }).count();
输出:
Thread[main,5,main] s=1 Thread[ForkJoinPool.commonPool-worker-2,5,main] s=9 Thread[ForkJoinPool.commonPool-worker-6,5,main] s=6 Thread[ForkJoinPool.commonPool-worker-13,5,main] s=2 Thread[ForkJoinPool.commonPool-worker-9,5,main] s=4 Thread[ForkJoinPool.commonPool-worker-4,5,main] s=5 Thread[ForkJoinPool.commonPool-worker-11,5,main] s=1
并行流和串行流对比
/** * 普通for循环 消耗时间:138 */ @Test public void test01(){ System.out.println("普通for循环:"); long res = 0; for (int i = 0; i < times; i++) { res += i; } } /** * 串行流处理 * 消耗时间:203 */ @Test public void test02(){ System.out.println("串行流:serialStream"); LongStream.rangeClosed(0,times) .reduce(0,Long::sum); } /** * 并行流处理 消耗时间:84 */ @Test public void test03(){ LongStream.rangeClosed(0,times) .parallel() .reduce(0,Long::sum); }
通过案例我们可以看到parallelStream的效率是最高的。
Stream并行处理的过程会分而治之,也就是将一个大的任务切分成了多个小任务,这表示每个任务都是一个线程操作。
线程安全问题
在多线程的处理下,肯定会出现数据安全问题。如下:
List<Integer> list = new ArrayList<>(); for (int i = 0; i < 1000; i++) { list.add(i); } System.out.println(list.size()); List<Integer> listNew = new ArrayList<>(); // 使用并行流来向集合中添加数据 list.parallelStream() //.forEach(s->listNew.add(s)); .forEach(listNew::add); System.out.println(listNew.size());
实际上有可能出现线程安全问题:
java.lang.ArrayIndexOutOfBoundsException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorI mpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorA ccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598) .... Caused by: java.lang.ArrayIndexOutOfBoundsException: 366 at java.util.ArrayList.add(ArrayList.java:463)
在并行流中往列表(List)集合进行添加元素可能会遇到线程安全问题。当多个线程同时执行添加操作时,由于并行流的特性,多个线程会同时访问和修改同一个列表对象,这可能导致以下问题:
- 竞态条件:由于多个线程同时修改列表,可能导致数据不一致或意外的结果。例如,如果两个线程同时向同一个索引位置添加元素,可能会导致其中一个元素被覆盖或丢失。
- 并发修改异常:Java 的 ArrayList 类并不是线程安全的,当多个线程同时修改列表时,可能会引发 ConcurrentModificationException 异常。
针对这个问题,我们的解决方案有哪些呢?
- 加同步锁
- 使用线程安全的容器
- 通过Stream中的toArray/collect操作
实现:
/** * 加同步锁 */ List<Integer> listNew = new ArrayList<>(); Object obj = new Object(); IntStream.rangeClosed(1,1000) .parallel() .forEach(i->{ synchronized (obj){ listNew.add(i); } }); System.out.println(listNew.size()); /** * 使用线程安全的容器 */ Vector v = new Vector(); Object obj = new Object(); IntStream.rangeClosed(1,1000) .parallel() .forEach(i->{ synchronized (obj){ v.add(i); } }); System.out.println(v.size()); /** * 将线程不安全的容器转换为线程安全的容器 */ List<Integer> listNew = new ArrayList<>(); // 将线程不安全的容器包装为线程安全的容器 List<Integer> synchronizedList = Collections.synchronizedList(listNew); Object obj = new Object(); IntStream.rangeClosed(1,1000) .parallel() .forEach(i->{ synchronizedList.add(i); }); System.out.println(synchronizedList.size());