Spark读写Hbase中的数据

简介: def main(args: Array[String]) { val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").
def main(args: Array[String])  {
    val sparkConf = new SparkConf().setMaster("local").setAppName("cocapp").set("spark.kryo.registrator", classOf[HBaseConfiguration].getName)
      .set("spark.executor.memory", "4g")
    val sc: SparkContext = new SparkContext(sparkConf)
    val sqlContext = new HiveContext(sc)
    val mySQLUrl = "jdbc:mysql://localhost:3306/yangsy?user=root&password=yangsiyi"
    val rows = sqlContext.jdbc(mySQLUrl, "person")
    val tableName = "spark"
    val columnFamily = "cf" //rows.first().getString(1)
    val configuration = HBaseConfiguration.create();
    configuration.set(TableInputFormat.INPUT_TABLE, "test");
    val admin = new HBaseAdmin(configuration)
    val hBaseRDD = sc.newAPIHadoopRDD(configuration, classOf[TableInputFormat],
        classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
      hBaseRDD.count()

def toHbase(rows: DataFrame,tableName : String,columnFamily: String)   {
    val configuration = HBaseConfiguration.create();
    val admin = new HBaseAdmin(configuration)
    if (admin.tableExists(tableName)) {
      print("table Exists")
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    configuration.addResource("hbase-site.xml")
    val tableDesc = new HTableDescriptor(tableName)
    tableDesc.addFamily(new HColumnDescriptor(columnFamily))
    admin.createTable(tableDesc)
    rows.foreachPartition { row =>
      val table = new HTable(configuration, tableName)

      row.foreach { a =>
        val put = new Put(Bytes.toBytes("row1"))
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes("coulumn1"), Bytes.toBytes(a.getString(0)))
        table.put(put)
        println("insert into success")
      }
    }

然而并没有什么乱用,发现一个问题,就是说,在RDD取值与写入HBASE的时候,引入外部变量无法序列化。。。。。。网上很多说法是说extends Serializable ,可是尝试无效。Count()是可以获取到,但是如果我要在configuration中set列,然后进行查询就会报错了。暂时各种办法尝试无果,还在想办法,也不明原因。

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
28天前
|
存储 分布式计算 Java
|
2月前
|
存储 分布式数据库 数据库
Hbase学习二:Hbase数据特点和架构特点
Hbase学习二:Hbase数据特点和架构特点
44 0
|
28天前
|
分布式计算 监控 大数据
如何处理 Spark 中的倾斜数据?
【8月更文挑战第13天】
155 4
|
28天前
|
存储 缓存 分布式计算
|
28天前
|
SQL 存储 分布式计算
|
26天前
|
分布式计算 Apache 数据安全/隐私保护
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
流计算引擎数据问题之在 Spark Structured Streaming 中水印计算和使用如何解决
35 1
|
24天前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
27 0
|
2月前
|
缓存 监控 Shell
使用 HBase Shell 进行数据的实时监控和备份
使用 HBase Shell 进行数据的实时监控和备份
|
分布式计算 安全 Shell
Maxcompute Spark 访问 阿里云 Hbase
引子 本来这个东西是没啥好写的,但是在帮客户解决问题的时候,发现链路太长,不能怪客户弄不出来,记录一下 需求列表 MaxCompute Spark包 (写文章时刻为版本 0.32.1, 请自行更新,本文不是文档) Spark 配置 spark.
Maxcompute Spark 访问 阿里云 Hbase
|
分布式计算 Spark
spark访问hbase
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark.rdd.NewHadoopRDD val conf = HBaseConfigurat
1673 0
下一篇
DDNS