实时大数据计算引擎Apache Flink计算研究(一)

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: 近期团队在研究大数据平台产品,在业务场景设计时,经常会遇到实时数据加工的需求,因此开始探索实时大数据计算引擎。同时,我认为Flink也是未来流批一体的趋势。本文将技术预研过程中的要点整理分享出来,供大家参考使用,内容较多,分2个文章发布。

1、Flink local模式安装(Linux)

1.官网下载Flink,并解压到 /opt/software/flink-text/

tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz

image.png

2.解压成功后

local模式不需要添加额外配置

./bin/start-cluster.sh

image.png

3.验证是否正常启动

输入jps 验证进程是否启动

image.png

输入网址节点IP加端口号8081

image.png

flink单节点安装已经完成。


2、Flink的流处理与批处理介绍

在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务。

  • 例如Storm只支持流处理任务,而MapReduce、Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似是一个特例,其实并不是——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Storm等完全流式的数据处理方式完全不同。
  • Flink通过灵活的执行引擎,能够同时支持批处理任务与流处理任务
  • 在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。
  • 对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理
  • 而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点
  • 这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求
  • Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型
  • Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟
  • 如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
  • 同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量

3、Flink应用场景分析

1.优化电商网站的实时搜索结果

  • 阿里巴巴的所有基础设施团队使用flink实时更新产品细节和库存信息(Blink)

针对数据分析团队提供实时流处理服务

  • 通过flink数据分析平台提供实时数据分析服务,及时发现问题

网络/传感器检测和错误检测

  • Bouygues电信公司,是法国最大的电信供应商之一,使用flink监控其有线和无线网络,实现快速故障响应

商业智能分析ETL

  • Zalando使用flink转换数据以便于加载到数据仓库,将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据(实时ETL

2.Flink vs Storm vs SparkStreaming

image.png

Flink在吞吐量上要优于strom,在延时上要强于spark流处理


3.实时框架如何选择

小型项目低延迟建议用strom轻量级方标使用。

大型项目并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming。

要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink。

4、Flink入门案例-wordCount

需求分析

  • 手工通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来

代码编写步骤如下:

1:获得一个执行环境

2:加载/创建 初始化数据

3:指定操作数据的transaction算子

4:指定把计算好的数据放在哪

5:调用execute()触发执行程序

  • 注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
  • 延迟计算好处:你可以开发复杂的程序,但是Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!

测试执行,

在自己的虚拟机上执行 nc -l 9000,然后输入字母,

image.png

就会在控制台现在单词数量结果如下,

image.png

publicclassSocketWindowWordCountJava {
publicstaticvoidmain(String[] args) throwsException{
//获取需要的端口号intport;
try {
ParameterToolparameterTool=ParameterTool.fromArgs(args);
port=parameterTool.getInt("port");
        }catch (Exceptione){
System.err.println("No port set. use default port 9000--java");
port=9000;
        }
//获取flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
Stringhostname="192.168.78.130";
Stringdelimiter="\n";
//连接socket获取输入的数据DataStreamSource<String>text=env.socketTextStream(hostname, port, delimiter);
// a a c// a 1// a 1// c 1DataStream<WordWithCount>windowCounts=text.flatMap(newFlatMapFunction<String, WordWithCount>() {
publicvoidflatMap(Stringvalue, Collector<WordWithCount>out) throwsException {
String[] splits=value.split("\\s");
for (Stringword : splits) {
out.collect(newWordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒                .sum("count");//在这里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把数据打印到控制台并且设置并行度windowCounts.print().setParallelism(1);
//这一行代码一定要实现,否则程序不执行env.execute("Socket window count");
    }
publicstaticclassWordWithCount{
publicStringword;
publiclongcount;
publicWordWithCount(){}
publicWordWithCount(Stringword,longcount){
this.word=word;
this.count=count;
        }
@OverridepublicStringtoString() {
return"WordWithCount{"+"word='"+word+'\''+", count="+count+'}';
        }
    }

5、DataStream API之Data Sources

source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。

flink提供了大量的已经实现好的source方法,你也可以自定义source

  • 通过实现sourceFunction接口来自定义无并行度的source,

或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source

已经实现好的source

基于文件

  • readTextFile(path)
  • 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。(不常用)

基于socket

  • socketTextStream
    从socker中读取数据,元素可以通过一个分隔符切开。

基于集合

  • fromCollection(Collection)
  • 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。(常用自己测试)


  • addSource 可以实现读取第三方数据源的数据
  • 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
  • Apache Kafka (source/sink)
    Apache Cassandra (sink)
    Elasticsearch (sink)
    Hadoop FileSystem (sink)
    RabbitMQ (source/sink)
    Apache ActiveMQ (source/sink)
    Redis (sink)

自定义source的实现

没有并行度的数据源

publicclassMyNoParalleSourceimplementsSourceFunction<Long>{
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
}

测试程序

publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

有并行度的数据

publicclassMyParalleSourceimplementsParallelSourceFunction<Long> {
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
}

测试代码

publicclassStreamingDemoWithMyPralalleSource {
publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyParalleSource()).setParallelism(2);//主要是这里的不同,这里设置的并行度是2,首先数据源是一个并行的数据源,然后在设置你用几个平行去接这个数据源DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

高级有并行的实现

/*** 自定义实现一个支持并行度的source** RichParallelSourceFunction 会额外提供open和close方法* 针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接** Created by xuwei.tech on 2018/10/23.*/publicclassMyRichParalleSourceextendsRichParallelSourceFunction<Long> {
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
/*** 这个方法只会在最开始的时候被调用一次* 实现获取链接的代码* @param parameters* @throws Exception*/@Overridepublicvoidopen(Configurationparameters) throwsException {
System.out.println("open.............");
super.open(parameters);
    }
/*** 实现关闭链接的代码* @throws Exception*/@Overridepublicvoidclose() throwsException {
super.close();
    }
}

测试代码

publicclassStreamingDemoWithMyRichPralalleSource {
publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyRichParalleSource()).setParallelism(2);
DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

6、DataStream API之Transformations

map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

image.png

flatmap:输入一个元素,可以返回零个,一个或者多个元素

image.png

filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

image.png

keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区

两种典型用法

dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key

dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key

注意:以下类型是无法作为key的

1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法

2:一个任意形式的数组类型

3:基本数据类型,int,long

image.png

reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

image.png

aggregations:sum(),min(),max()等

window:在后面单独详解

Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。image.png

运行结果

image.png

Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

image.png

image.png

运行结果

image.png


  • CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

Split:根据规则把一个数据流切分为多个流

  • Select:和split配合使用,选择切分后的流
  • 应用场景:

* 可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
* 把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了

测试代码如下

image.png

image.png

运行结果

image.png


7、DataStream API之Data Sink

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

自定义输出addSink【kafka、redis】

内置Connectors

Apache Kafka (source/sink)

Apache Cassandra (sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache ActiveMQ (source/sink)

Redis (sink)

Sink 容错性保证

Sink

语义保证

备注

hdfs

exactly once


elasticsearch

at least once


kafka produce

at least once/exactly once

Kafka 0.9 and 0.10提供at least once

Kafka 0.11提供exactly once

file

at least once


redis

at least once


publicclassStreamingDemoToRedis {
publicstaticvoidmain(String[] args) throwsException{
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String>text=env.socketTextStream("hadoop100", 9000, "\n");
//lpsuh l_words word//对数据进行组装,把string转化为tuple2<String,String>DataStream<Tuple2<String, String>>l_wordsData=text.map(newMapFunction<String, Tuple2<String, String>>() {
@OverridepublicTuple2<String, String>map(Stringvalue) throwsException {
returnnewTuple2<>("l_words", value);
            }
        });
//创建redis的配置FlinkJedisPoolConfigconf=newFlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();
//创建redissinkRedisSink<Tuple2<String, String>>redisSink=newRedisSink<>(conf, newMyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
    }
publicstaticclassMyRedisMapperimplementsRedisMapper<Tuple2<String, String>>{
//表示从接收的数据中获取需要操作的redis key@OverridepublicStringgetKeyFromData(Tuple2<String, String>data) {
returndata.f0;
        }
//表示从接收的数据中获取需要操作的redis value@OverridepublicStringgetValueFromData(Tuple2<String, String>data) {
returndata.f1;
        }
//你要怎么组好@OverridepublicRedisCommandDescriptiongetCommandDescription() {
returnnewRedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}


篇幅过长,剩余在下一篇文档。

目录
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
340 33
The Past, Present and Future of Apache Flink
|
2月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
202 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
存储 负载均衡 算法
大数据散列分区计算哈希值
大数据散列分区计算哈希值
58 4
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
936 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
137 3
|
3月前
|
消息中间件 分布式计算 大数据
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
大数据-166 Apache Kylin Cube 流式构建 整体流程详细记录
107 5
zdl
|
2月前
|
消息中间件 运维 大数据
大数据实时计算产品的对比测评:实时计算Flink版 VS 自建Flink集群
本文介绍了实时计算Flink版与自建Flink集群的对比,涵盖部署成本、性能表现、易用性和企业级能力等方面。实时计算Flink版作为全托管服务,显著降低了运维成本,提供了强大的集成能力和弹性扩展,特别适合中小型团队和业务波动大的场景。文中还提出了改进建议,并探讨了与其他产品的联动可能性。总结指出,实时计算Flink版在简化运维、降低成本和提升易用性方面表现出色,是大数据实时计算的优选方案。
zdl
186 56
|
16天前
|
存储 SQL 监控
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
灵犀科技早期基于 Hadoop 构建大数据平台,在战略调整和需求的持续扩增下,数据处理效率、查询性能、资源成本问题随之出现。为此,引入 [Apache Doris](https://doris.apache.org/) 替换了复杂技术栈,升级为集存储、加工、服务为一体的统一架构,实现存储成本下降 60%,计算效率提升超 10 倍的显著成效。
计算效率提升 10 倍,存储成本降低 60%,灵犀科技基于 Apache Doris 建设统一数据服务平台
|
2月前
|
分布式计算 Java MaxCompute
ODPS MR节点跑graph连通分量计算代码报错java heap space如何解决
任务启动命令:jar -resources odps-graph-connect-family-2.0-SNAPSHOT.jar -classpath ./odps-graph-connect-family-2.0-SNAPSHOT.jar ConnectFamily 若是设置参数该如何设置
|
2月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
87 1

推荐镜像

更多