Java 8 新特性:Java 类库的新特性之 Stream类(二)

简介: Java 8 新特性:Java 类库的新特性之 Stream类(二)

5.转换Stream


转换Stream其实就是把一个Stream通过某些行为转换成一个新的Stream。

eg:

List<Integer> nums = Lists.newArrayList(1,1,null,2,3,4,null,5,6,7,8,9,10);
    System.out.println(“sum is:”
      + nums.stream().filter(num -> num != null)
        .distinct().mapToInt(num -> num * 2)
        .peek(System.out::println).skip(2).limit(4).sum() );

说明:给定一个Integer类型的List,获取其对应的Stream对象,然后进行过滤掉null,再去重,再每个元素乘以2,再每个元素被消费的时候打印自身,在跳过前两个元素,最后去前四个元素进行加和运算。


性能问题:


在对于一个Stream进行多次转换操作,每次都对Stream的每个元素进行转换,而且是执行多次,这样时间复杂度就是一个for循环里把所有操作都做掉的N(转换的次数)倍。但是事实上不是这样的,转换操作都是lazy的,多个转换操作只会在聚合(reduce)操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在聚合操作的时候循环Stream对应的集合,然后对每个元素执行所有的函数。


5.1 distinct()


Stream<T>distinct()


对于Stream中包含的元素进行去重操作(去重逻辑依赖元素的equals方法),新生成的Stream中没有重复的元素。(根据.equals行为排除所有重复的元素。)


distinct()方法示意图:

image.png

5.2 filter( )


Stream<T> filter(Predicate<? super T> predicate)


对于Stream中包含的元素使用给定的过滤函数进行过滤操作,新生成的Stream只包含符合条件的元素。(排除所有与断言不匹配的元素。)


filter()方法示意图:

image.png

5.3 map( )


<R> Stream<R> map(Function<? super T,? extends R> mapper)


DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper)


IntStreammapToInt(ToIntFunction<? super T> mapper)

LongStream mapToLong(ToLongFunction<? super T> mapper)


对于Stream中包含的元素使用给定的转换函数进行转换操作,新生成的Stream只包含转换生成的元素。(通过Function对元素执行一对一的转换)


这个方法有三个对于原始类型的变种方法,分别是:mapToInt(),mapToLong()和mapToDouble()。比较好理解,如mapToInt就是把原始Stream转换成一个新的Stream,这个新生成的Stream中的元素都是int类型。此三个变种方法,可以免除自动装箱/拆箱的额外消耗;


map()方法示意图:

image.png

5.4 flatMap( )


<R> Stream<R>flatMap(Function<? super T,? extends Stream<? extends R>> mapper)

DoubleStream flatMapToDouble(Function<? super T,? extends DoubleStream> mapper)

IntStream flatMapToInt(Function<? super T,? extends IntStream> mapper)

LongStream flatMapToLong(Function<? super T,? extends LongStream> mapper)


和map类似,不同的是其每个元素转换得到的是Stream对象,会把子Stream中的元素压缩到父集合中。(通过FlatMapper将每个元素转变为无或更多的元素。)


flatMap()方法示意图:

image.png

5.5 peek( )


Stream<T>peek(Consumer<? super T> action)


(返回一个流的元素组成的流,另外在每个元素上执行所提供的行动产生的流元素消耗。)


生成一个包含原Stream的所有元素的新Stream,同时会提供一个消费函数(Consumer实例),新Stream每个元素被消费的时候都会执行给定的消费函数.(对每个遇到的元素执行一些操作。主要对调试很有用。)


peek()方法示意图:

image.png

5.6 limit( )


Stream<T>limit(long maxSize)


对一个Stream进行截断操作,获取其前N个元素;如果原Stream中包含的元素个数小于N,那就获取其所有的元素。(保证后续的操作所能看到的最大数量的元素。)


limit()方法示意图:

image.png

5.7 skip( )


Stream<T> skip(long n)


返回一个丢弃原Stream的前N个元素后剩下元素组成的新Stream,如果原Stream中包含的元素个数小于N,那么返回空Stream。(取N个元素后面的所有元素)


skip()方法示意图:

image.png

6.Reduce(聚合)Stream


聚合(也称为折叠)接受一个元素序列为输入,反复使用某个合并操作,把序列中的元素合并成一个汇总的结果。eg:查找一个数字列表的总和或者最大值,或者把这些数字累积成一个List对象。


Stream接口有一些通用的聚合操作,eg:reduce()和collect();


也有一些特定用途的聚合操作,eg:sum(),max()和count()。


Note:sum()方法不是所有的Stream对象都有的,只有IntStream、LongStream和DoubleStream是实例才有。


聚合操作:

1)可变聚合:把输入的元素们累积到一个可变的容器中,eg:Collection或者StringBuilder。


2)其他聚合:除去可变汇聚剩下的,一般都不是通过反复修改某个可变对象,而是通过把前一次的聚合(汇聚)结果当成下一次的入参,反复如此。eg:reduce(),count(),allMatch()。


6.1 可变聚合(collect)


collect()方法可以把Stream中的所有元素收集到一个结果容器中(eg:Collection)


<R,A>Rcollect(Collector<? super T,A,R> collector)

<R> R collect(Supplier<R> supplier, BiConsumer<R,? super T> accumulator, BiConsumer<R,R> combiner)


方法参数说明:Supplier supplier是一个工厂函数,用来生成一个新的容器;BiConsumer accumulator也是一个函数,用来把Stream中的元素添加到结果容器中;BiConsumer combiner还是一个函数,用来把中间状态的多个结果容器合并成为一个(并发的时候会用到)。


eg:对一个元素是Integer类型的List,先过滤掉全部的null,然后把剩下的元素收集到一个新的List中。

        List<Integer> nums = Lists.newArrayList(1,1,null,2,3,4,null,5,6,7,8,9,10);
        List<Integer> numsWithoutNull = nums.stream().filter(num -> num != null)
        .collect(() -> new ArrayList<Integer>(),
                (list, item) -> list.add(item),
                (list1, list2) -> list1.addAll(list2));

说明:第一个函数生成一个新的ArrayList实例;

第二个函数接受两个参数,第一个是前面生成的ArrayList对象,第二个是stream中包含的元素,函数体就是把stream中的元素加入ArrayList对象中。第二个函数被反复调用直到原stream的元素被消费完毕;

第三个函数也是接受两个参数,这两个都是ArrayList类型的,函数体就是把第二个ArrayList全部加入到第一个中。


Java8还给我们提供了Collector的工具类Collectors,其中已经定义了一些静态工厂方法,


eg:Collectors.toCollection()收集到Collection中;


Collectors.toList()收集到List中;


Collectors.toSet()收集到Set中。


Collectors英文文档地址:


http://docs.oracle.com/javase/8/docs/api/java/util/stream/Collectors.html


eg:

    List<Integer> nums = Lists.newArrayList(1,1,null,2,3,4,null,5,6,7,8,9,10);
    List<Integer> numsWithoutNull = nums.stream().filter(num -> num != null)
      .collect(Collectors.toList());

6.2其他聚合(或汇聚)


reduce()、count(),sum()等方法。


reduce()方法有三种形式:


Optional<T>reduce(BinaryOperator<T> accumulator)

T  reduce(T identity, BinaryOperator<T> accumulator)

<U> U reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner)


eg:


一个参数的reduce方法:

/**
    *这个函数有两个参数,第一个参数是上次函数执行的返回值(也称为中间结果),第二个参数是stream中的元素,这个函数把这两个值相加,得到的和会被赋值给下次执行这个函数的第一个参数。Note:**第一次执行的时候第一个参数的值是Stream的第一个元素,第二个参数是Stream的第二个元素。
    *这个方法返回值类型是Optional,这是Java8防止出现NPE的一种可行方法。
    */
    List<Integer> ints = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10);
    System.out.println("ints sum is:"
      + ints.stream().reduce((sum, item) -> sum + item).get());

两个参数的 reduce方法:

    /**
    *不同的是:它允许用户提供一个循环计算的初始值,如果Stream为空,就直接返回该值。
    *而且这个方法不会返回Optional,因为其不会出现null值。
    */
    List<Integer> ints = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10);
    System.out.println("sum is:" 
      + ints.stream().reduce(0, (sum, item) -> sum + item));

count()方法示例:

    List<Integer> ints = Lists.newArrayList(1,2,3,4,5,6,7,8,9,10);
    System.out.println("sum is:" + ints.stream().count());

7. 应用示例

1)Stream API示例

eg:

public class MyStreams  {
    private enum Status {
        OPEN, CLOSED
    };
    /**
     * Task类有一个分数的概念(或者说是伪复杂度),其次是还有一个值可以为OPEN或CLOSED的状态.
     */
    private static final class Task {
        private final Status status;
        private final Integer points;
        Task( final Status status, final Integer points ) {
            this.status = status;
            this.points = points;
        }
        public Integer getPoints() {
            return points;
        }
        public Status getStatus() {
            return status;
        }
        @Override
        public String toString() {
            return String.format( "[%s, %d]", status, points );
        }
    }
}
/**
 * Stream API极大简化了集合框架的处理
 */
public class StreamDemo {
    public static void main(String[] args) {
        final Collection< Task > tasks = Arrays.asList(
                new Task( Status.OPEN, 5 ),
                new Task( Status.OPEN, 13 ),
                new Task( Status.CLOSED, 8 )
        );
        //1.获取tasks中状态为OPEN的总数和
        getOpenTotalPoint();
        //2.计算所有状态的总和
        getTotalPoints();
        //3.根据状态分组
        getListByStatus();
        //4.计算整个集合中每个task分数(或权重)的平均值
        //5.从文本文件中逐行读取数据这样典型的I/O操作也很适合用Stream API来处理。
        final Path path = new File( filename ).toPath();
        try( Stream< String > lines = Files.lines( path, StandardCharsets.UTF_8 ) ) {
            //对一个stream对象调用onClose方法会返回一个在原有功能基础上新增了关闭功能的stream对象,
            //当对stream对象调用close()方法时,与关闭相关的处理器就会执行。
            lines.onClose( () -> System.out.println("Done!") ).forEach( System.out::println );
        }
    }
    /**
     1.获取tasks中状态为OPEN的总数和
     思路:
     第一,task集合被转换化为其相应的stream表示。然后,filter操作过滤掉状态为CLOSED的task。
     下一步,mapToInt操作通过Task::getPoints这种方式调用每个task实例的getPoints方法把Task的stream转化为Integer的stream。
     最后,用sum函数把所有的分数加起来,得到最终的结果。
     */
    public static void getOpenTotalPoint(){
        final long totalPointsOfOpenTasks = tasks
                .stream()
                .filter( task -> task.getStatus() == Status.OPEN )
                .mapToInt( Task::getPoints )
                .sum();
        System.out.println( "OPEN--Total points: " + totalPointsOfOpenTasks );//输出结果:OPEN--Total points:18
    }
    /**
     *  stream另一个有价值的地方是能够原生支持并行处理。
     *  2.计算所有状态的总和
     */
    public static void getTotalPoints(){
        //这个示例和第一个示例很相似,但这个例子的不同之处在于这个程序是并行运行的,
        //其次使用reduce方法来算最终的结果。
        final double totalPoints = tasks
                .stream()
                .parallel()
                .map( task -> task.getPoints() ) // 或 map( Task::getPoints ) 
                .reduce( 0, Integer::sum );
        System.out.println( "Total points (all tasks): " + totalPoints );//输出结果:Total points (all tasks): 26.0
    }
    /**
     按照某种准则来对集合中的元素进行分组。
     3.根据状态分组
     */
    public static void getListByStatus(){
        final Map< Status, List< Task > > map = tasks
                .stream()
                .collect( Collectors.groupingBy( Task::getStatus ) );
        System.out.println( map );//输出结果:{CLOSED=[[CLOSED, 8]], OPEN=[[OPEN, 5], [OPEN, 13]]}
    }
    /**
     4.计算整个集合中每个task分数(或权重)的平均值
     */
    public static void getPercentage(){
        final Collection< String > result = tasks
                .stream()                                        // Stream< String >
                .mapToInt( Task::getPoints )                     // IntStream
                .asLongStream()                                  // LongStream
                .mapToDouble( points -> points / totalPoints )   // DoubleStream
                .boxed()                                         // Stream< Double >
                .mapToLong( weigth -> ( long )( weigth * 100 ) ) // LongStream
                .mapToObj( percentage -> percentage + "%" )      // Stream< String> 
                .collect( Collectors.toList() );                 // List< String > 
        System.out.println( result );//输出结果:[19%, 50%, 30%]
    }
}

2)生成斐波那契数列示例(利用Stream API,可以设计更加简单的数据接口。)

生成斐波那契数列,完全可以用一个无穷流表示(受限Java的long型大小,可以改为BigInteger)。

eg:

class FibonacciSupplier implements Supplier<Long> {
    long a = 0;
    long b = 1;
    @Override
    public Long get() {
        long x = a + b;
        a = b;
        b = x;
        return a;
    }
}
public class FibonacciStream {
    public static void main(String[] args) {
        Stream<Long> fibonacci = Stream.generate(new FibonacciSupplier());
        fibonacci.limit(10).forEach(System.out::println);
    //如果想取得数列的前10项,用limit(10),如果想取得数列的第20~30项,用skip(),
    //通过collect()方法把Stream变为List。该List存储的所有元素就已经是计算出的确定的元素了.
    List<Long> list = fibonacci.skip(20).limit(10).collect(Collectors.toList());
    }
}

Note:用Stream表示Fibonacci数列,其接口比任何其他接口定义都要来得简单灵活并且高效。

3)计算π可以利用π的展开式:π/4 = 1 - 1/3 + 1/5 - 1/7 + 1/9 - ...

eg:

/**
* 把π表示为一个无穷Stream
*/
class PiSupplier implements Supplier<Double> {
    double sum = 0.0;
    double current = 1.0;
    boolean sign = true;
    @Override
    public Double get() {
        sum += (sign ? 4 : -4) / this.current;
        this.current = this.current + 2.0;
        this.sign = ! this.sign;
        return sum;
    }
}
public class StreamDemo {
    public static void main(String[] args) {
    /*
    * 这个级数从100项开始可以把π的值精确到3.13~3.15之间
    */
    Stream<Double> piStream = Stream.generate(new PiSupplier());
    piStream.skip(100).limit(10).forEach(System.out::println);
    }
}

输出结果:


3.1514934010709914

3.1317889675734545

3.1513011626954057

3.131977491197821

3.1511162471786824

3.1321589012071183

3.150938243930123

3.132333592767332

3.1507667724908344

3.1325019323081857


4)利用欧拉变换对级数进行加速,


可以利用下面的公式:

image.png

/**
* 用代码实现就是把一个流变成另一个流
*/
class EulerTransform implements Function<Double, Double> {
    double n1 = 0.0;
    double n2 = 0.0;
    double n3 = 0.0;
    @Override
    public Double apply(Double t) {
        n1 = n2;
        n2 = n3;
        n3 = t;
        if (n1 == 0.0) {
            return 0.0;
        }
        return calc();
    }
    double calc() {
        double d = n3 - n2;
        return n3 - d * d / (n1 - 2 * n2 + n3);
    }
}
public class StreamDemo {
    public static void main(String[] args) {
    /*
    * 可以在10项之内把π的值计算到3.141~3.142之间:
    */
    Stream<Double> piStream2 = Stream.generate(new PiSupplier());
      piStream2.map(new EulerTransform())
      .limit(10)
      .forEach(System.out::println);
    }
}

输出结果:

0.0

0.0

3.166666666666667

3.1333333333333337

3.1452380952380956

3.13968253968254

3.1427128427128435

3.1408813408813416

3.142071817071818

3.1412548236077655


可以多次使用上面的加速器,下面输出结果自己测试哦。

/*
      20项之内可以计算出极其精确的值
    */
    Stream<Double> piStream3 = Stream.generate(new PiSupplier());
    piStream3.map(new EulerTransform())
         .map(new EulerTransform())
         .map(new EulerTransform())
         .map(new EulerTransform())
         .map(new EulerTransform())
         .limit(20)
         .forEach(System.out::println);

8.流(Stream)的串行与并行


一个流就像一个地带器。这些值“流过”(模拟水流)然后他们离开。一个流可以只被遍历一次,然后被丢弃。流也可以无限使用。


流能够是 串行的 或者 并行的。 它们可以使用其中一种方式开始,然后切换到另外的一种方式;使用stream.sequential()(切换串行)或stream.parallel()(切换并行)来达到这种切换。串行流在一个线程上连续操作。而并行流就可能一次出现在多个线程上。


Note:


重要的是要意识到并行不是毫无代价的。从性能的立场它不是无代价的,不能简单的将顺序流替换为并行流,且不做进一步思考就期望得到相同的结果。在并行化一个流以前,需要考虑很多特性,关于流、它的操作以及数据的目标方面。eg:访问顺序确实对我有影响吗?我的函数是无状态的吗?我的流有足够大,并且我的操作有足够复杂,这些能使得并行化是值得的吗?


如何通过并行Stream来提升性能?


//如何并行Stream来提升性能
class StreamDemo 
{
  public static void main(String[] args) 
  {
    //创建一个没有重复元素的大表
    int max =1000000;
    List<String> val=new ArrayList<>(max);
    for(int i=0;i<max;i++){
      UUID id=UUID.randomUUID();
      val.add(id.toString());
    }
    //算一下排序这个Stream要耗时多久
    //串行排序:
    getSequentialStreamTime();// 串行耗时: 899 ms
    //并行排序
    getParallelStreamTime();// 并行排序耗时: 472 ms
    //通过比较两段方法代码几乎是一样的,但是并行版的快了50%之多,
    //唯一需要做的改动就是将stream()改为parallelStream()。
  }
  public static void getSequentialStreamTime(){
    long t0 = System.nanoTime();
    long count = values.stream().sorted().count();
    System.out.println(count);
    long t1 = System.nanoTime();
    long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
    System.out.println(String.format("sequential sort took: %d ms", millis));
  }
  public static void getParallelStreamTime(){
    long t0 = System.nanoTime();
    long count = values.parallelStream().sorted().count();
    System.out.println(count);
    long t1 = System.nanoTime();     
    long millis = TimeUnit.NANOSECONDS.toMillis(t1 - t0);
    System.out.println(String.format("parallel sort took: %d ms", millis));     
  }
}

四.Stream与Collection区别


Collection是关于静止的数据结构,而Stream是有关动词算法和计算的。


Collection是主要面向内存,存储在内存中;Stream主要是面向CPU,通过CPU实现计算的。


理解说明:


将一个影片存储在DVD盘上,这是一个集合,因为它包含整个电影的字节数据结构,而这个影片被放在互联网上,我们通过视频软件去观看它时,它实际是被流化了,它变成了一个字节流,流是与时间有关的概念,而数据结构是与时间无关,不会随着时间变化变化,流正好相反,随着时间不断地动态变化,如同水流一样潺潺不断。

所以,集合与流的主要区别是是否需要被计算,集合是一个内存数据结构,集合中每个元素在加入到集合之前已经被计算了,相反,流是在即时要求即时计算。




使用集合需要开发者主动去遍历,使用一个遍历循环,这称为外部遍历。

使用一个流库需要使用内部遍历,它自己为你遍历元素,然后将结果保存在某处,你只要提供一个函数,它就会用这个函数对元素处理完成。


eg:

List<String> transactionIds = new ArrayList<>();
    for(Transaction t: transactions){
      transactionIds.add(t.getId()); //外部遍历
    }
    List<Integer> transactionIds = transactions.stream()
          .map(Transaction::getId) //内部遍历
          .collect(toList())
    }

Stream操作不同于Collection操作有两个根本点:

1)管道Pipelining: 许多流Stream操作返回流Stream自身,这就允许对其操作可以像链条一样排列,变成一个管道,这其中也会激活比如懒加载和short-circuiting操作。

2)内部迭代:相比于集合Collection是显式迭代(需要我们编码完成迭代),Stream操作是在其内部完成迭代操作。



五.为什么不在集合类实现元素迭代等操作,而是定义了全新的Stream API?


以下Oracle官方给出的解释:


1)集合类持有的所有元素都是存储在内存中的,非常巨大的集合类会占用大量的内存,而Stream的元素却是在访问的时候才被计算出来,这种“延迟计算”的特性有点类似Clojure的lazy-seq,占用内存很少。

2)集合类的迭代逻辑是调用者负责,通常是for循环,而Stream的迭代是隐含在对Stream的各种操作中,eg:map()。


要理解“延迟计算”,不妨创建一个无穷大小的Stream。


分析:如果要表示自然数集合,显然用集合类是不可能实现的,因为自然数有无穷多个。但是Stream可以做到。

eg:

/**
* 自然数集合的规则非常简单,每个元素都是前一个元素的值+1。
* 反复调用get(),将得到一个无穷数列,利用这个Supplier,可以创建一个无穷的Stream
*/
class NaturalSupplier implements Supplier<Long> {
    long num = 0;
    public Long get() {
        this.num = this.num + 1;
        return this.num;
    }
}
class StreamDemo
{
  public static void main(String[] args) {
    /**
    * 对这个Stream做任何map()、filter()等操作都是完全可以的,这说明Stream API对Stream进行转换并生成一个新的Stream并非实时计算,而是做了延迟计算。
    * 当然,对这个无穷的Stream不能直接调用forEach(),这样会无限打印下去。但是我们可以利用limit()变换,把这个无穷Stream变换为有限的Stream。
    */
    Stream<Long> natural = Stream.generate(new NaturalSupplier());
    natural.map((x) -> {
      return x * x;
    }).limit(10).forEach(System.out::println);
  }
}
目录
相关文章
|
5天前
|
Java 关系型数据库 MySQL
Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
【4月更文挑战第12天】Elasticsearch【问题记录 01】启动服务&停止服务的2类方法【及 java.nio.file.AccessDeniedException: xx/pid 问题解决】(含shell脚本文件)
33 3
|
1天前
|
人工智能 安全 Java
Java8 - LocalDateTime时间日期类使用详解
Java8 - LocalDateTime时间日期类使用详解
|
2天前
|
安全 Java 程序员
|
3天前
|
Java
Java Class类
Java Class类
8 0
|
18天前
|
Java
Java中的多线程实现:使用Thread类与Runnable接口
【4月更文挑战第8天】本文将详细介绍Java中实现多线程的两种方法:使用Thread类和实现Runnable接口。我们将通过实例代码展示如何创建和管理线程,以及如何处理线程同步问题。最后,我们将比较这两种方法的优缺点,以帮助读者在实际开发中选择合适的多线程实现方式。
23 4
|
3月前
|
存储 Java 数据库连接
Java 编程问题:七、Java 反射类、接口、构造器、方法和字段4
Java 编程问题:七、Java 反射类、接口、构造器、方法和字段
24 0
|
8月前
|
安全 Java 开发者
【Java|多线程与高并发】JUC中常用的类和接口
JUC是Java并发编程中的一个重要模块,全称为Java Util Concurrent(Java并发工具包),它提供了一组用于多线程编程的工具类和框架,帮助开发者更方便地编写线程安全的并发代码。
|
8月前
|
Java
java泛型:泛型类,泛型接口,泛型方法,泛型集合
java泛型:泛型类,泛型接口,泛型方法,泛型集合
|
Java 测试技术 数据库
Java的POJO类为什么要实现Serializable接口
Java的POJO类为什么要实现Serializable接口
|
Java
Java:泛型方法、泛型类、泛型接口、类型通配符
Java:泛型方法、泛型类、泛型接口、类型通配符
66 0