LevelDB 存储模块 Go 语言封装及持久化队列实现

简介: LevelDB 存储模块 Go 语言封装及持久化队列实现

LevelDB介绍


Leveldb是一个google实现的非常高效的kv数据库,能够支持billion级别的数据量。 在这个数量级别下还有着非常高的性能。


LevelDB 是单进程的服务,性能非常之高,在一台4个Q6600的CPU机器上,每秒钟写数据超过40w,而随机读的性能每秒钟超过10w。


内部LSM 树算法实现。



LSM 大致结构如上图所示。


LSM 树而且通过批量存储技术规避磁盘随机写入问题。 LSM 树的设计思想非常朴素, 它的原理是把一颗大树拆分成N棵小树, 它首先写入到内存中(内存没有寻道速度的问题,随机写的性能得到大幅提升),在内存中构建一颗有序小树,随着小树越来越大,内存的小树会flush到磁盘上。磁盘中的树定期可以做 merge 操作,合并成一棵大树,以优化读性能【读数据的过程可能需要从内存 memtable 到磁盘 sstfile 读取多次,称之为读放大】。LevelDB 的 LSM 体现在多 level 文件格式上,最热最新的数据尽在 L0 层,数据在内存中,最冷最老的数据尽在 LN 层,数据在磁盘或者固态盘上。还有一种日志文件叫做 manifest,用于记录对 sstfile 的更改,可以认为是 LevelDB 的 GIF。


LevelDB是Google的 Jeff Dean和Sanjay Ghemawat设计开发的key-value存储引擎。


LevelDB底层存储利用了LSM tree的思想, RocksDB是Facebook基于LevelDB开发的存储引擎,针对LevelDB做了很多优化,但是大部分模块的实现机制是一样的。


LevelDB是一个持久化存储的KV系统,和Redis这种内存型的KV系统不同,LevelDB不会像Redis一样狂吃内存,而是将大部分数据存储到磁盘上。LevleDB在存储数据时,是根据记录的key值有序存储的,就是说相邻的key值在存储文件中是依次顺序存储的,而应用可以自定义key大小比较函数,LevleDB会按照用户定义的比较函数依序存储这些记录。


像大多数KV系统一样,LevelDB的操作接口简单,基本操作包括写记录、读记录以及删除记录。另外,LevelDB支持数据快照(snapshot)功能,使得读取操作不受写操作影响,可以在读操作过程中始终看到一致的数据。除此之外,LevelDB还支持数据压缩等操作,这对于减小存储空间以及增快IO效率都有直接的帮助。


对LevelDB进一步封装的理由


虽然LevelDB很强大,但是有点儿底层的味道了,操作不够友好,就几个简单的put,get接口。比如想实现个持久化的顺序操作的队列,想按顺序存储和读取记录,比如不用关心底层的操作,仅使用接口。比如如果哪天想替换底层存储,可以灵活一点儿,不用改动业务......


这里对LevelDB记录存储做一个封装,并实现了持久化的消息队列。同时也可以用在嵌入式linux上,替代sqllite做一些记录存储的模块功能。


封装后可供使用的接口:


// Recorder 操作记录的接口声明
```
type Recorder interface {
  // 初始化记录区(会清空所有数据!)
  InitRecAreas() error
  // 打开记录区(开机必须先打开一次)
  OpenRecAreas() (err error)
  // 保存记录,相当于入队push操作 areaID,存储区ID,取值从1--至--MAXRECAREAS(相当于一个表)
  SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error)
  // 更新记录
  UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error)
  // 删除记录(相当于出队pop操作)
  DeleteRec(areaID RecArea, num int) (err error)
  // 获取未上传记录数量
  GetNoUploadNum(areaID RecArea) int
  // 按数据库中的ID读取一条记录
  ReadRecByID(areaID RecArea, id int) (p *Records, err error)
  // 顺序读取未上传的记录(相当于读取队列开头的数据)
  ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error)
  // 倒数读取记录(相当于从队列末尾开始读取)(如sn=1代表最后一次写入的记录)
  ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error)
  // 最后一条记录流水
  GetLastRecNO(areaID RecArea) int
  // 获取当前的读、写ID(获取队列的尾和头部的位置)
  GetReadWriteID(areaID RecArea) (rid, wid int)
}
```


实现原理


单独维护一个key,类似于目录的概念,作为全局使用,记录当前的写的位置,读的位置,是否是循环覆盖写。




如图所示,RecDirTB相当于目录表,这里分配了三个对应可以有三个不同的表/存储区/队列供使用。


sn是记录流水号,wid当前写的位置,rid当前读(消费)的位置。flag是否循环覆盖写标识。


可指定容量,比如为100W,这样可做到记录循环覆盖写,记录留有底可查。


对应的记录区1的表名为Rec01TB,,后面的|xx为顺序存储的记录的id.


value为存储的json格式的内容。


格式为id,sn(记录流水),type(记录类型),time(操作时间),data(记录内容,任意的json格式数据),ext,res留作备注用。



测试的写入性能,封装后测试,并发写入1000条记录,总共耗时30ms左右。


github地址:https://github.com/yangyongzhen/dbmod


简单的使用demo:


package main
import "fmt"
var (
recApi dbmod.Recorder
dataType int
)
// RequestQrcode模拟写入的数据内容json
type RequestQrcode struct {
  ChanNO   string `json:"chanNO"`
  TermID   string `json:"termID"`
  Qrcode   string `json:"qrcode"`
  Money    uint32 `json:"money"`
  Recsn    uint32 `json:"recsn"`
  Orderno  string `json:"orderno"`
  Dealtime string `json:"dealtime"`
}
func main() {
    //每次必须先打开存储区
  err := recApi.OpenRecAreas()
  if err != nil {
    fmt.Printf("OpenRecAreas error,%s\n", err.Error())
  }
    data := RequestQrcode{}
  data.ChanNO = "YS_CHANaaa"
  data.TermID = "12345678"
  data.Recsn = uint32(sn)
  data.Qrcode = "6225882618789"
  data.Money = 1
    dataType = 0x0A
    // 按队列顺序写入一条记录,data为interface{},会序列化为json存储
  id, err := recApi.SaveRec(dbmod.RecArea01, data, datatype)
  if err != nil {
    fmt.Printf("SaveRec error,%s\n", err.Error())
  }
    //按队列顺序读取一条记录
    rec, err := recApi.ReadRecNotServer(dbmod.RecArea01, 1)
  if err != nil {
    fmt.Printf("ReadRecNotServer error,%s\n", err.Error())
  }
  fmt.Printf("rec:%#v\n", rec)
    //按队列顺序删除一条记录(注:只更新标记)
    recApi.DeleteRec(dbmod.RecArea01, 1)
    //获取队列中未上传/消费的记录数量
  num := recApi.GetNoUploadNum(dbmod.RecArea01)
  fmt.Printf("GetNoUploadNum:%d\n", num)
}


部分源码:


package dbmod
import (
  "encoding/json"
  "errors"
  "fmt"
  "log"
  "sync"
  "time"
)
var (
  //IsDebug 是否调试
  IsDebug       = true
  recDir        [MAXRECAREAS]RecDir
  lockSave      = sync.Mutex{}
  lockDel       = sync.Mutex{}
  once          sync.Once
  singleintance *Records
)
// Records ...
type Records struct {
  ID      int         `json:"id"`
  RecNo   int         `json:"sn"`
  RecType int         `json:"type" `
  RecTime string      `json:"time" `
  Data    interface{} `json:"data" `
  Ext     string      `json:"ext" `
  Res     string      `json:"res" `
}
// InitRecAreas 初始化记录存储区
func (rec Records) InitRecAreas() error {
  // 清空数据
  err := DelAllData()
  if err != nil {
    log.Fatal(err.Error())
    return err
  }
  // 初始化目录表
  err = InitRecDir()
  if err != nil {
    log.Fatal(err.Error())
    return err
  }
  return err
}
// OpenRecAreas 打开记录存储区,每次开机,需要先打开一下
func (rec *Records) OpenRecAreas() (err error) {
  //加载RecDir
  for i := 0; i < MAXRECAREAS; i++ {
    log.Printf("LoadDirs %02d \n", i+1)
    err = recDir[i].LoadDirs(RecArea(i) + 1)
    if err != nil {
      log.Println(err.Error())
      return
    }
    log.Printf("LoadDirs %02d ok!\n", i+1)
  }
  //log.Println(recDir)
  return err
}
func saveRecToLDB(areaID RecArea, rec *Records, wid int) (id int, err error) {
  t := time.Now()
  rec.RecTime = t.Format("20060102150405")
  rec.ID = wid
  key := fmt.Sprintf("Rec%02dTB|%d", areaID, wid)
  bv, err := json.Marshal(rec)
  if err != nil {
    log.Println("saveRecToLDB Marshal Error:", err)
    return id, err
  }
  err = PutData(key, bv)
  if err != nil {
    log.Println("saveRecToLDB PutData Error:", err)
    return id, err
  }
  return id, err
}
// SaveRec 保存记录
func (rec *Records) SaveRec(areaID RecArea, data interface{}, recType int) (id int, err error) {
  lockSave.Lock()
  defer lockSave.Unlock()
  //log.Printf("SaveRec,area=%02d \n", areaID)
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    err = fmt.Errorf("area id  %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS)
    log.Println(err.Error())
    return
  }
  rec.RecNo = recDir[areaID-1].RecNo
  rec.Data = data
  rec.RecType = recType
  //记录是否存储满,判断
  if (recDir[areaID-1].WriteID + 1) > (MAXRECCOUNTS) {
    if recDir[areaID-1].ReadID == 0 {
      err = fmt.Errorf("rec area %02d is full", areaID)
      log.Println(err.Error())
      return
    }
    if (recDir[areaID-1].WriteID + 1 - (MAXRECCOUNTS)) == recDir[areaID-1].ReadID {
      err = fmt.Errorf("rec area %02d is full", areaID)
      log.Println(err.Error())
      return
    }
    //保存记录
    recDir[areaID-1].RecNo++
    recDir[areaID-1].WriteID = 1
    recDir[areaID-1].Flag = "1"
    id = 1
    _, err = saveRecToLDB(areaID, rec, id)
    if err != nil {
      log.Println("saveRecToLDB Error:", err)
      return
    }
    err = recDir[areaID-1].UpdateDirs(areaID)
    if err != nil {
      log.Println("SaveRec UpdateDirs Error:", err)
      return
    }
    //log.Printf("SaveRec,area=%02d ok!\n", areaID)
    return id, err
  }
  if recDir[areaID-1].Flag == "1" {
    //记录是否满判断
    if (recDir[areaID-1].WriteID + 1) == recDir[areaID-1].ReadID {
      err = fmt.Errorf("rec area %02d is full", areaID)
      log.Println(err.Error())
      return
    }
    rec.RecNo += 1
    id = recDir[areaID-1].WriteID + 1
    _, err = saveRecToLDB(areaID, rec, id)
    if err != nil {
      log.Println("saveRecToLDB Error:", err)
      return
    }
    recDir[areaID-1].RecNo++
    recDir[areaID-1].WriteID = id
    err = recDir[areaID-1].UpdateDirs(areaID)
    if err != nil {
      log.Fatal(err.Error())
      return 0, err
    }
    //log.Printf("SaveRec,area=%02d ok!\n", areaID)
    return id, err
  }
  rec.RecNo += 1
  id = recDir[areaID-1].WriteID + 1
  _, err = saveRecToLDB(areaID, rec, id)
  if err != nil {
    log.Println("saveRecToLDB Error:", err)
    return
  }
  recDir[areaID-1].RecNo++
  recDir[areaID-1].WriteID = id
  err = recDir[areaID-1].UpdateDirs(areaID)
  if err != nil {
    log.Fatal(err.Error())
    return 0, err
  }
  //log.Printf("SaveRec,area=%02d ok!\n", areaID)
  return id, err
}
// UpdateRec 更新记录
func (rec *Records) UpdateRec(areaID RecArea, recID int, data interface{}, recType int) (id int, err error) {
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    err = fmt.Errorf("area id  %02d is not right,must between 1 and %02d", areaID, MAXRECAREAS)
    log.Println(err.Error())
    return
  }
  rec.Data = data
  rec.RecType = recType
  id, err = saveRecToLDB(areaID, rec, recID)
  return id, err
}
// DeleteRec 删除记录(并不是真正删除表里记录,而是清除该记录的上传标记)
// areaID:记录区 num:删除的数量
func (rec Records) DeleteRec(areaID RecArea, num int) (err error) {
  lockDel.Lock()
  defer lockDel.Unlock()
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    err = errors.New("area id is not right")
    log.Fatal(err.Error())
    return
  }
  id := recDir[areaID-1].ReadID
  //如果写的位置等于读的位置,说明记录已上传完,没有要删除的了
  if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID {
    return
  }
  //如果要删除的数量大于了最大的记录数
  if (id + num) > MAXRECCOUNTS {
    if (id + num - MAXRECCOUNTS) > recDir[areaID-1].WriteID {
      recDir[areaID-1].ReadID = recDir[areaID-1].WriteID
      err = recDir[areaID-1].UpdateDirs(areaID)
      if err != nil {
        log.Fatal(err.Error())
        return err
      }
      return
    }
    //更新读指针(读的位置)
    recDir[areaID-1].ReadID = id + num - MAXRECCOUNTS
    err = recDir[areaID-1].UpdateDirs(areaID)
    if err != nil {
      log.Fatal(err.Error())
      return err
    }
    return
  }
  //如果当前写的位置大于读的位置
  if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID {
    if id+num > recDir[areaID-1].WriteID {
      //更新读指针(读的位置)
      recDir[areaID-1].ReadID = recDir[areaID-1].WriteID
      err = recDir[areaID-1].UpdateDirs(areaID)
      if err != nil {
        log.Fatal(err.Error())
        return err
      }
      return
    }
  }
  //更新读指针(读的位置)
  recDir[areaID-1].ReadID = id + num
  err = recDir[areaID-1].UpdateDirs(areaID)
  if err != nil {
    log.Fatal(err.Error())
    return err
  }
  return
}
//GetNoUploadNum 获取未上传记录数量
func (rec Records) GetNoUploadNum(areaID RecArea) int {
  num := 0
  if recDir[areaID-1].WriteID == recDir[areaID-1].ReadID {
    num = 0
    return num
  }
  if recDir[areaID-1].Flag != "1" {
    num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID)
  } else {
    if recDir[areaID-1].WriteID > recDir[areaID-1].ReadID {
      num = int(recDir[areaID-1].WriteID - recDir[areaID-1].ReadID)
    } else {
      num = int(MAXRECCOUNTS - recDir[areaID-1].ReadID + recDir[areaID-1].WriteID)
    }
  }
  return num
}
// ReadRecByID 按数据库ID读取记录
func (rec Records) ReadRecByID(areaID RecArea, rid int) (p *Records, err error) {
  var rec1 Records
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    err = errors.New("area id is not right")
    log.Fatal(err.Error())
    return
  }
  key := fmt.Sprintf("Rec%02dTB|%d", areaID, rid)
  bv, err := GetData(key)
  err = json.Unmarshal(bv, &rec1)
  if err != nil {
    log.Println("ReadRecByID Unmarshal Error:", key, err)
  }
  return &rec1, nil
}
//ReadRecNotServer 读取未上传的记录数据,顺序读取第SN条未上传的记录
//sn取值 1-到-->未上传记录数目
func (rec Records) ReadRecNotServer(areaID RecArea, sn int) (p *Records, err error) {
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    err = errors.New("area id is not right")
    log.Fatal(err.Error())
    return
  }
  id := recDir[areaID-1].ReadID
  //fmt.Printf("id=%d\n", id)
  if (int(id) + sn) > MAXRECCOUNTS {
    if int(id)+sn-MAXRECCOUNTS > int(recDir[areaID-1].WriteID) {
      return nil, errors.New("no records")
    }
    p, err = rec.ReadRecByID(areaID, int(id)+sn-MAXRECCOUNTS)
  } else {
    if recDir[areaID-1].ReadID <= recDir[areaID-1].WriteID {
      if (int(id) + sn) > int(recDir[areaID-1].WriteID) {
        return nil, errors.New("no records")
      }
    }
    p, err = rec.ReadRecByID(areaID, int(recDir[areaID-1].ReadID)+sn)
  }
  return p, err
}
// ReadRecWriteNot 倒数读取第SN条写入的记录
//读取一条记录  倒数读取第SN条写入的记录
func (rec Records) ReadRecWriteNot(areaID RecArea, sn int) (p *Records, err error) {
  id := int(recDir[areaID-1].WriteID)
  if (id - sn) < 0 {
    if recDir[areaID-1].Flag == "1" {
      p, err = rec.ReadRecByID(areaID, MAXRECCOUNTS-(sn-id-1))
    } else {
      return nil, errors.New("no records")
    }
  } else {
    p, err = rec.ReadRecByID(areaID, (id - sn + 1))
  }
  return
}
// GetLastRecNO 获取最后一条记录流水号
func (rec Records) GetLastRecNO(areaID RecArea) int {
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    log.Println("area id is not right")
    return 0
  }
  id := recDir[areaID-1].RecNo
  return id
}
// GetReadWriteID 获取当前的读、写ID
func (rec Records) GetReadWriteID(areaID RecArea) (rid, wid int) {
  if (areaID <= 0) || (areaID > MAXRECAREAS) {
    log.Println("area id is not right")
    return 0, 0
  }
  rid = recDir[areaID-1].ReadID
  wid = recDir[areaID-1].WriteID
  return rid, wid
}
// NewRecords ...
func NewRecords(debug bool) *Records {
  IsDebug = debug
  if singleintance == nil {
    once.Do(func() {
      fmt.Println("Init singleintance Record operation ")
      singleintance = new(Records)
    })
  }
  return singleintance
}


目录
打赏
0
0
0
0
23
分享
相关文章
|
28天前
|
员工上网行为监控中的Go语言算法:布隆过滤器的应用
在信息化高速发展的时代,企业上网行为监管至关重要。布隆过滤器作为一种高效、节省空间的概率性数据结构,适用于大规模URL查询与匹配,是实现精准上网行为管理的理想选择。本文探讨了布隆过滤器的原理及其优缺点,并展示了如何使用Go语言实现该算法,以提升企业网络管理效率和安全性。尽管存在误报等局限性,但合理配置下,布隆过滤器为企业提供了经济有效的解决方案。
77 8
员工上网行为监控中的Go语言算法:布隆过滤器的应用
Go语言中的加密和解密是如何实现的?
Go语言通过标准库中的`crypto`包提供丰富的加密和解密功能,包括对称加密(如AES)、非对称加密(如RSA、ECDSA)及散列函数(如SHA256)。`encoding/base64`包则用于Base64编码与解码。开发者可根据需求选择合适的算法和密钥,使用这些包进行加密操作。示例代码展示了如何使用`crypto/aes`包实现对称加密。加密和解密操作涉及敏感数据处理,需格外注意安全性。
31 14
Go语言中的包(package)是如何组织的?
在Go语言中,包是代码组织和管理的基本单元,用于集合相关函数、类型和变量,便于复用和维护。包通过目录结构、文件命名、初始化函数(`init`)及导出规则来管理命名空间和依赖关系。合理的包组织能提高代码的可读性、可维护性和可复用性,减少耦合度。例如,`stringutils`包提供字符串处理函数,主程序导入使用这些函数,使代码结构清晰易懂。
43 11
Go语言中的map数据结构是如何实现的?
Go 语言中的 `map` 是基于哈希表实现的键值对数据结构,支持快速查找、插入和删除操作。其原理涉及哈希函数、桶(Bucket)、动态扩容和哈希冲突处理等关键机制,平均时间复杂度为 O(1)。为了确保线程安全,Go 提供了 `sync.Map` 类型,通过分段锁实现并发访问的安全性。示例代码展示了如何使用自定义结构体和切片模拟 `map` 功能,以及如何使用 `sync.Map` 进行线程安全的操作。
|
12天前
|
深度剖析核心科技:Go 语言赋能局域网管理监控软件进阶之旅
在局域网管理监控中,跳表作为一种高效的数据结构,能显著提升流量索引和查询效率。基于Go语言的跳表实现,通过随机化索引层生成、插入和搜索功能,在高并发场景下展现卓越性能。跳表将查询时间复杂度优化至O(log n),助力实时监控异常流量,保障网络安全与稳定。示例代码展示了其在实际应用中的精妙之处。
36 9
|
22天前
|
Go 语言中实现 RSA 加解密、签名验证算法
随着互联网的发展,安全需求日益增长。非对称加密算法RSA成为密码学中的重要代表。本文介绍如何使用Go语言和[forgoer/openssl](https://github.com/forgoer/openssl)库简化RSA加解密操作,包括秘钥生成、加解密及签名验证。该库还支持AES、DES等常用算法,安装简便,代码示例清晰易懂。
58 12
|
25天前
|
解锁企业计算机监控的关键:基于 Go 语言的精准洞察算法
企业计算机监控在数字化浪潮下至关重要,旨在保障信息资产安全与高效运营。利用Go语言的并发编程和系统交互能力,通过进程监控、网络行为分析及应用程序使用记录等手段,实时掌握计算机运行状态。具体实现包括获取进程信息、解析网络数据包、记录应用使用时长等,确保企业信息安全合规,提升工作效率。本文转载自:[VIPShare](https://www.vipshare.com)。
30 0
golang 中操作nsq队列数据库
首先先在本地将服务跑起来,我用的是docker-compose ,一句话6666 先新建一个docker-compose.yml version: '2' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "192.
2555 0
|
1月前
|
go语言中数组和切片
go语言中数组和切片
45 7
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。