如何使用MaxCompute Spark读写阿里云Hbase

本文涉及的产品
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 通过Spark on MaxCompute来访问阿里云的Hbase,需要设置网络安全组、Hbase的白名单和配置Spark的参数

背景

Spark on MaxCompute可以访问位于阿里云VPC内的实例(例如ECS、HBase、RDS),默认MaxCompute底层网络和外网是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.vpc.domain.list来访问阿里云的vpc网络环境的Hbase。Hbase标准版和增强版的配置不同,本文通过访问阿里云的标准版和增强版的Hbase简单的描述需要加的配置。

Hbase标准版

环境准备
Hbase的网络环境是存在vpc下的,所以我们首先要添加安全组开放端口2181、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
image.png
找到对应的vpc id然后添加安全组设置端口
image.png


添加Hbase的白名单


在hbase的白名单添加

100.104.0.0/16

创建Hbase表

create 'test','cf'

编写Spark程序
需要的Hbase依赖

 <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.0.2</version>
    </dependency>
     <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.5</version>
    </dependency>

编写代码

object App {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HbaseTest")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext
    val config = HBaseConfiguration.create()
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


    try{

      import spark._
      spark.sql("select '7', 88 ").rdd.map(row => {
        val name= row(0).asInstanceOf[String]
        val id = row(1).asInstanceOf[Integer]
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }
}

提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点
image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            }
          ]
        }
      ]
    }
  ]
}

image.png

Hbase增强版

环境准备
Hbase增强版的端口是30020、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
找到对应的vpc id然后添加安全组设置端口
image.png
添加Hbase的白名单

100.104.0.0/16

创建Hbase表 

create 'test','cf'

编写Spark程序
需要的Hbase依赖,引用的包必须是阿里云增强版的依赖

   <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.8</version>
    </dependency>

编写代码

object McToHbase {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("spark_sql_ddl")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

      val sc = spark.sparkContext


    try{
      spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
        val config = HBaseConfiguration.create()
        // 集群的连接地址(VPC内网地址)在控制台页面的数据库连接界面获得
        config.set("hbase.zookeeper.quorum", ":30020");
        import spark._
        // xml_template.comment.hbaseue.username_password.default
        config.set("hbase.client.username", "");
        config.set("hbase.client.password", "");
        val tableName = TableName.valueOf( "test")
        val conn = ConnectionFactory.createConnection(config)
        val table = conn.getTable(tableName);
        val puts = new util.ArrayList[Put]()
        iter.foreach(
          row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
            puts.add(put)
            table.put(puts)
          }
        )
      }
  } finally {
    sc.stop()
  }



  }
}

注意
hbase clinet会报org.apache.spark.SparkException: Task not serializable
原因是spark会把序列化对象以将其发送给其他的worker
解决方案

- 使类可序列化
- 仅在map中传递的lambda函数中声明实例。
- 将NotSerializable对象设置为静态对象,并在每台计算机上创建一次。
- 调用rdd.forEachPartition并在其中创建

Serializable对象,如下所示:

rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...现在处理iter});


提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点

image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
注意
1.这个里需要添加增强版java api访问地址,这里必须采用ip的形式。ip通过直接ping该地址获取,这里的ip是172.16.0.10添加端口16000

image.png
2.这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
             {"domain":"172.16.0.10","port":16000}
          ]
        }
      ]
    }
  ]
}

image.png

大家如果对MaxCompute有更多咨询或者建议,欢迎扫码加入 MaxCompute开发者社区钉钉群,或点击链接 申请加入。

image.png

相关实践学习
基于MaxCompute的热门话题分析
Apsara Clouder大数据专项技能认证配套课程:基于MaxCompute的热门话题分析
目录
相关文章
|
4月前
|
分布式计算 DataWorks 大数据
阿里云ODPS的个人收获思考
在接触阿里云ODPS过程中,我深入了解了MaxCompute和DataWorks等产品。MaxCompute强大的数据处理能力显著提升了我的工作效率,而DataWorks的一站式开发与治理平台简化了数据流程管理。通过实践,我不仅掌握了高效的SQL编写与数据挖掘技巧,还提升了团队协作意识与大数据思维,为未来挑战打下了坚实基础。
107 3
|
4月前
|
存储 分布式计算 大数据
【赵渝强老师】阿里云大数据存储计算服务:MaxCompute
阿里云MaxCompute是快速、全托管的TB/PB级数据仓库解决方案,提供海量数据存储与计算服务。支持多种计算模型,适用于大规模离线数据分析,具备高安全性、低成本、易用性强等特点,助力企业高效处理大数据。
241 0
|
4月前
|
数据采集 人工智能 大数据
10倍处理效率提升!阿里云大数据AI平台发布智能驾驶数据预处理解决方案
阿里云大数据AI平台推出智能驾驶数据预处理解决方案,助力车企构建高效稳定的数据处理流程。相比自建方案,数据包处理效率提升10倍以上,推理任务提速超1倍,产能翻番,显著提高自动驾驶模型产出效率。该方案已服务80%以上中国车企,支持多模态数据处理与百万级任务调度,全面赋能智驾技术落地。
498 0
|
5月前
|
人工智能 分布式计算 大数据
大数据≠大样本:基于Spark的特征降维实战(提升10倍训练效率)
本文探讨了大数据场景下降维的核心问题与解决方案,重点分析了“维度灾难”对模型性能的影响及特征冗余的陷阱。通过数学证明与实际案例,揭示高维空间中样本稀疏性问题,并提出基于Spark的分布式降维技术选型与优化策略。文章详细展示了PCA在亿级用户画像中的应用,包括数据准备、核心实现与效果评估,同时深入探讨了协方差矩阵计算与特征值分解的并行优化方法。此外,还介绍了动态维度调整、非线性特征处理及降维与其他AI技术的协同效应,为生产环境提供了最佳实践指南。最终总结出降维的本质与工程实践原则,展望未来发展方向。
310 0
|
7月前
|
数据采集 机器学习/深度学习 人工智能
面向 MoE 和推理模型时代:阿里云大数据 AI 产品升级发布
2025 AI 势能大会上,阿里云大数据 AI 平台持续创新,贴合 MoE 架构、Reasoning Model 、 Agentic RAG、MCP 等新趋势,带来计算范式变革。多款大数据及 AI 产品重磅升级,助力企业客户高效地构建 AI 模型并落地 AI 应用。
|
2月前
|
人工智能 分布式计算 DataWorks
阿里云大数据AI产品月刊-2025年8月
阿里云大数据& AI 产品技术月刊【2025年 8 月】,涵盖 8 月技术速递、产品和功能发布、市场和客户应用实践等内容,帮助您快速了解阿里云大数据& AI 方面最新动态。
301 1
|
2月前
|
存储 分布式计算 资源调度
【赵渝强老师】阿里云大数据MaxCompute的体系架构
阿里云MaxCompute是快速、全托管的EB级数据仓库解决方案,适用于离线计算场景。它由计算与存储层、逻辑层、接入层和客户端四部分组成,支持多种计算任务的统一调度与管理。
283 1
|
4月前
|
人工智能 分布式计算 DataWorks
多模态数据处理新趋势:阿里云ODPS技术栈深度解析与未来展望
阿里云ODPS技术栈通过MaxCompute、Object Table与MaxFrame等核心组件,实现了多模态数据的高效处理与智能分析。该架构支持结构化与非结构化数据的统一管理,并深度融合AI能力,显著降低了分布式计算门槛,推动企业数字化转型。未来,其在智慧城市、数字医疗、智能制造等领域具有广泛应用前景。
465 6
多模态数据处理新趋势:阿里云ODPS技术栈深度解析与未来展望

热门文章

最新文章

相关产品

  • 云原生大数据计算服务 MaxCompute