9. stream中Collectors的用法
在java stream中,我们通常需要将处理后的stream转换成集合类,这个时候就需要用到stream.collect方法。collect方法需要传入一个Collector类型,要实现Collector还是很麻烦的,需要实现好几个接口。
于是java提供了更简单的Collectors工具类来方便我们构建Collector。
下面我们将会具体讲解Collectors的用法。
假如我们有这样两个list:
List<String> list = Arrays.asList("jack", "bob", "alice", "mark"); List<String> duplicateList = Arrays.asList("jack", "jack", "alice", "mark");
上面一个是无重复的list,一个是带重复数据的list。接下来的例子我们会用上面的两个list来讲解Collectors的用法。
9.1 Collectors.toList()
List<String> listResult = list.stream().collect(Collectors.toList()); log.info("{}",listResult);
将stream转换为list。这里转换的list是ArrayList,如果想要转换成特定的list,需要使用toCollection方法。
9.2 Collectors.toSet()
Set<String> setResult = list.stream().collect(Collectors.toSet()); log.info("{}",setResult);
toSet将Stream转换成为set。这里转换的是HashSet。如果需要特别指定set,那么需要使用toCollection方法。
因为set中是没有重复的元素,如果我们使用duplicateList来转换的话,会发现最终结果中只有一个jack。
Set<String> duplicateSetResult = duplicateList.stream().collect(Collectors.toSet()); log.info("{}",duplicateSetResult);
9.3 Collectors.toCollection()
上面的toMap,toSet转换出来的都是特定的类型,如果我们需要自定义,则可以使用toCollection()
List<String> custListResult = list.stream().collect(Collectors.toCollection(LinkedList::new)); log.info("{}",custListResult);
上面的例子,我们转换成了LinkedList。
9.4 Collectors.toMap()
toMap接收两个参数,第一个参数是keyMapper,第二个参数是valueMapper:
Map<String, Integer> mapResult = list.stream() .collect(Collectors.toMap(Function.identity(), String::length)); log.info("{}",mapResult);
如果stream中有重复的值,则转换会报IllegalStateException异常:
Map<String, Integer> duplicateMapResult = duplicateList.stream() .collect(Collectors.toMap(Function.identity(), String::length));
怎么解决这个问题呢?我们可以这样:
Map<String, Integer> duplicateMapResult2 = duplicateList.stream() .collect(Collectors.toMap(Function.identity(), String::length, (item, identicalItem) -> item)); log.info("{}",duplicateMapResult2);
在toMap中添加第三个参数mergeFunction,来解决冲突的问题。
9.5 Collectors.collectingAndThen()
collectingAndThen允许我们对生成的集合再做一次操作。
List<String> collectAndThenResult = list.stream() .collect(Collectors.collectingAndThen(Collectors.toList(), l -> {return new ArrayList<>(l);})); log.info("{}",collectAndThenResult);
9.6 Collectors.joining()
Joining用来连接stream中的元素:
String joinResult = list.stream().collect(Collectors.joining()); log.info("{}",joinResult); String joinResult1 = list.stream().collect(Collectors.joining(" ")); log.info("{}",joinResult1); String joinResult2 = list.stream().collect(Collectors.joining(" ", "prefix","suffix")); log.info("{}",joinResult2);
可以不带参数,也可以带一个参数,也可以带三个参数,根据我们的需要进行选择。
9.7 Collectors.counting()
counting主要用来统计stream中元素的个数:
Long countResult = list.stream().collect(Collectors.counting()); log.info("{}",countResult);
9.8 Collectors.summarizingDouble/Long/Int()
SummarizingDouble/Long/Int为stream中的元素生成了统计信息,返回的结果是一个统计类:
IntSummaryStatistics intResult = list.stream() .collect(Collectors.summarizingInt(String::length)); log.info("{}",intResult);
输出结果:
22:22:35.238 [main] INFO com.flydean.CollectorUsage - IntSummaryStatistics{count=4, sum=16, min=3, average=4.000000, max=5}
9.9 Collectors.averagingDouble/Long/Int()
averagingDouble/Long/Int()对stream中的元素做平均:
Double averageResult = list.stream().collect(Collectors.averagingInt(String::length)); log.info("{}",averageResult);
9.10 Collectors.summingDouble/Long/Int()
summingDouble/Long/Int()对stream中的元素做sum操作:
Double summingResult = list.stream().collect(Collectors.summingDouble(String::length)); log.info("{}",summingResult);
9.11 Collectors.maxBy()/minBy()
maxBy()/minBy()根据提供的Comparator,返回stream中的最大或者最小值:
Optional<String> maxByResult = list.stream().collect(Collectors.maxBy(Comparator.naturalOrder())); log.info("{}",maxByResult);
9.12 Collectors.groupingBy()
GroupingBy根据某些属性进行分组,并返回一个Map:
Map<Integer, Set<String>> groupByResult = list.stream() .collect(Collectors.groupingBy(String::length, Collectors.toSet())); log.info("{}",groupByResult);
9.13 Collectors.partitioningBy()
PartitioningBy是一个特别的groupingBy,PartitioningBy返回一个Map,这个Map是以boolean值为key,从而将stream分成两部分,一部分是匹配PartitioningBy条件的,一部分是不满足条件的:
Map<Boolean, List<String>> partitionResult = list.stream() .collect(Collectors.partitioningBy(s -> s.length() > 3)); log.info("{}",partitionResult);
看下运行结果:
22:39:37.082 [main] INFO com.flydean.CollectorUsage - {false=[bob], true=[jack, alice, mark]}
结果被分成了两部分。
10. 创建一个自定义的collector
在之前的java collectors文章里面,我们讲到了stream的collect方法可以调用Collectors里面的toList()或者toMap()方法,将结果转换为特定的集合类。
今天我们介绍一下怎么自定义一个Collector。
10.1 Collector介绍
我们先看一下Collector的定义:
Collector接口需要实现supplier(),accumulator(),combiner(),finisher(),characteristics()这5个接口。
同时Collector也提供了两个静态of方法来方便我们创建一个Collector实例。
我们可以看到两个方法的参数跟Collector接口需要实现的接口是一一对应的。
下面分别解释一下这几个参数:
- supplier
Supplier是一个函数,用来创建一个新的可变的集合。换句话说Supplier用来创建一个初始的集合。
- accumulator
accumulator定义了累加器,用来将原始元素添加到集合中。
- combiner
combiner用来将两个集合合并成一个。
- finisher
finisher将集合转换为最终的集合类型。
- characteristics
characteristics表示该集合的特征。这个不是必须的参数。
Collector定义了三个参数类型,T是输入元素的类型,A是reduction operation的累加类型也就是Supplier的初始类型,R是最终的返回类型。 我们画个图来看一下这些类型之间的转换关系:
有了这几个参数,我们接下来看看怎么使用这些参数来构造一个自定义Collector。
10.2 自定义Collector
我们利用Collector的of方法来创建一个不变的Set:
public static <T> Collector<T, Set<T>, Set<T>> toImmutableSet() { return Collector.of(HashSet::new, Set::add, (left, right) -> { left.addAll(right); return left; }, Collections::unmodifiableSet); }
上面的例子中,我们HashSet::new作为supplier,Set::add作为accumulator,自定义了一个方法作为combiner,最后使用Collections::unmodifiableSet将集合转换成不可变集合。
上面我们固定使用HashSet::new作为初始集合的生成方法,实际上,上面的方法可以更加通用:
public static <T, A extends Set<T>> Collector<T, A, Set<T>> toImmutableSet( Supplier<A> supplier) { return Collector.of( supplier, Set::add, (left, right) -> { left.addAll(right); return left; }, Collections::unmodifiableSet); }
上面的方法,我们将supplier提出来作为一个参数,由外部来定义。
看下上面两个方法的测试:
@Test public void toImmutableSetUsage(){ Set<String> stringSet1=Stream.of("a","b","c","d") .collect(ImmutableSetCollector.toImmutableSet()); log.info("{}",stringSet1); Set<String> stringSet2=Stream.of("a","b","c","d") .collect(ImmutableSetCollector.toImmutableSet(LinkedHashSet::new)); log.info("{}",stringSet2); }
输出:
INFO com.flydean.ImmutableSetCollector - [a, b, c, d] INFO com.flydean.ImmutableSetCollector - [a, b, c, d]
11. stream reduce详解和误区
Stream API提供了一些预定义的reduce操作,比如count(), max(), min(), sum()等。如果我们需要自己写reduce的逻辑,则可以使用reduce方法。
本文将会详细分析一下reduce方法的使用,并给出具体的例子。
11.1 reduce详解
Stream类中有三种reduce,分别接受1个参数,2个参数,和3个参数,首先来看一个参数的情况:
Optional<T> reduce(BinaryOperator<T> accumulator);
该方法接受一个BinaryOperator参数,BinaryOperator是一个@FunctionalInterface,需要实现方法:
R apply(T t, U u);
accumulator告诉reduce方法怎么去累计stream中的数据。
举个例子:
List<Integer> intList = Arrays.asList(1,2,3); Optional<Integer> result1=intList.stream().reduce(Integer::sum); log.info("{}",result1);
上面的例子输出结果:
com.flydean.ReduceUsage - Optional[6]
一个参数的例子很简单。这里不再多说。
接下来我们再看一下两个参数的例子:
T reduce(T identity, BinaryOperator<T> accumulator);
这个方法接收两个参数:identity和accumulator。多出了一个参数identity。
也许在有些文章里面有人告诉你identity是reduce的初始化值,可以随便指定,如下所示:
Integer result2=intList.stream().reduce(100, Integer::sum); log.info("{}",result2);
上面的例子,我们计算的值是106。
如果我们将stream改成parallelStream:
Integer result3=intList.parallelStream().reduce(100, Integer::sum); log.info("{}",result3);
得出的结果就是306。
为什么是306呢?因为在并行计算的时候,每个线程的初始累加值都是100,最后3个线程加出来的结果就是306。
并行计算和非并行计算的结果居然不一样,这肯定不是JDK的问题,我们再看一下JDK中对identity的说明:
identity必须是accumulator函数的一个identity,也就是说必须满足:对于所有的t,都必须满足 accumulator.apply(identity, t) == t
所以这里我们传入100是不对的,因为sum(100+1)!= 1。
这里sum方法的identity只能是0。
如果我们用0作为identity,则stream和parallelStream计算出的结果是一样的。这就是identity的真正意图。
下面再看一下三个参数的方法:
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
和前面的方法不同的是,多了一个combiner,这个combiner用来合并多线程计算的结果。
同样的,identity需要满足combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
大家可能注意到了为什么accumulator的类型是BiFunction而combiner的类型是BinaryOperator?
public interface BinaryOperator<T> extends BiFunction<T,T,T>
BinaryOperator是BiFunction的子接口。BiFunction中定义了要实现的apply方法。
其实reduce底层方法的实现只用到了apply方法,并没有用到接口中其他的方法,所以我猜测这里的不同只是为了简单的区分。
虽然reduce是一个很常用的方法,但是大家一定要遵循identity的规范,并不是所有的identity都是合适的。
12. stream中的Spliterator
Spliterator是在java 8引入的一个接口,它通常和stream一起使用,用来遍历和分割序列。
只要用到stream的地方都需要Spliterator,比如List,Collection,IO channel等等。
我们先看一下Collection中stream方法的定义:
default Stream<E> stream() { return StreamSupport.stream(spliterator(), false); }
default Stream<E> parallelStream() { return StreamSupport.stream(spliterator(), true); }
我们可以看到,不管是并行stream还是非并行stream,都是通过StreamSupport来构造的,并且都需要传入一个spliterator的参数。
好了,我们知道了spliterator是做什么的之后,看一下它的具体结构:
spliterator有四个必须实现的方法,我们接下来进行详细的讲解。
12.1 tryAdvance
tryAdvance就是对stream中的元素进行处理的方法,如果元素存在,则对他进行处理,并返回true,否则返回false。
如果我们不想处理stream后续的元素,则在tryAdvance中返回false即可,利用这个特征,我们可以中断stream的处理。这个例子我将会在后面的文章中讲到。
12.2 trySplit
trySplit尝试对现有的stream进行分拆,一般用在parallelStream的情况,因为在并发stream下,我们需要用多线程去处理stream的不同元素,trySplit就是对stream中元素进行分拆处理的方法。
理想情况下trySplit应该将stream拆分成数目相同的两部分才能最大提升性能。
12.3 estimateSize
estimateSize表示Spliterator中待处理的元素,在trySplit之前和之后一般是不同的,后面我们会在具体的例子中说明。
12.4 characteristics
characteristics表示这个Spliterator的特征,Spliterator有8大特征:
public static final int ORDERED = 0x00000010;//表示元素是有序的(每一次遍历结果相同) public static final int DISTINCT = 0x00000001;//表示元素不重复 public static final int SORTED = 0x00000004;//表示元素是按一定规律进行排列(有指定比较器) public static final int SIZED = 0x00000040;// 表示大小是固定的 public static final int NONNULL = 0x00000100;//表示没有null元素 public static final int IMMUTABLE = 0x00000400;//表示元素不可变 public static final int CONCURRENT = 0x00001000;//表示迭代器可以多线程操作 public static final int SUBSIZED = 0x00004000;//表示子Spliterators都具有SIZED特性
一个Spliterator可以有多个特征,多个特征进行or运算,最后得到最终的
characteristics。
12.5 举个例子
上面我们讨论了Spliterator一些关键方法,现在我们举一个具体的例子:
@AllArgsConstructor @Data public class CustBook { private String name; }
先定义一个CustBook类,里面放一个name变量。
定义一个方法,来生成一个CustBook的list:
public static List<CustBook> generateElements() { return Stream.generate(() -> new CustBook("cust book")) .limit(1000) .collect(Collectors.toList()); }
我们定义一个call方法,在call方法中调用了tryAdvance方法,传入了我们自定义的处理方法。这里我们修改book的name,并附加额外的信息。
public String call(Spliterator<CustBook> spliterator) { int current = 0; while (spliterator.tryAdvance(a -> a.setName("test name" .concat("- add new name")))) { current++; } return Thread.currentThread().getName() + ":" + current; }
最后,写一下测试方法:
@Test public void useTrySplit(){ Spliterator<CustBook> split1 = SpliteratorUsage.generateElements().spliterator(); Spliterator<CustBook> split2 = split1.trySplit(); log.info("before tryAdvance: {}",split1.estimateSize()); log.info("Characteristics {}",split1.characteristics()); log.info(call(split1)); log.info(call(split2)); log.info("after tryAdvance {}",split1.estimateSize()); }
运行的结果如下:
23:10:08.852 [main] INFO com.flydean.SpliteratorUsage - before tryAdvance: 500 23:10:08.857 [main] INFO com.flydean.SpliteratorUsage - Characteristics 16464 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - main:500 23:10:08.858 [main] INFO com.flydean.SpliteratorUsage - after tryAdvance 0
List总共有1000条数据,调用一次trySplit之后,将List分成了两部分,每部分500条数据。
注意,在tryAdvance调用之后,estimateSize变为0,表示所有的元素都已经被处理完毕。
再看一下这个Characteristics=16464,转换为16进制:Ox4050 = ORDERED or SIZED or SUBSIZED 这三个的或运算。
这也是ArrayList的基本特征。
13. break stream的foreach
我们通常需要在java stream中遍历处理里面的数据,其中foreach是最最常用的方法。
但是有时候我们并不想处理完所有的数据,或者有时候Stream可能非常的长,或者根本就是无限的。
一种方法是先filter出我们需要处理的数据,然后再foreach遍历。
那么我们如何直接break这个stream呢?今天本文重点讲解一下这个问题。
13.1 使用Spliterator
上篇文章我们在讲Spliterator的时候提到了,在tryAdvance方法中,如果返回false,则Spliterator将会停止处理后续的元素。
通过这个思路,我们可以创建自定义Spliterator。
假如我们有这样一个stream:
Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
我们想定义一个操作,当x > 5的时候就停止。
我们定义一个通用的Spliterator:
public class CustomSpliterator<T> extends Spliterators.AbstractSpliterator<T> { private Spliterator<T> splitr; private Predicate<T> predicate; private volatile boolean isMatched = true; public CustomSpliterator(Spliterator<T> splitr, Predicate<T> predicate) { super(splitr.estimateSize(), 0); this.splitr = splitr; this.predicate = predicate; } @Override public synchronized boolean tryAdvance(Consumer<? super T> consumer) { boolean hadNext = splitr.tryAdvance(elem -> { if (predicate.test(elem) && isMatched) { consumer.accept(elem); } else { isMatched = false; } }); return hadNext && isMatched; } }
在上面的类中,predicate是我们将要传入的判断条件,我们重写了tryAdvance,通过将predicate.test(elem)加入判断条件,从而当条件不满足的时候返回false.
看下怎么使用:
@Slf4j public class CustomSpliteratorUsage { public static <T> Stream<T> takeWhile(Stream<T> stream, Predicate<T> predicate) { CustomSpliterator<T> customSpliterator = new CustomSpliterator<>(stream.spliterator(), predicate); return StreamSupport.stream(customSpliterator, false); } public static void main(String[] args) { Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> result = takeWhile(ints, x -> x < 5 ) .collect(Collectors.toList()); log.info(result.toString()); } }
我们定义了一个takeWhile方法,接收Stream和predicate条件。
只有当predicate条件满足的时候才会继续,我们看下输出的结果:
[main] INFO com.flydean.CustomSpliteratorUsage - [1, 2, 3, 4]
13.2 自定义forEach方法
除了使用Spliterator,我们还可以自定义forEach方法来使用自己的遍历逻辑:
public class CustomForEach { public static class Breaker { private volatile boolean shouldBreak = false; public void stop() { shouldBreak = true; } boolean get() { return shouldBreak; } } public static <T> void forEach(Stream<T> stream, BiConsumer<T, Breaker> consumer) { Spliterator<T> spliterator = stream.spliterator(); boolean hadNext = true; Breaker breaker = new Breaker(); while (hadNext && !breaker.get()) { hadNext = spliterator.tryAdvance(elem -> { consumer.accept(elem, breaker); }); } } }
上面的例子中,我们在forEach中引入了一个外部变量,通过判断这个外部变量来决定是否进入spliterator.tryAdvance方法。
看下怎么使用:
@Slf4j public class CustomForEachUsage { public static void main(String[] args) { Stream<Integer> ints = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); List<Integer> result = new ArrayList<>(); CustomForEach.forEach(ints, (elem, breaker) -> { if (elem >= 5 ) { breaker.stop(); } else { result.add(elem); } }); log.info(result.toString()); } }
上面我们用新的forEach方法,并通过判断条件来重置判断flag,从而达到break stream的目的。
14. predicate chain的使用
Predicate是一个FunctionalInterface,代表的方法需要输入一个参数,返回boolean类型。通常用在stream的filter中,表示是否满足过滤条件。
boolean test(T t);
14.1 基本使用
我们先看下在stream的filter中怎么使用Predicate:
@Test public void basicUsage(){ List<String> stringList=Stream.of("a","b","c","d").filter(s -> s.startsWith("a")).collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子很基础了,这里就不多讲了。
14.2 使用多个Filter
如果我们有多个Predicate条件,则可以使用多个filter来进行过滤:
public void multipleFilters(){ List<String> stringList=Stream.of("a","ab","aac","ad").filter(s -> s.startsWith("a")) .filter(s -> s.length()>1) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,我们又添加了一个filter,在filter又添加了一个Predicate。
14.3 使用复合Predicate
Predicate的定义是输入一个参数,返回boolean值,那么如果有多个测试条件,我们可以将其合并成一个test方法:
@Test public void complexPredicate(){ List<String> stringList=Stream.of("a","ab","aac","ad") .filter(s -> s.startsWith("a") && s.length()>1) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,我们把s.startsWith("a") && s.length()>1 作为test的实现。
14.4 组合Predicate
Predicate虽然是一个interface,但是它有几个默认的方法可以用来实现Predicate之间的组合操作。
比如:Predicate.and(), Predicate.or(), 和 Predicate.negate()。
下面看下他们的例子:
@Test public void combiningPredicate(){ Predicate<String> predicate1 = s -> s.startsWith("a"); Predicate<String> predicate2 = s -> s.length() > 1; List<String> stringList1 = Stream.of("a","ab","aac","ad") .filter(predicate1.and(predicate2)) .collect(Collectors.toList()); log.info("{}",stringList1); List<String> stringList2 = Stream.of("a","ab","aac","ad") .filter(predicate1.or(predicate2)) .collect(Collectors.toList()); log.info("{}",stringList2); List<String> stringList3 = Stream.of("a","ab","aac","ad") .filter(predicate1.or(predicate2.negate())) .collect(Collectors.toList()); log.info("{}",stringList3); }
实际上,我们并不需要显示的assign一个predicate,只要是满足
predicate接口的lambda表达式都可以看做是一个predicate。同样可以调用and,or和negate操作:
List<String> stringList4 = Stream.of("a","ab","aac","ad") .filter(((Predicate<String>)a -> a.startsWith("a")) .and(a -> a.length() > 1)) .collect(Collectors.toList()); log.info("{}",stringList4);
14.5 Predicate的集合操作
如果我们有一个Predicate集合,我们可以使用reduce方法来对其进行合并运算:
@Test public void combiningPredicateCollection(){ List<Predicate<String>> allPredicates = new ArrayList<>(); allPredicates.add(a -> a.startsWith("a")); allPredicates.add(a -> a.length() > 1); List<String> stringList = Stream.of("a","ab","aac","ad") .filter(allPredicates.stream().reduce(x->true, Predicate::and)) .collect(Collectors.toList()); log.info("{}",stringList); }
上面的例子中,我们调用reduce方法,对集合中的Predicate进行了and操作。
15. 中构建无限的stream
在java中,我们可以将特定的集合转换成为stream,那么在有些情况下,比如测试环境中,我们需要构造一定数量元素的stream,需要怎么处理呢?
这里我们可以构建一个无限的stream,然后调用limit方法来限定返回的数目。
15.1 基本使用
先看一个使用Stream.iterate来创建无限Stream的例子:
@Test public void infiniteStream(){ Stream<Integer> infiniteStream = Stream.iterate(0, i -> i + 1); List<Integer> collect = infiniteStream .limit(10) .collect(Collectors.toList()); log.info("{}",collect); }
上面的例子中,我们通过调用Stream.iterate方法,创建了一个0,1,2,3,4....的无限stream。
然后调用limit(10)来获取其中的前10个。最后调用collect方法将其转换成为一个集合。
看下输出结果:
INFO com.flydean.InfiniteStreamUsage - [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
15.2 自定义类型
如果我们想输出自定义类型的集合,该怎么处理呢?
首先,我们定义一个自定义类型:
@Data @AllArgsConstructor public class IntegerWrapper { private Integer integer; }
然后利用Stream.generate的生成器来创建这个自定义类型:
public static IntegerWrapper generateCustType(){ return new IntegerWrapper(new Random().nextInt(100)); } @Test public void infiniteCustType(){ Supplier<IntegerWrapper> randomCustTypeSupplier = InfiniteStreamUsage::generateCustType; Stream<IntegerWrapper> infiniteStreamOfCustType = Stream.generate(randomCustTypeSupplier); List<IntegerWrapper> collect = infiniteStreamOfCustType .skip(10) .limit(10) .collect(Collectors.toList()); log.info("{}",collect); }
看下输出结果:
INFO com.flydean.InfiniteStreamUsage - [IntegerWrapper(integer=46), IntegerWrapper(integer=42), IntegerWrapper(integer=67), IntegerWrapper(integer=11), IntegerWrapper(integer=14), IntegerWrapper(integer=80), IntegerWrapper(integer=15), IntegerWrapper(integer=19), IntegerWrapper(integer=72), IntegerWrapper(integer=41)]
16. 自定义parallelStream的thread pool
之前我们讲到parallelStream的底层使用到了ForkJoinPool来提交任务的,默认情况下ForkJoinPool为每一个处理器创建一个线程,parallelStream如果没有特别指明的情况下,都会使用这个共享线程池来提交任务。
那么在特定的情况下,我们想使用自定义的ForkJoinPool该怎么处理呢?
16.1 通常操作
假如我们想做一个从1到1000的加法,我们可以用并行stream这样做:
List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); Integer total= integerList.parallelStream().reduce(0, Integer::sum); log.info("{}",total);
输出结果:
INFO com.flydean.CustThreadPool - 499500
16.2 使用自定义ForkJoinPool
上面的例子使用的共享的thread pool。 我们看下怎么使用自定义的thread pool来提交并行stream:
List<Integer> integerList= IntStream.range(1,1000).boxed().collect(Collectors.toList()); ForkJoinPool customThreadPool = new ForkJoinPool(4); Integer actualTotal = customThreadPool.submit( () -> integerList.parallelStream().reduce(0, Integer::sum)).get(); log.info("{}",actualTotal);
上面的例子中,我们定义了一个4个线程的ForkJoinPool,并使用它来提交了这个parallelStream。
输出结果:
INFO com.flydean.CustThreadPool - 499500
如果不想使用公共的线程池,则可以使用自定义的ForkJoinPool来提交。
17. 总结
本文统一介绍了Stream和lambda表达式的使用,涵盖了Stream和lambda表达式的各个小的细节,希望大家能够喜欢。
本文的代码https://github.com/ddean2009/learn-java-streams/