1、Flink local模式安装(Linux)
1.在官网下载Flink,并解压到 /opt/software/flink-text/
tar -zxvf flink-1.6.1-bin-hadoop27-scala_2.11.tgz
2.解压成功后
local模式不需要添加额外配置
./bin/start-cluster.sh
3.验证是否正常启动
输入jps 验证进程是否启动
输入网址节点IP加端口号8081
flink单节点安装已经完成。
2、Flink的流处理与批处理介绍
在大数据处理领域,批处理任务与流处理任务一般被认为是两种不同的任务,一个大数据框架一般会被设计为只能处理其中一种任务。
- 例如Storm只支持流处理任务,而MapReduce、Spark只支持批处理任务。Spark Streaming是Apache Spark之上支持流处理任务的子系统,看似是一个特例,其实并不是——Spark Streaming采用了一种micro-batch的架构,即把输入的数据流切分成细粒度的batch,并为每一个batch数据提交一个批处理的Spark任务,所以Spark Streaming本质上还是基于Spark批处理系统对流式数据进行处理,和Storm等完全流式的数据处理方式完全不同。
- Flink通过灵活的执行引擎,能够同时支持批处理任务与流处理任务
- 在执行引擎这一层,流处理系统与批处理系统最大不同在于节点间的数据传输方式。
- 对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理
- 而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点
- 这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求
- Flink的执行引擎采用了一种十分灵活的方式,同时支持了这两种数据传输模型
- Flink以固定的缓存块为单位进行网络数据传输,用户可以通过设置缓存块超时值指定缓存块的传输时机。如果缓存块的超时值为0,则Flink的数据传输方式类似上文所提到流处理系统的标准模型,此时系统可以获得最低的处理延迟
- 如果缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型,此时系统可以获得最高的吞吐量
- 同时缓存块的超时值也可以设置为0到无限大之间的任意值。缓存块的超时阈值越小,则Flink流处理执行引擎的数据处理延迟越低,但吞吐量也会降低,反之亦然。通过调整缓存块的超时阈值,用户可根据需求灵活地权衡系统延迟和吞吐量
3、Flink应用场景分析
1.优化电商网站的实时搜索结果
- 阿里巴巴的所有基础设施团队使用flink实时更新产品细节和库存信息(Blink)
针对数据分析团队提供实时流处理服务
- 通过flink数据分析平台提供实时数据分析服务,及时发现问题
网络/传感器检测和错误检测
- Bouygues电信公司,是法国最大的电信供应商之一,使用flink监控其有线和无线网络,实现快速故障响应
商业智能分析ETL
- Zalando使用flink转换数据以便于加载到数据仓库,将复杂的转换操作转化为相对简单的并确保分析终端用户可以更快的访问数据(实时ETL
2.Flink vs Storm vs SparkStreaming
Flink在吞吐量上要优于strom,在延时上要强于spark流处理
3.实时框架如何选择
小型项目低延迟建议用strom轻量级方标使用。
大型项目并且秒级别的实时处理可以满足需求的话,建议使用sparkStreaming。
要求消息投递语义为 Exactly Once 的场景;数据量较大,要求高吞吐低延迟的场景;需要进行状态管理或窗口统计的场景,建议使用flink。
4、Flink入门案例-wordCount
需求分析
- 手工通过socket实时产生一些单词,使用flink实时接收数据,对指定时间窗口内(例如:2秒)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来
代码编写步骤如下:
1:获得一个执行环境
2:加载/创建 初始化数据
3:指定操作数据的transaction算子
4:指定把计算好的数据放在哪
5:调用execute()触发执行程序
- 注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。
- 延迟计算好处:你可以开发复杂的程序,但是Flink可以将复杂的程序转成一个Plan,将Plan作为一个整体单元执行!
测试执行,
在自己的虚拟机上执行 nc -l 9000,然后输入字母,
就会在控制台现在单词数量结果如下,
publicclassSocketWindowWordCountJava { publicstaticvoidmain(String[] args) throwsException{ //获取需要的端口号intport; try { ParameterToolparameterTool=ParameterTool.fromArgs(args); port=parameterTool.getInt("port"); }catch (Exceptione){ System.err.println("No port set. use default port 9000--java"); port=9000; } //获取flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); Stringhostname="192.168.78.130"; Stringdelimiter="\n"; //连接socket获取输入的数据DataStreamSource<String>text=env.socketTextStream(hostname, port, delimiter); // a a c// a 1// a 1// c 1DataStream<WordWithCount>windowCounts=text.flatMap(newFlatMapFunction<String, WordWithCount>() { publicvoidflatMap(Stringvalue, Collector<WordWithCount>out) throwsException { String[] splits=value.split("\\s"); for (Stringword : splits) { out.collect(newWordWithCount(word, 1L)); } } }).keyBy("word") .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2秒,指定时间间隔为1秒 .sum("count");//在这里使用sum或者reduce都可以/*.reduce(new ReduceFunction<WordWithCount>() {public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {return new WordWithCount(a.word,a.count+b.count);}})*///把数据打印到控制台并且设置并行度windowCounts.print().setParallelism(1); //这一行代码一定要实现,否则程序不执行env.execute("Socket window count"); } publicstaticclassWordWithCount{ publicStringword; publiclongcount; publicWordWithCount(){} publicWordWithCount(Stringword,longcount){ this.word=word; this.count=count; } publicStringtoString() { return"WordWithCount{"+"word='"+word+'\''+", count="+count+'}'; } }
5、DataStream API之Data Sources
source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。
flink提供了大量的已经实现好的source方法,你也可以自定义source
- 通过实现sourceFunction接口来自定义无并行度的source,
或者你也可以通过实现ParallelSourceFunction 接口 or 继承RichParallelSourceFunction 来自定义有并行度的source
已经实现好的source
基于文件
- readTextFile(path)
- 读取文本文件,文件遵循TextInputFormat 读取规则,逐行读取并返回。(不常用)
基于socket
- socketTextStream
从socker中读取数据,元素可以通过一个分隔符切开。
基于集合
- fromCollection(Collection)
- 通过java 的collection集合创建一个数据流,集合中的所有元素必须是相同类型的。(常用自己测试)
- addSource 可以实现读取第三方数据源的数据
- 系统内置提供了一批connectors,连接器会提供对应的source支持【kafka】
- Apache Kafka (source/sink)
Apache Cassandra (sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache ActiveMQ (source/sink)
Redis (sink)
自定义source的实现
没有并行度的数据源
publicclassMyNoParalleSourceimplementsSourceFunction<Long>{ privatelongcount=1L; privatebooleanisRunning=true; /*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/publicvoidrun(SourceContext<Long>ctx) throwsException { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据Thread.sleep(1000); } } /*** 取消一个cancel的时候会调用的方法**/publicvoidcancel() { isRunning=false; } }
测试程序
publicstaticvoidmain(String[] args) throwsException { //获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源DataStreamSource<Long>text=env.addSource(newMyNoParalleSource()).setParallelism(1);//注意:针对此source,并行度只能设置为1DataStream<Long>num=text.map(newMapFunction<Long, Long>() { publicLongmap(Longvalue) throwsException { System.out.println("接收到数据:"+value); returnvalue; } }); //每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果sum.print().setParallelism(1); StringjobName=StreamingDemoWithMyNoPralalleSource.class.getSimpleName(); env.execute(jobName); } }
测试结果
有并行度的数据
publicclassMyParalleSourceimplementsParallelSourceFunction<Long> { privatelongcount=1L; privatebooleanisRunning=true; /*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/publicvoidrun(SourceContext<Long>ctx) throwsException { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据Thread.sleep(1000); } } /*** 取消一个cancel的时候会调用的方法**/publicvoidcancel() { isRunning=false; } }
测试代码
publicclassStreamingDemoWithMyPralalleSource { publicstaticvoidmain(String[] args) throwsException { //获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源DataStreamSource<Long>text=env.addSource(newMyParalleSource()).setParallelism(2);//主要是这里的不同,这里设置的并行度是2,首先数据源是一个并行的数据源,然后在设置你用几个平行去接这个数据源DataStream<Long>num=text.map(newMapFunction<Long, Long>() { publicLongmap(Longvalue) throwsException { System.out.println("接收到数据:"+value); returnvalue; } }); //每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果sum.print().setParallelism(1); StringjobName=StreamingDemoWithMyPralalleSource.class.getSimpleName(); env.execute(jobName); } }
测试结果
高级有并行的实现
/*** 自定义实现一个支持并行度的source** RichParallelSourceFunction 会额外提供open和close方法* 针对source中如果需要获取其他链接资源,那么可以在open方法中获取资源链接,在close中关闭资源链接** Created by xuwei.tech on 2018/10/23.*/publicclassMyRichParalleSourceextendsRichParallelSourceFunction<Long> { privatelongcount=1L; privatebooleanisRunning=true; /*** 主要的方法* 启动一个source* 大部分情况下,都需要在这个run方法中实现一个循环,这样就可以循环产生数据了** @param ctx* @throws Exception*/publicvoidrun(SourceContext<Long>ctx) throwsException { while(isRunning){ ctx.collect(count); count++; //每秒产生一条数据Thread.sleep(1000); } } /*** 取消一个cancel的时候会调用的方法**/publicvoidcancel() { isRunning=false; } /*** 这个方法只会在最开始的时候被调用一次* 实现获取链接的代码* @param parameters* @throws Exception*/publicvoidopen(Configurationparameters) throwsException { System.out.println("open............."); super.open(parameters); } /*** 实现关闭链接的代码* @throws Exception*/publicvoidclose() throwsException { super.close(); } }
测试代码
publicclassStreamingDemoWithMyRichPralalleSource { publicstaticvoidmain(String[] args) throwsException { //获取Flink的运行环境StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); //获取数据源DataStreamSource<Long>text=env.addSource(newMyRichParalleSource()).setParallelism(2); DataStream<Long>num=text.map(newMapFunction<Long, Long>() { publicLongmap(Longvalue) throwsException { System.out.println("接收到数据:"+value); returnvalue; } }); //每2秒钟处理一次数据DataStream<Long>sum=num.timeWindowAll(Time.seconds(2)).sum(0); //打印结果sum.print().setParallelism(1); StringjobName=StreamingDemoWithMyRichPralalleSource.class.getSimpleName(); env.execute(jobName); } }
测试结果
6、DataStream API之Transformations
map:输入一个元素,然后返回一个元素,中间可以做一些清洗转换等操作
flatmap:输入一个元素,可以返回零个,一个或者多个元素
filter:过滤函数,对传入的数据进行判断,符合条件的数据会被留下
keyBy:根据指定的key进行分组,相同key的数据会进入同一个分区
两种典型用法
dataStream.keyBy("someKey") // 指定对象中的 "someKey"字段作为分组key
dataStream.keyBy(0) //指定Tuple中的第一个元素作为分组key
注意:以下类型是无法作为key的
1:一个实体类对象,没有重写hashCode方法,并且依赖object的hashCode方法
2:一个任意形式的数组类型
3:基本数据类型,int,long
reduce:对数据进行聚合操作,结合当前元素和上一次reduce返回的值进行聚合操作,然后返回一个新的值
aggregations:sum(),min(),max()等
window:在后面单独详解
Union:合并多个流,新的流会包含所有流中的数据,但是union是一个限制,就是所有合并的流类型必须是一致的。
运行结果
Connect:和union类似,但是只能连接两个流,两个流的数据类型可以不同,会对两个流中的数据应用不同的处理方法。
运行结果
- CoMap, CoFlatMap:在ConnectedStreams中需要使用这种函数,类似于map和flatmap
Split:根据规则把一个数据流切分为多个流
- Select:和split配合使用,选择切分后的流
- 应用场景:
* 可能在实际工作中,源数据流中混合了多种类似的数据,多种类型的数据处理规则不一样,所以就可以在根据一定的规则,
* 把一个数据流切分成多个数据流,这样每个数据流就可以使用不用的处理逻辑了
测试代码如下
运行结果
7、DataStream API之Data Sink
writeAsText():将元素以字符串形式逐行写入,这些字符串通过调用每个元素的toString()方法来获取
print() / printToErr():打印每个元素的toString()方法的值到标准输出或者标准错误输出流中
自定义输出addSink【kafka、redis】
内置Connectors
Apache Kafka (source/sink)
Apache Cassandra (sink)
Elasticsearch (sink)
Hadoop FileSystem (sink)
RabbitMQ (source/sink)
Apache ActiveMQ (source/sink)
Redis (sink)
Sink 容错性保证
Sink |
语义保证 |
备注 |
hdfs |
exactly once |
|
elasticsearch |
at least once |
|
kafka produce |
at least once/exactly once |
Kafka 0.9 and 0.10提供at least once Kafka 0.11提供exactly once |
file |
at least once |
|
redis |
at least once |
publicclassStreamingDemoToRedis { publicstaticvoidmain(String[] args) throwsException{ StreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String>text=env.socketTextStream("hadoop100", 9000, "\n"); //lpsuh l_words word//对数据进行组装,把string转化为tuple2<String,String>DataStream<Tuple2<String, String>>l_wordsData=text.map(newMapFunction<String, Tuple2<String, String>>() { publicTuple2<String, String>map(Stringvalue) throwsException { returnnewTuple2<>("l_words", value); } }); //创建redis的配置FlinkJedisPoolConfigconf=newFlinkJedisPoolConfig.Builder().setHost("hadoop110").setPort(6379).build(); //创建redissinkRedisSink<Tuple2<String, String>>redisSink=newRedisSink<>(conf, newMyRedisMapper()); l_wordsData.addSink(redisSink); env.execute("StreamingDemoToRedis"); } publicstaticclassMyRedisMapperimplementsRedisMapper<Tuple2<String, String>>{ //表示从接收的数据中获取需要操作的redis keypublicStringgetKeyFromData(Tuple2<String, String>data) { returndata.f0; } //表示从接收的数据中获取需要操作的redis valuepublicStringgetValueFromData(Tuple2<String, String>data) { returndata.f1; } //你要怎么组好publicRedisCommandDescriptiongetCommandDescription() { returnnewRedisCommandDescription(RedisCommand.LPUSH); } } }
篇幅过长,剩余在下一篇文档。