实时ETL开发之流计算程序【编程】
编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。
package cn.itcast.logistics.etl.realtime import cn.itcast.logistics.common.Configuration import org.apache.commons.lang3.SystemUtils import org.apache.spark.SparkConf import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, SparkSession} /** * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console * 1. 初始化设置Spark Application配置 * 2. 判断Spark Application运行模式进行设置 * 3. 构建SparkSession实例对象 * 4. 初始化消费物流Topic数据参数 * 5. 消费物流Topic数据,打印控制台 * 6. 初始化消费CRM Topic数据参数 * 7. 消费CRM Topic数据,打印控制台 * 8. 启动流式应用,等待终止 */ object LogisticsEtlApp { def main(args: Array[String]): Unit = { // step1. 构建SparkSession实例对象,设置相关属性参数值 // 1. 初始化设置Spark Application配置 val sparkConf = new SparkConf() .setAppName(this.getClass.getSimpleName.stripSuffix("$")) .set("spark.sql.session.timeZone", "Asia/Shanghai") .set("spark.sql.files.maxPartitionBytes", "134217728") .set("spark.sql.files.openCostInBytes", "134217728") .set("spark.sql.shuffle.partitions", "3") .set("spark.sql.autoBroadcastJoinThreshold", "67108864") // 2. 判断Spark Application运行模式进行设置 if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) { //本地环境LOCAL_HADOOP_HOME System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME) //设置运行环境和checkpoint路径 sparkConf .set("spark.master", "local[3]") .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR) } else { //生产环境 sparkConf .set("spark.master", "yarn") .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR) } // 3. 构建SparkSession实例对象 val spark: SparkSession = SparkSession.builder() .config(sparkConf) .getOrCreate() import spark.implicits._ // step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称 // step3. 将ETL转换后数据打印到控制台,启动流式应用 // 4. 初始化消费物流Topic数据参数 val logisticsDF: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node2.itcast.cn:9092") .option("subscribe", "logistics") .option("maxOffsetsPerTrigger", "100000") .load() // 5. 消费物流Topic数据,打印控制台 logisticsDF.writeStream .queryName("query-logistics-console") .outputMode(OutputMode.Append()) .format("console") .option("numRows", "10") .option("truncate", "false") .start() // 6. 初始化消费CRM Topic数据参数 val crmDF: DataFrame = spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "node2.itcast.cn:9092") .option("subscribe", "crm") .option("maxOffsetsPerTrigger", "100000") .load() // 7. 消费CRM Topic数据,打印控制 crmDF.writeStream .queryName("query-crm-console") .outputMode(OutputMode.Append()) .format("console") .option("numRows", "10") .option("truncate", "false") .start() // step4. 流式应用启动以后,等待终止,关闭资源 // 8. 启动流式应用,等待终止 spark.streams.active.foreach(query => println("启动Query:" + query.name)) spark.streams.awaitAnyTermination() } }
SparkSQL 参数调优设置:
- 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")
- 2)、设置读取文件时单个分区可容纳的最大字节数
set("spark.sql.files.maxPartitionBytes", "134217728")
- 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")
- 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")
- 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小
set("spark.sql.autoBroadcastJoinThreshold", "67108864")