重学JDK8新特性之Stream(下)

简介: 重学JDK8新特性之Stream

重学JDK8新特性之Stream(上):https://developer.aliyun.com/article/1413212


Stream结果收集


结果收集到集合

List<String> list = Stream.of("aa", "bb", "cc","aa")
  .collect(Collectors.toList());
System.out.println(list);
// 收集到 Set集合中
Set<String> set = Stream.of("aa", "bb", "cc", "aa")
  .collect(Collectors.toSet());
System.out.println(set);
// 如果需要获取的类型为具体的实现,比如:ArrayList HashSet
ArrayList<String> arrayList = Stream.of("aa", "bb", "cc", "aa")
//.collect(Collectors.toCollection(() -> new ArrayList<>()));
  .collect(Collectors.toCollection(ArrayList::new));
System.out.println(arrayList);
HashSet<String> hashSet = Stream.of("aa", "bb", "cc", "aa")
  .collect(Collectors.toCollection(HashSet::new));
System.out.println(hashSet);

输出:

[aa, bb, cc, aa]
[aa, bb, cc]
[aa, bb, cc, aa]
[aa, bb, cc]


结果收集到数组中


Object[] objects = Stream.of("aa", "bb", "cc", "aa")
  .toArray(); // 返回的数组中的元素是 Object类型
System.out.println(Arrays.toString(objects));
// 如果我们需要指定返回的数组中的元素类型
String[] strings = Stream.of("aa", "bb", "cc", "aa")
  .toArray(String[]::new);
System.out.println(Arrays.toString(strings));


对流中数据做聚合计算


当我们使用Stream流处理数据后,可以像数据库的聚合函数一样对某个字段进行操作,比如获得最大

值,最小值,求和,平均值,统计数量。

Optional<Person> maxAge = Stream.of(
    new Person("张三", 18)
    , new Person("李四", 22)
    , new Person("张三", 13)
    , new Person("王五", 15)
    , new Person("张三", 19)
).collect(Collectors.maxBy((p1, p2) -> p1.getAge() - p2.getAge()));
System.out.println("最大年龄:" + maxAge.get());
// 获取年龄的最小值
Optional<Person> minAge = Stream.of(
    new Person("张三", 18)
    , new Person("李四", 22)
    , new Person("张三", 13)
    , new Person("王五", 15)
    , new Person("张三", 19)
).collect(Collectors.minBy((p1, p2) -> p1.getAge() - p2.getAge()));
System.out.println("最新年龄:" + minAge.get());
// 求所有人的年龄之和
Integer sumAge = Stream.of(
    new Person("张三", 18)
    , new Person("李四", 22)
    , new Person("张三", 13)
    , new Person("王五", 15)
    , new Person("张三", 19)
)
//.collect(Collectors.summingInt(s -> s.getAge()))
.collect(Collectors.summingInt(Person::getAge))
;
System.out.println("年龄总和:" + sumAge);
// 年龄的平均值
Double avgAge = Stream.of(
    new Person("张三", 18)
    , new Person("李四", 22)
    , new Person("张三", 13)
    , new Person("王五", 15)
    , new Person("张三", 19)
).collect(Collectors.averagingInt(Person::getAge));
System.out.println("年龄的平均值:" + avgAge);
// 统计数量
Long count = Stream.of(
    new Person("张三", 18)
    , new Person("李四", 22)
    , new Person("张三", 13)
    , new Person("王五", 15)
    , new Person("张三", 19)
).filter(p->p.getAge() > 18)
.collect(Collectors.counting());
System.out.println("满足条件的记录数:" + count);


对流中数据做分组操作


// 根据账号对数据进行分组
Map<String, List<Person>> map1 = Stream.of(
    new Person("张三", 18, 175)
    , new Person("李四", 22, 177)
    , new Person("张三", 14, 165)
    , new Person("李四", 15, 166)
    , new Person("张三", 19, 182)
).collect(Collectors.groupingBy(Person::getName));
map1.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v));
System.out.println("-----------");
// 根据年龄分组 如果大于等于18 成年否则未成年
Map<String, List<Person>> map2 = Stream.of(
    new Person("张三", 18, 175)
    , new Person("李四", 22, 177)
    , new Person("张三", 14, 165)
    , new Person("李四", 15, 166)
    , new Person("张三", 19, 182)
).collect(Collectors.groupingBy(p -> p.getAge() >= 18 ? "成年" : "未成
年"));
map2.forEach((k,v)-> System.out.println("k=" + k +"\t"+ "v=" + v));

结果:

k=李四 v=[Person{name='李四', age=22, height=177}, Person{name='李四', age=15,
height=166}]
k=张三 v=[Person{name='张三', age=18, height=175}, Person{name='张三', age=14,
height=165}, Person{name='张三', age=19, height=182}]
-----------
k=未成年 v=[Person{name='张三', age=14, height=165}, Person{name='李四', age=15,
height=166}]
k=成年 v=[Person{name='张三', age=18, height=175}, Person{name='李四', age=22,
height=177}, Person{name='张三', age=19, height=182}]


对流中的数据做分区操作


Map<Boolean, List<Person>> map = Stream.of(
    new Person("张三", 18, 175)
    , new Person("李四", 22, 177)
    , new Person("张三", 14, 165)
    , new Person("李四", 15, 166)
    , new Person("张三", 19, 182)
).collect(Collectors.partitioningBy(p -> p.getAge() > 18));
map.forEach((k,v)-> System.out.println(k+"\t" + v));

结果:

false [Person{name='张三', age=18, height=175}, Person{name='张三', age=14,
height=165}, Person{name='李四', age=15, height=166}]
true [Person{name='李四', age=22, height=177}, Person{name='张三', age=19,
height=182}]


对流中的数据做拼接


String s1 = Stream.of(
    new Person("张三", 18, 175)
    , new Person("李四", 22, 177)
    , new Person("张三", 14, 165)
    , new Person("李四", 15, 166)
    , new Person("张三", 19, 182)
).map(Person::getName)
.collect(Collectors.joining());
// 张三李四张三李四张三
System.out.println(s1);
String s2 = Stream.of(
    new Person("张三", 18, 175)
    , new Person("李四", 22, 177)
    , new Person("张三", 14, 165)
    , new Person("李四", 15, 166)
    , new Person("张三", 19, 182)
).map(Person::getName)
.collect(Collectors.joining("_"));
// 张三_李四_张三_李四_张三
System.out.println(s2);


并行的Stream流


串行Stream流


前面使用的Stream流都是串行,也就是在一个线程上面执行。

Stream.of(5,6,8,3,1,6)
    .filter(s->{
    System.out.println(Thread.currentThread() + "" + s);
    return s > 3;
  }).count();

输出:

Thread[main,5,main]5
Thread[main,5,main]6
Thread[main,5,main]8
Thread[main,5,main]3
Thread[main,5,main]1
Thread[main,5,main]6


并行流


parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可以提高多线程任务的速度。


我们可以通过两种方式来获取并行流。


  1. 通过List接口中的parallelStream方法来获取
  2. 通过已有的串行流转换为并行流(parallel)
List<Integer> list = new ArrayList<>();
  // 通过List 接口 直接获取并行流
  Stream<Integer> integerStream = list.parallelStream();
  // 将已有的串行流转换为并行流
  Stream<Integer> parallel = Stream.of(1, 2, 3).parallel();

并行操作

Stream.of(1,4,2,6,1,5,9)
    .parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理
    .filter(s->{
    System.out.println(Thread.currentThread() + " s=" +s);
    return s > 2;
  }).count();

输出:

Thread[main,5,main] s=1
Thread[ForkJoinPool.commonPool-worker-2,5,main] s=9
Thread[ForkJoinPool.commonPool-worker-6,5,main] s=6
Thread[ForkJoinPool.commonPool-worker-13,5,main] s=2
Thread[ForkJoinPool.commonPool-worker-9,5,main] s=4
Thread[ForkJoinPool.commonPool-worker-4,5,main] s=5
Thread[ForkJoinPool.commonPool-worker-11,5,main] s=1


并行流和串行流对比


/**
* 普通for循环 消耗时间:138
*/
@Test
public void test01(){
  System.out.println("普通for循环:");
  long res = 0;
  for (int i = 0; i < times; i++) {
    res += i;
  }
}
/**
* 串行流处理
* 消耗时间:203
*/
@Test
public void test02(){
  System.out.println("串行流:serialStream");
  LongStream.rangeClosed(0,times)
    .reduce(0,Long::sum);
}
/**
* 并行流处理 消耗时间:84
*/
@Test
public void test03(){
  LongStream.rangeClosed(0,times)
  .parallel()
  .reduce(0,Long::sum);
}

通过案例我们可以看到parallelStream的效率是最高的。


Stream并行处理的过程会分而治之,也就是将一个大的任务切分成了多个小任务,这表示每个任务都是一个线程操作。


线程安全问题


在多线程的处理下,肯定会出现数据安全问题。如下:

List<Integer> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
  list.add(i);
}
System.out.println(list.size());
List<Integer> listNew = new ArrayList<>();
// 使用并行流来向集合中添加数据
list.parallelStream()
    //.forEach(s->listNew.add(s));
    .forEach(listNew::add);
System.out.println(listNew.size());

实际上有可能出现线程安全问题:

java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorI
mpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorA
ccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at
java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
....
Caused by: java.lang.ArrayIndexOutOfBoundsException: 366
at java.util.ArrayList.add(ArrayList.java:463)

在并行流中往列表(List)集合进行添加元素可能会遇到线程安全问题。当多个线程同时执行添加操作时,由于并行流的特性,多个线程会同时访问和修改同一个列表对象,这可能导致以下问题:


  1. 竞态条件:由于多个线程同时修改列表,可能导致数据不一致或意外的结果。例如,如果两个线程同时向同一个索引位置添加元素,可能会导致其中一个元素被覆盖或丢失。
  2. 并发修改异常:Java 的 ArrayList 类并不是线程安全的,当多个线程同时修改列表时,可能会引发 ConcurrentModificationException 异常。


针对这个问题,我们的解决方案有哪些呢?


  1. 加同步锁
  2. 使用线程安全的容器
  3. 通过Stream中的toArray/collect操作


实现:

/**
* 加同步锁
*/
List<Integer> listNew = new ArrayList<>();
Object obj = new Object();
IntStream.rangeClosed(1,1000)
    .parallel()
    .forEach(i->{
      synchronized (obj){
        listNew.add(i);
      }
    });
System.out.println(listNew.size());
/**
* 使用线程安全的容器
*/
Vector v = new Vector();
Object obj = new Object();
IntStream.rangeClosed(1,1000)
    .parallel()
    .forEach(i->{
      synchronized (obj){
        v.add(i);
      }
    });
System.out.println(v.size());
/**
* 将线程不安全的容器转换为线程安全的容器
*/
List<Integer> listNew = new ArrayList<>();
// 将线程不安全的容器包装为线程安全的容器
List<Integer> synchronizedList = Collections.synchronizedList(listNew);
Object obj = new Object();
IntStream.rangeClosed(1,1000)
    .parallel()
    .forEach(i->{
      synchronizedList.add(i);
    });
System.out.println(synchronizedList.size());


目录
相关文章
|
23天前
|
容器
jdk8新特性-详情查看文档
jdk8新特性-详情查看文档
41 7
|
3月前
|
容器
jdk8新特性-详情查看文档
jdk8新特性-详情查看文档
49 3
|
2月前
|
存储 安全 Java
JDK1.8 新的特性
JDK1.8 新的特性
30 0
|
3月前
|
编解码 安全 Java
jdk8新特性-接口和日期处理
jdk8新特性-接口和日期处理
|
4月前
|
API
JDK8的stream有求和方法吗?
【8月更文挑战第20天】JDK8的stream有求和方法吗?
154 3
|
4月前
|
Java API
JDK8到JDK25版本升级的新特性问题之使用Collectors.teeing()来计算一个列表中学生的平均分和总分如何操作
JDK8到JDK25版本升级的新特性问题之使用Collectors.teeing()来计算一个列表中学生的平均分和总分如何操作
|
4月前
|
Java API Apache
JDK8到JDK24版本升级的新特性问题之在Java中,HttpURLConnection有什么局限性,如何解决
JDK8到JDK24版本升级的新特性问题之在Java中,HttpURLConnection有什么局限性,如何解决
|
4月前
|
Oracle Java 关系型数据库
JDK8到JDK29版本升级的新特性问题之未来JDK的升级是否会成为必然趋势,如何理解
JDK8到JDK29版本升级的新特性问题之未来JDK的升级是否会成为必然趋势,如何理解
|
4月前
|
Oracle 安全 Java
JDK8到JDK28版本升级的新特性问题之在Java 15及以后的版本中,密封类和密封接口是怎么工作的
JDK8到JDK28版本升级的新特性问题之在Java 15及以后的版本中,密封类和密封接口是怎么工作的
|
4月前
|
Java API 开发者
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么
155 0
JDK8到JDK17版本升级的新特性问题之SpringBoot选择JDK17作为最小支持的Java lts版本意味着什么