4.11 max和min
如果我们想要获取最大值和最小值,那么可以使用max和min方法
Optional<T> min(Comparator<? super T> comparator); Optional<T> max(Comparator<? super T> comparator);
使用
public class StreamTest15MaxMin { public static void main(String[] args) { Optional<Integer> max = Stream.of("2", "21", "1", "3", "4", "3", "9", "22") .map(Integer::parseInt) .max((o1, o2) -> o1-o2); System.out.println(max.get()); Optional<Integer> min = Stream.of("2", "21", "1", "3", "4", "3", "9", "22") .map(Integer::parseInt) .min((o1, o2) -> o1-o2); System.out.println(min.get()); } }
4.12 reduce方法
T reduce(T identity, BinaryOperator<T> accumulator);
如果需要把所有数据归纳得到一个数据,可以使用reduce方法
public class StreamTest17Reduce { public static void main(String[] args) { Integer max = Stream.of("2", "21", "1", "3", "4", "3", "9", "22") .map(Integer::parseInt) .reduce(0,(x,y)->x>y?x:y); Integer sum = Stream.of("2", "21", "1", "3", "4", "3", "9", "22") .map(Integer::parseInt) .reduce(0,(x,y)->x+y); System.out.println(max); System.out.println(sum); } }
4.13 map和reduce的组合
在实际开发中我们经常会将map和reduce一块使用
public class StreamTest18MapReduce { public static void main(String[] args) { Integer sumAge = Stream.of( new Person("张三",18,12), new Person("李四",23,11), new Person("张三",18,12), new Person("王五",12,13) ).map(Person::getAge) // .reduce(0,(x,y)->x+y); // .reduce(0,Integer::sum); .reduce(0,Math::max); System.out.println(sumAge); } }
4.14 mapToInt
如果需要将Stream中的Integer类型转换为int类型,可以使用mapToInt来实现
IntStream mapToInt(ToIntFunction<? super T> mapper);
使用
public class StreamTest19MapToInt { public static void main(String[] args) { // Integer占用的内存比int大很多,在Stream流操作中会自动装修和拆箱操作 // 为了提高代码效率,可以把流转换未IntStream,再操作 Stream.of("2", "21", "1", "3", "4", "3", "9", "22") .mapToInt(Integer::parseInt) .forEach(System.out::println); } }
4.15 concat
如果有两个流,希望合并成一个流,那么可以使用concat方法
public static <T> Stream<T> concat(Stream<? extends T> a, Stream<? extends T> b) { Objects.requireNonNull(a); Objects.requireNonNull(b); @SuppressWarnings("unchecked") Spliterator<T> split = new Streams.ConcatSpliterator.OfRef<>( (Spliterator<T>) a.spliterator(), (Spliterator<T>) b.spliterator()); Stream<T> stream = StreamSupport.stream(split, a.isParallel() || b.isParallel()); return stream.onClose(Streams.composedClose(a, b)); }
使用
public class StreamTest20Concat { public static void main(String[] args) { Stream<String> stream1 = Stream.of("2", "21", "1", "3", "4", "3", "9", "22"); Stream<String> stream2 = Stream.of("aa", "bb", "cc", "dd", "ee", "ff", "dd", "gg"); Stream.concat(stream1,stream2).forEach(System.out::println); } }
4.16 综合案例
定义两个集合,然后在集合中存储多个用户名称,然后完成如下的操作
- 第一个队伍只保留姓名长度为3的成员
- 第一个队伍筛选之后只要前3个人
- 第二个队伍只要姓张的成员
- 第二个队伍不要前两个人
- 合并两个队伍
- 根据姓名创建Person对象
- 打印整个队伍的Person信息
public class StreamTest21Example { public static void main(String[] args) { List<String> list1 = Arrays.asList("宋江","及时雨","李逵","黑旋风","豹子头","林冲","花和尚","鲁智深","智多星","吴用","鼓上搔","时迁"); List<String> list2 = Arrays.asList("周星驰","张三丰","周润发","张启灵","刘德华","张起山","姚明","王中网","张三","刘备","张飞"); Stream<String> stream1 = list1.stream() .filter(s -> s.length()==3) .limit(3); Stream<String> stream2 = list2.stream() .filter(s -> s.contains("张")) .skip(2); Stream.concat(stream1,stream2) .map(Person::new) .forEach(System.out::println); } }
输出
Person(name=及时雨, age=null, height=null) Person(name=黑旋风, age=null, height=null) Person(name=豹子头, age=null, height=null) Person(name=张起山, age=null, height=null) Person(name=张三, age=null, height=null) Person(name=张飞, age=null, height=null)
5. Stream结果集收集
5.1 收集到集合中
//收集到集合中 @Test public void test1(){ List<String> list = Stream.of("aa", "bb", "cc", "dd", "aa").collect(Collectors.toList()); System.out.println(list); //收集到set集合中 Set<String> set = Stream.of("aa", "bb", "cc", "dd", "aa").collect(Collectors.toSet()); System.out.println(set); //收集到ArrayList ArrayList<String> arrayList = Stream.of("aa", "bb", "cc", "dd", "aa") // .collect(Collectors.toCollection(() -> new ArrayList<>())); .collect(Collectors.toCollection(ArrayList::new)); System.out.println(arrayList); //收集到HashSet中 HashSet<String> hashSet = Stream.of("aa", "bb", "cc", "dd", "aa") .collect(Collectors.toCollection(HashSet::new)); System.out.println(hashSet); }
5.2 收集到数组中
Stream中提供了toArray方法将结构放到一个数组中,返回值类型是Object[],如果我们要指定返回类型,那么可以使用另一个重载的toArray(IntFunction f)方法
//收集到数组中 @Test public void test2(){ Object[] objects = Stream.of("aa", "bb", "cc", "dd", "aa").toArray(); System.out.println(Arrays.toString(objects)); //指定类型 String[] strings = Stream.of("aa", "bb", "cc", "dd", "aa").toArray(String[]::new); System.out.println(strings); }
5.3 对流中的数据做聚合运算
当我们使用Stream流处理数据后,可以像数据库的聚合函数一样对某个字段进行操作,比如获取最大值,最小值,求和,平均值,统计数量
@Test public void test3(){ Optional<Person> maxAge = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("王五", 12), new Person("赵六", 88), new Person("张三", 77) ).collect(Collectors.maxBy((p1, p2) -> p1.getAge() - p2.getAge())); System.out.println("最大年龄:"+maxAge.get()); Optional<Person> minAge = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("王五", 12), new Person("赵六", 88), new Person("张三", 77) ).collect(Collectors.minBy((p1, p2) -> p1.getAge() - p2.getAge())); System.out.println("最小年龄:"+minAge.get()); Integer sumAge = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("王五", 12), new Person("赵六", 88), new Person("张三", 77) ).collect(Collectors.summingInt(Person::getAge)); System.out.println("年龄总和:"+sumAge); Double avgAge = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("王五", 12), new Person("赵六", 88), new Person("张三", 77) ).collect(Collectors.averagingInt(Person::getAge)); System.out.println("平均年龄:"+ avgAge); Long count = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("王五", 12), new Person("赵六", 88), new Person("张三", 77) ).collect(Collectors.counting()); System.out.println("总条数:"+count); }
5.4 对流中的数据进行分组操作
当我们使用Stream流处理数据后,可以根据某个属性将数据分组
@Test public void test4(){ System.out.println("一般分组================="); Map<String, List<Person>> map = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 88), new Person("张三", 77) ).collect(Collectors.groupingBy(Person::getName)); map.forEach((k,v)->{ System.out.println("k="+k+",v="+v); }); System.out.println("条件分组================="); //根据年龄分组小于18未成年大于18成年 Map<String, List<Person>> map2 = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 88), new Person("张三", 77) ).collect(Collectors.groupingBy((s) -> s.getAge() >= 18 ? "成年" : "未成年")); map2.forEach((k,v)->{ System.out.println("k2="+k+",v2="+v); }); System.out.println("多级分组================="); Map<String, Map<String, List<Person>>> map3 = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 18), new Person("张三", 77) ).collect(Collectors.groupingBy(Person::getName, Collectors.groupingBy((p) -> p.getAge() >= 18 ? "成年" : "未成年"))); map3.forEach((k,v)->{ System.out.println("k="+k); v.forEach((k2,v2)->{ System.out.println("\t子k="+k2+"v="+v2); }); }); }
输出结果
一般分组================= k=李四,v=[Person(name=李四, age=21, height=null), Person(name=李四, age=88, height=null)] k=张三,v=[Person(name=张三, age=18, height=null), Person(name=张三, age=12, height=null), Person(name=张三, age=77, height=null)] 条件分组================= k2=未成年,v2=[Person(name=张三, age=12, height=null)] k2=成年,v2=[Person(name=张三, age=18, height=null), Person(name=李四, age=21, height=null), Person(name=李四, age=88, height=null), Person(name=张三, age=77, height=null)] 多级分组================= k=李四 子k=成年v=[Person(name=李四, age=21, height=null), Person(name=李四, age=18, height=null)] k=张三 子k=未成年v=[Person(name=张三, age=12, height=null)] 子k=成年v=[Person(name=张三, age=18, height=null), Person(name=张三, age=77, height=null)]
5.5 对流中的数据做分区操作
Collectors.partitioningBy会根据值是否为true,把集合中的数据分割为两个列表,一个true列表,一个false列表
@Test public void test5(){ Map<Boolean, List<Person>> collect = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 18), new Person("张三", 77) ).collect(Collectors.partitioningBy(p-> p.getAge() >= 18)); collect.forEach((k,v)->{ System.out.println("k="+k+",v="+v); }); }
输出结果
k=false,v=[Person(name=张三, age=12, height=null)] k=true,v=[Person(name=张三, age=18, height=null), Person(name=李四, age=21, height=null), Person(name=李四, age=18, height=null), Person(name=张三, age=77, height=null)]
5.6 对流中的数据做拼接
Colector.joining会根据指定的连接符,将所有的元素连接成一个字符串
@Test public void test6(){ String collect = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 18), new Person("张三", 77) ).map(Person::getName) .collect(Collectors.joining("")); System.out.println(collect); String collect2 = Stream.of( new Person("张三", 18), new Person("李四", 21), new Person("张三", 12), new Person("李四", 18), new Person("张三", 77) ).map(Person::getName) .collect(Collectors.joining(",","####","!!!!")); System.out.println(collect2); }
输出
张三李四张三李四张三 ####张三,李四,张三,李四,张三!!!!
6. 并行Stream流
6.1 串行Stream流
我们前面使用的Stream流都是穿行的,也就是在一个线程上面执行
@Test public void test01(){ Stream.of(1,2,3,4,5,6).forEach(s->{ System.out.println("线程名:"+Thread.currentThread()+"值:"+s); }); }
输出:
线程名:Thread[main,5,main]值:1 线程名:Thread[main,5,main]值:2 线程名:Thread[main,5,main]值:3 线程名:Thread[main,5,main]值:4 线程名:Thread[main,5,main]值:5 线程名:Thread[main,5,main]值:6
6.2 并行流
parrallelStream是一个并行执行的流,它通过默认的ForkJoinPool,可以提高多线程任务的速度。
获取并行流
我们可以通过两种方式来获取并行流
- 通过List接口的parallelStream方法
- 通过已有串行流的parallel方法转换为并行流
@Test public void test02(){ //1. 通过list的方法 List<Integer> list = Arrays.asList(1,2,3,4,5,6); Stream<Integer> parallelStream1 = list.parallelStream(); //2. 通过流parrlel方法 Stream<Integer> parallelStream2 = Stream.of(1, 2, 3, 4, 5, 6).parallel(); }
并行流操作
@Test public void test03(){ Stream.of(1, 2, 3, 4, 5, 6).parallel() .forEach(s->{ System.out.println("线程:"+Thread.currentThread()+",值:"+s); }); }
输出
线程:Thread[main,5,main],值:4 线程:Thread[main,5,main],值:1 线程:Thread[main,5,main],值:3 线程:Thread[main,5,main],值:5 线程:Thread[ForkJoinPool.commonPool-worker-1,5,main],值:2 线程:Thread[ForkJoinPool.commonPool-worker-2,5,main],值:6
6.3 并行流和串行流对比
通过for循环,串行流,并行流来对5亿个数字求和。来看消耗时间
public class Test03 { private static long time = 500000000l; // private static long time = 500l; private long start; @Before public void before(){ start = new Date().getTime(); } @After public void after(){ long end = new Date().getTime(); System.out.println("消耗时间===="+(end - start)); } /** * for循环 * 消耗时间====202 */ @Test public void test01(){ long sum = 0; for(int i=0;i<time;i++){ sum +=i; } } /** * 串行流 * 消耗时间====303 */ @Test public void test02(){ LongStream.range(1, time).reduce(0,Long::sum); } /** * 串行流 * 消耗时间====132 */ @Test public void test03(){ LongStream.range(1, time).parallel().reduce(0,Long::sum); } }
通过案例可以看到parallelStream的效率是最高的。
Stream并行处理的过程会分而治之,也就是将一个大的任务切分成了多个小任务,这表示每个人物都是一个线程操作
6.4 线程安全问题
在多线程的处理下,肯定会出现数据安全问题。如下:
@Test public void test01(){ List<Integer> list = new ArrayList<>(); for(int i=0;i<1000;i++){ list.add(i); } System.out.println("原集合大小"+list.size()); List<Integer> newList = new ArrayList<>(); for(int i=0;i<list.size();i++){ newList.add(i); } System.out.println("新集合大小"+newList.size()); List<Integer> parallelList = new ArrayList<>(); list.stream().parallel().forEach(parallelList::add); System.out.println("并行集合大小"+parallelList.size()); }
结果
原集合大小1000 新集合大小1000 并行集合大小983
或者
原集合大小1000 新集合大小1000 java.lang.ArrayIndexOutOfBoundsException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) ... Caused by: java.lang.ArrayIndexOutOfBoundsException: 549
解决方法:
- 加同步锁
- 使用线程安全的容器
- 使用Stream中的toArray/collect操作
/** * 加锁的方式 */ @Test public void test02(){ Object obj = new Object(); List list = new ArrayList(); IntStream.range(0,1000).parallel().forEach(s->{ synchronized (obj){ list.add(s); } }); System.out.println(list.size()); } /** * 使用线程安全的容器 */ @Test public void test03(){ Vector vector = new Vector(); IntStream.range(0,1000).parallel().forEach(s->{ vector.add(s); }); System.out.println(vector.size()); //或者把不安全的容器包装成线程安全的容器 List list = Collections.synchronizedList(new ArrayList<>()); IntStream.range(0,1000).parallel().forEach(s->{ list.add(s); }); System.out.println(list.size()); } /** * 还可以用Stream的toArray方法或者collect方法来操作 */ @Test public void test04(){ List<Integer> collect = IntStream.range(0, 1000) .parallel() .boxed() .collect(Collectors.toList()); System.out.println(collect.size()); }