一.JDK8新特性之Stream流-并行的Stream流以及案例实操
二. 并行的Stream流
2.1 串行的Stream流
我们前面使用的Stream流都是串行,也就是在一个线程上面执行。
/**
* 串行流
*/
@Test
public void test01(){
long count = Stream.of(1, 2, 3, 4, 5, 6)
.filter(s -> {
System.out.println(Thread.currentThread() + "" + s);
return s % 2 == 0;
}).count();
System.out.println(count);
}
2.2 并行流
parallelStream其实就是一个并行执行的流,它通过默认的ForkJoinPool,可以提高多线程任务的速度。
2.2.1 获取并行流俩种方式
我们可以通过两种方式来获取并行流。
- 通过List接口中的parallelStream方法来获取
- 通过已有的串行流转换为并行流(parallel)
/**
* 获取并行流的两种方式
*/
@Test
public void test02(){
List<Integer> list = new ArrayList<>();
// 方式1:通过List接口中的parallelStream方法来获取
Stream<Integer> integerStream = list.parallelStream();
// 方式2:通过已有的串行流转换为并行流(parallel)
Stream<Integer> parallel = Stream.of(1, 3).parallel();
}
2.2.2 并行流操作
/**
* 并行流操作
*/
@Test
public void test03(){
long count = Stream.of(1, 2, 3, 4, 5, 6)
.parallel() // 将流转换为并发流,Stream处理的时候就会通过多线程处理
.filter(s -> {
System.out.println(Thread.currentThread() + "" + s);
return s % 2 == 0;
}).count();
System.out.println(count);
}
2.3 并行流和串行流对比
我们通过for循环,串行Stream流,并行Stream流来对10000000亿个数字求和。来看消耗时间
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.stream.LongStream;
public class Test03 {
private static long times = 300000000;
private long start;
/**
* 程序开始的时候获取当前系统的时间
*/
@Before
public void before(){
start = System.currentTimeMillis();
}
/**
* 程序结束的时候获取此时系统的时间
* 计算消耗的时间
*/
@After
public void end(){
long end = System.currentTimeMillis();
System.out.println("消耗时间:" + (end - start));
}
/**
* 串行流处理:消耗时间:271
*/
@Test
public void test02(){
System.out.println("串行流处理:");
LongStream.rangeClosed(0,times)
.reduce(0,Long::sum);
}
/**
* 并行流处理 消耗时间:158
*/
@Test
public void test03(){
System.out.println("并行流处理:");
LongStream.rangeClosed(0,times)
.parallel()
.reduce(0,Long::sum);
}
}
通过案例我们可以看到parallelStream的效率是最高的。
Stream并行处理的过程会分而治之,也就是将一个大的任务切分成了多个小任务,这表示每个任务都是一个线程操作。
为什么呢?
因为它使用的线程池就是通过分而治之的思想实现的。
2.4 线程安全问题
在多线程的处理下,肯定会出现数据安全问题。如下:
@Test
public void test01(){
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
List<Integer> listNew = new ArrayList<>();
// 使用并行流来向集合中添加数据
list.parallelStream()
.forEach(listNew::add);
System.out.println(listNew.size());
}
运行效果:
2.5 解决线程安全问题的三种方案
2.5.1 加同步锁
/**
* 加同步锁
*/
@Test
public void test01(){
List<Integer> listNew = new ArrayList<>();
Object obj = new Object();
IntStream.rangeClosed(1,1000)
.parallel()
.forEach(i->{
synchronized (obj){
listNew.add(i);
}
});
System.out.println(listNew.size());
}
2.5.2 使用线程安全的容器
- 使用线程安全的容器+synchronized
将线程不安全的容器转换为线程安全的容器
/** * 使用线程安全的容器 */ @Test public void test02(){ Vector v = new Vector(); Object obj = new Object(); IntStream.rangeClosed(1,1000) .parallel() .forEach(i->{ synchronized (obj){ v.add(i); } }); System.out.println(v.size()); } /** * 将线程不安全的容器转换为线程安全的容器 */ @Test public void test03(){ List<Integer> listNew = new ArrayList<>(); // 将线程不安全的容器包装为线程安全的容器 List<Integer> synchronizedList = Collections.synchronizedList(listNew); Object obj = new Object(); IntStream.rangeClosed(1,1000) .parallel() .forEach(i->{ synchronizedList.add(i); }); System.out.println(synchronizedList.size()); }
2.5.3 通过Stream中的toArray/collect操作
通过Stream中的 toArray方法或者 collect方法来操作满足线程安全的要求
@Test
public void test05(){
List<Integer> listNew = new ArrayList<>();
Object obj = new Object();
List<Integer> list = IntStream.rangeClosed(1, 1000)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println(list.size());
}