如何使用MaxCompute Spark读写阿里云Hbase

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 通过Spark on MaxCompute来访问阿里云的Hbase,需要设置网络安全组、Hbase的白名单和配置Spark的参数

背景

Spark on MaxCompute可以访问位于阿里云VPC内的实例(例如ECS、HBase、RDS),默认MaxCompute底层网络和外网是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.vpc.domain.list来访问阿里云的vpc网络环境的Hbase。Hbase标准版和增强版的配置不同,本文通过访问阿里云的标准版和增强版的Hbase简单的描述需要加的配置。

Hbase标准版

环境准备
Hbase的网络环境是存在vpc下的,所以我们首先要添加安全组开放端口2181、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
image.png
找到对应的vpc id然后添加安全组设置端口
image.png


添加Hbase的白名单


在hbase的白名单添加

100.104.0.0/16

创建Hbase表

create 'test','cf'

编写Spark程序
需要的Hbase依赖

 <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.0.2</version>
    </dependency>
     <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.5</version>
    </dependency>

编写代码

object App {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HbaseTest")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext
    val config = HBaseConfiguration.create()
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


    try{

      import spark._
      spark.sql("select '7', 88 ").rdd.map(row => {
        val name= row(0).asInstanceOf[String]
        val id = row(1).asInstanceOf[Integer]
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }
}

提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点
image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            }
          ]
        }
      ]
    }
  ]
}

image.png

Hbase增强版

环境准备
Hbase增强版的端口是30020、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
找到对应的vpc id然后添加安全组设置端口
image.png
添加Hbase的白名单

100.104.0.0/16

创建Hbase表 

create 'test','cf'

编写Spark程序
需要的Hbase依赖,引用的包必须是阿里云增强版的依赖

   <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.8</version>
    </dependency>

编写代码

object McToHbase {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("spark_sql_ddl")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

      val sc = spark.sparkContext


    try{
      spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
        val config = HBaseConfiguration.create()
        // 集群的连接地址(VPC内网地址)在控制台页面的数据库连接界面获得
        config.set("hbase.zookeeper.quorum", ":30020");
        import spark._
        // xml_template.comment.hbaseue.username_password.default
        config.set("hbase.client.username", "");
        config.set("hbase.client.password", "");
        val tableName = TableName.valueOf( "test")
        val conn = ConnectionFactory.createConnection(config)
        val table = conn.getTable(tableName);
        val puts = new util.ArrayList[Put]()
        iter.foreach(
          row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
            puts.add(put)
            table.put(puts)
          }
        )
      }
  } finally {
    sc.stop()
  }



  }
}

注意
hbase clinet会报org.apache.spark.SparkException: Task not serializable
原因是spark会把序列化对象以将其发送给其他的worker
解决方案

- 使类可序列化
- 仅在map中传递的lambda函数中声明实例。
- 将NotSerializable对象设置为静态对象,并在每台计算机上创建一次。
- 调用rdd.forEachPartition并在其中创建

Serializable对象,如下所示:

rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...现在处理iter});


提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点

image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
注意
1.这个里需要添加增强版java api访问地址,这里必须采用ip的形式。ip通过直接ping该地址获取,这里的ip是172.16.0.10添加端口16000

image.png
2.这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
             {"domain":"172.16.0.10","port":16000}
          ]
        }
      ]
    }
  ]
}

image.png

大家如果对MaxCompute有更多咨询或者建议,欢迎扫码加入 MaxCompute开发者社区钉钉群,或点击链接 申请加入。

image.png

相关实践学习
基于MaxCompute的热门话题分析
本实验围绕社交用户发布的文章做了详尽的分析,通过分析能得到用户群体年龄分布,性别分布,地理位置分布,以及热门话题的热度。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
目录
相关文章
|
19天前
|
SQL 分布式计算 数据挖掘
阿里云 MaxCompute MaxQA 开启公测,公测可申请 100CU 计算资源解锁近实时高效查询体验
阿里云云原生大数据计算服务 MaxCompute 推出 MaxQA(原 MCQA2.0)查询加速功能,在独享的查询加速资源池的基础上,对管控链路、查询优化器、执行引擎、存储引擎以及缓存机制等多个环节进行全面优化,显著减少了查询响应时间,适用于 BI 场景、交互式分析以及近实时数仓等对延迟要求高且稳定的场景。现正式开启公测,公测期间可申请100CU(价值15000元)计算资源用于测试,欢迎广大开发者及企业用户参与,解锁高效查询体验!
阿里云 MaxCompute MaxQA 开启公测,公测可申请 100CU 计算资源解锁近实时高效查询体验
|
10天前
|
人工智能 大数据
阿里云云计算ACA、大数据ACA、人工智能ACA三门认证升级调整公告
阿里云云计算ACA、大数据ACA、人工智能ACA三门认证升级调整公告
|
11天前
|
存储 分布式计算 大数据
基于阿里云大数据平台的实时数据湖构建与数据分析实战
在大数据时代,数据湖作为集中存储和处理海量数据的架构,成为企业数据管理的核心。阿里云提供包括MaxCompute、DataWorks、E-MapReduce等在内的完整大数据平台,支持从数据采集、存储、处理到分析的全流程。本文通过电商平台案例,展示如何基于阿里云构建实时数据湖,实现数据价值挖掘。平台优势包括全托管服务、高扩展性、丰富的生态集成和强大的数据分析工具。
|
1月前
|
机器学习/深度学习 分布式计算 大数据
阿里云 EMR Serverless Spark 在微财机器学习场景下的应用
面对机器学习场景下的训练瓶颈,微财选择基于阿里云 EMR Serverless Spark 建立数据平台。通过 EMR Serverless Spark,微财突破了单机训练使用的数据规模瓶颈,大幅提升了训练效率,解决了存算分离架构下 Shuffle 稳定性和性能困扰,为智能风控等业务提供了强有力的技术支撑。
154 15
|
20天前
|
存储 分布式计算 物联网
美的楼宇科技基于阿里云 EMR Serverless Spark 构建 LakeHouse 湖仓数据平台
美的楼宇科技基于阿里云 EMR Serverless Spark 建设 IoT 数据平台,实现了数据与 AI 技术的有效融合,解决了美的楼宇科技设备数据量庞大且持续增长、数据半结构化、数据价值缺乏深度挖掘的痛点问题。并结合 EMR Serverless StarRocks 搭建了 Lakehouse 平台,最终实现不同场景下整体性能提升50%以上,同时综合成本下降30%。
|
2月前
|
机器学习/深度学习 人工智能 分布式计算
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
我的阿里云社区年度总结报告:Python、人工智能与大数据领域的探索之旅
136 35
|
7天前
|
存储 分布式计算 运维
课时6:阿里云MaxCompute:轻松玩转大数据
阿里云MaxCompute是全新的大数据计算服务,提供快速、完全托管的PB级数据仓库解决方案。它拥有高效的压缩存储技术、强大的计算能力和丰富的用户接口,支持SQL查询、机器学习等高级分析。MaxCompute兼容多种计算模型,开箱即用,具备金融级安全性和灵活的数据授权功能,帮助企业节省成本并提升效率。
|
11天前
|
存储 人工智能 数据管理
媒体声音|专访阿里云数据库周文超博士:AI就绪的智能数据平台设计思路
媒体声音|专访阿里云数据库周文超博士:AI就绪的智能数据平台设计思路
|
11天前
|
SQL 分布式计算 Serverless
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
基于阿里云 EMR Serverless Spark 版快速搭建OSS日志分析应用
|
11天前
|
SQL 人工智能 大数据
【4月重点功能发布】阿里云大数据+ AI 一体化平台
【4月重点功能发布】阿里云大数据+ AI 一体化平台

相关产品

  • 云原生大数据计算服务 MaxCompute