LevelDB介绍
Leveldb是一个google实现的非常高效的kv数据库,能够支持billion级别的数据量。 在这个数量级别下还有着非常高的性能。
LevelDB 是单进程的服务,性能非常之高,在一台4个Q6600的CPU机器上,每秒钟写数据超过40w,而随机读的性能每秒钟超过10w。
内部LSM 树算法实现。
LSM 大致结构如上图所示。
LSM 树而且通过批量存储技术规避磁盘随机写入问题。 LSM 树的设计思想非常朴素, 它的原理是把一颗大树拆分成N棵小树, 它首先写入到内存中(内存没有寻道速度的问题,随机写的性能得到大幅提升),在内存中构建一颗有序小树,随着小树越来越大,内存的小树会flush到磁盘上。磁盘中的树定期可以做 merge 操作,合并成一棵大树,以优化读性能【读数据的过程可能需要从内存 memtable 到磁盘 sstfile 读取多次,称之为读放大】。LevelDB 的 LSM 体现在多 level 文件格式上,最热最新的数据尽在 L0 层,数据在内存中,最冷最老的数据尽在 LN 层,数据在磁盘或者固态盘上。还有一种日志文件叫做 manifest,用于记录对 sstfile 的更改,可以认为是 LevelDB 的 GIF。
LevelDB是Google的 Jeff Dean和Sanjay Ghemawat设计开发的key-value存储引擎。
LevelDB底层存储利用了LSM tree的思想, RocksDB是Facebook基于LevelDB开发的存储引擎,针对LevelDB做了很多优化,但是大部分模块的实现机制是一样的。
LevelDB是一个持久化存储的KV系统,和Redis这种内存型的KV系统不同,LevelDB不会像Redis一样狂吃内存,而是将大部分数据存储到磁盘上。LevleDB在存储数据时,是根据记录的key值有序存储的,就是说相邻的key值在存储文件中是依次顺序存储的,而应用可以自定义key大小比较函数,LevleDB会按照用户定义的比较函数依序存储这些记录。
像大多数KV系统一样,LevelDB的操作接口简单,基本操作包括写记录、读记录以及删除记录。另外,LevelDB支持数据快照(snapshot)功能,使得读取操作不受写操作影响,可以在读操作过程中始终看到一致的数据。除此之外,LevelDB还支持数据压缩等操作,这对于减小存储空间以及增快IO效率都有直接的帮助。
对LevelDB进一步封装的理由
虽然LevelDB很强大,但是有点儿底层的味道了,操作不够友好,就几个简单的put,get接口。比如想实现个持久化的顺序操作的队列,想按顺序存储和读取记录,比如不用关心底层的操作,仅使用接口。比如如果哪天想替换底层存储,可以灵活一点儿,不用改动业务......
这里对LevelDB记录存储做一个封装,并实现了持久化的消息队列。同时也可以用在嵌入式linux上,替代sqllite做一些记录存储的模块功能。
封装后可供使用的接口:
// Recorder 操作记录的接口声明 ``` type Recorder interface { // 初始化记录区(会清空所有数据!) InitRecAreas() error // 打开记录区(开机必须先打开一次) OpenRecAreas() (err error) // 保存记录,相当于入队push操作 areaID,存储区ID,取值从1--至--MAXRECAREAS(相当于一个表) SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error) // 更新记录 UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error) // 删除记录(相当于出队pop操作) DeleteRec(areaID RecArea, num int) (err error) // 获取未上传记录数量 GetNoUploadNum(areaID RecArea) int // 按数据库中的ID读取一条记录 ReadRecByID(areaID RecArea, id int) (p *Records, err error) // 顺序读取未上传的记录(相当于读取队列开头的数据) ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error) // 倒数读取记录(相当于从队列末尾开始读取)(如sn=1代表最后一次写入的记录) ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error) // 最后一条记录流水 GetLastRecNO(areaID RecArea) int // 获取当前的读、写ID(获取队列的尾和头部的位置) GetReadWriteID(areaID RecArea) (rid, wid int) } ```
实现原理
单独维护一个key,类似于目录的概念,作为全局使用,记录当前的写的位置,读的位置,是否是循环覆盖写。
如图所示,RecDirTB相当于目录表,这里分配了三个对应可以有三个不同的表/存储区/队列供使用。
sn是记录流水号,wid当前写的位置,rid当前读(消费)的位置。flag是否循环覆盖写标识。
可指定容量,比如为100W,这样可做到记录循环覆盖写,记录留有底可查。
对应的记录区1的表名为Rec01TB,,后面的|xx为顺序存储的记录的id.
value为存储的json格式的内容。
格式为id,sn(记录流水),type(记录类型),time(操作时间),data(记录内容,任意的json格式数据),ext,res留作备注用。
测试的写入性能,封装后测试,并发写入1000条记录,总共耗时30ms左右。
github地址:https://github.com/yangyongzhen/dbmod
简单的使用demo:
package main import "fmt" var ( recApi dbmod.Recorder dataType int ) // RequestQrcode模拟写入的数据内容json type RequestQrcode struct { ChanNO string `json:"chanNO"` TermID string `json:"termID"` Qrcode string `json:"qrcode"` Money uint32 `json:"money"` Recsn uint32 `json:"recsn"` Orderno string `json:"orderno"` Dealtime string `json:"dealtime"` } func main() { //每次必须先打开存储区 err := recApi.OpenRecAreas() if err != nil { fmt.Printf("OpenRecAreas error,%s\n", err.Error()) } data := RequestQrcode{} data.ChanNO = "YS_CHANaaa" data.TermID = "12345678" data.Recsn = uint32(sn) data.Qrcode = "6225882618789" data.Money = 1 dataType = 0x0A // 按队列顺序写入一条记录,data为interface{},会序列化为json存储 id, err := recApi.SaveRec(dbmod.RecArea01, data, datatype) if err != nil { fmt.Printf("SaveRec error,%s\n", err.Error()) } //按队列顺序读取一条记录 rec, err := recApi.ReadRecNotServer(dbmod.RecArea01, 1) if err != nil { fmt.Printf("ReadRecNotServer error,%s\n", err.Error()) } fmt.Printf("rec:%#v\n", rec) //按队列顺序删除一条记录(注:只更新标记) recApi.DeleteRec(dbmod.RecArea01, 1) //获取队列中未上传/消费的记录数量 num := recApi.GetNoUploadNum(dbmod.RecArea01) fmt.Printf("GetNoUploadNum:%d\n", num) }
部分源码:
package dbmod import ( "encoding/json" "errors" "fmt" "log" "sync" "time" ) var ( //IsDebug 是否调试 IsDebug = true recDir [MAXRECAREAS]RecDir lockSave = sync.Mutex{} lockDel = sync.Mutex{} once sync.Once singleintance *Records ) // Records ... type Records struct { ID int `json:"id"` RecNo int `json:"sn"` RecType int `json:"type" ` RecTime string `json:"time" ` Data interface{} `json:"data" ` Ext string `json:"ext" ` Res string `json:"res" ` } // InitRecAreas 初始化记录存储区 func (rec Records) InitRecAreas() error { // 清空数据 err := DelAllData() if err != nil { log.Fatal(err.Error()) return err } // 初始化目录表 err = InitRecDir() if err != nil { log.Fatal(err.Error()) return err } return err } // OpenRecAreas 打开记录存储区,每次开机,需要先打开一下 func (rec *Records) OpenRecAreas() (err error) { //加载RecDir for i := 0; i < MAXRECAREAS; i++ { log.Printf("LoadDirs %02d \n", i+1) err = recDir[i].LoadDirs(RecArea(i) + 1) if err != nil { log.Println(err.Error()) return } log.Printf("LoadDirs %02d ok!\n", i+1) } //log.Println(recDir) return err } func saveRecToLDB(areaID RecArea, rec *Records, wid int) (id int, err error) { t := time.Now() rec.RecTime = t.Format("20060102150405") rec.ID = wid key := fmt.Sprintf("Rec%02dTB|%d", areaID, wid) bv, err := json.Marshal(rec) if err != nil { log.Println("saveRecToLDB Marshal Error:", err) return id, err } err = PutData(key, bv) if err != nil { log.Println("saveRecToLDB PutData Error:", err) return id, err } return id, err } // SaveRec 保存记录 func (rec *Records) SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error) { lockSave.Lock() defer lockSave.Unlock() //log.Printf("SaveRec,area=%02d \n", areaID) if (areaID <= 0) || (areaID > MAXRECAREAS) { err = fmt.Errorf("area id %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS) log.Println(err.Error()) return } rec.RecNo = recDir[areaID-1].RecNo rec.Data = data rec.RecType = recType //记录是否存储满,判断 if (recDir[areaID-1].WriteID + 1) > (MAXRECCOUNTS) { if recDir[areaID-1].ReadID == 0 { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } if (recDir[areaID-1].WriteID + 1 - (MAXRECCOUNTS)) == recDir[areaID-1].ReadID { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } //保存记录 recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = 1 recDir[areaID-1].Flag = "1" id = 1 _, err = saveRecToLDB(areaID, rec, id) if err != nil { log.Println("saveRecToLDB Error:", err) return } err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Println("SaveRec UpdateDirs Error:", err) return } //log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } if recDir[areaID-1].Flag == "1" { //记录是否满判断 if (recDir[areaID-1].WriteID + 1) == recDir[areaID-1].ReadID { err = fmt.Errorf("rec area %02d is full", areaID) log.Println(err.Error()) return } rec.RecNo += 1 id = recDir[areaID-1].WriteID + 1 _, err = saveRecToLDB(areaID, rec, id) if err != nil { log.Println("saveRecToLDB Error:", err) return } recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = id err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return 0, err } //log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } rec.RecNo += 1 id = recDir[areaID-1].WriteID + 1 _, err = saveRecToLDB(areaID, rec, id) if err != nil { log.Println("saveRecToLDB Error:", err) return } recDir[areaID-1].RecNo++ recDir[areaID-1].WriteID = id err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return 0, err } //log.Printf("SaveRec,area=%02d ok!\n", areaID) return id, err } // UpdateRec 更新记录 func (rec *Records) UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error) { if (areaID <= 0) || (areaID > MAXRECAREAS) { err = fmt.Errorf("area id %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS) log.Println(err.Error()) return } rec.Data = data rec.RecType = recType id, err = saveRecToLDB(areaID, rec, recID) return id, err } // DeleteRec 删除记录(并不是真正删除表里记录,而是清除该记录的上传标记) // areaID:记录区 num:删除的数量 func (rec Records) DeleteRec(areaID RecArea, num int) (err error) { lockDel.Lock() defer lockDel.Unlock() if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } id := recDir[areaID-1].ReadID //如果写的位置等于读的位置,说明记录已上传完,没有要删除的了 if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID { return } //如果要删除的数量大于了最大的记录数 if (id + num) > MAXRECCOUNTS { if (id + num - MAXRECCOUNTS) > recDir[areaID-1].WriteID { recDir[areaID-1].ReadID = recDir[areaID-1].WriteID err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //更新读指针(读的位置) recDir[areaID-1].ReadID = id + num - MAXRECCOUNTS err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //如果当前写的位置大于读的位置 if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID { if id+num > recDir[areaID-1].WriteID { //更新读指针(读的位置) recDir[areaID-1].ReadID = recDir[areaID-1].WriteID err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } } //更新读指针(读的位置) recDir[areaID-1].ReadID = id + num err = recDir[areaID-1].UpdateDirs(areaID) if err != nil { log.Fatal(err.Error()) return err } return } //GetNoUploadNum 获取未上传记录数量 func (rec Records) GetNoUploadNum(areaID RecArea) int { num := 0 if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID { num = 0 return num } if recDir[areaID-1].Flag != "1" { num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID) } else { if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID { num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID) } else { num = int(MAXRECCOUNTS - recDir[areaID-1].ReadID + recDir[areaID-1].WriteID) } } return num } // ReadRecByID 按数据库ID读取记录 func (rec Records) ReadRecByID(areaID RecArea, rid int) (p *Records, err error) { var rec1 Records if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } key := fmt.Sprintf("Rec%02dTB|%d", areaID, rid) bv, err := GetData(key) err = json.Unmarshal(bv, &rec1) if err != nil { log.Println("ReadRecByID Unmarshal Error:", key, err) } return &rec1, nil } //ReadRecNotServer 读取未上传的记录数据,顺序读取第SN条未上传的记录 //sn取值 1-到-->未上传记录数目 func (rec Records) ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error) { if (areaID <= 0) || (areaID > MAXRECAREAS) { err = errors.New("area id is not right") log.Fatal(err.Error()) return } id := recDir[areaID-1].ReadID //fmt.Printf("id=%d\n", id) if (int(id) + sn) > MAXRECCOUNTS { if int(id)+sn-MAXRECCOUNTS > int(recDir[areaID-1].WriteID) { return nil, errors.New("no records") } p, err = rec.ReadRecByID(areaID, int(id)+sn-MAXRECCOUNTS) } else { if recDir[areaID-1].ReadID <= recDir[areaID-1].WriteID { if (int(id) + sn) > int(recDir[areaID-1].WriteID) { return nil, errors.New("no records") } } p, err = rec.ReadRecByID(areaID, int(recDir[areaID-1].ReadID)+sn) } return p, err } // ReadRecWriteNot 倒数读取第SN条写入的记录 //读取一条记录 倒数读取第SN条写入的记录 func (rec Records) ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error) { id := int(recDir[areaID-1].WriteID) if (id - sn) < 0 { if recDir[areaID-1].Flag == "1" { p, err = rec.ReadRecByID(areaID, MAXRECCOUNTS-(sn-id-1)) } else { return nil, errors.New("no records") } } else { p, err = rec.ReadRecByID(areaID, (id - sn + 1)) } return } // GetLastRecNO 获取最后一条记录流水号 func (rec Records) GetLastRecNO(areaID RecArea) int { if (areaID <= 0) || (areaID > MAXRECAREAS) { log.Println("area id is not right") return 0 } id := recDir[areaID-1].RecNo return id } // GetReadWriteID 获取当前的读、写ID func (rec Records) GetReadWriteID(areaID RecArea) (rid, wid int) { if (areaID <= 0) || (areaID > MAXRECAREAS) { log.Println("area id is not right") return 0, 0 } rid = recDir[areaID-1].ReadID wid = recDir[areaID-1].WriteID return rid, wid } // NewRecords ... func NewRecords(debug bool) *Records { IsDebug = debug if singleintance == nil { once.Do(func() { fmt.Println("Init singleintance Record operation ") singleintance = new(Records) }) } return singleintance }