Flink实战:消费Wikipedia实时消息

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Wikipedia Edit Stream是Flink官网提供的一个经典demo,该应用消费的消息来自维基百科,今天咱们就来一起实战这个demo的开发(比官方demo略有不同)、部署、验证过程

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

关于Wikipedia Edit Stream

消息来源

  • 消息的DataSource是个名为WikipediaEditsSource的类,这里面建立了到irc.wikimedia.org的Socker连接,再通过Internet Relay Chat (IRC) 协议接收对方的数据,收到数据后保存在阻塞队列中,通过一个while循环不停的从队列取出数据,再调用SourceContext的collect方法,就在Flink中将这条数据生产出来了;
  • IRC是应用层协议,更多细节请看:https://en.wikipedia.org/wiki/Internet_Relay_Chat
  • 关于WikipediaEditsSource类的深入分析,请参考《Flink数据源拆解分析(WikipediaEditsSource)》

实战简介

  • 本次实战就是消费上述消息,然后统计每个用户十五秒内所有的消息,将每次操作的字节数累加起来,就得到用户十五秒内操作的字节数总和,并且每次累加了多少都会记录下来并最终和聚合结果一起展示;

和官网demo的不同之处

  • 和官网的demo略有不同,官网用的是Tuple2来处理数据,但我这里用了Tuple3,多保存了一个StringBuilder对象,用来记录每次聚合时加了哪些值,这样在结果中通过这个字段就能看出来这个时间窗口内每个用户做了多少次聚合,每次是什么值:

环境信息

  • Flink:1.7;
  • 运行模式:单机(官网称之为Local Flink Cluster);
  • Flink所在机器的操作系统:CentOS Linux release 7.5.1804;
  • 开发环境JDK:1.8.0_181;
  • 开发环境Maven:3.5.0;

操作步骤简介

  • 今天的实战分为以下步骤:
  1. 创建应用;
  2. 编码;
  3. 构建;
  4. 部署运行;

创建应用

  • 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  • 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹(我这里是wikipediaeditstreamdemo),里面是个maven工程:
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': wikipediaeditstreamdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: wikipediaeditstreamdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
 Y: :
  • 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

在这里插入图片描述

  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章源码在wikipediaeditstreamdemo这个文件夹下,如下图红框所示:

在这里插入图片描述

  • 接下来开始编码:
  • 在pom.mxl文件中增加wikipedia相关的库依赖:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-wikiedits_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
  • 在类中增加代码,如下所示,源码中已加详细注释:
package com.bolingcavalry;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        // 环境信息
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new WikipediaEditsSource())
                //以用户名为key分组
                .keyBy((KeySelector<WikipediaEditEvent, String>) wikipediaEditEvent -> wikipediaEditEvent.getUser())
                //时间窗口为5秒
                .timeWindow(Time.seconds(15))
                //在时间窗口内按照key将所有数据做聚合
                .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple3<String, Integer, StringBuilder>, Tuple3<String, Integer, StringBuilder>>() {
                    @Override
                    public Tuple3<String, Integer, StringBuilder> createAccumulator() {
                        //创建ACC
                        return new Tuple3<>("", 0, new StringBuilder());
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> add(WikipediaEditEvent wikipediaEditEvent, Tuple3<String, Integer, StringBuilder> tuple3) {

                        StringBuilder sbud = tuple3.f2;

                        //如果是第一条记录,就加个"Details :"作为前缀,
                        //如果不是第一条记录,就用空格作为分隔符
                        if(StringUtils.isBlank(sbud.toString())){
                            sbud.append("Details : ");
                        }else {
                            sbud.append(" ");
                        }

                        //聚合逻辑是将改动的字节数累加
                        return new Tuple3<>(wikipediaEditEvent.getUser(),
                                wikipediaEditEvent.getByteDiff() + tuple3.f1,
                                sbud.append(wikipediaEditEvent.getByteDiff()));
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> getResult(Tuple3<String, Integer, StringBuilder> tuple3) {
                        return tuple3;
                    }

                    @Override
                    public Tuple3<String, Integer, StringBuilder> merge(Tuple3<String, Integer, StringBuilder> tuple3, Tuple3<String, Integer, StringBuilder> acc1) {
                        //合并窗口的场景才会用到
                        return new Tuple3<>(tuple3.f0,
                                tuple3.f1 + acc1.f1, tuple3.f2.append(acc1.f2));
                    }
                })
                //聚合操作后,将每个key的聚合结果单独转为字符串
                .map((MapFunction<Tuple3<String, Integer, StringBuilder>, String>) tuple3 -> tuple3.toString())
                //输出方式是STDOUT
                .print();

        // 执行
        env.execute("Flink Streaming Java API Skeleton");
    }
}
  • 至此编码结束;

构建

  • 在pom.xml文件所在目录下执行命令:
mvn clean package -U
  • 命令执行完毕后,在target目录下的wikipediaeditstreamdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

  • Flink的安装和启动请参考《Flink1.7从安装到体验》
  • 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:http://192.168.1.103:8081 ;
  • 选择刚刚生成的jar文件作为一个新的任务,如下图:

在这里插入图片描述

  • 点击下图红框中的"upload",将文件提交:

在这里插入图片描述

  • 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

在这里插入图片描述

  • 提交后的页面效果如下图所示,可见一个job已经在运行中了:

在这里插入图片描述

  • 接下来看看我们的job的执行效果,如下图,以用户名聚合后的字数统计已经被打印出来了,并且Details后面的内容还展示了具体的聚合情况:

在这里插入图片描述

  • 至此,一个实施处理的Flink应用就开发完成了,希望能给您的开发过程提供一些参考,后面的实战中咱们一起继续深入学习和探讨Flink;

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
5月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
212 3
|
2月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
53 1
|
2月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
62 0
|
5月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
168 1
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
133 0
Flink处理函数实战之四:窗口处理
|
机器学习/深度学习 Java 程序员
Flink处理函数实战之三:KeyedProcessFunction类
通过实战学习和了解处理函数的KeyedProcessFunction类
131 1
Flink处理函数实战之三:KeyedProcessFunction类
|
Kubernetes 网络协议 Java
Flink Native Kubernetes实战
Flink Native Kubernetes是1.10版本才有的新功能,通过bin目录下的工具控制kubernetes环境下的flink操作
133 1
Flink Native Kubernetes实战
|
分布式计算 大数据 Hadoop
终于学完了阿里云大数据架构师推荐的Flink入门与实战PDF
Flink项目是大数据计算领域冉冉升起的一颗新星。大数据计算引擎的发展经历了几个过程,从第1代的MapReduce,到第2代基于有向无环图的Tez,第3代基于内存计算的Spark,再到第4代的Flink。因为Flink可以基于Hadoop进行开发和使用,所以Flink并不会取代Hadoop,而是和Hadoop紧密结合。
终于学完了阿里云大数据架构师推荐的Flink入门与实战PDF
|
程序员 API 数据库
Flink的sink实战之一:初探
学习和实践Flink的data sink相关的技术细节
223 1
Flink的sink实战之一:初探
|
消息中间件 SQL NoSQL
Flink的sink实战之三:cassandra3
实践flink数据集sink到cassandra3
127 1
Flink的sink实战之三:cassandra3