1:用户程序启动(SparkLauncher,非驱动程序)时会在当前节点上启动一个SparkSubmit进程,并将驱动程序(即spark任务)发送到任意一个工作节点上,在工作节点上启动DriverWrapper进程
2:驱动程序会从集群管理器(standalone模式下是master服务器)申请执行器资源
3:集群管理器反馈执行器资源给驱动器
4:驱动器Driver将任务发送到执行器节点执行
spark首页监控
可以看到启动的Driver
首页监控-驱动器
进一步可以查看到执行器情况
首页监控-执行器
也可通过服务器进程查看各进程之间的关系
Master节点
工作节点-驱动器
工作节点-执行器1
工作节点-执行器2
3.2、startApplication()方式
@Scheduled(fixedRate = 5000 * 60) public void alarmStatistic() { logger.info("=====>>>>>告警离线统计定时任务!", System.currentTimeMillis()); try { HashMap env = new HashMap(); //这两个属性必须设置 env.put("HADOOP_CONF_DIR", CommonConfig.HADOOP_CONF_DIR); env.put("JAVA_HOME", CommonConfig.JAVA_HOME); CountDownLatch countDownLatch = new CountDownLatch(1); SparkAppHandle handle = new SparkLauncher(env) .setSparkHome(SparkConfig.SPARK_HOME) .setAppResource(CommonConfig.ALARM_JAR_PATH) .setMainClass(CommonConfig.ALARM_JAR_MAIN_CLASS) .setMaster("spark://" + SparkConfig.SPARK_MASTER_HOST + ":" + SparkConfig.SPARK_MASTER_PORT) // .setMaster("yarn") .setDeployMode(SparkConfig.SPARK_DEPLOY_MODE) .setVerbose(SparkConfig.SPARK_VERBOSE) .setConf("spark.app.id", CommonConfig.ALARM_APP_ID) .setConf("spark.driver.memory", SparkConfig.SPARK_DRIVER_MEMORY) .setConf("spark.rpc.message.maxSize", SparkConfig.SPARK_RPC_MESSAGE_MAXSIZE) .setConf("spark.executor.memory", SparkConfig.SPARK_EXECUTOR_MEMORY) .setConf("spark.executor.instances", SparkConfig.SPARK_EXECUTOR_INSTANCES) .setConf("spark.executor.cores", SparkConfig.SPARK_EXECUTOR_CORES) .setConf("spark.default.parallelism", SparkConfig.SPARK_DEFAULT_PARALLELISM) .setConf("spark.driver.allowMultipleContexts", SparkConfig.SPARK_DRIVER_ALLOWMULTIPLECONTEXTS) .setVerbose(true).startApplication(new SparkAppHandle.Listener() { //这里监听任务状态,当任务结束时(不管是什么原因结束),isFinal()方法会返回true,否则返回false @Override public void stateChanged(SparkAppHandle sparkAppHandle) { if (sparkAppHandle.getState().isFinal()) { countDownLatch.countDown(); } System.out.println("state:" + sparkAppHandle.getState().toString()); } @Override public void infoChanged(SparkAppHandle sparkAppHandle) { System.out.println("Info:" + sparkAppHandle.getState().toString()); } }); logger.info("The task is executing, please wait ...."); //线程等待任务结束 countDownLatch.await(); logger.info("The task is finished!"); } catch (Exception e) { logger.error("submit spark task error", e); } }
这种模式下,据本人亲自测试,只有在yarn工作模式下可以提交成功,在standalone模式下总是提交失败,如果有人知道的可以留言告诉我
yarn模式需要安装hadoop集群,提交任务的流程基本和上面是一样的,不同的是集群管理器不在是spark自带的集群管理器,而是由yarn来管理,这也是官方推荐的提交方式,比较麻烦的就是需要安装hadoop集群,hadoop的安装参加另一篇
从监控页面可以看到application的执行情况
yarn监控页面