wukong引擎源码分析之索引——part 2 持久化 直接set(key,docID数组)在kv存储里-阿里云开发者社区

开发者社区> 数据库> 正文

wukong引擎源码分析之索引——part 2 持久化 直接set(key,docID数组)在kv存储里

简介:

前面说过,接收indexerRequest的代码在index_worker.go里:

复制代码
func (engine *Engine) indexerAddDocumentWorker(shard int) {
    for {
        request := <-engine.indexerAddDocumentChannels[shard] //关键
        addInvertedIndex := engine.indexers[shard].AddDocument(request.document, request.dealDocInfoChan) // 向反向索引表(数组)中加入一个文档
        // save
        if engine.initOptions.UsePersistentStorage {
            for k, v := range addInvertedIndex {
                engine.persistentStorageIndexDocumentChannels[shard] <- persistentStorageIndexDocumentRequest{
                    typ:            "index",
                    keyword:        k,
                    keywordIndices: v,
                }
            }
        }

        atomic.AddUint64(&engine.numTokenIndexAdded,
            uint64(len(request.document.Keywords)))
        atomic.AddUint64(&engine.numDocumentsIndexed, 1)
    }
} 
复制代码

持久化的代码:engine/persistent_storage_worker.go

复制代码
package engine

import (
    "bytes"
    "encoding/binary"
    "encoding/gob"
    "github.com/huichen/wukong/core"
    "github.com/huichen/wukong/types"
    "sync"
    "sync/atomic"
)

type persistentStorageIndexDocumentRequest struct {
    typ string //"info"or"index"

    // typ=="info"时,以下两个字段有效
    docId   uint64
    docInfo *types.DocInfo

    // typ=="index"时,以下两个字段有效
    keyword        string
    keywordIndices *types.KeywordIndices
}

func (engine *Engine) persistentStorageIndexDocumentWorker(shard int) {
    for {
        request := <-engine.persistentStorageIndexDocumentChannels[shard]
        switch request.typ {
        case "info":
            // 得到key
            b := make([]byte, 10)
            length := binary.PutUvarint(b, request.docId)

            // 得到value
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            err := enc.Encode(request.docInfo)
            if err != nil {
                atomic.AddUint64(&engine.numDocumentsStored, 1)
                return
            }

            // 将key-value写入数据库
            engine.dbs[shard][getDB(request.typ)].Set(b[0:length], buf.Bytes())
            atomic.AddUint64(&engine.numDocumentsStored, 1)

        case "index":
            // 得到key
            b := []byte(request.keyword)

            // 得到value
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            err := enc.Encode(request.keywordIndices)
            if err != nil {
                return
            }

            // 将key-value写入数据库
            engine.dbs[shard][getDB(request.typ)].Set(b, buf.Bytes())
        }
    }
}

func (engine *Engine) persistentStorageRemoveDocumentWorker(docId uint64, shard int) {
    // 得到key
    b := make([]byte, 10)
    length := binary.PutUvarint(b, docId)

    // 从数据库删除该key
    engine.dbs[shard][getDB("info")].Delete(b[0:length])
}

func (engine *Engine) persistentStorageInitWorker(shard int) {
    var finish sync.WaitGroup
    finish.Add(2)
    // 恢复docInfo
    go func() {
        defer finish.Add(-1)
        engine.dbs[shard][getDB("info")].ForEach(func(k, v []byte) error {
            key, value := k, v
            // 得到docID
            docId, _ := binary.Uvarint(key)

            // 得到data
            buf := bytes.NewReader(value)
            dec := gob.NewDecoder(buf)
            var data types.DocInfo
            err := dec.Decode(&data)
            if err == nil {
                // 添加索引
                core.AddDocInfo(shard, docId, &data)
            }
            return nil
        })
    }()

    // 恢复invertedIndex
    go func() {
        defer finish.Add(-1)
        engine.dbs[shard][getDB("index")].ForEach(func(k, v []byte) error {
            key, value := k, v
            // 得到keyword
            keyword := string(key)

            // 得到data
            buf := bytes.NewReader(value)
            dec := gob.NewDecoder(buf)
            var data types.KeywordIndices
            err := dec.Decode(&data)
            if err == nil {
                // 添加索引
                core.AddKeywordIndices(shard, keyword, &data)
            }
            return nil
        })
    }()
    finish.Wait()
    engine.persistentStorageInitChannel <- true
}
复制代码

可以看到,倒排索引存在DB里是丑陋的,直接set(key, value) 其中,key是倒排列表的关键字,而value是doc id list也就是数组。

如果索引比较多,每次去DB set是非常耗时的,尤其针对同一个keyword有doc id插入时!

总之,wukong对于持久化的做法很丑陋!








本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/bonelee/p/6582163.html,如需转载请自行联系原作者

版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。

分享:
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

其他文章