Flink 作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。我们已经了解了 Flink 程序如何对数据进行读取、转换等操作,最后一步当然就应该将结果数据保存或输出到外部系统了。
连接到外部系统
在 Flink 中,如果我们希望将数据写入外部系统,其实并不是一件难事。我们知道所有算子都可以通过实现函数类来自定义处理逻辑,所以只要有读写客户端,与外部系统的交互在任何一个处理算子中都可以实现。例如在 MapFunction 中,我们完全可以构建一个到 Redis 的连接,然后将当前处理的结果保存到 Redis 中。如果考虑到只需建立一次连接,我们也可以利用RichMapFunction,在 open() 生命周期中做连接操作。
这样看起来很方便,却会带来很多问题。Flink 作为一个快速的分布式实时流处理系统,对稳定性和容错性要求极高。一旦出现故障,我们应该有能力恢复之前的状态,保障处理结果的正确性。这种性质一般被称作“状态一致性”。Flink 内部提供了一致性检查点(checkpoint)来保障我们可以回滚到正确的状态;但如果我们在处理过程中任意读写外部系统,发生故障后就很难回退到从前了。
为了避免这样的问题,Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的。
Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。
之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink。
@PublicEvolving public DataStreamSink<T> print(String sinkIdentifier) { PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false); return this.addSink(printFunction).name("Print to Std. Out"); }
与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。
stream.addSink(new SinkFunction(…));
addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:
当然,SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。
我们可以看到,像 Kafka 之类流式系统,Flink 提供了完美对接,source/sink 两端都能连接,可读可写;而对于 Elasticsearch、文件系统(FileSystem)、JDBC 等数据存储系统,则只提供了输出写入的 sink 连接器。
除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器。
除此以外,就需要用户自定义实现 sink 连接器了。
输出到kafka
Kafka 是一个分布式的基于发布/订阅的消息系统,本身处理的也是流式数据,所以跟Flink“天生一对”,经常会作为 Flink 的输入数据源和输出系统。Flink 官方为 Kafka 提供了 Source和 Sink 的连接器,我们可以用它方便地从 Kafka 读写数据。如果仅仅是支持读写,那还说明不了 Kafka 和 Flink 关系的亲密;真正让它们密不可分的是,Flink 与 Kafka 的连接器提供了端到端的精确一次(exactly once)语义保证,这在实际项目中是最高级别的一致性保证。
如果没有安装kafka请移步:https://blog.csdn.net/weixin_47491957/article/details/124319297
代码运行的前提,kafka创建了test topic,并且开启消费者,进行监听,命令可以查看上方链接。
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import java.util.Properties; public class SinkToKafkaTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); Properties properties = new Properties(); properties.put("bootstrap.servers", "master:9092"); DataStream<String> stream = env.socketTextStream("localhost", 7777); stream.addSink(new FlinkKafkaProducer<String>( "test", new SimpleStringSchema(), properties )); env.execute(); } }
这里我们可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提
交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。
自定义输出
如果我们想将数据存储到我们自己的存储设备中,而 Flink 并没有提供可以直接使用的连接器,又该怎么办呢?
与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用DataStream 的.addSink()方法就可以自定义写入任何外部存储。之前与外部系统的连接,其实都是连接器帮我们实现了 SinkFunction,现在既然没有现成的,我们就只好自力更生了。
本身flink有jdbc连接器,这里我以如何用mybatis进行输出作为例子。
首先导入依赖
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.22</version> </dependency> <dependency> <groupId>org.mybatis</groupId> <artifactId>mybatis</artifactId> <version>3.5.9</version> </dependency>
配置文件
<?xml version="1.0" encoding="UTF-8" ?> <!DOCTYPE configuration PUBLIC "-//mybatis.org//DTD Config 3.0//EN" "http://mybatis.org/dtd/mybatis-3-config.dtd"> <configuration> <settings> <!-- 是否开启驼峰命名自动映射, 即从经典数据库列名 A_COLUMN 映射到经典 Java 属性名 aColumn。 --> <setting name="mapUnderscoreToCamelCase" value="true"/> </settings> <!--设置别名,方便调用--> <typeAliases> <package name="com.test.stream.entity"/> </typeAliases> <!--配置环境,可以设置多个环境,然后在default里面去切换就可以了--> <environments default="development"> <environment id="development"> <transactionManager type="JDBC"/> <dataSource type="POOLED"> <property name="driver" value="com.mysql.cj.jdbc.Driver"/> <property name="url" value="jdbc:mysql://master:3306/event?useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezon=Asia/Shanghai"/> <property name="username" value="root"/> <property name="password" value="123456"/> </dataSource> </environment> </environments> <!--每一个mapper.xml需要有一个对应的mapper--> <mappers> <mapper resource="com/tset/stream/mapper/BsEventLevelMapper.xml"/> </mappers> </configuration>
获取SqlSession的工具类
import org.apache.ibatis.io.Resources; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.session.SqlSessionFactoryBuilder; import java.io.IOException; import java.io.InputStream; public class MybatisUtil { private static SqlSessionFactory sqlSessionFactory; static { try { /* 由于文件是存放在resources路径下的, 所以此处默认是指向resources下的文件, 故只写对应的文件名就能读取到指定的文件了 */ String resource = "mybatis-config.xml"; InputStream inputStream = Resources.getResourceAsStream(resource); sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); } catch (IOException e) { e.printStackTrace(); } } public static SqlSession getSqlSession() { //设置为true是为了进行增删改的时候可以自动提交事务 return sqlSessionFactory.openSession(true); } }
这里创建类继承RichSinkFunction类(泛型就是要处理的数据类型)
重写三个方法:open、close、invoke
open和close:都是该sink初始化和销毁的时候执行,一般用作开启会话和结束会话。
invoke:真正执行的地方
import com.cstor.stream.entity.BsEvent; import com.cstor.stream.mapper.BsEventMapper; import com.cstor.stream.util.MybatisUtil; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.ibatis.session.SqlSession; public class MysqlSink extends RichSinkFunction<BsEvent> { SqlSession sqlSession; BsEventMapper bsEventMapper; @Override public void open(Configuration parameters) throws Exception { sqlSession = MybatisUtil.getSqlSession(); bsEventMapper = sqlSession.getMapper(BsEventMapper.class); } @Override public void close() throws Exception { sqlSession.close(); } @Override public void invoke(BsEvent value, Context context) throws Exception { //对应处理逻辑 } }
最终在写出的时候,添加sink即可
formatEvents.addSink(new MysqlSink());