需求
1、 发布微博内容
a. 在微博内容表中 添加一条数据(发布者)
b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者)
scan 'weibo:receive-content-email',{VERSIONS=>5}
2、添加关注用户
a. 在微博用户关系表中 添加新的好友关注(attends)
b. 从被关注用户角度来说, 新增粉丝用户(fans)
c. 微博邮件箱表添加关注用户发布的微博内容
3、移除或者取消关注用户
a. 在微博用户关系表中 移除新的好友关注(attends)
b. 从被关注用户角度来说, 删除粉丝用户(fans)
c. 微博邮件箱表删除关注用户发布的微博内容
4、获取关注用户发布的微博内容
a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey
b. 根据上面获取到的微博内容的rowkey 获取微博内容
微博展示的内容信息:
message: 发布者ID , 时间戳 , content
表的设计
分析微博:
用户群体: 关注用户 和用户粉丝
用户行为: 发布微博、添加或者移除关注用户
数据存储: 分布式 mysql 数据库
考虑因素:响应时间 妙级 无延迟
对你的技术团队就是一个很大的考验
引出hbase 数据库存储 来实现响应时间 妙级 无延迟 、
hbase 是hadoop 的分布式数据库
使用数据库的时候
拦路虎: 表的设计(合理的来设计 因素多元化)
命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表
namespace name:weibo
设计那些表:
a.微博内容表
xxx 发布 xx 内容
table name : weibo:content
rowkey: 被关注用户ID_时间戳
columnfamily: cf
colunmnlabel: (任意变化的)
图片
内容
标题
version: 只需要一个版本
rowkey:
a.唯一性 每条数据是唯一的
b.长度 (<=64 kb 建议 10-100 byte 最佳 8-16 byte)表 rowkey 是hbase中数据产生冗余的因素
c.散列原则
举例:
时间戳_用户ID 作为rowkey
大量的用户在同一时刻 发布微博内容
121_001
121_002
121_003
121_004
===>
集中到某个region 会造成单独几个region 负载量偏大 而其他 region 完全没有负载
d. 业务相关的设计规范:
方便查询 尽可能将查询知道放到 rowkey
列簇设计:
Hbase 是面向列簇存储 region start rowkey 到 stop rowkey 范围内的一个列簇下的数据 对应一个hdfs file 称为StoreFile 也可以称为HFile 所以如果跨列查询 速度相对来说就会慢很多 so 设计hbase 表 列簇的是 一般1-2个,(1个最佳)
b.用户关系表
用户id fans attends
table name : weibo:relations
rowkey: 用户ID(发布者的用户ID)
columnfamily: attends、fans
colunmnlabel:
关注用户ID
粉丝用户ID
colunmnvalue: 关注用户ID
粉丝用户ID
version: 只需要一个版本
c.用户微博内容接收邮件箱表
table name : weibo:receive-content-email
rowkey: 用户ID(粉丝用户ID)
columnfamily: cf
colunmnlabel:
用户ID(发布者ID 被关注用户ID)
colunmnvalue:
取微博内容的rowkey
version: 1000
10001: cf_001_yyyyy
10001: cf_001_xxxxx
hbase 常用命令:
1. disable 'weibo:content': 禁用表
2. drop 'weibo:content': 删除表
3.truncate 'weibo:relations' :清空表数据
- Disabling table...
- Dropping table...
- Creating table...
list_namespace: 查看命名空间
list: 查看表的列表信息
default: 默认使用的命名空间
hbase : 系统默认使用命名空间
drop_namespace 'weibo': 删除指定的命名空间
代码实现
package com.ibeifeng.hbase.weibo.hbase_weibo; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SubstringComparator; import org.apache.hadoop.hbase.util.Bytes; public class WeiBo { private Configuration conf =HBaseConfiguration.create(); private static final byte[] CONTENT=Bytes.toBytes("weibo:content"); private static final byte[] RELATIONS=Bytes.toBytes("weibo:relations"); private static final byte[] RECEIVE_CONTENT_EMAIL=Bytes.toBytes("weibo:receive-content-email"); //创建 命名空间(库) public void initNameSpace(){ HBaseAdmin admin=null; try { admin=new HBaseAdmin(conf); /** * 命名空间(类似于传统关系型数据库中的schema): 区分不同的业务表 namespace name:weibo */ NamespaceDescriptor weibo=NamespaceDescriptor.create("weibo") .addConfiguration("creator", "beifeng")// .addConfiguration("createTime", System.currentTimeMillis()+"") .build(); admin.createNamespace(weibo); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } } // 建表 public void initTable(){ createContent(); createRelations(); createReceiveContentEmails(); } private void createContent() { /** * a.微博内容表: xxx 发布 xx 内容 table name : weibo:content rowkey: 用户ID_时间戳 columnfamily: cf colunmnlabel: 图片 内容 标题 version: 只需要一个版本 */ HBaseAdmin admin=null; try { admin=new HBaseAdmin(conf); HTableDescriptor content=new HTableDescriptor(TableName.valueOf(CONTENT)); HColumnDescriptor c_cf=new HColumnDescriptor(Bytes.toBytes("cf")); c_cf.setBlockCacheEnabled(true); //推荐是计算后的值 c_cf.setBlocksize(2097152); // 一定事先配置好 // c_cf.setCompressionType(Algorithm.SNAPPY); c_cf.setMaxVersions(1); c_cf.setMinVersions(1); content.addFamily(c_cf); admin.createTable(content); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void createRelations() { /** * b.用户关系表: 用户id fans attends table name : weibo:relations rowkey: 用户ID columnfamily: attends、fans colunmnlabel: 关注用户ID 粉丝用户ID colunmnvalue: 用户ID version: 只需要一个版本 */ HBaseAdmin admin=null; try { admin=new HBaseAdmin(conf); HTableDescriptor relations=new HTableDescriptor(TableName.valueOf(RELATIONS)); HColumnDescriptor attends=new HColumnDescriptor(Bytes.toBytes("attends")); attends.setBlockCacheEnabled(true); attends.setBlocksize(2097152); attends.setMaxVersions(1); attends.setMinVersions(1); HColumnDescriptor fans=new HColumnDescriptor(Bytes.toBytes("fans")); fans.setBlockCacheEnabled(true); fans.setBlocksize(2097152); fans.setMaxVersions(1); fans.setMinVersions(1); relations.addFamily(attends); relations.addFamily(fans); admin.createTable(relations); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } } private void createReceiveContentEmails() { /** * c.用户微博内容接收邮件箱表: table name : weibo:receive-content-email rowkey: 用户ID columnfamily: cf colunmnlabel: 用户ID colunmnvalue: 取微博内容的rowkey version: 1000 */ HBaseAdmin admin=null; try { admin=new HBaseAdmin(conf); HTableDescriptor receive_content_email=new HTableDescriptor(TableName.valueOf(RECEIVE_CONTENT_EMAIL)); HColumnDescriptor rce_cf =new HColumnDescriptor(Bytes.toBytes("cf")); rce_cf.setBlockCacheEnabled(true); rce_cf.setBlocksize(2097152); rce_cf.setMaxVersions(1000); rce_cf.setMinVersions(1000); receive_content_email.addFamily(rce_cf); admin.createTable(receive_content_email); } catch (MasterNotRunningException e) { e.printStackTrace(); } catch (ZooKeeperConnectionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=admin){ try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 发布微博内容: * a. 在微博内容表中 添加一条数据(发布者) b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者) * @param uid * 发布者ID * @param content * 发布微博内容 */ public void publishContent(String uid,String content){ HConnection connection=null; try { connection=HConnectionManager.createConnection(conf); //a. 在微博内容表中 添加一条数据(发布者) HTableInterface contentTBL=connection.getTable(TableName.valueOf(CONTENT)); long timestamp=System.currentTimeMillis(); String rowkey=uid+"_"+timestamp; Put put = new Put(Bytes.toBytes(rowkey)); //四个值分别代表colfamily,columnlabel,timestamp,value put.add(Bytes.toBytes("cf"), Bytes.toBytes("content"),timestamp, Bytes.toBytes(content)); contentTBL.put(put); //b. 在微博内容接收邮件箱表对所有粉丝用户添加数据(订阅者) // 1. 查询 用户关系表 获取该用户的粉丝用户 HTableInterface relationsTBL=connection.getTable(TableName.valueOf(RELATIONS)); // get 'tablename','rowkey','cf','cq' //rowkey Get get=new Get(Bytes.toBytes(uid)); //cf get.addFamily(Bytes.toBytes("fans")); Result result = relationsTBL.get(get); List<byte[]> fans = new ArrayList<byte[]>(); //设置uid用户下所有的粉丝 for (Cell cell : result.rawCells()) { //RELATIONS表里的columnlabel成为RECEIVE_CONTENT_EMAIL的rowkey fans.add(CellUtil.cloneQualifier(cell)); } // 数据判断 if(fans.size() <= 0) return ; // 获取微博内容邮件箱表 HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL); List<Put> puts=new ArrayList<Put>(); for (byte[] fan : fans) { Put fanPut=new Put(fan); //设置rowkey fanPut.add(Bytes.toBytes("cf"), Bytes.toBytes(uid),timestamp, Bytes.toBytes(rowkey)); puts.add(fanPut); } rceTBL.put(puts); /** * cell : primary key {rowkey , column(family+label),version(timestamp)}:value */ } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=connection){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 添加关注用户: * a. 在微博用户关系表中 添加新的好友关注(attends) b. 从被关注用户角度来说, 新增粉丝用户(fans) c. 微博邮件箱表添加关注用户发布的微博内容 * @param uid * 粉丝 ID * @param attends * 被关注用户ID */ public void addAttends(String uid, String ...attends){ //参数过滤 if(attends==null||attends.length<=0) return; /** * a.在微博用户关系表中 添加新的好友关注(attends) * (0001) ,(0002,0003) * rowkey column value * 0001 attends:0002,0002 * 0001 attends:0003,0003 * * rowkey column value * 0003 fans:0001,0001 * 0002 fans:0001,0001 * */ HConnection connection=null; try { connection=HConnectionManager.createConnection(conf); HTableInterface realtionsTBL=connection.getTable(RELATIONS); List<Put> puts=new ArrayList<Put>(); //a. 在微博用户关系表中 添加新的好友关注(attends) Put attendsPut=new Put(Bytes.toBytes(uid)); for (String attend : attends) { attendsPut.add(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend)); //b. 从被关注用户角度来说, 新增粉丝用户(fans) Put fansPut=new Put(Bytes.toBytes(attend)); fansPut.add(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid)); puts.add(fansPut); } puts.add(attendsPut); realtionsTBL.put(puts); //c. 微博邮件箱表添加关注用户发布的微博内容的rowkey /** * 1. 首先查询被关注用户ID发布的微博内容的rowkey * 单个被关注用户ID, --查询content ->微博内容的rowkey * 0001_xxx * 0001_aaa * 0002_yyy * 0002_zzz * 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据 * */ HTableInterface contentTBL= connection.getTable(CONTENT); Scan scan =new Scan(); List<byte[]> rowkeys=new ArrayList<byte[]>(); for(String attend: attends){ //扫描表的rowkey,含有字符串("被关注用户ID_") RowFilter filter=new RowFilter(CompareOp.EQUAL, new SubstringComparator(attend+"_")); scan.setFilter(filter); ResultScanner result=contentTBL.getScanner(scan); // 迭代器遍历 Iterator<Result> itearor=result.iterator(); while(itearor.hasNext()){ Result r=itearor.next(); for(Cell cell:r.rawCells()){ rowkeys.add(CellUtil.cloneRow(cell)); } } } if(rowkeys.size()<= 0) return; // 2. 将前面获取的rowkey列表 遍历出来在微博内容邮件表中正式添加数据 HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL); List<Put> rcePuts=new ArrayList<Put>(); for (byte[] rk : rowkeys) { Put put =new Put(Bytes.toBytes(uid)); String rowkey=Bytes.toString(rk); // substring 包前不包后 String attend=rowkey.substring(0, rowkey.indexOf("_")); long timestamp=Long.parseLong(rowkey.substring(rowkey.indexOf("_")+1)); put.add(Bytes.toBytes("cf"), Bytes.toBytes(attend),timestamp, rk); rcePuts.add(put); } rceTBL.put(rcePuts); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=connection){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** *移除或者取消关注用户 a. 在微博用户关系表中 移除新的好友关注(attends) b. 从被关注用户角度来说, 删除粉丝用户(fans) c. 微博邮件箱表删除关注用户发布的微博内容 * @param uid * 粉丝用户ID * @param attends * 被关注用户ID */ public void removeAttends(String uid,String ...attends){ //参数过滤 if(attends==null||attends.length<=0) return; HConnection connection=null; try { connection=HConnectionManager.createConnection(conf); //a. 在微博用户关系表中 移除新的好友关注(attends) HTableInterface relationsTBL=connection.getTable(RELATIONS); List<Delete> deletes=new ArrayList<Delete>(); Delete attendsDelete =new Delete(Bytes.toBytes(uid)); for (String attend : attends) { attendsDelete.deleteColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend)); //b. 从被关注用户角度来说, 删除粉丝用户(fans) Delete fansDelete=new Delete(Bytes.toBytes(attend)); fansDelete.deleteColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid)); deletes.add(fansDelete); } deletes.add(attendsDelete); relationsTBL.delete(deletes); //c. 微博邮件箱表删除关注用户发布的微博内容 HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL); Delete rceDelete=new Delete(Bytes.toBytes(uid)); for(String attend:attends){ /** * Delete the latest version of the specified column. */ // rceDelete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes(attend)); // Delete all versions of the specified column with a timestamp less than rceDelete.deleteColumns(Bytes.toBytes("cf"), Bytes.toBytes(attend), System.currentTimeMillis()+Integer.MAX_VALUE); } rceTBL.delete(rceDelete); } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=connection){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } /** * 粉丝用户 去获取关注用户发布的微博内容: * a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey b. 根据上面获取到的微博内容的rowkey 获取微博内容 * @param uid * 粉丝用户ID */ public List<Message> getAttendsContens(String uid){ HConnection connection=null; List<Message> messages=new ArrayList<Message>(); try { connection=HConnectionManager.createConnection(conf); //a. 从微博内容邮件表中获取该用户其关注用户的微博内容的rowkey HTableInterface rceTBL=connection.getTable(RECEIVE_CONTENT_EMAIL); Get get=new Get(Bytes.toBytes(uid)); get.setMaxVersions(5); List<byte[]> rowkeys=new ArrayList<byte[]>(); Result result=rceTBL.get(get); for(Cell cell:result.rawCells()){ //CellUtil.cloneValue 获取value //CellUtil.cloneRow 获取rowkey //CellUtil.cloneQualifier 获取列名 //CellUtil.cloneFamily 获取到列族名 rowkeys.add(CellUtil.cloneValue(cell)); } //b. 根据上面获取到的微博内容的rowkey 获取微博内容 HTableInterface contentTBL =connection.getTable(CONTENT); List<Get> gets=new ArrayList<Get>(); for (byte[] rk : rowkeys) { Get g=new Get(rk); gets.add(g); } Result[] results=contentTBL.get(gets); for (Result res : results) { for(Cell cell:res.rawCells()){ Message message=new Message(); String rowkey=Bytes.toString(CellUtil.cloneRow(cell)); String userid=rowkey.substring(0, rowkey.indexOf("_")); String timestamp=rowkey.substring(rowkey.indexOf("_")+1); String content=Bytes.toString(CellUtil.cloneValue(cell)); message.setUid(userid); message.setTimestamp(timestamp); message.setContent(content); messages.add(message); } } } catch (IOException e) { e.printStackTrace(); }finally{ if(null!=connection){ try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } return messages; } public static void testInit(WeiBo wb){ wb.initNameSpace(); wb.initTable(); } public static void testPublishContent(WeiBo wb){ wb.publishContent("0001", "Tomorrow will be better!"); wb.publishContent("0001", "Tomorrow will be better!"); } public static void testAddAttends(WeiBo wb){ wb.publishContent("0008", "今天天气真不错!!!"); wb.publishContent("0009", "今天天气真不错!!!"); wb.addAttends("0001","0008","0009"); } public static void testRemoveAttends(WeiBo wb){ wb.removeAttends("0001", "0009"); } public static void testGetAttendsContents(WeiBo wb){ List<Message> messages=wb.getAttendsContens("0001"); for (Message message : messages) { System.out.println(message); } } public static void main(String[] args) { WeiBo wb=new WeiBo(); //testInit(wb); //testPublishContent(wb); testAddAttends(wb); //testRemoveAttends(wb); } }