Fire框架
Fire框架是由中通大数据自主研发并开源的、专门用于进行Spark和Flink任务开发的大数据框架,可节约70%以上的代码量。首创基于注解进行Spark和Flink任务开发,具备实时血缘、根因诊断、动态调优、参数热调整等众多平台化功能。Fire框架在中通内部每天处理数据量高达数千亿,在外部已被数十家公司所使用。
用户参数配置
Fire框架支持基于接口、apollo、配置文件以及注解等多种方式配置,支持将spark&flink等引擎参数、fire框架参数以及用户自定义参数混合配置,支持运行时动态修改配置。几种常用配置方式如下(fire内置参数):
基于配置文件: 创建类名同名的properties文件进行参数配置 基于接口配置:fire框架提供了配置接口调用,通过接口获取所需的配置,可用于平台化的配置管理 基于注解配置: 通过注解的方式实现集群环境、connector、调优参数的配置
基于注解
// 通用的配置注解,支持任意的参数,还可以替代connector(如@Hive、@Kafka)类型参数,支持注释和多行配置 @Config( """ |# 支持Flink调优参数、Fire框架参数、用户自定义参数等 |state.checkpoints.num-retained=30 |state.checkpoints.dir=hdfs:///user/flink/checkpoint |my.conf=hello |""") // 配置连接到指定的hive,支持别名:@Hive("test"),别名需在cluster.properties中指定 @Hive("thrift://localhost:9083") // 100s做一次checkpoint,开启非对齐checkpoint,还支持checkpoint其他设置,如超时时间,两次checkpoint间隔时间等 @Checkpoint(interval = 100, unaligned = true) // 配置kafka connector,多个kafka消费通过不同数值后缀区分:@Kafka2、@Kafka3、@Kafka5等,支持url或别名 @Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire") // 配置rocketmq connector,同样支持消费多个rocketmq,支持url或别名 @RocketMQ(brokers = "bigdata_test", topics = "fire", groupId = "fire", tag = "*", startingOffset = "latest") // jdbc注解,可自动推断driverClass,支持配置多个jdbc数据源,支持url或别名 @Jdbc(url = "jdbc:mysql://mysql-server:3306/fire", username = "root", password = "..root726") // 配置Hbase数据源,支持配置多HBase集群读写,支持url或别名 @HBase("localhost:2181")
基于配置文件
Fire框架约定,在任务启动时自动加载与该任务同名的,位于resources目录下以.properties结尾的配置文件(支持目录)。配置文件中如果定义了与@Config注解或者其他配置注解相同的配置时,配置文件中的优先级更高。fire框架参数
hive.cluster = test sql.udf.fireUdf.enable = false kafka.brokers.name = bigdata_test # 必须配置项:kafka的topic列表,以逗号分隔 kafka.topics = fire kafka.group.id = fire fire.rest.filter.enable = false fire.config_center.enable = true fire.rest.url.show.enable = true db.jdbc.batch.size3 = 3 stream.checkpoint.interval = 10000 # flink所支持的参数 state.checkpoints.num-retained = 3 state.backend.incremental = true state.backend.rocksdb.files.open = 5000 sql.log.enable = true sql_with.replaceMode.enable = true #fire.thread.pool.size=10 fire.thread.pool.size=6 fire.restful.max.thread=9 fire.jdbc.query.partitions=11 fire.hbase.scan.repartitions=110 fire.acc.log.max.size=22 fire.conf.test=scala
另外,如果同一个项目中有多个任务共用一些配置信息,比如jdbc url、hbase集群地址等,可以将这些公共的配置放到resources目录下名为common.properties配置文件中。这样每个任务在启动前会先加载这个配置文件,实现配置复用。common.properties中的配置优先级低于任务级别的配置。
基于平台
上述两种,无论是基于注解还是基于配置文件,修改参数时,都需要修改代码然后重新编译发布执行。为了节约开发时间,fire框架提供了参数设置接口,实时平台可通过接口调用的方式将web页面中任务级别的配置设置到不同的任务中,以此来实现在web页面中进行实时任务的调优。接口调用的参数优先级要高于配置文件和注解方式。配置获取
Fire框架封装了统一的配置获取api,基于该api,无论是spark还是flink,无论是在Driver | JobManager端还是在Executor | TaskManager端,都可以直接一行代码获取所需配置。这套配置获取api,无需再在flink的map等算子中复写open方法了,用起来十分方便。
this.conf.getString("my.conf") this.conf.getInt("state.checkpoints.num-retained") ...
多集群支持
Fire框架的配置支持N多集群,比如同一个任务中可以同时配置多个HBase、Kafka数据源,使用不同的数值后缀即可区分(keyNum):
// 假设基于注解配置HBase多集群如下: @HBase("localhost:2181") @HBase2(cluster = "192.168.0.1:2181", storageLevel = "DISK_ONLY") // 代码中使用对应的数值后缀进行区分 this.fire.hbasePutDF(hTableName, studentDF, classOf[Student]) // 默认keyNum=1,表示使用@HBase注解配置的集群信息 this.fire.hbasePutDF(hTableName2, studentDF, classOf[Student], keyNum=2) // keyNum=2,表示使用@HBase2注解配置的集群信息
fire内置配置文件
fire.properties:该配置文件中fire框架的总配置文件,位于fire-core包中,其中的配置主要是针对fire框架的,不含有spark或flink引擎的配置 cluster.properties:该配置文件用于存放各公司集群地址相关的映射信息,由于集群地址信息比较敏感,因此单独拿出来作为一个配置文件 spark.properties:该配置文件是spark引擎的总配置文件,位于fire-spark包中,作为spark引擎任务的总配置文件 spark-core.properties:该配置文件位于fire-spark包中,该配置文件用于配置spark core任务 spark-streaming.properties:该配置文件位于fire-spark包中,主要用于spark streaming任务 structured-streaming.properties:该配置文件位于fire-spark包中,用于进行structured streaming任务的配置 flink.properties:该配置文件位于fire-flink包中,作为flink引擎的总配置文件 flink-streaming.properties:该配置文件位于fire-flink包中,用于配置flink streaming任务 flink-batch.properties:该配置文件位于fire-flink包中,用于配置flink批处理任务
spark开发示例
@Config( """ |spark.shuffle.compress=true # 支持任意Spark调优参数、Fire框架参数、用户自定义参数等 |spark.ui.enabled=true |""") @Hive("thrift://localhost:9083") // 配置连接到指定的hive @Streaming(interval = 100, maxRatePerPartition = 100) // 100s一个Streaming batch,并限制消费速率 @Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire") object SparkDemo extends SparkStreaming { @Process def kafkaSource: Unit = { val dstream = this.fire.createKafkaDirectStream() // 使用api的方式消费kafka sql("""select * from xxx""").show() } }
Flink开发示例
import com.zto.fire._ import com.zto.fire.common.anno.Config import com.zto.fire.core.anno._ import com.zto.fire.flink.BaseFlinkStreaming import com.zto.fire.flink.anno.Checkpoint /** * 基于Fire进行Flink Streaming开发 * * @contact Fire框架技术交流群(钉钉):35373471 */ @Config( """ |# 支持Flink调优参数、Fire框架参数、用户自定义参数等 |state.checkpoints.num-retained=30 |state.checkpoints.dir=hdfs:///user/flink/checkpoint |""") @Hive("thrift://localhost:9083") // 配置连接到指定的hive @Checkpoint(interval = 100, unaligned = true) // 100s做一次checkpoint,开启非对齐checkpoint @Kafka(brokers = "localhost:9092", topics = "fire", groupId = "fire") object FlinkDemo extends BaseFlinkStreaming { /** process方法中编写业务逻辑代码,该方法会被fire框架自动调起 **/ override def process: Unit = { val dstream = this.fire.createKafkaDirectStream() // 使用api的方式消费kafka this.fire.sql("""create table statement ...""") this.fire.sql("""insert into statement ...""") this.fire.start } }
任务提交脚本
#!/bin/bash # author: wangchenglong # date: 2022-06-30 13:10:13 # desc:提交flink任务通用脚本 # usage:./deploy.sh com.zto.fire.examples.flink.Test export FLINK_HOME=/opt/flink-1.14.3 export PATH=$FLINK_HOME/bin:$PATH # 以run application模式提交flink任务到yarn上 flink run-application -t yarn-application \ # 使用run-application模式提交,让flink任务与实时平台具有交互能力 -D taskmanager.memory.process.size=4g \ -D state.checkpoints.dir=hdfs:///user/flink/checkpoint/fire \ -D flink.stream.checkpoint.interval=6000 \ -D fire.shutdown.auto.exit=true \ # 可通过-D方式指定flink引擎参数、fire框架参数或用户自定义参数,代码中通过this.conf.get获取参数值 --allowNonRestoredState \ -s hdfs:/user/flink/checkpoint/xxx/chk-5/_metadata \ # 指定checkpoint路径 -ynm fire_test -yqu root.default -ynm test -ys 1 -ytm 2g -c $1 zto-flink*.jar $*
#!/bin/bash # author: wangchenglong # date: 2022-06-30 13:24:13 # desc:提交spark任务通用脚本 # usage:./deploy.sh com.zto.fire.examples.spark.Test export SPARK_HOME=/opt/spark3.0.2 export PATH=$SPARK_HOME/bin:$PATH # 以cluster模式提交spark任务到yarn上 spark-submit \ --master yarn --deploy-mode cluster --class $1 --num-executors 20 --executor-cores 1 \ --driver-memory 1g --executor-memory 1g \ --conf fire.shutdown.auto.exit=true \ # 可通过--conf方式指定spark引擎参数、fire框架参数或用户自定义参数,通过this.conf.get获取参数值 ./zto-spark*.jar $*
定时任务
Fire框架内部进一步封装了quart进行定时任务的声明与调度,使用方法和spring的@Scheduled注解类似。参考:示例程序。基于该功能,可以很容易实现诸如定时加载与更新维表等功能,十分方便。
/** * 声明了@Scheduled注解的方法是定时任务方法,会周期性执行 * * @cron cron表达式 * @scope 默认同时在driver端和executor端执行,如果指定了driver,则只在driver端定时执行 * @concurrent 上一个周期定时任务未执行完成时是否允许下一个周期任务开始执行 * @startAt 用于指定第一次开始执行的时间 * @initialDelay 延迟多长时间开始执行第一次定时任务 */ @Scheduled(cron = "0/5 * * * * ?", scope = "driver", concurrent = false, startAt = "2021-01-21 11:30:00", initialDelay = 60000) def loadTable: Unit = { this.logger.info("更新维表动作") } /** * 只在driver端执行,不允许同一时刻同时执行该方法 * startAt用于指定首次执行时间 */ @Scheduled(cron = "0/5 * * * * ?", scope = "all", concurrent = false) def test2: Unit = { this.logger.info("executorId=" + SparkUtils.getExecutorId + "=方法 test2() 每5秒执行" + DateFormatUtils.formatCurrentDateTime()) } // 每天凌晨4点01将锁标志设置为false,这样下一个批次就可以先更新维表再执行sql @Scheduled(cron = "0 1 4 * * ?") def updateTableJob: Unit = this.lock.compareAndSet(true, false)
注:目前定时任务不支持flink任务在每个TaskManager端执行。
动态调整checkpoint
Fire框架为Flink checkpoint提供了增强,可以做到运行时动态调整checkpoint的相关参数,达到不重启任务即可实现动态调优的目的。Flink开发者只需集成集成Fire框架 ,就可以在运行时通过调用Fire框架提供的restful接口,从而实现动态调整checkpoint参数的目的了。集成了Fire框架的flink任务在运行起来以后,可以在flink的webui的Job Manager -> Configuration中查看到restful接口地址:找到接口地址以后,通过curl命令调用该接口即可实现动态调优:
curl -H "Content-Type:application/json" -X POST --data '{"interval":60000,"minPauseBetween": 60000, "timeout": 60000}' http://ip:5753/system/checkpoint
fire内置的restful接口
fire框架在提供丰富好用的api给开发者的同时,也提供了大量的restful接口给大数据实时计算平台。通过对外暴露的restful接口,可以将每个任务与实时平台进行深入绑定,为平台建设提供了更大的想象空间。其中包括:实时热重启接口、动态批次时间调整接口、sql在线调试接口、Arthas诊断jvm、实时血缘分析等。fire框架参数
Fire框架提供了很多参数,这些参数为个性化调优带来了很大的灵活性。参数大体分为:fire框架参数(fire.properties)、spark引擎参数(spark.properties)、flink引擎参数(flink.properties)、kafka参数、hbase参数等。详见以下列表: