Flink窗口与状态编程开发(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink窗口与状态编程开发(一)

tranformation

reduce

对按key分类的数据流进行"滚动"压缩。可以将当前元素与前一个元素进行整合处理,并返回一个新值。可用于前后数据的计算或拼接。

例如下面例子,通过 id 字段进行分类,然后使用 reduce 进行压缩处理,每次将学生的名字字段进行拼接,年龄进行相加,返回一个新的对象:

SingleOutputStreamOperator operator = source
    .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value)))
    .keyBy((KeySelector<Student, Integer>) value ->  value == null ? 0 : value.getId())
    .reduce((ReduceFunction<Student>) (value1, value2) -> {
        Student student = new Student();
        student.setId(value1.getId() + value2.getId());
        student.setName(value1.getName() + " || " + value2.getName());
        student.setAge(value1.getAge() + value2.getAge());
return student;
    });
16> Student(id=2, name=name2, age=22)
11> Student(id=1, name=name1, age=21)
15> Student(id=3, name=name13, age=28)
16> Student(id=5, name=name10, age=25)
16> Student(id=10, name=name10 || name15, age=55)
16> Student(id=4, name=name2 || name7, age=44)
16> Student(id=6, name=name2 || name7 || name12, age=71)
...
16> Student(id=15, name=name10 || name15 || name5, age=80)
1> Student(id=12, name=name4 || name9 || name14, age=77)
11> Student(id=3, name=name1 || name6 || name11, age=68)

从结果可以看到,id 相同的都分到同一个分区(测试中可以简单通过前面的线程 id 确认,属于同一分区处理),然后传入新对象,按照 reduce 的操作进行了处理,返回了拼装之后的对象。

Fold

滚动折叠。合并当前元素和上一个被折叠的值,输入值可以与返回值类型不一样。

例如数据流是一组数字 [1, 5, 7],想要输出一个拼接后的字符串,可以通过下面进行处理:

// 标准格式
keyedStream.fold(${initialValue}, (s1, s2) -> s1 + " || " + s2);
SingleOutputStreamOperator operator = source
    .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value)))
    .keyBy("id")
    .fold("strat", new FoldFunction<Student, String>() {
@Override
public String fold(String accumulator, Student value) throws Exception {
return accumulator + " || " + value;
        }
    });
16> strat || Student(id=2, name=name2, age=22)
11> strat || Student(id=1, name=name1, age=21)
15> strat || Student(id=3, name=name13, age=28)
16> strat || Student(id=2, name=name2, age=22) || Student(id=2, name=name12, age=27)
1> strat || Student(id=4, name=name9, age=24)
11> strat || Student(id=1, name=name1, age=21) || Student(id=1, name=name6, age=21)
1> strat || Student(id=4, name=name9, age=24) || Student(id=4, name=name14, age=29)
16> strat || Student(id=2, name=name2, age=22) || Student(id=2, name=name12, age=27) || Student(id=2, name=name7, age=22)
...

从输出结果可以看到,初始值和每次处理的对象进行了拼接,最后返回的是折叠后的对象,不过该方法被标注为 @Deprecated,不建议继续使用。

Project

选择部分字段。注意,只对元组 Tuple 类型的输入流有效,输出的也是选择下标的新元组数据流。例如下面,选择是下标 1 和 3 的属性:

DataStreamSource<Tuple4<String, Integer, Long, BigDecimal>> customSource = env.fromCollection(
    Lists.newArrayList(
            Tuple4.of("one", 1, 1L, BigDecimal.ONE),
            Tuple4.of("two", 2, 2L, BigDecimal.ZERO),
            Tuple4.of("three", 3, 3L, BigDecimal.TEN),
            Tuple4.of("four", 4, 4L, BigDecimal.TEN)
    )
);
// 分离下标 1,3 到新到数据流
DataStream<Tuple2<Integer, BigDecimal>> tuple2DataStreamSource = customSource.project(1, 3);
[1, 1]
[2, 0]
[3, 10]
[4, 10]

window

窗口。该函数允许在已分区的 KeyedStream 上定义窗口。例如最近 5 秒内到达的数据:

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
dataStream.keyBy(0).window(SlidingEventTimeWindows.of(Time.seconds(3),Time.seconds(2)))

AllWindowedStream

在 DataStream 上定义窗口。与上面不同的是,此次定义的范围是所有流,windows 会根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行划分窗口。

SingleOutputStreamOperator<Student> operator = source
                .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value)))
                .assignTimestampsAndWatermarks(new MyTimestampExtractor())
                .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1)))
                .apply(new AllWindowFunction<Student, Student, TimeWindow>(){...};

reduce、apply、process、Aggregate、 AggregateFunction、ProcessWindowFunction

全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。
增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。
增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗 口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高, 性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口 触发的时候,对所有的原始数据进行汇总计算。

apply和process都是处理全量计算,但工作中正常用process。
process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。可以自己定时触发计算的定时器,形成自己的定时任务在processElement方法定义定时器   context.timerService().registerEventTimeTimer(timestamp); ,当定时器时间到达,会回调onTimer()方法的计算任务。
Apply只能作用窗口流,而Process既可以作用于窗口流也可以作用于普通流

reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T>
maxBy、minBy、sum这3个底层都是由reduce实现的
aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>
归约函数(ReduceFunction)
将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。
所有聚合的操作保存在flink的状态内存中,因为他需要跨越多条记录,需要根据key保存状态。数据流入的过程,就是不断计算并更新flink中保存的状态的过程。
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数
据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。
使用ReduceFunction能够快速对两个相同类型的数据元素按照指定的方法进行聚合逻辑,实现sum功能。
聚合函数(aggregatefunction)
aggregate需要指定一个aggregatefunction函数,可以看做reduce函数的通用版本,这里有三种类型IN,ACC,OUT,分别代表输入类型,累加器类型,输出类型。
接口中有四个方法:
createAccumulator():创建一个累加器,这就是为聚合创建一个初始状态,每个聚合任务只会调用一次
add():将输入的元素添加到累加器中,这就是聚合状态,对于新来的数据进行进一步聚合的过程。传入两个参数,当前新到来的数据value,和当前的累加器accumulator;返回一个新的累加器值,对聚合状态进行更新。
getResult():从累加器提取聚合的输出结果。也就是说我们可以定义多个状态,然后基于这些聚合的状态计算出一个结果进行输出。比如计算平均,我们可以设置sum和count两个状态,最终调用这个方法时相除得到最终的结果。这个方法只在窗口要输出结果时调用。
merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口的场景就是会话窗口。
与reduce相比,aggregate的输入格式与输出格式可以不同。更加灵活。

reduceFunction统计每一小时用户的访问量:

package com.rosh.flink.pojo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
public class UserPojo {
    private Integer userId;
    private String name;
    private String uri;
    private Long timestamp;
}
package com.rosh.flink.wartermark;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
public class WindowTS {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists());
        //生成有序水位线
        SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));
        //聚合
        SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() {
            @Override
            public Tuple2<Integer, Long> map(UserPojo value) throws Exception {
                return Tuple2.of(value.getUserId(), 1L);
            }
        });
        //开窗统计每1小时用户访问了多少次
        SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {
                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                        value1.f1 = value1.f1 + value2.f1;
                        return value1;
                    }
                });
        resultDS.print();
        env.execute("WarterMarkTest");
    }
    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(10);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i));
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }
}

AggregateFunction统计人均访问次数

package com.rosh.flink.wartermark;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.*;
public class AggWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                            @Override
                            public long extractTimestamp(UserPojo element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        })
        );
        //统计5秒内,人均访问次数
        SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new PeopleHourAvgCount());
        resultDS.print("人均访问次数为:");
        env.execute("AggWindowTest");
    }
    private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> {
        /**
         * 初始化累加器
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> createAccumulator() {
            return Tuple2.of(new HashSet<>(), 0L);
        }
        /**
         *
         */
        @Override
        public Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) {
            //distinct userId
            accumulator.f0.add(value.getUserId());
            //次数+1
            accumulator.f1 = accumulator.f1 + 1;
            //返回累加器
            return accumulator;
        }
        @Override
        public Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) {
            return accumulator.f1 * 1.0 / accumulator.f0.size();
        }
        @Override
        public Tuple2<HashSet<Integer>, Long> merge(Tuple2<HashSet<Integer>, Long> a, Tuple2<HashSet<Integer>, Long> b) {
            return null;
        }
    }
    /**
     * 获取随机人数的1000次访问
     */
    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        //获取随机人数
        int peopleCount = random.nextInt(20);
        System.out.println("随机人数为:" + peopleCount);
        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(peopleCount);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }
}

全窗口函数

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。
这种计算方式相比于流处理是低效的,但是有的时候必须获取到所有数据才能计算,或者需要获取窗口的起始时间等,那么就必须使用全窗口函数。这是典型的批处理思想。
1.窗口函数(WindowFunction)
WindowFunction字面上就是“窗口函数”,他其实就是老版本的通用窗口函数接口。我们可以基于WindowedStream调用apply方法,传入一个WindowFunction的实现类。
WindowFunction可用的功能较少,一般使用ProcessWindowFunction。
ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。

统计10秒访问UV:

package com.rosh.flink.wartermark;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.*;
public class ProcessWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());
        //水位线
        SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));
        //开窗10秒UV统计
        SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UserUVCount());
        resultDS.print("UV:");
        env.execute("ProcessWindowTest");
    }
    private static class UserUVCount extends ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow> {
        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow>.Context context, Iterable<UserPojo> elements, Collector<String> out) throws Exception {
            //用户集合
            HashSet<Integer> hashSet = new HashSet<>();
            for (UserPojo user : elements) {
                hashSet.add(user.getUserId());
            }
            //获取时间信息
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String rs = "窗口信息,startTime:" + new Timestamp(start) + ",endTime: " + new Timestamp(end) + ",用户访问的次数为:" + hashSet.size();
            out.collect(rs);
        }
    }
    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        int userCount = random.nextInt(100);
        for (int i = 1; i <= 1000; i++) {
            String uri = "/goods/" + i;
            int userId = random.nextInt(userCount);
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }
}

增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效。全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。
在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。
# ReduceFunction 与 WindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) 
# ReduceFunction 与 ProcessWindowFunction 结合
public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function)
# AggregateFunction 与 WindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction)
# AggregateFunction 与 ProcessWindowFunction 结合
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction)
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输
出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
ProcessWindowFunction和其他窗口函数可以一起组合使用,满足你的一切需求,因为将ProcessWindowFunction用于简单的聚合(比如count)是非常低效的。ProcessWindowFunction可以与ReduceFunction、AggregateFunction或FoldFunction组合,以便在元素到达窗口时就增量地聚合它们,减少 ProcessWindowFunction 处理的数据量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果。这允许它在访问ProcessWindowFunction的附加窗口元信息时以增量方式计算窗口。

统计10秒的url浏览量:

package com.rosh.flink.wartermark;
import com.alibaba.fastjson.JSONObject;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
public class UrlWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //读取数据源
        DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists());
        //水位线
        SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                }));
        //url count
        SingleOutputStreamOperator<Tuple2<String, Long>> urlDS = waterDS.map(new MapFunction<UserPojo, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(UserPojo value) throws Exception {
                return Tuple2.of(value.getUri(), 1L);
            }
        });
        SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        value1.f1 = value1.f1 + value2.f1;
                        return value1;
                    }
                }, new WindowFunction<Tuple2<String, Long>, JSONObject, String, TimeWindow>() {
                    @Override
                    public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<JSONObject> out) throws Exception {
                        Tuple2<String, Long> tuple2 = input.iterator().next();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("url", tuple2.f0);
                        jsonObject.put("count", tuple2.f1);
                        new Timestamp(window.getStart());
                        jsonObject.put("startTime", new Timestamp(window.getStart()).toString());
                        jsonObject.put("endTime", new Timestamp(window.getEnd()).toString());
                        out.collect(jsonObject);
                    }
                });
        resultDS.print();
        env.execute("UrlWindowTest");
    }
    private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException {
        List<UserPojo> lists = new ArrayList<>();
        Random random = SecureRandom.getInstanceStrong();
        for (int i = 1; i <= 1000; i++) {
            //随机生成userId、goodId
            int userId = random.nextInt(100);
            int goodId = random.nextInt(50);
            String uri = "/goods/" + goodId;
            //有序时间
            UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime());
            //无序时间
            lists.add(userPojo);
        }
        return lists;
    }
}
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
6天前
|
自然语言处理 监控 数据挖掘
【Flink】Flink中的窗口分析
【4月更文挑战第19天】【Flink】Flink中的窗口分析
|
2月前
|
Java 流计算
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
【极数系列】Flink搭建入门项目Demo & 秒懂Flink开发运行原理(05)
|
4月前
|
流计算
Flink窗口——window
Flink窗口——window
24 0
|
2天前
|
BI API 流计算
[实时流基础 flink] 窗口
[实时流基础 flink] 窗口
|
1月前
|
分布式计算 监控 API
flink 入门编程day02
flink 入门编程day02
36 5
|
2月前
|
SQL Oracle 算法
Flink CDC 数据源问题之不支持窗口聚合如何解决
Flink CDC数据源指的是使用Apache Flink的CDC特性来连接并捕获外部数据库变更数据的数据源;本合集将介绍如何配置和管理Flink CDC数据源,以及解决数据源连接和同步过程中遇到的问题。
40 0
|
3月前
|
Java API Scala
【Flink】Flink Java 统计词频 开发
【1月更文挑战第26天】【Flink】Flink Java 统计词频 开发
|
3月前
|
消息中间件 存储 NoSQL
Flink几道经典编程场景
Flink几道经典编程场景
|
3月前
|
SQL 存储 缓存
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
大厂 5 年实时数据开发经验总结,Flink SQL 看这篇就够了!
111 0
|
3月前
|
监控 Java 流计算
Flink中的窗口操作是什么?请解释其作用和使用场景。
Flink中的窗口操作是什么?请解释其作用和使用场景。
26 0