连接到外部系统
在 Table API编写的 Flink 程序中,可以在创建表的时候用 WITH 子句指定连接器(connector),这样就可以连接到外部系统进行数据交互了。
其中最简单的当然就是连接到控制台打印输出:
CREATE TABLE ResultTable ( user STRING, cnt BIGINT WITH ( 'connector' = 'print' );
Kafka
需要导入maven依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
创建一个连接到 Kafka 表,需要在 CREATE TABLE 的 DDL 中在 WITH 子句里指定连接器为 Kafka,并定义必要的配置参数。
CREATE TABLE KafkaTable ( `user` STRING, `url` STRING, `ts` TIMESTAMP(3) METADATA FROM 'timestamp' ) WITH ( 'connector' = 'kafka', 'topic' = 'events', 'properties.bootstrap.servers' = 'localhost:9092', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'csv' )
MySQL
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version> </dependency>
创建 JDBC 表的方法与前面Kafka 大同小异:
-- 创建一张连接到 MySQL 的 表 CREATE TABLE MyTable ( id BIGINT, name STRING, age INT, status BOOLEAN, PRIMARY KEY (id) NOT ENFORCED ) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://localhost:3306/mydatabase', 'table-name' = 'users' ); -- 将另一张表 T 的数据写入到 MyTable 表中 INSERT INTO MyTable SELECT id, name, age, status FROM T;
Table API实战
在Flink中创建一张表有两种方法:
- 从一个文件中导入表结构(Structure)(常用于批计算)(静态)
- 从DataStream或者DataSet转换成Table (动态)
1.创建Table
Table API中已经提供了TableSource从外部系统获取数据,例如常见的数据库、文件系统和Kafka消息队列等外部系统。
- 从文件中创建Table(静态表)
Flink允许用户从本地或者分布式文件系统中读取和写入数据,在Table API中可以通过CsvTableSource类来创建,只需指定相应的参数即可。但是文件格式必须是CSV格式的。其他文件格式也支持(在Flink还有Connector的来支持其他格式或者自定义TableSource)
//创建流式计算的上下文环境 val env = StreamExecutionEnvironment.getExecutionEnvironment //创建Table API的上下文环境 val tableEvn = StreamTableEnvironment.create(env) val source = new CsvTableSource("D:\\code\\StudyFlink\\data\\tableexamples" , Array[String]("id", "name", "score") , Array(Types.INT, Types.STRING, Types.DOUBLE) ) //将source注册成一张表 别名:exampleTab tableEvn.registerTableSource("exampleTab",source) tableEvn.scan("exampleTab").printSchema()
- 代码最后不需要env.execute(),这并不是一个流式计算任务
- 从DataStream中创建Table(动态表)
前面已经知道Table API是构建在DataStream API和DataSet API之上的一层更高级的抽象,因此用户可以灵活地使用Table API将Table转换成DataStream或DataSet数据集,也可以将DataSteam或DataSet数据集转换成Table,这和Spark中的DataFrame和RDD的关系类似
2.修改Table中字段名
Flink支持把自定义POJOs类的所有case类的属性名字变成字段名,也可以通过基于字段偏移位置和字段名称两种方式重新修改:
//导入table库中的隐式转换 import org.apache.flink.table.api.scala._ // 基于位置重新指定字段名称为"field1", "field2", "field3" val table = tStreamEnv.fromDataStream(stream, 'field1, 'field2, 'field3) // 将DataStream转换成Table,并且将字段名称重新成别名 val table: Table = tStreamEnv.fromDataStream(stream, 'rowtime as 'newTime, 'id as 'newId,'variable as 'newVariable)
注意:要导入隐式转换。如果使用as 修改字段,必须修改表中所有的字段。
3.查询和过滤
在Table对象上使用select操作符查询需要获取的指定字段,也可以使用filter或where方法过滤字段和检索条件,将需要的数据检索出来。
object TableAPITest { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) //初始化Table API的上下文环境 val tableEvn =StreamTableEnvironment.create(streamEnv) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong) }) val table: Table = tableEvn.fromDataStream(data) //查询 tableEvn.toAppendStream[Row]( table.select('sid,'callType as 'type,'callTime,'callOut)) .print() //过滤查询 tableEvn.toAppendStream[Row]( table.filter('callType==="success") //filter .where('callType==="success")) //where .print() tableEvn.execute("sql") }
其中toAppendStream函数是吧Table对象转换成DataStream对象。
4.分组聚合
举例:我们统计每个基站的日志数量。
val table: Table = tableEvn.fromDataStream(data) tableEvn.toRetractStream[Row]( table.groupBy('sid).select('sid, 'sid.count as 'logCount)) .filter(_._1==true) //返回的如果是true才是Insert的数据 .print()
在代码中可以看出,使用toAppendStream和toRetractStream方法将Table转换为DataStream[T]数据集,T可以是Flink自定义的数据格式类型Row,也可以是用户指定的数据格式类型。在使用toRetractStream方法时,返回的数据类型结果为DataStream[(Boolean,T)],Boolean类型代表数据更新类型,True对应INSERT操作更新的数据,False对应DELETE操作更新的数据。
5.UDF自定义的函数
用户可以在Table API中自定义函数类,常见的抽象类和接口是:
- ScalarFunction
- TableFunction
- AggregateFunction
- TableAggregateFunction
案例:使用Table完成基于流的WordCount
object TableAPITest2 { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment streamEnv.setParallelism(1) //初始化Table API的上下文环境 val tableEvn =StreamTableEnvironment.create(streamEnv) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ val stream: DataStream[String] = streamEnv.socketTextStream("hadoop101",8888) val table: Table = tableEvn.fromDataStream(stream,'words) var my_func =new MyFlatMapFunction()//自定义UDF val result: Table = table.flatMap(my_func('words)).as('word, 'count) .groupBy('word) //分组 .select('word, 'count.sum as 'c) //聚合 tableEvn.toRetractStream[Row](result) .filter(_._1==true) .print() tableEvn.execute("table_api") } //自定义UDF class MyFlatMapFunction extends TableFunction[Row]{ //定义类型 override def getResultType: TypeInformation[Row] = { Types.ROW(Types.STRING, Types.INT) } //函数主体 def eval(str:String):Unit ={ str.trim.split(" ") .foreach({word=>{ var row =new Row(2) row.setField(0,word) row.setField(1,1) collect(row) }}) } } }
6.Window
Flink支持ProcessTime、EventTime和IngestionTime三种时间概念,针对每种时间概念,Flink Table API中使用Schema中单独的字段来表示时间属性,当时间字段被指定后,就可以在基于时间的操作算子中使用相应的时间属性。
在Table API中通过使用.rowtime来定义EventTime字段,在ProcessTime时间字段名后使用.proctime后缀来指定ProcessTime时间属性.
案例:统计最近5秒钟,每个基站的呼叫数量
object TableAPITest { def main(args: Array[String]): Unit = { val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment //指定EventTime为时间语义 streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) streamEnv.setParallelism(1) //初始化Table API的上下文环境 val tableEvn =StreamTableEnvironment.create(streamEnv) //导入隐式转换,建议写在这里,可以防止IDEA代码提示出错的问题 import org.apache.flink.streaming.api.scala._ import org.apache.flink.table.api.scala._ val data = streamEnv.socketTextStream("hadoop101",8888) .map(line=>{ var arr =line.split(",") new StationLog(arr(0).trim,arr(1).trim,arr(2).trim,arr(3).trim,arr(4).trim.toLong,arr(5).trim.toLong) }) .assignTimestampsAndWatermarks( //引入Watermark new BoundedOutOfOrdernessTimestampExtractor[StationLog](Time.seconds(2)){//延迟2秒 override def extractTimestamp(element: StationLog) = { element.callTime } }) //设置时间属性 val table: Table = tableEvn.fromDataStream(data,'sid,'callOut,'callIn,'callType,'callTime.rowtime) //滚动Window ,第一种写法 val result: Table = table.window(Tumble over 5.second on 'callTime as 'window) //第二种写法 val result: Table = table.window(Tumble.over("5.second").on("callTime").as("window")) .groupBy('window, 'sid) .select('sid, 'window.start, 'window.end, 'window.rowtime, 'sid.count) //打印结果 tableEvn.toRetractStream[Row](result) .filter(_._1==true) .print() tableEvn.execute("sql") } }
上面的案例是滚动窗口,如果是滑动窗口也是一样,代码如下:
//滑动窗口,窗口大小为:10秒,滑动步长为5秒 :第一种写法 table.window(Slide over 10.second every 5.second on 'callTime as 'window) //滑动窗口第二种写法 table.window(Slide.over("10.second").every("5.second").on("callTime").as("window"))
Flink SQL
企业中Flink SQL比Table API用的多。
Flink SQL 是 Apache Flink 提供的一种使用 SQL 查询和处理数据的方式。它允许用户通过 SQL 语句对数据流或批处理数据进行查询、转换和分析,无需编写复杂的代码。Flink SQL 提供了一种更直观、易于理解和使用的方式来处理数据,同时也可以与 Flink 的其他功能无缝集成。
Flink SQL 支持 ANSI SQL 标准,并提供了许多扩展和优化来适应流式处理和批处理场景。它能够处理无界数据流,具备事件时间和处理时间的语义,支持窗口、聚合、连接等常见的数据操作,还提供了丰富的内置函数和扩展插件机制。
下面是一个简单的 Flink SQL 代码示例,展示了如何使用 Flink SQL 对流式数据进行查询和转换。
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class FlinkSqlExample { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); // 设置并行度为1,方便观察输出结果 // 创建 Kafka 数据源 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "flink-consumer"); FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties); DataStream<String> sourceStream = env.addSource(kafkaConsumer); // 注册数据源表 env.createTemporaryView("source_table", sourceStream, "message"); // 执行 SQL 查询和转换 String query = "SELECT message, COUNT(*) AS count FROM source_table GROUP BY message"; DataStream<Result> resultStream = env.sqlQuery(query).map(value -> new Result(value.getField(0), value.getField(1))); // 打印结果 resultStream.print(); env.execute("Flink SQL Example"); } // 自定义结果类 public static class Result { public String message; public Long count; public Result() {} public Result(String message, Long count) { this.message = message; this.count = count; } @Override public String toString() { return "Result{" + "message='" + message + '\'' + ", count=" + count + '}'; } } }
在上述示例中,我们使用 Apache Kafka 作为数据源,并创建了一个消费者从名为 "input-topic" 的 Kafka 主题中读取数据。然后,我们将数据流注册为名为 "source_table" 的临时表。
接下来,我们使用 Flink SQL 执行 SQL 查询和转换。在这个例子中,我们查询 "source_table" 表,对 "message" 字段进行分组并计算每个消息出现的次数。查询结果会映射到自定义的 Result
类,并最终通过 print()
方法打印到标准输出。
最后,我们通过调用 env.execute()
方法来启动 Flink 作业的执行。
Flink的复杂事件处理CEP
复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了FlinkCEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。
CEP相关概念
配置依赖
在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.9.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep-scala_2.11</artifactId> <version>1.9.1</version> </dependency>
事件定义
- 简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。
- 复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。
复杂事件中事件与事件之间包含多种类型关系,常见的有时序关系、聚合关系、层次关系、依赖关系及因果关系等。
Pattern API
Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
- 输入事件流的创建
- Pattern的定义
- Pattern应用在事件流上检测
- 选取结果
模式定义
定义Pattern可以是单次执行模式,也可以是循环执行模式。单词执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的
val start = Pattern.begin[Event]("start_pattern")
下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。
start.where(_.getCallType == "success")
设置循环次数
对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。
- times:可以通过times指定固定的循环执行次数。
//指定循环触发4次 start.times(4); //可以执行触发次数范围,让循环执行次数在该范围之内 start.times(2, 4);
- optional:也可以通过optional关键字指定要么不触发要么触发指定的次数。
start.times(4).optional(); start.times(2, 4).optional();
- greedy:可以通过greedy将Pattern标记为贪婪模式,在Pattern匹配成功的前提下,会尽可能多地触发。
//触发2、3、4次,尽可能重复执行 start.times(2, 4).greedy(); //触发0、2、3、4次,尽可能重复执行 start.times(2, 4).optional().greedy();
- oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
// 触发一次或者多次 start.oneOrMore(); //触发一次或者多次,尽可能重复执行 start.oneOrMore().greedy(); // 触发0次或者多次 start.oneOrMore().optional(); // 触发0次或者多次,尽可能重复执行 start.oneOrMore().optional().greedy();
- timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
// 触发两次或者多次 start.timesOrMore(2); // 触发两次或者多次,尽可能重复执行 start.timesOrMore(2).greedy(); // 不触发或者触发两次以上,尽可能重复执行 start.timesOrMore(2).optional().greedy();
定义条件
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。
- 简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。
// 把通话成功的事件挑选出来 start.where(_.getCallType == "success")
- 组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。
// 把通话成功,或者通话时长大于10秒的事件挑选出来 val start = Pattern.begin[StationLog]("start_pattern") .where(_.callType=="success") .or(_.duration>10)
- 终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。
pattern.oneOrMore.until(_.callOut.startsWith("186"))
模式序列
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
- 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。
val strict: Pattern[Event] = start.next("middle").where(...)
- 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系。
val relaxed: Pattern[Event, _] = start.followedBy("middle").where(...)
- 非确定宽松邻近:和宽松邻近条件相比,非确定宽松邻近条件指在模式匹配过程中可以忽略已经匹配的条件。
val nonDetermin: Pattern[Event, _] = start.followedByAny("middle").where(...)
- 除以上模式序列外,还可以定义“不希望出现某种近邻关系”:
.notNext() —— 不想让某个事件严格紧邻前一个事件发生。
.notFollowedBy() —— 不想让某个事件在两个事件之间发生。
注意:
- 所有模式序列必须以 .begin() 开始
- 模式序列不能以 .notFollowedBy() 结束
- “not” 类型的模式不能被 optional 所修饰
- 此外,还可以为模式指定时间约束,用来要求在多长时间内匹配有效
//指定模式在10秒内有效 pattern.within(Time.seconds(10));
模式检测
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
//cep 做模式检测 val patternStream = CEP.pattern[EventLog](dataStream.keyBy(_.id),pattern)
选择结果
得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。
通过Select Funciton抽取正常事件
可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。
def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = { //获取pattern中的startEvent val startEvent = pattern.get("start_pattern").get.next //获取Pattern中middleEvent val middleEvent = pattern.get("middle").get.next //返回结果 OUT(startEvent, middleEvent)}
通过Flat Select Funciton抽取正常事件
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = { //获取pattern中startEvent val startEvent = pattern.get("start_pattern").get.next //获取Pattern中middleEvent val middleEvent = pattern.get("middle").get.next //并根据startEvent的Value数量进行返回 for (i <- 0 to startEvent.getValue) { collector.collect(OUT(startEvent, middleEvent)) }}
通过Select Funciton抽取超时事件
如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。
// 通过CEP.pattern方法创建 PatternStream val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) //创建OutputTag,并命名为timeout-output val timeoutTag = OutputTag[String]("timeout-output") //调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] = patternStream.select(timeoutTag){ //超时事件获取 (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()//返回异常事件 } { //正常事件获取 pattern: Map[String, Iterable[Event]] => NormalEvent() //返回正常事件 } //调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)
Flink内存优化
在大数据领域,大多数开源框架(Hadoop、Spark、Flink)都是基于JVM运行,但是JVM的内存管理机制往往存在着诸多类似OutOfMemoryError的问题,主要是因为创建过多的对象实例而超过JVM的最大堆内存限制,却没有被有效回收掉,这在很大程度上影响了系统的稳定性,尤其对于大数据应用,面对大量的数据对象产生,仅仅靠JVM所提供的各种垃圾回收机制很难解决内存溢出的问题。在开源框架中有很多框架都实现了自己的内存管理,例如Apache Spark的Tungsten项目,在一定程度上减轻了框架对JVM垃圾回收机制的依赖,从而更好地使用JVM来处理大规模数据集。
Flink也基于JVM实现了自己的内存管理,将JVM根据内存区分为Unmanned Heap、Flink Managed Heap、Network Buffers三个区域。在Flink内部对Flink Managed Heap进行管理,在启动集群的过程中直接将堆内存初始化成Memory Pages Pool,也就是将内存全部以二进制数组的方式占用,形成虚拟内存使用空间。新创建的对象都是以序列化成二进制数据的方式存储在内存页面池中,当完成计算后数据对象Flink就会将Page置空,而不是通过JVM进行垃圾回收,保证数据对象的创建永远不会超过JVM堆内存大小,也有效地避免了因为频繁GC导致的系统稳定性问题。
JobManager配置
JobManager在Flink系统中主要承担管理集群资源、接收任务、调度Task、收集任务状态以及管理TaskManager的功能,JobManager本身并不直接参与数据的计算过程中,因此JobManager的内存配置项不是特别多,只要指定JobManager堆内存大小即可。
jobmanager.heap.size:设定JobManager堆内存大小,默认为1024MB。
TaskManager配置
TaskManager作为Flink集群中的工作节点,所有任务的计算逻辑均执行在TaskManager之上,因此对TaskManager内存配置显得尤为重要,可以通过以下参数配置对TaskManager进行优化和调整。
- taskmanager.heap.size:设定TaskManager堆内存大小,默认值为1024M,如果在Yarn的集群中,TaskManager取决于Yarn分配给TaskManager Container的内存大小,且Yarn环境下一般会减掉一部分内存用于Container的容错。
- taskmanager.jvm-exit-on-oom:设定TaskManager是否会因为JVM发生内存溢出而停止,默认为false,当TaskManager发生内存溢出时,也不会导致TaskManager停止。
- taskmanager.memory.size:设定TaskManager内存大小,默认为0,如果不设定该值将会使用taskmanager.memory.fraction作为内存分配依据。
- taskmanager.memory.fraction:设定TaskManager堆中去除Network Buffers内存后的内存分配比例。该内存主要用于TaskManager任务排序、缓存中间结果等操作。例如,如果设定为0.8,则代表TaskManager保留80%内存用于中间结果数据的缓存,剩下20%的内存用于创建用户定义函数中的数据对象存储。注意,该参数只有在taskmanager.memory.size不设定的情况下才生效。
- taskmanager.memory.off-heap:设置是否开启堆外内存供Managed Memory或者Network Buffers使用。
- taskmanager.memory.preallocate:设置是否在启动TaskManager过程中直接分配TaskManager管理内存。
- taskmanager.numberOfTaskSlots:每个TaskManager分配的slot数量。
Flink的网络缓存优化
Flink将JVM堆内存切分为三个部分,其中一部分为Network Buffers内存。Network Buffers内存是Flink数据交互层的关键内存资源,主要目的是缓存分布式数据处理过程中的输入数据。。通常情况下,比较大的Network Buffers意味着更高的吞吐量。如果系统出现“Insufficient number of network buffers”的错误,一般是因为Network Buffers配置过低导致,因此,在这种情况下需要适当调整TaskManager上Network Buffers的内存大小,以使得系统能够达到相对较高的吞吐量。
目前Flink能够调整Network Buffer内存大小的方式有两种:一种是通过直接指定Network Buffers内存数量的方式,另外一种是通过配置内存比例的方式。
设定Network Buffer内存数量(过时了)
直接设定Nework Buffer数量需要通过如下公式计算得出:
NetworkBuffersNum = total-degree-of-parallelism \* intra-node-parallelism * n
其中total-degree-of-parallelism表示每个TaskManager的总并发数量,intra-node-parallelism表示每个TaskManager输入数据源的并发数量,n表示在预估计算过程中Repar-titioning或Broadcasting操作并行的数量。intra-node-parallelism通常情况下与Task-Manager的所占有的CPU数一致,且Repartitioning和Broadcating一般下不会超过4个并发。可以将计算公式转化如下:
NetworkBuffersNum = <slots-per-TM>^2 \* < TMs>* 4
其中slots-per-TM是每个TaskManager上分配的slots数量,TMs是TaskManager的总数量。对于一个含有20个TaskManager,每个TaskManager含有8个Slot的集群来说,总共需要的Network Buffer数量为8^2**20*4=5120个,因此集群中配置Network Buffer内存的大小约为160M较为合适。
计算完Network Buffer数量后,可以通过添加如下两个参数对Network Buffer内存进行配置。其中segment-size为每个Network Buffer的内存大小,默认为32KB,一般不需要修改,通过设定numberOfBuffers参数以达到计算出的内存大小要求。
- taskmanager.network.numberOfBuffers:指定Network堆栈Buffer内存块的数量。
- taskmanager.memory.segment-size:内存管理器和Network栈使用的内存Buffer大小,默认为32KB。
设定Network内存比例(推荐)
从1.3版本开始,Flink就提供了通过指定内存比例的方式设置Network Buffer内存大小。
- taskmanager.network.memory.fraction:JVM中用于Network Buffers的内存比例。
- taskmanager.network.memory.min:最小的Network Buffers内存大小,默认为64MB。
- taskmanager.network.memory.max:最大的Network Buffers内存大小,默认1GB。
- taskmanager.memory.segment-size:内存管理器和Network栈使用的Buffer大小,默认为32KB。
本篇文章就到这里,感谢阅读,如果本篇博客有任何错误和建议,欢迎给我留言指正。文章持续更新