spark访问hbase

简介: import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}import org.apache.hadoop.hbase.mapreduce.TableInputFormatimport org.apache.spark.rdd.NewHadoopRDDval conf = HBaseConfigurat


import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark.rdd.NewHadoopRDD

val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "tmp")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.count()

import scala.collection.JavaConverters._

hBaseRDD.map(tuple => tuple._2).map(result => result.getColumn("cf".getBytes(), "val".getBytes())).map(keyValues => {
( keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getRow,
  keyValues.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue
)
}).take(10)

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("cf".getBytes(), "val".getBytes()))).map(row => {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toChar).mkString
)
}).take(10)


conf.set(TableInputFormat.INPUT_TABLE, "test1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue.map(_.toInt).mkString
)
}).take(10)

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => (result.getRow, result.getColumn("lf".getBytes(), "app1".getBytes()))).map(row => if (row._2.size > 0) {
(
  row._1.map(_.toChar).mkString,
  ByteBuffer.wrap(row._2.asScala.reduceLeft {
    (a, b) => if (a.getTimestamp > b.getTimestamp) a else b
  }.getValue).getLong
)
}).take(10)


//conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "lf")
conf.set(TableInputFormat.SCAN_COLUMNS, "lf:app1")

//var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

import java.nio.ByteBuffer
hBaseRDD.map(tuple => tuple._2).map(result => {
  ( result.getRow.map(_.toChar).mkString,
    ByteBuffer.wrap(result.value).getLong
  )
}).take(10)


val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, "test1")

var hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result])

var rows = hBaseRDD.map(tuple => tuple._2).map(result => result.getRow.map(_.toChar).mkString)
rows.map(row => row.split("\\|")).map(r => if (r.length > 1) (r(0), r(1)) else (r(0), "") ).groupByKey.take(10)


本文出自 “点滴积累” 博客,请务必保留此出处http://tianxingzhe.blog.51cto.com/3390077/1717761

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
  相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情: https://cn.aliyun.com/product/hbase   ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
SQL 分布式计算 数据安全/隐私保护
如何杜绝 spark history server ui 的未授权访问? 1
如何杜绝 spark history server ui 的未授权访问?
|
8月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
168 0
|
7月前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
8月前
|
分布式计算 DataWorks API
DataWorks产品使用合集之在DataWorks中,通过spark访问外网的步骤如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
204 0
|
8月前
|
存储 分布式计算 API
adb spark的lakehouse api访问内表数据,还支持算子下推吗
【2月更文挑战第21天】adb spark的lakehouse api访问内表数据,还支持算子下推吗
154 2
|
8月前
|
分布式计算 分布式数据库 API
Spark与HBase的集成与数据访问
Spark与HBase的集成与数据访问
|
8月前
|
存储 分布式计算 Hadoop
HBase的数据访问是如何进行的?
HBase的数据访问是如何进行的?
107 0
|
分布式计算 Hadoop 大数据
如何杜绝 spark history server ui 的未授权访问? 2
如何杜绝 spark history server ui 的未授权访问?
|
存储 分布式计算 安全
javaapi 访问 hbase
javaapi 访问 hbase
149 0
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
216 0
Spark查询Hbase小案例
下一篇
开通oss服务