基于HBase构建千亿级文本数据相似度计算与快速去重系统

简介: 前言随着大数据时代的到来,数据信息在给我们生活带来便利的同时,同样也给我们带来了一系列的考验与挑战。本文主要介绍了基于 Apache HBase 与 Google SimHash 等多种算法共同实现的一套支持百亿级文本数据相似度计算与快速去重系统的设计与实现。该方案在公司业务层面彻底解决了多主题海量文本数据所面临的存储与计算慢的问题。一. 面临的问题1. 如何选择文本的相似度计算或去重算法?常见的有余弦夹角算法、欧式距离、Jaccard 相似度、最长公共子串、编辑距离等。这些算法对于待比较的文本数据不多时还比较好用,但在海量数据背景下,如果每天产生的数据以千万计算,我们如何对于这些海

前言
随着大数据时代的到来,数据信息在给我们生活带来便利的同时,同样也给我们带来了一系列的考验与挑战。本文主要介绍了基于 Apache HBase 与 Google SimHash 等多种算法共同实现的一套支持百亿级文本数据相似度计算与快速去重系统的设计与实现。该方案在公司业务层面彻底解决了多主题海量文本数据所面临的存储与计算慢的问题。

一. 面临的问题

  1. 如何选择文本的相似度计算或去重算法?

常见的有余弦夹角算法、欧式距离、Jaccard 相似度、最长公共子串、编辑距离等。这些算法对于待比较的文本数据不多时还比较好用,但在海量数据背景下,如果每天产生的数据以千万计算,我们如何对于这些海量千万级的数据进行高效的合并去重和相似度计算呢?

  1. 如何实现快速计算文本相似度或去重呢?

如果我们选好了相似度计算和去重的相关算法,那我们怎么去做呢?如果待比较的文本数据少,我们简单遍历所有文本进行比较即可,那对于巨大的数据集我们该怎么办呢?遍历很明显是不可取的。

  1. 海量数据的存储与快速读写

二. SimHash 算法引入
基于问题一,我们引入了 SimHash 算法来实现海量文本的相似度计算与快速去重。下面我们简单了解下该算法。

  1. 局部敏感哈希

在介绍 SimHash 算法之前,我们先简单介绍下局部敏感哈希是什么。局部敏感哈希的基本思想类似于一种空间域转换思想,LSH 算法基于一个假设,如果两个文本在原有的数据空间是相似的,那么分别经过哈希函数转换以后的它们也具有很高的相似度;相反,如果它们本身是不相似的,那么经过转换后它们应仍不具有相似性。

局部敏感哈希的最大特点就在于保持数据的相似性,举一个小小的例子说明一下:对A文章微调后我们称其为B文章(可能只是多了一个‘的’字),如果此时我们计算两篇文章的 MD5 值,那么必将大相径庭。而局部敏感哈希的好处是经过哈希函数转换后的值也只是发生了微小的变化,即如果两篇文章相似度很高,那么在算法转换后其相似度也会很高。

MinHash 与 SimHash 算法都属于局部敏感哈希,一般情况若每个 Feature 无权重,则 MinHash 效果优于 SimHash 有权重时 SimHash 合适。长文本使用 Simhash 效果很好,短文本使用 Simhash 准备度不高。

  1. SimHash 算法

SimHash 是 Google 在2007年发表的论文《Detecting Near-Duplicates for Web Crawling 》中提到的一种指纹生成算法或者叫指纹提取算法,被 Google 广泛应用在亿级的网页去重的 Job 中,其主要思想是降维,经过simhash降维后,可能仅仅得到一个长度为32或64位的二进制由01组成的字符串。而一维查询则是非常快速的。

SimHash的工作原理我们这里略过,大家可以简单理解为:我们可以利用SimHash算法为每一个网页/文章生成一个长度为32或64位的二进制由01组成的字符串(向量指纹),形如:1000010010101101111111100000101011010001001111100001001011001011。

  1. 海明距离

两个码字的对应比特取值不同的比特数称为这两个码字的海明距离。在一个有效编码集中,任意两个码字的海明距离的最小值称为该编码集的海明距离。举例如下:10101和00110从第一位开始依次有第一位、第四、第五位不同,则海明距离为3。

在 google 的论文给出的数据中,64位的签名,在海明距离为3的情况下,可认为两篇文档是相似的或者是重复的,当然这个值只是参考值。

这样,基于 SimHash 算法,我们就可以将百亿千亿级的高维特征文章转变为一维字符串后再通过计算其海明距离判断网页/文章的相似度,可想效率必将大大提高。

三. 效率问题
到这里相似度问题基本解决,但是按这个思路,在海量数据几百亿的数量下,效率问题还是没有解决的,因为数据是不断添加进来的,不可能每来一条数据,都要和全库的数据做一次比较,按照这种思路,处理速度会越来越慢,线性增长。

这里,我们要引入一个新的概念:抽屉原理,也称鸽巢原理。下面我们简单举例说一下:

桌子上有四个苹果,但只有三个抽屉,如果要将四个苹果放入三个抽屉里,那么必然有一个抽屉中放入了两个苹果。如果每个抽屉代表一个集合,每一个苹果就可以代表一个元素,假如有n+1个元素放到n个集合中去,其中必定有一个集合里至少有两个元素。

抽屉原理就是这么简单,那如果用它来解决我们海量数据的遍历问题呢?

针对海量数据的去重效率,我们可以将64位指纹,切分为4份16位的数据块,根据抽屉原理在海明距离为3的情况,如果两个文档相似,那么它必有一个块的数据是相等的。

那也就是说,我们可以以某文本的 SimHash 的每个16位截断指纹为 Key,Value 为 Key 相等时文本的 SimHash 集合存入 K-V 数据库即可,查询时候,精确匹配这个指纹的4个16位截断指纹所对应的4个 SimHash 集合即可。

如此,假设样本库,有2^37 条数据(1375亿数据),假设数据均匀分布,则每个16位(16个01数字随机组成的组合为2^16 个)倒排返回的最大数量为 (2^37) 4 / (2^16) =8388608个候选结果,4个16位截断索引,总的结果为:48388608=33554432,约为3356万,通过 这样一来的降维处理,原来需要比较1375亿次,现在只需要比较3356万次即可得到结果,这样以来大大提升了计算效率。

根据网上测试数据显示,普通 PC 比较1000万次海明距离大约需要 300ms,也就是说3356万次(1375亿数据)只需花费3356/1000*0.3=1.0068s。那也就是说对于千亿级文本数据(如果每个文本1kb,约100TB数据)的相似度计算与去重工作我们最多只需要一秒的时间即可得出结果。

四. HBase 存储设计
饶了这么大一周,我们终于将需要讲明的理论知识给大家过了一遍。为了阐述的尽量清晰易懂,文中很多理论知识的理解借鉴了大量博主大牛的博客,原文链接已在文末附上,有不太明白的地方快快跪拜大牛们的博客吧,哈哈!

下面我们着重介绍一下 HBase 存储表的设计与实现。

基于上文我们可以大概知道,如果将64位指纹平分四份,海明距离取3,那么必有一段16位截取指纹的数据是相等的。而每一段16位截取指纹对应一个64位指纹集合,且该集合中的每个64位指纹必有一段16位截取指纹与该段16位截取指纹重合。我们可以简单表示(以8位非01指纹举例)为:

key value(set) 12 [12345678,12345679] 23 [12345678,12345679,23456789]
那如果基于 HBase 去实现的话,我们大概对比三种可能的设计方案。

方案一:
以 16 位指纹作为 HBase 数据表的行键,将每一个与之可能相似的64位指纹作为 HBase 的列,列值存文章id值,即构建一张大宽表。如下表所示(以8位非01指纹举例):

rowkey column1 column2 column3 ...
实际数据表可能是这个样子:

rowkey 12345678 32234567 23456789 12456789 ... 12 1102101 1102102 ... 23 1102104 1102105 ... 34 1102106 ...
那其实这样设计表的话该 HBase 表 Rowkey 的个数就是一个确定的数值:16个01数字随机组成的组合为2^16 个。也就是共2^16=65536行。 列的个数其实也是固定的,即2^64=184467440737亿万列。

此时,比如说我们比较56431234与库中所有文本的相似度,只需拉去rowkey in (56,43,12,34) 四行数据遍历每行列,由于 HBase 空值不进行存储,所有只会遍历存在值的列名。

由上文我们计算出1350亿数据如果平均分布的话每行大约有839万列,且不说我们的数据量可能远远大于千亿级别,也不说以64位字符串作为列名所占的存储空间有多大,单单千亿级数据量 HBase 每行就大约839万列,虽说HBase号称支持千万行百万列数据存储,但总归还是设计太不合理。数据不会理想化均匀分布,总列数高达184467440737亿万列也令人堪忧。

方案二:
以 16 位指纹与64位指纹拼接后作为 HBase 数据表的行键,该表只有一列,买手机游戏平台列值存文章id值,即构建一张大长表。如下表所示(以8位非01指纹举例):

rowkey id
实际数据表可能是这个样子:

rowkey id 12_12345678 1 34_12345678 1 56_12345678 1 78_12345678 1 34_22345678 2 23_12235678 3
如此设计感觉要比第一种方法要好一些,每一篇文章会被存为四行。但同样有诸多缺点,一是 Rowkey 过长,二是即便我们通过某种转变设计解决了问题一,那获取数据时我们也只能将 Get 请求转为四个Scan并发扫描+StartEnKey 去扫描表获取数据。当然,如果想实现顺序扫描还可能存在热点问题。在存储上,也造成了数据大量冗余。

方案三:
在真实生产环境中,我们采取该方案来避免上述两个方案中出现的问题与不足。下面简单介绍一下(如果您有更好更优的方案,欢迎留言,先表示感谢!)

简言之呢,就是自己在 HBase 端维护了一个 Set 集合(协处理器),并以 Json 串进行存储,格式如下:

{

"64SimHash1":"id1",
"64SimHash2":"id2",
...
...

}
基于公司存在多种主题类型的文本数据,且互相隔离,去重与相似度计算也是分主题进行,我们的 Rowkey 设计大致如下:

Rowkey = HashNumber_ContentType_16SimHash (共24位)

HashNumber: 为防热点,对表进行Hash预分区(64个预分区),占2个字符 计算公式如下:String.format("%02x", Math.abs(key.hashCode()) % 64)
ContentType :内容主题类型,占4个字符
16SimHash: 16位 SimHash 截取指纹,由01组成
表结构大致如下:

rowkey si s0 s1 s2 s3 ... 01_news_010101010101010101 value 1 Json 串 ... 02_news_010101010101010110 value 2 Json 串 Json 串 ... 03_news_100101010101010110 value 3 Json 串 Json 串 Json 串 ... 01_xbbs_010101010101010101 value 1 Json 串 ...
si:客户端传递过来的欲存储的值,由64位 Simhash 与 Id 通过双下划线拼接而成,诸如 Simhash__Id 的形式。 s0:记录该行数据共有多少个 Set 集合,每一个 Set 集合存储10000个K-V对儿(约1MB)。 s1:第一个 Set 集合,Json 串存储,如果 Size > 10000 ,之后来的数据将存入s2。 s2:以此类推。

当然最核心的部分是s1/s2/s3 中 Json 串中要排重。最简单的办法无非是每次存入数据前先将所有 Set 集合中的数据读到客户端,将欲存的数据与集合中所有数据比对后再次插入。这将带来大量往返IO开销,影响写性能。因此,我们在此引入了 HBase 协处理器技术来规避这个问题,即在服务端完成所有排重操作。大致代码如下:

package com.learn.share.scenarios.observers;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

/**

  • 基于协处理器构建百亿级文本去重系统

*/
public class HBaseSimHashSetBuildSystem extends BaseRegionObserver {

private Logger logger = LoggerFactory.getLogger(HBaseSimHashSetBuildSystem.class);


@Override
public void start(CoprocessorEnvironment e) throws IOException {
    logger.info("Coprocessor opration start...");
}

/**
 *
 * @param e
 * @param put
 * @param edit
 * @param durability
 * @throws IOException
 */
@Override
public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
    // test flag
    logger.info("do something before Put Opration...");

    List<Cell> cells = put.get(Bytes.toBytes("f"), Bytes.toBytes("si"));
    if (cells == null || cells.size() == 0) {
        return;
    }
    String simhash__itemid = Bytes.toString(CellUtil.cloneValue(cells.get(0)));
    if (StringUtils.isEmpty(simhash__itemid)||simhash__itemid.split("__").length!=2){
        return;
    }
    String simhash = simhash__itemid.trim().split("__")[0];
    String itemid = simhash__itemid.trim().split("__")[1];

    // 获取Put Rowkey
    byte[] row = put.getRow();
    // 通过Rowkey构造Get对象
    Get get = new Get(row);
    get.setMaxVersions(1);
    get.addFamily(Bytes.toBytes("f"));
    Result result = e.getEnvironment().getRegion().get(get);
    Cell columnCell = result.getColumnLatestCell(Bytes.toBytes("f"), Bytes.toBytes("s0")); // set size
    if (columnCell == null) {
        // 第一次存储数据,将size初始化为1
        logger.info("第一次存储数据,将size初始化为1");

        JsonObject jsonObject = new JsonObject();
        jsonObject.addProperty(simhash,itemid);
        Gson gson = new Gson();
        String json = gson.toJson(jsonObject);

        put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("s1"), Bytes.toBytes(json)); // json 数组
        put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("s0"), Bytes.toBytes("1"));  // 初始化
    }else {
        byte[] sizebyte = CellUtil.cloneValue(columnCell);
        int size = Integer.parseInt(Bytes.toString(sizebyte));
        logger.info("非第一次存储数据 ----> Rowkey `"+Bytes.toString(row)+"` simhash set size is : "+size +", the current value is : "+simhash__itemid);
        for (int i = 1; i <= size; i++) {
            Cell cell1 = result.getColumnLatestCell(Bytes.toBytes("f"), Bytes.toBytes("s"+i));
            String jsonBefore = Bytes.toString(CellUtil.cloneValue(cell1));
            Gson gson = new Gson();
            JsonObject jsonObject = gson.fromJson(jsonBefore, JsonObject.class);
            int sizeBefore = jsonObject.entrySet().size();
            if(i==size){
                if(!jsonObject.has(simhash)){
                    if (sizeBefore==10000){
                        JsonObject jsonone = new JsonObject();
                        jsonone.addProperty(simhash,itemid);
                        String jsonstrone = gson.toJson(jsonone);
                        put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("s"+(size+1)), Bytes.toBytes(jsonstrone)); // json 数组
                        put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("s0"), Bytes.toBytes((size+1)+""));  // 初始化
                    }else {
                        jsonObject.addProperty(simhash,itemid);
                        String jsonAfter = gson.toJson(jsonObject);
                        put.addColumn(Bytes.toBytes("f"),Bytes.toBytes("s"+size), Bytes.toBytes(jsonAfter)); // json 数组
                    }
                }else {
                    return;
                }
            }else{
                if(!jsonObject.has(simhash)){
                    continue;
                }else {
                    return;
                }
            }
        }
    }
}

}
如此,当我们需要对某一文本指纹与库中数据进行比对时,只需一个Table.Get(List) 操作即可返回所有的数据,然后基于s0依次获取各个 Set 集合中的数据即可。

下面我们算一笔账,假设我们某主题类型数据依然有 2^37 条数据(1375亿数据),假设数据均匀分布,则每个16位(16个01数字随机组成的组合为2^16 个)倒排返回的最大数量为 (2^37) * 4 / (2^16) =8388608个候选结果,即每行约839个 Set 集合,每个Set 集合大约1M 的话,数据存储量也必然不会太大。

你如果有十种不同主题的数据,HBase 行数无非也才 (2^16)*10 = 655360 行而已。

如果再加上 Snappy 压缩呢? 如果再加上 Fast-Diff 编码呢? 如果再开启 Mob 对象存储呢? 每个 Set 是不是可以存10万个键值对?每行只需90个 Set 集合。

也或许,如果数据量小的话,使用 Redis 是不是更好呢?

总之,优化完善和不完美的地方还很多,本文也就简单叙述到此,如果您有好的建议或是不同看法,欢迎留言哦!感恩~ 晚安各位~~

相关实践学习
lindorm多模间数据无缝流转
展现了Lindorm多模融合能力——用kafka API写入,无缝流转在各引擎内进行数据存储和计算的实验。
云数据库HBase版使用教程
&nbsp; 相关的阿里云产品:云数据库 HBase 版 面向大数据领域的一站式NoSQL服务,100%兼容开源HBase并深度扩展,支持海量数据下的实时存储、高并发吞吐、轻SQL分析、全文检索、时序时空查询等能力,是风控、推荐、广告、物联网、车联网、Feeds流、数据大屏等场景首选数据库,是为淘宝、支付宝、菜鸟等众多阿里核心业务提供关键支撑的数据库。 了解产品详情:&nbsp;https://cn.aliyun.com/product/hbase &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6天前
|
缓存 监控 Shell
如何使用 HBase Shell 进行数据的实时监控和备份?
如何使用 HBase Shell 进行数据的实时监控和备份?
|
6天前
|
Shell 分布式数据库 Hbase
如何使用 HBase Shell 进行数据的批量导入和导出?
如何使用 HBase Shell 进行数据的批量导入和导出?
|
4月前
|
存储 分布式数据库 数据库
Hbase学习二:Hbase数据特点和架构特点
Hbase学习二:Hbase数据特点和架构特点
80 0
|
2月前
|
存储 分布式计算 分布式数据库
深入理解Apache HBase:构建大数据时代的基石
在大数据时代,数据的存储和管理成为了企业面临的一大挑战。随着数据量的急剧增长和数据结构的多样化,传统的关系型数据库(如RDBMS)逐渐显现出局限性。
336 12
|
4月前
|
缓存 监控 Shell
使用 HBase Shell 进行数据的实时监控和备份
使用 HBase Shell 进行数据的实时监控和备份
|
4月前
|
Shell 分布式数据库 Hbase
使用 HBase Shell 进行数据的批量导入和导出
使用 HBase Shell 进行数据的批量导入和导出
586 6
|
3月前
|
存储 分布式计算 分布式数据库
《HBase MapReduce之旅:我的学习笔记与心得》——跟随我的步伐,一同探索HBase世界,揭开MapReduce的神秘面纱,分享那些挑战与收获,让你在数据的海洋里畅游无阻!
【8月更文挑战第17天】HBase是Apache顶级项目,作为Bigtable的开源版,它是一个非关系型、分布式数据库,具备高可扩展性和性能。结合HDFS存储和MapReduce计算框架,以及Zookeeper协同服务,HBase支持海量数据高效管理。MapReduce通过将任务拆解并在集群上并行执行,极大提升处理速度。学习HBase MapReduce涉及理解其数据模型、编程模型及应用实践,虽然充满挑战,但收获颇丰,对职业发展大有裨益。
45 0
|
4月前
|
存储 Java 分布式数据库
HBase构建图片视频数据的统一存储检索
HBase构建图片视频数据的统一存储检索
|
6月前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
127 0
|
6月前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
87 0