01 Introduction
Source code (clone and run out of the box): https://gitee.com/shawsongyue/aurora.git, module: aurora_flink, main class: FlinkListSourceJob (collection sources)
02 Overview
1. A source is where a Flink program reads its input data from. You attach a source to your program with StreamExecutionEnvironment.addSource(sourceFunction).
2. Flink ships with many pre-implemented source functions, but you can always write a custom non-parallel source by implementing the SourceFunction interface.
3. You can also write custom parallel sources by implementing the ParallelSourceFunction interface or by extending the RichParallelSourceFunction class.
03 Reading Data from Collections
3.1 Creating a data stream from a collection
fromCollection(Collection) creates a data stream from a Java java.util.Collection. All elements in the collection must be of the same type.
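A minimal sketch (the sample values are arbitrary; imports and environment setup are the same as in the full job in section 4.2):

```java
// Build a stream from an in-memory java.util.Collection; all elements share one type.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = env.fromCollection(Arrays.asList("test", "dev", "ops"));
stream.print();
env.execute();
```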
3.2 Creating a data stream from an iterator
fromCollection(Iterator, Class) creates a data stream from an iterator. The class argument specifies the data type of the elements returned by the iterator.
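For example, Flink's own NumberSequenceIterator is serializable and can be used here safely; see the note in section 3.4 for what happens with a plain ArrayList iterator (sketch, env as above):

```java
// The iterator itself must be serializable so it can be shipped to the workers;
// the Class argument tells Flink the element type of the resulting stream.
DataStreamSource<Long> stream = env.fromCollection(new NumberSequenceIterator(1L, 10L), Long.class);
```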
3.3 Creating a data stream from given objects
fromElements(T ...) creates a data stream from the given sequence of objects. All objects must be of the same type.
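For instance (env as above):

```java
// Varargs version: the elements are passed directly and the type is inferred.
DataStreamSource<String> stream = env.fromElements("test", "dev", "ops");
```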
3.4 Creating a data stream from an iterator, in parallel
Note! When creating a stream from an iterator, the iterator object itself must be serializable, otherwise the job fails with the error below; see my other article for the details.
Error: org.apache.flink.api.common.InvalidProgramException: java.util.Arrays$ArrayItr@784c3487 is not serializable
fromParallelCollection(SplittableIterator, Class) creates a data stream from an iterator, in parallel. The class argument specifies the data type of the elements returned by the iterator.
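A short sketch (env as above):

```java
// Each parallel subtask reads a disjoint split of the underlying iterator.
SplittableIterator<Long> iterator = new NumberSequenceIterator(1L, 10L);
DataStreamSource<Long> stream = env.fromParallelCollection(iterator, Long.class);
```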
3.5 Creating a data stream from a number range
generateSequence generates a data stream, in parallel, from the sequence of numbers in the given range.
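For example (env as above; note that newer Flink versions deprecate generateSequence in favor of fromSequence):

```java
// Emits the numbers 1..10, distributed across the parallel subtasks.
DataStreamSource<Long> stream = env.generateSequence(1, 10);
// Equivalent non-deprecated API since Flink 1.12:
DataStreamSource<Long> stream2 = env.fromSequence(1, 10);
```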
3.6 Custom sources
addSource - attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(...)). See the connector documentation for more details.
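As a sketch, here is a minimal custom source attached via addSource; a Kafka consumer would be wired up the same way, but needs the separate flink-connector-kafka dependency, which the pom in section 4.1 does not include:

```java
DataStreamSource<String> custom = env.addSource(new SourceFunction<String>() {
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Emit a few hand-made records, then return (a bounded custom source).
        for (int i = 0; i < 3; i++) {
            ctx.collect("event-" + i);
        }
    }

    @Override
    public void cancel() {
        // Called on job cancellation; nothing to clean up in this sketch.
    }
});
```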
04 Source Code Demo
4.1 pom.xml dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xsy</groupId>
    <artifactId>aurora_flink</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Properties -->
    <properties>
        <!-- JDK version -->
        <java.version>11</java.version>
        <!-- Compiler plugin version -->
        <maven.plugin.version>3.8.1</maven.plugin.version>
        <!-- Source encoding -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <!-- Report output encoding -->
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- JSON processing -->
        <fastjson.version>1.2.75</fastjson.version>
        <!-- Log4j version -->
        <log4j.version>2.17.1</log4j.version>
        <!-- Flink version -->
        <flink.version>1.18.0</flink.version>
        <!-- Scala binary version; Flink 1.18 ships Scala 2.12 artifacts only -->
        <scala.binary.version>2.12</scala.binary.version>
    </properties>

    <!-- Common dependencies -->
    <dependencies>
        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- ======================= External integrations ======================= -->
        <!-- Logging framework: start -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
        </dependency>
        <!-- Logging framework: end -->
    </dependencies>

    <!-- Build and packaging -->
    <build>
        <finalName>${project.artifactId}</finalName>
        <!-- Resource packaging -->
        <resources>
            <resource>
                <directory>src/main/resources</directory>
            </resource>
            <resource>
                <directory>src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>org.apache.flink:force-shading</exclude>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>org.apache.logging.log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- Entry point of the fat jar; point this at the job you want to run -->
                                    <mainClass>com.aurora.source.FlinkListSourceJob</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
        <!-- Centralized plugin management -->
        <pluginManagement>
            <plugins>
                <!-- Compiler plugin -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>${maven.plugin.version}</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                        <encoding>UTF-8</encoding>
                        <compilerArgs>
                            <arg>-parameters</arg>
                        </compilerArgs>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>

    <!-- Remote repositories for project dependencies -->
    <repositories>
        <repository>
            <id>aliyun-repos</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>

    <!-- Remote repositories for Maven plugins -->
    <pluginRepositories>
        <pluginRepository>
            <id>aliyun-plugin</id>
            <url>https://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </pluginRepository>
    </pluginRepositories>
</project>
```
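With this pom, mvn clean package produces a fat jar through the shade plugin, and the mainClass set in the manifest transformer becomes the jar's entry point, so the artifact can be submitted directly to a Flink cluster.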
4.2 Creating the collection data stream job
Note: depending on the cluster configuration, Flink may run the job with several parallel subtasks, so the printed output can interleave across subtasks; call .setParallelism(1) if you want the job to run with a single degree of parallelism.
```java
package com.aurora.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.NumberSequenceIterator;
import org.apache.flink.util.SplittableIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

/**
 * @description Flink list/collection source demo
 * @author 浅夏的猫
 * @datetime 23:03 2024/1/28
 */
public class FlinkListSourceJob {

    private static final Logger logger = LoggerFactory.getLogger(FlinkListSourceJob.class);

    public static void main(String[] args) throws Exception {

        // 1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Set the runtime mode:
        //    STREAMING = streaming mode, BATCH = batch mode,
        //    AUTOMATIC = pick the mode based on whether the sources are bounded
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        List<String> list = Arrays.asList("test", "dev", "ops");

        // 01 Create a data stream from a collection
        DataStreamSource<String> dataStreamSource_01 = env.fromCollection(list);

        // 02 Create a data stream from an iterator. Using list.iterator() directly
        //    fails because the ArrayList iterator is not serializable; see my other
        //    article for the details.
        // DataStreamSource<String> dataStreamSource_02 = env.fromCollection(list.iterator(), String.class);

        // 03 Create a data stream from a given sequence of objects
        DataStreamSource<String> dataStreamSource_03 = env.fromElements("test", "dev", "ops");

        // 04 Create a data stream from an iterator, in parallel
        SplittableIterator<Long> splittableIterator = new NumberSequenceIterator(1L, 10L);
        DataStreamSource<Long> dataStreamSource_04 = env.fromParallelCollection(splittableIterator, Long.TYPE);

        // 05 Generate a data stream, in parallel, from the numbers in the given range
        DataStreamSource<Long> dataStreamSource_05 = env.generateSequence(1, 10);

        // 06 Custom source
        DataStreamSource<String> dataStreamSource_06 = env.addSource(new SourceFunction<String>() {
            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                // Emit whatever data you like here
                for (int i = 0; i < 10; i++) {
                    sourceContext.collect("test data " + i);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // 3. Print the streams
        dataStreamSource_01.print();
        // dataStreamSource_02.print();
        dataStreamSource_03.print();
        dataStreamSource_04.print();
        dataStreamSource_05.print();
        dataStreamSource_06.print();

        // 4. Launch the job
        env.execute();
    }
}
```
4.3 Run output logs