阿里云MaxCompute-XGBoost on Spark 极限梯度提升算法的分布式训练与模型持久化oss的实现与代码浅析

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
实时计算 Flink 版,5000CU*H 3个月
实时数仓Hologres,5000CU*H 100GB 3个月
简介: 本文介绍了XGBoost在MaxCompute+OSS架构下模型持久化遇到的问题及其解决方案。首先简要介绍了XGBoost的特点和应用场景,随后详细描述了客户在将XGBoost on Spark任务从HDFS迁移到OSS时遇到的异常情况。通过分析异常堆栈和源代码,发现使用的`nativeBooster.saveModel`方法不支持OSS路径,而使用`write.overwrite().save`方法则能成功保存模型。最后提供了完整的Scala代码示例、Maven配置和提交命令,帮助用户顺利迁移模型存储路径。

1. XGBoost简介

XGBoost是一个优化的分布式梯度增强库,旨在实现高效,灵活和便携。它在GBDT框架的基础上实现机器学习算法。XGBoost提供了并行树提升(也称为GBDT,GBM),可以快速准确地解决许多数据科学问题。XGBoost最初是一个研究项目,孵化于Distributed (Deep) Machine Learning Community (DMLC) ,由陈天奇博士负责。

XGBoost除了可以在自建CPU/GPU环境上单机运行外,还可以使用Rabit及XGBoost4J集成到诸如Apache Spark、Apache Hadoop和Apache Flink等数据流框架中进行分布式训练、推理。

2. 背景、问题、需求以及解决难点概述

客户存量算法业务迁移上云阶段,拟使用MaxCompute+oss的架构覆盖计算和存储两部分,但已有正常运行的XGBoost on Spark任务修改模型文件存储路径从hdfs到oss后抛出异常,无法正常持久化存储,影响下游依赖pipline的运行。通过异常堆栈对应的spark及dmlc开源代码定位到写oss实现限制,并通过Spark框架的功能绕过这一限制实现需求。

3. 异常信息及基础环境

3.1. 异常堆栈

由于作业logview有超期限制,这里将报错以截图(客户真实作业)和代码堆栈(本地同代码复现)的形式展示

image.png

org.apache.hadoop.yarn.exceptions.YarnException: com.aliyun.odps.cupid.CupidException: ODPS-0730001: User class threw exception: ml.dmlc.xgboost4j.java.XGBoostError: [18:36:45] /xgboost/dmlc-core/src/io.cc:70: unknown filesystem protocol oss://
Stack trace returned 7 entries:
[bt] (0) /worker/tmp/libxgboost4j26523751622410817.so(dmlc::StackTrace(unsigned long)+0x51) [0x7fc524e72f21]
[bt] (1) /worker/tmp/libxgboost4j26523751622410817.so(dmlc::LogMessageFatal::~LogMessageFatal()+0x1d) [0x7fc524e73d3d]
[bt] (2) /worker/tmp/libxgboost4j26523751622410817.so(dmlc::io::FileSystem::GetInstance(dmlc::io::URI const&)+0x19b) [0x7fc52502a5db]
[bt] (3) /worker/tmp/libxgboost4j26523751622410817.so(dmlc::Stream::Create(char const*, char const*, bool)+0x26) [0x7fc52502a756]
[bt] (4) /worker/tmp/libxgboost4j26523751622410817.so(XGBoosterSaveModel+0x2b) [0x7fc524e7ab0b]
[bt] (5) /worker/tmp/libxgboost4j26523751622410817.so(Java_ml_dmlc_xgboost4j_java_XGBoostJNI_XGBoosterSaveModel+0x2f) [0x7fc524e6e89f]
[bt] (6) [0x7fc551017774]
        at ml.dmlc.xgboost4j.java.XGBoostJNI.checkCall(XGBoostJNI.java:48)
        at ml.dmlc.xgboost4j.java.Booster.saveModel(Booster.java:330)
        at ml.dmlc.xgboost4j.scala.Booster.saveModel(Booster.scala:166)
        at 客户代码.train.HomePageGYLCTRXGBoost$.main(HomePageGYLCTRXGBoost.scala:187)
        at 客户代码.train.HomePageGYLCTRXGBoost.main(HomePageGYLCTRXGBoost.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$4.run(ApplicationMaster.scala:708)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.applicationReportTransform(YarnClientImpl.java:412)
        at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getApplicationReport(YarnClientImpl.java:607)
        at org.apache.spark.deploy.yarn.Client.getApplicationReport(Client.scala:301)
        at org.apache.spark.deploy.yarn.Client.monitorApplication(Client.scala:1079)
        at org.apache.spark.deploy.yarn.Client.run(Client.scala:1182)
        at org.apache.spark.deploy.yarn.YarnClusterApplication.start(Client.scala:1542)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:881)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:197)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:227)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at com.aliyun.odps.SubmitJob.main(SubmitJob.java:78)

3.2. 对应开源项目基础信息

除上述客户提供的logview作业链接中的异常堆栈外,客户还额外提供了业务代码中,客户侧定位到有关异常部分的开源项目中关键类库的部分代码,见下图(分别对应了XGBoost模型训练、推理、模型保存)

image.png

image.png

根据客户的类名以及堆栈中的package信息,我们找到了客户参考的开源项目:

  1. dmlc xgboost4j-spark对应read docs:https://xgboost.readthedocs.io/en/stable/jvm/xgboost4j_spark_tutorial.html
  2. dmlc xgboost对应github:https://github.com/dmlc/xgboost/tree/master

4. 排查过程及源码浅析

4.1. 从抛错入手:unknow filesystem oss://

  1. 初步看报错信息无法确认客户侧是在master节点单纯使用oss sdk还是通过spark框架以及底层类dfs来做对象的写出
  2. 结合客户描述、给出的部分样例代码(见3.2章节)以及代码中对应的类“XGBoostClassificationModel”的开源代码实现可以判断,该类为org.apache.spark.ml.Model的子类,并且实现了org.apache.spark.ml.util.MLWritable接口,理论上会通过spark框架配置的dfs做数据和模型文件的读写

image.png

所以目前判断并不是master节点的oss sdk业务逻辑的报错,而odps spark作业将oss作为dfs需要额外的相关配置,参考:odps spark使用Jindo sdk访问oss

建议客户添加如下配置

# 通过cupid平台添加Jindo sdk的jar包
spark.hadoop.odps.cupid.resources=public.jindofs-sdk-3.7.2.jar
# oss fs相关实现类配置
spark.hadoop.fs.AbstractFileSystem.oss.impl=com.aliyun.emr.fs.oss.OSS
spark.hadoop.fs.oss.impl=com.aliyun.emr.fs.oss.JindoOssFileSystem
# oss ak身份认证相关配置
spark.hadoop.fs.oss.accessKeyId = xxxxxx
spark.hadoop.fs.oss.accessKeySecret = xxxxxx
spark.hadoop.fs.oss.endpoint = oss-xxxxxx-internal.aliyuncs.com

4.2. 报错还在?

客户反馈问题并未解决,报错还在且堆栈相同。

经过客户提供的作业配置截图以及日志,可以清楚的看到客户通过cupid平台添加了jindofs的相关jar包。并且DataWorks上的提交日志以及spark history ui上的environment也显示了相关的参数通过spark-submit提交并生效。

DataWorks任务提交界面:

image.png

Spark History UI environment界面确认相关参数已配置生效:

image.png

细看堆栈发现:

ml.dmlc.xgboost4j.scala.Booster.saveModel (saveModel方法调用意图保存到oss)

- ml.dmlc.xgboost4j.java.XGBoostJNI.checkCall (dmlc内部的JNI调用)

- libxgboost4j26523751622410817.so(dmlc::xxx)

这里似乎有点不对,按上面的分析结果,读写oss通过spark框架以及配置oss filesystem的实现类jindo sdk来实现读写的,且参数配置正确,该方案也有阿里云上众多客户验证无误并使用。

那这里报错也应该出现的是jindo相关的java堆栈或者cpp native方法(jindo相关实现均采用C++开发)的堆栈,怎么会调用到xgboost相关的动态链接库呢?

image.png

4.3. 拉源码吧 dmlc/xgboost.git

git clone https://github.com/dmlc/xgboost.git

看下客户的代码调用栈

xgbClassificationModel.nativeBooster.saveModel(modelOssDir)

这里客户直接访问了nativeBooster这个成员变量。这里看了下,这个Booster类型的成员变量是整个模型native booster的实例,通常建议是用来去调用一些底层的API,比如getFeatureScore等获取特征得分,计算特征重要性的API的。

ml.dmlc.xgboost4j.scala.Booster#saveModel(modelPath: String): Unit

image.png

ml.dmlc.xgboost4j.java.Booster#saveModel(java.lang.String)

image.png

得,进JNI了

4.4. 还得看cpp

4.4.1. dmlc/xgboost.git:jvm-package/xgboost4j/src/native

对应xgboost4j.h:

image.png

对应xgboost4j.cc源码方法:

image.png

XGBoosterSaveModel对应c_api.cc源码方法:

image.png

到这里为止都是比较常规的调用了,看库和方法的调用也到了Stream流相关的代码,dmlc::Stream::Create(),这部分在github上是以超链接的形式引用的,之前clone的时候不涉及,所以还得拉下dmlc-core的代码

4.4.2. dmlc/dmlc-core.git

git clone https://github.com/dmlc/dmlc-core.git

对应dmlc源码:FileSystem *FileSystem::GetInstance(const URI &path)

image.png

在ide里折叠之后看可能更清晰一些,看这部分的逻辑protocol是oss://的对象存储路径会走到69行然后报错,目前看堆栈与spark报错的堆栈以及报错信息的文本都是符合的。

LOG(FATAL) << "unknown filesystem protocol " + path.protocol;

image.png

进一步做double check,dmlc-core/src/io目录下可以看到不同的filesystem以及部分云厂商对象存储的客户端的实现,这也与客户反馈的在自建hadoop上可以通过同样的命令读写模型文件到HDFS吻合。

结合GetInstance方法与目录下的文件命名来看,比较普遍的本地文件、hdfs、azure、aws s3等都有相关的实现方法(其中s3和azure还在代码里明确的看到了 // Copyright by Contributors相关字样),但是oss的目前看确实是没有的。

image.png

4.5. 真相大白?好像有哪儿没对

很好,真相大白,目前代码看下来客户之前保存到hdfs成功,但是作业迁移到云上写oss报错是符合预期的,io.cc没有oss相关的逻辑代码和实现。

一顿分析猛如虎,刚准备整理下对客解释并拒绝客户说明这部分功能的不兼容,但看到了还没关的IDEA里的Diagram继承图。

诶?有哪儿没对吧,这个MLWritable的trait难道白with了?MLWritable是spark.ml.util里的官方接口,赔本儿赚吆喝是大可不必的,虽然Xgboost4j on spark是第三方的实现,但是从上面类的继承来看整体上都是基于原生spark做的封装。按照原生spark ml相关类库的使用方法,类似的DataSet或者Model的保存都是通过<object>.write方法获取Writer对象再调用save方法或实现Saveable接口保存到dfs。如:

  1. 最常规的Dataset

org.apache.spark.sql.Dataset#write返回DataFrameWriter

org.apache.spark.sql.DataFrameWriter#save通过DataSource做对dfs的写入

  1. KMeansModel

KMeans的保存还有些特殊。org.apache.spark.mllib.clustering.KMeansModel#save,KMeansModel的伴生类直接提供了save方法,内部调用了org.apache.spark.mllib.clustering.KMeansModel伴生对象的org.apache.spark.mllib.clustering.KMeansModel.SaveLoadV2_0#save方法,分别将模型的metadata通过rdd的saveAsTextFile以及聚蔟点的向量通过Dataset写出parquet文件

  1. 类比下xgboost:

ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel#write

XGBoostClassificationModel对象也有write方法,返回的是XGBoostClassificationModelWriter,重写了内部方法saveImpl,也有继承自MLWriter的save方法对外提供调用

image.png

在dmlc官方的read doc相关的介绍中,模型持久化的样例代码也是通过

val xgbClassificationModelPath = "/tmp/xgbClassificationModel"
xgbClassificationModel.write.overwrite().save(xgbClassificationModelPath)

来写入的,是不是客户的用法不正确命中了什么BUG导致的?本地来做个测试吧

image.png

4.6. read doc的样例代码居然通了

样例代码以及相关的pom文件,打包命令以及spark-submit命令见「5. 代码实现」

这里做了两次测试

  1. 客户的保存模型代码
xgbClassificationModel.nativeBooster.saveModel(modelOssDir)

可以相同的复现客户的报错,目前结合上述的分析也是预期内的

image.png

  1. read doc保存模型样例代码
xgbClassificationModel.write.overwrite().save(modelOssDir)

居然通了。

通过spark history的web ui,我们可以在driver的日志中找到整个任务的如下日志,RabitTracker$TrackerProcessLogger为我们汇总了模型训练的loss,并且在driver的日志中也可以看到RabitTracker和Rabit(xgboost on spark中用来通信的框架)都是正常运行的,且返回码都是0。

image.png

通过如下的日志我们也可以看到JindoHadoopSystem: Initialized native file system以及本次的相关字样,可以看出任务也是正常将模型文件通过Jindo sdk写入到了我们指定的oss路径下

image.png

oss控制台:

image.png

4.7. 重载的同一个对象的save方法怎么就不一样了

通过全局搜索,找到了客户侧代码中调用的方法,通过文字描述我们可以得知,这样写的目的是可以将使用xgboost4j-spark和大规模数据集训练出来的模型放到单机或者单节点上进行后续的业务推理,1.7.0版本之前需要调用nativeBooster.saveModel的方法来实现,但是1.7.0版本之后已经可以和其他spark对象一样通过<object>.write.save的方式来保存了。

image.png

那为什么1.7.0版本之后的写法就能成功的写到oss去呢?代码是如何实现的呢?

书接「4.5.3」,分析下调用栈:

ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel#write 方法中new了↓

- new XGBoostClassificationModel.XGBoostClassificationModelWriter(this) 一个Writer对象

- .overwrite() 将writer对象的shouldOverwrite属性改为true

- .save(modelOssDir) 保存到指定的oss路径

-- new FileSystemOverwrite().handleOverwrite(path, shouldOverwrite, sparkSession) 通过sparksession获取相关的hadoop conf,并实例化filesystem对象,获取到写出路径的绝对路径后,通过shouldOverwrite参数做判断,如果已经存在则做删除

image.png

-- saveImpl(path)

--- ml.dmlc.xgboost4j.scala.spark.XGBoostClassificationModel.XGBoostClassificationModelWriter#saveImpl 这个是重写了MLWriter的saveImpl方法,包含模型的元数据以及模型数据(具体若干个弱分类器booster的参数)两部分

image.png

---- DefaultXGBoostParamsWriter.saveMetadata(instance, path, sc) 保存元数据

----- org.apache.spark.ml.util.DefaultXGBoostParamsWriter#saveMetadata 这里实际上就拿了一下xgboost的版本号,继续往下调用了

------ org.apache.spark.ml.util.DefaultParamsWriter#saveMetadata 这里拿到上面传的xgboost的版本号,创建了个metadata的子目录,通过rdd的saveAsTextFile方法写入到fs,这里是spark oss 的 fs impl,通过jindo sdk写入,我们在driver上也能看到这个路径的的相关operation日志

image.png

---- instance._booster.saveModel(outputStream, getModelFormat()) 模型数据的保存方面,类似的逻辑里创建了一个data的目录。通过spark session获得了hadoop的conf,根据data的目录创建了一个FSDataOutputStream对象作为saveModel的入参。再来看到_booster成员变量

image.png

熟不熟悉....这和客户报错的代码引用的是同一个对象....那这次咋没报错?

通过ml.dmlc.xgboost4j.scala.Booster代码我们可以看到saveModel存在方法的重载,有三个方法接收不同的入参来做相关的实现,客户的代码走了def saveModel(modelPath: String): Unit,直接调了JNI导致dmlc/io.cc中没有oss的实现而报错。但是这次是提前通过spark session拿到了oss的stream,在saveModel中直接往stream里write所以成功的做了oss的写入。

image.png

image.png

5. 代码实现

5.1. scala

package com.aliyun.odps.spark.examples.mllib
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import ml.dmlc.xgboost4j.scala.spark.XGBoostClassifier
object XGBoostModelSavaToOssOnODPS {
  val modelOssDir: String = "oss://bigdata-hangzhou-oss-all/sunyf/odps/odps_spark_mllib/xgboost_model/onodps/"
  def main(args: Array[String]) {
    // 创建spark session
    val spark = SparkSession
    .builder()
    // oss fs相关实现类配置
    .config("spark.hadoop.fs.AbstractFileSystem.oss.impl", "com.aliyun.emr.fs.oss.OSS")
    .config("spark.hadoop.fs.oss.impl", "com.aliyun.emr.fs.oss.JindoOssFileSystem")
    .config("spark.hadoop.fs.oss.endpoint", "oss-cn-hangzhou-internal.aliyuncs.com")
    .config("spark.hadoop.fs.oss.accessKeyId", "xx")
    .config("spark.hadoop.fs.oss.accessKeySecret", "xx")
    .config("spark.sql.defaultCatalog", "odps")
    .config("spark.sql.catalog.odps", "org.apache.spark.sql.execution.datasources.v2.odps.OdpsTableCatalog")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.sql.extensions", "org.apache.spark.sql.execution.datasources.v2.odps.extension.OdpsExtensions")
    .config("spark.sql.catalogImplementation", "hive")
    .appName("XGBoostModelSavaToOssOnODPS")
    .getOrCreate()
    // 从oss fs读取需要配置schema字段及类型 
    // val schema = new StructType(Array(
    //   StructField("sepal_length", DoubleType, true),
    //   StructField("sepal_width", DoubleType, true),
    //   StructField("petal_length", DoubleType, true),
    //   StructField("petal_width", DoubleType, true),
    //   StructField("class", StringType, true)))
    // val dataFrame = spark.read.schema(schema).csv("oss://lfh-oss-hdfs.cn-hangzhou.oss-dls.aliyuncs.com/sunyf/iris.data")
    // 从odps SQL 读取iris表
    val dataFrame = spark.sql("select * from aes_demo_hangzhou_dev.sunyf_iris_for_spark")
    dataFrame.show(5)
    println("sunyf df schema:")
    dataFrame.printSchema()
    // 将class从string转换为数字准备入模
    val stringIndexer = new StringIndexer().
    setInputCol("class").
    setOutputCol("classIndex").
    fit(dataFrame)
    val labelTransformed = stringIndexer.transform(dataFrame).drop("class")
    labelTransformed.show(5)
    // 训练样本向量化
    val vectorAssembler = new VectorAssembler().
    setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width")).
    setOutputCol("features")
    val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex")
    xgbInput.show(5)
    // xgboost模型训练参数
    val xgbParam = Map("eta" -> 0.1f,
                       "missing" -> -999,
                       "objective" -> "multi:softprob",
                       "num_class" -> 3,
                       "num_round" -> 100,
                       "num_workers" -> 2)
    val xgbClassifier = new XGBoostClassifier(xgbParam).
    setFeaturesCol("features").
    setLabelCol("classIndex")
    // 模型训练
    val xgbClassificationModel = xgbClassifier.fit(xgbInput)
    val params = xgbClassifier.params
    println("xgb model trained params:" + params)
    // 客户走的是这个方法
    // 20241027 复现客户问题 下面这个方法
    // xgbClassificationModel.nativeBooster.saveModel(XGBoostModelSavaToOssOnEMR.modelOssDir)
    println("如下是spark doc上example上的写法,save方法和下面的方法本质上都是调用了spark.ml.util.readwrite.scala")
    xgbClassificationModel.write.overwrite().save(XGBoostModelSavaToOssOnODPS.modelOssDir)
    // 后续的模型推理
    // val results = xgbClassificationModel.transform(testSet)
    // ......
  }
}

5.2. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed under the Apache License, Version 2.0 (the "License");
  you may not use this file except in compliance with the License.
  You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <properties>
        <spark.version>3.1.1</spark.version>
        <cupid.sdk.version>3.3.8-public</cupid.sdk.version>
        <scala.version>2.12.10</scala.version>
        <scala.binary.version>2.12</scala.binary.version>
        <dmlc.xgboost.jvm-packages.version>2.0.3</dmlc.xgboost.jvm-packages.version>
    </properties>
    <groupId>com.aliyun.odps</groupId>
    <artifactId>spark-examples_${scala.binary.version}</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>cupid-sdk</artifactId>
            <version>${cupid.sdk.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.aliyun.odps</groupId>
            <artifactId>hadoop-fs-oss</artifactId>
            <version>${cupid.sdk.version}</version>
        </dependency>
        <dependency>
            <groupId>ml.dmlc</groupId>
            <artifactId>xgboost4j_${scala.binary.version}</artifactId>
            <version>${dmlc.xgboost.jvm-packages.version}</version>
        </dependency>
        <dependency>
            <groupId>ml.dmlc</groupId>
            <artifactId>xgboost4j-spark_${scala.binary.version}</artifactId>
            <version>${dmlc.xgboost.jvm-packages.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <relocations>
                                <relocation>
                                    <pattern>com.esotericsoftware</pattern>
                                    <shadedPattern>com.sunyf.esotericsoftware</shadedPattern>
                                </relocation>
                            </relocations>
                            <minimizeJar>false</minimizeJar>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <artifactSet>
                                <includes>
                                    <!-- Include here the dependencies you
                                        want to be packed in your fat jar -->
                                    <include>*:*</include>
                                </includes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                        <exclude>**/log4j.properties</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>
                                        META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
                                    </resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.3.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile-first</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

5.3. maven

maven package -DskipTests

5.4. spark-submit

spark-submit \
--class com.aliyun.odps.spark.examples.mllib.XGBoostModelSavaToOssOnODSP \ 
--deploy-mode cluster \ 
--master yarn \ 
--executor-memory 5g \ 
--conf spark.executor.instances=2 \ 
./spark-examples_2.12-1.0.0-SNAPSHOT-shaded.jar

6. 相关参考

6.1. xgboost4j版本踩坑与选取

com.esotericsoftware相关的类冲突,是因为spark runtime版本与xgboost4j构建时基于的版本不同导致的,通过maven relocation可以解决但是还是建议参考:

https://github.com/dmlc/xgboost/blob/release_1.0.0/jvm-packages/pom.xml

选择不同release的版本,查看pom.xml文件中的<spark.version>来参考spark版本,尽量做到大版本统一,避免未知的错误

6.2. xgboost4j相关低版本bug参考

training failed at XGBoost$.postTrackerReturnProcessing

https://github.com/dmlc/xgboost/issues/2780

java.lang.NoClassDefFoundError: Could not initialize class ml.dmlc.xgboost4j.java.XGBoostJNI

https://github.com/dmlc/xgboost/issues/2578

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
2月前
|
机器学习/深度学习 算法 搜索推荐
从理论到实践,Python算法复杂度分析一站式教程,助你轻松驾驭大数据挑战!
【10月更文挑战第4天】在大数据时代,算法效率至关重要。本文从理论入手,介绍时间复杂度和空间复杂度两个核心概念,并通过冒泡排序和快速排序的Python实现详细分析其复杂度。冒泡排序的时间复杂度为O(n^2),空间复杂度为O(1);快速排序平均时间复杂度为O(n log n),空间复杂度为O(log n)。文章还介绍了算法选择、分而治之及空间换时间等优化策略,帮助你在大数据挑战中游刃有余。
63 4
|
2月前
|
存储 分布式计算 算法
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
大数据-106 Spark Graph X 计算学习 案例:1图的基本计算、2连通图算法、3寻找相同的用户
66 0
|
28天前
|
人工智能 Cloud Native 数据管理
媒体声音|重磅升级,阿里云发布首个“Data+AI”驱动的一站式多模数据平台
在2024云栖大会上,阿里云瑶池数据库发布了首个一站式多模数据管理平台DMS:OneMeta+OneOps。该平台由Data+AI驱动,兼容40余种数据源,实现跨云数据库、数据仓库、数据湖的统一数据治理,帮助用户高效提取和分析元数据,提升业务决策效率10倍。DMS已服务超10万企业客户,降低数据管理成本高达90%。
113 19
|
24天前
|
缓存 算法 大数据
大数据查询优化算法
【10月更文挑战第26天】
50 1
|
1月前
|
机器学习/深度学习 数据采集 算法
大数据中缺失值处理使用算法处理
【10月更文挑战第21天】
40 3
|
2月前
|
缓存 NoSQL Java
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
64 3
大数据-50 Redis 分布式锁 乐观锁 Watch SETNX Lua Redisson分布式锁 Java实现分布式锁
|
2月前
|
存储 缓存 分布式计算
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存
44 4
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
86 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
2月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
42 0
|
2月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
93 0

相关产品

  • 云原生大数据计算服务 MaxCompute