Versions
Scala 2.11.11
Spark 2.1.1
HBase 2.0.5
Code
The hbase-site.xml used here is the one under /hbase/conf in the HBase installation directory; put it on the project classpath (for example src/main/resources) so that HBaseConfiguration.create() picks it up automatically.
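If the file cannot be placed on the classpath, it can also be loaded explicitly. A minimal sketch, assuming an illustrative file path (/opt/hbase/conf/hbase-site.xml is not from the original setup):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

object HBaseConfDemo {
  def main(args: Array[String]): Unit = {
    // create() reads hbase-site.xml automatically when it is on the classpath
    val conf: Configuration = HBaseConfiguration.create()
    // alternatively, point at the file directly (path is just an example)
    conf.addResource(new Path("/opt/hbase/conf/hbase-site.xml"))
    // verify which ZooKeeper quorum will be used
    println(conf.get("hbase.zookeeper.quorum"))
  }
}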
pom dependencies
<properties>
    <mysql.version>6.0.5</mysql.version>
    <spring.version>4.3.6.RELEASE</spring.version>
    <spring.data.jpa.version>1.11.0.RELEASE</spring.data.jpa.version>
    <log4j.version>1.2.17</log4j.version>
    <quartz.version>2.2.3</quartz.version>
    <slf4j.version>1.7.22</slf4j.version>
    <hibernate.version>5.2.6.Final</hibernate.version>
    <camel.version>2.18.2</camel.version>
    <config.version>1.10</config.version>
    <jackson.version>2.8.6</jackson.version>
    <servlet.version>3.0.1</servlet.version>
    <net.sf.json.version>2.4</net.sf.json.version>
    <activemq.version>5.14.3</activemq.version>
    <spark.version>2.1.1</spark.version>
    <scala.version>2.11.11</scala.version>
    <hadoop.version>2.7.3</hadoop.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>

    <!-- Logging -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>jcl-over-slf4j</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>${log4j.version}</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.4.7</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.3.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.3.1</version>
    </dependency>
</dependencies>
Queries
Full-table scan
package com.bjfu.spark.demo.hbasedemo

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase
import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author chaird
 * @create 2020-11-13 22:04
 */
object HBaseDemo {

  def main(args: Array[String]): Unit = {

    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    // Create the Spark context
    val sc = new SparkContext(config)

    // Create the HBase configuration
    val conf: Configuration = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")

    // Full-table scan
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    hbaseRDD.foreach {
      // First display approach: iterate over all cells of the row
      case (rowkey, result) => {
        val cells: Array[hbase.Cell] = result.rawCells()
        for (cell <- cells) {
          println(Bytes.toString(CellUtil.cloneValue(cell)))
        }
      }
      // Second display approach: read a single column directly
      // case (rowkey, result) => {
      //   // rowKey
      //   val key: String = Bytes.toString(result.getRow)
      //   // column family, qualifier, value
      //   val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
      //   println("rowKey:" + key + " " + "value:" + value)
      // }
    }

    // Release resources
    sc.stop()
  }
}
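The scan above reads every column of every row. If only one column family is needed, TableInputFormat also accepts scan hints through the same configuration. A minimal sketch, assuming the "datatype" family from the example table:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.spark.{SparkConf, SparkContext}

object HBaseFamilyScanDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HBaseFamilyScanDemo"))

    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    // Scan only the "datatype" column family instead of whole rows
    conf.set(TableInputFormat.SCAN_COLUMN_FAMILY, "datatype")

    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    // Only rows that have at least one cell in this family come back
    println("rows with the datatype family: " + hbaseRDD.count())

    sc.stop()
  }
}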
Query by rowKey
package com.bjfu.spark.demo.hbasedemo

import java.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @author chaird
 * @create 2020-11-14 20:39
 */
object HBaseConditionDemo {

  def main(args: Array[String]): Unit = {

    // Create the Spark context
    val config: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HBaseDemo")
    val sc = new SparkContext(config)

    // Create the HBase configuration
    val hbaseConf: Configuration = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")

    // Build rowKey filters: a row matches if it equals any of the given keys
    val scan = new Scan()
    val rowkeys = List("17-2020-41-12 17:09", "2-2020-37-12 17:09")
    val filters = new util.ArrayList[Filter]()
    for (rowkey <- rowkeys) {
      val filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(rowkey)))
      filters.add(filter)
    }
    val filterList = new FilterList(Operator.MUST_PASS_ONE, filters)
    scan.setFilter(filterList)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))

    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    hbaseRDD.foreach {
      case (rowkey, result) => {
        // rowKey
        val key: String = Bytes.toString(result.getRow)
        // column family, qualifier, value
        val value: String = Bytes.toString(result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("288")))
        println("rowKey:" + key + " " + "value:" + value)
      }
    }

    // Release resources
    sc.stop()
  }

  // Serialize the Scan so it can be passed to TableInputFormat through the configuration
  def convertScanToString(scan: Scan): String = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
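When the rowKeys of interest form a contiguous range rather than a fixed list, a start/stop row scan avoids filtering the whole table. A minimal sketch, assuming the same "EcodataFei" table; the rowKey bounds are illustrative only:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}

object HBaseRangeScanDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("HBaseRangeScanDemo"))

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "EcodataFei")
    // Restrict the scan by rowKey range (start inclusive, stop exclusive); bounds are examples
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, "17-2020-41-12 17:09")
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, "17-2020-41-12 17:10")

    val rangeRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
      classOf[ImmutableBytesWritable], classOf[Result])

    rangeRDD.foreach { case (_, result) =>
      println("rowKey:" + Bytes.toString(result.getRow))
    }

    sc.stop()
  }
}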
Saving the result set
val list: List[String] = hbaseRDD
  .map(x => Bytes.toString(x._2.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"))))
  .collect()
  .toList
list.foreach(println(_))
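collect() pulls every value back to the driver, so it is only suitable for small result sets. For larger results the RDD can be persisted without collecting it, for example by writing it back to another HBase table through TableOutputFormat. A minimal sketch that continues from the hbaseRDD above; the target table "EcodataFeiBackup" is a placeholder and is assumed to have a "datatype" column family:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// hbaseRDD is the RDD[(ImmutableBytesWritable, Result)] produced by the query above
val outConf = HBaseConfiguration.create()
outConf.set(TableOutputFormat.OUTPUT_TABLE, "EcodataFeiBackup")
val job = Job.getInstance(outConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

hbaseRDD
  .map { case (_, result) =>
    // Copy the customer_id cell into a Put keyed by the original rowKey
    val put = new Put(result.getRow)
    put.addColumn(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id"),
      result.getValue(Bytes.toBytes("datatype"), Bytes.toBytes("customer_id")))
    (new ImmutableBytesWritable(result.getRow), put)
  }
  .saveAsNewAPIHadoopDataset(job.getConfiguration)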