如何使用MaxCompute Spark读写阿里云Hbase-阿里云开发者社区

开发者社区> 龙青云> 正文

如何使用MaxCompute Spark读写阿里云Hbase

简介: 通过Spark on MaxCompute来访问阿里云的Hbase,需要设置网络安全组、Hbase的白名单和配置Spark的参数
+关注继续查看

背景

Spark on MaxCompute可以访问位于阿里云VPC内的实例(例如ECS、HBase、RDS),默认MaxCompute底层网络和外网是隔离的,Spark on MaxCompute提供了一种方案通过配置spark.hadoop.odps.cupid.vpc.domain.list来访问阿里云的vpc网络环境的Hbase。Hbase标准版和增强版的配置不同,本文通过访问阿里云的标准版和增强版的Hbase简单的描述需要加的配置。

Hbase标准版

环境准备
Hbase的网络环境是存在vpc下的,所以我们首先要添加安全组开放端口2181、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
image.png
找到对应的vpc id然后添加安全组设置端口
image.png


添加Hbase的白名单


在hbase的白名单添加

100.104.0.0/16

创建Hbase表

create 'test','cf'

编写Spark程序
需要的Hbase依赖

 <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-mapreduce</artifactId>
      <version>2.0.2</version>
    </dependency>
     <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.5</version>
    </dependency>

编写代码

object App {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("HbaseTest")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

    val sc = spark.sparkContext
    val config = HBaseConfiguration.create()
    val zkAddress = "hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181,hb-2zecxg2ltnpeg8me4-master*-***:2181"
    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
    val jobConf = new JobConf(config)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"test")


    try{

      import spark._
      spark.sql("select '7', 88 ").rdd.map(row => {
        val name= row(0).asInstanceOf[String]
        val id = row(1).asInstanceOf[Integer]
        val put = new Put(Bytes.toBytes(id))
        put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
        (new ImmutableBytesWritable, put)
      }).saveAsHadoopDataset(jobConf)
    } finally {
      sc.stop()
    }
  }
}

提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点
image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":2181
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            }
          ]
        }
      ]
    }
  ]
}

image.png

Hbase增强版

环境准备
Hbase增强版的端口是30020、10600、16020.同时Hbase有白名单限制我们需要把对应的MaxCompute的IP加入到Hbase的白名单。
设置对应vpc的安全组
找到对应的vpc id然后添加安全组设置端口
image.png
添加Hbase的白名单

100.104.0.0/16

创建Hbase表 

create 'test','cf'

编写Spark程序
需要的Hbase依赖,引用的包必须是阿里云增强版的依赖

   <dependency>
      <groupId>com.aliyun.hbase</groupId>
      <artifactId>alihbase-client</artifactId>
      <version>2.0.8</version>
    </dependency>

编写代码

object McToHbase {
  def main(args: Array[String]) {
    val spark = SparkSession
      .builder()
      .appName("spark_sql_ddl")
      .config("spark.sql.catalogImplementation", "odps")
      .config("spark.hadoop.odps.end.point","http://service.cn.maxcompute.aliyun.com/api")
      .config("spark.hadoop.odps.runtime.end.point","http://service.cn.maxcompute.aliyun-inc.com/api")
      .getOrCreate()

      val sc = spark.sparkContext


    try{
      spark.sql("select '7', 'long'").rdd.foreachPartition { iter =>
        val config = HBaseConfiguration.create()
        // 集群的连接地址(VPC内网地址)在控制台页面的数据库连接界面获得
        config.set("hbase.zookeeper.quorum", ":30020");
        import spark._
        // xml_template.comment.hbaseue.username_password.default
        config.set("hbase.client.username", "");
        config.set("hbase.client.password", "");
        val tableName = TableName.valueOf( "test")
        val conn = ConnectionFactory.createConnection(config)
        val table = conn.getTable(tableName);
        val puts = new util.ArrayList[Put]()
        iter.foreach(
          row => {
            val id = row(0).asInstanceOf[String]
            val name = row(1).asInstanceOf[String]
            val put = new Put(Bytes.toBytes(id))
            put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes(id), Bytes.toBytes(name))
            puts.add(put)
            table.put(puts)
          }
        )
      }
  } finally {
    sc.stop()
  }



  }
}

注意
hbase clinet会报org.apache.spark.SparkException: Task not serializable
原因是spark会把序列化对象以将其发送给其他的worker
解决方案

- 使类可序列化
- 仅在map中传递的lambda函数中声明实例。
- 将NotSerializable对象设置为静态对象,并在每台计算机上创建一次。
- 调用rdd.forEachPartition并在其中创建

Serializable对象,如下所示:

rdd.forEachPartition(iter-> {NotSerializable notSerializable = new NotSerializable();<br />// ...现在处理iter});


提交到DataWorks
由于大于50m通过odps客户端提交

add jar SparkHbase-1.0-SNAPSHOT -f; 

进入数据开发新建spark节点

image.png


添加配置
需要配置spark.hadoop.odps.cupid.vpc.domain.list
注意
1.这个里需要添加增强版java api访问地址,这里必须采用ip的形式。ip通过直接ping该地址获取,这里的ip是172.16.0.10添加端口16000

image.png
2.这里的hbase域名需要hbase所有的机器,少一台可能会造成网络不通

{
  "regionId":"cn-beijing",
  "vpcs":[
    {
      "vpcId":"vpc-2zeaeq21mb1dmkqh0exox",
      "zones":[
        {
          "urls":[
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":30020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16000
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-master*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
            {
              "domain":"hb-2zecxg2ltnpeg8me4-cor*-***.hbase.rds.aliyuncs.com",
              "port":16020
            },
             {"domain":"172.16.0.10","port":16000}
          ]
        }
      ]
    }
  ]
}

image.png

大家如果对MaxCompute有更多咨询或者建议,欢迎扫码加入 MaxCompute开发者社区钉钉群,或点击链接 申请加入。

image.png

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

相关文章
[JDBC] Kettle on MaxCompute 使用指南
Kettle是一款开源的ETL工具,纯Java实现,可以在Windows、Unix和Linux上运行,提供图形化的操作界面,可以通过拖拽控件的方式,方便地定义数据传输的拓扑 。基本讲介绍基于Kettle的MaxCompute插件实现数据上云。
1446 0
学习笔记1 - 使用MaxCompute进行数据质量核查
大数据Clouder:使用MaxCompute进行数据质量核查 数据,数据质量,数据质量管理MaxCompute,DataIDE监控,监控报告 对数据的改善和管理,直接提升数据质量;对组织的改善和管理,间接提升数据质量。
2017 0
zt:Grub引导的Linux系统下使用MaxDos V6.0
http://www.chinalinuxpub.com/read.php?wid=2214 教你如何在Linux下使用MaxDos V6.01.首先在硬盘内安装最新MaxDos提取C:MaxDos文件夹的内容。
723 0
MaxCompute-使用分享
MaxCompute(原名ODPS)-使用分享
1103 0
Ubuntu15.10下如何使用EasyGui模块开发Python GUI
偶然的一个机会,发现了github上的这个开源的项目,easygui for python(一个基于TKinter的模块) 感觉很是惊讶,原来python也可以这么简单的开发出一些GUI界面(究其原因,是我自身的孤陋寡闻罢了)。
966 0
阿里云服务器端口号设置
阿里云服务器初级使用者可能面临的问题之一. 使用tomcat或者其他服务器软件设置端口号后,比如 一些不是默认的, mysql的 3306, mssql的1433,有时候打不开网页, 原因是没有在ecs安全组去设置这个端口号. 解决: 点击ecs下网络和安全下的安全组 在弹出的安全组中,如果没有就新建安全组,然后点击配置规则 最后如上图点击添加...或快速创建.   have fun!  将编程看作是一门艺术,而不单单是个技术。
4504 0
MaxCompute Studio使用心得系列6——一个工具完成整个Python UDF开发
2017/12/20 北京云栖大会上阿里云MaxCompute发布了最新的功能Python UDF,万众期待的功能终于支持啦,我怎么能不一试为快,今天就分享如何通过Studio进行Python udf开发。
6784 0
【流数据与大屏DataV】如何使用DTS,Datahub,StreamCompute,RDS及DataV搭建流数据大屏
本文主要从数字化大屏的价值及实现两方面阐述了数字化大屏的制作过程。
3200 0
+关注
14
文章
0
问答
来源圈子
更多
MaxCompute(原ODPS)是一项面向分析的大数据计算服务,它以Serverless架构提供快速、全托管的在线数据仓库服务,消除传统数据平台在资源扩展性和弹性方面的限制,最小化用户运维投入,使您经济并高效的分析处理海量数据。
+ 订阅
文章排行榜
最热
最新
相关电子书
更多
文娱运维技术
立即下载
《SaaS模式云原生数据仓库应用场景实践》
立即下载
《看见新力量:二》电子书
立即下载