POM文件
首先是引入依赖,hbase-client是HBase提供的一套比较底层的API,在实际使用时需要对其进行封装,提供更好用的api给用户;hbase-common是客户端和服务端公用的类定义,工具等通用的功能性代码;hbase-serverhbase是服务器端依赖;hadoop-common封装了一些常用的底层工具,供其他Hadoop模块使用。其主要包括配置工具Configuration、远程过程调用(Remote Procedure Call,RPC)、序列化工具和Hadoop的抽象文件系统工具FileSystem等,为其他Hadoop模块的运行提供基本服务,并为开发Hadoop程序提供了必要的 API。
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>Question</artifactId><version>1.0-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-common</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.2.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency></dependencies></project>
静态数据
配置静态数据,也就是下面注释中的命名空间和表名
packageorg.example; /*** 表静态数据*/publicclassConstant { // 命名空间publicstaticfinalStringNameSpace="weibo"; // 内容表publicstaticfinalStringCONTENT="weibo.content"; // 用户关系表publicstaticfinalStringRELATIONS="weibo:relations"; // 收件箱表publicstaticfinalStringINBOX="weibo:inbox"; }
工具类
工具类,包含了建命名空间,建表,获得用户微博数据,发布微博,关注,取关等逻辑
packageorg.example; importorg.apache.hadoop.conf.Configuration; importorg.apache.hadoop.hbase.*; importorg.apache.hadoop.hbase.client.*; importorg.apache.hadoop.hbase.filter.CompareFilter; importorg.apache.hadoop.hbase.filter.RowFilter; importorg.apache.hadoop.hbase.filter.SubstringComparator; importorg.apache.hadoop.hbase.util.Bytes; importjava.io.IOException; importjava.util.ArrayList; importjava.util.List; /*** 工具类*/publicclassWeiboUtil { // 创建hbase配置privatestaticfinalConfigurationconfiguration=HBaseConfiguration.create(); static { configuration.set("hbase.zookeeper.quorum", "127.0.0.1:2181"); } /*** 命名空间不存在则创建命名空间** @param ns*/publicstaticvoidisCreateNameSpace(Stringns) throwsIOException { //创建连接Connectionconnection=ConnectionFactory.createConnection(configuration); Adminadmin=connection.getAdmin(); try { // 如果namespace不存在会抛出异常admin.getNamespaceDescriptor(ns); } catch (NamespaceExistExceptione) { // 创建namespacecreateNameSpace(ns); } } /*** 创建命名空间** @param ns 命名空间* @throws IOException*/publicstaticvoidcreateNameSpace(Stringns) throwsIOException { //创建连接Connectionconnection=ConnectionFactory.createConnection(configuration); Adminadmin=connection.getAdmin(); //创建NS描述器NamespaceDescriptornamespaceDescriptor=NamespaceDescriptor.create(ns).build(); //创建操作admin.createNamespace(namespaceDescriptor); //关闭资源admin.close(); connection.close(); } /*** 创建表** @param tableName 表名* @param versions 版本* @param cfs 列族* @throws IOException*/publicstaticvoidcreateTable(StringtableName, intversions, String... cfs) throwsIOException { //创建连接Connectionconnection=ConnectionFactory.createConnection(configuration); Adminadmin=connection.getAdmin(); // 此处设置了表存在则不创建表if (admin.tableExists(TableName.valueOf(tableName))) { return; } HTableDescriptorhTableDescriptor=newHTableDescriptor(TableName.valueOf(tableName)); //循环添加列族for (Stringcf : cfs) { HColumnDescriptorhColumnDescriptor=newHColumnDescriptor(cf); hColumnDescriptor.setMaxVersions(versions); hTableDescriptor.addFamily(hColumnDescriptor); } admin.createTable(hTableDescriptor); //关闭资源admin.close(); connection.close(); } /*** 发布微博** @param uid 用户id* @param content 博文*/publicstaticvoidcreateData(Stringuid, Stringcontent) throwsIOException { Connectionconnection=ConnectionFactory.createConnection(configuration); TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT)); TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS)); TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX)); // 拼接RKlongts=System.currentTimeMillis(); Stringrow_key=uid+"_"+ts; // 生成put对象Putput=newPut(Bytes.toBytes(row_key)); put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content)); // 向内容表添加数据contTable.put(put); Getget=newGet(Bytes.toBytes("fans")); get.addFamily(Bytes.toBytes("fans")); Resultresult=relaTable.get(get); Cell[] cells=result.rawCells(); if (cells.length==0) { return; } // 更新fans收件箱表List<Put>puts=newArrayList<>(); for (Cellcell : cells) { byte[] cloneQualifier=CellUtil.cloneQualifier(cell); PutinboxPut=newPut(cloneQualifier); inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(row_key)); puts.add(inboxPut); } inboxTable.put(puts); contTable.close(); relaTable.close(); inboxTable.close(); //关闭资源connection.close(); } /*** 关注用户** @param uid uid* @param uids 粉丝uid* @throws IOException*/publicstaticvoidaddAttend(Stringuid, String... uids) throwsIOException { Connectionconnection=ConnectionFactory.createConnection(configuration); TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT)); TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS)); TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX)); // 创建操作者的put对象PutrelaPut=newPut(Bytes.toBytes(uid)); ArrayList<Put>puts=newArrayList<>(); for (Strings : uids) { relaPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s), Bytes.toBytes(s)); // 创建被操作者的put对象PutfansPut=newPut(Bytes.toBytes(s)); fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid)); puts.add(fansPut); } puts.add(relaPut); relaTable.put(puts); PutinboxPut=newPut(Bytes.toBytes(uid)); // 获得内容表中被关注者的rowKeyfor (Strings : uids) { Scanscan=newScan(Bytes.toBytes(s), Bytes.toBytes(s+"|")); ResultScannerresults=contTable.getScanner(scan); for (Resultresult : results) { StringrowKey=Bytes.toString(result.getRow()); String[] split=rowKey.split("_"); byte[] row=result.getRow(); inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s), Long.parseLong(split[1]), row); } } inboxTable.put(inboxPut); inboxTable.close(); relaTable.close(); contTable.close(); connection.close(); } /*** 取关用户** @param uid* @param uids* @throws IOException*/publicstaticvoiddelAttend(Stringuid, String... uids) throwsIOException { Connectionconnection=ConnectionFactory.createConnection(configuration); TablerelaTable=connection.getTable(TableName.valueOf(Constant.RELATIONS)); TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX)); DeleterelaDel=newDelete(Bytes.toBytes(uid)); ArrayList<Delete>deletes=newArrayList<>(); for (Strings : uids) { DeletefansDel=newDelete(Bytes.toBytes(s)); fansDel.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid)); relaDel.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(s)); deletes.add(fansDel); } deletes.add(relaDel); // 执行删除操作relaTable.delete(deletes); // 删除收件箱相关内容DeleteinboxDel=newDelete(Bytes.toBytes(uid)); for (Strings : uids) { inboxDel.addColumn(Bytes.toBytes("info"), Bytes.toBytes(s)); } // 执行收件箱删除操作inboxTable.delete(inboxDel); inboxTable.close(); relaTable.close(); connection.close(); } /*** 获取微博内容(初始化页面)** @param uid* @throws IOException*/publicstaticvoidgetInit(Stringuid) throwsIOException { //获取连接Connectionconnection=ConnectionFactory.createConnection(configuration); //获取表对象(2个)TableinboxTable=connection.getTable(TableName.valueOf(Constant.INBOX)); TablecontTable=connection.getTable(TableName.valueOf(Constant.CONTENT)); //获取收件箱表数据Getget=newGet(Bytes.toBytes(uid));//收件箱表get对象Resultresult=inboxTable.get(get);//设置获取最大版本的数据ArrayList<Get>gets=newArrayList<>(); Cell[] cells=result.rawCells(); //遍历返回内容并将其封装成内容的get对象for (Cellcell : cells) { GetcontGet=newGet(CellUtil.cloneValue(cell)); gets.add(contGet); //根据收件箱获取值去往内容表获取微博内容Result[] results=contTable.get(gets); for (Resultresult1 : results) { Cell[] cells1=result1.rawCells(); //遍历打印for (Cellcel1 : cells1) { System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cel1)) +",Content:"+Bytes.toString(CellUtil.cloneValue(cel1))); } } } inboxTable.close(); contTable.close(); connection.close(); } /*** 获取微博内容(查看某个人所有微博内容)** @param uid* @throws IOException*/publicstaticvoidgetData(Stringuid) throwsIOException { //获取连接Connectionconnection=ConnectionFactory.createConnection(configuration); //获取表对象Tabletable=connection.getTable(TableName.valueOf(Constant.CONTENT)); //扫描(过滤器)Scanscan=newScan(); RowFilterrowFilter=newRowFilter(CompareFilter.CompareOp.EQUAL, newSubstringComparator(uid+"_")); scan.setFilter(rowFilter); ResultScannerresults=table.getScanner(scan); //遍历打印for (Resultresult : results) { Cell[] cells=result.rawCells(); for (Cellcell : cells) { System.out.println("RK:"+Bytes.toString(CellUtil.cloneRow(cell)) +",Content:"+Bytes.toString(CellUtil.cloneValue(cell))); //关闭资源table.close(); connection.close(); } } } }
主方法&问题
主方法是按照流程写的一套逻辑,但是发现不少问题,最初是工具类没有加已经存在表和命名空间的判定,初始化只能进行一次,后来加上判定;然后是数据的不同步,每次关注完用户后,后续数据插入后不会更新收件箱表的数据
packageorg.example; importjava.io.IOException; /*** 启动类*/publicclassWeibo { /*** 初始化*/publicstaticvoidinit() throwsIOException { // 创建命名空间WeiboUtil.isCreateNameSpace(Constant.NameSpace); // 创建内容表WeiboUtil.createTable(Constant.CONTENT, 1, "info"); // 创建用户关系表WeiboUtil.createTable(Constant.RELATIONS, 1, "attends", "fans"); // 创建收件箱表WeiboUtil.createTable(Constant.INBOX, 2, "info"); } /*** 主方法** @param args*/publicstaticvoidmain(String[] args) throwsIOException, InterruptedException { // 初始化init(); // 发布微博WeiboUtil.createData("11", "今天天气还可以"); WeiboUtil.createData("12", "今天天气不好"); WeiboUtil.createData("12", "今天天气真不好"); WeiboUtil.createData("12", "今天不想上课"); WeiboUtil.createData("12", "明天想上课"); WeiboUtil.createData("13", "明天想上课"); WeiboUtil.createData("15", "今天不想上课"); WeiboUtil.createData("15", "明天想上课"); WeiboUtil.createData("14", "明天想上课"); WeiboUtil.createData("16", "今天天气不好"); WeiboUtil.createData("17", "今天天气真不好"); WeiboUtil.createData("18", "今天不想上课"); WeiboUtil.createData("16", "明天想上课"); // 关注WeiboUtil.addAttend("14", "12", "13", "11", "15"); WeiboUtil.addAttend("16", "14", "18", "11", "15"); System.out.println("----------------"); // 初始化14页面信息WeiboUtil.getInit("14"); Thread.sleep(1000); System.out.println("----------------"); // 初始化16页面信息WeiboUtil.getInit("16"); Thread.sleep(1000); WeiboUtil.createData("11", "今天不想上课!无"); WeiboUtil.createData("15", "明天想上课!无"); WeiboUtil.createData("12", "明天想上课!无"); WeiboUtil.createData("13", "今天不想上课!无"); System.out.println("----------------"); // 初始化14页面信息WeiboUtil.getInit("14"); Thread.sleep(1000); WeiboUtil.createData("14", "今天不想上课!!啊"); WeiboUtil.createData("18", "明天想上课!!啊"); WeiboUtil.createData("15", "明天想上课!!啊"); WeiboUtil.createData("11", "今天不想上课!!啊"); System.out.println("----------------"); // 初始化16页面信息WeiboUtil.getInit("16"); Thread.sleep(1000); // 获得16发布微博System.out.println("----------------"); WeiboUtil.getData("16"); // 16取关11,18System.out.println("----------------"); WeiboUtil.delAttend("16", "11", "18"); WeiboUtil.getInit("16"); } }