开发第一个Flink应用

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
简介: 今天就用java来一起开发一个简单的Flink应用,作为入门体验

欢迎访问我的GitHub

这里分类和汇总了欣宸的全部原创(含配套源码): https://github.com/zq2599/blog_demos

步骤列表

  • 本次实战经历以下步骤:
  1. 创建应用;
  2. 编码;
  3. 构建;
  4. 提交任务到Flink,验证功能;

环境信息

  1. Flink:1.7;
  2. Flink所在机器的操作系统:CentOS Linux release 7.5.1804;
  3. 开发环境JDK:1.8.0_181;
  4. 开发环境Maven:3.5.0;

应用功能简介

  • 《Flink1.7从安装到体验》一文中,我们在Flink运行SocketWindowWordCount.jar,实现的功能是从socket读取字符串,将其中的每个单词的数量统计出来,今天我们就来编码开发这个应用,实现此功能;

创建应用

  • 应用基本代码是通过mvn命令创建的,在命令行输入以下命令:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0
  • 按控制台的提示输入groupId、artifactId、version、package等信息,一路回车确认后,会生成一个和你输入的artifactId同名的文件夹,里面是个maven工程:
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': socketwordcountdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: socketwordcountdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
  • 用IEDA导入这个maven工程,如下图,已经有了两个类:BatchJob和StreamingJob,BatchJob是用于批处理的,本次实战用不上,因此可以删除,只保留流处理的StreamingJob:

在这里插入图片描述

  • 应用创建成功,接下来可以开始编码了;

编码

  • 您可以选择直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本章源码在socketwordcountdemo这个文件夹下,如下图红框所示:

在这里插入图片描述

  • 接下来开始编码:
  • 在StreamingJob类中添加静态内部类WordWithCount,这是个PoJo,用来保存一个具体的单词及其出现频率:
     /**
     * 记录单词及其出现频率的Pojo
     */
    public static class WordWithCount {
        /**
         * 单词内容
         */
        public String word;

        /**
         * 出现频率
         */
        public long count;

        public WordWithCount() {
            super();
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        /**
         * 将单词内容和频率展示出来
         * @return
         */
        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
  • 把所有业务逻辑写在StreamJob类的main方法中,如下所示,关键位置都加了中文注释:
public static void main(String[] args) throws Exception {

        //环境信息
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //数据来源是本机9999端口,换行符分隔,您也可以考虑将hostname和port参数通过main方法的入参传入
        DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

        //通过text对象转换得到新的DataStream对象,
        //转换逻辑是分隔每个字符串,取得的所有单词都创建一个WordWithCount对象
        DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                for(String word : s.split("\\s")){
                    collector.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy("word")//key为word字段
        .timeWindow(Time.seconds(5))    //五秒一次的翻滚时间窗口
        .reduce(new ReduceFunction<WordWithCount>() { //reduce策略
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
                return new WordWithCount(a.word, a.count+b.count);
            }
        });


        //单线程输出结果
        windowCounts.print().setParallelism(1);

        // 执行
        env.execute("Flink Streaming Java API Skeleton");
    }

构建

  • 在pom.xml文件所在目录下执行命令:
mvn clean package -U
  • 命令执行完毕后,在target目录下的socketwordcountdemo-1.0-SNAPSHOT.jar文件就是构建成功的jar包;

在Flink验证

nc -l 9999
  • 我这边Flink所在机器的IP地址是192.168.1.103,因此用浏览器访问的Flink的web地址为:http://192.168.1.103:8081;
  • 选择刚刚生成的jar文件作为一个新的任务,如下图:

在这里插入图片描述

  • 点击下图红框中的"upload",将文件提交:

在这里插入图片描述

  • 目前还只是将jar文件上传了而已,接下来就是手工设置执行类并启动任务,操作如下图,红框2中填写的前面编写的StreamingJob类的完整名称:

在这里插入图片描述

  • 提交后的页面效果如下图所示,可见一个job已经在运行中了:

在这里插入图片描述

  • 回到Flink所在机器的控制台,在之前输入了nc -l 9999的窗口输入一些英文句子,然后按下回车键,例如:
[root@vostro flink-1.7.0]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vostro.
Starting taskexecutor daemon on host vostro.
[root@vostro flink-1.7.0]# nc -l 9999
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
  • 接下来看看我们的job的执行效果,如下图,点击左侧的"Task Managers",在右边的列表中只有一个Task,点击它:

在这里插入图片描述

  • 出现的页面有三个tab页,点击"Stdout"这个tab,就能见到我们的任务对之前句子中的单词的统计结果,如下图:

在这里插入图片描述

  • 至此,第一个最简单Flink就完成了。

欢迎关注阿里云开发者社区博客:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...
相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
相关文章
|
分布式计算 数据处理 Apache
Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
【10月更文挑战第10天】Spark和Flink的区别是什么?如何选择?都应用在哪些行业?
1409 1
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
1442 7
阿里云实时计算Flink在多行业的应用和实践
|
8月前
|
存储 运维 监控
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
本文总结了阿里妈妈数据技术专家陈亮在Flink Forward Asia 2024大会上的分享,围绕广告业务背景、架构设计及湖仓方案演进展开。内容涵盖广告生态运作、实时数仓挑战与优化,以及基于Paimon的湖仓方案优势。通过分层设计与技术优化,实现业务交付周期缩短30%以上,资源开销降低40%,并大幅提升系统稳定性和运营效率。文章还介绍了阿里云实时计算Flink版的免费试用活动,助力企业探索实时计算与湖仓一体化解决方案。
927 3
阿里妈妈基于 Flink+Paimon 的 Lakehouse 应用实践
|
8月前
|
关系型数据库 MySQL 数据库
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
TIS 是一款基于Web-UI的开源大数据集成工具,通过与人大金仓Kingbase的深度整合,提供高效、灵活的实时数据集成方案。它支持增量数据监听和实时写入,兼容MySQL、PostgreSQL和Oracle模式,无需编写复杂脚本,操作简单直观,特别适合非专业开发人员使用。TIS率先实现了Kingbase CDC连接器的整合,成为业界首个开箱即用的Kingbase CDC数据同步解决方案,助力企业数字化转型。
1676 5
基于Flink CDC 开发,支持Web-UI的实时KingBase 连接器,三大模式无缝切换,效率翻倍!
|
8月前
|
SQL 弹性计算 DataWorks
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
Flink CDC 在阿里云 DataWorks 数据集成入湖场景的应用实践
381 6
|
11月前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
1326 2
探索Flink动态CEP:杭州银行的实战案例
|
存储 运维 监控
实时计算Flink版在稳定性、性能、开发运维、安全能力等等跟其他引擎及自建Flink集群比较。
实时计算Flink版在稳定性、性能、开发运维和安全能力等方面表现出色。其自研的高性能状态存储引擎GeminiStateBackend显著提升了作业稳定性,状态管理优化使性能提升40%以上。核心性能较开源Flink提升2-3倍,资源利用率提高100%。提供一站式开发管理、自动化运维和丰富的监控告警功能,支持多语言开发和智能调优。安全方面,具备访问控制、高可用保障和全链路容错能力,确保企业级应用的安全与稳定。
223 0
|
数据采集 资源调度 搜索推荐
Flink在实时搜索引擎索引构建中的深度应用与实践
随着数据源规模的扩大和查询请求的增加,如何优化Flink的性能和资源调度成为了一个重要的问题。Flink提供了多种性能优化手段,如并行度调整、状态后端选择、任务链优化等。同时,Flink还支持与YARN、Kubernetes等集群管理系统集成,实现资源的动态调度和弹性伸缩,以适应不同规模的业务需求。
|
消息中间件 分布式计算 Hadoop
实时计算 Flink版操作报错合集之使用flink jar开发,报错:找不到main方法,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
监控 数据可视化 BI
基于Dataphin+Flink构建期货交易监察实时应用
新一代证券交易监察系统利用大数据和实时计算技术强化风险控制、交易数据处理、识别异常交易等能力。通过Dataphin与Flink结合,构建期货交易监察实时数据应用;借助QuickBI用于打造实时看板和预警体系,实现期货交易监察的实时可视化分析和自动化预警。
722 0