(1)Flink Source之文件数据源
Flink系统支持将文件内容读取到系统中,并转换成分布式数据集DataStream进行数据处理。
在 DataStream API中,可以在read File方法中指定文件读取类型(Watch Type)检测文件变换时间间隔
(interva)、文件路径过滤条件(File Filter)等参数,其中Watch Type共分为两种模式:
PROCESS CONTINUOUSLY和 PROCESS ONCE模式在 PROCESS CONTINUOUSLY模式下,一且检测到文件内容发生变化,Fink会将该文件全部内容加载到Fink系统中进行处理。而在PROCESS ONCE模式下,当文件内容发生变化时,只会将变化的数据读取至Fink中,在这种情况下数据只会被读取和处理一次。
可以看出,在PROCESS CONTINUOUSLY模式下是无法实现Excatly Once级别数据一致性保障的,而在PROCESSONCE模式,可以保证数据Excatly Once级别的一致性保证。但是需要注意的是,如果使用文件作为数据源,当某个节点异常停止的时候,这种情况下 Checkpoints不会更新,如果数据一直不断地在生成将导致该节点数据形成积压,可能需要耗费非常长的时间从最新的checkpoint中恢复应用。
示例代码:
package com.aikfk.flink.datastream.source; import org.apache.flink.api.java.io.TextInputFormat; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author :caizhengjie * @description:TODO * @date :2021/3/10 9:29 下午 */ public class FileSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream = env.readFile( new TextInputFormat(new Path("/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv")) , "/Users/caizhengjie/IdeaProjects/aikfk_flink/src/main/java/resources/employee.csv"); stream.print(); env.execute("stream"); } }
(2)Flink Source之Socket数据源
Flink支持从Socket端口中接入数据,在StreamExecutionEnvironment调用socketTextStream方法。该方法参数分别为Ip地址和端口,也可以同时传人字符串切割符delimiter和最大尝试次数maxRetry,其中delimiter负责将数据切割成Records数据格式;maxRetry在端口异常的情况,通过指定次数进行重连,如果设定为0,则Flink程序直接停止,不再尝试和端口进行重连。如下代码是使用socketTextStream方法实现了将数据从本地9999ロ中接入数据并转换成 DataStream数据集的操作:
DataStream<String> stream = env.socketTextStream("localhost",9999);
在linux系统下执行
nc -lk 9999
示例代码:
package com.aikfk.flink.datastream.source; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author :caizhengjie * @description:TODO * @date :2021/3/10 9:29 下午 */ public class SocketSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> stream = env.socketTextStream("bigdata-pro-m07",9999); stream.print(); env.execute("stream"); } }
(3)Flink Source之集合数据源
Fink可以直接将Java或Scaa程序中集合类(Collection)转换成 DataStream数据集,本质上是将本地集合中的数据分发到远端并行执行的节点中。目前Fink支持从Java util。 Collection和 Javautil。 Iterator序列中转换成DataStream数据集。这种方式非常适合调式Flink本地程序,但需要注意的是,集合内容的数据结构类型必须要一致,否则可能会出现数据转换异常。
示例代码:
package com.aikfk.flink.datastream.source; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author :caizhengjie * @description:TODO * @date :2021/3/10 9:29 下午 */ public class CollectionSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String,Integer>> stream = env.fromElements( new Tuple2<>("spark",2), new Tuple2<>("hbase",3) ); stream.print(); env.execute("stream"); } }
(4)Flink Source之外部数据源
前面提到的数据源类型都是一些基本的数据接入方式,例如从文件、Socket端口中接入数据,其实质是实现了不同的 SourceFunction, Fink将其封装成高级API,减少了用户的使用成本。对于流式计算类型的应用,数据大部分都是从外部第三方系统中获取,为此Flink通过实现 Source Function定义了非常丰富的第三方数据连接器,基本覆盖了大部分的高性能存储介质以及中间件等,其中部分连接器是仅支持读取数据,例如 TwitterStreaming API、Nety等;另外一部分仅支持数据输出(smhk,不支持数据输入(s)例如 Apache Cassandra、 Elasticsearch、 Hadoop FileSystem等。还有一部分是既支持数据输入,也支持数据输出,例如 Apache Kafka、Amazon Kinesis、 RabbitMQ等连接器。
示例代码:
package com.aikfk.flink.datastream.source; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /** * @author :caizhengjie * @description:TODO * @date :2021/3/10 9:38 下午 */ public class OutSource { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<Integer,Integer>> stream = env.addSource(new OutSourceFunction()); stream.print(); env.execute("stream"); } }
package com.aikfk.flink.datastream.source; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.functions.source.SourceFunction; /** * @author :caizhengjie * @description:TODO * @date :2021/3/10 9:39 下午 */ public class OutSourceFunction implements SourceFunction<Tuple2<Integer,Integer>> { private int count = 0; @Override public void run(SourceContext sourceContext) throws Exception { while (count < 1000){ int first = (int) (Math.random() * 10); sourceContext.collect(new Tuple2<>(first,first)); count++; Thread.sleep(100L); } } @Override public void cancel() { } }