Flink实战:消费Wikipedia实时消息

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,今天咱们就来一起实战这个demo的开发(比官方demo略有不同)、部署、验证过程

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

关于Wikipedia Edit Stream

消息来源

  • 消息的DataSource是个名为WikipediaEditsSource的类,这里面建立了到irc.wikimedia.org的Socker连接,再通过Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了;
  • IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat
  • 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

实战简介

  • 本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网demo的不同之处

  • 和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

  • Flink:1.7;
  • 运行模式:单机(官网称之为Local Flink Cluster);
  • Flink所在机器的操作系统:CentOS Linux release 7.5.1804;
  • 开发环境JDK:1.8.0_181;
  • 开发环境Maven:3.5.0;

操作步骤简介

  • 今天的实战分为以下步骤:
  1. 创建应用;
  2. 编码;
  3. 构建;
  4. 部署运行;

创建应用

  • 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  • 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': wikipediaeditstreamdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: wikipediaeditstreamdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
 Y: :
  • 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

在这里插入图片描述

  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章源码在wikipediaeditstreamdemo这个文件夹下,如下图红框所示:

在这里插入图片描述

  • 接下来开始编码:
  • 在pom.mxl文件中增加wikipedia相关的库依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 在类中增加代码,如下所示,源码中已加详细注释:
package com.bolingcavalry;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 环境信息
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new WikipediaEditsSource())
                //以用户名为key分组
                .keyBy((KeySelector<WikipediaEditEvent, String>) wikipediaEditEvent -> wikipediaEditEvent.getUser())
                //时间窗口为5秒
                .timeWindow(Time.seconds(15))
                //在时间窗口内按照key将所有数据做聚合
                .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() {
                    @Override
                    public Tuple3<String, Integer, StringBuilder> createAccumulator() {
                        //创建ACC
                        return new Tuple3<>("", 0, new StringBuilder());
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> tuple3) {

                        StringBuilder sbud = tuple3.f2;

                        //如果是第一条记录,就加个"Details :"作为前缀,
                        //如果不是第一条记录,就用空格作为分隔符
                        if(StringUtils.isBlank(sbud.toString())){
                            sbud.append("Details : ");
                        }else {
                            sbud.append(" ");
                        }

                        //聚合逻辑是将改动的字节数累加
                        return new Tuple3<>(wikipediaEditEvent.getUser(),
                                wikipediaEditEvent.getByteDiff() + tuple3.f1,
                                sbud.append(wikipediaEditEvent.getByteDiff()));
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> tuple3) {
                        return tuple3;
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> tuple3, Tuple3<String, Integer, StringBuilder> acc1) {
                        //合并窗口的场景才会用到
                        return new Tuple3<>(tuple3.f0,
                                tuple3.f1 + acc1.f1, tuple3.f2.append(acc1.f2));
                    }
                })
                //聚合操作后,将每个key的聚合结果单独转为字符串
                .map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) tuple3 -> tuple3.toString())
                //输出方式是STDOUT
                .print();

        // 执行
        env.execute("Flink Streaming Java API Skeleton");
    }
}
  • 至此编码结束;

构建

  • 在pom.xml文件所在目录下执行命令:
mvn clean package -U
  • 命令执行完毕后,在target目录下的wikipediaeditstreamdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

  • Flink的安装和启动请参考《Flink1.7从安装到体验》
  • 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:http://192.168.1.103:8081 ;
  • 选择刚刚生成的jar文件作为一个新的任务,如下图:

在这里插入图片描述

  • 点击下图红框中的"upload",将文件提交:

在这里插入图片描述

  • 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

在这里插入图片描述

  • 提交后的页面效果如下图所示,可见一个job已经在运行中了:

在这里插入图片描述

  • 接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

在这里插入图片描述

  • 至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
321 3
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
480 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
6月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
217 13
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
843 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
8月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
探索Flink动态CEP:杭州银行的实战案例
293 5
|
11月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1326 2
探索Flink动态CEP:杭州银行的实战案例
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
203 0
Flink处理函数实战之四:窗口处理
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
263 1
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
358 0
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
361 1