实时大数据计算引擎Apache Flink计算研究(一)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
数据传输服务 DTS,数据迁移 small 3个月
推荐场景:
MySQL数据库上云
云数据库 Tair(兼容Redis),内存型 2GB
简介: 近期团队在研究大数据平台产品,在业务场景设计时,经常会遇到实时数据加工的需求,因此开始探索实时大数据计算引擎。同时,我认为Flink也是未来流批一体的趋势。本文将技术预研过程中的要点整理分享出来,供大家参考使用,内容较多,分2个文章发布。

1、Flink local模式安装(Linux)

1.官网下载Flink,并解压到 /opt/software/flink-text/

tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz

image.png

2.解压成功后

local模式不需要添加额外配置

./bin/start-cluster.sh

image.png

3.验证是否正常启动

输入jps 验证进程是否启动

image.png

输入网址节点IP加端口号8081

image.png

flink单节点安装已经完成。


2、Flink的流处理与批处理介绍

在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务。

  • 例如Storm只支持流处理任务,而MapReduce、Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似是一个特例,其实并不是——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Storm等完全流式的数据处理方式完全不同。
  • Flink通过灵活的执行引擎,能够同时支持批处理任务与流处理任务
  • 在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。
  • 对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理
  • 而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点
  • 这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求
  • Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型
  • Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟
  • 如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
  • 同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量

3、Flink应用场景分析

1.优化电商网站的实时搜索结果

  • 阿里巴巴的所有基础设施团队使用flink实时更新产品细节和库存信息(Blink)

针对数据分析团队提供实时流处理服务

  • 通过flink数据分析平台提供实时数据分析服务,及时发现问题

网络/传感器检测和错误检测

  • Bouygues电信公司,是法国最大的电信供应商之一,使用flink监控其有线和无线网络,实现快速故障响应

商业智能分析ETL

  • Zalando使用flink转换数据以便于加载到数据仓库,将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据(实时ETL

2.Flink vs Storm vs SparkStreaming

image.png

Flink在吞吐量上要优于strom,在延时上要强于spark流处理


3.实时框架如何选择

小型项目低延迟建议用strom轻量级方标使用。

大型项目并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming。

要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink。

4、Flink入门案例-wordCount

需求分析

  • 手工通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来

代码编写步骤如下:

1:获得一个执行环境

2:加载/创建 初始化数据

3:指定操作数据的transaction算子

4:指定把计算好的数据放在哪

5:调用execute()触发执行程序

  • 注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
  • 延迟计算好处:你可以开发复杂的程序,但是Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!

测试执行,

在自己的虚拟机上执行 nc -l 9000,然后输入字母,

image.png

就会在控制台现在单词数量结果如下,

image.png

publicclassSocketWindowWordCountJava {
publicstaticvoidmain(String[] args) throwsException{
//获取需要的端口号intport;
try {
ParameterToolparameterTool=ParameterTool.fromArgs(args);
port=parameterTool.getInt("port");
        }catch (Exceptione){
System.err.println("No port set. use default port 9000--java");
port=9000;
        }
//获取flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
Stringhostname="192.168.78.130";
Stringdelimiter="\n";
//连接socket获取输入的数据DataStreamSource<String>text=env.socketTextStream(hostname, port, delimiter);
// a a c// a 1// a 1// c 1DataStream<WordWithCount>windowCounts=text.flatMap(newFlatMapFunction<String, WordWithCount>() {
publicvoidflatMap(Stringvalue, Collector<WordWithCount>out) throwsException {
String[] splits=value.split("\\s");
for (Stringword : splits) {
out.collect(newWordWithCount(word, 1L));
                }
            }
        }).keyBy("word")
                .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒                .sum("count");//在这里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把数据打印到控制台并且设置并行度windowCounts.print().setParallelism(1);
//这一行代码一定要实现,否则程序不执行env.execute("Socket window count");
    }
publicstaticclassWordWithCount{
publicStringword;
publiclongcount;
publicWordWithCount(){}
publicWordWithCount(Stringword,longcount){
this.word=word;
this.count=count;
        }
@OverridepublicStringtoString() {
return"WordWithCount{"+"word='"+word+'\''+", count="+count+'}';
        }
    }

5、DataStream API之Data Sources

source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。

flink提供了大量的已经实现好的source方法,你也可以自定义source

  • 通过实现sourceFunction接口来自定义无并行度的source,

或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source

已经实现好的source

基于文件

  • readTextFile(path)
  • 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。(不常用)

基于socket

  • socketTextStream
    从socker中读取数据,元素可以通过一个分隔符切开。

基于集合

  • fromCollection(Collection)
  • 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。(常用自己测试)


  • addSource 可以实现读取第三方数据源的数据
  • 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
  • Apache Kafka (source/sink)
    Apache Cassandra (sink)
    Elasticsearch (sink)
    Hadoop FileSystem (sink)
    RabbitMQ (source/sink)
    Apache ActiveMQ (source/sink)
    Redis (sink)

自定义source的实现

没有并行度的数据源

publicclassMyNoParalleSourceimplementsSourceFunction<Long>{
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
}

测试程序

publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyNoPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

有并行度的数据

publicclassMyParalleSourceimplementsParallelSourceFunction<Long> {
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
}

测试代码

publicclassStreamingDemoWithMyPralalleSource {
publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyParalleSource()).setParallelism(2);//主要是这里的不同,这里设置的并行度是2,首先数据源是一个并行的数据源,然后在设置你用几个平行去接这个数据源DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

高级有并行的实现

/*** 自定义实现一个支持并行度的source** RichParallelSourceFunction 会额外提供open和close方法* 针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接** Created by xuwei.tech on 2018/10/23.*/publicclassMyRichParalleSourceextendsRichParallelSourceFunction<Long> {
privatelongcount=1L;
privatebooleanisRunning=true;
/*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/@Overridepublicvoidrun(SourceContext<Long>ctx) throwsException {
while(isRunning){
ctx.collect(count);
count++;
//每秒产生一条数据Thread.sleep(1000);
        }
    }
/*** 取消一个cancel的时候会调用的方法**/@Overridepublicvoidcancel() {
isRunning=false;
    }
/*** 这个方法只会在最开始的时候被调用一次* 实现获取链接的代码* @param parameters* @throws Exception*/@Overridepublicvoidopen(Configurationparameters) throwsException {
System.out.println("open.............");
super.open(parameters);
    }
/*** 实现关闭链接的代码* @throws Exception*/@Overridepublicvoidclose() throwsException {
super.close();
    }
}

测试代码

publicclassStreamingDemoWithMyRichPralalleSource {
publicstaticvoidmain(String[] args) throwsException {
//获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
//获取数据源DataStreamSource<Long>text=env.addSource(newMyRichParalleSource()).setParallelism(2);
DataStream<Long>num=text.map(newMapFunction<Long, Long>() {
@OverridepublicLongmap(Longvalue) throwsException {
System.out.println("接收到数据:"+value);
returnvalue;
            }
        });
//每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0);
//打印结果sum.print().setParallelism(1);
StringjobName=StreamingDemoWithMyRichPralalleSource.class.getSimpleName();
env.execute(jobName);
    }
}

测试结果

image.png

6、DataStream API之Transformations

map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作

image.png

flatmap:输入一个元素,可以返回零个,一个或者多个元素

image.png

filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下

image.png

keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区

两种典型用法

dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key

dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key

注意:以下类型是无法作为key的

1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法

2:一个任意形式的数组类型

3:基本数据类型,int,long

image.png

reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值

image.png

aggregations:sum(),min(),max()等

window:在后面单独详解

Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。image.png

运行结果

image.png

Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。

image.png

image.png

运行结果

image.png


  • CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap

Split:根据规则把一个数据流切分为多个流

  • Select:和split配合使用,选择切分后的流
  • 应用场景:

* 可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
* 把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了

测试代码如下

image.png

image.png

运行结果

image.png


7、DataStream API之Data Sink

writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取

print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中

自定义输出addSink【kafka、redis】

内置Connectors

Apache Kafka (source/sink)

Apache Cassandra (sink)

Elasticsearch (sink)

Hadoop FileSystem (sink)

RabbitMQ (source/sink)

Apache ActiveMQ (source/sink)

Redis (sink)

Sink 容错性保证

Sink

语义保证

备注

hdfs

exactly once


elasticsearch

at least once


kafka produce

at least once/exactly once

Kafka 0.9 and 0.10提供at least once

Kafka 0.11提供exactly once

file

at least once


redis

at least once


publicclassStreamingDemoToRedis {
publicstaticvoidmain(String[] args) throwsException{
StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String>text=env.socketTextStream("hadoop100", 9000, "\n");
//lpsuh l_words word//对数据进行组装,把string转化为tuple2<String,String>DataStream<Tuple2<String, String>>l_wordsData=text.map(newMapFunction<String, Tuple2<String, String>>() {
@OverridepublicTuple2<String, String>map(Stringvalue) throwsException {
returnnewTuple2<>("l_words", value);
            }
        });
//创建redis的配置FlinkJedisPoolConfigconf=newFlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build();
//创建redissinkRedisSink<Tuple2<String, String>>redisSink=newRedisSink<>(conf, newMyRedisMapper());
l_wordsData.addSink(redisSink);
env.execute("StreamingDemoToRedis");
    }
publicstaticclassMyRedisMapperimplementsRedisMapper<Tuple2<String, String>>{
//表示从接收的数据中获取需要操作的redis key@OverridepublicStringgetKeyFromData(Tuple2<String, String>data) {
returndata.f0;
        }
//表示从接收的数据中获取需要操作的redis value@OverridepublicStringgetValueFromData(Tuple2<String, String>data) {
returndata.f1;
        }
//你要怎么组好@OverridepublicRedisCommandDescriptiongetCommandDescription() {
returnnewRedisCommandDescription(RedisCommand.LPUSH);
        }
    }
}


篇幅过长,剩余在下一篇文档。

目录
相关文章
|
12天前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
400 13
Apache Flink 2.0-preview released
|
17天前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
39 3
|
2月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
3月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
182 2
|
2月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
364 31
Apache Flink 流批融合技术介绍
|
21天前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
44 1
|
2月前
|
人工智能 分布式计算 大数据
超级计算与大数据:推动科学研究的发展
【9月更文挑战第30天】在信息时代,超级计算和大数据技术正成为推动科学研究的关键力量。超级计算凭借强大的计算能力,在尖端科研、国防军工等领域发挥重要作用;大数据技术则提供高效的数据处理工具,促进跨学科合作与创新。两者融合不仅提升了数据处理效率,还推动了人工智能、生物科学等领域的快速发展。未来,随着技术进步和跨学科合作的加深,超级计算与大数据将在科学研究中扮演更加重要的角色。
|
20天前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
51 0
|
25天前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
48 0
|
3月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
65 0

热门文章

最新文章

推荐镜像

更多