物流项目中SparkSQL的相关调优

简介: 编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。

实时ETL开发之流计算程序【编程】


编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。


package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
/**
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
   * 1. 初始化设置Spark Application配置
   * 2. 判断Spark Application运行模式进行设置
   * 3. 构建SparkSession实例对象
   * 4. 初始化消费物流Topic数据参数
   * 5. 消费物流Topic数据,打印控制台
   * 6. 初始化消费CRM Topic数据参数
   * 7. 消费CRM Topic数据,打印控制台
   * 8. 启动流式应用,等待终止
 */
object LogisticsEtlApp {
  def main(args: Array[String]): Unit = {
    // step1. 构建SparkSession实例对象,设置相关属性参数值
    // 1. 初始化设置Spark Application配置
    val sparkConf = new SparkConf()
        .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
      .set("spark.sql.session.timeZone", "Asia/Shanghai")
      .set("spark.sql.files.maxPartitionBytes", "134217728")
      .set("spark.sql.files.openCostInBytes", "134217728")
      .set("spark.sql.shuffle.partitions", "3")
      .set("spark.sql.autoBroadcastJoinThreshold", "67108864")
    // 2. 判断Spark Application运行模式进行设置
    if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
      //本地环境LOCAL_HADOOP_HOME
      System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
      //设置运行环境和checkpoint路径
      sparkConf
        .set("spark.master", "local[3]")
        .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
    } else {
      //生产环境
      sparkConf
        .set("spark.master", "yarn")
        .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
    }
    // 3. 构建SparkSession实例对象
    val spark: SparkSession = SparkSession.builder()
        .config(sparkConf)
      .getOrCreate()
    import spark.implicits._
    // step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
    // step3. 将ETL转换后数据打印到控制台,启动流式应用
    // 4. 初始化消费物流Topic数据参数
    val logisticsDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
      .option("subscribe", "logistics")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 5. 消费物流Topic数据,打印控制台
    logisticsDF.writeStream
      .queryName("query-logistics-console")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    // 6. 初始化消费CRM Topic数据参数
    val crmDF: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
      .option("subscribe", "crm")
      .option("maxOffsetsPerTrigger", "100000")
      .load()
    // 7. 消费CRM Topic数据,打印控制
    crmDF.writeStream
      .queryName("query-crm-console")
      .outputMode(OutputMode.Append())
      .format("console")
      .option("numRows", "10")
      .option("truncate", "false")
      .start()
    // step4. 流式应用启动以后,等待终止,关闭资源
    // 8. 启动流式应用,等待终止
    spark.streams.active.foreach(query => println("启动Query:" + query.name))
    spark.streams.awaitAnyTermination()
  }
}


SparkSQL 参数调优设置:


  • 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")


  • 2)、设置读取文件时单个分区可容纳的最大字节数


set("spark.sql.files.maxPartitionBytes", "134217728")


  • 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")


  • 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")


  • 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小


set("spark.sql.autoBroadcastJoinThreshold", "67108864")

目录
相关文章
|
移动开发 小程序
知识付费小程序注册时类目该如何选择?
知识付费小程序注册时类目该如何选择?
1294 0
|
SQL 消息中间件 存储
PostgreSQL CDC的最佳实践
PostgreSQL CDC的最佳实践
PostgreSQL CDC的最佳实践
|
iOS开发 MacOS Windows
Axure下载及汉化激活
Axure RP 9 的下载、汉化及激活方法。首先从官网下载并安装最新版 Axure RP 9,然后下载并解压语言包,将「lang」文件夹复制到 Axure 安装目录中。Windows 系统路径为 `c://Program Files/Axure/Axure RP 9.0/` 或 `c://Program Files (x86)/Axure/Axure RP 9.0/`,macOS 系统需通过“显示包内容”操作进行粘贴。最后使用提供的激活码完成激活。
2290 0
|
缓存 负载均衡 安全
Servlet与JSP在Java Web应用中的性能调优策略
【6月更文挑战第23天】在Java Web中,Servlet和JSP调优至关重要,以应对高并发和复杂业务带来的性能挑战。优化包括Servlet复用、线程安全、数据库连接池,以及JSP的编译优化、使用JSTL、页面缓存和静态内容分离。全局优化涉及负载均衡、异步处理和缓存策略。通过这些实践,开发者能提升应用响应速度和吞吐量,确保高负载下的稳定运行。
432 7
|
消息中间件 存储 Kafka
微服务中常用的几种通信方式
微服务中常用的几种通信方式
|
网络性能优化 调度
|
监控 网络协议 C#
一款基于C#开发的通讯调试工具(支持Modbus RTU、MQTT调试)
一款基于C#开发的通讯调试工具(支持Modbus RTU、MQTT调试)
423 0
|
关系型数据库 MySQL 数据库连接
mysql从安装到建库,utf8mb4最佳实践,jdbc连接串全解析
mysql从安装到建库,utf8mb4最佳实践,jdbc连接串全解析
4782 0
|
关系型数据库 Unix 数据库
PostgreSQL语句大全
PostgreSQL是一个免费的对象-关系数据库服务器(ORDBMS),在灵活的BSD许可证下发行。PostgreSQL开发者把它念作post-gress-Q-L。PostgreSQL的Slogan是"世界上最先进的开源关系型数据库"。基本语法CREATEFUNCTION//声明创建函数ADD(INTEGER,INTEGER)//定义函数名称RETURNSINTEGER//定义函数返回值'//定义函数体'--使用函数。......
501 1
|
SQL JSON 关系型数据库
【万字长文】Flink cdc源码精讲(推荐收藏)(一)
【万字长文】Flink cdc源码精讲(推荐收藏)
3132 0
【万字长文】Flink cdc源码精讲(推荐收藏)(一)