Flink应用简单案例-统计TopN

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Flink应用简单案例-统计TopN

🍊诉求:网站中一个非常经典的例子,就是实时统计一段时间内的热门 url。例如,需要统计最近10 秒钟内最热门的两个 url 链接,并且每 5 秒钟更新一次。我们知道,这可以用一个滑动窗口来实现,而“热门度”一般可以直接用访问量来表示。于是就需要开滑动窗口收集 url 的访问数据,按照不同的 url 进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。

🍊很显然,简单的增量聚合可以得到 url 链接的访问量,但是后续的排序输出 Top N 就很难实现了。所以接下来我们用窗口处理函数进行实现。

pom文件

  <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    <dependencies>
        <!-- 引入 Flink 相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.58</version>
        </dependency>
    </dependencies>

前置实体类

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class UserViewCount {
    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;
    @Override
    public String toString() {
        return "UserViewCount{" +
                "url='" + url + '\'' +
                ", count=" + count +
                ", windowStart=" + new Timestamp(windowStart) +
                ", windowEnd=" + new Timestamp(windowEnd) +
                '}';
    }
}
import com.entity.UserEvent;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Calendar;
import java.util.Random;
public class CustomSouce implements SourceFunction<UserEvent> {
    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<UserEvent> ctx) throws Exception {
        Random random = new Random(); // 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1",
                "./prod?id=2"};
        while (running) {
            ctx.collect(new UserEvent(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar .getInstance().getTimeInMillis()
            ));
            // 隔 200 ms生成一个点击事件,方便观测
            Thread.sleep(200);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }
}
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
@Data
@NoArgsConstructor
@ToString
@AllArgsConstructor
public class UserEvent {
    private String userName;
    private String url;
    private Long timestemp;
}

方式一:使用 ProcessAllWindowFunction

public class TopN_TestProcessAllWindowFuntion {
    //取top 3的數據
    private static  Integer needTopCount = 3;
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(100);
        SingleOutputStreamOperator<UserEvent> stream = env.addSource(new CustomSouce())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<UserEvent>() {
                            @Override
                            public long extractTimestamp(UserEvent element, long recordTimestamp) {
                                return element.getTimestemp();
                            }
                        }));
        //按照用户姓名进行排名
        stream.map(data -> data.getUserName())
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UserHashMapCountAgg(), new UserCountWindowResult())
                .print();
        env.execute();
    }
    public static class UserHashMapCountAgg implements AggregateFunction<String, HashMap<String, Long>, List<Tuple2<String, Long>>>{
        @Override
        public HashMap<String, Long> createAccumulator() {
            return new HashMap<>();
        }
        @Override
        public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
            if (accumulator.containsKey(value)){
                accumulator.put(value, accumulator.get(value) + 1L);
            }else{
                accumulator.put(value, 1L);
            }
            return accumulator;
        }
        @Override
        public List<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
            List<Tuple2<String, Long>> resultList = new ArrayList<>();
            accumulator.forEach((key, count) -> {
                resultList.add(Tuple2.of(key, count));
            });
            //排序
            resultList.sort(new Comparator<Tuple2<String, Long>>() {
                @Override
                public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                    return o2.f1.intValue() - o1.f1.intValue();
                }
            });
//            resultList.sort(Comparator.comparing((key_1, key_2) -> key_1.f1.compareTo(key_1.f1)));
//            Collections.sort(resultList, Comparator.comparing(key -> key.f1));
            return resultList;
        }
        @Override
        public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
            //do nothing
            return null;
        }
    }
    //实现自定义窗口,包装用户输出信息
    public static class UserCountWindowResult extends ProcessAllWindowFunction<List<Tuple2<String, Long>>, String, TimeWindow>{
        @Override
        public void process(ProcessAllWindowFunction<List<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<List<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
            StringBuilder stringBuilder = new StringBuilder();
            stringBuilder.append("-------------------------\n");
            stringBuilder.append("窗口结束时间:" + new Timestamp(context.window().getEnd()) + "\n");
            //取出top n数据
            List<Tuple2<String, Long>> needData = elements.iterator().next();
            if (CollectionUtils.isNotEmpty(needData)) {
                for (Integer i = 0; i < needTopCount; i++) {
                    Tuple2<String, Long> data = needData.get(i) == null ? null : needData.get(i);
                    if (null != data){
                        String resultStr = "No." + (i + 1) + " "
                                + "userName:" + data.f0 + " "
                                + "count:" + data.f1 + "\n";
                        stringBuilder.append(resultStr);
                    }
                }
                stringBuilder.append("-------------------------\n");
            }
            out.collect(stringBuilder.toString());
        }
    }
}

截取部分运行结果

73d8c9be8b2a4960a39693770de0ac9a.png

方式一:使用 KeyedProcessFunction

import com.entity.UserEvent;
import com.entity.UserViewCount;
import my.test.source.CustomSouce;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Comparator;
public class TopN_KeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(100);
        SingleOutputStreamOperator<UserEvent> eventStream = env.addSource(new CustomSouce())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<UserEvent>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<UserEvent>() {
                   @Override
                   public long extractTimestamp(UserEvent element, long
                           recordTimestamp) {
                       return element.getTimestemp();
                   }
               }));
        //按照访问url分区,求出每个 url 的访问量
        SingleOutputStreamOperator<UserViewCount> urlCountStream = eventStream.keyBy(data -> data.getUrl())
                        //滑动事件窗口:开窗(10s大小的窗口)、滑动距离5s
                        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                        .aggregate(new NewUrlViewCountAgg(), new NewUrlViewCountResult());
        // 对结果中同一个窗口的统计数据,进行排序处理
        SingleOutputStreamOperator<String> result = urlCountStream.keyBy(data ->
                        data.windowEnd)
                .process(new TopN(3));
        result.print("result");
        env.execute();
    }
    //自定义增量聚合
    public static class NewUrlViewCountAgg implements AggregateFunction<UserEvent, Long, Long>{
        @Override
        public Long createAccumulator() {
            return 0L;
        }
        @Override
        public Long add(UserEvent value, Long accumulator) {
            return accumulator + 1L;
        }
        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }
        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }
    //自定义全窗口函数,只需要包装窗口信息
    public static class NewUrlViewCountResult extends ProcessWindowFunction<Long, UserViewCount, String, TimeWindow>{
        @Override
        public void process(String url, ProcessWindowFunction<Long, UserViewCount, String, TimeWindow>.Context context, Iterable<Long> elements, Collector<UserViewCount> out) throws Exception {
            // 结合窗口信息,包装输出内容
            long currentWindowStartTimeStemp = context.window().getStart();
            long currentWindowEndTimeStemp = context.window().getEnd();
            out.collect(new UserViewCount(url, elements.iterator().next(), currentWindowStartTimeStemp, currentWindowEndTimeStemp));
        }
    }
    // 自定义处理函数,排序取 top n
    public static class TopN extends KeyedProcessFunction<Long, UserViewCount, String>{
        // 将 n 作为属性
        private Integer n;
        // 定义一个列表状态
        private ListState<UserViewCount> urlViewCountListState;
        public TopN(Integer n) {
            this.n = n;
        }
        public TopN() {
            super();
        }
        @Override
        public void open(Configuration parameters) throws Exception {
            // 从环境中获取列表状态句柄
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UserViewCount>("url-view-count-list",
                            Types.POJO(UserViewCount.class)));
        }
        @Override
        public void processElement(UserViewCount value, KeyedProcessFunction<Long, UserViewCount, String>.Context ctx, Collector<String> out) throws Exception {
            // 将 count 数据添加到列表状态中,保存起来
            urlViewCountListState.add(value);
            // 注册 window end + 1ms 后的定时器,等待所有数据到齐开始排序
            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<Long, UserViewCount, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            // 将数据从列表状态变量中取出,放入 ArrayList,方便排序
            ArrayList<UserViewCount> urlViewCountArrayList = new ArrayList<>();
            for (UserViewCount urlViewCount : urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }
            // 清空状态,释放资源
            urlViewCountListState.clear();
            // 排序
            urlViewCountArrayList.sort(new Comparator<UserViewCount>() {
                @Override
                public int compare(UserViewCount o1, UserViewCount o2) {
                    return o2.count.intValue() - o1.count.intValue();
                }
            });
            // 提取前两名,构建输出结果
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < this.n; i++) {
                UserViewCount userViewCount = urlViewCountArrayList.get(i);
                if (null == userViewCount){
                    break;
                }
                UserViewCount needUserView = userViewCount;
                String info = "No." + (i + 1) + " "
                        + "url:" + userViewCount.url + " "
                        + "浏览量:" + userViewCount.count + "\n";
                result.append(info);
            }
            result.append("========================================\n");
            out.collect(result.toString());
        }
    }
}

部分结果截图

73d8c9be8b2a4960a39693770de0ac9a.png


相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
11月前
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
1009 1
|
6月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
771 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
11月前
|
分布式计算 监控 大数据
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
大数据-131 - Flink CEP 案例:检测交易活跃用户、超时未交付
230 0
|
6月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
242 6
|
6月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
187 5
|
9月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1057 2
探索Flink动态CEP:杭州银行的实战案例
|
9月前
|
数据处理 数据安全/隐私保护 流计算
Flink 三种时间窗口、窗口处理函数使用及案例
Flink 是处理无界数据流的强大工具,提供了丰富的窗口机制。本文介绍了三种时间窗口(滚动窗口、滑动窗口和会话窗口)及其使用方法,包括时间窗口的概念、窗口处理函数的使用和实际案例。通过这些机制,可以灵活地对数据流进行分析和计算,满足不同的业务需求。
898 27
|
11月前
|
SQL 大数据 API
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
大数据-132 - Flink SQL 基本介绍 与 HelloWorld案例
195 0
|
11月前
|
SQL 消息中间件 分布式计算
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
大数据-130 - Flink CEP 详解 - CEP开发流程 与 案例实践:恶意登录检测实现
280 0
|
SQL 机器学习/深度学习 人工智能
Flink 实战:如何解决应用中的技术难题?
倒计时 5 天!4月25-26日,全球首个 Apache 顶级项目在线会议 Flink Forward 精华版即将重磅开启。 Flink Forward 全球在线会议精华版均为中文直播,核心内容分为 Keynote 与社区投票的最感兴趣的 talk 两部分,由 Apache Flink 核心贡献者们对原版英文 talk 进行翻译及解说,您可直接免费在线观看。
Flink 实战:如何解决应用中的技术难题?

热门文章

最新文章