Flink实时计算topN热榜

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink创建kafka数据源;基于 EventTime 处理,如何指定 Watermark; Flink中的Window,滚动(tumbling)窗口与滑动(sliding)窗口; State状态的使用; ProcessFunction 实现 TopN 功能.

1. 用到的知识点


  • Flink创建kafka数据源;


  • 基于 EventTime 处理,如何指定 Watermark;


  • Flink中的Window,滚动(tumbling)窗口与滑动(sliding)窗口;


  • State状态的使用;


  • ProcessFunction 实现 TopN 功能;


2. 案例介绍


通过用户访问日志,计算最近一段时间平台最活跃的几位用户topN。


  • 创建kafka生产者,发送测试数据到kafka;


  • 消费kafka数据,使用滑动(sliding)窗口,每隔一段时间更新一次排名;


3. 数据源


这里使用kafka api发送测试数据到kafka,代码如下:

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class User {
    private long id;
    private String username;
    private String password;
    private long timestamp;
}
Map<String, String> config = Configuration.initConfig("commons.xml");
@Test
public void sendData() throws InterruptedException {
    int cnt = 0;
    while (cnt < 200){
        User user = new User();
        user.setId(cnt);
        user.setUsername("username" + new Random().nextInt((cnt % 5) + 2));
        user.setPassword("password" + cnt);
        user.setTimestamp(System.currentTimeMillis());
        Future<RecordMetadata> future = KafkaUtil.sendDataToKafka(config.get("kafka-topic"), String.valueOf(cnt), JSON.toJSONString(user));
        while (!future.isDone()){
            Thread.sleep(100);
        }
        try {
            RecordMetadata recordMetadata = future.get();
            System.out.println(recordMetadata.offset());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        System.out.println("发送消息:" + cnt + "******" + user.toString());
        cnt = cnt + 1;
    }
}


这里通过随机数来扰乱username,便于使用户名大小不一,让结果更加明显。KafkaUtil是自己写的一个kafka工具类,代码很简单,主要是平时做测试方便。


4. 主要程序


创建一个main程序,开始编写代码。


创建flink环境,关联kafka数据源。


Map<String, String> config = Configuration.initConfig("commons.xml");
Properties kafkaProps = new Properties();
kafkaProps.setProperty("zookeeper.connect", config.get("kafka-zookeeper"));
kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
kafkaProps.setProperty("group.id", config.get("kafka-groupid"));
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

EventTime 与 Watermark


senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

设置属性senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime),表示


按照数据时间字段来处理,默认是TimeCharacteristic.ProcessingTime

/** The time characteristic that is used if none other is set. */
  private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;


这个属性必须设置,否则后面,可能窗口结束无法触发,导致结果无法输出。取值有三种:


  • ProcessingTime:事件被处理的时间。也就是由flink集群机器的系统时间来决定。


  • EventTime:事件发生的时间。一般就是数据本身携带的时间。


  • IngestionTime:摄入时间,数据进入flink流的时间,跟ProcessingTime还是有区别的;


指定好使用数据的实际时间来处理,接下来需要指定flink程序如何get到数据的时间字段,这里使用调用DataStream的assignTimestampsAndWatermarks方法,抽取时间和设置watermark。

senv.addSource(
        new FlinkKafkaConsumer010<>(
                config.get("kafka-topic"),
                new SimpleStringSchema(),
                kafkaProps
        )
).map(x ->{
    return JSON.parseObject(x, User.class);
}).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<User>(Time.milliseconds(1000)) {
    @Override
    public long extractTimestamp(User element) {
        return element.getTimestamp();
    }
})


前面给出的代码中可以看出,由于发送到kafka的时候,将User对象转换为json字符串了,这里使用的是fastjson,接收过来可以转化为JsonObject来处理,我这里还是将其转化为User对象JSON.parseObject(x, User.class),便于处理。


这里考虑到数据可能乱序,使用了可以处理乱序的抽象类


BoundedOutOfOrdernessTimestampExtractor,并且实现了唯一的一个没有实现的方法extractTimestamp,乱序数据,会导致数据延迟,在构造方法中传入了一个Time.milliseconds(1000),表明数据可以延迟一秒钟。比如说,如果窗口长度是10s,010s的数据会在11s的时候计算,此时watermark是10,才会触发计算,也就是说引入watermark处理乱序数据,最多可以容忍0t这个窗口的数据,最晚在t+1时刻到来。

微信图片_20220429122336.png


具体关于watermark的讲解可以参考这篇文章


https://blog.csdn.net/qq_39657909/article/details/106081543


窗口统计


业务需求上,通常可能是一个小时,或者过去15分钟的数据,5分钟更新一次排名,这里为了演示效果,窗口长度取10s,每次滑动(slide)5s,即5秒钟更新一次过去10s的排名数据。

.keyBy("username")
.timeWindow(Time.seconds(10), Time.seconds(5))
.aggregate(new CountAgg(), new WindowResultFunction())


我们使用.keyBy("username")对用户进行分组,使用.timeWindow(Time size, Time slide)对每个用户做滑动窗口(10s窗口,5s滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用AggregateFunction提前聚合掉数据,减少 state 的存储压力。较之.apply(WindowFunction wf)会将窗口中的数据都存储下来,最后一起计算要高效地多。aggregate()方法的第一个参数用于


这里的CountAgg实现了AggregateFunction接口,功能是统计窗口中的条数,即遇到一条数据就加一。

public class CountAgg implements AggregateFunction<User, Long, Long>{
    @Override
    public Long createAccumulator() {
        return 0L;
    }
    @Override
    public Long add(User value, Long accumulator) {
        return accumulator + 1;
    }
    @Override
    public Long getResult(Long accumulator) {
        return accumulator;
    }
    @Override
    public Long merge(Long a, Long b) {
        return a + b;
    }
}


.aggregate(AggregateFunction af, WindowFunction wf) 的第二个参数WindowFunction将每个 key每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的WindowResultFunction将用户名,窗口,访问量封装成了UserViewCount进行输出。

private static class WindowResultFunction implements WindowFunction<Long, UserViewCount, Tuple, TimeWindow> {
    @Override
    public void apply(Tuple key, TimeWindow window, Iterable<Long> input, Collector<UserViewCount> out) throws Exception {
        Long count = input.iterator().next();
        out.collect(new UserViewCount(((Tuple1<String>)key).f0, window.getEnd(), count));
    }
}
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public static class UserViewCount {
    private String userName;
    private long windowEnd;
    private long viewCount;
}


TopN计算最活跃用户


为了统计每个窗口下活跃的用户,我们需要再次按窗口进行分组,这里根据UserViewCount中的windowEnd进行keyBy()操作。然后使用 ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前3名的用户,并将排名结果格式化成字符串,便于后续输出。

.keyBy("windowEnd")
.process(new TopNHotUsers(3))
.print();


ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持EventTime或ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有用户的访问数据。由于 Watermark 的进度是全局的,在 processElement 方法中,每当收到一条数据

ItemViewCount),我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了windowEnd+1的 Watermark,即收齐了该windowEnd下的所有用户窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。


这里我们还使用了 ListState<ItemViewCount> 来存储收到的每条 UserViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。


private static class TopNHotUsers extends KeyedProcessFunction<Tuple, UserViewCount, String> {
    private int topSize;
    private ListState<UserViewCount> userViewCountListState;
    public TopNHotUsers(int topSize) {
        this.topSize = topSize;
    }
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        List<UserViewCount> userViewCounts = new ArrayList<>();
        for(UserViewCount userViewCount : userViewCountListState.get()) {
            userViewCounts.add(userViewCount);
        }
        userViewCountListState.clear();
        userViewCounts.sort(new Comparator<UserViewCount>() {
            @Override
            public int compare(UserViewCount o1, UserViewCount o2) {
                return (int)(o2.viewCount - o1.viewCount);
            }
        });
        // 将排名信息格式化成 String, 便于打印
        StringBuilder result = new StringBuilder();
        result.append("====================================\n");
        result.append("时间: ").append(new Timestamp(timestamp-1)).append("\n");
        for (int i = 0; i < topSize; i++) {
            UserViewCount currentItem = userViewCounts.get(i);
            // No1:  商品ID=12224  浏览量=2413
            result.append("No").append(i).append(":")
                    .append("  用户名=").append(currentItem.userName)
                    .append("  浏览量=").append(currentItem.viewCount)
                    .append("\n");
        }
        result.append("====================================\n\n");
        Thread.sleep(1000);
        out.collect(result.toString());
    }
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<UserViewCount> userViewCountListStateDescriptor = new ListStateDescriptor<>(
                "user-state",
                UserViewCount.class
        );
        userViewCountListState = getRuntimeContext().getListState(userViewCountListStateDescriptor);
    }
    @Override
    public void processElement(UserViewCount value, Context ctx, Collector<String> out) throws Exception {
        userViewCountListState.add(value);
        ctx.timerService().registerEventTimeTimer(value.windowEnd + 1000);
    }
}


结果输出


可以看到,每隔5秒钟更新输出一次数据。

微信图片_20220429122342.png


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
6月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
176 1
|
传感器 存储 缓存
Flink---10、处理函数(基本处理函数、按键分区处理函数、窗口处理函数、应用案例TopN、侧输出流)
Flink---10、处理函数(基本处理函数、按键分区处理函数、窗口处理函数、应用案例TopN、侧输出流)
|
流计算
Flink应用简单案例-统计TopN
Flink应用简单案例-统计TopN
Flink应用简单案例-统计TopN
|
存储 缓存 流计算
Flink应用案例统计实现TopN的两种方式
在窗口中可以用一个 HashMap 来保存每个 url 的访问次数,只要遍历窗口中的所有数据,自然就能得到所有 url 的热门度。最后把 HashMap 转成一个列表 ArrayList,然后进行排序、取出前两名输出就可以了。
617 0
Flink应用案例统计实现TopN的两种方式
|
SQL 流计算
Flink SQL 功能解密系列 —— 流式 TopN 挑战与实现
TopN 是统计报表和大屏非常常见的功能,主要用来实时计算排行榜。流式的 TopN 不同于批处理的 TopN,它的特点是持续的在内存中按照某个统计指标(如出现次数)计算 TopN 排行榜,然后当排行榜发生变化时,发出更新后的排行榜。
|
存储 监控 分布式数据库
Flink实战:全局TopN分析与实现
Flink 全局TopN分析与实现
1851 0
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
811 7
阿里云实时计算Flink在多行业的应用和实践
|
3天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
468 8
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。