hbase-微博三表实战demo

简介: 这是根据学习图谱中hbase的教程,写的最后的微博案例demo,使用的是mac系统,本地安装的mysql,单机版的hbase,本地环境配置的jdk版本是openjdk11.0.16.1,具体逻辑是用户微博内容写入到内容表,关注该用户的用户根据用户关系表,将数据推送到信箱表中。

POM文件

首先是引入依赖,hbase-client是HBase提供的一套比较底层的API,在实际使用时需要对其进行封装,提供更好用的api给用户;hbase-common是客户端和服务端公用的类定义,工具等通用的功能性代码;hbase-serverhbase是服务器端依赖;hadoop-common封装了一些常用的底层工具,供其他Hadoop模块使用。其主要包括配置工具Configuration、远程过程调用(Remote Procedure Call,RPC)、序列化工具和Hadoop的抽象文件系统工具FileSystem等,为其他Hadoop模块的运行提供基本服务,并为开发Hadoop程序提供了必要的 API。

<?xmlversion="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Question</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency></dependencies></project>

静态数据

配置静态数据,也就是下面注释中的命名空间和表名

packageorg.example;
/*** 表静态数据*/publicclassConstant {
//    命名空间publicstaticfinalStringNameSpace="weibo";
//    内容表publicstaticfinalStringCONTENT="weibo.content";
//    用户关系表publicstaticfinalStringRELATIONS="weibo:relations";
//    收件箱表publicstaticfinalStringINBOX="weibo:inbox";
}

工具类

工具类,包含了建命名空间,建表,获得用户微博数据,发布微博,关注,取关等逻辑

packageorg.example;
importorg.apache.hadoop.conf.Configuration;
importorg.apache.hadoop.hbase.*;
importorg.apache.hadoop.hbase.client.*;
importorg.apache.hadoop.hbase.filter.CompareFilter;
importorg.apache.hadoop.hbase.filter.RowFilter;
importorg.apache.hadoop.hbase.filter.SubstringComparator;
importorg.apache.hadoop.hbase.util.Bytes;
importjava.io.IOException;
importjava.util.ArrayList;
importjava.util.List;
/*** 工具类*/publicclassWeiboUtil {
// 创建hbase配置privatestaticfinalConfigurationconfiguration=HBaseConfiguration.create();
static {
configuration.set("hbase.zookeeper.quorum", "127.0.0.1:2181");
    }
/*** 命名空间不存在则创建命名空间** @param ns*/publicstaticvoidisCreateNameSpace(Stringns) throwsIOException {
//创建连接Connectionconnection=ConnectionFactory.createConnection(configuration);
Adminadmin=connection.getAdmin();
try {
//            如果namespace不存在会抛出异常admin.getNamespaceDescriptor(ns);
        } catch (NamespaceExistExceptione) {
//            创建namespacecreateNameSpace(ns);
        }
    }
/*** 创建命名空间** @param ns 命名空间* @throws IOException*/publicstaticvoidcreateNameSpace(Stringns) throwsIOException {
//创建连接Connectionconnection=ConnectionFactory.createConnection(configuration);
Adminadmin=connection.getAdmin();
//创建NS描述器NamespaceDescriptornamespaceDescriptor=NamespaceDescriptor.create(ns).build();
//创建操作admin.createNamespace(namespaceDescriptor);
//关闭资源admin.close();
connection.close();
    }
/*** 创建表** @param tableName 表名* @param versions  版本* @param cfs       列族* @throws IOException*/publicstaticvoidcreateTable(StringtableName, intversions, String... cfs) throwsIOException {
//创建连接Connectionconnection=ConnectionFactory.createConnection(configuration);
Adminadmin=connection.getAdmin();
//        此处设置了表存在则不创建表if (admin.tableExists(TableName.valueOf(tableName))) {
return;
        }
HTableDescriptorhTableDescriptor=newHTableDescriptor(TableName.valueOf(tableName));
//循环添加列族for (Stringcf : cfs) {
HColumnDescriptorhColumnDescriptor=newHColumnDescriptor(cf);
hColumnDescriptor.setMaxVersions(versions);
hTableDescriptor.addFamily(hColumnDescriptor);
        }
admin.createTable(hTableDescriptor);
//关闭资源admin.close();
connection.close();
    }
/*** 发布微博** @param uid     用户id* @param content 博文*/publicstaticvoidcreateData(Stringuid, Stringcontent) throwsIOException {
Connectionconnection=ConnectionFactory.createConnection(configuration);
TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT));
TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS));
TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX));
//        拼接RKlongts=System.currentTimeMillis();
Stringrow_key=uid+"_"+ts;
//        生成put对象Putput=newPut(Bytes.toBytes(row_key));
put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
//        向内容表添加数据contTable.put(put);
Getget=newGet(Bytes.toBytes("fans"));
get.addFamily(Bytes.toBytes("fans"));
Resultresult=relaTable.get(get);
Cell[] cells=result.rawCells();
if (cells.length==0) {
return;
        }
//        更新fans收件箱表List<Put>puts=newArrayList<>();
for (Cellcell :
cells) {
byte[] cloneQualifier=CellUtil.cloneQualifier(cell);
PutinboxPut=newPut(cloneQualifier);
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(row_key));
puts.add(inboxPut);
        }
inboxTable.put(puts);
contTable.close();
relaTable.close();
inboxTable.close();
//关闭资源connection.close();
    }
/*** 关注用户** @param uid  uid* @param uids 粉丝uid* @throws IOException*/publicstaticvoidaddAttend(Stringuid, String... uids) throwsIOException {
Connectionconnection=ConnectionFactory.createConnection(configuration);
TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT));
TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS));
TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX));
//        创建操作者的put对象PutrelaPut=newPut(Bytes.toBytes(uid));
ArrayList<Put>puts=newArrayList<>();
for (Strings :
uids) {
relaPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s));
//            创建被操作者的put对象PutfansPut=newPut(Bytes.toBytes(s));
fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
puts.add(fansPut);
        }
puts.add(relaPut);
relaTable.put(puts);
PutinboxPut=newPut(Bytes.toBytes(uid));
//        获得内容表中被关注者的rowKeyfor (Strings :
uids) {
Scanscan=newScan(Bytes.toBytes(s), Bytes.toBytes(s+"|"));
ResultScannerresults=contTable.getScanner(scan);
for (Resultresult :
results) {
StringrowKey=Bytes.toString(result.getRow());
String[] split=rowKey.split("_");
byte[] row=result.getRow();
inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row);
            }
        }
inboxTable.put(inboxPut);
inboxTable.close();
relaTable.close();
contTable.close();
connection.close();
    }
/*** 取关用户** @param uid* @param uids* @throws IOException*/publicstaticvoiddelAttend(Stringuid, String... uids) throwsIOException {
Connectionconnection=ConnectionFactory.createConnection(configuration);
TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS));
TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX));
DeleterelaDel=newDelete(Bytes.toBytes(uid));
ArrayList<Delete>deletes=newArrayList<>();
for (Strings :
uids) {
DeletefansDel=newDelete(Bytes.toBytes(s));
fansDel.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
relaDel.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s));
deletes.add(fansDel);
        }
deletes.add(relaDel);
//        执行删除操作relaTable.delete(deletes);
//        删除收件箱相关内容DeleteinboxDel=newDelete(Bytes.toBytes(uid));
for (Strings :
uids) {
inboxDel.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s));
        }
//        执行收件箱删除操作inboxTable.delete(inboxDel);
inboxTable.close();
relaTable.close();
connection.close();
    }
/*** 获取微博内容(初始化页面)** @param uid* @throws IOException*/publicstaticvoidgetInit(Stringuid) throwsIOException {
//获取连接Connectionconnection=ConnectionFactory.createConnection(configuration);
//获取表对象(2个)TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX));
TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT));
//获取收件箱表数据Getget=newGet(Bytes.toBytes(uid));//收件箱表get对象Resultresult=inboxTable.get(get);//设置获取最大版本的数据ArrayList<Get>gets=newArrayList<>();
Cell[] cells=result.rawCells();
//遍历返回内容并将其封装成内容的get对象for (Cellcell : cells) {
GetcontGet=newGet(CellUtil.cloneValue(cell));
gets.add(contGet);
//根据收件箱获取值去往内容表获取微博内容Result[] results=contTable.get(gets);
for (Resultresult1 : results) {
Cell[] cells1=result1.rawCells();
//遍历打印for (Cellcel1 : cells1) {
System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cel1))
+",Content:"+Bytes.toString(CellUtil.cloneValue(cel1)));
                }
            }
        }
inboxTable.close();
contTable.close();
connection.close();
    }
/*** 获取微博内容(查看某个人所有微博内容)** @param uid* @throws IOException*/publicstaticvoidgetData(Stringuid) throwsIOException {
//获取连接Connectionconnection=ConnectionFactory.createConnection(configuration);
//获取表对象Tabletable=connection.getTable(TableName.valueOf(Constant.CONTENT));
//扫描(过滤器)Scanscan=newScan();
RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL, newSubstringComparator(uid+"_"));
scan.setFilter(rowFilter);
ResultScannerresults=table.getScanner(scan);
//遍历打印for (Resultresult : results) {
Cell[] cells=result.rawCells();
for (Cellcell : cells) {
System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cell))
+",Content:"+Bytes.toString(CellUtil.cloneValue(cell)));
//关闭资源table.close();
connection.close();
            }
        }
    }
}

主方法&问题

主方法是按照流程写的一套逻辑,但是发现不少问题,最初是工具类没有加已经存在表和命名空间的判定,初始化只能进行一次,后来加上判定;然后是数据的不同步,每次关注完用户后,后续数据插入后不会更新收件箱表的数据

packageorg.example;
importjava.io.IOException;
/*** 启动类*/publicclassWeibo {
/*** 初始化*/publicstaticvoidinit() throwsIOException {
//        创建命名空间WeiboUtil.isCreateNameSpace(Constant.NameSpace);
//        创建内容表WeiboUtil.createTable(Constant.CONTENT, 1, "info");
//        创建用户关系表WeiboUtil.createTable(Constant.RELATIONS, 1, "attends", "fans");
//        创建收件箱表WeiboUtil.createTable(Constant.INBOX, 2, "info");
    }
/*** 主方法** @param args*/publicstaticvoidmain(String[] args) throwsIOException, InterruptedException {
//        初始化init();
//       发布微博WeiboUtil.createData("11", "今天天气还可以");
WeiboUtil.createData("12", "今天天气不好");
WeiboUtil.createData("12", "今天天气真不好");
WeiboUtil.createData("12", "今天不想上课");
WeiboUtil.createData("12", "明天想上课");
WeiboUtil.createData("13", "明天想上课");
WeiboUtil.createData("15", "今天不想上课");
WeiboUtil.createData("15", "明天想上课");
WeiboUtil.createData("14", "明天想上课");
WeiboUtil.createData("16", "今天天气不好");
WeiboUtil.createData("17", "今天天气真不好");
WeiboUtil.createData("18", "今天不想上课");
WeiboUtil.createData("16", "明天想上课");
//       关注WeiboUtil.addAttend("14", "12", "13", "11", "15");
WeiboUtil.addAttend("16", "14", "18", "11", "15");
System.out.println("----------------");
//        初始化14页面信息WeiboUtil.getInit("14");
Thread.sleep(1000);
System.out.println("----------------");
//        初始化16页面信息WeiboUtil.getInit("16");
Thread.sleep(1000);
WeiboUtil.createData("11", "今天不想上课!无");
WeiboUtil.createData("15", "明天想上课!无");
WeiboUtil.createData("12", "明天想上课!无");
WeiboUtil.createData("13", "今天不想上课!无");
System.out.println("----------------");
//        初始化14页面信息WeiboUtil.getInit("14");
Thread.sleep(1000);
WeiboUtil.createData("14", "今天不想上课!!啊");
WeiboUtil.createData("18", "明天想上课!!啊");
WeiboUtil.createData("15", "明天想上课!!啊");
WeiboUtil.createData("11", "今天不想上课!!啊");
System.out.println("----------------");
//        初始化16页面信息WeiboUtil.getInit("16");
Thread.sleep(1000);
//        获得16发布微博System.out.println("----------------");
WeiboUtil.getData("16");
//        16取关11,18System.out.println("----------------");
WeiboUtil.delAttend("16", "11", "18");
WeiboUtil.getInit("16");
    }
}
相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
8月前
|
存储 分布式计算 大数据
HBase分布式数据库关键技术与实战:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析了HBase的核心技术,包括数据模型、分布式架构、访问模式和一致性保证,并探讨了其实战应用,如大规模数据存储、实时数据分析及与Hadoop、Spark集成。同时,分享了面试经验,对比了HBase与其他数据库的差异,提出了应对挑战的解决方案,展望了HBase的未来趋势。通过Java API代码示例,帮助读者巩固理解。全面了解和掌握HBase,能为面试和实际工作中的大数据处理提供坚实基础。
497 3
|
存储 分布式计算 监控
深入浅出 HBase 实战 | 青训营笔记
Hbase是一种NoSQL数据库,这意味着它不像传统的RDBMS数据库那样支持SQL作为查询语言。Hbase是一种分布式存储的数据库,技术上来讲,它更像是分布式存储而不是分布式数据库,它缺少很多RDBMS系统的特性,比如列类型,辅助索引,触发器,和高级查询语言等待。
1124 0
深入浅出 HBase 实战 | 青训营笔记
|
5月前
|
分布式计算 大数据 分布式数据库
"揭秘HBase MapReduce高效数据处理秘诀:四步实战攻略,让你轻松玩转大数据分析!"
【8月更文挑战第17天】大数据时代,HBase以高性能、可扩展性成为关键的数据存储解决方案。结合MapReduce分布式计算框架,能高效处理HBase中的大规模数据。本文通过实例展示如何配置HBase集群、编写Map和Reduce函数,以及运行MapReduce作业来计算HBase某列的平均值。此过程不仅限于简单的统计分析,还可扩展至更复杂的数据处理任务,为企业提供强有力的大数据技术支持。
94 1
|
8月前
|
存储 NoSQL 分布式数据库
【HBase入门与实战】一文搞懂HBase!
该文档介绍了HBase,一种高吞吐量的NoSQL数据库,适合处理大规模数据。HBase具备快速读写、列式存储和天然支持集群部署的特点,常用于高并发场景。NoSQL与关系型数据库的主要区别在于数据模型、查询语言和可伸缩性。HBase的物理架构包括Client、Zookeeper、HMaster和RegionServer,其中RegionServer管理数据存储。HBase的读写流程利用MemStore和Bloom Filter提高效率。此外,文档还提到了HBase的应用,如时间序列数据、消息传递和内容服务。
1153 1
【HBase入门与实战】一文搞懂HBase!
|
SQL 存储 消息中间件
|
SQL 消息中间件 存储
Flink SQL 实战:HBase 的结合应用
本文着重介绍 HBase 和 Flink 在实际场景中的结合使用。主要分为两种场景,第一种场景:HBase 作为维表与 Flink Kafka table 做 temporal table join 的场景;第二种场景:Flink SQL 做计算之后的结果写到 HBase 表,供其他用户查询的场景。
Flink SQL 实战:HBase 的结合应用
|
存储 分布式计算 负载均衡
深入浅出 HBase 实战|青训营笔记
1.介绍 HBase 的适用场景和数据模型;2.分析 HBase 的整体架构和模块设计;3.针对大数据场景 HBase 的解决方案
265 0
深入浅出 HBase 实战|青训营笔记
|
分布式数据库 Hbase
|
分布式数据库 Hbase
|
分布式数据库 Hbase