🍊诉求:网站中一个非常经典的例子,就是实时统计一段时间内的热门 url。例如,需要统计最近10 秒钟内最热门的两个 url 链接,并且每 5 秒钟更新一次。我们知道,这可以用一个滑动窗口来实现,而“热门度”一般可以直接用访问量来表示。于是就需要开滑动窗口收集 url 的访问数据,按照不同的 url 进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。
🍊很显然,简单的增量聚合可以得到 url 链接的访问量,但是后续的排序输出 Top N 就很难实现了。所以接下来我们用窗口处理函数进行实现。
pom文件
<properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <flink.version>1.13.0</flink.version> <java.version>1.8</java.version> <scala.binary.version>2.12</scala.binary.version> <slf4j.version>1.7.30</slf4j.version> </properties> <dependencies> <!-- Flink dependencies --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency> <!-- Logging dependencies --> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.14.0</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> <scope>compile</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.58</version> </dependency> </dependencies>
前置实体类
/**
 * View count of one URL within one window.
 *
 * <p>The fields are public because other code in this file reads them directly
 * ({@code data.windowEnd}, {@code userViewCount.url}, {@code userViewCount.count}).
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserViewCount {

    /** The URL that was counted. */
    public String url;

    /** Number of views recorded for {@link #url}. */
    public Long count;

    /** Window start in epoch milliseconds (rendered through {@link Timestamp}). */
    public Long windowStart;

    /** Window end in epoch milliseconds (rendered through {@link Timestamp}). */
    public Long windowEnd;

    /**
     * Renders the record with both window bounds shown as timestamps.
     *
     * <p>Unset bounds are printed as {@code null} instead of failing: an instance
     * created through the no-args constructor has {@code null} bounds, and
     * unboxing them for {@code new Timestamp(long)} would throw a
     * {@link NullPointerException}.
     */
    @Override
    public String toString() {
        return "UserViewCount{"
                + "url='" + url + '\''
                + ", count=" + count
                + ", windowStart=" + formatMillis(windowStart)
                + ", windowEnd=" + formatMillis(windowEnd)
                + '}';
    }

    /** Formats epoch milliseconds as a {@link Timestamp} string, or {@code "null"} when unset. */
    private static String formatMillis(Long epochMillis) {
        return epochMillis == null ? "null" : new Timestamp(epochMillis).toString();
    }
}
import com.entity.UserEvent; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.util.Calendar; import java.util.Random; public class CustomSouce implements SourceFunction<UserEvent> { // 声明一个布尔变量,作为控制数据生成的标识位 private Boolean running = true; @Override public void run(SourceContext<UserEvent> ctx) throws Exception { Random random = new Random(); // 在指定的数据集中随机选取数据 String[] users = {"Mary", "Alice", "Bob", "Cary"}; String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"}; while (running) { ctx.collect(new UserEvent( users[random.nextInt(users.length)], urls[random.nextInt(urls.length)], Calendar .getInstance().getTimeInMillis() )); // 隔 200 ms生成一个点击事件,方便观测 Thread.sleep(200); } } @Override public void cancel() { running = false; } }
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

/**
 * A single click event: which user visited which URL, and when.
 *
 * <p>Accessors, {@code equals}/{@code hashCode}, {@code toString} and both
 * constructors are generated by Lombok. The all-args constructor takes the
 * fields in declaration order: user name, URL, timestamp.
 */
@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
public class UserEvent {

    /** Name of the user who produced the event. */
    private String userName;

    /** The visited URL. */
    private String url;

    /**
     * Event time in milliseconds. The spelling is kept as-is because the
     * generated {@code getTimestemp()} accessor is used by callers.
     */
    private Long timestemp;
}
方式一:使用 ProcessAllWindowFunction
public class TopN_TestProcessAllWindowFuntion { //取top 3的數據 private static Integer needTopCount = 3; public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(100); SingleOutputStreamOperator<UserEvent> stream = env.addSource(new CustomSouce()) .assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ZERO) .withTimestampAssigner(new SerializableTimestampAssigner<UserEvent>() { @Override public long extractTimestamp(UserEvent element, long recordTimestamp) { return element.getTimestemp(); } })); //按照用户姓名进行排名 stream.map(data -> data.getUserName()) .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(new UserHashMapCountAgg(), new UserCountWindowResult()) .print(); env.execute(); } public static class UserHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, List<Tuple2<String, Long>>>{ @Override public HashMap<String, Long> createAccumulator() { return new HashMap<>(); } @Override public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) { if (accumulator.containsKey(value)){ accumulator.put(value, accumulator.get(value) + 1L); }else{ accumulator.put(value, 1L); } return accumulator; } @Override public List<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) { List<Tuple2<String, Long>> resultList = new ArrayList<>(); accumulator.forEach((key, count) -> { resultList.add(Tuple2.of(key, count)); }); //排序 resultList.sort(new Comparator<Tuple2<String, Long>>() { @Override public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) { return o2.f1.intValue() - o1.f1.intValue(); } }); // resultList.sort(Comparator.comparing((key_1, key_2) -> key_1.f1.compareTo(key_1.f1))); // Collections.sort(resultList, Comparator.comparing(key -> key.f1)); return resultList; } @Override public HashMap<String, Long> 
merge(HashMap<String, Long> a, HashMap<String, Long> b) { //do nothing return null; } } //实现自定义窗口,包装用户输出信息 public static class UserCountWindowResult extends ProcessAllWindowFunction<List<Tuple2<String, Long>>, String, TimeWindow>{ @Override public void process(ProcessAllWindowFunction<List<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<List<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception { StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append("-------------------------\n"); stringBuilder.append("窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n"); //取出top n数据 List<Tuple2<String, Long>> needData = elements.iterator().next(); if (CollectionUtils.isNotEmpty(needData)) { for (Integer i = 0; i < needTopCount; i++) { Tuple2<String, Long> data = needData.get(i) == null ? null : needData.get(i); if (null != data){ String resultStr = "No." + (i + 1) + " " + "userName:" + data.f0 + " " + "count:" + data.f1 + "\n"; stringBuilder.append(resultStr); } } stringBuilder.append("-------------------------\n"); } out.collect(stringBuilder.toString()); } } }
截取部分运行结果
方式二:使用 KeyedProcessFunction
import com.entity.UserEvent; import com.entity.UserViewCount; import my.test.source.CustomSouce; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Comparator; public class TopN_KeyedProcessFunction { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setAutoWatermarkInterval(100); SingleOutputStreamOperator<UserEvent> eventStream = env.addSource(new CustomSouce()) .assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserEvent>() { @Override public long extractTimestamp(UserEvent element, long recordTimestamp) { return element.getTimestemp(); } })); //按照访问url分区,求出每个 url 的访问量 SingleOutputStreamOperator<UserViewCount> urlCountStream = eventStream.keyBy(data -> data.getUrl()) //滑动事件窗口:开窗(10s大小的窗口)、滑动距离5s .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(new 
NewUrlViewCountAgg(), new NewUrlViewCountResult()); // 对结果中同一个窗口的统计数据,进行排序处理 SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data -> data.windowEnd) .process(new TopN(3)); result.print("result"); env.execute(); } //自定义增量聚合 public static class NewUrlViewCountAgg implements AggregateFunction<UserEvent, Long, Long>{ @Override public Long createAccumulator() { return 0L; } @Override public Long add(UserEvent value, Long accumulator) { return accumulator + 1L; } @Override public Long getResult(Long accumulator) { return accumulator; } @Override public Long merge(Long a, Long b) { return null; } } //自定义全窗口函数,只需要包装窗口信息 public static class NewUrlViewCountResult extends ProcessWindowFunction<Long, UserViewCount, String, TimeWindow>{ @Override public void process(String url, ProcessWindowFunction<Long, UserViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UserViewCount> out) throws Exception { // 结合窗口信息,包装输出内容 long currentWindowStartTimeStemp = context.window().getStart(); long currentWindowEndTimeStemp = context.window().getEnd(); out.collect(new UserViewCount(url, elements.iterator().next(), currentWindowStartTimeStemp, currentWindowEndTimeStemp)); } } // 自定义处理函数,排序取 top n public static class TopN extends KeyedProcessFunction<Long, UserViewCount, String>{ // 将 n 作为属性 private Integer n; // 定义一个列表状态 private ListState<UserViewCount> urlViewCountListState; public TopN(Integer n) { this.n = n; } public TopN() { super(); } @Override public void open(Configuration parameters) throws Exception { // 从环境中获取列表状态句柄 urlViewCountListState = getRuntimeContext().getListState( new ListStateDescriptor<UserViewCount>("url-view-count-list", Types.POJO(UserViewCount.class))); } @Override public void processElement(UserViewCount value, KeyedProcessFunction<Long, UserViewCount, String>.Context ctx, Collector<String> out) throws Exception { // 将 count 数据添加到列表状态中,保存起来 urlViewCountListState.add(value); // 注册 window end + 1ms 后的定时器,等待所有数据到齐开始排序 
ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1); } @Override public void onTimer(long timestamp, KeyedProcessFunction<Long, UserViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception { // 将数据从列表状态变量中取出,放入 ArrayList,方便排序 ArrayList<UserViewCount> urlViewCountArrayList = new ArrayList<>(); for (UserViewCount urlViewCount : urlViewCountListState.get()) { urlViewCountArrayList.add(urlViewCount); } // 清空状态,释放资源 urlViewCountListState.clear(); // 排序 urlViewCountArrayList.sort(new Comparator<UserViewCount>() { @Override public int compare(UserViewCount o1, UserViewCount o2) { return o2.count.intValue() - o1.count.intValue(); } }); // 提取前两名,构建输出结果 StringBuilder result = new StringBuilder(); result.append("========================================\n"); result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n"); for (int i = 0; i < this.n; i++) { UserViewCount userViewCount = urlViewCountArrayList.get(i); if (null == userViewCount){ break; } UserViewCount needUserView = userViewCount; String info = "No." + (i + 1) + " " + "url:" + userViewCount.url + " " + "浏览量:" + userViewCount.count + "\n"; result.append(info); } result.append("========================================\n"); out.collect(result.toString()); } } }
部分结果截图