(1)对于异步 I/O 操作的需求
在与外部系统交互(用数据库中的数据扩充流数据)的时候,需要考虑与外部系统的通信延迟对整个流处理应用的影响。
简单地访问外部数据库的数据,比如使用 MapFunction,通常意味着同步交互: MapFunction 向数据库发送一个请求然后一直等待,直到收到响应。在许多情况下,等待占据了函数运行的大部分时间。
与数据库异步交互是指一个并行函数实例可以并发地处理多个请求和接收多个响应。这样,函数在等待的时间可以发送其他请求和接收其他响应。至少等待的时间可以被多个请求摊分。大多数情况下,异步交互可以大幅度提高流处理的吞吐量。
注意:仅仅提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。
(2)异步 I/O API
Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端。API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。
在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:
实现分发请求的 AsyncFunction
获取数据库交互的结果并发送给 ResultFuture 的 回调 函数
将异步 I/O操作应用于 DataStream 作为 DataStream 的一次转换操作。
下面是基本的代码模板:
// 这个例子使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。 /** * 实现 'AsyncFunction' 用于发送请求和设置回调。 */ class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> { /** 能够利用回调函数并发发送请求的数据库客户端 */ private transient DatabaseClient client; @Override public void open(Configuration parameters) throws Exception { client = new DatabaseClient(host, post, credentials); } @Override public void close() throws Exception { client.close(); } @Override public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception { // 发送异步请求,接收 future 结果 final Future<String> result = client.query(key); // 设置客户端完成请求后要执行的回调函数 // 回调函数只是简单地把结果发给 future CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { return result.get(); } catch (InterruptedException | ExecutionException e) { // 显示地处理异常。 return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult))); }); } } // 创建初始 DataStream DataStream<String> stream = ...; // 应用异步 I/O 转换操作 DataStream<Tuple2<String, String>> resultStream = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);
重要提示: 第一次调用 ResultFuture.complete 后 ResultFuture 就完成了。 后续的 complete 调用都将被忽略。
下面两个参数控制异步操作:
Timeout: 超时参数定义了异步请求发出多久后未得到响应即被认定为失败。 它可以防止一直等待得不到响应的请求。
Capacity: 容量参数定义了可以同时进行的异步请求数。 即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O
操作的算子仍然可能成为流处理的瓶颈。 限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压。
超时处理
当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。 如果你想处理超时,可以重写 AsyncFunction#timeout 方法。
结果的顺序
AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序。 Flink 提供两种模式控制结果记录以何种顺序发出。
无序模式: 异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间
作为基本时间特征时,这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(…)方法。
有序模式: 这种模式保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同。为了实现这一点,算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时)。由于记录或者结果要在checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来一些额外的延迟和 checkpoint开销。此模式使用AsyncDataStream.orderedWait(…) 方法。
事件时间
当流处理应用使用事件时间时,异步 I/O 算子会正确处理 watermark。对于两种顺序模式,这意味着以下内容:
无序模式: Watermark 既不超前于记录也不落后于记录,即 watermark 建立了顺序的边界。 只有连续两个 watermark
之间的记录是无序发出的。 在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。 在一个
watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。
这意味着存在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的延迟和管理开销。开销大小取决于 watermark 的频率。
有序模式: 连续两个 watermark 之间的记录顺序也被保留了。开销与使用处理时间 相比,没有显著的差别。
请记住,摄入时间 是一种特殊的事件时间,它基于数据源的处理时间自动生成 watermark。
容错保证
异步 I/O 算子提供了完全的精确一次容错保证。它将在途的异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。
(3)代码实现
package com.aikfk.flink.dataStream.function; import com.aikfk.flink.base.MySource; import com.alibaba.druid.pool.DruidDataSource; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; public class AsynchronousFunction { public static void main(String[] args)throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> dataStream = env.addSource(new MySource()); DataStream<Tuple2<String,String>> async = AsyncDataStream.unorderedWait(dataStream , new AsyncDatabaseRequest() , 10000, TimeUnit.MICROSECONDS , 100); async.print(); env.execute("asyncForMysql"); } static class AsyncDatabaseRequest extends RichAsyncFunction<String,Tuple2<String,String>>{ private DruidDataSource dataSource ; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); dataSource = new DruidDataSource(); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); dataSource.setUrl("jdbc:mysql://bigdata-pro-m03.kfk.com/db_novel?createDatabaseIfNotExist=true"); dataSource.setUsername("root"); dataSource.setPassword("123456"); } @Override public void close() throws Exception { super.close(); dataSource.close(); } @Override public void asyncInvoke(String key_id, ResultFuture<Tuple2<String,String>> resultFuture) throws Exception { String chapter_name = queryFormMysql(key_id); CompletableFuture.supplyAsync(new Supplier<Tuple2<String,String>>() { @Override public Tuple2<String,String> get() { return new Tuple2<>(key_id , chapter_name); } }).thenAccept(dbResult -> { resultFuture.complete(Collections.singleton(dbResult)); }); } private String queryFormMysql(String id) throws SQLException{ String sql = "select chapter_name from novel_detail where id =? "; Connection connection = null ; PreparedStatement stmt = null ; ResultSet rs = null ; String result_name = null ; try{ connection = dataSource.getConnection(); stmt = connection.prepareStatement(sql); stmt.setString(1,id); rs = stmt.executeQuery() ; if (rs.next()){ result_name = rs.getString("chapter_name") ; } }catch (SQLException e){ e.printStackTrace(); }finally { if(rs != null){ rs.close(); } if (stmt != null){ stmt.close(); } if (connection != null){ connection.close(); } } return result_name ; } } static class MySource implements SourceFunction<String> { @Override public void cancel() { } @Override public void run(SourceContext<String> ctx) throws Exception { String[] datas = { "1", "2", "3" }; for (int k = 0; k < datas.length; k++) { Thread.sleep(100); ctx.collect(datas[k]); } } } }